
Kafka + Flume : Not able to read from KafkaSource, key = null


Hi,

I'm trying to collect all web logs into HDFS using two sets of Flume agents.

 

FlumeAgentProducer

tailsLog --> memory_channel --> Kafka 

 

FlumeAgentConsumer 

Kafka --> memory_channel --> File_Roll / HDFS

 

The issue is that FlumeAgentConsumer throws a NullPointerException because the Kafka message key is null.

2015-12-09 16:48:11,132 ERROR org.apache.flume.source.kafka.KafkaSource: KafkaSource EXCEPTION, {}
java.lang.NullPointerException
	at java.lang.String.<init>(String.java:556)
	at org.apache.flume.source.kafka.KafkaSource.process(KafkaSource.java:105)
	at org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:139)
	at java.lang.Thread.run(Thread.java:722)

 

And from the FlumeAgentProducer logs, it's clear that messages are written with a null key.

2015-12-09 16:34:01,065 DEBUG org.apache.flume.sink.kafka.KafkaSink: event #10
2015-12-09 16:34:01,065 DEBUG org.apache.flume.sink.kafka.KafkaSink: {Event} logtail : null :  0  0      0 3150340 194156 5139772    0    0     0    32  592 1017  1  0 99  0  0	

 

I'm trying to figure out how to add a key to each Kafka message. Please help 🙂

 

Attaching the Flume configs for FlumeAgentConsumer and FlumeAgentProducer.
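For readers following along, here is a minimal sketch of how the two agents described above might be configured. It is not the poster's attached configuration. It assumes Flume 1.6-style Kafka property names (`brokerList`, `zookeeperConnect`), and every name other than `kafkaproducer`, `tail1`, and the topic `logtail` (taken from the sink's debug log) is a hypothetical placeholder, as are the hosts, ports, and paths.

```properties
# --- FlumeAgentProducer: tail a log --> memory channel --> Kafka ---
# "mem1" and "kafka1" are hypothetical component names.
kafkaproducer.sources = tail1
kafkaproducer.channels = mem1
kafkaproducer.sinks = kafka1

# Exec source tailing a log file (the path is a placeholder).
kafkaproducer.sources.tail1.type = exec
kafkaproducer.sources.tail1.command = tail -F /var/log/httpd/access_log
kafkaproducer.sources.tail1.channels = mem1

kafkaproducer.channels.mem1.type = memory
kafkaproducer.channels.mem1.capacity = 10000
kafkaproducer.channels.mem1.transactionCapacity = 1000

# Kafka sink; the broker address is a placeholder.
kafkaproducer.sinks.kafka1.type = org.apache.flume.sink.kafka.KafkaSink
kafkaproducer.sinks.kafka1.brokerList = localhost:9092
kafkaproducer.sinks.kafka1.topic = logtail
kafkaproducer.sinks.kafka1.channel = mem1

# --- FlumeAgentConsumer: Kafka --> memory channel --> HDFS ---
# "kafkaconsumer", "kafkasrc", and "hdfs1" are hypothetical names.
kafkaconsumer.sources = kafkasrc
kafkaconsumer.channels = mem1
kafkaconsumer.sinks = hdfs1

# Kafka source; the ZooKeeper address and group id are placeholders.
kafkaconsumer.sources.kafkasrc.type = org.apache.flume.source.kafka.KafkaSource
kafkaconsumer.sources.kafkasrc.zookeeperConnect = localhost:2181
kafkaconsumer.sources.kafkasrc.topic = logtail
kafkaconsumer.sources.kafkasrc.groupId = flume
kafkaconsumer.sources.kafkasrc.channels = mem1

kafkaconsumer.channels.mem1.type = memory
kafkaconsumer.channels.mem1.capacity = 10000
kafkaconsumer.channels.mem1.transactionCapacity = 1000

# HDFS sink; the target path is a placeholder.
kafkaconsumer.sinks.hdfs1.type = hdfs
kafkaconsumer.sinks.hdfs1.hdfs.path = hdfs://namenode/flume/weblogs
kafkaconsumer.sinks.hdfs1.hdfs.fileType = DataStream
kafkaconsumer.sinks.hdfs1.channel = mem1
```

Nothing in this producer sketch sets a `key` header on the events, which matches the null key seen in the sink's debug output.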

 

 

1 ACCEPTED SOLUTION


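Sorry.

I should have set the "key" for the Kafka messages as a Flume event header. The Kafka sink takes the message key from the event header named key, so events without that header are published with a null key. Adding a host interceptor that writes to a header named key solved my issue.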

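# Register the interceptor on the source (needed for i1 to take effect; assumed, not shown in the original post).
kafkaproducer.sources.tail1.interceptors = i1
# The host interceptor writes the agent's host (its IP address by default) into the "key" header.
kafkaproducer.sources.tail1.interceptors.i1.type = org.apache.flume.interceptor.HostInterceptor$Builder
kafkaproducer.sources.tail1.interceptors.i1.preserveExisting = false
kafkaproducer.sources.tail1.interceptors.i1.hostHeader = key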
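
As a variation on this fix, any interceptor that sets a header named key should have the same effect. The following is a minimal sketch using Flume's built-in static interceptor, assuming a fixed key is acceptable for the topic. The value `weblogs` is a hypothetical placeholder, not something from the original post.

```properties
# Register a single interceptor on the tail source.
kafkaproducer.sources.tail1.interceptors = i1
# The static interceptor adds one fixed header to every event.
kafkaproducer.sources.tail1.interceptors.i1.type = static
# Header name: "key" is what the Kafka sink reads as the message key.
kafkaproducer.sources.tail1.interceptors.i1.key = key
# Header value: hypothetical constant key.
kafkaproducer.sources.tail1.interceptors.i1.value = weblogs
kafkaproducer.sources.tail1.interceptors.i1.preserveExisting = false
```

One trade-off to keep in mind: with a constant key, Kafka's default partitioner would most likely send every message to the same partition, whereas the host-based key spreads messages by originating host.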
