Archives of Support Questions (Read Only)

This is an archived board for historical reference. Information and links may no longer be available or relevant
Announcements
This board is archived and read-only for historical reference. To ask a new question, please post a new topic on the appropriate active board.

Kafka: Cannot Produce Messages

avatar
Expert Contributor

Using Hortonworks Sandbox 2.3.2

Running Listener Port on 9092 (see last statement in below log file of kafka-server-start.sh

----

[2016-02-28 16:20:33,691] INFO Client environment:java.class.path=:/usr/lib/ambari-metrics-kafka-sink/ambari-metrics-kafka-sink.jar:/usr/lib/ambari-metrics-kafka-sink/lib/commons-codec-1.2.jar:/usr/lib/ambari-metrics-kafka-sink/lib/jackson-xc-1.9.13.jar:/usr/lib/ambari-metrics-kafka-sink/lib/commons-httpclient-3.1.jar:/usr/lib/ambari-metrics-kafka-sink/lib/jackson-mapper-asl-1.9.13.jar:/usr/lib/ambari-metrics-kafka-sink/lib/commons-lang-2.6.jar:/usr/lib/ambari-metrics-kafka-sink/lib/jackson-core-asl-1.9.13.jar:/usr/lib/ambari-metrics-kafka-sink/lib/commons-logging-1.1.1.jar:/usr/hdp/2.3.2.0-2950/kafka/bin/../core/build/dependant-libs-2.10.5*/*.jar:/usr/hdp/2.3.2.0-2950/kafka/bin/../examples/build/libs//kafka-examples*.jar:/usr/hdp/2.3.2.0-2950/kafka/bin/../contrib/hadoop-consumer/build/libs//kafka-hadoop-consumer*.jar:/usr/hdp/2.3.2.0-2950/kafka/bin/../contrib/hadoop-producer/build/libs//kafka-hadoop-producer*.jar:/usr/hdp/2.3.2.0-2950/kafka/bin/../clients/build/libs/kafka-clients*.jar:/usr/hdp/2.3.2.0-2950/kafka/bin/../libs/bcprov-jdk16-1.46.jar:/usr/hdp/2.3.2.0-2950/kafka/bin/../libs/commons-collections-3.2.1.jar:/usr/hdp/2.3.2.0-2950/kafka/bin/../libs/commons-configuration-1.10.jar:/usr/hdp/2.3.2.0-2950/kafka/bin/../libs/commons-io-2.4.jar:/usr/hdp/2.3.2.0-2950/kafka/bin/../libs/commons-lang-2.6.jar:/usr/hdp/2.3.2.0-2950/kafka/bin/../libs/commons-logging-1.2.jar:/usr/hdp/2.3.2.0-2950/kafka/bin/../libs/eclipselink-2.5.2-M1.jar:/usr/hdp/2.3.2.0-2950/kafka/bin/../libs/gson-2.2.4.jar:/usr/hdp/2.3.2.0-2950/kafka/bin/../libs/guava-11.0.2.jar:/usr/hdp/2.3.2.0-2950/kafka/bin/../libs/hadoop-auth-2.7.1.2.3.2.0-2950.jar:/usr/hdp/2.3.2.0-2950/kafka/bin/../libs/hadoop-common-2.7.1.2.3.2.0-2950.jar:/usr/hdp/2.3.2.0-2950/kafka/bin/../libs/httpclient-4.2.5.jar:/usr/hdp/2.3.2.0-2950/kafka/bin/../libs/httpcore-4.2.5.jar:/usr/hdp/2.3.2.0-2950/kafka/bin/../libs/httpmime-4.2.5.jar:/usr/hdp/2.3.2.0-2950/kafka/bin/../libs/jackson-core-asl-1.9.13.jar:/usr/hdp/2.3.2.0-2950/kafka/bin/../libs/jackson-jaxrs-1.8.1.jar:/usr/hdp/2.3.2.0-2950/kafka/bin/../libs/jackson-jaxrs-1.8.3.jar:/usr/hdp/2.3.2.0-2950/kafka/bin/../libs/jackson-mapper-asl-1.9.13.jar:/usr/hdp/2.3.2.0-2950/kafka/bin/../libs/jackson-xc-1.8.3.jar:/usr/hdp/2.3.2.0-2950/kafka/bin/../libs/javax.persistence-2.1.0.jar:/usr/hdp/2.3.2.0-2950/kafka/bin/../libs/jersey-bundle-1.17.1.jar:/usr/hdp/2.3.2.0-2950/kafka/bin/../libs/jopt-simple-3.2.jar:/usr/hdp/2.3.2.0-2950/kafka/bin/../libs/kafka_2.10-0.8.2.2.3.2.0-2950.jar:/usr/hdp/2.3.2.0-2950/kafka/bin/../libs/kafka_2.10-0.8.2.2.3.2.0-2950-javadoc.jar:/usr/hdp/2.3.2.0-2950/kafka/bin/../libs/kafka_2.10-0.8.2.2.3.2.0-2950-scaladoc.jar:/usr/hdp/2.3.2.0-2950/kafka/bin/../libs/kafka_2.10-0.8.2.2.3.2.0-2950-sources.jar:/usr/hdp/2.3.2.0-2950/kafka/bin/../libs/kafka_2.10-0.8.2.2.3.2.0-2950-test.jar:/usr/hdp/2.3.2.0-2950/kafka/bin/../libs/kafka-clients-0.8.2.2.3.2.0-2950.jar:/usr/hdp/2.3.2.0-2950/kafka/bin/../libs/kafka-ganglia-0.8.2.2.3.2.0-2950.jar:/usr/hdp/2.3.2.0-2950/kafka/bin/../libs/kafka-ganglia-0.8.2.2.3.2.0-2950-javadoc.jar:/usr/hdp/2.3.2.0-2950/kafka/bin/../libs/log4j-1.2.16.jar:/usr/hdp/2.3.2.0-2950/kafka/bin/../libs/lz4-1.2.0.jar:/usr/hdp/2.3.2.0-2950/kafka/bin/../libs/metrics-core-2.2.0.jar:/usr/hdp/2.3.2.0-2950/kafka/bin/../libs/metrics-ganglia-2.2.0.jar:/usr/hdp/2.3.2.0-2950/kafka/bin/../libs/noggit-0.6.jar:/usr/hdp/2.3.2.0-2950/kafka/bin/../libs/ojdbc6.jar:/usr/hdp/2.3.2.0-2950/kafka/bin/../libs/ranger-kafka-plugin-0.5.0.2.3.2.0-2950.jar:/usr/hdp/2.3.2.0-2950/kafka/bin/../libs/ranger-plugins-audit-0.5.0.2.3.2.0-2950.jar:/usr/hdp/2.3.2.0-2950/kafka/bin/../libs/ranger-plugins-common-0.5.0.2.3.2.0-2950.jar:/usr/hdp/2.3.2.0-2950/kafka/bin/../libs/ranger-plugins-cred-0.5.0.2.3.2.0-2950.jar:/usr/hdp/2.3.2.0-2950/kafka/bin/../libs/ranger_solrj-0.5.0.2.3.2.0-2950.jar:/usr/hdp/2.3.2.0-2950/kafka/bin/../libs/scala-library-2.10.4.jar:/usr/hdp/2.3.2.0-2950/kafka/bin/../libs/slf4j-api-1.7.6.jar:/usr/hdp/2.3.2.0-2950/kafka/bin/../libs/slf4j-log4j12-1.6.1.jar:/usr/hdp/2.3.2.0-2950/kafka/bin/../libs/snappy-java-1.1.1.6.jar:/usr/hdp/2.3.2.0-2950/kafka/bin/../libs/zkclient-0.5-1.jar:/usr/hdp/2.3.2.0-2950/kafka/bin/../libs/zookeeper-3.4.6.2.3.2.0-2950.jar:/usr/hdp/2.3.2.0-2950/kafka/bin/../libs/zookeeper.jar:/usr/hdp/2.3.2.0-2950/kafka/bin/../core/build/libs/kafka_2.10*.jar (org.apache.zookeeper.ZooKeeper)

[2016-02-28 16:20:33,691] INFO Client environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib (org.apache.zookeeper.ZooKeeper)

[2016-02-28 16:20:33,691] INFO Client environment:java.io.tmpdir=/tmp (org.apache.zookeeper.ZooKeeper)

[2016-02-28 16:20:33,691] INFO Client environment:java.compiler=<NA> (org.apache.zookeeper.ZooKeeper)

[2016-02-28 16:20:33,691] INFO Client environment:os.name=Linux (org.apache.zookeeper.ZooKeeper)

[2016-02-28 16:20:33,691] INFO Client environment:os.arch=amd64 (org.apache.zookeeper.ZooKeeper)

[2016-02-28 16:20:33,691] INFO Client environment:os.version=2.6.32-573.7.1.el6.x86_64 (org.apache.zookeeper.ZooKeeper)

[2016-02-28 16:20:33,691] INFO Client environment:user.name=root (org.apache.zookeeper.ZooKeeper)

[2016-02-28 16:20:33,691] INFO Client environment:user.home=/root (org.apache.zookeeper.ZooKeeper)

[2016-02-28 16:20:33,691] INFO Client environment:user.dir=/usr/hdp/2.3.2.0-2950/kafka (org.apache.zookeeper.ZooKeeper)

[2016-02-28 16:20:33,692] INFO Initiating client connection, connectString=sandbox.hortonworks.com:2181 sessionTimeout=30000 watcher=org.I0Itec.zkclient.ZkClient@44a8253a (org.apache.zookeeper.ZooKeeper)

[2016-02-28 16:20:33,707] INFO Opening socket connection to server sandbox.hortonworks.com/192.168.0.109:2181. Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)

[2016-02-28 16:20:33,713] INFO Socket connection established to sandbox.hortonworks.com/192.168.0.109:2181, initiating session (org.apache.zookeeper.ClientCnxn)

[2016-02-28 16:20:33,719] INFO Session establishment complete on server sandbox.hortonworks.com/192.168.0.109:2181, sessionid = 0x153280fd58f002c, negotiated timeout = 30000 (org.apache.zookeeper.ClientCnxn)

[2016-02-28 16:20:33,720] INFO zookeeper state changed (SyncConnected) (org.I0Itec.zkclient.ZkClient)

[2016-02-28 16:20:33,942] INFO Loading logs. (kafka.log.LogManager)

[2016-02-28 16:20:33,977] INFO Completed load of log test-0 with log end offset 0 (kafka.log.Log)

[2016-02-28 16:20:33,986] INFO Completed load of log test1-0 with log end offset 0 (kafka.log.Log)

[2016-02-28 16:20:33,989] INFO Logs loading complete. (kafka.log.LogManager)

[2016-02-28 16:20:33,989] INFO Starting log cleanup with a period of 300000 ms. (kafka.log.LogManager)

[2016-02-28 16:20:33,991] INFO Starting log flusher with a default period of 9223372036854775807 ms. (kafka.log.LogManager)

[2016-02-28 16:20:34,019] INFO Awaiting socket connections on sandbox.hortonworks.com:9092. (kafka.network.Acceptor)----

Created Topic test successfully

Whenever I try to push messages to the Kafka topic through e.g. console-producer

"kafka-console-producer.sh --broker-list localhost:9092 --topic test"

I get the following error messages:

-------

[root@sandbox kafka]# bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

t

[2016-02-28 16:32:17,945] WARN Fetching topic metadata with correlation id 0 for topics [Set(test)] from broker [BrokerEndPoint(0,localhost,9092)] failed (kafka.client.ClientUtils$)

java.nio.channels.ClosedChannelException

at kafka.network.BlockingChannel.send(BlockingChannel.scala:120)

at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75)

at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:74)

at kafka.producer.SyncProducer.send(SyncProducer.scala:115)

at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)

at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)

at kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:68)

at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:89)

at kafka.utils.Logging$class.swallowError(Logging.scala:106)

at kafka.utils.CoreUtils$.swallowError(CoreUtils.scala:51)

at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:68)

at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)

at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)

at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)

at scala.collection.immutable.Stream.foreach(Stream.scala:547)

at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)

at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)

[2016-02-28 16:32:17,951] ERROR fetching topic metadata for topics [Set(test)] from broker [ArrayBuffer(BrokerEndPoint(0,localhost,9092))] failed (kafka.utils.CoreUtils$)

kafka.common.KafkaException: fetching topic metadata for topics [Set(test)] from broker [ArrayBuffer(BrokerEndPoint(0,localhost,9092))] failed

at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:73)

at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)

at kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:68)

at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:89)

at kafka.utils.Logging$class.swallowError(Logging.scala:106)

at kafka.utils.CoreUtils$.swallowError(CoreUtils.scala:51)

at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:68)

at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)

at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)

at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)

at scala.collection.immutable.Stream.foreach(Stream.scala:547)

at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)

at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)

Caused by: java.nio.channels.ClosedChannelException

at kafka.network.BlockingChannel.send(BlockingChannel.scala:120)

at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75)

at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:74)

at kafka.producer.SyncProducer.send(SyncProducer.scala:115)

at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)

... 12 more

[2016-02-28 16:32:17,955] WARN Fetching topic metadata with correlation id 1 for topics [Set(test)] from broker [BrokerEndPoint(0,localhost,9092)] failed (kafka.client.ClientUtils$)

java.nio.channels.ClosedChannelException

at kafka.network.BlockingChannel.send(BlockingChannel.scala:120)

at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75)

at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:74)

at kafka.producer.SyncProducer.send(SyncProducer.scala:115)

at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)

at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)

at kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:49)

at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartitionListForTopic(DefaultEventHandler.scala:188)

at kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:152)

at kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:151)

at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)

at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)

at kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:151)

at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:96)

at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:73)

at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)

at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)

at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)

at scala.collection.immutable.Stream.foreach(Stream.scala:547)

at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)

at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)

[2016-02-28 16:32:17,961] ERROR Failed to collate messages by topic, partition due to: fetching topic metadata for topics [Set(test)] from broker [ArrayBuffer(BrokerEndPoint(0,localhost,9092))] failed (kafka.producer.async.DefaultEventHandler)

[2016-02-28 16:32:18,063] WARN Fetching topic metadata with correlation id 2 for topics [Set(test)] from broker [BrokerEndPoint(0,localhost,9092)] failed (kafka.client.ClientUtils$)

java.nio.channels.ClosedChannelException

at kafka.network.BlockingChannel.send(BlockingChannel.scala:120)

at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75)

at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:74)

at kafka.producer.SyncProducer.send(SyncProducer.scala:115)

at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)

at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)

at kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79)

at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:89)

at kafka.utils.Logging$class.swallowError(Logging.scala:106)

at kafka.utils.CoreUtils$.swallowError(CoreUtils.scala:51)

at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79)

at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)

at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)

at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)

at scala.collection.immutable.Stream.foreach(Stream.scala:547)

at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)

at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)

[2016-02-28 16:32:18,064] ERROR fetching topic metadata for topics [Set(test)] from broker [ArrayBuffer(BrokerEndPoint(0,localhost,9092))] failed (kafka.utils.CoreUtils$)

kafka.common.KafkaException: fetching topic metadata for topics [Set(test)] from broker [ArrayBuffer(BrokerEndPoint(0,localhost,9092))] failed

at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:73)

at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)

at kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79)

at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:89)

at kafka.utils.Logging$class.swallowError(Logging.scala:106)

at kafka.utils.CoreUtils$.swallowError(CoreUtils.scala:51)

at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79)

at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)

at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)

at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)

at scala.collection.immutable.Stream.foreach(Stream.scala:547)

at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)

at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)

Caused by: java.nio.channels.ClosedChannelException

at kafka.network.BlockingChannel.send(BlockingChannel.scala:120)

at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75)

at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:74)

at kafka.producer.SyncProducer.send(SyncProducer.scala:115)

at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)

... 12 more

------------

Any Idea on what I am doing wrong?

Thanks,

Rainer

1 ACCEPTED SOLUTION

avatar
Master Mentor
@Rainer Geissendoerfer

Try this

./kafka-console-producer.sh --broker-list sandbox.hortonworks.com:6667 --topic test

./kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

View solution in original post

4 REPLIES 4

avatar
Master Mentor
@Rainer Geissendoerfer

Try this

./kafka-console-producer.sh --broker-list sandbox.hortonworks.com:6667 --topic test

./kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

avatar
Master Mentor

@Rainer Geissendoerfer

If not then

./kafka-console-producer.sh --broker-list sandbox.hortonworks.com:9092 --topic test

avatar
Expert Contributor

Works, thanks ... thought I tried it already ... 🙂

avatar
Master Mentor

@Rainer Geissendoerfer Please accept the answer to close the thread