Archives of Support Questions (Read Only)

This is an archived board for historical reference. Information and links may no longer be available or relevant
Announcements
This board is archived and read-only for historical reference. To ask a new question, please post a new topic on the appropriate active board.

Unable to produce message in kafka ?

avatar
Explorer

Running

At producer:

./kafka-console-producer.sh --broker-list localhost:9092 --topic girishtp

or

./kafka-console-producer.sh --broker-list localhost:6667 --topic girishtp

At consumer:

./kafka-console-consumer.sh --zookeeper localhost:2181 --topic girishtp --from-beginning --consumer.config /usr/hdp/2.4.3.0-227/kafka/config/consumer.properties --delete-consumer-offsets

Note:

Modified [ listeners : PLAINTEXT://xxx.domain:6667 ]

localhost -> xxx.domain

capture.png

[2018-03-21 13:51:52,062] WARN Fetching topic metadata with correlation id 8 for topics [Set(girishtp)] from broker [BrokerEndPoint(0,localhost,6667)] failed (kafka.client.ClientUtils$) java.nio.channels.ClosedChannelException at kafka.network.BlockingChannel.send(BlockingChannel.scala:122) at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:77) at kafka.producer.SyncProducer.kafka$producer$SyncProducer$doSend(SyncProducer.scala:76) at kafka.producer.SyncProducer.send(SyncProducer.scala:121) at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59) at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) at kafka.producer.async.DefaultEventHandler$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79) at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:79) at kafka.utils.Logging$class.swallowError(Logging.scala:106) at kafka.utils.CoreUtils$.swallowError(CoreUtils.scala:51) at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79) at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105) at kafka.producer.async.ProducerSendThread$anonfun$processEvents$3.apply(ProducerSendThread.scala:88) at kafka.producer.async.ProducerSendThread$anonfun$processEvents$3.apply(ProducerSendThread.scala:68) at scala.collection.immutable.Stream.foreach(Stream.scala:547) at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67) at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45) [2018-03-21 13:51:52,063] ERROR fetching topic metadata for topics [Set(girishtp)] from broker [ArrayBuffer(BrokerEndPoint(0,localhost,6667))] failed (kafka.utils.CoreUtils$) kafka.common.KafkaException: fetching topic metadata for topics [Set(girishtp)] from broker [ArrayBuffer(BrokerEndPoint(0,localhost,6667))] failed at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:73) at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) at kafka.producer.async.DefaultEventHandler$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79) at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:79) at kafka.utils.Logging$class.swallowError(Logging.scala:106) at kafka.utils.CoreUtils$.swallowError(CoreUtils.scala:51) at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79) at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105) at kafka.producer.async.ProducerSendThread$anonfun$processEvents$3.apply(ProducerSendThread.scala:88) at kafka.producer.async.ProducerSendThread$anonfun$processEvents$3.apply(ProducerSendThread.scala:68) at scala.collection.immutable.Stream.foreach(Stream.scala:547) at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67) at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45) Caused by: java.nio.channels.ClosedChannelException at kafka.network.BlockingChannel.send(BlockingChannel.scala:122) at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:77) at kafka.producer.SyncProducer.kafka$producer$SyncProducer$doSend(SyncProducer.scala:76) at kafka.producer.SyncProducer.send(SyncProducer.scala:121) at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59) ... 12 more [2018-03-21 13:51:52,065] ERROR Failed to send requests for topics girishtp with correlation ids in [0,8] (kafka.producer.async.DefaultEventHandler) [2018-03-21 13:51:52,065] ERROR Error in handling batch of 1 events (kafka.producer.async.ProducerSendThread) kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries. at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:91) at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105) at kafka.producer.async.ProducerSendThread$anonfun$processEvents$3.apply(ProducerSendThread.scala:88) at kafka.producer.async.ProducerSendThread$anonfun$processEvents$3.apply(ProducerSendThread.scala:68) at scala.collection.immutable.Stream.foreach(Stream.scala:547) at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67) at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)

1 ACCEPTED SOLUTION

avatar
@Girish Mallula

Instead of localhost, please use the hostname provided in listener.

./kafka-console-producer.sh --broker-list xxx.domain:6667 --topic girishtp

View solution in original post

3 REPLIES 3

avatar
@Girish Mallula

Instead of localhost, please use the hostname provided in listener.

./kafka-console-producer.sh --broker-list xxx.domain:6667 --topic girishtp

avatar
Explorer

Thanks @Sandeep Nemuri It worked!!

avatar
Master Mentor

@Girish Mallula

Below is the sequences to test Kafka please try to open a console for each command I am assuming your localhost value in bold [ listeners : PLAINTEXT://xxx.domain:6667 ] should be the output of FQDN

$ hostname -f or the host's IP

Step 1: Start the zookeeper server

Start a ZooKeeper server that's packaged with Kafka

bin/zookeeper-server-start.sh config/zookeeper.properties 

Step 2: Start the kafka broker server

bin/kafka-server-start.sh config/server.properties 

Step 3: Create a topic

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic girishtp List the topics bin/kafka-topics.sh --list --zookeeper localhost:2181 

output

__consumer_offsets 
girishtp 

Step 4: Send some messages

By default, each line will be sent as a separate message. Run the producer and then type a few messages into the console to send to the server.

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic girishtp 

{type some random message here }

Step 5: Start a consumer

Kafka consumer will dump out messages to standard output(console).

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic girishtp--from-beginning 

{you will see the messages typed in step 4 }

Step 6 Delete a topic

# bin/kafka-run-class.sh kafka.admin.TopicCommand --zookeeper localhost:2181 --delete --topic girishtp 

Output

Topic girishtp is marked for deletion.