Created 03-22-2018 04:19 AM
Running
At producer:
./kafka-console-producer.sh --broker-list localhost:9092 --topic girishtp
or
./kafka-console-producer.sh --broker-list localhost:6667 --topic girishtp
At consumer:
./kafka-console-consumer.sh --zookeeper localhost:2181 --topic girishtp --from-beginning --consumer.config /usr/hdp/2.4.3.0-227/kafka/config/consumer.properties --delete-consumer-offsets
Note:
Modified [ listeners : PLAINTEXT://xxx.domain:6667 ]
localhost -> xxx.domain
[2018-03-21 13:51:52,062] WARN Fetching topic metadata with correlation id 8 for topics [Set(girishtp)] from broker [BrokerEndPoint(0,localhost,6667)] failed (kafka.client.ClientUtils$) java.nio.channels.ClosedChannelException at kafka.network.BlockingChannel.send(BlockingChannel.scala:122) at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:77) at kafka.producer.SyncProducer.kafka$producer$SyncProducer$doSend(SyncProducer.scala:76) at kafka.producer.SyncProducer.send(SyncProducer.scala:121) at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59) at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) at kafka.producer.async.DefaultEventHandler$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79) at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:79) at kafka.utils.Logging$class.swallowError(Logging.scala:106) at kafka.utils.CoreUtils$.swallowError(CoreUtils.scala:51) at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79) at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105) at kafka.producer.async.ProducerSendThread$anonfun$processEvents$3.apply(ProducerSendThread.scala:88) at kafka.producer.async.ProducerSendThread$anonfun$processEvents$3.apply(ProducerSendThread.scala:68) at scala.collection.immutable.Stream.foreach(Stream.scala:547) at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67) at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45) [2018-03-21 13:51:52,063] ERROR fetching topic metadata for topics [Set(girishtp)] from broker [ArrayBuffer(BrokerEndPoint(0,localhost,6667))] failed (kafka.utils.CoreUtils$) kafka.common.KafkaException: fetching topic metadata for topics [Set(girishtp)] from broker [ArrayBuffer(BrokerEndPoint(0,localhost,6667))] failed at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:73) at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) at kafka.producer.async.DefaultEventHandler$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79) at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:79) at kafka.utils.Logging$class.swallowError(Logging.scala:106) at kafka.utils.CoreUtils$.swallowError(CoreUtils.scala:51) at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79) at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105) at kafka.producer.async.ProducerSendThread$anonfun$processEvents$3.apply(ProducerSendThread.scala:88) at kafka.producer.async.ProducerSendThread$anonfun$processEvents$3.apply(ProducerSendThread.scala:68) at scala.collection.immutable.Stream.foreach(Stream.scala:547) at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67) at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45) Caused by: java.nio.channels.ClosedChannelException at kafka.network.BlockingChannel.send(BlockingChannel.scala:122) at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:77) at kafka.producer.SyncProducer.kafka$producer$SyncProducer$doSend(SyncProducer.scala:76) at kafka.producer.SyncProducer.send(SyncProducer.scala:121) at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59) ... 12 more [2018-03-21 13:51:52,065] ERROR Failed to send requests for topics girishtp with correlation ids in [0,8] (kafka.producer.async.DefaultEventHandler) [2018-03-21 13:51:52,065] ERROR Error in handling batch of 1 events (kafka.producer.async.ProducerSendThread) kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries. at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:91) at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105) at kafka.producer.async.ProducerSendThread$anonfun$processEvents$3.apply(ProducerSendThread.scala:88) at kafka.producer.async.ProducerSendThread$anonfun$processEvents$3.apply(ProducerSendThread.scala:68) at scala.collection.immutable.Stream.foreach(Stream.scala:547) at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67) at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
Created 03-22-2018 07:48 AM
Instead of localhost, please use the hostname provided in listener.
./kafka-console-producer.sh --broker-list xxx.domain:6667 --topic girishtp
Created 03-22-2018 07:48 AM
Instead of localhost, please use the hostname provided in listener.
./kafka-console-producer.sh --broker-list xxx.domain:6667 --topic girishtp
Created 03-23-2018 06:18 AM
Thanks @Sandeep Nemuri It worked!!
Created 03-22-2018 09:17 AM
Below is the sequences to test Kafka please try to open a console for each command I am assuming your localhost value in bold [ listeners : PLAINTEXT://xxx.domain:6667 ] should be the output of FQDN
$ hostname -f or the host's IP
Step 1: Start the zookeeper server
Start a ZooKeeper server that's packaged with Kafka
bin/zookeeper-server-start.sh config/zookeeper.properties
Step 2: Start the kafka broker server
bin/kafka-server-start.sh config/server.properties
Step 3: Create a topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic girishtp List the topics bin/kafka-topics.sh --list --zookeeper localhost:2181
output
__consumer_offsets girishtp
Step 4: Send some messages
By default, each line will be sent as a separate message. Run the producer and then type a few messages into the console to send to the server.
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic girishtp
{type some random message here }
Step 5: Start a consumer
Kafka consumer will dump out messages to standard output(console).
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic girishtp--from-beginning
{you will see the messages typed in step 4 }
Step 6 Delete a topic
# bin/kafka-run-class.sh kafka.admin.TopicCommand --zookeeper localhost:2181 --delete --topic girishtp
Output
Topic girishtp is marked for deletion.