
Unable to produce or consume in Kafka


Explorer

Hi all,

After "successfully" installing Kafka on a host with HDP 2.5.3 and starting the service from Ambari, we are unable to produce to or consume from topics.

We can create and describe a topic:

-bash-4.2$ /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --zookeeper zoo.host.com:2181 --create --topic newmikl --partition 1 --replication-factor 1
Created topic "newmikl".


-bash-4.2$ /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --zookeeper zoo.host.com:2181 --describe --topic newmikl
Topic:newmikl   PartitionCount:1        ReplicationFactor:1     Configs:
        Topic: newmikl  Partition: 0    Leader: 1001    Replicas: 1001  Isr: 1001

Unfortunately, we can't see the topic files in the Kafka log directory (log.dirs):

-bash-4.2$ ls -l /hadoop/kafka/kafka-logs/
total 4
-rwxrwxrwx 1 kafkauser kafkagroup  0 Mar 17 09:45 cleaner-offset-checkpoint
-rwxrwxrwx 1 kafkauser kafkagroup 57 Mar 17 09:45 meta.properties
-rwxrwxrwx 1 kafkauser kafkagroup  0 Mar 17 09:45 recovery-point-offset-checkpoint
-rwxrwxrwx 1 kafkauser kafkagroup  0 Mar 17 09:45 replication-offset-checkpoint

When we try to produce and consume messages, we get errors:

-bash-4.2$ /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list kafka.host.com:6667 --topic newmikl --security-protocol SASL_PLAINTEXT
test
test1
test2
test3
^C[2017-03-17 11:14:24,025] WARN TGT renewal thread has been interrupted and will exit. (org.apache.kafka.common.security.kerberos.KerberosLogin)




-bash-4.2$ /usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --zookeeper zoo.host.com:2181  --topic newmikl --security-protocol SASL_PLAINTEXT --from-beginning
{metadata.broker.list=kafka.host.com:6667, request.timeout.ms=30000, client.id=console-consumer-56271, security.protocol=SASL_PLAINTEXT}
[2017-03-17 11:14:35,594] WARN Fetching topic metadata with correlation id 0 for topics [Set(newmikl)] from broker [BrokerEndPoint(1001,kafka.host.com,6667)] failed (kafka.client.ClientUtils$)
java.nio.channels.ClosedChannelException
        at kafka.network.BlockingChannel.send(BlockingChannel.scala:122)
        at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:82)
        at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:81)
        at kafka.producer.SyncProducer.send(SyncProducer.scala:126)
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:96)
        at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:67)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
[2017-03-17 11:14:35,597] WARN [console-consumer-56271_slbifregkhn01.fr.intranet-1489745675236-1a4e34e3-leader-finder-thread], Failed to find leader for Set([newmikl,0]) (kafka.consumer.ConsumerFetcherManager$LeaderFinderThread)
kafka.common.KafkaException: fetching topic metadata for topics [Set(newmikl)] from broker [ArrayBuffer(BrokerEndPoint(1001,kafka.host.com,6667))] failed
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:73)
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:96)
        at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:67)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
Caused by: java.nio.channels.ClosedChannelException
        at kafka.network.BlockingChannel.send(BlockingChannel.scala:122)
        at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:82)
        at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:81)
        at kafka.producer.SyncProducer.send(SyncProducer.scala:126)
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)
        ... 3 more
{metadata.broker.list=kafka.host.com:6667, request.timeout.ms=30000, client.id=console-consumer-56271, security.protocol=SASL_PLAINTEXT}
[...]
etc...

Looking at the Kafka logs, we can see errors going back to when Kafka started:

# tail -f /var/log/kafka/controller.log
java.io.IOException: Connection to kafka.host.com:6667 (id: 1003 rack: null) failed
        at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingReady$extension$2.apply(NetworkClientBlockingOps.scala:63)
        at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingReady$extension$2.apply(NetworkClientBlockingOps.scala:59)
        at kafka.utils.NetworkClientBlockingOps$.recursivePoll$1(NetworkClientBlockingOps.scala:112)
        at kafka.utils.NetworkClientBlockingOps$.kafka$utils$NetworkClientBlockingOps$$pollUntil$extension(NetworkClientBlockingOps.scala:120)
        at kafka.utils.NetworkClientBlockingOps$.blockingReady$extension(NetworkClientBlockingOps.scala:59)
        at kafka.controller.RequestSendThread.brokerReady(ControllerChannelManager.scala:233)
        at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:182)
        at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:181)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
[2017-03-18 15:15:29,340] WARN [Controller-1003-to-broker-1003-send-thread], Controller 1003's connection to broker kafka.host.com:6667 (id: 1003 rack: null) was unsuccessful (kafka.controller.RequestSendThread)
java.io.IOException: Connection to kafka.host.com:6667 (id: 1003 rack: null) failed
        at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingReady$extension$2.apply(NetworkClientBlockingOps.scala:63)
        at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingReady$extension$2.apply(NetworkClientBlockingOps.scala:59)
        at kafka.utils.NetworkClientBlockingOps$.recursivePoll$1(NetworkClientBlockingOps.scala:112)
        at kafka.utils.NetworkClientBlockingOps$.kafka$utils$NetworkClientBlockingOps$$pollUntil$extension(NetworkClientBlockingOps.scala:120)
        at kafka.utils.NetworkClientBlockingOps$.blockingReady$extension(NetworkClientBlockingOps.scala:59)
        at kafka.controller.RequestSendThread.brokerReady(ControllerChannelManager.scala:233)
        at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:182)
        at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:181)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)


Does anyone have an idea what could be wrong in our configuration?

Thanks.

Accepted Solutions
Re: Unable to produce or consume in Kafka

Explorer

Hi guys,

Today we finally found what was wrong. Our Kafka keytab has (like the other keytabs in our clusters) a key encrypted with aes256-cts-hmac-sha1-96. This keytab works as expected for authenticating to ZooKeeper but is problematic for communicating with the broker.

Replacing this keytab with one that uses arcfour-hmac key encryption resolved our issue. For security reasons this solution is not acceptable, so we will run some more tests in the coming days. If I have any updates on this issue, I'll post our findings here.

Thanks @schandhok and @Jay SenSharma
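
For anyone comparing keytabs, here is a minimal sketch for listing which encryption types a keytab actually contains. It assumes MIT Kerberos klist is installed, whose -k, -e and -t flags print the keytab entries with the encryption type in parentheses at the end of each line. The function name keytab_enctypes and the keytab path in the usage comment are illustrative assumptions, not taken from the cluster described above.

```shell
# Print the distinct encryption types found in `klist -ket` output read on stdin.
# Assumes each entry line ends with the enctype in parentheses, e.g. "(arcfour-hmac)".
keytab_enctypes() {
  sed -n 's/.*(\([^()]*\))[[:space:]]*$/\1/p' | sort -u
}

# Usage on a Kerberized host (the keytab path is an assumption; adjust to your install):
#   klist -ket /etc/security/keytabs/kafka.service.keytab | keytab_enctypes
```

If the only line printed is aes256-cts-hmac-sha1-96, the keytab matches the situation described in this post.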


Re: Unable to produce or consume in Kafka

Super Mentor

@Micaël Dias

Are you able to access the port for which you are getting the "connection to broker kafka.host.com:6667 (id: 1003 rack: null) was unsuccessful" and "Failed to find leader for Set([newmikl,0]) (kafka.consumer.ConsumerFetcherManager$LeaderFinderThread)" errors?

telnet  kafka.host.com 6667 


On the Kafka broker host, please verify that the port is open.

Example:

# netstat -tnlpa | grep 6667
tcp        0      0 ::ffff:172.26.70.154:6667   :::*                        LISTEN      15573/java          
tcp        0      0 ::ffff:172.26.70.154:6667   ::ffff:172.26.70.154:48623  ESTABLISHED 15573/java          
tcp        0      0 ::ffff:172.26.70.154:48623  ::ffff:172.26.70.154:6667   ESTABLISHED 15573/java         
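
The same check can be scripted so it is easy to repeat on several hosts. This is only a sketch: it assumes netstat -tnl style output with the local address in column 4 and the state in column 6 (as in the example above), and the function name has_listener is hypothetical.

```shell
# Succeed (exit 0) if the netstat output on stdin shows a LISTEN socket on the given port.
has_listener() {
  port="$1"
  awk -v p="$port" '
    $6 == "LISTEN" && $4 ~ (":" p "$") { found = 1 }
    END { exit !found }
  '
}

# Usage on the broker host:
#   netstat -tnl | has_listener 6667 && echo "port 6667 is listening"
```

A listening port only proves that the socket is open. In a Kerberized setup the connection can still be dropped during SASL authentication, so this check rules out only the network layer.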


Re: Unable to produce or consume in Kafka

Explorer

@Jay SenSharma

Thanks for your reply. Yes, the port is in the LISTEN state:

# nc -v kafka.host.com 6667
Ncat: Version 6.40 ( http://nmap.org/ncat )
Ncat: Connected to 10.194.231.5:6667.
^C


# lsof -i :6667
COMMAND     PID             USER   FD   TYPE    DEVICE SIZE/OFF NODE NAME
java    2289859 kafkauser  141u  IPv4 675950500      0t0  TCP kafka.host.com:ircu-3 (LISTEN)


# pstree -anlp 2289859
java,2289859 -Xmx4G -Xms1G -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+DisableExplicitGC -Djava.awt.headless=true -Xloggc:/var/log/kafka/kafkaServer-gc.log -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dkafka.logs.dir=/var/log/kafka -Dlog4j.configuration=file:/usr/hdp/2.5.3.0-37/kafka/bin/../config/log4j.properties -Djava.security.auth.login.config=/usr/hdp/current/kafka-broker/config/kafka_jaas.conf -cp :/usr/lib/ambari-metrics-kafka-sink/ambari-metrics-kafka-sink.jar:/usr/lib/ambari-metrics-kafka-sink/lib/*:/export/home/kafkauser:/etc/kafka/conf:/usr/hdp/current/hadoop-hdfs-client/*:/usr/hdp/current/hadoop-hdfs-client/lib/*:/etc/hadoop/conf:/usr/lib/ambari-metrics-kafka-sink/ambari-metrics-kafka-sink.jar:/usr/lib/ambari-metrics-kafka-sink/lib/*:/usr/hdp/current/kafka-broker/bin:/etc/kafka/conf:/usr/hdp/current/hadoop-hdfs-client/*:/usr/hdp/current/hadoop-hdfs-client/lib/*:/etc/hadoop/conf:/usr/lib/ambari-metrics-kafka-sink/ambari-metrics-kafka-sink.jar:/usr/lib/ambari-metrics-kafka-sink/lib/*:/usr/hdp/2.5.3.0-37/kafka/bin:/etc/kafka/conf:/usr/hdp/current/hadoop-hdfs-client/*:/usr/hdp/current/hadoop-hdfs-client/lib/*:/etc/hadoop/conf:/usr/hdp/2.5.3.0-37/kafka/bin/../libs/aopalliance-repackaged-2.4.0-b34.jar:/usr/hdp/2.5.3.0-37/kafka/bin/../libs/argparse4j-0.5.0.jar:/usr/hdp/2.5.3.0-37/kafka/bin/../libs/connect-api-0.10.0.2.5.3.0-37.jar:/usr/hdp/2.5.3.0-37/kafka/bin/../libs/connect-file-0.10.0.2.5.3.0-37.jar:/usr/hdp/2.5.3.0-37/kafka/bin/../libs/connect-json-0.10.0.2.5.3.0-37.jar:/usr/hdp/2.5.3.0-37/kafka/bin/../libs/connect-runtime-0.10.0.2.5.3.0-37.jar:/usr/hdp/2.5.3.0-37/kafka/bin/../libs/guava-18.0.jar:/usr/hdp/2.5.3.0-37/kafka/bin/../libs/hk2-api-2.4.0-b34.jar:/usr/hdp/2.5.3.0-37/kafka/bin/../libs/hk2-locator-2.4.0-b34.jar:/usr/hdp/2.5.3.0-37/kafka/bin/../libs/hk2-u
tils-2.4.0-b34.jar:/usr/hdp/2.5.3.0-37/kafka/bin/../libs/jackson-annotations-2.6.0.jar:/usr/hdp/2.5.3.0-37/kafka/bin/../libs/jackson-core-2.6.3.jar:/usr/hdp/2.5.3.0-37/kafka/bin/../libs/jackson-databind-2.6.3.jar:/usr/hdp/2.5.3.0-37/kafka/bin/../libs/jackson-jaxrs-base-2.6.3.jar:/usr/hdp/2.5.3.0-37/kafka/bin/../libs/jackson-jaxrs-json-provider-2.6.3.jar:/usr/hdp/2.5.3.0-37/kafka/bin/../libs/jackson-module-jaxb-annotations-2.6.3.jar:/usr/hdp/2.5.3.0-37/kafka/bin/../libs/javassist-3.18.2-GA.jar:/usr/hdp/2.5.3.0-37/kafka/bin/../libs/javax.annotation-api-1.2.jar:/usr/hdp/2.5.3.0-37/kafka/bin/../libs/javax.inject-1.jar:/usr/hdp/2.5.3.0-37/kafka/bin/../libs/javax.inject-2.4.0-b34.jar:/usr/hdp/2.5.3.0-37/kafka/bin/../libs/javax.servlet-api-3.1.0.jar:/usr/hdp/2.5.3.0-37/kafka/bin/../libs/javax.ws.rs-api-2.0.1.jar:/usr/hdp/2.5.3.0-37/kafka/bin/../libs/jersey-client-2.22.2.jar:/usr/hdp/2.5.3.0-37/kafka/bin/../libs/jersey-common-2.22.2.jar:/usr/hdp/2.5.3.0-37/kafka/bin/../libs/jersey-container-servlet-2.22.2.jar:/usr/hdp/2.5.3.0-37/kafka/bin/../libs/jersey-container-servlet-core-2.22.2.jar:/usr/hdp/2.5.3.0-37/kafka/bin/../libs/jersey-guava-2.22.2.jar:/usr/hdp/2.5.3.0-37/kafka/bin/../libs/jersey-media-jaxb-2.22.2.jar:/usr/hdp/2.5.3.0-37/kafka/bin/../libs/jersey-server-2.22.2.jar:/usr/hdp/2.5.3.0-37/kafka/bin/../libs/jetty-continuation-9.2.15.v20160210.jar:/usr/hdp/2.5.3.0-37/kafka/bin/../libs/jetty-http-9.2.15.v20160210.jar:/usr/hdp/2.5.3.0-37/kafka/bin/../libs/jetty-io-9.2.15.v20160210.jar:/usr/hdp/2.5.3.0-37/kafka/bin/../libs/jetty-security-9.2.15.v20160210.jar:/usr/hdp/2.5.3.0-37/kafka/bin/../libs/jetty-server-9.2.15.v20160210.jar:/usr/hdp/2.5.3.0-37/kafka/bin/../libs/jetty-servlet-9.2.15.v20160210.jar:/usr/hdp/2.5.3.0-37/kafka/bin/../libs/jetty-servlets-9.2.15.v20160210.jar:/usr/hdp/2.5.3.0-37/kafka/bin/../libs/jetty-util-9.2.15.v20160210.jar:/usr/hdp/2.5.3.0-37/kafka/bin/../libs/jopt-simple-4.9.jar:/usr/hdp/2.5.3.0-37/kafka/bin/../libs/kafka_2.10-0.10.0.2.5.3.0-37.jar:/usr
/hdp/2.5.3.0-37/kafka/bin/../libs/kafka-clients-0.10.0.2.5.3.0-37.jar:/usr/hdp/2.5.3.0-37/kafka/bin/../libs/kafka-ganglia-0.10.0.2.5.3.0-37.jar:/usr/hdp/2.5.3.0-37/kafka/bin/../libs/kafka-log4j-appender-0.10.0.2.5.3.0-37.jar:/usr/hdp/2.5.3.0-37/kafka/bin/../libs/kafka-streams-0.10.0.2.5.3.0-37.jar:/usr/hdp/2.5.3.0-37/kafka/bin/../libs/kafka-streams-examples-0.10.0.2.5.3.0-37.jar:/usr/hdp/2.5.3.0-37/kafka/bin/../libs/kafka-tools-0.10.0.2.5.3.0-37.jar:/usr/hdp/2.5.3.0-37/kafka/bin/../libs/log4j-1.2.17.jar:/usr/hdp/2.5.3.0-37/kafka/bin/../libs/lz4-1.3.0.jar:/usr/hdp/2.5.3.0-37/kafka/bin/../libs/metrics-core-2.2.0.jar:/usr/hdp/2.5.3.0-37/kafka/bin/../libs/metrics-ganglia-2.2.0.jar:/usr/hdp/2.5.3.0-37/kafka/bin/../libs/ojdbc6.jar:/usr/hdp/2.5.3.0-37/kafka/bin/../libs/osgi-resource-locator-1.0.1.jar:/usr/hdp/2.5.3.0-37/kafka/bin/../libs/ranger-kafka-plugin-impl:/usr/hdp/2.5.3.0-37/kafka/bin/../libs/ranger-kafka-plugin-shim-0.6.0.2.5.3.0-37.jar:/usr/hdp/2.5.3.0-37/kafka/bin/../libs/ranger-plugin-classloader-0.6.0.2.5.3.0-37.jar:/usr/hdp/2.5.3.0-37/kafka/bin/../libs/reflections-0.9.10.jar:/usr/hdp/2.5.3.0-37/kafka/bin/../libs/rocksdbjni-4.8.0.jar:/usr/hdp/2.5.3.0-37/kafka/bin/../libs/scala-library-2.10.4.jar:/usr/hdp/2.5.3.0-37/kafka/bin/../libs/slf4j-api-1.7.21.jar:/usr/hdp/2.5.3.0-37/kafka/bin/../libs/slf4j-log4j12-1.7.21.jar:/usr/hdp/2.5.3.0-37/kafka/bin/../libs/snappy-java-1.1.2.6.jar:/usr/hdp/2.5.3.0-37/kafka/bin/../libs/validation-api-1.1.0.Final.jar:/usr/hdp/2.5.3.0-37/kafka/bin/../libs/zkclient-0.8.jar:/usr/hdp/2.5.3.0-37/kafka/bin/../libs/zookeeper-3.4.6.jar:/usr/hdp/2.5.3.0-37/kafka/bin/../libs/zookeeper.jar kafka.Kafka /usr/hdp/2.5.3.0-37/kafka/config/server.properties
  ├─{java},2290073
  ├─{java},2290074
  ├─{java},2290075



Re: Unable to produce or consume in Kafka

Contributor

@Micaël Dias

The error messages in controller.log appear to carry a different timestamp from your producer and consumer runs. Even if we ignore the mismatched timestamps, I see

Controller 1003's connection to broker kafka.host.com:6667 (id: 1003 rack: null) was unsuccessful

so we should check server.log for the same timestamp. Please perform the following steps, which might help in debugging the issue.

Also, if you do not see any directories created for the topic and partition, that might be an issue.

- Restart the brokers.

- Keep monitoring server.log and controller.log for any error messages.

- Create a topic while monitoring the logs, and see whether the topic and partitions are created successfully. Additionally, check state-change.log.

- If there are no errors, try producing and consuming while monitoring the logs.
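
While following these steps, a filter like the one below can cut the log stream down to warnings, errors and exception lines. This is only a sketch: the function name only_problems is made up, and it assumes the log layout seen earlier in this thread, where the level appears as a separate word (for example "] WARN ["). The /var/log/kafka directory comes from the controller.log path quoted in the question; that server.log and state-change.log live in the same directory is an assumption.

```shell
# Keep only log lines that carry a WARN/ERROR/FATAL level or name an exception.
only_problems() {
  grep -E ' (WARN|ERROR|FATAL) |Exception'
}

# Usage while restarting the broker and creating a topic (paths are assumptions):
#   tail -F /var/log/kafka/server.log /var/log/kafka/controller.log \
#           /var/log/kafka/state-change.log | only_problems
```

Stack trace frames (the "at kafka..." lines) are dropped on purpose, so open the full log once an interesting line shows up.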

Re: Unable to produce or consume in Kafka

New Contributor

@Micaël Dias

I ran into the same problem: I can create a topic successfully, but producing fails:

[root@server30 kafka]# bin/kafka-topics.sh --create --zookeeper server30:2181 --topic S1_MME --replication-factor 1 --partition 2
WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.
Created topic "S1_MME".
[root@server30 kafka]# bin/kafka-console-producer.sh --broker-list server30:9096 --topic S1_MME  --producer.config config/producer.properties 
adsa
wqdada
xasa
[2017-08-26 01:38:43,346] ERROR Error when sending message to topic S1_MME with key: null, value: 4 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)

Then I followed your steps and replaced the old keytab with a new one created with arcfour-hmac encryption, but it changed nothing.

ZooKeeper authenticated the Kafka broker successfully:

2017-08-26 01:34:07,215 [myid:] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:SaslServerCallbackHandler@118] - Successfully authenticated client: authenticationID=kafka/server30@KRBKDC;  authorizationID=kafka/server30@KRBKDC.
2017-08-26 01:34:07,215 [myid:] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:SaslServerCallbackHandler@134] - Setting authorizedID: kafka
2017-08-26 01:34:07,215 [myid:] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:ZooKeeperServer@964] - adding SASL authorization for authorizationID: kafka

But the Kafka controller still can't connect to the broker:

[2017-08-26 01:51:32,962] WARN [Controller-0-to-broker-0-send-thread], Controller 0's connection to broker localhost:9096 (id: 0 rack: null) was unsuccessful (kafka.controller.RequestSendThread)
java.io.IOException: Connection to localhost:9096 (id: 0 rack: null) failed
	at kafka.utils.NetworkClientBlockingOps$.awaitReady$1(NetworkClientBlockingOps.scala:83)
	at kafka.utils.NetworkClientBlockingOps$.blockingReady$extension(NetworkClientBlockingOps.scala:93)
	at kafka.controller.RequestSendThread.brokerReady(ControllerChannelManager.scala:230)
	at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:182)
	at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:181)
	at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)

Do you have any new findings on this issue?

I run ZooKeeper and Kafka on one host with a single broker, on CentOS 7.

Re: Unable to produce or consume in Kafka

Mentor

@Micaël Dias

I think you might need to update kafka-env.sh with additional configuration.

Go to Ambari UI--->Kafka--->config--->Advanced Kafka-env--->kafka-env template

Add the following line under the comment "# The java implementation to use.":

export KAFKA_KERBEROS_PARAMS="-Djava.security.auth.login.config=/etc/kafka/conf/kafka_jaas.conf" 

Save the changes, then restart the Kafka service and any services with stale configurations.
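
To confirm after the restart that the broker JVM actually picked up a JAAS file, a sketch like the one below extracts the -Djava.security.auth.login.config value from a java command line. The function name jaas_config_of is hypothetical, and the pgrep lookup in the usage comment assumes a single broker process whose main class is kafka.Kafka, as in the pstree output earlier in the thread.

```shell
# Read a java command line on stdin and print the JAAS login config path, if any.
jaas_config_of() {
  tr ' ' '\n' | sed -n 's/^-Djava\.security\.auth\.login\.config=//p'
}

# Usage on the broker host (assumes exactly one kafka.Kafka process):
#   ps -o args= -p "$(pgrep -f kafka.Kafka)" | jaas_config_of
```

An empty result means the flag is missing from the broker's command line. Note that the original poster's broker already shows this flag in the pstree output above, pointing at /usr/hdp/current/kafka-broker/config/kafka_jaas.conf.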

Keep me posted.

Re: Unable to produce or consume in Kafka

New Contributor

@Geoffrey Shelton Okot

After I set $KAFKA_KERBEROS_PARAMS in my kafka-env.sh file and sourced kafka-env.sh from kafka-server-start.sh, my problem was solved. It seems the KAFKA_KERBEROS_PARAMS parameter is what the controller uses.

Thanks Geoffrey Shelton Okot.

Re: Unable to produce or consume in Kafka

Mentor

@yu clyonce

Nice to know my solution helped. Perhaps you could accept the answer and so reward me.
