
kafka's Introduction

kafka's People

Contributors

bmatheny, cburroughs, cdegroot, javasoze, jdamick, jghoman, jkreps, junrao, kafka-dev, nehanarkhede, noj, quipo, rsumbaly, spmallette


kafka's Issues

Kafka size per message

Hi all, we want to monitor the storage used in the Kafka broker per topic and, if possible, per message. To do that we are building some dashboards in Grafana (backed by InfluxDB and Prometheus), but we want to be sure the metrics we are using show the right values, so we want to validate that the storage bytes used per Kafka message match the values displayed in the dashboards. Do you know how to check how many storage bytes are used per message? I know the Kafka record wrappers add some extra bytes; we would like to know the total bytes used per message and the total storage used per topic. Do you know if that's possible in Kafka?
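A minimal sketch of one way to cross-check the per-topic numbers, assuming a recent kafka-clients (2.7+) and using broker id 0 and localhost:9092 purely as placeholders: the AdminClient's describeLogDirs call reports the on-disk size of every partition replica a broker hosts, which you can sum per topic and compare against the dashboard values. Per-message size can then be estimated by dividing partition size by record count, keeping batch and record header overhead in mind.

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.LogDirDescription;

import java.util.Collections;
import java.util.Map;
import java.util.Properties;

public class TopicDiskUsage {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker address
        try (Admin admin = Admin.create(props)) {
            // Ask broker 0 (assumed broker id) for the size of every partition replica it hosts.
            Map<Integer, Map<String, LogDirDescription>> dirs =
                    admin.describeLogDirs(Collections.singleton(0)).allDescriptions().get();
            dirs.forEach((broker, byDir) -> byDir.forEach((dir, desc) ->
                    desc.replicaInfos().forEach((tp, info) ->
                            System.out.printf("broker %d %s %s: %d bytes%n", broker, dir, tp, info.size()))));
        }
    }
}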

Kafka transaction can't catch RecordTooLargeException

Issue Summary:
When I send a message in a transaction and the message size exceeds the maximum limit, I can't catch the RecordTooLargeException, and the Kafka server does not receive the message.

What is the Business Impact you are facing?

Observed Behavior:
When I send a message over the size limit, there is no exception at all, and the Kafka server receives no message. I expect an exception log to be printed, but nothing appears.

Please include screenshots. Yes
My code is the same as the example in the description on the official website.
version: 3.1.0

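For what it's worth, with the Java producer a RecordTooLargeException is delivered through the Future returned by send() (or through the send callback) rather than being thrown from send() itself, and inside a transaction the failed send also poisons the transaction so that commitTransaction() throws. A minimal sketch of where the exception can actually be caught, with placeholder bootstrap servers, transactional id, and topic name:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class TxSendCheck {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");        // assumed
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "tx-demo");                // assumed id

        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            producer.initTransactions();
            producer.beginTransaction();
            try {
                // Blocking on the Future surfaces the failure; a fire-and-forget send() hides it.
                String huge = new String(new char[2_000_000]);  // larger than the default max.request.size
                producer.send(new ProducerRecord<>("demo-topic", huge)).get();
                producer.commitTransaction();
            } catch (ExecutionException e) {
                if (e.getCause() instanceof RecordTooLargeException) {
                    System.err.println("record too large: " + e.getCause().getMessage());
                }
                producer.abortTransaction();
            } catch (KafkaException e) {
                // commitTransaction() also throws if any send in the transaction failed earlier
                producer.abortTransaction();
            }
        }
    }
}

If the send is fire-and-forget (no get() and no callback), the only visible symptom is the later failure of commitTransaction(), which may be why the error seems to disappear.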

__consumer_offsets topic with very big partitions

I am using Kafka 2.0.0
Some partitions of the __consumer_offsets topic are 500-700 GB and have more than 5000-7000 segments. These segments are older than 2-3 months.
There are no errors in the logs, and that topic is COMPACT by default.

What could be the problem?
Maybe a config or a consumer problem? Or maybe a bug in Kafka 2.0.0?
What checks could I do? (One starting point is sketched after my settings below.)

My settings:

log.cleaner.enable=true
log.cleanup.policy = [delete]
log.retention.bytes = -1
log.segment.bytes = 268435456
log.retention.hours = 72
log.retention.check.interval.ms = 300000
...
offsets.commit.required.acks = -1
offsets.commit.timeout.ms = 5000
offsets.load.buffer.size = 5242880
offsets.retention.check.interval.ms = 600000
offsets.retention.minutes = 10080
offsets.topic.compression.codec = 0
offsets.topic.num.partitions = 50
offsets.topic.replication.factor = 3
offsets.topic.segment.bytes = 104857600
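One starting point (a sketch, not a definitive diagnosis): confirm what cleanup policy and segment settings the brokers actually report for __consumer_offsets, since the cleaner only compacts closed segments and only if the effective policy really is compact. Assuming a reasonably recent kafka-clients AdminClient and a placeholder bootstrap address:

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.common.config.ConfigResource;

import java.util.Collections;
import java.util.Properties;

public class CheckOffsetsTopicConfig {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed
        try (AdminClient admin = AdminClient.create(props)) {
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "__consumer_offsets");
            Config config = admin.describeConfigs(Collections.singleton(topic)).all().get().get(topic);
            // cleanup.policy should come back as "compact"; segment.bytes controls how quickly
            // segments roll and become eligible for cleaning.
            System.out.println(config.get("cleanup.policy"));
            System.out.println(config.get("segment.bytes"));
        }
    }
}

On the broker side, the log-cleaner.log file is usually the next place to look, to see whether the cleaner thread is running, has died, or is simply never picking these partitions.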

KAFKA-3147: Memory records is not writable in MirrorMaker

Hi,
I am using uReplicator with Kafka client version 0.9.0.1 in production and ran into "Memory records is not writable" in MirrorMaker.

I understood that this issue was resolved by the fix for KAFKA-3147, and that a new version of the Kafka client jar, "0.9.0-kafka-2.0.2", was released.

I have updated uReplicator to Kafka client jar version "0.9.0-kafka-2.0.2" and I am still facing the issue.

Could you please help me resolve this issue?

Exception Details:

java.lang.IllegalStateException: Memory records is not writable
org.apache.kafka.common.record.MemoryRecords.append(MemoryRecords.java:93)
org.apache.kafka.clients.producer.internals.RecordBatch.tryAppend(RecordBatch.java:69)
org.apache.kafka.clients.producer.internals.RecordAccumulator.append(RecordAccumulator.java:168)
org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:435)
kafka.mirrormaker.MirrorMakerWorker$MirrorMakerProducer.send(MirrorMakerWorker.scala:357)
kafka.mirrormaker.MirrorMakerWorker$MirrorMakerThread.run(MirrorMakerWorker.scala:298)

Changing the log level for the Kafka Authorizer in log4j causes RequestHandlerPool and NetworkHandlerPool to crash

I have come across this issue in my Apache Kafka environment: if I turn on Kafka Authorizer logging in log4j at the INFO or DEBUG level, the RequestHandlerPool and NetworkHandlerPool crash. Our monitors show no activity on these pool handlers. As soon as Authorizer logging is turned off, these handlers come back up and the monitors start showing activity again.
Please let me know if you need any further information; I will be more than happy to provide it.

How to delete a consumer group name from 0.10.0.1

The Kafka version is 0.10.0.1. There are two ways to save offsets, ZooKeeper and Kafka, depending on your Kafka client. If you choose to connect to the Kafka broker, the consumer group name is actually saved in the default Kafka topic '__consumer_offsets', at least I think so. Deleting or resetting the offset in ZooKeeper is easy, but deleting it from '__consumer_offsets' is difficult. How can I delete it from '__consumer_offsets'?
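For reference: on brokers of that era a group's entry in __consumer_offsets only disappears on its own after offsets.retention.minutes once the group has no members; the 0.10.0.1 client has no delete API. Newer brokers and clients (2.0+) do expose an explicit delete. A sketch with a modern kafka-clients AdminClient, using "my-group" and localhost:9092 purely as placeholders:

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;

import java.util.Collections;
import java.util.Properties;

public class DeleteGroup {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed
        try (AdminClient admin = AdminClient.create(props)) {
            // Only succeeds when the group has no active members; requires broker and client 2.0+.
            admin.deleteConsumerGroups(Collections.singleton("my-group")).all().get();
        }
    }
}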

Caused by: org.apache.avro.AvroRuntimeException: Malformed data. Length is negative: -7843832

org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 31
Caused by: org.apache.avro.AvroRuntimeException: Malformed data. Length is negative: -7843832

I am getting this issue while deserializing an Avro message with GenericRecord, when the message contains some special characters, but only for one type of schema. Example:

{
  "op": "create",
  "id": "test-l广告oc-378621",
  "provenance": {
    "sourceDatasetId": "TEST"
  },
  "properties": {
    "mg_display_label": "LOCA广告广告TION",
    "mg_entity_type": "location"
  },
  "label": "location"
}

But the same process for another schema does not throw any exception. Example:
{
  "op": "crea广告te",
  "id": "test-ind-lc广告-372386213",
  "provenance": {
    "sourceDatasetId": "TEST"
  },
  "properties": {
    "mg_display_label": "hasLocation",
    "type": "home"
  },
  "src": {
    "id": "test-ind-378621",
    "label": "individual"
  },
  "dst": {
    "id": "test-loc-378621",
    "label": "location"
  },
  "label": "hasLocation"
}

What could be the reason?
Can anybody help me with this? I have been stuck here for the last couple of days.
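"Length is negative" from the Avro decoder usually means the bytes being read were not written in the framing the deserializer expects, so one debugging step is to dump the raw bytes of a failing record and check the framing by hand. A sketch, assuming the Confluent wire format (a 0 magic byte followed by a 4-byte schema id before the Avro payload) and placeholder bootstrap servers, group id, and topic name:

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class RawRecordDump {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // assumed
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "raw-debug");                 // assumed
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singleton("my-avro-topic"));         // assumed topic
            for (ConsumerRecord<byte[], byte[]> rec : consumer.poll(Duration.ofSeconds(5))) {
                ByteBuffer buf = ByteBuffer.wrap(rec.value());
                byte magic = buf.get();      // should be 0 for Confluent-framed Avro
                int schemaId = buf.getInt(); // the schema registry id the producer used
                System.out.printf("offset %d: magic=%d schemaId=%d payloadBytes=%d%n",
                        rec.offset(), magic, schemaId, buf.remaining());
            }
        }
    }
}

If the magic byte or schema id is not what you expect for the failing records, the producer side (serializer choice or the character encoding of the source data) is the place to look rather than the consumer.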

MirrorMaker2 failed to sync to target kafka

When I used MirrorMaker 2 to sync, there were no issues if the topic count was small, but if the topic count is 200+, connect.log shows "Request cannot contain more than 5 topics.":
[2023-04-20 10:05:42,382] WARN [MirrorSourceConnector|worker] Could not create topic S17_inner_transcode_response. (org.apache.kafka.connect.mirror.MirrorSourceConnector:371)
java.util.concurrent.CompletionException: org.apache.kafka.common.errors.InvalidRequestException: Request cannot contain more than 5 topics.
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:607)
at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
at org.apache.kafka.common.internals.KafkaCompletableFuture.kafkaCompleteExceptionally(KafkaCompletableFuture.java:49)
at org.apache.kafka.common.internals.KafkaFutureImpl.completeExceptionally(KafkaFutureImpl.java:130)
at org.apache.kafka.clients.admin.KafkaAdminClient$1.handleResponse(KafkaAdminClient.java:1616)
at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.handleResponses(KafkaAdminClient.java:1255)
at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.processRequests(KafkaAdminClient.java:1408)
at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1331)
at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.kafka.common.errors.InvalidRequestException: Request cannot contain more than 5 topics.
[2023-04-20 10:05:42,382] WARN [MirrorSourceConnector|worker] Could not create topic N9M_0_MEDIASTREAMMODEL_MEDIATASKSTART. (org.apache.kafka.connect.mirror.MirrorSourceConnector:371)

Distribute topics between consumers

I have many topics which cannot have more than one partition, and many consumers in the same consumer group. Apparently only one consumer reads messages from all the topics; the other consumers just do nothing.

How can the Kafka broker weigh consumer load and distribute topics between the consumers evenly?
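With the default RangeAssignor, assignment is computed per topic, so when every topic has exactly one partition the first member of the group tends to end up with all of them. Switching the whole group to a round-robin (or sticky/cooperative) assignor spreads single-partition topics across the consumers. A sketch of the relevant consumer setting, with placeholder addresses, group id, and topic names:

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.RoundRobinAssignor;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Arrays;
import java.util.Properties;

public class SpreadTopicsConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");                // assumed
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Spread single-partition topics across all members of the group instead of piling
        // them onto one consumer; every member of the group must use the same strategy.
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RoundRobinAssignor.class.getName());

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("topic-a", "topic-b", "topic-c"));   // assumed topics
        // ... normal poll loop from here
    }
}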

kafka_2.13:2.6.1 throws NoSuchMethodError when running against scala-sdk-2.13.4

The below snippet runs without issues with scala-sdk-2.13.3, but throws NoSuchMethodError for scala-sdk-2.13.4:

    val authorize = new AclAuthorizer()
    val acls = authorize.acls(AclBindingFilter.ANY)

The error is:

Exception in thread "main" java.lang.NoSuchMethodError: 'scala.collection.immutable.RedBlackTree$Tree scala.collection.immutable.TreeMap.scala$collection$immutable$TreeMap$$tree()'
	at kafka.security.authorizer.AclAuthorizer.acls(AclAuthorizer.scala:293)

Don't use /bin/bash in scripts

On small container installations (such as alpine), /bin/bash is not installed. It appears the scripts in the /bin directory would mostly work with /bin/sh. Please use a simpler shell for shell scripts so that they are more portable.

MM2 doesn't work as expected -- created a lot of duplicated topics

Hi,
I am new to MirrorMaker 2, and I did some tests, but it seems MM2 doesn't work very well. I hope someone can help me solve the issue, thanks!

I set up two single-node Kafka clusters, Kafka version 3.4.0.
I created two topics (test1, test3) in the TEST1 cluster, and created one topic (test3) in the TEST3 cluster.
Then I ran "bin/connect-mirror-maker.sh mm2.properties". I expected there would be one new topic, TEST3.test3, in the TEST1 cluster, and two new topics (TEST1.test1 and TEST1.test3) in the TEST3 cluster.

However, I found MM2 created a lot of duplicated TEST1.test3 topics in both clusters, which is not correct.
in TEST1 cluster:
bin/kafka-topics.sh --list --bootstrap-server localhost:9092
TEST1.TEST1.TEST1.TEST1.test3
TEST1.TEST1.TEST1.heartbeats
TEST1.TEST1.TEST1.test3
TEST1.TEST1.heartbeats
TEST1.TEST1.test3
TEST1.heartbeats
TEST1.test3
__consumer_offsets
heartbeats
mm2-configs.TEST3.internal
mm2-offsets.TEST3.internal
mm2-status.TEST3.internal
test1
test3

in TEST3 cluster:
bin/kafka-topics.sh --list --bootstrap-server localhost:9092
TEST1.TEST1.TEST1.TEST1.test3
TEST1.TEST1.TEST1.heartbeats
TEST1.TEST1.TEST1.test3
TEST1.TEST1.heartbeats
TEST1.TEST1.test3
TEST1.checkpoints.internal
TEST1.heartbeats
TEST1.test3
__consumer_offsets
heartbeats
mm2-configs.TEST1.internal
mm2-configs.TEST3.internal
mm2-offset-syncs.TEST3.internal
mm2-offsets.TEST1.internal
mm2-offsets.TEST3.internal
mm2-status.TEST1.internal
mm2-status.TEST3.internal
test3

And MM2 kept creating more and more TEST1.test3 topics, so I killed the MM2 process. Can anyone explain to me why this happened?
Thanks!

========================= mm2 properties file=========================

clusters = TEST3, TEST1

TEST3.bootstrap.servers = 192.168.122.172:9092
TEST1.bootstrap.servers = 192.168.122.126:9092

TEST1->TEST3.enabled = true

TEST1->TEST3.topics = .*

replication.factor=1

checkpoints.topic.replication.factor=1
heartbeats.topic.replication.factor=1
offset-syncs.topic.replication.factor=1

offset.storage.replication.factor=1
status.storage.replication.factor=1
config.storage.replication.factor=1

After altering partitions in the source Kafka cluster, MM2 (version 2.7.2) is not replicating the data to the target cluster

We have an MM2 cluster in our environment (version 2.7.2) that replicates data from the source to the target cluster. Whenever we alter the partitions of a topic in the source, MM2 creates the newer partitions in the target cluster, but does not replicate the data. Sometimes we have to restart MM2 to get the data synced to the target. Is this the usual way these clusters behave, or is it a bug? Sometimes the data is replicated, and sometimes not. Also, when checked in JConsole, MM2 tasks are not getting spun up for the newer partitions unless we restart MM2.

Why does Kafka Streams require DESCRIBE access to other topics when the Kafka cluster is configured with SASL_SSL?

I have a 3-node Kafka cluster (Kafka version 1.1.1) with SASL_SSL security enabled using pluggable JAAS configurations. When Kafka producers and consumers with proper security configurations communicate with the secured broker, write/read operations work without any issue. When I use Kafka Streams, however, I see weird behaviour: even though the Streams application works fine on the client side, I see an error log on the server side where threads corresponding to the Streams application make DESCRIBE requests for all the other topics on the broker. My Streams application is very simple; it copies data from a topic "testA" to another topic "testB". Why are the Kafka Streams threads on the broker side trying to describe other topics which the Streams application is not even authorised to describe? Ideally the Streams application should only describe the topics "testA" and "testB", if my understanding is correct.

The error log is :

[2020-01-16 09:28:02,235] ERROR Requested Operation : DESCRIBE for
Resource : Topic:dmp-tgd8uye1am cannot be authorized due to
invalid/missing claims in the JWT_TOKEN claims : {
Topic:testA=[READ, WRITE, DESCRIBE],
Topic:testB=[READ, WRITE, DESCRIBE],
Group:*=[READ, WRITE, DESCRIBE] } (com.fico.dmp.core.security.providers.kafka.authorization.DmpJwtAuthorizer)

The JWT_TOKEN is set by the client side with permissions for "testA" and "testB". Why, in the above error log, is the Streams thread making a DESCRIBE call to the "dmp-tgd8uye1am" topic?

This token is validated on the server side in two steps: a) authentication, b) authorisation.

a) Authentication with SaslServer class : This step is successful.

b) Authorisation with the DmpJwtAuthorizer class (implementation details below): This is the step where the weird logs show up. I can see in the broker process logs that there are multiple threads corresponding to the Streams application doing authorisation. The threads which request DESCRIBE access for "testA" and "testB" pass; the remaining threads request DESCRIBE access on other topics like "dmp-tgd8uye1am", which the Streams application has nothing to do with.

The broker security configurations are as follows :

  1. kafka-server-start.sh
export KAFKA_OPTS="-Djava.security.auth.login.config=/opt/kafka_2.11-1.1.1/kafka-config/kafka_server_jaas.conf"
  2. kafka_server_jaas.conf
KafkaServer {
   com.fico.dmp.core.security.providers.kafka.authentication.DmpJwtLoginModule required
   username="kafkabroker"
   password="HWBcKgfL5bQN"
   user_kafkabroker="HWBcKgfL5bQN";
};
Client {
   org.apache.zookeeper.server.auth.DigestLoginModule required
   username="kafka"
   password="HWBcKgfL5bQN";
};
  3. server.properties
listeners=SSL://:9093,SASL_SSL://:9094
advertised.listeners=SSL://:9093,SASL_SSL://:9094
sasl.enabled.mechanisms=PLAIN
authorizer.class.name=com.core.security.providers.kafka.authorization.DmpJwtAuthorizer
principal.builder.class=com.core.security.providers.kafka.authentication.DmpJwtPrincipalBuilder
security.inter.broker.protocol=SSL
ssl.client.auth=required
ssl.truststore.location=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.222.b10-0.amzn2.0.1.x86_64/jre/lib/security/cacerts
ssl.truststore.password=******
ssl.keystore.location=/opt/kafka_2.11-1.1.1/kafka-config/keystore
ssl.keystore.password=*****

default.replication.factor=3
min.insync.replicas=2
auto.leader.rebalance.enable=true

The KafkaStream client configs and working logic are as follows :

configStreamProperties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
configStreamProperties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
configStreamProperties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, ...);
configStreamProperties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, ...);
configStreamProperties.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, ...);
configStreamProperties.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, "2");
configProperties.put("security.protocol", "SASL_SSL");
configProperties.put("sasl.mechanism", "PLAIN");
configProperties.put("sasl.jaas.config", "com.core.security.providers.kafka.authentication.DmpJwtClient required;");

		    
final StreamsBuilder builder = new StreamsBuilder();

String inputTopic = (String) inputOptions.get("testA");
String outputTopic = (String) outputOptions.get("testB");
		    
builder.stream(inputTopic).to(outputTopic);
		    
final KafkaStreams streams = new KafkaStreams(builder.build(), configStreamProperties);
		    
streams.cleanUp();
streams.start();	
		    
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));

DmpJwtAuthorizer Class on Server Side :

public class DmpJwtAuthorizer extends kafka.security.auth.SimpleAclAuthorizer {

// SimpleAclAuthorizer is part of Kafka libraries and extends the kafka.security.auth.Authorizer class
    
    private static final Logger logger = LoggerFactory.getLogger(DmpJwtAuthorizer.class);
    
    @Override
    public boolean authorize(Session session, Operation operation, Resource resource) {
        boolean authorized = false;
            
        DmpKafkaPrincipal principal = (DmpKafkaPrincipal) session.principal();
        String user = principal.getName();
        Map<Resource, List<AclOperation>> allowedResources = principal.getAllowedResources();
        
        if ((user != null) && (!user.isEmpty())) {
            Iterator<Entry<Resource, List<AclOperation>>> iterator = allowedResources.entrySet().iterator();
            
            while(iterator.hasNext() && !authorized) {
            	Entry<Resource, List<AclOperation>> entry = iterator.next();
            	Resource allowedResource = entry.getKey();
            	List<AclOperation> allowedOperations = entry.getValue();
            	
            	AclOperation requestedACLOperation = operation.toJava();
            	
            	if(resource.resourceType().name().equals(allowedResource.resourceType().name())
                        && isMatchingResourceName(resource, allowedResource)
            			&& allowedOperations.contains(requestedACLOperation)){
            		
            		authorized = true;
            	}
            }
            
            if (!authorized) {
              
              logger.error("User : {} Requested Operation : {} for Resource : {} cannot be authorized due to invalid/missing claims in the JWT_TOKEN claims : {} THREAD NAME : {} THREAD ID : {}", user, operation.toJava(), resource, allowedResources, Thread.currentThread().getName(), Thread.currentThread().getId());
            }
        }
        
        return authorized;
    }
    
    public boolean isMatchingResourceName(Resource requestedResource, Resource allowedResource){
        String requestedResourceName = requestedResource.name();
        String allowedResourceName = allowedResource.name();
        
        String regex = ("\\Q" + allowedResourceName + "\\E").replace("*", "\\E.*\\Q");
        return requestedResourceName.matches(regex);
    }
}

Kafka Memory consumption, when there is no traffic

Hi,

After deploying a 3-node Kafka 2.4.0 cluster with the Strimzi operator in Kubernetes, we've observed that memory consumption increases for a while and then stabilizes at that level. See the screenshots below; note that there is no traffic being sent to the cluster.
Another observation is that:

  1. We sent traffic to the cluster - produced and consumed messages.
  2. We updated the Kafka broker pods with the new JVM settings and they were restarted, but the ZooKeeper pods didn't restart. The memory consumption increased to a certain limit and stabilized after that, but the memory consumed by broker pod 0 was higher compared to brokers 1 and 2.

Can I please get some help in this regard? Is this the expected behavior of Kafka, or what is going on?

Error running compile: Compilation failed

Hi,

I cloned this repo with

git clone git@github.com:kafka-dev/kafka.git kafka

cd kafka

./sbt

project java-examples
current
Current project is java-examples 0.7
Current Scala version is 2.8.0
Current log level is info
Stack traces are enabled
When I run "run", I am seeing this error:

"/root/kafka/core/src/main/scala/kafka/Kafka.scala:20: value log4j is not a member of package org.apache"

and compilation fails.

I installed Kafka through Cloudera Manager; does that cause any problem when running this code?

Could anybody help me get rid of this?

Thanks.

StringDecoder throws BufferUnderflowException

I was attempting to use the StringDecoder to pull some JSON strings off a queue. During testing I am sending the same message through 1k times and see the following exception about 80% of the time:

java.nio.BufferUnderflowException
        at java.nio.HeapByteBuffer.get(HeapByteBuffer.java:127)
        at kafka.serializer.StringDecoder.toEvent(Decoder.scala:34)
        at com.tumblr.motherboy.workers.Consumer$$anonfun$act$1$$anonfun$apply$2$$anonfun$apply$3.apply(Main.scala:17)
        at com.tumblr.motherboy.workers.Consumer$$anonfun$act$1$$anonfun$apply$2$$anonfun$apply$3.apply(Main.scala:15)
        at scala.collection.Iterator$class.foreach(Iterator.scala:631)
        at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:29)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
        at kafka.consumer.KafkaMessageStream.foreach(KafkaMessageStream.scala:28)
        at com.tumblr.motherboy.workers.Consumer$$anonfun$act$1$$anonfun$apply$2.apply(Main.scala:15)
        at com.tumblr.motherboy.workers.Consumer$$anonfun$act$1$$anonfun$apply$2.apply(Main.scala:13)
        at scala.actors.ReactorTask.run(ReactorTask.scala:34)
        at scala.actors.ReactorTask.compute(ReactorTask.scala:66)
        at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:147)
        at scala.concurrent.forkjoin.ForkJoinTask.quietlyExec(ForkJoinTask.java:422)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.mainLoop(ForkJoinWorkerThread.java:340)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:325)

The remaining 20% of the time, the string deserializes just fine (again, all messages are identical). Discussion on IRC led me to implement my own StringDecoder as follows:

class StringDecoder extends Decoder[String] {
  def toEvent(message: Message): String = {
    // Copy only the bytes remaining in the payload buffer into a fresh array
    // and build the String from that copy.
    val buf = message.payload
    val arr = new Array[Byte](buf.remaining)
    buf.get(arr)
    new String(arr)
  }
}

The above code does not throw an exception.

Platform: OS X 10.6.8

Java Version:
java version "1.6.0_26"
Java(TM) SE Runtime Environment (build 1.6.0_26-b03-384-10M3425)
Java HotSpot(TM) 64-Bit Server VM (build 20.1-b02-384, mixed mode)

Scala Version: Scala code runner version 2.8.1.final -- Copyright 2002-2010, LAMP/EPFL

Kafka Version: master/trunk

KafkaProducer - org.apache.kafka.clients.producer.internals.Sender - NETWORK_EXCEPTION. Error Message: Disconnected from node 0

Our application is a Flink application; a Kafka connector is used internally by Flink. We are using the below dependency:

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka</artifactId>
        <version>1.15.0</version>
    </dependency>

which contains kafka-clients 3.0.0. The Flink sink sends messages to the topic via the Kafka producer internally, and in this case we are getting the below error. Because of that we see a delay (30-60 seconds) in processing, which causes a performance issue. The error is intermittent, not every time.

Please help me out if you have any solution for this.

2022-10-19 07:33:14.240 [kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.producer.internals.Sender - [Producer clientId=producer-1] Got error produce response with correlation id 38 on topic-partition ccs-fsm-ingress-0, retrying (2147483646 attempts left). Error: NETWORK_EXCEPTION. Error Message: Disconnected from node 0
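Not a definitive fix, but for intermittent NETWORK_EXCEPTION / "Disconnected from node" retries the usual knobs are the producer's connection and timeout settings. These are plain kafka-clients properties, so they can be passed through to the Kafka producer via whatever mechanism your Flink connector version exposes for producer configuration; the values below are illustrative assumptions, not recommendations:

import java.util.Properties;

public class ProducerTuning {
    // Illustrative values only - check them against your broker settings
    // (for example the broker's connections.max.idle.ms) before using them.
    public static Properties kafkaProducerOverrides() {
        Properties p = new Properties();
        p.setProperty("request.timeout.ms", "30000");       // how long to wait for a broker response
        p.setProperty("delivery.timeout.ms", "120000");     // overall bound on send plus retries
        p.setProperty("reconnect.backoff.max.ms", "10000"); // cap the backoff after disconnects
        p.setProperty("connections.max.idle.ms", "540000"); // keep idle connections open longer
        return p;
    }
}

It is also worth comparing the broker's connections.max.idle.ms with the client's, since an idle connection closed by the broker shows up on the producer as exactly this kind of disconnect.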

Tag legacy release versions

Could we publish git tags for legacy release versions, like 0.5.2, to make it easier to find these old versions?

kerberos ticket is not getting reloaded, neither sasl kinit command is working

Hi,

I am using a Kafka appender library to push logs to Logstash.
I am using the Kerberos authentication method to connect to the Kafka broker. At the start of my app everything works fine and I am able to connect to the Kafka brokers. Below are a few of the producer configs I am providing.

sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required \
    ticketCache="path-to-my-ticket-cache" \
    useTicketCache="true" \
    refreshKrb5Config="true" \
    doNotPrompt="true" \
    principal="myprincipal@realm";

When I run klist -c path-to-my-ticket-cache, I see the ticket is getting renewed every 2 hours, but the renewed time is not reflected in my Java app. The app still shows the initial expiry time loaded at startup, destroys the ticket after that expiry, and authentication starts failing.

A few things I tried:

I added renewTgt=true and the sasl.kerberos.kinit.cmd=/usr/bin/kinit -c path-to-my-ticket-cache -R property to the producer config. It tried to renew but failed with IOException: error=2, No such file or directory. When I run the same command from a shell, it works fine and renews the ticket. How can I pass the cache location parameter in this command?
How can we force the app to read the ticket from the main cache which is getting renewed? The Kafka client seems not to be aware of it.

Unexpected behavior by fluentd-collector from one Elastic search index to kafka

We have implemented the fluentd-kafka plugin to collect information from Kafka topics and post the formatted data to an Elasticsearch database.
However, recently, after upgrading from kafka-6.0.0-1090 to kafka-7.0.0-1250 (these RPMs are part of our organization's internal packaging), we have observed that the custom microservice alarm-collector, built using fluentd-kafka plugin version 0.16.0, can process many indexes, but one of them - alarmhistoryindex - keeps failing to connect to Kafka.
After restarting the microservice pods, the connection was successful again.
We would appreciate a possible explanation for this.
This feels like a bug to us; if that is true, may we know the possible fix, please?
Thank you all in advance.

Here is the log from our microservice...
alarm-collector-belk-fluentd-statefulset-0(1).log

Kafka Producer Metrics Explanation

Context :
Kafka Producer 0.8.x

Problem:
Kafka Producer emits metrics for request size, request latency, and rate stats,
but the inherent meaning of these metrics is not clear. What exactly do they measure?
Is it per producer send request (which contains batches of messages per broker)? Or is it per batch of messages as defined by the user's batching policy? And what happens when application code uses multiple async producers to increase performance - how are rates and percentiles measured then?
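Not an authoritative mapping, but one practical way to see exactly what each producer metric covers is to dump the metric registry the producer exposes: every metric carries a description string, which helps pin down whether a given stat is per request, per batch, or per record. A sketch against a modern kafka-clients producer (the 0.8.x API differs slightly), with placeholder bootstrap servers:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class DumpProducerMetrics {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // assumed
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Each metric has a group (e.g. producer-metrics), a name (e.g. request-size-avg)
            // and a human-readable description of what it measures.
            producer.metrics().forEach((name, metric) ->
                    System.out.printf("%s / %s: %s = %s%n",
                            name.group(), name.name(), name.description(), metric.metricValue()));
        }
    }
}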

kafka java client

Trying to run the Kafka client from a Windows machine throws an exception.

I am trying to run the Kafka client from a Windows machine, with both Java 7 and Java 6.
It throws this exception:
Exception in thread "main" org.I0Itec.zkclient.exception.ZkTimeoutException: Unable to connect to zookeeper server within timeout: 10000
at org.I0Itec.zkclient.ZkClient.connect(ZkClient.java:876)
at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:98)
at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:84)
at kafka.producer.ZKBrokerPartitionInfo.<init>(ZKBrokerPartitionInfo.scala:62)
at kafka.producer.Producer.<init>(Producer.scala:47)
at kafka.javaapi.producer.Producer.<init>(Producer.scala:33)
at kafka.javaapi.producer.Producer.<init>(Producer.scala:40)
at kafka.examples.Test.main(Test.java:16)

about CVE-2023-25194

Hello! I need help. We use Kafka 2.5.0, but we don't want to update the Kafka version. Is there any other way to solve this problem?

Confluent Kafka C# 1.3.0 - Local: Maximum application poll interval (max.poll.interval.ms) exceeded

Hi

We are regularly receiving the below error in our Kafka consumer. We are unable to fix this.

Local: Maximum application poll interval (max.poll.interval.ms) exceeded

Can you please help us resolve this? Consumer config:

BootstrapServers = xxxx,
GroupId = xxxxx,
EnableAutoCommit = false,
StatisticsIntervalMs = 5000,
SessionTimeoutMs = 6000,
MaxPollIntervalMs = 30000,
AutoOffsetReset = AutoOffsetReset.Earliest,
EnablePartitionEof = true

Code Snippet

using (var consumer = new ConsumerBuilder<Ignore, string>(KafkaConfig).SetErrorHandler(ErrorHandler)
.Build())
{
consumer.Subscribe(KafkaTopic);

Issue with closing Kafka broker.

I was testing Kafka's fault tolerance. I am running Kafka in a multi-node, multi-broker setup. I have 3 machines (with Windows OS) connected in a cluster, with one broker on each machine. When I close node 3, messages can still be sent and received, while node 2 cannot receive messages but can send them. Why is this happening? The leader is node 1.

Memory increases without any load

I have deployed Kafka 2.5.0 with Strimzi in a Kubernetes cluster.
In the last 4 days I have seen an unexpected memory increase even though there was very low activity or no activity at all.

Note that end-to-end latency increased as well.
I am not certain that the memory increase alone justifies the latency increase, and I would also like to hear some other ideas.

Could you please help me out?

Error: "The input line is too long" on Windows

I was testing Kafka on Windows and found a limitation / problem in how the classpath is constructed by the batch file "kafka-run-class.bat" (which is used when starting ZooKeeper/Kafka). In this batch file the classpath is constructed jar by jar with the absolute path for each jar. Because of the huge number of jar files, the Windows cmd.exe limit of 8191 characters is reached very quickly in real-life installations (see: https://support.microsoft.com/de-de/help/830473/command-prompt-cmd-exe-command-line-string-limitation).

If this batch file used the Java classpath syntax with wildcards, the problem should be solvable,
e.g. by changing the following lines in "kafka-run-class.bat" from

rem Classpath addition for release
for %%i in ("%BASE_DIR%\libs\*") do (
	call :concat "%%i"
)

to

rem Classpath addition for release
call :concat "%BASE_DIR%\libs\*;"

would solve the problem.

Thanks,
Frank

tier storage uploaded segments not recycled

I tried the latest tiered storage from the Apache 3.6 branch. One thing I notice is that broker-local segments are not cleaned up after they are uploaded to remote storage. This shows up as local disk usage that keeps increasing, and local segments that never get deleted. Is anyone seeing similar behavior? I wonder if there are some new configurations I should add. Thanks.

topic configuration: bin/kafka-topics.sh --bootstrap-server kafka-test-51-kafka-bootstrap:9092 --topic dfi-test6 --create --config remote.storage.enable=true --config local.retention.bytes=104857600

local disk usage: [kafka@kafka-test-51-kafka-7 /]$ du -sh /var/lib/kafka/data-0/kafka-log7/dfi*
12G /var/lib/kafka/data-0/kafka-log7/dfi-test-10

Offset 0 still exists, and available disk space keeps decreasing with incoming traffic.

Kafka connectors Log

Hi everyone,
I would like to know if there is any way to have a log per connector, meaning each connector writes to its own log file.

Dataflow-Kafka Consumer Group - UnAuthorized Consumer Group

Issue Summary:
The warning in the Dataflow worker is misleading; I tried to dig deep into Beam KafkaIO, and it looks like this comes from the Dataflow runner.

What is the Business Impact you are facing?

Observed Behavior:
I am able to read the data from Confluent Kafka with the provided consumer group development.opp-ds.svc-app-event-ingest.omni-cg, but the warning I'm getting is misleading, even though the Kafka consumer lag is going down as intended.

I was able to successfully read the 98,637 + 1,190,217 events from two different Dataflow jobs, and the consumer group lag went down to zero.

But the warning in the Dataflow worker is misleading; I tried to dig deep into Beam KafkaIO.
Warning :
"org.apache.kafka.common.errors.GroupAuthorizationException: Not authorized to access group: Reader-7_offset_consumer_136115595_development.opp-ds.svc-app-event-ingest.omni-cg-kafka-test-14

Expected Behavior (jobID if applicable): We don't have a JobID, but I should not be seeing these unauthorized exceptions.

Please include screenshots. Yes

If Applicable:

  • Beam SDK (Language and Version): 2.32.0
  • Apache Kafka Version - 2.3.1
    NotAuthorizedAccessError

how to set kafka's logger dir


I want to read the logs when I run the producer and the consumer, but I don't know how to do it!

kafka can't produce when using remote zookeeper, connection refused by zookeeper

Hi,
I am using Kafka 0.8.0 (kafka_2.8.0-0.8.0) and ZooKeeper 3.4.5. If Kafka and ZooKeeper are on the same machine, it works fine.

But if Kafka is on 192.168.195.178 and ZooKeeper is on 192.168.195.177, then running 'bin/kafka-console-producer.sh --broker-list 192.168.195.178:9092 --topic test' on the Kafka server generates the following error messages:

[2014-01-17 17:55:58,468] ERROR fetching topic metadata for topics [Set(test)] from broker [ArrayBuffer(id:0,host:192.168.195.177,port:9092)] failed (kafka.utils.Utils$)
kafka.common.KafkaException: fetching topic metadata for topics [Set(test)] from broker [ArrayBuffer(id:0,host:192.168.195.177,port:9092)] failed
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:67)
at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
at kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:67)
at kafka.utils.Utils$.swallow(Utils.scala:187)
at kafka.utils.Logging$class.swallowError(Logging.scala:105)
at kafka.utils.Utils$.swallowError(Utils.scala:46)
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:67)
at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
at scala.collection.immutable.Stream.foreach(Stream.scala:254)
at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
Caused by: java.net.ConnectException: Connection refused
at sun.nio.ch.Net.connect(Native Method)
at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:500)
at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
... 12 more
[2014-01-17 17:55:58,471] ERROR Producer connection to 192.168.195.177:9092 unsuccessful (kafka.producer.SyncProducer)
java.net.ConnectException: Connection refused
at sun.nio.ch.Net.connect(Native Method)
at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:500)
at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
at kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:49)
at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartitionListForTopic(DefaultEventHandler.scala:186)
at kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:150)
at kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:149)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43)
at kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:149)
at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:95)
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
at scala.collection.immutable.Stream.foreach(Stream.scala:254)
at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
[2014-01-17 17:55:58,472] WARN Fetching topic metadata with correlation id 1 for topics [Set(test)] from broker [id:0,host:192.168.195.177,port:9092] failed (kafka.client.ClientUtils$)
java.net.ConnectException: Connection refused
at sun.nio.ch.Net.connect(Native Method)
at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:500)
at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
at kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:49)
at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartitionListForTopic(DefaultEventHandler.scala:186)
at kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:150)
at kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:149)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43)
at kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:149)
at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:95)
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
at scala.collection.immutable.Stream.foreach(Stream.scala:254)
at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
[2014-01-17 17:55:58,474] ERROR Failed to collate messages by topic, partition due to: fetching topic metadata for topics [Set(test)] from broker [ArrayBuffer(id:0,host:192.168.195.177,port:9092)] failed (kafka.producer.async.DefaultEventHandler)
[2014-01-17 17:55:58,605] ERROR Producer connection to 192.168.195.177:9092 unsuccessful (kafka.producer.SyncProducer)
java.net.ConnectException: Connection refused
at sun.nio.ch.Net.connect(Native Method)
at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:500)
at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
at kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:78)
at kafka.utils.Utils$.swallow(Utils.scala:187)
at kafka.utils.Logging$class.swallowError(Logging.scala:105)
at kafka.utils.Utils$.swallowError(Utils.scala:46)
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:78)
at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
at scala.collection.immutable.Stream.foreach(Stream.scala:254)
at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
[2014-01-17 17:55:58,606] WARN Fetching topic metadata with correlation id 2 for topics [Set(test)] from broker [id:0,host:192.168.195.177,port:9092] failed (kafka.client.ClientUtils$)
java.net.ConnectException: Connection refused
at sun.nio.ch.Net.connect(Native Method)
at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:500)
at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
at kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:78)
at kafka.utils.Utils$.swallow(Utils.scala:187)
at kafka.utils.Logging$class.swallowError(Logging.scala:105)
at kafka.utils.Utils$.swallowError(Utils.scala:46)
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:78)
at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
at scala.collection.immutable.Stream.foreach(Stream.scala:254)
at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)

Getting InvalidRecordException while using kafka

In my application, incoming data gets queued using Kafka and saved on disk, and the consumer gets this data from Kafka and does the processing. But when my consumer tries to read data from Kafka, I am getting the below exceptions:

2017-06-09 10:57:24,733 ERROR NetworkClient Uncaught error in request completion:
org.apache.kafka.common.KafkaException: Error deserializing key/value for partition TcpMessage-1 at offset 155884487
at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:628) ~[kafka-clients-0.9.0.1.jar:?]
at org.apache.kafka.clients.consumer.internals.Fetcher.handleFetchResponse(Fetcher.java:566) ~[kafka-clients-0.9.0.1.jar:?]
at org.apache.kafka.clients.consumer.internals.Fetcher.access$000(Fetcher.java:69) ~[kafka-clients-0.9.0.1.jar:?]
at org.apache.kafka.clients.consumer.internals.Fetcher$1.onSuccess(Fetcher.java:139) ~[kafka-clients-0.9.0.1.jar:?]
at org.apache.kafka.clients.consumer.internals.Fetcher$1.onSuccess(Fetcher.java:136) ~[kafka-clients-0.9.0.1.jar:?]
at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133) ~[kafka-clients-0.9.0.1.jar:?]
at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107) ~[kafka-clients-0.9.0.1.jar:?]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:380) ~[kafka-clients-0.9.0.1.jar:?]
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:274) [kafka-clients-0.9.0.1.jar:?]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320) [kafka-clients-0.9.0.1.jar:?]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213) [kafka-clients-0.9.0.1.jar:?]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193) [kafka-clients-0.9.0.1.jar:?]
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:908) [kafka-clients-0.9.0.1.jar:?]
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:853) [kafka-clients-0.9.0.1.jar:?]
at com.affirmed.mediation.edr.kafka.tcpMessage.TcpMessageConsumer.doWork(TcpMessageConsumer.java:190) [EdrServer.jar:?]
at com.affirmed.mediation.edr.kafka.tcpMessage.TcpMessageConsumer.run(TcpMessageConsumer.java:248) [EdrServer.jar:?]
Caused by: org.apache.kafka.common.record.InvalidRecordException: Record is corrupt (stored crc = 2016852547, computed crc = 1399853379)
at org.apache.kafka.common.record.Record.ensureValid(Record.java:226) ~[kafka-clients-0.9.0.1.jar:?]
at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:617) ~[kafka-clients-0.9.0.1.jar:?]
... 15 more

Could anyone please help me with this? I am stuck with it and not able to figure out the root cause.

When this occurs, is there any way to catch this exception and move the offset? Currently, the consumer keeps polling the same range of records on the next poll and as a result never moves forward.
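There is no built-in "skip corrupt record" switch in the 0.9 consumer, but since the exception message names the partition and offset, one workaround is to catch the failure in the poll loop and seek just past the bad offset. A sketch, where the topic, partition, and offset are taken from the log above purely as an illustration:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;

public class SkipCorruptRecord {
    static void pollLoop(KafkaConsumer<String, String> consumer) {
        while (true) {
            try {
                for (ConsumerRecord<String, String> rec : consumer.poll(1000)) {
                    process(rec);
                }
            } catch (KafkaException e) {
                // The client reports "Error deserializing key/value for partition <tp> at offset <o>";
                // the partition and offset below are hard-coded from that log line for illustration.
                TopicPartition bad = new TopicPartition("TcpMessage", 1);
                long badOffset = 155884487L;
                consumer.seek(bad, badOffset + 1); // step over the corrupt record and keep going
            }
        }
    }

    static void process(ConsumerRecord<String, String> rec) {
        System.out.println(rec.offset() + ": " + rec.value());
    }
}

This deliberately drops the corrupt record, so it only makes sense when losing it is acceptable; a failing CRC is also worth chasing on the broker side (disk or network corruption) rather than only working around it in the consumer.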

kafka-topics.sh --list TimeoutException

ERROR org.apache.kafka.common.errors.TimeoutException: Call(callName=listTopics, deadlineMs=1706136118301, tries=1, nextAllowedTryMs=1706136118444) timed out at 1706136118344 after 1 attempt(s)
Caused by: org.apache.kafka.common.errors.DisconnectException: Cancelled listTopics request with correlation id 3 due to node 1 being disconnected
(kafka.admin.TopicCommand$)

This is the error I get when I try to list the topics.
Some history of the situation: I have a standalone VM server on OCI (Oracle Cloud), which is the source, and the destination is a normal cluster of 3 Linux servers running Apache Kafka 3.6. When I try to list topics I get a timeout, but what confuses me is that creating, producing, and consuming all work well. There is a firewall between OCI and the servers, but the network team confirms that everything is allowed.

  • Listing topics from the servers to OCI wasn't working either, but after we changed the /etc/hosts file on the servers, they could list the topics on OCI.

  • I will post below the debugging output from when I ran the command.

  • Registered kafka:type=kafka.Log4jController MBean
    AdminClientConfig values:
    auto.include.jmx.reporter = true
    bootstrap.servers = [xx.x.xxx:9092, xx.x.xxx:9092]
    client.dns.lookup = use_all_dns_ips
    client.id =
    connections.max.idle.ms = 300000
    default.api.timeout.ms = 60000
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retries = 2147483647
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.connect.timeout.ms = null
    sasl.login.read.timeout.ms = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.login.retry.backoff.max.ms = 10000
    sasl.login.retry.backoff.ms = 100
    sasl.mechanism = GSSAPI
    sasl.oauthbearer.clock.skew.seconds = 30
    sasl.oauthbearer.expected.audience = null
    sasl.oauthbearer.expected.issuer = null
    sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000
    sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000
    sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
    sasl.oauthbearer.jwks.endpoint.url = null
    sasl.oauthbearer.scope.claim.name = scope
    sasl.oauthbearer.sub.claim.name = sub
    sasl.oauthbearer.token.endpoint.url = null
    security.protocol = PLAINTEXT
    security.providers = null
    send.buffer.bytes = 131072
    socket.connection.setup.timeout.max.ms = 30000
    socket.connection.setup.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
    ssl.endpoint.identification.algorithm = https
    ssl.engine.factory.class = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.certificate.chain = null
    ssl.keystore.key = null
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLSv1.3
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.certificates = null
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS

[AdminClient clientId=adminclient-1] Setting bootstrap cluster metadata Cluster(id = null, nodes = [xx.x.xxx:9092 (id: -2 rack: null), xx.x.xxx:9092 (id: -1 rack: null)], partitions = [], controller = null).
Registered metric named MetricName [name=count, group=kafka-metrics-count, description=total number of registered metrics, tags={client-id=adminclient-1}]
Added sensor with name connections-closed:
Registered metric named MetricName [name=connection-close-total, group=admin-client-metrics, description=The total number of connections closed, tags={client-id=adminclient-1}]
Registered metric named MetricName [name=connection-close-rate, group=admin-client-metrics, description=The number of connections closed per second, tags={client-id=adminclient-1}]
Added sensor with name connections-created:
Registered metric named MetricName [name=connection-creation-total, group=admin-client-metrics, description=The total number of new connections established, tags={client-id=adminclient-1}]
Registered metric named MetricName [name=connection-creation-rate, group=admin-client-metrics, description=The number of new connections established per second, tags={client-id=adminclient-1}]
Added sensor with name successful-authentication:
Registered metric named MetricName [name=successful-authentication-total, group=admin-client-metrics, description=The total number of connections with successful authentication, tags={client-id=adminclient-1}]
Registered metric named MetricName [name=successful-authentication-rate, group=admin-client-metrics, description=The number of connections with successful authentication per second, tags={client-id=adminclient-1}]
Added sensor with name successful-reauthentication:
Registered metric named MetricName [name=successful-reauthentication-total, group=admin-client-metrics, description=The total number of successful re-authentication of connections, tags={client-id=adminclient-1}]
Registered metric named MetricName [name=successful-reauthentication-rate, group=admin-client-metrics, description=The number of successful re-authentication of connections per second, tags={client-id=adminclient-1}]
Added sensor with name successful-authentication-no-reauth:
Registered metric named MetricName [name=successful-authentication-no-reauth-total, group=admin-client-metrics, description=The total number of connections with successful authentication where the client does not support re-authentication, tags={client-id=adminclient-1}]
Added sensor with name failed-authentication:
Registered metric named MetricName [name=failed-authentication-total, group=admin-client-metrics, description=The total number of connections with failed authentication, tags={client-id=adminclient-1}]
Registered metric named MetricName [name=failed-authentication-rate, group=admin-client-metrics, description=The number of connections with failed authentication per second, tags={client-id=adminclient-1}]
Added sensor with name failed-reauthentication:
Registered metric named MetricName [name=failed-reauthentication-total, group=admin-client-metrics, description=The total number of failed re-authentication of connections, tags={client-id=adminclient-1}]
Registered metric named MetricName [name=failed-reauthentication-rate, group=admin-client-metrics, description=The number of failed re-authentication of connections per second, tags={client-id=adminclient-1}]
Added sensor with name reauthentication-latency:
Registered metric named MetricName [name=reauthentication-latency-max, group=admin-client-metrics, description=The max latency observed due to re-authentication, tags={client-id=adminclient-1}]
Registered metric named MetricName [name=reauthentication-latency-avg, group=admin-client-metrics, description=The average latency observed due to re-authentication, tags={client-id=adminclient-1}]
Added sensor with name bytes-sent-received:
Registered metric named MetricName [name=network-io-total, group=admin-client-metrics, description=The total number of network operations (reads or writes) on all connections, tags={client-id=adminclient-1}]
Registered metric named MetricName [name=network-io-rate, group=admin-client-metrics, description=The number of network operations (reads or writes) on all connections per second, tags={client-id=adminclient-1}]
Added sensor with name bytes-sent:
Registered metric named MetricName [name=outgoing-byte-total, group=admin-client-metrics, description=The total number of outgoing bytes sent to all servers, tags={client-id=adminclient-1}]
Registered metric named MetricName [name=outgoing-byte-rate, group=admin-client-metrics, description=The number of outgoing bytes sent to all servers per second, tags={client-id=adminclient-1}]
Added sensor with name requests-sent:
Registered metric named MetricName [name=request-total, group=admin-client-metrics, description=The total number of requests sent, tags={client-id=adminclient-1}]
Registered metric named MetricName [name=request-rate, group=admin-client-metrics, description=The number of requests sent per second, tags={client-id=adminclient-1}]
Registered metric named MetricName [name=request-size-avg, group=admin-client-metrics, description=The average size of requests sent., tags={client-id=adminclient-1}]
Registered metric named MetricName [name=request-size-max, group=admin-client-metrics, description=The maximum size of any request sent., tags={client-id=adminclient-1}]
Added sensor with name bytes-received:
Registered metric named MetricName [name=incoming-byte-total, group=admin-client-metrics, description=The total number of bytes read off all sockets, tags={client-id=adminclient-1}]
Registered metric named MetricName [name=incoming-byte-rate, group=admin-client-metrics, description=The number of bytes read off all sockets per second, tags={client-id=adminclient-1}]
Added sensor with name responses-received:
Registered metric named MetricName [name=response-total, group=admin-client-metrics, description=The total number of responses received, tags={client-id=adminclient-1}]
Registered metric named MetricName [name=response-rate, group=admin-client-metrics, description=The number of responses received per second, tags={client-id=adminclient-1}]
Added sensor with name select-time:
Registered metric named MetricName [name=select-total, group=admin-client-metrics, description=The total number of times the I/O layer checked for new I/O to perform, tags={client-id=adminclient-1}]
Registered metric named MetricName [name=select-rate, group=admin-client-metrics, description=The number of times the I/O layer checked for new I/O to perform per second, tags={client-id=adminclient-1}]
Registered metric named MetricName [name=io-wait-time-ns-avg, group=admin-client-metrics, description=The average length of time the I/O thread spent waiting for a socket ready for reads or writes in nanoseconds., tags={client-id=adminclient-1}]
Registered metric named MetricName [name=io-waittime-total, group=admin-client-metrics, description=Deprecated The total time the I/O thread spent waiting, tags={client-id=adminclient-1}]
Registered metric named MetricName [name=io-wait-ratio, group=admin-client-metrics, description=Deprecated The fraction of time the I/O thread spent waiting, tags={client-id=adminclient-1}]
Registered metric named MetricName [name=io-wait-time-ns-total, group=admin-client-metrics, description=The total time the I/O thread spent waiting, tags={client-id=adminclient-1}]
Added sensor with name io-time:
Registered metric named MetricName [name=io-time-ns-avg, group=admin-client-metrics, description=The average length of time for I/O per select call in nanoseconds., tags={client-id=adminclient-1}]
Registered metric named MetricName [name=iotime-total, group=admin-client-metrics, description=Deprecated The total time the I/O thread spent doing I/O, tags={client-id=adminclient-1}]
Registered metric named MetricName [name=io-ratio, group=admin-client-metrics, description=Deprecated The fraction of time the I/O thread spent doing I/O, tags={client-id=adminclient-1}]
Registered metric named MetricName [name=io-time-ns-total, group=admin-client-metrics, description=The total time the I/O thread spent doing I/O, tags={client-id=adminclient-1}]
[AdminClient clientId=adminclient-1] sslCiphers: created new gauge suite with maxEntries = 100.
[AdminClient clientId=adminclient-1] clients: created new gauge suite with maxEntries = 100.
Registered metric named MetricName [name=connection-count, group=admin-client-metrics, description=The current number of active connections., tags={client-id=adminclient-1}]
Kafka version: 3.6.1
Kafka commitId: 5e3c2b738d253ff5
Kafka startTimeMs: 1706133998344
Registered metric named MetricName [name=version, group=app-info, description=Metric indicating version, tags={client-id=adminclient-1}]
Registered metric named MetricName [name=commit-id, group=app-info, description=Metric indicating commit-id, tags={client-id=adminclient-1}]
Registered metric named MetricName [name=start-time-ms, group=app-info, description=Metric indicating start-time-ms, tags={client-id=adminclient-1}]
[AdminClient clientId=adminclient-1] Kafka admin client initialized
[AdminClient clientId=adminclient-1] Thread starting
[AdminClient clientId=adminclient-1] Trying to choose nodes for [] at 1706133998348
[AdminClient clientId=adminclient-1] Found least loaded node xx.x.xxx:9092 (id: -1 rack: null) with no active connection
[AdminClient clientId=adminclient-1] Assigned Call(callName=fetchMetadata, deadlineMs=1706134028348, tries=0, nextAllowedTryMs=0) to node xx.x.xxx:9092 (id: -1 rack: null)
Resolved host xx.x.xxx as xx.x.xxx
[AdminClient clientId=adminclient-1] Initiating connection to node xx.x.xxx:9092 (id: -1 rack: null) using address /xx.x.xxx
[AdminClient clientId=adminclient-1] Queueing Call(callName=listTopics, deadlineMs=1706134058350, tries=0, nextAllowedTryMs=0) with a timeout 30000 ms from now.
[AdminClient clientId=adminclient-1] Client is not ready to send to xx.x.xxx:9092 (id: -1 rack: null). Must delay 11950 ms
[AdminClient clientId=adminclient-1] Entering KafkaClient#poll(timeout=11950)
[AdminClient clientId=adminclient-1] KafkaClient#poll retrieved 0 response(s)
[AdminClient clientId=adminclient-1] Trying to choose nodes for [Call(callName=listTopics, deadlineMs=1706134058350, tries=0, nextAllowedTryMs=0)] at 1706133998358
[AdminClient clientId=adminclient-1] Metadata is not ready: we have not fetched metadata from the bootstrap nodes yet.
[AdminClient clientId=adminclient-1] Unable to assign Call(callName=listTopics, deadlineMs=1706134058350, tries=0, nextAllowedTryMs=0) to a node.
[AdminClient clientId=adminclient-1] Client is not ready to send to xx.x.xxx:9092 (id: -1 rack: null). Must delay 11950 ms
[AdminClient clientId=adminclient-1] Entering KafkaClient#poll(timeout=100)
Added sensor with name node--1.requests-sent
Registered metric named MetricName [name=request-total, group=admin-client-node-metrics, description=The total number of requests sent, tags={client-id=adminclient-1, node-id=node--1}]
Registered metric named MetricName [name=request-rate, group=admin-client-node-metrics, description=The number of requests sent per second, tags={client-id=adminclient-1, node-id=node--1}]
Registered metric named MetricName [name=request-size-avg, group=admin-client-node-metrics, description=The average size of requests sent., tags={client-id=adminclient-1, node-id=node--1}]
Registered metric named MetricName [name=request-size-max, group=admin-client-node-metrics, description=The maximum size of any request sent., tags={client-id=adminclient-1, node-id=node--1}]
Added sensor with name node--1.bytes-sent
Registered metric named MetricName [name=outgoing-byte-total, group=admin-client-node-metrics, description=The total number of outgoing bytes, tags={client-id=adminclient-1, node-id=node--1}]
Registered metric named MetricName [name=outgoing-byte-rate, group=admin-client-node-metrics, description=The number of outgoing bytes per second, tags={client-id=adminclient-1, node-id=node--1}]
Added sensor with name node--1.responses-received
Registered metric named MetricName [name=response-total, group=admin-client-node-metrics, description=The total number of responses received, tags={client-id=adminclient-1, node-id=node--1}]
Registered metric named MetricName [name=response-rate, group=admin-client-node-metrics, description=The number of responses received per second, tags={client-id=adminclient-1, node-id=node--1}]
Added sensor with name node--1.bytes-received
Registered metric named MetricName [name=incoming-byte-total, group=admin-client-node-metrics, description=The total number of incoming bytes, tags={client-id=adminclient-1, node-id=node--1}]
Registered metric named MetricName [name=incoming-byte-rate, group=admin-client-node-metrics, description=The number of incoming bytes per second, tags={client-id=adminclient-1, node-id=node--1}]
Added sensor with name node--1.latency
Registered metric named MetricName [name=request-latency-avg, group=admin-client-node-metrics, description=, tags={client-id=adminclient-1, node-id=node--1}]
Registered metric named MetricName [name=request-latency-max, group=admin-client-node-metrics, description=, tags={client-id=adminclient-1, node-id=node--1}]
[AdminClient clientId=adminclient-1] Created socket with SO_RCVBUF = 65536, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node -1
[AdminClient clientId=adminclient-1] Completed connection to node -1. Fetching API versions.
[AdminClient clientId=adminclient-1] Initiating API versions fetch from node -1.
[AdminClient clientId=adminclient-1] No version information found when sending API_VERSIONS with correlation id 0 to node -1. Assuming version 3.
[AdminClient clientId=adminclient-1] Sending API_VERSIONS request with header RequestHeader(apiKey=API_VERSIONS, apiVersion=3, clientId=adminclient-1, correlationId=0, headerVersion=2) and timeout 3600000 to node -1: ApiVersionsRequestData(clientSoftwareName='apache-kafka-java', clientSoftwareVersion='3.6.1')
[AdminClient clientId=adminclient-1] KafkaClient#poll retrieved 0 response(s)
[AdminClient clientId=adminclient-1] Trying to choose nodes for [Call(callName=listTopics, deadlineMs=1706134058350, tries=0, nextAllowedTryMs=0)] at 1706133998652
[AdminClient clientId=adminclient-1] Metadata is not ready: we have not fetched metadata from the bootstrap nodes yet.
[AdminClient clientId=adminclient-1] Unable to assign Call(callName=listTopics, deadlineMs=1706134058350, tries=0, nextAllowedTryMs=0) to a node.
[AdminClient clientId=adminclient-1] Client is not ready to send to xx.x.xxx:9092 (id: -1 rack: null). Must delay 9223372036854775807 ms
[AdminClient clientId=adminclient-1] Entering KafkaClient#poll(timeout=100)
[AdminClient clientId=adminclient-1] KafkaClient#poll retrieved 0 response(s)
[AdminClient clientId=adminclient-1] Trying to choose nodes for [Call(callName=listTopics, deadlineMs=1706134058350, tries=0, nextAllowedTryMs=0)] at 1706133998654
[AdminClient clientId=adminclient-1] Metadata is not ready: we have not fetched metadata from the bootstrap nodes yet.
[AdminClient clientId=adminclient-1] Unable to assign Call(callName=listTopics, deadlineMs=1706134058350, tries=0, nextAllowedTryMs=0) to a node.
[AdminClient clientId=adminclient-1] Client is not ready to send to xx.x.xxx:9092 (id: -1 rack: null). Must delay 9223372036854775807 ms
[AdminClient clientId=adminclient-1] Entering KafkaClient#poll(timeout=100)
[AdminClient clientId=adminclient-1] Received API_VERSIONS response from node -1 for request with header RequestHeader(apiKey=API_VERSIONS, apiVersion=3, clientId=adminclient-1, correlationId=0, headerVersion=2): ApiVersionsResponseData(errorCode=0, apiKeys=[ApiVersion(apiKey=0, minVersion=0, maxVersion=9), ApiVersion(apiKey=1, minVersion=0, maxVersion=15), ApiVersion(apiKey=2, minVersion=0, maxVersion=8), ApiVersion(apiKey=3, minVersion=0, maxVersion=12), ApiVersion(apiKey=4, minVersion=0, maxVersion=7), ApiVersion(apiKey=5, minVersion=0, maxVersion=4), ApiVersion(apiKey=6, minVersion=0, maxVersion=8), ApiVersion(apiKey=7, minVersion=0, maxVersion=3), ApiVersion(apiKey=8, minVersion=0, maxVersion=8), ApiVersion(apiKey=9, minVersion=0, maxVersion=8), ApiVersion(apiKey=10, minVersion=0, maxVersion=4), ApiVersion(apiKey=11, minVersion=0, maxVersion=9), ApiVersion(apiKey=12, minVersion=0, maxVersion=4), ApiVersion(apiKey=13, minVersion=0, maxVersion=5), ApiVersion(apiKey=14, minVersion=0, maxVersion=5), ApiVersion(apiKey=15, minVersion=0, maxVersion=5), ApiVersion(apiKey=16, minVersion=0, maxVersion=4), ApiVersion(apiKey=17, minVersion=0, maxVersion=1), ApiVersion(apiKey=18, minVersion=0, maxVersion=3), ApiVersion(apiKey=19, minVersion=0, maxVersion=7), ApiVersion(apiKey=20, minVersion=0, maxVersion=6), ApiVersion(apiKey=21, minVersion=0, maxVersion=2), ApiVersion(apiKey=22, minVersion=0, maxVersion=4), ApiVersion(apiKey=23, minVersion=0, maxVersion=4), ApiVersion(apiKey=24, minVersion=0, maxVersion=4), ApiVersion(apiKey=25, minVersion=0, maxVersion=3), ApiVersion(apiKey=26, minVersion=0, maxVersion=3), ApiVersion(apiKey=27, minVersion=0, maxVersion=1), ApiVersion(apiKey=28, minVersion=0, maxVersion=3), ApiVersion(apiKey=29, minVersion=0, maxVersion=3), ApiVersion(apiKey=30, minVersion=0, maxVersion=3), ApiVersion(apiKey=31, minVersion=0, maxVersion=3), ApiVersion(apiKey=32, minVersion=0, maxVersion=4), ApiVersion(apiKey=33, minVersion=0, maxVersion=2), ApiVersion(apiKey=34, minVersion=0, maxVersion=2), ApiVersion(apiKey=35, minVersion=0, maxVersion=4), ApiVersion(apiKey=36, minVersion=0, maxVersion=2), ApiVersion(apiKey=37, minVersion=0, maxVersion=3), ApiVersion(apiKey=38, minVersion=0, maxVersion=3), ApiVersion(apiKey=39, minVersion=0, maxVersion=2), ApiVersion(apiKey=40, minVersion=0, maxVersion=2), ApiVersion(apiKey=41, minVersion=0, maxVersion=3), ApiVersion(apiKey=42, minVersion=0, maxVersion=2), ApiVersion(apiKey=43, minVersion=0, maxVersion=2), ApiVersion(apiKey=44, minVersion=0, maxVersion=1), ApiVersion(apiKey=45, minVersion=0, maxVersion=0), ApiVersion(apiKey=46, minVersion=0, maxVersion=0), ApiVersion(apiKey=47, minVersion=0, maxVersion=0), ApiVersion(apiKey=48, minVersion=0, maxVersion=1), ApiVersion(apiKey=49, minVersion=0, maxVersion=1), ApiVersion(apiKey=50, minVersion=0, maxVersion=0), ApiVersion(apiKey=51, minVersion=0, maxVersion=0), ApiVersion(apiKey=56, minVersion=0, maxVersion=3), ApiVersion(apiKey=57, minVersion=0, maxVersion=1), ApiVersion(apiKey=58, minVersion=0, maxVersion=0), ApiVersion(apiKey=60, minVersion=0, maxVersion=0), ApiVersion(apiKey=61, minVersion=0, maxVersion=0), ApiVersion(apiKey=65, minVersion=0, maxVersion=0), ApiVersion(apiKey=66, minVersion=0, maxVersion=0), ApiVersion(apiKey=67, minVersion=0, maxVersion=0)], throttleTimeMs=0, supportedFeatures=[], finalizedFeaturesEpoch=0, finalizedFeatures=[], zkMigrationReady=false)
[AdminClient clientId=adminclient-1] Node -1 has finalized features epoch: 0, finalized features: [], supported features: [], ZK migration ready: false, API versions: (Produce(0): 0 to 9 [usable: 9], Fetch(1): 0 to 15 [usable: 15], ListOffsets(2): 0 to 8 [usable: 8], Metadata(3): 0 to 12 [usable: 12], LeaderAndIsr(4): 0 to 7 [usable: 7], StopReplica(5): 0 to 4 [usable: 4], UpdateMetadata(6): 0 to 8 [usable: 8], ControlledShutdown(7): 0 to 3 [usable: 3], OffsetCommit(8): 0 to 8 [usable: 8], OffsetFetch(9): 0 to 8 [usable: 8], FindCoordinator(10): 0 to 4 [usable: 4], JoinGroup(11): 0 to 9 [usable: 9], Heartbeat(12): 0 to 4 [usable: 4], LeaveGroup(13): 0 to 5 [usable: 5], SyncGroup(14): 0 to 5 [usable: 5], DescribeGroups(15): 0 to 5 [usable: 5], ListGroups(16): 0 to 4 [usable: 4], SaslHandshake(17): 0 to 1 [usable: 1], ApiVersions(18): 0 to 3 [usable: 3], CreateTopics(19): 0 to 7 [usable: 7], DeleteTopics(20): 0 to 6 [usable: 6], DeleteRecords(21): 0 to 2 [usable: 2], InitProducerId(22): 0 to 4 [usable: 4], OffsetForLeaderEpoch(23): 0 to 4 [usable: 4], AddPartitionsToTxn(24): 0 to 4 [usable: 4], AddOffsetsToTxn(25): 0 to 3 [usable: 3], EndTxn(26): 0 to 3 [usable: 3], WriteTxnMarkers(27): 0 to 1 [usable: 1], TxnOffsetCommit(28): 0 to 3 [usable: 3], DescribeAcls(29): 0 to 3 [usable: 3], CreateAcls(30): 0 to 3 [usable: 3], DeleteAcls(31): 0 to 3 [usable: 3], DescribeConfigs(32): 0 to 4 [usable: 4], AlterConfigs(33): 0 to 2 [usable: 2], AlterReplicaLogDirs(34): 0 to 2 [usable: 2], DescribeLogDirs(35): 0 to 4 [usable: 4], SaslAuthenticate(36): 0 to 2 [usable: 2], CreatePartitions(37): 0 to 3 [usable: 3], CreateDelegationToken(38): 0 to 3 [usable: 3], RenewDelegationToken(39): 0 to 2 [usable: 2], ExpireDelegationToken(40): 0 to 2 [usable: 2], DescribeDelegationToken(41): 0 to 3 [usable: 3], DeleteGroups(42): 0 to 2 [usable: 2], ElectLeaders(43): 0 to 2 [usable: 2], IncrementalAlterConfigs(44): 0 to 1 [usable: 1], AlterPartitionReassignments(45): 0 [usable: 0], ListPartitionReassignments(46): 0 [usable: 0], OffsetDelete(47): 0 [usable: 0], DescribeClientQuotas(48): 0 to 1 [usable: 1], AlterClientQuotas(49): 0 to 1 [usable: 1], DescribeUserScramCredentials(50): 0 [usable: 0], AlterUserScramCredentials(51): 0 [usable: 0], DescribeQuorum(55): UNSUPPORTED, AlterPartition(56): 0 to 3 [usable: 3], UpdateFeatures(57): 0 to 1 [usable: 1], Envelope(58): 0 [usable: 0], DescribeCluster(60): 0 [usable: 0], DescribeProducers(61): 0 [usable: 0], UnregisterBroker(64): UNSUPPORTED, DescribeTransactions(65): 0 [usable: 0], ListTransactions(66): 0 [usable: 0], AllocateProducerIds(67): 0 [usable: 0], ConsumerGroupHeartbeat(68): UNSUPPORTED).
[AdminClient clientId=adminclient-1] KafkaClient#poll retrieved 0 response(s)
[AdminClient clientId=adminclient-1] Trying to choose nodes for [Call(callName=listTopics, deadlineMs=1706134058350, tries=0, nextAllowedTryMs=0)] at 1706133998742
[AdminClient clientId=adminclient-1] Metadata is not ready: we have not fetched metadata from the bootstrap nodes yet.
[AdminClient clientId=adminclient-1] Unable to assign Call(callName=listTopics, deadlineMs=1706134058350, tries=0, nextAllowedTryMs=0) to a node.
[AdminClient clientId=adminclient-1] Sending MetadataRequestData(topics=[], allowAutoTopicCreation=true, includeClusterAuthorizedOperations=false, includeTopicAuthorizedOperations=false) to xx.x.xxx:9092 (id: -1 rack: null). correlationId=1, timeoutMs=29606
[AdminClient clientId=adminclient-1] Sending METADATA request with header RequestHeader(apiKey=METADATA, apiVersion=12, clientId=adminclient-1, correlationId=1, headerVersion=2) and timeout 29606 to node -1: MetadataRequestData(topics=[], allowAutoTopicCreation=true, includeClusterAuthorizedOperations=false, includeTopicAuthorizedOperations=false)
[AdminClient clientId=adminclient-1] Entering KafkaClient#poll(timeout=100)
[AdminClient clientId=adminclient-1] KafkaClient#poll retrieved 0 response(s)
[AdminClient clientId=adminclient-1] Trying to choose nodes for [Call(callName=listTopics, deadlineMs=1706134058350, tries=0, nextAllowedTryMs=0)] at 1706133998744
[AdminClient clientId=adminclient-1] Metadata is not ready: we have not fetched metadata from the bootstrap nodes yet.
[AdminClient clientId=adminclient-1] Unable to assign Call(callName=listTopics, deadlineMs=1706134058350, tries=0, nextAllowedTryMs=0) to a node.
[AdminClient clientId=adminclient-1] Entering KafkaClient#poll(timeout=100)
[AdminClient clientId=adminclient-1] Received METADATA response from node -1 for request with header RequestHeader(apiKey=METADATA, apiVersion=12, clientId=adminclient-1, correlationId=1, headerVersion=2): MetadataResponseData(throttleTimeMs=0, brokers=[MetadataResponseBroker(nodeId=0, host='xx.x.xxx', port=9092, rack=null), MetadataResponseBroker(nodeId=1, host='xx.x.xxx', port=9092, rack=null)], clusterId='xCDtlxlOT-uxomG0SSnBFQ', controllerId=0, topics=[], clusterAuthorizedOperations=-2147483648)
[AdminClient clientId=adminclient-1] KafkaClient#poll retrieved 1 response(s)
[AdminClient clientId=adminclient-1] Updating cluster metadata to Cluster(id = xCDtlxlOT-uxomG0SSnBFQ, nodes = [xx.x.xxx:9092 (id: 0 rack: null), xx.x.xxx:9092 (id: 1 rack: null)], partitions = [], controller = xx.x.xxx:9092 (id: 0 rack: null))
[AdminClient clientId=adminclient-1] Call(callName=fetchMetadata, deadlineMs=1706134028348, tries=0, nextAllowedTryMs=0) got response MetadataResponseData(throttleTimeMs=0, brokers=[MetadataResponseBroker(nodeId=0, host='xx.x.xxx', port=9092, rack=null), MetadataResponseBroker(nodeId=1, host='xx.x.xxx', port=9092, rack=null)], clusterId='xCDtlxlOT-uxomG0SSnBFQ', controllerId=0, topics=[], clusterAuthorizedOperations=-2147483648)
[AdminClient clientId=adminclient-1] Trying to choose nodes for [Call(callName=listTopics, deadlineMs=1706134058350, tries=0, nextAllowedTryMs=0)] at 1706133998769
[AdminClient clientId=adminclient-1] Metadata is ready to use.
[AdminClient clientId=adminclient-1] Found least loaded node xx.x.xxx:9092 (id: 1 rack: null) with no active connection
[AdminClient clientId=adminclient-1] Assigned Call(callName=listTopics, deadlineMs=1706134058350, tries=0, nextAllowedTryMs=0) to node xx.x.xxx:9092 (id: 1 rack: null)
Resolved host xx.x.xxx as xx.x.xxx
[AdminClient clientId=adminclient-1] Initiating connection to node xx.x.xxx:9092 (id: 1 rack: null) using address /xx.x.xxx
[AdminClient clientId=adminclient-1] Client is not ready to send to xx.x.xxx:9092 (id: 1 rack: null). Must delay 9698 ms
[AdminClient clientId=adminclient-1] Entering KafkaClient#poll(timeout=9698)
Added sensor with name node-1.requests-sent
Registered metric named MetricName [name=request-total, group=admin-client-node-metrics, description=The total number of requests sent, tags={client-id=adminclient-1, node-id=node-1}]
Registered metric named MetricName [name=request-rate, group=admin-client-node-metrics, description=The number of requests sent per second, tags={client-id=adminclient-1, node-id=node-1}]
Registered metric named MetricName [name=request-size-avg, group=admin-client-node-metrics, description=The average size of requests sent., tags={client-id=adminclient-1, node-id=node-1}]
Registered metric named MetricName [name=request-size-max, group=admin-client-node-metrics, description=The maximum size of any request sent., tags={client-id=adminclient-1, node-id=node-1}]
Added sensor with name node-1.bytes-sent
Registered metric named MetricName [name=outgoing-byte-total, group=admin-client-node-metrics, description=The total number of outgoing bytes, tags={client-id=adminclient-1, node-id=node-1}]
Registered metric named MetricName [name=outgoing-byte-rate, group=admin-client-node-metrics, description=The number of outgoing bytes per second, tags={client-id=adminclient-1, node-id=node-1}]
Added sensor with name node-1.responses-received
Registered metric named MetricName [name=response-total, group=admin-client-node-metrics, description=The total number of responses received, tags={client-id=adminclient-1, node-id=node-1}]
Registered metric named MetricName [name=response-rate, group=admin-client-node-metrics, description=The number of responses received per second, tags={client-id=adminclient-1, node-id=node-1}]
Added sensor with name node-1.bytes-received
Registered metric named MetricName [name=incoming-byte-total, group=admin-client-node-metrics, description=The total number of incoming bytes, tags={client-id=adminclient-1, node-id=node-1}]
Registered metric named MetricName [name=incoming-byte-rate, group=admin-client-node-metrics, description=The number of incoming bytes per second, tags={client-id=adminclient-1, node-id=node-1}]
Added sensor with name node-1.latency
Registered metric named MetricName [name=request-latency-avg, group=admin-client-node-metrics, description=, tags={client-id=adminclient-1, node-id=node-1}]
Registered metric named MetricName [name=request-latency-max, group=admin-client-node-metrics, description=, tags={client-id=adminclient-1, node-id=node-1}]
[AdminClient clientId=adminclient-1] Created socket with SO_RCVBUF = 65536, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node 1
[AdminClient clientId=adminclient-1] Completed connection to node 1. Fetching API versions.
[AdminClient clientId=adminclient-1] Initiating API versions fetch from node 1.
[AdminClient clientId=adminclient-1] No version information found when sending API_VERSIONS with correlation id 2 to node 1. Assuming version 3.
[AdminClient clientId=adminclient-1] Sending API_VERSIONS request with header RequestHeader(apiKey=API_VERSIONS, apiVersion=3, clientId=adminclient-1, correlationId=2, headerVersion=2) and timeout 3600000 to node 1: ApiVersionsRequestData(clientSoftwareName='apache-kafka-java', clientSoftwareVersion='3.6.1')
[AdminClient clientId=adminclient-1] KafkaClient#poll retrieved 0 response(s)
[AdminClient clientId=adminclient-1] Trying to choose nodes for [] at 1706133998795
[AdminClient clientId=adminclient-1] Client is not ready to send to xx.x.xxx:9092 (id: 1 rack: null). Must delay 9223372036854775807 ms
[AdminClient clientId=adminclient-1] Entering KafkaClient#poll(timeout=29974)
[AdminClient clientId=adminclient-1] KafkaClient#poll retrieved 0 response(s)
[AdminClient clientId=adminclient-1] Trying to choose nodes for [] at 1706133998796
[AdminClient clientId=adminclient-1] Client is not ready to send to xx.x.xxx:9092 (id: 1 rack: null). Must delay 9223372036854775807 ms
[AdminClient clientId=adminclient-1] Entering KafkaClient#poll(timeout=29973)
[AdminClient clientId=adminclient-1] Received API_VERSIONS response from node 1 for request with header RequestHeader(apiKey=API_VERSIONS, apiVersion=3, clientId=adminclient-1, correlationId=2, headerVersion=2): ApiVersionsResponseData(errorCode=0, apiKeys=[ApiVersion(apiKey=0, minVersion=0, maxVersion=9), ApiVersion(apiKey=1, minVersion=0, maxVersion=15), ApiVersion(apiKey=2, minVersion=0, maxVersion=8), ApiVersion(apiKey=3, minVersion=0, maxVersion=12), ApiVersion(apiKey=4, minVersion=0, maxVersion=7), ApiVersion(apiKey=5, minVersion=0, maxVersion=4), ApiVersion(apiKey=6, minVersion=0, maxVersion=8), ApiVersion(apiKey=7, minVersion=0, maxVersion=3), ApiVersion(apiKey=8, minVersion=0, maxVersion=8), ApiVersion(apiKey=9, minVersion=0, maxVersion=8), ApiVersion(apiKey=10, minVersion=0, maxVersion=4), ApiVersion(apiKey=11, minVersion=0, maxVersion=9), ApiVersion(apiKey=12, minVersion=0, maxVersion=4), ApiVersion(apiKey=13, minVersion=0, maxVersion=5), ApiVersion(apiKey=14, minVersion=0, maxVersion=5), ApiVersion(apiKey=15, minVersion=0, maxVersion=5), ApiVersion(apiKey=16, minVersion=0, maxVersion=4), ApiVersion(apiKey=17, minVersion=0, maxVersion=1), ApiVersion(apiKey=18, minVersion=0, maxVersion=3), ApiVersion(apiKey=19, minVersion=0, maxVersion=7), ApiVersion(apiKey=20, minVersion=0, maxVersion=6), ApiVersion(apiKey=21, minVersion=0, maxVersion=2), ApiVersion(apiKey=22, minVersion=0, maxVersion=4), ApiVersion(apiKey=23, minVersion=0, maxVersion=4), ApiVersion(apiKey=24, minVersion=0, maxVersion=4), ApiVersion(apiKey=25, minVersion=0, maxVersion=3), ApiVersion(apiKey=26, minVersion=0, maxVersion=3), ApiVersion(apiKey=27, minVersion=0, maxVersion=1), ApiVersion(apiKey=28, minVersion=0, maxVersion=3), ApiVersion(apiKey=29, minVersion=0, maxVersion=3), ApiVersion(apiKey=30, minVersion=0, maxVersion=3), ApiVersion(apiKey=31, minVersion=0, maxVersion=3), ApiVersion(apiKey=32, minVersion=0, maxVersion=4), ApiVersion(apiKey=33, minVersion=0, maxVersion=2), ApiVersion(apiKey=34, minVersion=0, maxVersion=2), ApiVersion(apiKey=35, minVersion=0, maxVersion=4), ApiVersion(apiKey=36, minVersion=0, maxVersion=2), ApiVersion(apiKey=37, minVersion=0, maxVersion=3), ApiVersion(apiKey=38, minVersion=0, maxVersion=3), ApiVersion(apiKey=39, minVersion=0, maxVersion=2), ApiVersion(apiKey=40, minVersion=0, maxVersion=2), ApiVersion(apiKey=41, minVersion=0, maxVersion=3), ApiVersion(apiKey=42, minVersion=0, maxVersion=2), ApiVersion(apiKey=43, minVersion=0, maxVersion=2), ApiVersion(apiKey=44, minVersion=0, maxVersion=1), ApiVersion(apiKey=45, minVersion=0, maxVersion=0), ApiVersion(apiKey=46, minVersion=0, maxVersion=0), ApiVersion(apiKey=47, minVersion=0, maxVersion=0), ApiVersion(apiKey=48, minVersion=0, maxVersion=1), ApiVersion(apiKey=49, minVersion=0, maxVersion=1), ApiVersion(apiKey=50, minVersion=0, maxVersion=0), ApiVersion(apiKey=51, minVersion=0, maxVersion=0), ApiVersion(apiKey=56, minVersion=0, maxVersion=3), ApiVersion(apiKey=57, minVersion=0, maxVersion=1), ApiVersion(apiKey=58, minVersion=0, maxVersion=0), ApiVersion(apiKey=60, minVersion=0, maxVersion=0), ApiVersion(apiKey=61, minVersion=0, maxVersion=0), ApiVersion(apiKey=65, minVersion=0, maxVersion=0), ApiVersion(apiKey=66, minVersion=0, maxVersion=0), ApiVersion(apiKey=67, minVersion=0, maxVersion=0)], throttleTimeMs=0, supportedFeatures=[], finalizedFeaturesEpoch=0, finalizedFeatures=[], zkMigrationReady=false)
[AdminClient clientId=adminclient-1] Node 1 has finalized features epoch: 0, finalized features: [], supported features: [], ZK migration ready: false, API versions: (Produce(0): 0 to 9 [usable: 9], Fetch(1): 0 to 15 [usable: 15], ListOffsets(2): 0 to 8 [usable: 8], Metadata(3): 0 to 12 [usable: 12], LeaderAndIsr(4): 0 to 7 [usable: 7], StopReplica(5): 0 to 4 [usable: 4], UpdateMetadata(6): 0 to 8 [usable: 8], ControlledShutdown(7): 0 to 3 [usable: 3], OffsetCommit(8): 0 to 8 [usable: 8], OffsetFetch(9): 0 to 8 [usable: 8], FindCoordinator(10): 0 to 4 [usable: 4], JoinGroup(11): 0 to 9 [usable: 9], Heartbeat(12): 0 to 4 [usable: 4], LeaveGroup(13): 0 to 5 [usable: 5], SyncGroup(14): 0 to 5 [usable: 5], DescribeGroups(15): 0 to 5 [usable: 5], ListGroups(16): 0 to 4 [usable: 4], SaslHandshake(17): 0 to 1 [usable: 1], ApiVersions(18): 0 to 3 [usable: 3], CreateTopics(19): 0 to 7 [usable: 7], DeleteTopics(20): 0 to 6 [usable: 6], DeleteRecords(21): 0 to 2 [usable: 2], InitProducerId(22): 0 to 4 [usable: 4], OffsetForLeaderEpoch(23): 0 to 4 [usable: 4], AddPartitionsToTxn(24): 0 to 4 [usable: 4], AddOffsetsToTxn(25): 0 to 3 [usable: 3], EndTxn(26): 0 to 3 [usable: 3], WriteTxnMarkers(27): 0 to 1 [usable: 1], TxnOffsetCommit(28): 0 to 3 [usable: 3], DescribeAcls(29): 0 to 3 [usable: 3], CreateAcls(30): 0 to 3 [usable: 3], DeleteAcls(31): 0 to 3 [usable: 3], DescribeConfigs(32): 0 to 4 [usable: 4], AlterConfigs(33): 0 to 2 [usable: 2], AlterReplicaLogDirs(34): 0 to 2 [usable: 2], DescribeLogDirs(35): 0 to 4 [usable: 4], SaslAuthenticate(36): 0 to 2 [usable: 2], CreatePartitions(37): 0 to 3 [usable: 3], CreateDelegationToken(38): 0 to 3 [usable: 3], RenewDelegationToken(39): 0 to 2 [usable: 2], ExpireDelegationToken(40): 0 to 2 [usable: 2], DescribeDelegationToken(41): 0 to 3 [usable: 3], DeleteGroups(42): 0 to 2 [usable: 2], ElectLeaders(43): 0 to 2 [usable: 2], IncrementalAlterConfigs(44): 0 to 1 [usable: 1], AlterPartitionReassignments(45): 0 [usable: 0], ListPartitionReassignments(46): 0 [usable: 0], OffsetDelete(47): 0 [usable: 0], DescribeClientQuotas(48): 0 to 1 [usable: 1], AlterClientQuotas(49): 0 to 1 [usable: 1], DescribeUserScramCredentials(50): 0 [usable: 0], AlterUserScramCredentials(51): 0 [usable: 0], DescribeQuorum(55): UNSUPPORTED, AlterPartition(56): 0 to 3 [usable: 3], UpdateFeatures(57): 0 to 1 [usable: 1], Envelope(58): 0 [usable: 0], DescribeCluster(60): 0 [usable: 0], DescribeProducers(61): 0 [usable: 0], UnregisterBroker(64): UNSUPPORTED, DescribeTransactions(65): 0 [usable: 0], ListTransactions(66): 0 [usable: 0], AllocateProducerIds(67): 0 [usable: 0], ConsumerGroupHeartbeat(68): UNSUPPORTED).
[AdminClient clientId=adminclient-1] KafkaClient#poll retrieved 0 response(s)
[AdminClient clientId=adminclient-1] Trying to choose nodes for [] at 1706133998819
[AdminClient clientId=adminclient-1] Sending MetadataRequestData(topics=null, allowAutoTopicCreation=true, includeClusterAuthorizedOperations=false, includeTopicAuthorizedOperations=false) to xx.x.xxx:9092 (id: 1 rack: null). correlationId=3, timeoutMs=29950
[AdminClient clientId=adminclient-1] Sending METADATA request with header RequestHeader(apiKey=METADATA, apiVersion=12, clientId=adminclient-1, correlationId=3, headerVersion=2) and timeout 29950 to node 1: MetadataRequestData(topics=null, allowAutoTopicCreation=true, includeClusterAuthorizedOperations=false, includeTopicAuthorizedOperations=false)
[AdminClient clientId=adminclient-1] Entering KafkaClient#poll(timeout=59531)
[AdminClient clientId=adminclient-1] KafkaClient#poll retrieved 0 response(s)
[AdminClient clientId=adminclient-1] Trying to choose nodes for [] at 1706133998820
[AdminClient clientId=adminclient-1] Entering KafkaClient#poll(timeout=59530)

So, can anyone help with this, or suggest anything more I could investigate?
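
For reference, a minimal sketch (the bootstrap address and timeout values are placeholders, not taken from the log) of the listTopics() call that produces a trace like the one above, with explicit request and API timeouts so that a stalled connection surfaces as a TimeoutException instead of endless poll lines:

    import java.util.Properties;
    import java.util.Set;
    import java.util.concurrent.TimeUnit;

    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.ListTopicsOptions;

    public class ListTopicsCheck {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            // Placeholder bootstrap address; replace with the real broker list.
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "xx.x.xxx:9092");
            // Fail fast instead of retrying quietly while the log fills with poll() lines.
            props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "15000");
            props.put(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "30000");

            try (Admin admin = Admin.create(props)) {
                Set<String> names = admin
                        .listTopics(new ListTopicsOptions().timeoutMs(30000))
                        .names()
                        .get(35, TimeUnit.SECONDS); // a hang now surfaces as a TimeoutException
                System.out.println("Topics: " + names);
            }
        }
    }

One thing worth checking in traces like this: the hosts returned in the METADATA response (the advertised listeners) must be resolvable and reachable from the client, otherwise the bootstrap connection succeeds but the follow-up request assigned to node 1 never completes.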

Kafka `offset` in version 1.1.0 and `offsets` in version 2.3.0

While performing a rolling upgrade of Kafka from 1.1.0 to 2.3.0, I noticed that the consumer group path in ZooKeeper changed from `offset` to `offsets`, which is causing all of the data to be reprocessed from the beginning.

Is there a simple solution to resolve this?

Is there a way to hardcode the zookeeper consumer path?
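
Not an official migration path, but one possible workaround if the last processed offsets are known (for example from the application's own bookkeeping or from the old ZooKeeper node): commit those offsets for the group with a one-off consumer before restarting the upgraded consumers, so they do not start from the beginning. A minimal sketch; the bootstrap address, group id, topic, and offset are placeholder assumptions:

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    public class ReseedCommittedOffsets {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092"); // placeholder
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");             // placeholder
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                      "org.apache.kafka.common.serialization.StringDeserializer");
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                      "org.apache.kafka.common.serialization.StringDeserializer");
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

            // Offset of the next record to read per partition (last processed + 1), assumed known.
            Map<TopicPartition, OffsetAndMetadata> nextOffsets = new HashMap<>();
            nextOffsets.put(new TopicPartition("my-topic", 0), new OffsetAndMetadata(123456L));

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.assign(nextOffsets.keySet());
                consumer.commitSync(nextOffsets); // re-establish the group's committed offsets
            }
        }
    }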

Does Kafka's custom quota callback provide a mechanism to enforce quotas per Kafka topic / client and to throw an exception instead of throttling?

Hi All,

I have a use case where I want to enforce Kafka quotas per client, but on a per-topic basis. The documentation says that the built-in Kafka quotas cannot be applied on a per-topic basis; they are applied on a per-broker basis only.

There is also a custom quota callback mechanism through which the quota behaviour can be customized, introduced by this KIP: https://cwiki.apache.org/confluence/display/KAFKA/KIP-257+-+Configurable+Quota+Management, but I can't figure out two things:

  1. Right now, Kafka throttles a client once it has reached its quota limit, but my use case is not to throttle but to throw an exception for the produce request when it violates the quota. Can this be implemented using a custom quota callback?
  2. Can a custom quota callback be used to enforce quotas on a per-topic basis?

I am unable to figure out the answers to the above two questions, and any help would be greatly appreciated. Thanks in advance.
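
For what it's worth, the KIP-257 callback only receives the principal and client-id (not the topic), and the value it returns is a quota limit that the broker enforces by throttling; there is no hook in it to fail the produce request instead. A minimal sketch of the interface, assuming a single fixed produce byte-rate read from a hypothetical perclient.produce.byte.rate config entry, mainly to show which information the callback does and does not get:

    import java.util.Collections;
    import java.util.Map;

    import org.apache.kafka.common.Cluster;
    import org.apache.kafka.common.security.auth.KafkaPrincipal;
    import org.apache.kafka.server.quota.ClientQuotaCallback;
    import org.apache.kafka.server.quota.ClientQuotaEntity;
    import org.apache.kafka.server.quota.ClientQuotaType;

    /** Sketch only: one fixed produce byte-rate per client-id; the topic is never visible here. */
    public class PerClientQuotaCallback implements ClientQuotaCallback {

        private volatile double produceByteRate = 1_048_576.0; // 1 MB/s default (assumed)

        @Override
        public void configure(Map<String, ?> configs) {
            Object limit = configs.get("perclient.produce.byte.rate"); // hypothetical key
            if (limit != null) {
                produceByteRate = Double.parseDouble(limit.toString());
            }
        }

        @Override
        public Map<String, String> quotaMetricTags(ClientQuotaType quotaType,
                                                   KafkaPrincipal principal, String clientId) {
            // Only the principal and client-id are available; there is no topic argument.
            return Collections.singletonMap("client-id", clientId);
        }

        @Override
        public Double quotaLimit(ClientQuotaType quotaType, Map<String, String> metricTags) {
            // Returning a limit makes the broker throttle; requests cannot be rejected from here.
            return quotaType == ClientQuotaType.PRODUCE ? produceByteRate : null;
        }

        @Override
        public void updateQuota(ClientQuotaType quotaType, ClientQuotaEntity entity, double newValue) { }

        @Override
        public void removeQuota(ClientQuotaType quotaType, ClientQuotaEntity entity) { }

        @Override
        public boolean quotaResetRequired(ClientQuotaType quotaType) { return false; }

        @Override
        public boolean updateClusterMetadata(Cluster cluster) { return false; }

        @Override
        public void close() { }
    }

The callback is plugged into each broker via the client.quota.callback.class property. Rejecting oversized requests outright would have to happen elsewhere, for example client-side before sending, since the quota framework itself only throttles.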

Cannot invoke ConsumerConnector.shutdown() from a thread different from the consuming thread

Hi,

I am using Kafka 0.8 in an application where the client initiates Kafka message consumption. The client does not have a handle to the ConsumerConnector instance created in the server application. I need a way for the client to initiate a shutdown call to the server, which then looks up the connector by consumer group id and invokes shutdown() on the ConsumerConnector. Is this possible with the Kafka 0.8 APIs?
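
The 0.8 high-level consumer has no built-in lookup of a connector by group id, but shutdown() is normally called from a thread other than the consuming one (the standard consumer group example does exactly that), so the usual approach is to keep your own server-side registry. A minimal sketch, where the ZooKeeper address and the registry class itself are assumptions:

    import java.util.Properties;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;

    import kafka.consumer.Consumer;
    import kafka.consumer.ConsumerConfig;
    import kafka.javaapi.consumer.ConsumerConnector;

    /** Sketch: the server keeps connectors keyed by group id so another thread can stop them. */
    public class ConnectorRegistry {

        private final ConcurrentMap<String, ConsumerConnector> connectors = new ConcurrentHashMap<>();

        /** Called when consumption starts for a group (the ZooKeeper address is a placeholder). */
        public ConsumerConnector start(String groupId) {
            Properties props = new Properties();
            props.put("zookeeper.connect", "zk:2181"); // placeholder
            props.put("group.id", groupId);
            ConsumerConnector connector =
                    Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
            connectors.put(groupId, connector);
            return connector;
        }

        /** Called from the client-facing request thread. */
        public void stop(String groupId) {
            ConsumerConnector connector = connectors.remove(groupId);
            if (connector != null) {
                connector.shutdown(); // unblocks the consuming thread's stream iterators
            }
        }
    }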

Create topic replica on controller node. Kafka 3.5.1 with KRaft

I have a configuration of 5 nodes: 4 combined broker+controller nodes and 1 controller-only node.
I create a topic with replication factor 5; it is created, and describe shows that the topic partition has 5 replicas.
/opt/kafka/latest/bin/kafka-topics.sh --create --bootstrap-server=dc1-prod-sep-kafka-001-vs:9092,dc2-prod-sep-kafka-001-vs:9092 --replication-factor 5 --partitions 1 --topic test5

/opt/kafka/latest/bin/kafka-topics.sh --describe --topic test5 --bootstrap-server=dc1-prod-sep-kafka-001-vs:9092
Topic: test5 TopicId: amuqr8EgRmqeKryUHZwsMA PartitionCount: 1 ReplicationFactor: 5 Configs: segment.bytes=1073741824
Topic: test5 Partition: 0 Leader: 3 Replicas: 3,4,1,2,5 Isr: 3,4,1,2

There are 5 replicas but only 4 in the ISR.
Why does Kafka initially allow you to create a replica on the controller node, when in reality the replica is never created on the controller node and there are no topic files in its log directory?

Is this expected behavior or not? Thanks.
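
A controller-only node (process.roles=controller) never hosts partitions, so a replica assigned to it cannot be created or join the ISR. If the topic is created programmatically, one way to avoid the dangling replica is to cap the replication factor at the number of broker-role nodes. A minimal sketch, with the bootstrap address as a placeholder and under the assumption that describeCluster() lists only the broker-role nodes:

    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.NewTopic;

    public class CreateTopicOnBrokers {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
                      "dc1-prod-sep-kafka-001-vs:9092"); // placeholder
            try (Admin admin = Admin.create(props)) {
                int brokerCount = admin.describeCluster().nodes().get().size(); // broker-role nodes
                short rf = (short) Math.min(5, brokerCount); // here: 4 instead of 5
                admin.createTopics(Collections.singleton(new NewTopic("test5", 1, rf)))
                     .all().get();
            }
        }
    }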
