
KafkaSource NullPointerException in Flume with Kafka source and HDFS Sink

New Contributor

Hi All,

 

I've been trying to wrap my head around this for a few days now and have not been able to get this to work. Apologies in advance for the length of this post; I wanted to include as many details as possible. Any help at all would be greatly appreciated.

 

System Background:

CDH-5.4.2-1.cdh5.4.2.p0.2 (Same issue occurs in 5.3.x)

KAFKA-0.8.2.0-1.kafka1.3.0.p0.29

 

We have a topic in Kafka for application logs and we are trying to ship these to HDFS.  I've followed this support doc (http://www.cloudera.com/content/cloudera/en/documentation/cloudera-kafka/latest/topics/kafka_flume.h...) but I keep running into errors.  For the record, we have keys in this topic to avoid the pre-1.6 Flume bug (https://issues.apache.org/jira/browse/FLUME-2578).  I have also tried this with a standalone install of Flume 1.6 and 1.7 from source, but the errors were the same.

 

The data flow looks like this (logstash is only populating Kafka; Flume doesn't know about it):

logstash -> Kafka -> Flume (Kafka source -> memory channel -> HDFS sink)

 

For reference here is the latest Flume config:

 

flume1.sources = source1
flume1.channels = channel1
flume1.sinks = sink1
flume1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
flume1.sources.source1.zookeeperConnect = 192.168.100.45:2181
flume1.sources.source1.topic = RawLogs
flume1.sources.source1.groupId = flume1
flume1.sources.source1.channels = channel1
flume1.sources.source1.interceptors = i1
flume1.sources.source1.interceptors.i1.type = timestamp
flume1.sources.source1.kafka.consumer.timeout.ms = 1000
flume1.channels.channel1.type = memory
flume1.channels.channel1.capacity = 10000
flume1.channels.channel1.transactionCapacity = 1000
flume1.sinks.sink1.type = hdfs
flume1.sinks.sink1.hdfs.path = /tmp/kafka/%{topic}/%y-%m-%d
flume1.sinks.sink1.hdfs.rollInterval = 5
flume1.sinks.sink1.hdfs.rollSize = 0
flume1.sinks.sink1.hdfs.rollCount = 0
flume1.sinks.sink1.hdfs.fileType = DataStream
flume1.sinks.sink1.channel = channel1

 

The Kafka logs seem to be almost entirely filled with expired fetch request notices:

DEBUG kafka.server.FetchRequestPurgatory: [FetchRequestPurgatory-115] Expiring fetch request Name: FetchRequest; Version: 0; CorrelationId: 8204; ClientId: flume1-ConsumerFetcherThread-flume1_ip-192-168-100-241.ec2.internal-1435597633404-c8812b41-0-115; ReplicaId: -1; MaxWait: 100 ms; MinBytes: 1 bytes; RequestInfo: [RawLogs,0] -> PartitionFetchInfo(10530,1048576).
DEBUG kafka.request.logger: Completed request:Name: FetchRequest; Version: 0; CorrelationId: 8204; ClientId: flume1-ConsumerFetcherThread-flume1_ip-192-168-100-241.ec2.internal-1435597633404-c8812b41-0-115; ReplicaId: -1; MaxWait: 100 ms; MinBytes: 1 bytes from client /192.168.100.241:40105;totalTime:101,requestQueueTime:0,localTime:0,remoteTime:101,responseQueueTime:0,sendTime:0
2015-06-29 13:21:07,176 DEBUG kafka.server.KafkaApis: [KafkaApi-115] Putting fetch request with correlation id 8205 from client flume1-ConsumerFetcherThread-flume1_ip-192-168-100-241.ec2.internal-1435597633404-c8812b41-0-115 into purgatory

 

Here are the Flume logs after startup.  (The NullPointerExceptions fill up the rest of the log, occurring every few seconds.)

DEBUG kafka.client.ClientUtils$: Successfully fetched metadata for 1 topic(s) Set(RawLogs)
DEBUG kafka.consumer.ConsumerFetcherManager$LeaderFinderThread: [flume1_ip-192-168-100-241.ec2.internal-1435597633404-c8812b41-leader-finder-thread], {TopicMetadata for topic RawLogs ->
Metadata for partition [RawLogs,0] is    partition 0     leader: 115 (192.168.100.99:9092)        replicas: 115 (192.168.100.99:9092)      isr: 115 (192.168.100.99:9092)   isUnderReplicated: false}
INFO kafka.consumer.ConsumerFetcherThread: [ConsumerFetcherThread-flume1_ip-192-168-100-241.ec2.internal-1435597633404-c8812b41-0-115], Starting
DEBUG kafka.network.BlockingChannel: Created socket with SO_TIMEOUT = 30000 (requested 30000), SO_RCVBUF = 65536 (requested 65536), SO_SNDBUF = 47405 (requested -1).
DEBUG kafka.consumer.PartitionTopicInfo: reset fetch offset of ( RawLogs:0: fetched offset = 10530: consumed offset = -1 ) to 10530
DEBUG kafka.consumer.PartitionTopicInfo: reset consume offset of RawLogs:0: fetched offset = 10530: consumed offset = 10530 to 10530
INFO kafka.consumer.ConsumerFetcherManager: [ConsumerFetcherManager-1435597633481] Added fetcher for partitions ArrayBuffer([[RawLogs,0], initOffset -1 to broker id:115,host:192.168.100.99,port:9092] )
DEBUG org.apache.flume.source.kafka.KafkaSource: Waited: 1000
DEBUG org.apache.flume.source.kafka.KafkaSource: Event #: 0
ERROR org.apache.flume.source.kafka.KafkaSource: KafkaSource EXCEPTION, {}
java.lang.NullPointerException
        at org.apache.flume.instrumentation.MonitoredCounterGroup.increment(MonitoredCounterGroup.java:261)
        at org.apache.flume.instrumentation.kafka.KafkaSourceCounter.incrementKafkaEmptyCount(KafkaSourceCounter.java:49)
        at org.apache.flume.source.kafka.KafkaSource.process(KafkaSource.java:146)
        at org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:139)
        at java.lang.Thread.run(Thread.java:745)
DEBUG com.cloudera.cmf.event.publish.AvroEventStorePublishProxy: (Re)connecting to ip-192-168-100-227.ec2.internal:7184
DEBUG org.apache.zookeeper.ClientCnxn: Got ping response for sessionid: 0x14e40473d44000e after 1ms

 

Before we repointed the Flume config to a topic with keys, we were seeing the error below in the Flume logs; it stopped showing up once we pointed Flume at a Kafka topic with keys.

org.apache.flume.lifecycle.LifecycleSupervisor: Unable to start PollableSourceRunner: { source:org.apache.flume.source.kafka.KafkaSource{name:source1,state:IDLE} counterGroup:{ name:null counters:{} } } - Exception follows.
org.apache.flume.FlumeException: Unable to get message iterator from Kafka
        at org.apache.flume.source.kafka.KafkaSource.start(KafkaSource.java:222)
        at org.apache.flume.source.PollableSourceRunner.start(PollableSourceRunner.java:74)
        at org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:251)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalArgumentException: Path length must be > 0
        at org.apache.zookeeper.common.PathUtils.validatePath(PathUtils.java:48)
        at org.apache.zookeeper.common.PathUtils.validatePath(PathUtils.java:35)
        at org.apache.zookeeper.ZooKeeper.create(ZooKeeper.java:766)
        at org.I0Itec.zkclient.ZkConnection.create(ZkConnection.java:87)
        at org.I0Itec.zkclient.ZkClient$1.call(ZkClient.java:308)
        at org.I0Itec.zkclient.ZkClient$1.call(ZkClient.java:304)
        at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675)
        at org.I0Itec.zkclient.ZkClient.create(ZkClient.java:304)
        at org.I0Itec.zkclient.ZkClient.createPersistent(ZkClient.java:213)
        at org.I0Itec.zkclient.ZkClient.createPersistent(ZkClient.java:223)
        at org.I0Itec.zkclient.ZkClient.createPersistent(ZkClient.java:223)
        at org.I0Itec.zkclient.ZkClient.createPersistent(ZkClient.java:223)
        at kafka.utils.ZkUtils$.createParentPath(ZkUtils.scala:245)
        at kafka.utils.ZkUtils$.createEphemeralPath(ZkUtils.scala:256)
        at kafka.utils.ZkUtils$.createEphemeralPathExpectConflict(ZkUtils.scala:268)
        at kafka.utils.ZkUtils$.createEphemeralPathExpectConflictHandleZKBug(ZkUtils.scala:306)
        at kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$registerConsumerInZK(ZookeeperConsumerConnector.scala:226)
        at kafka.consumer.ZookeeperConsumerConnector.consume(ZookeeperConsumerConnector.scala:211)
        at kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreams(ZookeeperConsumerConnector.scala:80)
        at kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreams(ZookeeperConsumerConnector.scala:92)
        at org.apache.flume.source.kafka.KafkaSource.start(KafkaSource.java:216)
        ... 9 more
INFO org.apache.flume.source.kafka.KafkaSource: Starting org.apache.flume.source.kafka.KafkaSource{name:source1,state:IDLE}...

 

I am truly stuck at this point; any help at all would be greatly appreciated!

 

Thank you!
6 REPLIES

Re: Kafka throwing errors when using flume to pull messages from kafka and deliver to hdfs

Cloudera Employee

I tried recreating this, but (in pretty limited time) I wasn't able to recreate the issue. 

 

Here's what I did:

 

1) Copied your flume config

2) Produced a bunch of messages using the Kafka Python producer:

 

from kafka import KafkaClient, KeyedProducer, RoundRobinPartitioner

kafka = KafkaClient('bda1node16.sjc.cloudera.com:9092')

# HashedPartitioner is default (currently uses python hash())
producer = KeyedProducer(kafka)
producer.send_messages(b'RawLogs', b'key1', b'some message')
producer.send_messages(b'RawLogs', b'key2', b'this methode')

 

This worked OK and I didn't get any NPEs while doing it.

 

Do you actually need the functionality of the interceptor? I'm guessing that you don't really.

 

Can you try just using the kafka Channel?

 

Based on your configuration, it would look like this:

 

flume1.channels = channel1
flume1.sinks = sink1

flume1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
flume1.channels.channel1.brokerList = localhost:9092
flume1.channels.channel1.topic = RawLogs
flume1.channels.channel1.zookeeperConnect = 192.168.100.45:2181
flume1.channels.channel1.parseAsFlumeEvent = false
flume1.sinks.sink1.type = hdfs
flume1.sinks.sink1.hdfs.path = /tmp/kafka/%{topic}/%y-%m-%d
flume1.sinks.sink1.hdfs.rollInterval = 5
flume1.sinks.sink1.hdfs.rollSize = 0
flume1.sinks.sink1.hdfs.rollCount = 0
flume1.sinks.sink1.hdfs.fileType = DataStream
flume1.sinks.sink1.hdfs.writeFormat = Text
flume1.sinks.sink1.hdfs.useLocalTimeStamp = true
flume1.sinks.sink1.channel = channel1

 

It's important to have useLocalTimeStamp = true, otherwise you'll get an error on the sink.

I really don't think you want to use a rollInterval of 5, by the way, but I'll leave that up to you. You should examine the parameters related to rolling of files to make sure that you fully understand them.
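
For illustration only (these values are not a recommendation for any particular workload), a more typical starting point for the roll settings might look something like this:

# roll a new file every 5 minutes or at roughly 128 MB, whichever comes first
flume1.sinks.sink1.hdfs.rollInterval = 300
flume1.sinks.sink1.hdfs.rollSize = 134217728
# disable count-based rolling
flume1.sinks.sink1.hdfs.rollCount = 0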

 

If I have some more time to look into the code, I can try to diagnose your original issue, but I wanted to try to respond since you were having some trouble.

 

Thanks

 

Jeff

Re: Kafka throwing errors when using flume to pull messages from kafka and deliver to hdfs

New Contributor

Hi all,

 

Apologies for taking so long to post an update on this issue.  After nearly giving up on it, I realized that Cloudera Manager was at v5.4.1 while everything else was on CDH 5.4.2.  After upgrading CM to 5.4.3 (without upgrading CDH), everything started to magically work.

 

Unfortunately, when I then upgraded CDH to 5.4.3 to keep everything at the same version, the same errors came back.

 

siddhartha.jain-1190932798, the errors you posted are exactly what I am seeing on my end.

 

jholoman, I followed the exact steps you outlined and continue to get NPEs.  

 

I also attempted setting Kafka as the channel, as you suggested.  The issue is that I need to be able to backfill from the data already in the Kafka topics, so I added the flume1.channels.channel1.readSmallestOffset = true flag to read topics from the beginning.  For some reason I can't get reading directly from a Kafka channel into HDFS to work; in this configuration Flume just spams the "Timed out while waiting for data to come from Kafka" exception.
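
For reference, a sketch of that channel-only setup: it is essentially the config from the reply above with the backfill flag added (broker, ZooKeeper address, and topic as before):

flume1.channels = channel1
flume1.sinks = sink1

flume1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
flume1.channels.channel1.brokerList = localhost:9092
flume1.channels.channel1.topic = RawLogs
flume1.channels.channel1.zookeeperConnect = 192.168.100.45:2181
flume1.channels.channel1.parseAsFlumeEvent = false
# read the topic from the earliest offset so existing data is backfilled
flume1.channels.channel1.readSmallestOffset = true

# sink1 is the same HDFS sink as in the reply above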

 

 

Any help would be greatly appreciated!

Thank you!

 

 

Re: Kafka throwing errors when using flume to pull messages from kafka and deliver to hdfs

New Contributor

Bump. Has anyone else been able to run a data pipeline with Kafka -> Flume -> HDFS (KafkaSource -> memory channel or file channel -> HDFS sink) on a CDH 5.4.3/CM 5.4.3 configuration? If so, please post an example Flume config. Thank you!

Re: Kafka throwing errors when using flume to pull messages from kafka and deliver to hdfs

I have run into the same issue. Mindful of the Flume NPE bug, I push test messages into Kafka like this:

 

===================
import kafka.producer._
import kafka.serializer.{Decoder, StringDecoder}
import org.apache.log4j.{Level, Logger}
import java.util.Properties

val str = """{json_string}"""

val propsMap = Map(
  "metadata.broker.list" -> "localhost:9092",
  "serializer.class" -> "kafka.serializer.StringEncoder",
  "producer.type" -> "async",
  "request.required.acks" -> "1",
  "batch.num.messages" -> "1000",
  "client.id" -> "sparkJsonWriter")

val props = new Properties()
propsMap.foreach(param => props.put(param._1, param._2))

val producerConfig = new ProducerConfig(props)
val producer = new Producer[String, String](producerConfig)

// send a keyed message so the pre-1.6 KafkaSource null-key bug is not triggered
producer.send(new KeyedMessage[String, String]("alerts", "randomStringKey", str))

producer.close()
===================

 

I can validate with kafka-console-consumer that the messages do not have a null key.

kafka-console-consumer.sh --zookeeper localhost:2181 --topic alerts --from-beginning --property print.key=true

 

I am writing a custom sink, and for testing I have simple code (which I tested with the netcat source):

try {
    // This try clause includes whatever Channel operations you want to do

    Event event = ch.take();
    LOG.info("Event is: " + event.toString());
    // Send the Event to the external repository.
    // storeSomeData(e);

    txn.commit();
    status = Status.READY;

}
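
For context, here is a minimal sketch of how a snippet like that usually sits inside a complete Flume sink's process() method (the class name and logging are placeholders, not the actual custom sink):

import java.nio.charset.StandardCharsets;

import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LoggingTestSink extends AbstractSink implements Configurable {

    private static final Logger LOG = LoggerFactory.getLogger(LoggingTestSink.class);

    @Override
    public void configure(Context context) {
        // no sink-specific properties needed for this test sink
    }

    @Override
    public Status process() throws EventDeliveryException {
        Status status = Status.READY;
        Channel ch = getChannel();
        Transaction txn = ch.getTransaction();
        txn.begin();
        try {
            Event event = ch.take();
            if (event == null) {
                // nothing in the channel right now; tell the runner to back off
                status = Status.BACKOFF;
            } else {
                LOG.info("Event is: " + new String(event.getBody(), StandardCharsets.UTF_8));
            }
            txn.commit();
        } catch (Throwable t) {
            txn.rollback();
            throw new EventDeliveryException("Failed to take event from channel", t);
        } finally {
            txn.close();
        }
        return status;
    }
}

The null check matters because ch.take() can return null when the channel is empty, which is the normal signal to return Status.BACKOFF instead of logging.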

 

In Flume (CDH 5.4.2), if I use Kafka as a source, I get these errors:

2015-07-01 21:06:32,639 ERROR org.apache.flume.source.kafka.KafkaSource: KafkaSource EXCEPTION, {}
java.lang.NullPointerException
at org.apache.flume.instrumentation.MonitoredCounterGroup.increment(MonitoredCounterGroup.java:261)
at org.apache.flume.instrumentation.kafka.KafkaSourceCounter.incrementKafkaEmptyCount(KafkaSourceCounter.java:49)
at org.apache.flume.source.kafka.KafkaSource.process(KafkaSource.java:146)
at org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:139)
at java.lang.Thread.run(Thread.java:745)

 

If I use Kafka as a channel, I get these errors:

2015-07-01 23:08:43,194 DEBUG org.apache.flume.channel.kafka.KafkaChannel: Timed out while waiting for data to come from Kafka
kafka.consumer.ConsumerTimeoutException
at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:69)
at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33)
at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66)
at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58)
at org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doTake(KafkaChannel.java:306)
at org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113)
at org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95)
at SfdcAlertSink.process(SfdcAlertSink.java:170)
at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
at java.lang.Thread.run(Thread.java:745)

 

Either way, my custom Flume sink does not seem to get messages off Kafka and print them in the Flume logs.

 

 

 

Re: Kafka throwing errors when using flume to pull messages from kafka and deliver to hdfs

I figured out why using only the Kafka channel and some sink doesn't work very nicely: the config parameter "readSmallestOffset".

 

The Flume Guide doesn't use punctuation very well, so here is a translated version ;)

 

"readSmallestOffset"

- When set to true, the channel will read all data in the topic, starting from the oldest event

- When set to false, it will read only events written after the channel started

 

So if you write data to a Kafka topic while the Flume agent is down and "readSmallestOffset" is set to false (which is the default), the Flume Kafka channel won't read those events after it starts up. That, to my mind, makes it sort of useless in the (channel+sink)-only combo.

 

If you set it to true, it will keep re-reading messages from the beginning, which is not desirable either. How about letting the channel behave like a regular high-level Kafka consumer and pick up where it left off before being shut down?

 

So either the Flume channel behaviour should be fixed to account for the (channel+sink)-only config, or the Kafka source bug should be fixed to avoid the NPE issue.

 

 


Re: Kafka throwing errors when using flume to pull messages from kafka and deliver to hdfs

Looks like there is an open bug:

https://issues.apache.org/jira/browse/FLUME-2672