- Subscribe to RSS Feed
- Mark Question as New
- Mark Question as Read
- Float this Question for Current User
- Bookmark
- Subscribe
- Mute
- Printer Friendly Page
Kafka producer giving error when running from a different host then the one running Kafka broker.
- Labels:
-
Apache Kafka
Created ‎06-17-2016 03:40 PM
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
I am using the below commands to start the producer and consumers. They work fine when done from the server running kafka broker (am1dlccmrhde01) but when I start the producer from another linux server, it gives the mentioned error.
Commands used:
./kafka-console-producer.sh --broker-list am1dlccmrhde01:6667 --topic shashangTest
./kafka-console-consumer.sh --zookeeper am1dlccmrhde01:2181 --topic shashangTest -from-beginning
Error produced when starting the producer from a remote server (tlccmrhds01):
[kafka@tlccmrhds01 bin]$ ./kafka-console-producer.sh --broker-list am1dlccmrhde01:6667 --topic shashangTest
Hello
[2016-06-17 09:35:06,781] WARN Fetching topic metadata with correlation id 0 for topics [Set(shashangTest)] from broker [BrokerEndPoint(0,am1dlccmrhde01,6667)] failed (kafka.client.ClientUtils$) java.nio.channels.ClosedChannelException at kafka.network.BlockingChannel.send(BlockingChannel.scala:120) at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75) at kafka.producer.SyncProducer.kafka$producer$SyncProducer$doSend(SyncProducer.scala:74) at kafka.producer.SyncProducer.send(SyncProducer.scala:115) at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59) at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) at kafka.producer.async.DefaultEventHandler$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:68) at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:89) at kafka.utils.Logging$class.swallowError(Logging.scala:106) at kafka.utils.CoreUtils$.swallowError(CoreUtils.scala:51) at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:68) at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105) at kafka.producer.async.ProducerSendThread$anonfun$processEvents$3.apply(ProducerSendThread.scala:88) at kafka.producer.async.ProducerSendThread$anonfun$processEvents$3.apply(ProducerSendThread.scala:68) at scala.collection.immutable.Stream.foreach(Stream.scala:547) at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67) at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45) [2016-06-17 09:35:06,790] ERROR fetching topic metadata for topics [Set(shashangTest)] from broker [ArrayBuffer(BrokerEndPoint(0,am1dlccmrhde01,6667))] failed (kafka.utils.CoreUtils$) kafka.common.KafkaException: fetching topic metadata for topics [Set(shashangTest)] from broker [ArrayBuffer(BrokerEndPoint(0,am1dlccmrhde01,6667))] failed at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:73) at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) at kafka.producer.async.DefaultEventHandler$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:68) at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:89) at kafka.utils.Logging$class.swallowError(Logging.scala:106) at kafka.utils.CoreUtils$.swallowError(CoreUtils.scala:51) at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:68) at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105) at kafka.producer.async.ProducerSendThread$anonfun$processEvents$3.apply(ProducerSendThread.scala:88) at kafka.producer.async.ProducerSendThread$anonfun$processEvents$3.apply(ProducerSendThread.scala:68) at scala.collection.immutable.Stream.foreach(Stream.scala:547) at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67) at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45) Caused by: java.nio.channels.ClosedChannelException at kafka.network.BlockingChannel.send(BlockingChannel.scala:120) at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75) at kafka.producer.SyncProducer.kafka$producer$SyncProducer$doSend(SyncProducer.scala:74) at kafka.producer.SyncProducer.send(SyncProducer.scala:115) at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59) ... 12 more [2016-06-17 09:35:06,798] WARN Fetching topic metadata with correlation id 1 for topics [Set(shashangTest)] from broker [BrokerEndPoint(0,am1dlccmrhde01,6667)] failed (kafka.client.ClientUtils$) java.nio.channels.ClosedChannelException at kafka.network.BlockingChannel.send(BlockingChannel.scala:120) at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75) at kafka.producer.SyncProducer.kafka$producer$SyncProducer$doSend(SyncProducer.scala:74) at kafka.producer.SyncProducer.send(SyncProducer.scala:115) at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59) at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) at kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:49) at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$getPartitionListForTopic(DefaultEventHandler.scala:188) at kafka.producer.async.DefaultEventHandler$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:152) at kafka.producer.async.DefaultEventHandler$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:151) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:151) at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:96) at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:73) at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105) at kafka.producer.async.ProducerSendThread$anonfun$processEvents$3.apply(ProducerSendThread.scala:88) at kafka.producer.async.ProducerSendThread$anonfun$processEvents$3.apply(ProducerSendThread.scala:68) at scala.collection.immutable.Stream.foreach(Stream.scala:547) at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67) at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45) [2016-06-17 09:35:06,804] ERROR Failed to collate messages by topic, partition due to: fetching topic metadata for topics [Set(shashangTest)] from broker [ArrayBuffer(BrokerEndPoint(0,am1dlccmrhde01,6667))] failed (kafka.producer.async.DefaultEventHandler) [2016-06-17 09:35:06,911] WARN Fetching topic metadata with correlation id 2 for topics [Set(shashangTest)] from broker [BrokerEndPoint(0,am1dlccmrhde01,6667)] failed (kafka.client.ClientUtils$) java.nio.channels.ClosedChannelException at kafka.network.BlockingChannel.send(BlockingChannel.scala:120) at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75) at kafka.producer.SyncProducer.kafka$producer$SyncProducer$doSend(SyncProducer.scala:74) at kafka.producer.SyncProducer.send(SyncProducer.scala:115) at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59) at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) at kafka.producer.async.DefaultEventHandler$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79) at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:89) at kafka.utils.Logging$class.swallowError(Logging.scala:106) at kafka.utils.CoreUtils$.swallowError(CoreUtils.scala:51) at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79) at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105) at kafka.producer.async.ProducerSendThread$anonfun$processEvents$3.apply(ProducerSendThread.scala:88) at kafka.producer.async.ProducerSendThread$anonfun$processEvents$3.apply(ProducerSendThread.scala:68) at scala.collection.immutable.Stream.foreach(Stream.scala:547) at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67) at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45) [2016-06-17 09:35:06,913] ERROR fetching topic metadata for topics [Set(shashangTest)] from broker [ArrayBuffer(BrokerEndPoint(0,am1dlccmrhde01,6667))] failed (kafka.utils.CoreUtils$) kafka.common.KafkaException: fetching topic metadata for topics [Set(shashangTest)] from broker [ArrayBuffer(BrokerEndPoint(0,am1dlccmrhde01,6667))] failed at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:73) at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) at kafka.producer.async.DefaultEventHandler$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79) at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:89) at kafka.utils.Logging$class.swallowError(Logging.scala:106) at kafka.utils.CoreUtils$.swallowError(CoreUtils.scala:51) at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79) at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105) at kafka.producer.async.ProducerSendThread$anonfun$processEvents$3.apply(ProducerSendThread.scala:88) at kafka.producer.async.ProducerSendThread$anonfun$processEvents$3.apply(ProducerSendThread.scala:68) at scala.collection.immutable.Stream.foreach(Stream.scala:547) at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67) at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45) Caused by: java.nio.channels.ClosedChannelException at kafka.network.BlockingChannel.send(BlockingChannel.scala:120) at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75) at kafka.producer.SyncProducer.kafka$producer$SyncProducer$doSend(SyncProducer.scala:74) at kafka.producer.SyncProducer.send(SyncProducer.scala:115) at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59) ... 12 more
Created ‎06-20-2016 06:07 PM
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
Issue resolved. The kafka broker service was running on internal ip address of the server whereas the systems initiating the producer process was trying to connect to the external ip address of the Kafka Broker server. Restarted Kafka Broker service after modifying the "listener" property with the correct ip address.
Created ‎06-17-2016 05:03 PM
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
The broker tells the client which hostname should be used to produce/consume messages. By default Kafka uses the hostname of the system it runs on. If this hostname can not be resolved by the client side you get this exception.
You can try setting advertised.host.name in the Kafka configuration to an hostname/address which the clients should use.
Created ‎06-20-2016 10:12 AM
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
@Kuldeep,
I wasn't able to find this property in Ambari. Can you tell me where can I find it?
Created ‎06-20-2016 06:07 PM
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
Issue resolved. The kafka broker service was running on internal ip address of the server whereas the systems initiating the producer process was trying to connect to the external ip address of the Kafka Broker server. Restarted Kafka Broker service after modifying the "listener" property with the correct ip address.
