Member since
11-16-2015
6
Posts
0
Kudos Received
1
Solution
My Accepted Solutions
Title | Views | Posted |
---|---|---|
3489 | 12-09-2015 04:17 AM |
12-09-2015
04:17 AM
Sorry. I should have added the "key" for Kafka messages as flume event event headers. Adding hostname interceptor with name as key solved my issue. kafkaproducer.sources.tail1.interceptors.i1.type = org.apache.flume.interceptor.HostInterceptor$Builder
kafkaproducer.sources.tail1.interceptors.i1.preserveExisting = false
kafkaproducer.sources.tail1.interceptors.i1.hostHeader = key
... View more
12-09-2015
03:35 AM
Hi, I'm trying to accumulate all web logs into HDFS using two sets of flume agents. FlumeAgentProducer tailsLog --> memory_channel --> Kafka FlumeAgentConsumer Kafka --> memory_channel --> File_Roll / HDFS Issue is FlumeAgentConsumer is throwing a null pointer exception for key being null. 2015-12-09 16:48:11,132 ERROR org.apache.flume.source.kafka.KafkaSource: KafkaSource EXCEPTION, {}
java.lang.NullPointerException
at java.lang.String.<init>(String.java:556)
at org.apache.flume.source.kafka.KafkaSource.process(KafkaSource.java:105)
at org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:139)
at java.lang.Thread.run(Thread.java:722) And from FlumeAgentProducer logs, its clear that messages are written with a null key. 2015-12-09 16:34:01,065 DEBUG org.apache.flume.sink.kafka.KafkaSink: event #10
2015-12-09 16:34:01,065 DEBUG org.apache.flume.sink.kafka.KafkaSink: {Event} logtail : null : 0 0 0 3150340 194156 5139772 0 0 0 32 592 1017 1 0 99 0 0 I'm trying to figure out how I can add a key to each kafka message. Please help 🙂 Attaching Flume configs --> FlumeAgentConsumer and FlumeAgentProducer
... View more
Labels:
- Labels:
-
Apache Flume
-
Apache Kafka