How to configure 3+ brokers as one cluster?

New Contributor

How do I configure 3+ brokers to form one cluster, and how should I set the listeners in the Ambari configs?

If I set listeners=PLAINTEXT://0.0.0.0:6667, the topic is created successfully, but the producer then fails:

[root@iZ252i660a7Z current]# ./kafka-broker/bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic test3 --replication-factor 2 --partitions 1

Created topic "test3".

[root@iZ252i660a7Z current]# /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list 0.0.0.0:6667 --topic test3
sdf
[2016-05-06 18:13:35,972] WARN Error while fetching metadata [{TopicMetadata for topic test3 -> No partition metadata for topic test3 due to kafka.common.LeaderNotAvailableException}] for topic [test3]: class kafka.common.LeaderNotAvailableException (kafka.producer.BrokerPartitionInfo)
[2016-05-06 18:13:35,980] WARN Error while fetching metadata [{TopicMetadata for topic test3 -> No partition metadata for topic test3 due to kafka.common.LeaderNotAvailableException}] for topic [test3]: class kafka.common.LeaderNotAvailableException (kafka.producer.BrokerPartitionInfo)
[2016-05-06 18:13:35,981] ERROR Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: test3 (kafka.producer.async.DefaultEventHandler)
[2016-05-06 18:13:36,087] WARN Error while fetching metadata [{TopicMetadata for topic test3 -> No partition metadata for topic test3 due to kafka.common.LeaderNotAvailableException}] for topic [test3]: class kafka.common.LeaderNotAvailableException (kafka.producer.BrokerPartitionInfo)
[2016-05-06 18:13:36,093] WARN Error while fetching metadata [{TopicMetadata for topic test3 -> No partition metadata for topic test3 due to kafka.common.LeaderNotAvailableException}] for topic [test3]: class kafka.common.LeaderNotAvailableException (kafka.producer.BrokerPartitionInfo)
[2016-05-06 18:13:36,094] ERROR Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: test3 (kafka.producer.async.DefaultEventHandler)
[2016-05-06 18:13:36,200] WARN Error while fetching metadata [{TopicMetadata for topic test3 -> No partition metadata for topic test3 due to kafka.common.LeaderNotAvailableException}] for topic [test3]: class kafka.common.LeaderNotAvailableException (kafka.producer.BrokerPartitionInfo)
[2016-05-06 18:13:36,204] WARN Error while fetching metadata [{TopicMetadata for topic test3 -> No partition metadata for topic test3 due to kafka.common.LeaderNotAvailableException}] for topic [test3]: class kafka.common.LeaderNotAvailableException (kafka.producer.BrokerPartitionInfo)
[2016-05-06 18:13:36,204] ERROR Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: test3 (kafka.producer.async.DefaultEventHandler)
[2016-05-06 18:13:36,309] WARN Error while fetching metadata [{TopicMetadata for topic test3 -> No partition metadata for topic test3 due to kafka.common.LeaderNotAvailableException}] for topic [test3]: class kafka.common.LeaderNotAvailableException (kafka.producer.BrokerPartitionInfo)
[2016-05-06 18:13:36,315] WARN Error while fetching metadata [{TopicMetadata for topic test3 -> No partition metadata for topic test3 due to kafka.common.LeaderNotAvailableException}] for topic [test3]: class kafka.common.LeaderNotAvailableException (kafka.producer.BrokerPartitionInfo)
[2016-05-06 18:13:36,315] ERROR Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: test3 (kafka.producer.async.DefaultEventHandler)
[2016-05-06 18:13:36,420] WARN Error while fetching metadata [{TopicMetadata for topic test3 -> No partition metadata for topic test3 due to kafka.common.LeaderNotAvailableException}] for topic [test3]: class kafka.common.LeaderNotAvailableException (kafka.producer.BrokerPartitionInfo)
[2016-05-06 18:13:36,422] ERROR Failed to send requests for topics test3 with correlation ids in [0,8] (kafka.producer.async.DefaultEventHandler)
[2016-05-06 18:13:36,423] ERROR Error in handling batch of 1 events (kafka.producer.async.ProducerSendThread)
kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
    at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:91)
    at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
    at kafka.producer.async.ProducerSendThread$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
    at kafka.producer.async.ProducerSendThread$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
    at scala.collection.immutable.Stream.foreach(Stream.scala:547)
    at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
    at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)

Consumer:

[root@iZ25rwnt33wZ kafka]# /usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test3 --from-beginning
{metadata.broker.list=0.0.0.0:6667,0.0.0.0:6667,0.0.0.0:6667, request.timeout.ms=30000, client.id=console-consumer-99026, security.protocol=PLAINTEXT}
{metadata.broker.list=0.0.0.0:6667,0.0.0.0:6667,0.0.0.0:6667, request.timeout.ms=30000, client.id=console-consumer-99026, security.protocol=PLAINTEXT}
{metadata.broker.list=0.0.0.0:6667,0.0.0.0:6667,0.0.0.0:6667, request.timeout.ms=30000, client.id=console-consumer-99026, security.protocol=PLAINTEXT}
{metadata.broker.list=0.0.0.0:6667,0.0.0.0:6667,0.0.0.0:6667, request.timeout.ms=30000, client.id=console-consumer-99026, security.protocol=PLAINTEXT}

1 ACCEPTED SOLUTION

New Contributor

Thanks, I solved it another way: I created 3+ configs, one per node, and set the listeners in each to that node's IP address, like 192.168.1.21:6667.

3 REPLIES

Master Guru

Just set "listeners=PLAINTEXT://localhost:6667"; Ambari and Kafka will replace each localhost with the respective broker node's FQDN. Nothing changes when creating new topics. When running a producer, list all brokers. You can define them once in a variable, since you will need them over and over:

export BK="broker1.fqdn:6667,broker2.fqdn:6667,broker3.fqdn:6667"
/usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list $BK --topic test3
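
If the cluster has more than a handful of brokers, the same list can be assembled in a loop instead of typed by hand. This is only a sketch: the host names are the same placeholders as above, and the BROKER_HOSTS and PORT variable names are made up for illustration.

```shell
# Hypothetical broker host names (placeholders, replace with your own FQDNs).
BROKER_HOSTS="broker1.fqdn broker2.fqdn broker3.fqdn"
PORT=6667

BK=""
for host in $BROKER_HOSTS; do
    # ${BK:+$BK,} expands to "$BK," only when BK is already non-empty,
    # so no leading comma is produced for the first host.
    BK="${BK:+$BK,}${host}:${PORT}"
done
export BK
echo "$BK"
```

The resulting $BK can then be passed to --broker-list exactly as in the command above.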

New Contributor

Thanks, I solved it another way: I created 3+ configs, one per node, and set the listeners in each to that node's IP address, like 192.168.1.21:6667.

Master Guru

Okay, that's exactly what Ambari does for you when you use "localhost" (though Ambari uses FQDNs, not IPs).
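
To make the two approaches concrete: either way, each broker ends up with a listeners value that names its own address rather than 0.0.0.0 (the consumer output in the question shows every broker listed as 0.0.0.0:6667). A rough sketch of the per-node values follows. The listener_for helper is hypothetical, and only 192.168.1.21 comes from this thread; the other two addresses are invented for the example.

```shell
# Hypothetical helper: print the listeners line for one broker.
# $1 is the broker's own address (IP or FQDN); $2 is the port, default 6667.
listener_for() {
    printf 'listeners=PLAINTEXT://%s:%s\n' "$1" "${2:-6667}"
}

# 192.168.1.21 is from the thread; .22 and .23 are assumed for illustration.
for host in 192.168.1.21 192.168.1.22 192.168.1.23; do
    listener_for "$host"
done
```

With the "localhost" approach you would not write these lines yourself; they only show what each node's effective setting should look like.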