Hi,
I'm trying to accumulate all web logs into HDFS using two sets of Flume agents:
FlumeAgentProducer: tailsLog --> memory_channel --> Kafka
FlumeAgentConsumer: Kafka --> memory_channel --> File_Roll / HDFS
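For context, here's a minimal sketch of what the producer agent looks like (agent, channel, topic, and broker names here are placeholders, not my real ones; the actual configs are attached):

```properties
# FlumeAgentProducer (sketch): tail a web log, buffer in memory, publish to Kafka
a1.sources = tailsLog
a1.channels = mem
a1.sinks = kafkaSink

# exec source tailing the log file (path is a placeholder)
a1.sources.tailsLog.type = exec
a1.sources.tailsLog.command = tail -F /var/log/web/access.log
a1.sources.tailsLog.channels = mem

# in-memory channel
a1.channels.mem.type = memory
a1.channels.mem.capacity = 10000

# Kafka sink (Flume 1.6 KafkaSink properties)
a1.sinks.kafkaSink.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.kafkaSink.topic = weblogs
a1.sinks.kafkaSink.brokerList = broker1:9092
a1.sinks.kafkaSink.channel = mem
```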
The issue is that FlumeAgentConsumer throws a NullPointerException because the message key is null:
2015-12-09 16:48:11,132 ERROR org.apache.flume.source.kafka.KafkaSource: KafkaSource EXCEPTION, {}
java.lang.NullPointerException
at java.lang.String.<init>(String.java:556)
at org.apache.flume.source.kafka.KafkaSource.process(KafkaSource.java:105)
at org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:139)
at java.lang.Thread.run(Thread.java:722)
And from the FlumeAgentProducer logs, it's clear that messages are being written with a null key:
2015-12-09 16:34:01,065 DEBUG org.apache.flume.sink.kafka.KafkaSink: event #10
2015-12-09 16:34:01,065 DEBUG org.apache.flume.sink.kafka.KafkaSink: {Event} logtail : null : 0 0 0 3150340 194156 5139772 0 0 0 32 592 1017 1 0 99 0 0
I'm trying to figure out how I can add a key to each Kafka message. Please help 🙂
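From a quick look at the Flume 1.6 KafkaSink, it appears to take the Kafka message key from the event header literally named "key" (and the topic from a "topic" header, if present). If that's right, one workaround I'm considering is a static interceptor on the producer source that stamps a "key" header onto every event (source/agent names below match the sketch above and are placeholders):

```properties
# Sketch: add a constant "key" header so KafkaSink sends a non-null message key
a1.sources.tailsLog.interceptors = i1
a1.sources.tailsLog.interceptors.i1.type = static
a1.sources.tailsLog.interceptors.i1.key = key
a1.sources.tailsLog.interceptors.i1.value = weblog
```

If a per-host key is preferable, a host interceptor with `hostHeader = key` should achieve the same thing while also keeping all events from one host in the same Kafka partition.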
Attaching the Flume configs for FlumeAgentConsumer and FlumeAgentProducer.