Unable to produce messages in Kafka?

Explorer

I am running the following commands.

At producer:

./kafka-console-producer.sh --broker-list localhost:9092 --topic girishtp

or

./kafka-console-producer.sh --broker-list localhost:6667 --topic girishtp

At consumer:

./kafka-console-consumer.sh --zookeeper localhost:2181 --topic girishtp --from-beginning --consumer.config /usr/hdp/2.4.3.0-227/kafka/config/consumer.properties --delete-consumer-offsets

Note: I modified the listeners setting, changing the host from localhost to xxx.domain:

[ listeners : PLAINTEXT://xxx.domain:6667 ]

[2018-03-21 13:51:52,062] WARN Fetching topic metadata with correlation id 8 for topics [Set(girishtp)] from broker [BrokerEndPoint(0,localhost,6667)] failed (kafka.client.ClientUtils$)
java.nio.channels.ClosedChannelException
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:122)
    at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:77)
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$doSend(SyncProducer.scala:76)
    at kafka.producer.SyncProducer.send(SyncProducer.scala:121)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)
    at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
    at kafka.producer.async.DefaultEventHandler$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79)
    at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:79)
    at kafka.utils.Logging$class.swallowError(Logging.scala:106)
    at kafka.utils.CoreUtils$.swallowError(CoreUtils.scala:51)
    at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79)
    at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
    at kafka.producer.async.ProducerSendThread$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
    at kafka.producer.async.ProducerSendThread$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
    at scala.collection.immutable.Stream.foreach(Stream.scala:547)
    at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
    at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
[2018-03-21 13:51:52,063] ERROR fetching topic metadata for topics [Set(girishtp)] from broker [ArrayBuffer(BrokerEndPoint(0,localhost,6667))] failed (kafka.utils.CoreUtils$)
kafka.common.KafkaException: fetching topic metadata for topics [Set(girishtp)] from broker [ArrayBuffer(BrokerEndPoint(0,localhost,6667))] failed
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:73)
    at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
    at kafka.producer.async.DefaultEventHandler$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79)
    at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:79)
    at kafka.utils.Logging$class.swallowError(Logging.scala:106)
    at kafka.utils.CoreUtils$.swallowError(CoreUtils.scala:51)
    at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79)
    at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
    at kafka.producer.async.ProducerSendThread$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
    at kafka.producer.async.ProducerSendThread$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
    at scala.collection.immutable.Stream.foreach(Stream.scala:547)
    at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
    at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
Caused by: java.nio.channels.ClosedChannelException
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:122)
    at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:77)
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$doSend(SyncProducer.scala:76)
    at kafka.producer.SyncProducer.send(SyncProducer.scala:121)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)
    ... 12 more
[2018-03-21 13:51:52,065] ERROR Failed to send requests for topics girishtp with correlation ids in [0,8] (kafka.producer.async.DefaultEventHandler)
[2018-03-21 13:51:52,065] ERROR Error in handling batch of 1 events (kafka.producer.async.ProducerSendThread)
kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
    at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:91)
    at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
    at kafka.producer.async.ProducerSendThread$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
    at kafka.producer.async.ProducerSendThread$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
    at scala.collection.immutable.Stream.foreach(Stream.scala:547)
    at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
    at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)

1 ACCEPTED SOLUTION

@Girish Mallula

Instead of localhost, please use the hostname configured in the listeners setting.

./kafka-console-producer.sh --broker-list xxx.domain:6667 --topic girishtp
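
To avoid hard-coding the host, the broker address could be read from the broker's own configuration. This is only a sketch: the function name broker_from_listeners is made up for illustration, and the server.properties path in the usage comment is an assumption based on the consumer.properties path mentioned in the question. It prints the host:port of the first entry on the listeners line.

```shell
#!/bin/sh
# Print host:port of the first listener defined in a Kafka server.properties file.
# Example input line: listeners=PLAINTEXT://xxx.domain:6667
# broker_from_listeners is a hypothetical helper, not part of Kafka.
broker_from_listeners() {
  props_file=$1
  # Keep only uncommented "listeners=" lines (not advertised.listeners),
  # take the last one, drop the key, keep the first listener of a
  # comma-separated list, and strip the protocol prefix.
  grep -E '^[[:space:]]*listeners[[:space:]]*=' "$props_file" \
    | tail -n 1 \
    | sed -e 's/^[^=]*=[[:space:]]*//' \
          -e 's/,.*$//' \
          -e 's#^[A-Za-z0-9_]*://##'
}

# Usage (path is an assumption, adjust to your installation):
#   BROKER=$(broker_from_listeners /usr/hdp/2.4.3.0-227/kafka/config/server.properties)
#   ./kafka-console-producer.sh --broker-list "$BROKER" --topic girishtp
```

With the listeners value from the question, this would yield xxx.domain:6667, the same address used in the command above.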

Explorer

Thanks, @Sandeep Nemuri. It worked!

Master Mentor

@Girish Mallula

Below is the sequence of steps to test Kafka. Please open a separate console for each command. I am assuming that the value replacing localhost in [ listeners : PLAINTEXT://xxx.domain:6667 ] is the host's FQDN, that is, the output of the following command, or the host's IP:

$ hostname -f
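
Before starting the producer, it may help to check which address actually accepts connections on the broker port. The sketch below assumes bash (for its /dev/tcp pseudo-device) and the timeout command from GNU coreutils; can_connect is a hypothetical helper name. A failure for localhost and a success for the FQDN would be consistent with the ClosedChannelException in the question, though it does not prove that is the cause.

```shell
#!/usr/bin/env bash
# Return 0 if a TCP connection to host:port can be opened, non-zero otherwise.
# can_connect is an illustrative helper, not a Kafka tool.
can_connect() {
  host=$1
  port=$2
  # Inside the inner bash, $0 is the host and $1 is the port.
  # timeout bounds the wait so an unreachable host does not hang the check.
  timeout 2 bash -c 'exec 3<>"/dev/tcp/$0/$1"' "$host" "$port" 2>/dev/null
}

# Usage (6667 is the listener port from the question):
#   can_connect localhost 6667        && echo "localhost:6667 reachable"
#   can_connect "$(hostname -f)" 6667 && echo "FQDN:6667 reachable"
```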

Step 1: Start the ZooKeeper server

Start a ZooKeeper server that's packaged with Kafka

bin/zookeeper-server-start.sh config/zookeeper.properties 

Step 2: Start the Kafka broker server

bin/kafka-server-start.sh config/server.properties 

Step 3: Create a topic

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic girishtp

List the topics

bin/kafka-topics.sh --list --zookeeper localhost:2181

Output

__consumer_offsets 
girishtp 

Step 4: Send some messages

By default, each line will be sent as a separate message. Run the producer and then type a few messages into the console to send to the server.

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic girishtp 

{type some random message here }

Step 5: Start a consumer

The Kafka console consumer dumps messages to standard output (the console).

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic girishtp --from-beginning

{you will see the messages typed in step 4 }

Step 6: Delete a topic

bin/kafka-run-class.sh kafka.admin.TopicCommand --zookeeper localhost:2181 --delete --topic girishtp

Output

Topic girishtp is marked for deletion.
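
Steps 4 and 5 can be wrapped so that the producer and the consumer always use the same broker address, which avoids the localhost mismatch discussed in this thread. This is a rough sketch, not a tested procedure for any particular Kafka version: BROKER, TOPIC, KAFKA_BIN and DRY_RUN are illustrative shell variables, and run, produce_one and consume_all are hypothetical helper names. With DRY_RUN=1 the commands are only printed.

```shell
#!/bin/sh
# Sketch: send one message and read it back using one BROKER value for both tools.
# All variable and function names here are illustrative, not Kafka settings.
BROKER=${BROKER:-xxx.domain:6667}   # host:port from the listeners setting
TOPIC=${TOPIC:-girishtp}
KAFKA_BIN=${KAFKA_BIN:-bin}         # directory containing the Kafka scripts

# Print the command when DRY_RUN=1, otherwise execute it.
run() {
  if [ "${DRY_RUN:-0}" = "1" ]; then
    echo "$*"
  else
    "$@"
  fi
}

# The console producer reads messages from standard input, one per line.
produce_one() {
  echo "$1" | run "$KAFKA_BIN/kafka-console-producer.sh" \
    --broker-list "$BROKER" --topic "$TOPIC"
}

# The console consumer keeps running until interrupted (Ctrl+C).
consume_all() {
  run "$KAFKA_BIN/kafka-console-consumer.sh" \
    --bootstrap-server "$BROKER" --topic "$TOPIC" --from-beginning
}

# Usage:
#   DRY_RUN=1 produce_one "hello"   # prints the producer command only
#   produce_one "hello" && consume_all
```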