Not able to read from kafka topic

Explorer

I am running Metron through Ambari on a single node and have also set up a three-node cluster. A few months ago we were able to send messages to a Kafka topic and see them in the Metron dashboard. Now I get the following warning: "Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)". The three-node cluster shows the same warning and no messages appear there either. A Kafka broker is installed on all three nodes. Which properties need to be configured in Kafka's server.properties?

 

listeners=PLAINTEXT://node1:6667

 

7 REPLIES

Expert Contributor

Hey,

 

Can you try adding "advertised.listeners" in Kafka server.properties?

 

advertised.listeners=PLAINTEXT://node1:6667
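
For a three-node cluster, each broker would normally advertise its own hostname, and clients would list all brokers as bootstrap servers. A minimal sketch of what that could look like follows. It assumes the other two brokers are called node2 and node3 (hypothetical names, only node1 appears in this thread) and that all of them listen on port 6667. On an Ambari-managed cluster these values are normally set in the Kafka configs in Ambari, since Ambari rewrites server.properties.

```shell
# Hypothetical broker hostnames; only node1 comes from this thread.
BROKERS="node1 node2 node3"
PORT=6667

# Print the listener lines for one broker's server.properties.
listener_lines() {
  printf 'listeners=PLAINTEXT://%s:%s\n' "$1" "$PORT"
  printf 'advertised.listeners=PLAINTEXT://%s:%s\n' "$1" "$PORT"
}

# Build the comma-separated list that clients pass as bootstrap servers.
bootstrap_servers() {
  list=""
  for h in $BROKERS; do
    list="${list:+$list,}$h:$PORT"
  done
  printf '%s\n' "$list"
}

for h in $BROKERS; do
  echo "# server.properties on $h"
  listener_lines "$h"
done
echo "# bootstrap servers for clients"
bootstrap_servers
```

Whatever name is put in advertised.listeners is what the broker hands back to clients, so it has to resolve from every machine that runs a producer or consumer.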

Explorer

Yes, I added it, but it is still not working. For a three-node setup using HDP in Ambari, what should I configure as the listeners and the bootstrap server?

Expert Contributor

Hello @nhemamalini 

 

Are you able to send/consume data to/from the topics by using the Kafka command line?

 

Producer:

bin/kafka-console-producer.sh --broker-list node1:6667 --topic <topicName>
 
Consumer:
bin/kafka-console-consumer.sh --bootstrap-server node1:6667 --topic <topicName> --from-beginning
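
If the console clients also report "Connection to node -1", note that the Kafka Java client gives bootstrap servers negative node ids, so the message points at the bootstrap address itself being unreachable. A plain TCP check may help separate a network or listener problem from a Kafka-level one. This is only a sketch, assuming bash and the coreutils `timeout` command are available; `check_broker` is a hypothetical helper name, not a Kafka tool.

```shell
# check_broker HOST:PORT
# Returns 0 if a TCP connection opens within 5 seconds, non-zero otherwise.
# Relies on bash's /dev/tcp redirection, so bash is invoked explicitly.
check_broker() {
  host="${1%%:*}"   # text before the first colon
  port="${1##*:}"   # text after the last colon
  timeout 5 bash -c 'exec 3<>"/dev/tcp/$0/$1"' "$host" "$port" 2>/dev/null
}
```

For example, `check_broker node1:6667 && echo reachable || echo "not reachable"`. A successful check only shows that the port is open; the broker still has to advertise an address the client can resolve.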
 
Thanks.
 

Explorer

Hi, now I am able to read the messages from the Kafka topic, but in Storm I am getting the error below. How can I delete the messages that Storm is trying to process?

Caused by: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
2019-09-12 12:38:10.786 o.a.s.d.executor Thread-6-errorMessageWriter-executor[3 3] [ERROR] 
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
	at org.apache.kafka.clients.producer.KafkaProducer$FutureFailure.<init>(KafkaProducer.java:730) ~[stormjar.jar:?]
	at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:483) ~[stormjar.jar:?]
	at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:430) ~[stormjar.jar:?]
	at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:353) ~[stormjar.jar:?]
	at org.apache.metron.writer.kafka.KafkaWriter.write(KafkaWriter.java:257) ~[stormjar.jar:?]
	at org.apache.metron.writer.BulkWriterComponent.flush(BulkWriterComponent.java:123) [stormjar.jar:?]
	at org.apache.metron.writer.BulkWriterComponent.applyShouldFlush(BulkWriterComponent.java:179) [stormjar.jar:?]
	at org.apache.metron.writer.BulkWriterComponent.write(BulkWriterComponent.java:99) [stormjar.jar:?]
	at org.apache.metron.parsers.bolt.WriterHandler.write(WriterHandler.java:90) [stormjar.jar:?]
	at org.apache.metron.parsers.bolt.WriterBolt.execute(WriterBolt.java:90) [stormjar.jar:?]

 

Explorer

How can I delete all the messages that Storm keeps reprocessing again and again?

New Contributor

Did you try using the FQDN when producing to Kafka? Also add the hosts to the /etc/hosts file, using the same FQDN that you gave in the Kafka config.
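
A quick way to see whether the names used in the Kafka config actually resolve on a given machine is sketched below. It assumes a Linux host where `getent` is available; `resolves` and `check_hosts` are hypothetical helper names. It would need to be run on every node that talks to Kafka, including the Storm worker nodes.

```shell
# Return 0 if the name resolves through the system resolver
# (on most setups /etc/hosts is consulted first, then DNS).
resolves() {
  getent hosts "$1" >/dev/null 2>&1
}

# Report each host name passed as an argument.
check_hosts() {
  for h in "$@"; do
    if resolves "$h"; then
      echo "$h: ok"
    else
      echo "$h: NOT resolvable"
    fi
  done
}
```

For example, `check_hosts "$(hostname -f)" node1` checks the local FQDN and the broker name used earlier in this thread.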

 

Explorer

I didn't use the FQDN; instead I just added the IP to the /etc/hosts file and used the same host IP in the Kafka config.