Kafka: Cannot Produce Messages

Expert Contributor

Using Hortonworks Sandbox 2.3.2

The broker is listening on port 9092 (see the last line of the kafka-server-start.sh log below):

----

[2016-02-28 16:20:33,691] INFO Client environment:java.class.path=:/usr/lib/ambari-metrics-kafka-sink/ambari-metrics-kafka-sink.jar:/usr/lib/ambari-metrics-kafka-sink/lib/commons-codec-1.2.jar:/usr/lib/ambari-metrics-kafka-sink/lib/jackson-xc-1.9.13.jar:/usr/lib/ambari-metrics-kafka-sink/lib/commons-httpclient-3.1.jar:/usr/lib/ambari-metrics-kafka-sink/lib/jackson-mapper-asl-1.9.13.jar:/usr/lib/ambari-metrics-kafka-sink/lib/commons-lang-2.6.jar:/usr/lib/ambari-metrics-kafka-sink/lib/jackson-core-asl-1.9.13.jar:/usr/lib/ambari-metrics-kafka-sink/lib/commons-logging-1.1.1.jar:/usr/hdp/2.3.2.0-2950/kafka/bin/../core/build/dependant-libs-2.10.5*/*.jar:/usr/hdp/2.3.2.0-2950/kafka/bin/../examples/build/libs//kafka-examples*.jar:/usr/hdp/2.3.2.0-2950/kafka/bin/../contrib/hadoop-consumer/build/libs//kafka-hadoop-consumer*.jar:/usr/hdp/2.3.2.0-2950/kafka/bin/../contrib/hadoop-producer/build/libs//kafka-hadoop-producer*.jar:/usr/hdp/2.3.2.0-2950/kafka/bin/../clients/build/libs/kafka-clients*.jar:/usr/hdp/2.3.2.0-2950/kafka/bin/../libs/bcprov-jdk16-1.46.jar:/usr/hdp/2.3.2.0-2950/kafka/bin/../libs/commons-collections-3.2.1.jar:/usr/hdp/2.3.2.0-2950/kafka/bin/../libs/commons-configuration-1.10.jar:/usr/hdp/2.3.2.0-2950/kafka/bin/../libs/commons-io-2.4.jar:/usr/hdp/2.3.2.0-2950/kafka/bin/../libs/commons-lang-2.6.jar:/usr/hdp/2.3.2.0-2950/kafka/bin/../libs/commons-logging-1.2.jar:/usr/hdp/2.3.2.0-2950/kafka/bin/../libs/eclipselink-2.5.2-M1.jar:/usr/hdp/2.3.2.0-2950/kafka/bin/../libs/gson-2.2.4.jar:/usr/hdp/2.3.2.0-2950/kafka/bin/../libs/guava-11.0.2.jar:/usr/hdp/2.3.2.0-2950/kafka/bin/../libs/hadoop-auth-2.7.1.2.3.2.0-2950.jar:/usr/hdp/2.3.2.0-2950/kafka/bin/../libs/hadoop-common-2.7.1.2.3.2.0-2950.jar:/usr/hdp/2.3.2.0-2950/kafka/bin/../libs/httpclient-4.2.5.jar:/usr/hdp/2.3.2.0-2950/kafka/bin/../libs/httpcore-4.2.5.jar:/usr/hdp/2.3.2.0-2950/kafka/bin/../libs/httpmime-4.2.5.jar:/usr/hdp/2.3.2.0-2950/kafka/bin/../libs/jackson-core-asl-1.9.13.jar:/usr/hdp/2.3.2.0-2950/kafka/bin/../l
ibs/jackson-jaxrs-1.8.1.jar:/usr/hdp/2.3.2.0-2950/kafka/bin/../libs/jackson-jaxrs-1.8.3.jar:/usr/hdp/2.3.2.0-2950/kafka/bin/../libs/jackson-mapper-asl-1.9.13.jar:/usr/hdp/2.3.2.0-2950/kafka/bin/../libs/jackson-xc-1.8.3.jar:/usr/hdp/2.3.2.0-2950/kafka/bin/../libs/javax.persistence-2.1.0.jar:/usr/hdp/2.3.2.0-2950/kafka/bin/../libs/jersey-bundle-1.17.1.jar:/usr/hdp/2.3.2.0-2950/kafka/bin/../libs/jopt-simple-3.2.jar:/usr/hdp/2.3.2.0-2950/kafka/bin/../libs/kafka_2.10-0.8.2.2.3.2.0-2950.jar:/usr/hdp/2.3.2.0-2950/kafka/bin/../libs/kafka_2.10-0.8.2.2.3.2.0-2950-javadoc.jar:/usr/hdp/2.3.2.0-2950/kafka/bin/../libs/kafka_2.10-0.8.2.2.3.2.0-2950-scaladoc.jar:/usr/hdp/2.3.2.0-2950/kafka/bin/../libs/kafka_2.10-0.8.2.2.3.2.0-2950-sources.jar:/usr/hdp/2.3.2.0-2950/kafka/bin/../libs/kafka_2.10-0.8.2.2.3.2.0-2950-test.jar:/usr/hdp/2.3.2.0-2950/kafka/bin/../libs/kafka-clients-0.8.2.2.3.2.0-2950.jar:/usr/hdp/2.3.2.0-2950/kafka/bin/../libs/kafka-ganglia-0.8.2.2.3.2.0-2950.jar:/usr/hdp/2.3.2.0-2950/kafka/bin/../libs/kafka-ganglia-0.8.2.2.3.2.0-2950-javadoc.jar:/usr/hdp/2.3.2.0-2950/kafka/bin/../libs/log4j-1.2.16.jar:/usr/hdp/2.3.2.0-2950/kafka/bin/../libs/lz4-1.2.0.jar:/usr/hdp/2.3.2.0-2950/kafka/bin/../libs/metrics-core-2.2.0.jar:/usr/hdp/2.3.2.0-2950/kafka/bin/../libs/metrics-ganglia-2.2.0.jar:/usr/hdp/2.3.2.0-2950/kafka/bin/../libs/noggit-0.6.jar:/usr/hdp/2.3.2.0-2950/kafka/bin/../libs/ojdbc6.jar:/usr/hdp/2.3.2.0-2950/kafka/bin/../libs/ranger-kafka-plugin-0.5.0.2.3.2.0-2950.jar:/usr/hdp/2.3.2.0-2950/kafka/bin/../libs/ranger-plugins-audit-0.5.0.2.3.2.0-2950.jar:/usr/hdp/2.3.2.0-2950/kafka/bin/../libs/ranger-plugins-common-0.5.0.2.3.2.0-2950.jar:/usr/hdp/2.3.2.0-2950/kafka/bin/../libs/ranger-plugins-cred-0.5.0.2.3.2.0-2950.jar:/usr/hdp/2.3.2.0-2950/kafka/bin/../libs/ranger_solrj-0.5.0.2.3.2.0-2950.jar:/usr/hdp/2.3.2.0-2950/kafka/bin/../libs/scala-library-2.10.4.jar:/usr/hdp/2.3.2.0-2950/kafka/bin/../libs/slf4j-api-1.7.6.jar:/usr/hdp/2.3.2.0-2950/kafka/bin/../libs/slf4j-log4j12-1.6.1.jar
:/usr/hdp/2.3.2.0-2950/kafka/bin/../libs/snappy-java-1.1.1.6.jar:/usr/hdp/2.3.2.0-2950/kafka/bin/../libs/zkclient-0.5-1.jar:/usr/hdp/2.3.2.0-2950/kafka/bin/../libs/zookeeper-3.4.6.2.3.2.0-2950.jar:/usr/hdp/2.3.2.0-2950/kafka/bin/../libs/zookeeper.jar:/usr/hdp/2.3.2.0-2950/kafka/bin/../core/build/libs/kafka_2.10*.jar (org.apache.zookeeper.ZooKeeper)

[2016-02-28 16:20:33,691] INFO Client environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib (org.apache.zookeeper.ZooKeeper)

[2016-02-28 16:20:33,691] INFO Client environment:java.io.tmpdir=/tmp (org.apache.zookeeper.ZooKeeper)

[2016-02-28 16:20:33,691] INFO Client environment:java.compiler=<NA> (org.apache.zookeeper.ZooKeeper)

[2016-02-28 16:20:33,691] INFO Client environment:os.name=Linux (org.apache.zookeeper.ZooKeeper)

[2016-02-28 16:20:33,691] INFO Client environment:os.arch=amd64 (org.apache.zookeeper.ZooKeeper)

[2016-02-28 16:20:33,691] INFO Client environment:os.version=2.6.32-573.7.1.el6.x86_64 (org.apache.zookeeper.ZooKeeper)

[2016-02-28 16:20:33,691] INFO Client environment:user.name=root (org.apache.zookeeper.ZooKeeper)

[2016-02-28 16:20:33,691] INFO Client environment:user.home=/root (org.apache.zookeeper.ZooKeeper)

[2016-02-28 16:20:33,691] INFO Client environment:user.dir=/usr/hdp/2.3.2.0-2950/kafka (org.apache.zookeeper.ZooKeeper)

[2016-02-28 16:20:33,692] INFO Initiating client connection, connectString=sandbox.hortonworks.com:2181 sessionTimeout=30000 watcher=org.I0Itec.zkclient.ZkClient@44a8253a (org.apache.zookeeper.ZooKeeper)

[2016-02-28 16:20:33,707] INFO Opening socket connection to server sandbox.hortonworks.com/192.168.0.109:2181. Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)

[2016-02-28 16:20:33,713] INFO Socket connection established to sandbox.hortonworks.com/192.168.0.109:2181, initiating session (org.apache.zookeeper.ClientCnxn)

[2016-02-28 16:20:33,719] INFO Session establishment complete on server sandbox.hortonworks.com/192.168.0.109:2181, sessionid = 0x153280fd58f002c, negotiated timeout = 30000 (org.apache.zookeeper.ClientCnxn)

[2016-02-28 16:20:33,720] INFO zookeeper state changed (SyncConnected) (org.I0Itec.zkclient.ZkClient)

[2016-02-28 16:20:33,942] INFO Loading logs. (kafka.log.LogManager)

[2016-02-28 16:20:33,977] INFO Completed load of log test-0 with log end offset 0 (kafka.log.Log)

[2016-02-28 16:20:33,986] INFO Completed load of log test1-0 with log end offset 0 (kafka.log.Log)

[2016-02-28 16:20:33,989] INFO Logs loading complete. (kafka.log.LogManager)

[2016-02-28 16:20:33,989] INFO Starting log cleanup with a period of 300000 ms. (kafka.log.LogManager)

[2016-02-28 16:20:33,991] INFO Starting log flusher with a default period of 9223372036854775807 ms. (kafka.log.LogManager)

[2016-02-28 16:20:34,019] INFO Awaiting socket connections on sandbox.hortonworks.com:9092. (kafka.network.Acceptor)

----

I created the topic test successfully.

Whenever I try to push messages to the topic, e.g. with the console producer

"kafka-console-producer.sh --broker-list localhost:9092 --topic test"

I get the following error messages:

-------

[root@sandbox kafka]# bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

t

[2016-02-28 16:32:17,945] WARN Fetching topic metadata with correlation id 0 for topics [Set(test)] from broker [BrokerEndPoint(0,localhost,9092)] failed (kafka.client.ClientUtils$)

java.nio.channels.ClosedChannelException

at kafka.network.BlockingChannel.send(BlockingChannel.scala:120)

at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75)

at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:74)

at kafka.producer.SyncProducer.send(SyncProducer.scala:115)

at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)

at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)

at kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:68)

at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:89)

at kafka.utils.Logging$class.swallowError(Logging.scala:106)

at kafka.utils.CoreUtils$.swallowError(CoreUtils.scala:51)

at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:68)

at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)

at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)

at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)

at scala.collection.immutable.Stream.foreach(Stream.scala:547)

at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)

at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)

[2016-02-28 16:32:17,951] ERROR fetching topic metadata for topics [Set(test)] from broker [ArrayBuffer(BrokerEndPoint(0,localhost,9092))] failed (kafka.utils.CoreUtils$)

kafka.common.KafkaException: fetching topic metadata for topics [Set(test)] from broker [ArrayBuffer(BrokerEndPoint(0,localhost,9092))] failed

at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:73)

at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)

at kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:68)

at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:89)

at kafka.utils.Logging$class.swallowError(Logging.scala:106)

at kafka.utils.CoreUtils$.swallowError(CoreUtils.scala:51)

at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:68)

at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)

at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)

at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)

at scala.collection.immutable.Stream.foreach(Stream.scala:547)

at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)

at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)

Caused by: java.nio.channels.ClosedChannelException

at kafka.network.BlockingChannel.send(BlockingChannel.scala:120)

at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75)

at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:74)

at kafka.producer.SyncProducer.send(SyncProducer.scala:115)

at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)

... 12 more

[2016-02-28 16:32:17,955] WARN Fetching topic metadata with correlation id 1 for topics [Set(test)] from broker [BrokerEndPoint(0,localhost,9092)] failed (kafka.client.ClientUtils$)

java.nio.channels.ClosedChannelException

at kafka.network.BlockingChannel.send(BlockingChannel.scala:120)

at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75)

at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:74)

at kafka.producer.SyncProducer.send(SyncProducer.scala:115)

at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)

at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)

at kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:49)

at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartitionListForTopic(DefaultEventHandler.scala:188)

at kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:152)

at kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:151)

at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)

at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)

at kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:151)

at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:96)

at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:73)

at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)

at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)

at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)

at scala.collection.immutable.Stream.foreach(Stream.scala:547)

at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)

at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)

[2016-02-28 16:32:17,961] ERROR Failed to collate messages by topic, partition due to: fetching topic metadata for topics [Set(test)] from broker [ArrayBuffer(BrokerEndPoint(0,localhost,9092))] failed (kafka.producer.async.DefaultEventHandler)

[2016-02-28 16:32:18,063] WARN Fetching topic metadata with correlation id 2 for topics [Set(test)] from broker [BrokerEndPoint(0,localhost,9092)] failed (kafka.client.ClientUtils$)

java.nio.channels.ClosedChannelException

at kafka.network.BlockingChannel.send(BlockingChannel.scala:120)

at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75)

at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:74)

at kafka.producer.SyncProducer.send(SyncProducer.scala:115)

at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)

at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)

at kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79)

at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:89)

at kafka.utils.Logging$class.swallowError(Logging.scala:106)

at kafka.utils.CoreUtils$.swallowError(CoreUtils.scala:51)

at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79)

at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)

at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)

at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)

at scala.collection.immutable.Stream.foreach(Stream.scala:547)

at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)

at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)

[2016-02-28 16:32:18,064] ERROR fetching topic metadata for topics [Set(test)] from broker [ArrayBuffer(BrokerEndPoint(0,localhost,9092))] failed (kafka.utils.CoreUtils$)

kafka.common.KafkaException: fetching topic metadata for topics [Set(test)] from broker [ArrayBuffer(BrokerEndPoint(0,localhost,9092))] failed

at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:73)

at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)

at kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79)

at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:89)

at kafka.utils.Logging$class.swallowError(Logging.scala:106)

at kafka.utils.CoreUtils$.swallowError(CoreUtils.scala:51)

at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79)

at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)

at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)

at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)

at scala.collection.immutable.Stream.foreach(Stream.scala:547)

at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)

at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)

Caused by: java.nio.channels.ClosedChannelException

at kafka.network.BlockingChannel.send(BlockingChannel.scala:120)

at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75)

at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:74)

at kafka.producer.SyncProducer.send(SyncProducer.scala:115)

at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)

... 12 more

------------

Any idea what I am doing wrong?

Thanks,

Rainer

1 ACCEPTED SOLUTION

Master Mentor
@Rainer Geissendoerfer

Try this:

./kafka-console-producer.sh --broker-list sandbox.hortonworks.com:6667 --topic test

./kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
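
The broker log in the question shows the listener bound to sandbox.hortonworks.com rather than localhost, so it can help to check which candidate addresses accept a TCP connection before starting the producer. The sketch below is only an illustration: the helper names port_open and check_brokers are made up here, it assumes bash (for its /dev/tcp redirection) and the coreutils timeout command, and an open port only shows that something is listening there, not that it is a Kafka broker.

```shell
#!/usr/bin/env bash
# port_open HOST PORT: succeed if a TCP connection opens within 3 seconds.
# Relies on bash's /dev/tcp redirection and the coreutils `timeout` command.
port_open() {
  timeout 3 bash -c 'exec 3<>"/dev/tcp/$1/$2"' _ "$1" "$2" 2>/dev/null
}

# check_brokers HOST:PORT...: report reachability of each candidate address.
check_brokers() {
  local addr
  for addr in "$@"; do
    # Split "host:port" at the last colon.
    if port_open "${addr%:*}" "${addr##*:}"; then
      echo "$addr reachable"
    else
      echo "$addr NOT reachable"
    fi
  done
}
```

Example call with the addresses mentioned in this thread: `check_brokers localhost:9092 sandbox.hortonworks.com:9092 sandbox.hortonworks.com:6667`. Whichever address is reported as reachable is the one to try with --broker-list.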

Master Mentor

@Rainer Geissendoerfer

If that does not work, try port 9092 with the same hostname:

./kafka-console-producer.sh --broker-list sandbox.hortonworks.com:9092 --topic test
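
The startup log in the question already names the address the broker accepts connections on (the "Awaiting socket connections on ..." line), and that host:port is normally the value to pass to --broker-list. Here is a minimal sketch for pulling it out of the log, assuming the log text is piped in on stdin. listener_from_log is a hypothetical helper name, and server.log in the usage line is a placeholder for wherever the broker output is saved.

```shell
# listener_from_log: read broker startup output on stdin and print the last
# "host:port" the broker reported it is awaiting socket connections on.
listener_from_log() {
  sed -n 's/.*Awaiting socket connections on \([^ ]*\)\. .*/\1/p' | tail -n 1
}

# Usage (server.log is a placeholder path):
#   listener_from_log < server.log
```

Run against the log posted above, this should print the sandbox.hortonworks.com:9092 listener, which matches the address used in the command here.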

Expert Contributor

Works, thanks ... thought I tried it already ... 🙂

Master Mentor

@Rainer Geissendoerfer Please accept the answer to close the thread