Created on 12-09-2015 03:35 AM - edited 09-16-2022 02:51 AM
Hi,
I'm trying to accumulate all web logs into HDFS using two sets of flume agents.
FlumeAgentProducer
tailsLog --> memory_channel --> Kafka
FlumeAgentConsumer
Kafka --> memory_channel --> File_Roll / HDFS
The issue is that FlumeAgentConsumer throws a NullPointerException because the Kafka message key is null.
2015-12-09 16:48:11,132 ERROR org.apache.flume.source.kafka.KafkaSource: KafkaSource EXCEPTION, {}
java.lang.NullPointerException
        at java.lang.String.<init>(String.java:556)
        at org.apache.flume.source.kafka.KafkaSource.process(KafkaSource.java:105)
        at org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:139)
        at java.lang.Thread.run(Thread.java:722)
And from the FlumeAgentProducer logs, it's clear that messages are written with a null key.
2015-12-09 16:34:01,065 DEBUG org.apache.flume.sink.kafka.KafkaSink: event #10
2015-12-09 16:34:01,065 DEBUG org.apache.flume.sink.kafka.KafkaSink: {Event} logtail : null : 0 0 0 3150340 194156 5139772 0 0 0 32 592 1017 1 0 99 0 0
I'm trying to figure out how I can add a key to each Kafka message. Please help 🙂
Attaching Flume configs --> FlumeAgentConsumer and FlumeAgentProducer
Created 12-09-2015 04:17 AM
Sorry.
I should have added the "key" for Kafka messages as a Flume event header. Adding a host interceptor that writes the hostname into a header named "key" solved my issue.
kafkaproducer.sources.tail1.interceptors.i1.type = org.apache.flume.interceptor.HostInterceptor$Builder
kafkaproducer.sources.tail1.interceptors.i1.preserveExisting = false
kafkaproducer.sources.tail1.interceptors.i1.hostHeader = key
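For context, those interceptor lines sit inside the full source definition. A minimal producer-side sketch could look like the following (the source type, command, and channel/sink names here are assumptions for illustration; only the agent name kafkaproducer, source tail1, and interceptor i1 come from the config above):

# Assumed component names for illustration
kafkaproducer.sources = tail1
kafkaproducer.channels = mem1

kafkaproducer.sources.tail1.type = exec
kafkaproducer.sources.tail1.command = tail -F /var/log/weblog.log
kafkaproducer.sources.tail1.channels = mem1

# The host interceptor writes the agent hostname into the "key" header;
# the Flume Kafka sink uses that header as the Kafka message key,
# so messages are no longer produced with a null key
kafkaproducer.sources.tail1.interceptors = i1
kafkaproducer.sources.tail1.interceptors.i1.type = org.apache.flume.interceptor.HostInterceptor$Builder
kafkaproducer.sources.tail1.interceptors.i1.preserveExisting = false
kafkaproducer.sources.tail1.interceptors.i1.hostHeader = key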