Posts: 23
Registered: 02-21-2014
Accepted Solution

Kafka + Flume

Hi guys,


I've been trying to integrate Kafka + Flume to send data to HDFS, and I followed the linked guide step by step. Kafka is working fine, but I'm getting the following error from the Flume agent:


KafkaSource EXCEPTION, {}
             at java.lang.String.<init>(
             at org.apache.flume.source.kafka.KafkaSource.process(
             at org.apache.flume.source.PollableSourceRunner$


It seems related to the null key sent by the classic console producer. I've updated the jars, but I'm still getting this error!
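The null-key theory is easy to check in plain Java: when the console producer sends a record without a key, Kafka delivers the key as a null `byte[]`, and constructing a `String` from it throws exactly the `NullPointerException` shown in the trace. A minimal sketch (the class and method names here are illustrative, not from Flume's source):

```java
// Demonstrates the suspected failure: new String(byte[]) throws a
// NullPointerException when the Kafka message key is null.
public class NullKeyDemo {

    // Returns true if building a String from this key would throw,
    // i.e. the producer sent the record without a key.
    public static boolean wouldThrow(byte[] key) {
        try {
            new String(key); // NPE here when key == null
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        // The classic console producer without a key configured:
        System.out.println(wouldThrow(null)); // true
        // A record with an explicit key is fine:
        System.out.println(wouldThrow("k1".getBytes())); // false
    }
}
```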

Any help would be much appreciated!

Cloudera Employee
Posts: 481
Registered: 08-11-2014

Re: Kafka + Flume

My guess is that you are hitting an issue that has since been fixed, and the root cause is that your records have a null key. You may be able to work around it by ensuring the keys aren't null.
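A null-safe guard is presumably what the fix amounts to on the Flume side; a hedged sketch of that idea (this helper is illustrative, not Flume's actual code):

```java
import java.nio.charset.StandardCharsets;

// Sketch of a null-safe key conversion: Kafka delivers a null byte[]
// when the producer sent the record without a key, so fall back to ""
// instead of letting new String(null) throw.
public class NullSafeKey {

    public static String keyToString(byte[] keyBytes) {
        return keyBytes == null
                ? ""
                : new String(keyBytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.println("[" + keyToString(null) + "]"); // [] — no NPE
        System.out.println(keyToString("k1".getBytes(StandardCharsets.UTF_8)));
    }
}
```

Until you run a fixed version, the producer-side workaround is the same in spirit: always attach a non-null key to each record.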

Posts: 23
Registered: 02-21-2014

Re: Kafka + Flume

Again, thanks a lot for answering me!

I'll add a key to my producer.

New Contributor
Posts: 1
Registered: 01-31-2019

Re: Kafka + Flume

Hi, in my project we are using a KafkaChannel in our Flume configuration to stream data from a Taildir source to a Kafka topic. The tailed file is written by syslog.


We observed that while the messages are written into the Kafka topic, every line gets a junk character prepended. Has anyone observed a similar issue in Flume?


My Flume config is below:


source_agent_pib.sources = tail_dir_source
source_agent_pib.channels = kafkaChannel

source_agent_pib.sources.tail_dir_source.type = org.apache.flume.source.taildir.TaildirSource
source_agent_pib.sources.tail_dir_source.channels = kafkaChannel
source_agent_pib.sources.tail_dir_source.positionFile = /home/ownueda/Flume/taildir_position_pibWebLog.json

#source_agent_pib.sources.tail_dir_source.batchSize = 100
source_agent_pib.sources.tail_dir_source.batchSize = 100000

#source_agent_pib.sources.tail_dir_source.backoffSleepIncrement = 1000
source_agent_pib.sources.tail_dir_source.backoffSleepIncrement = 10

#source_agent_pib.sources.tail_dir_source.maxBackoffSleep = 5000
source_agent_pib.sources.tail_dir_source.maxBackoffSleep = 50

source_agent_pib.sources.tail_dir_source.deserializer.outputCharset = UTF-8
source_agent_pib.sources.tail_dir_source.inputCharset = ASCII

source_agent_pib.sources.tail_dir_source.recursiveDirectorySearch = true
source_agent_pib.sources.tail_dir_source.filegroups = f1
source_agent_pib.sources.tail_dir_source.filegroups.f1 = /data/streaming/syslog/edagd/pibgd/weblog.log

# Write to Channel = kafkaChannel
source_agent_pib.channels.kafkaChannel.type = org.apache.flume.channel.kafka.KafkaChannel
source_agent_pib.channels.kafkaChannel.topic = pibWebLogs
source_agent_pib.channels.kafkaChannel.brokerList =,
source_agent_pib.channels.kafkaChannel.zookeeperConnect =
source_agent_pib.channels.kafkaChannel.kafka.producer.security.protocol = SASL_PLAINTEXT
source_agent_pib.channels.kafkaChannel.kafka.producer.sasl.kerberos.service.name = kafka
source_agent_pib.channels.kafkaChannel.kafka.consumer.sasl.kerberos.service.name = kafka
source_agent_pib.channels.kafkaChannel.kafka.consumer.security.protocol = SASL_PLAINTEXT
source_agent_pib.channels.kafkaChannel.generateKeytabFor = $KERBEROS_PRINCIPAL
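One possible cause of the junk prefix (an assumption, not confirmed from the post): by default the KafkaChannel serializes each event as an Avro-wrapped FlumeEvent, so anything reading the topic with a plain Kafka consumer sees the Avro header bytes in front of every line. If the topic is consumed directly rather than by another Flume agent, storing raw bytes may help:

```
# Store raw event bytes instead of Avro-wrapped FlumeEvents.
# Only safe when the topic is read by plain Kafka consumers,
# not by another Flume KafkaChannel or Kafka source.
source_agent_pib.channels.kafkaChannel.parseAsFlumeEvent = false
```

Note that with this setting the Flume event headers are dropped, since only the event body is written to the topic.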