Not able to read from kafka topic

Explorer

I am running Metron through Ambari on a single node, and I have also set up a three-node cluster. A few months ago we were able to send messages to a Kafka topic and see them in the Metron dashboard. Now I get the following message: "Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)". The three-node cluster gives the same warning, and I cannot see any messages there either. A Kafka broker is installed on all three nodes. Which properties need to be configured in Kafka's server.properties?

 

listeners=PLAINTEXT://node1:6667

 


Contributor

Hey,

 

Can you try adding "advertised.listeners" in Kafka server.properties?

 

advertised.listeners=PLAINTEXT://node1:6667
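
To confirm what the broker is actually configured with, a small shell sketch like the one below can print the host:port pairs behind `listeners` and `advertised.listeners`. The function name `listener_endpoints` is made up for illustration, and the file path in the usage comment is an assumption about a typical HDP layout; point it at wherever your server.properties lives.

```shell
# Hypothetical helper: print the host:port pairs set for a key in a
# Kafka properties file, one per line.
# Usage: listener_endpoints <key> <file>
listener_endpoints() {
  key="$1"
  file="$2"
  # Take the last uncommented "key=value" line, split the value on commas,
  # and strip the protocol prefix (for example "PLAINTEXT://").
  grep -E "^${key}=" "$file" \
    | tail -n 1 \
    | cut -d= -f2- \
    | tr ',' '\n' \
    | sed -E 's#^[A-Za-z_0-9]+://##'
}

# Example (path is an assumption, adjust for your install):
#   listener_endpoints listeners /usr/hdp/current/kafka-broker/config/server.properties
#   listener_endpoints advertised.listeners /usr/hdp/current/kafka-broker/config/server.properties
```

Clients have to be able to resolve and reach whatever host this prints for `advertised.listeners`, so it is worth comparing the output on each of the three nodes.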

Explorer

Yes, I added it, but it is still not working. For a three-node setup using HDP in Ambari, what should I configure as the listeners and the bootstrap server?

Expert Contributor

Hello @nhemamalini 

 

Are you able to send/consume data to/from the topics by using the Kafka command line?

 

Producer:

bin/kafka-console-producer.sh --broker-list node1:6667 --topic <topicName>
 
Consumer:
bin/kafka-console-consumer.sh --bootstrap-server node1:6667 --topic <topicName> --from-beginning
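
If the console tools also report that the broker may not be available, a plain TCP check can show whether the broker port is reachable at all. This is only a rough sketch: `port_open` is a hypothetical helper, and it assumes `bash` (for its `/dev/tcp` feature) and the coreutils `timeout` command are installed on the machine you run it from.

```shell
# Hypothetical helper: succeed if a TCP connection to host:port can be opened
# within 3 seconds. Assumes bash and coreutils `timeout` are available.
# Usage: port_open <host> <port>
port_open() {
  timeout 3 bash -c "exec 3<>/dev/tcp/$1/$2" 2>/dev/null
}

# Example, using the broker address from this thread:
#   if port_open node1 6667; then echo "broker port reachable"; else echo "cannot connect"; fi
```

A failure here points at the network, the hostname, or the broker process itself rather than at the topic or at Metron.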
 
Thanks.
 

Explorer

Hi, I am now able to read the messages from the Kafka topic, but in Storm I am getting the error below. How can I delete the messages that Storm is trying to process?

Caused by: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
2019-09-12 12:38:10.786 o.a.s.d.executor Thread-6-errorMessageWriter-executor[3 3] [ERROR] 
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
	at org.apache.kafka.clients.producer.KafkaProducer$FutureFailure.<init>(KafkaProducer.java:730) ~[stormjar.jar:?]
	at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:483) ~[stormjar.jar:?]
	at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:430) ~[stormjar.jar:?]
	at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:353) ~[stormjar.jar:?]
	at org.apache.metron.writer.kafka.KafkaWriter.write(KafkaWriter.java:257) ~[stormjar.jar:?]
	at org.apache.metron.writer.BulkWriterComponent.flush(BulkWriterComponent.java:123) [stormjar.jar:?]
	at org.apache.metron.writer.BulkWriterComponent.applyShouldFlush(BulkWriterComponent.java:179) [stormjar.jar:?]
	at org.apache.metron.writer.BulkWriterComponent.write(BulkWriterComponent.java:99) [stormjar.jar:?]
	at org.apache.metron.parsers.bolt.WriterHandler.write(WriterHandler.java:90) [stormjar.jar:?]
	at org.apache.metron.parsers.bolt.WriterBolt.execute(WriterBolt.java:90) [stormjar.jar:?]

 

Explorer

How can I delete all the messages so that Storm stops reprocessing them again and again?

New Contributor

Did you try using the FQDN when producing to Kafka? Also, did you add the hosts to the /etc/hosts file with the proper FQDN, the same one you gave in the Kafka config?
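
A quick way to see whether each broker name resolves on a given machine is something like the sketch below. `check_hosts` is a hypothetical helper, it assumes `getent` is available (as on most Linux systems), and `node2` and `node3` in the usage comment are placeholder names for the other brokers.

```shell
# Hypothetical helper: report whether each given host name resolves through
# the system resolver (which consults /etc/hosts as well as DNS).
# Usage: check_hosts <host> [<host> ...]
check_hosts() {
  for host in "$@"; do
    if getent hosts "$host" >/dev/null; then
      echo "$host: resolves"
    else
      echo "$host: does not resolve"
    fi
  done
}

# Example (node2 and node3 are placeholders for your other brokers):
#   check_hosts node1 node2 node3
```

Running it on every node, including the Storm worker hosts, shows whether the names used in the Kafka config are known everywhere they need to be.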

 

Explorer

I didn't use the FQDN; instead, I just added the IP to the /etc/hosts file. I used the same host IP in the Kafka config.