Kafka + Flume : Not able to read from KafkaSource, key = null


Hi,

I'm trying to collect all web logs into HDFS using two sets of Flume agents.

 

FlumeAgentProducer

tailsLog --> memory_channel --> Kafka 

 

FlumeAgentConsumer 

Kafka --> memory_channel --> File_Roll / HDFS

 

The problem is that FlumeAgentConsumer throws a NullPointerException because the Kafka message key is null:

2015-12-09 16:48:11,132 ERROR org.apache.flume.source.kafka.KafkaSource: KafkaSource EXCEPTION, {}
java.lang.NullPointerException
	at java.lang.String.<init>(String.java:556)
	at org.apache.flume.source.kafka.KafkaSource.process(KafkaSource.java:105)
	at org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:139)
	at java.lang.Thread.run(Thread.java:722)
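
The top frame of the trace is the String constructor, called from KafkaSource.process, which suggests a String is being built from a null value. The following is a minimal Java sketch of that failure mode, assuming the source converts the message key bytes to a String; the class and method names (NullKeyDemo, unsafeDecode, safeDecode) are hypothetical and are not taken from the Flume code.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical demo class; not part of Flume.
final class NullKeyDemo {

    // Mirrors the assumed failing pattern: building a String straight from the key bytes.
    // Throws NullPointerException when keyBytes is null.
    public static String unsafeDecode(byte[] keyBytes) {
        return new String(keyBytes);
    }

    // Defensive variant: a missing key stays null instead of raising an exception.
    public static String safeDecode(byte[] keyBytes) {
        return keyBytes == null ? null : new String(keyBytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] missingKey = null; // a Kafka message written without a key
        try {
            unsafeDecode(missingKey);
        } catch (NullPointerException e) {
            System.out.println("unsafeDecode failed on a null key: " + e);
        }
        System.out.println("safeDecode on a null key: " + safeDecode(missingKey));
    }
}
```

If this assumption holds, the consumer can only work once every message carries a non-null key, so the fix has to happen on the producer side.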

 

And from the FlumeAgentProducer logs, it's clear that the messages are written with a null key:

2015-12-09 16:34:01,065 DEBUG org.apache.flume.sink.kafka.KafkaSink: event #10
2015-12-09 16:34:01,065 DEBUG org.apache.flume.sink.kafka.KafkaSink: {Event} logtail : null :  0  0      0 3150340 194156 5139772    0    0     0    32  592 1017  1  0 99  0  0	

 

I'm trying to figure out how I can add a key to each Kafka message. Please help 🙂

 

Attaching Flume configs --> FlumeAgentConsumer and FlumeAgentProducer

 

 

1 ACCEPTED SOLUTION


Sorry. 

 

I should have supplied the key for the Kafka messages as a Flume event header. Adding a host interceptor whose header name is "key" solved my issue.

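# Declare the interceptor on the source (needed for the i1.* settings below to take effect)
kafkaproducer.sources.tail1.interceptors = i1
# Write the agent's host value into the "key" header, which the Kafka sink uses as the message key
kafkaproducer.sources.tail1.interceptors.i1.type = org.apache.flume.interceptor.HostInterceptor$Builder
kafkaproducer.sources.tail1.interceptors.i1.preserveExisting = false
kafkaproducer.sources.tail1.interceptors.i1.hostHeader = key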
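
To illustrate why this works, here is a rough Java sketch that models event headers as a plain map. It does not use the Flume API: KeyHeaderSketch, addHostHeader and messageKey are hypothetical stand-ins for the interceptor and for the sink's header lookup, and the host value "web01.example.com" is made up (the real interceptor writes the agent's IP address or hostname, depending on its useIP setting).

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the header flow; not part of Flume.
final class KeyHeaderSketch {

    // Stand-in for the host interceptor: stores hostValue under headerName.
    // With preserveExisting = true, a header that is already set is left alone.
    public static Map<String, String> addHostHeader(Map<String, String> headers,
                                                    String headerName,
                                                    String hostValue,
                                                    boolean preserveExisting) {
        Map<String, String> result = new HashMap<>(headers);
        if (preserveExisting && result.containsKey(headerName)) {
            return result;
        }
        result.put(headerName, hostValue);
        return result;
    }

    // Stand-in for the sink side: the message key is whatever the "key" header holds.
    public static String messageKey(Map<String, String> headers) {
        return headers.get("key");
    }

    public static void main(String[] args) {
        Map<String, String> bare = new HashMap<>();
        System.out.println("without interceptor: " + messageKey(bare));

        Map<String, String> tagged = addHostHeader(bare, "key", "web01.example.com", false);
        System.out.println("with interceptor: " + messageKey(tagged));
    }
}
```

Keying by host also means that, with Kafka's default key-based partitioning, all events from one agent land in the same partition, which may or may not be what you want for load distribution.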
