Support Questions

Find answers, ask questions, and share your expertise

Kafka producer giving error when running from a different host then the one running Kafka broker.

avatar
New Contributor

I am using the below commands to start the producer and consumers. They work fine when done from the server running kafka broker (am1dlccmrhde01) but when I start the producer from another linux server, it gives the mentioned error.

Commands used:

./kafka-console-producer.sh --broker-list am1dlccmrhde01:6667 --topic shashangTest

./kafka-console-consumer.sh --zookeeper am1dlccmrhde01:2181 --topic shashangTest -from-beginning

Error produced when starting the producer from a remote server (tlccmrhds01):

[kafka@tlccmrhds01 bin]$ ./kafka-console-producer.sh --broker-list am1dlccmrhde01:6667 --topic shashangTest

Hello

[2016-06-17 09:35:06,781] WARN Fetching topic metadata with correlation id 0 for topics [Set(shashangTest)] from broker [BrokerEndPoint(0,am1dlccmrhde01,6667)] failed (kafka.client.ClientUtils$) java.nio.channels.ClosedChannelException at kafka.network.BlockingChannel.send(BlockingChannel.scala:120) at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75) at kafka.producer.SyncProducer.kafka$producer$SyncProducer$doSend(SyncProducer.scala:74) at kafka.producer.SyncProducer.send(SyncProducer.scala:115) at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59) at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) at kafka.producer.async.DefaultEventHandler$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:68) at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:89) at kafka.utils.Logging$class.swallowError(Logging.scala:106) at kafka.utils.CoreUtils$.swallowError(CoreUtils.scala:51) at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:68) at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105) at kafka.producer.async.ProducerSendThread$anonfun$processEvents$3.apply(ProducerSendThread.scala:88) at kafka.producer.async.ProducerSendThread$anonfun$processEvents$3.apply(ProducerSendThread.scala:68) at scala.collection.immutable.Stream.foreach(Stream.scala:547) at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67) at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45) [2016-06-17 09:35:06,790] ERROR fetching topic metadata for topics [Set(shashangTest)] from broker [ArrayBuffer(BrokerEndPoint(0,am1dlccmrhde01,6667))] failed (kafka.utils.CoreUtils$) kafka.common.KafkaException: fetching topic metadata for topics [Set(shashangTest)] from broker [ArrayBuffer(BrokerEndPoint(0,am1dlccmrhde01,6667))] failed at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:73) at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) at kafka.producer.async.DefaultEventHandler$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:68) at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:89) at kafka.utils.Logging$class.swallowError(Logging.scala:106) at kafka.utils.CoreUtils$.swallowError(CoreUtils.scala:51) at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:68) at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105) at kafka.producer.async.ProducerSendThread$anonfun$processEvents$3.apply(ProducerSendThread.scala:88) at kafka.producer.async.ProducerSendThread$anonfun$processEvents$3.apply(ProducerSendThread.scala:68) at scala.collection.immutable.Stream.foreach(Stream.scala:547) at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67) at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45) Caused by: java.nio.channels.ClosedChannelException at kafka.network.BlockingChannel.send(BlockingChannel.scala:120) at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75) at kafka.producer.SyncProducer.kafka$producer$SyncProducer$doSend(SyncProducer.scala:74) at kafka.producer.SyncProducer.send(SyncProducer.scala:115) at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59) ... 12 more [2016-06-17 09:35:06,798] WARN Fetching topic metadata with correlation id 1 for topics [Set(shashangTest)] from broker [BrokerEndPoint(0,am1dlccmrhde01,6667)] failed (kafka.client.ClientUtils$) java.nio.channels.ClosedChannelException at kafka.network.BlockingChannel.send(BlockingChannel.scala:120) at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75) at kafka.producer.SyncProducer.kafka$producer$SyncProducer$doSend(SyncProducer.scala:74) at kafka.producer.SyncProducer.send(SyncProducer.scala:115) at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59) at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) at kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:49) at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$getPartitionListForTopic(DefaultEventHandler.scala:188) at kafka.producer.async.DefaultEventHandler$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:152) at kafka.producer.async.DefaultEventHandler$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:151) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:151) at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:96) at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:73) at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105) at kafka.producer.async.ProducerSendThread$anonfun$processEvents$3.apply(ProducerSendThread.scala:88) at kafka.producer.async.ProducerSendThread$anonfun$processEvents$3.apply(ProducerSendThread.scala:68) at scala.collection.immutable.Stream.foreach(Stream.scala:547) at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67) at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45) [2016-06-17 09:35:06,804] ERROR Failed to collate messages by topic, partition due to: fetching topic metadata for topics [Set(shashangTest)] from broker [ArrayBuffer(BrokerEndPoint(0,am1dlccmrhde01,6667))] failed (kafka.producer.async.DefaultEventHandler) [2016-06-17 09:35:06,911] WARN Fetching topic metadata with correlation id 2 for topics [Set(shashangTest)] from broker [BrokerEndPoint(0,am1dlccmrhde01,6667)] failed (kafka.client.ClientUtils$) java.nio.channels.ClosedChannelException at kafka.network.BlockingChannel.send(BlockingChannel.scala:120) at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75) at kafka.producer.SyncProducer.kafka$producer$SyncProducer$doSend(SyncProducer.scala:74) at kafka.producer.SyncProducer.send(SyncProducer.scala:115) at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59) at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) at kafka.producer.async.DefaultEventHandler$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79) at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:89) at kafka.utils.Logging$class.swallowError(Logging.scala:106) at kafka.utils.CoreUtils$.swallowError(CoreUtils.scala:51) at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79) at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105) at kafka.producer.async.ProducerSendThread$anonfun$processEvents$3.apply(ProducerSendThread.scala:88) at kafka.producer.async.ProducerSendThread$anonfun$processEvents$3.apply(ProducerSendThread.scala:68) at scala.collection.immutable.Stream.foreach(Stream.scala:547) at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67) at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45) [2016-06-17 09:35:06,913] ERROR fetching topic metadata for topics [Set(shashangTest)] from broker [ArrayBuffer(BrokerEndPoint(0,am1dlccmrhde01,6667))] failed (kafka.utils.CoreUtils$) kafka.common.KafkaException: fetching topic metadata for topics [Set(shashangTest)] from broker [ArrayBuffer(BrokerEndPoint(0,am1dlccmrhde01,6667))] failed at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:73) at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) at kafka.producer.async.DefaultEventHandler$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79) at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:89) at kafka.utils.Logging$class.swallowError(Logging.scala:106) at kafka.utils.CoreUtils$.swallowError(CoreUtils.scala:51) at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79) at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105) at kafka.producer.async.ProducerSendThread$anonfun$processEvents$3.apply(ProducerSendThread.scala:88) at kafka.producer.async.ProducerSendThread$anonfun$processEvents$3.apply(ProducerSendThread.scala:68) at scala.collection.immutable.Stream.foreach(Stream.scala:547) at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67) at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45) Caused by: java.nio.channels.ClosedChannelException at kafka.network.BlockingChannel.send(BlockingChannel.scala:120) at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75) at kafka.producer.SyncProducer.kafka$producer$SyncProducer$doSend(SyncProducer.scala:74) at kafka.producer.SyncProducer.send(SyncProducer.scala:115) at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59) ... 12 more

1 ACCEPTED SOLUTION

avatar

Issue resolved. The kafka broker service was running on internal ip address of the server whereas the systems initiating the producer process was trying to connect to the external ip address of the Kafka Broker server. Restarted Kafka Broker service after modifying the "listener" property with the correct ip address.

View solution in original post

3 REPLIES 3

avatar
Master Guru
@Shashang Sheth

The broker tells the client which hostname should be used to produce/consume messages. By default Kafka uses the hostname of the system it runs on. If this hostname can not be resolved by the client side you get this exception.

You can try setting advertised.host.name in the Kafka configuration to an hostname/address which the clients should use.

avatar

@Kuldeep,

I wasn't able to find this property in Ambari. Can you tell me where can I find it?

avatar

Issue resolved. The kafka broker service was running on internal ip address of the server whereas the systems initiating the producer process was trying to connect to the external ip address of the Kafka Broker server. Restarted Kafka Broker service after modifying the "listener" property with the correct ip address.