
Kafka + Flume

SOLVED

Kafka + Flume

Explorer

Hi guys,

 

I've been trying to integrate Kafka and Flume to send data to HDFS, and I followed this guide step by step: http://ingest.tips/2014/09/26/trying-to-decide-between-flume-and-kafka-try-both/. Kafka is working fine, but I'm getting the following error from the Flume agent:

 

KafkaSource EXCEPTION, {}
java.lang.NullPointerException
             at java.lang.String.<init>(String.java:556)
             at org.apache.flume.source.kafka.KafkaSource.process(KafkaSource.java:105)
             at org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:139)
             at java.lang.Thread.run(Thread.java:745)

 

It seems to be related to the null key that the standard console producer sends. I've updated the jars ... but I'm still getting this error!

Any help would be much appreciated!
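
For reference, the kind of agent the guide sets up (a Kafka source feeding an HDFS sink through a memory channel) looks roughly like the sketch below; the agent name, ZooKeeper address, topic, and HDFS path here are placeholders, not my real values:

tier1.sources = kafka-source-1
tier1.channels = channel-1
tier1.sinks = hdfs-sink-1

# Kafka source: reads from the topic via ZooKeeper (placeholder host and topic)
tier1.sources.kafka-source-1.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.kafka-source-1.zookeeperConnect = zk-host:2181
tier1.sources.kafka-source-1.topic = my-topic
tier1.sources.kafka-source-1.batchSize = 100
tier1.sources.kafka-source-1.channels = channel-1

# Simple in-memory channel
tier1.channels.channel-1.type = memory
tier1.channels.channel-1.capacity = 10000

# HDFS sink: writes plain text files under a placeholder path
tier1.sinks.hdfs-sink-1.channel = channel-1
tier1.sinks.hdfs-sink-1.type = hdfs
tier1.sinks.hdfs-sink-1.hdfs.path = /tmp/kafka/%{topic}/%y-%m-%d
tier1.sinks.hdfs-sink-1.hdfs.rollInterval = 5
tier1.sinks.hdfs-sink-1.hdfs.fileType = DataStream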

1 ACCEPTED SOLUTION

Accepted Solutions

Re: Kafka + Flume

Master Collaborator

My guess is you are hitting the issue fixed in https://github.com/apache/flume/commit/199684b62ec983b8f922b1d6d706479032a18e64; the underlying cause is that your records have a null key. You may be able to work around it by ensuring the keys aren't null.
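
If you are also producing from your own code, a minimal sketch that always sets a non-null key looks like this (the broker address, topic, key, and message are placeholders):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KeyedProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Placeholder broker address; point this at your Kafka brokers.
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        try {
            // Passing an explicit key (second argument) keeps the record key from being null.
            producer.send(new ProducerRecord<>("my-topic", "my-key", "my message"));
        } finally {
            producer.close();
        }
    }
}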

3 REPLIES


Re: Kafka + Flume

Explorer

Thanks a lot for answering me again!

I'll add a key to my producer.


Re: Kafka + Flume

New Contributor

Hi, in my project we are using a KafkaChannel in our Flume configuration to stream data from a Taildir source to a Kafka topic. The tailed files are written by syslog.

 

We observed that when the messages are written to the Kafka topic, every line gets a junk character prepended as a prefix. Has anyone observed a similar issue in Flume?

 

My Flume config is below:

 

source_agent_pib.sources = tail_dir_source
source_agent_pib.channels = kafkaChannel

source_agent_pib.sources.tail_dir_source.type = org.apache.flume.source.taildir.TaildirSource
source_agent_pib.sources.tail_dir_source.channels = kafkaChannel
source_agent_pib.sources.tail_dir_source.positionFile = /home/ownueda/Flume/taildir_position_pibWebLog.json

#source_agent_pib.sources.tail_dir_source.batchSize = 100
source_agent_pib.sources.tail_dir_source.batchSize = 100000

#source_agent_pib.sources.tail_dir_source.backoffSleepIncrement = 1000
source_agent_pib.sources.tail_dir_source.backoffSleepIncrement = 10

#source_agent_pib.sources.tail_dir_source.maxBackoffSleep = 5000
source_agent_pib.sources.tail_dir_source.maxBackoffSleep = 50

source_agent_pib.sources.tail_dir_source.deserializer.outputCharset = UTF-8
source_agent_pib.sources.tail_dir_source.inputCharset = ASCII

source_agent_pib.sources.tail_dir_source.recursiveDirectorySearch = true
source_agent_pib.sources.tail_dir_source.filegroups = f1
source_agent_pib.sources.tail_dir_source.filegroups.f1 = /data/streaming/syslog/edagd/pibgd/weblog.log

# Write to Channel
source_agent_pib.channels.kafkaChannel.channel = kafkaChannel
source_agent_pib.channels.kafkaChannel.type = org.apache.flume.channel.kafka.KafkaChannel
source_agent_pib.channels.kafkaChannel.topic = pibWebLogs
source_agent_pib.channels.kafkaChannel.brokerList = apledausg12.sg.uobnet.com:9092,apledausg13.sg.uobnet.com:9092
source_agent_pib.channels.kafkaChannel.zookeeperConnect = apledausg02.sg.uobnet.com:2181

source_agent_pib.channels.kafkaChannel.kafka.consumer.security.protocol = SASL_PLAINTEXT
source_agent_pib.channels.kafkaChannel.kafka.consumer.sasl.kerberos.service.name = kafka

source_agent_pib.channels.kafkaChannel.kafka.producer.sasl.kerberos.service.name = kafka
source_agent_pib.channels.kafkaChannel.kafka.producer.security.protocol = SASL_PLAINTEXT
source_agent_pib.channels.kafkaChannel.kafka.producer.ssl.truststore.password=changeit
source_agent_pib.channels.kafkaChannel.generateKeytabFor = $KERBEROS_PRINCIPAL
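
For reference, KafkaChannel wraps each event as an Avro-serialized Flume event by default, so consumers reading the topic directly see those wrapper bytes in front of the body. If the prefix turns out to be that wrapper, a possible tweak is the sketch below (same channel name as above; whether it applies to your setup is an assumption):

# Sketch: write only the raw event body to the topic instead of the
# default Avro-wrapped Flume event.
source_agent_pib.channels.kafkaChannel.parseAsFlumeEvent = false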