Created 06-17-2016 03:40 PM
I am using the below commands to start the producer and consumers. They work fine when done from the server running kafka broker (am1dlccmrhde01) but when I start the producer from another linux server, it gives the mentioned error.
Commands used:
./kafka-console-producer.sh --broker-list am1dlccmrhde01:6667 --topic shashangTest
./kafka-console-consumer.sh --zookeeper am1dlccmrhde01:2181 --topic shashangTest -from-beginning
Error produced when starting the producer from a remote server (tlccmrhds01):
[kafka@tlccmrhds01 bin]$ ./kafka-console-producer.sh --broker-list am1dlccmrhde01:6667 --topic shashangTest
Hello
[2016-06-17 09:35:06,781] WARN Fetching topic metadata with correlation id 0 for topics [Set(shashangTest)] from broker [BrokerEndPoint(0,am1dlccmrhde01,6667)] failed (kafka.client.ClientUtils$) java.nio.channels.ClosedChannelException at kafka.network.BlockingChannel.send(BlockingChannel.scala:120) at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75) at kafka.producer.SyncProducer.kafka$producer$SyncProducer$doSend(SyncProducer.scala:74) at kafka.producer.SyncProducer.send(SyncProducer.scala:115) at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59) at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) at kafka.producer.async.DefaultEventHandler$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:68) at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:89) at kafka.utils.Logging$class.swallowError(Logging.scala:106) at kafka.utils.CoreUtils$.swallowError(CoreUtils.scala:51) at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:68) at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105) at kafka.producer.async.ProducerSendThread$anonfun$processEvents$3.apply(ProducerSendThread.scala:88) at kafka.producer.async.ProducerSendThread$anonfun$processEvents$3.apply(ProducerSendThread.scala:68) at scala.collection.immutable.Stream.foreach(Stream.scala:547) at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67) at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45) [2016-06-17 09:35:06,790] ERROR fetching topic metadata for topics [Set(shashangTest)] from broker [ArrayBuffer(BrokerEndPoint(0,am1dlccmrhde01,6667))] failed (kafka.utils.CoreUtils$) kafka.common.KafkaException: fetching topic metadata for topics [Set(shashangTest)] from broker [ArrayBuffer(BrokerEndPoint(0,am1dlccmrhde01,6667))] failed at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:73) at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) at kafka.producer.async.DefaultEventHandler$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:68) at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:89) at kafka.utils.Logging$class.swallowError(Logging.scala:106) at kafka.utils.CoreUtils$.swallowError(CoreUtils.scala:51) at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:68) at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105) at kafka.producer.async.ProducerSendThread$anonfun$processEvents$3.apply(ProducerSendThread.scala:88) at kafka.producer.async.ProducerSendThread$anonfun$processEvents$3.apply(ProducerSendThread.scala:68) at scala.collection.immutable.Stream.foreach(Stream.scala:547) at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67) at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45) Caused by: java.nio.channels.ClosedChannelException at kafka.network.BlockingChannel.send(BlockingChannel.scala:120) at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75) at kafka.producer.SyncProducer.kafka$producer$SyncProducer$doSend(SyncProducer.scala:74) at kafka.producer.SyncProducer.send(SyncProducer.scala:115) at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59) ... 12 more [2016-06-17 09:35:06,798] WARN Fetching topic metadata with correlation id 1 for topics [Set(shashangTest)] from broker [BrokerEndPoint(0,am1dlccmrhde01,6667)] failed (kafka.client.ClientUtils$) java.nio.channels.ClosedChannelException at kafka.network.BlockingChannel.send(BlockingChannel.scala:120) at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75) at kafka.producer.SyncProducer.kafka$producer$SyncProducer$doSend(SyncProducer.scala:74) at kafka.producer.SyncProducer.send(SyncProducer.scala:115) at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59) at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) at kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:49) at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$getPartitionListForTopic(DefaultEventHandler.scala:188) at kafka.producer.async.DefaultEventHandler$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:152) at kafka.producer.async.DefaultEventHandler$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:151) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:151) at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:96) at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:73) at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105) at kafka.producer.async.ProducerSendThread$anonfun$processEvents$3.apply(ProducerSendThread.scala:88) at kafka.producer.async.ProducerSendThread$anonfun$processEvents$3.apply(ProducerSendThread.scala:68) at scala.collection.immutable.Stream.foreach(Stream.scala:547) at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67) at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45) [2016-06-17 09:35:06,804] ERROR Failed to collate messages by topic, partition due to: fetching topic metadata for topics [Set(shashangTest)] from broker [ArrayBuffer(BrokerEndPoint(0,am1dlccmrhde01,6667))] failed (kafka.producer.async.DefaultEventHandler) [2016-06-17 09:35:06,911] WARN Fetching topic metadata with correlation id 2 for topics [Set(shashangTest)] from broker [BrokerEndPoint(0,am1dlccmrhde01,6667)] failed (kafka.client.ClientUtils$) java.nio.channels.ClosedChannelException at kafka.network.BlockingChannel.send(BlockingChannel.scala:120) at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75) at kafka.producer.SyncProducer.kafka$producer$SyncProducer$doSend(SyncProducer.scala:74) at kafka.producer.SyncProducer.send(SyncProducer.scala:115) at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59) at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) at kafka.producer.async.DefaultEventHandler$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79) at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:89) at kafka.utils.Logging$class.swallowError(Logging.scala:106) at kafka.utils.CoreUtils$.swallowError(CoreUtils.scala:51) at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79) at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105) at kafka.producer.async.ProducerSendThread$anonfun$processEvents$3.apply(ProducerSendThread.scala:88) at kafka.producer.async.ProducerSendThread$anonfun$processEvents$3.apply(ProducerSendThread.scala:68) at scala.collection.immutable.Stream.foreach(Stream.scala:547) at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67) at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45) [2016-06-17 09:35:06,913] ERROR fetching topic metadata for topics [Set(shashangTest)] from broker [ArrayBuffer(BrokerEndPoint(0,am1dlccmrhde01,6667))] failed (kafka.utils.CoreUtils$) kafka.common.KafkaException: fetching topic metadata for topics [Set(shashangTest)] from broker [ArrayBuffer(BrokerEndPoint(0,am1dlccmrhde01,6667))] failed at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:73) at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) at kafka.producer.async.DefaultEventHandler$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79) at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:89) at kafka.utils.Logging$class.swallowError(Logging.scala:106) at kafka.utils.CoreUtils$.swallowError(CoreUtils.scala:51) at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79) at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105) at kafka.producer.async.ProducerSendThread$anonfun$processEvents$3.apply(ProducerSendThread.scala:88) at kafka.producer.async.ProducerSendThread$anonfun$processEvents$3.apply(ProducerSendThread.scala:68) at scala.collection.immutable.Stream.foreach(Stream.scala:547) at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67) at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45) Caused by: java.nio.channels.ClosedChannelException at kafka.network.BlockingChannel.send(BlockingChannel.scala:120) at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75) at kafka.producer.SyncProducer.kafka$producer$SyncProducer$doSend(SyncProducer.scala:74) at kafka.producer.SyncProducer.send(SyncProducer.scala:115) at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59) ... 12 more
Created 06-20-2016 06:07 PM
Issue resolved. The kafka broker service was running on internal ip address of the server whereas the systems initiating the producer process was trying to connect to the external ip address of the Kafka Broker server. Restarted Kafka Broker service after modifying the "listener" property with the correct ip address.
Created 06-17-2016 05:03 PM
The broker tells the client which hostname should be used to produce/consume messages. By default Kafka uses the hostname of the system it runs on. If this hostname can not be resolved by the client side you get this exception.
You can try setting advertised.host.name in the Kafka configuration to an hostname/address which the clients should use.
Created 06-20-2016 10:12 AM
@Kuldeep,
I wasn't able to find this property in Ambari. Can you tell me where can I find it?
Created 06-20-2016 06:07 PM
Issue resolved. The kafka broker service was running on internal ip address of the server whereas the systems initiating the producer process was trying to connect to the external ip address of the Kafka Broker server. Restarted Kafka Broker service after modifying the "listener" property with the correct ip address.