Explorer
Posts: 23
Registered: 02-21-2014
Accepted Solution

Kafka + Flume

Hi guys,

 

I've been trying to integrate Kafka and Flume to send data to HDFS. I followed this guide step by step: http://ingest.tips/2014/09/26/trying-to-decide-between-flume-and-kafka-try-both/. Kafka is working fine, but I'm getting the following error from the Flume agent:

 

KafkaSource EXCEPTION, {}
java.lang.NullPointerException
             at java.lang.String.<init>(String.java:556)
             at org.apache.flume.source.kafka.KafkaSource.process(KafkaSource.java:105)
             at org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:139)
             at java.lang.Thread.run(Thread.java:745)

 

It seems to be related to the null key sent by the standard console producer. I've updated the jars, but I'm still getting this error!

Any help would be much appreciated!

Cloudera Employee
Posts: 481
Registered: 08-11-2014

Re: Kafka + Flume

My guess is you are hitting the issue fixed in https://github.com/apache/flume/commit/199684b62ec983b8f922b1d6d706479032a18e64, but the root cause is that your records have a null key. You may be able to work around it by ensuring the keys aren't null.
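For what it's worth, the console producer can attach a key to each record if you enable key parsing. A sketch (broker address and topic name below are placeholders for your environment; on older Kafka releases the script may be named kafka-console-producer.sh):

```shell
# Everything before the first ':' on each input line becomes the record key,
# everything after it becomes the value — so keys are never null.
kafka-console-producer --broker-list localhost:9092 --topic test \
    --property parse.key=true \
    --property key.separator=:
```

Then typing a line such as `user1:hello` sends key `user1` with value `hello`.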

Explorer
Posts: 23
Registered: 02-21-2014

Re: Kafka + Flume

Again, thanks a lot for answering!

I'll add a key to my producer.

New Contributor
Posts: 1
Registered: 01-31-2019

Re: Kafka + Flume

Hi, in my project we are using a KafkaChannel in the Flume configuration to stream data from a Taildir source to a Kafka topic. The tailed file messages are written by syslog.

 

We observed that when the messages are written into the Kafka topic, every line gets a junk character prepended to it. Has anyone observed a similar issue in Flume?

 

My Flume config is as below:

 

source_agent_pib.sources = tail_dir_source
source_agent_pib.channels = kafkaChannel

source_agent_pib.sources.tail_dir_source.type = org.apache.flume.source.taildir.TaildirSource
source_agent_pib.sources.tail_dir_source.channels = kafkaChannel
source_agent_pib.sources.tail_dir_source.positionFile = /home/ownueda/Flume/taildir_position_pibWebLog.json

#source_agent_pib.sources.tail_dir_source.batchSize = 100
source_agent_pib.sources.tail_dir_source.batchSize = 100000

#source_agent_pib.sources.tail_dir_source.backoffSleepIncrement = 1000
source_agent_pib.sources.tail_dir_source.backoffSleepIncrement = 10

#source_agent_pib.sources.tail_dir_source.maxBackoffSleep = 5000
source_agent_pib.sources.tail_dir_source.maxBackoffSleep = 50

source_agent_pib.sources.tail_dir_source.deserializer.outputCharset = UTF-8
source_agent_pib.sources.tail_dir_source.inputCharset = ASCII

source_agent_pib.sources.tail_dir_source.recursiveDirectorySearch = true
source_agent_pib.sources.tail_dir_source.filegroups = f1
source_agent_pib.sources.tail_dir_source.filegroups.f1 = /data/streaming/syslog/edagd/pibgd/weblog.log

# Write to Channel
source_agent_pib.channels.kafkaChannel.channel = kafkaChannel
source_agent_pib.channels.kafkaChannel.type = org.apache.flume.channel.kafka.KafkaChannel
source_agent_pib.channels.kafkaChannel.topic = pibWebLogs
source_agent_pib.channels.kafkaChannel.brokerList = apledausg12.sg.uobnet.com:9092,apledausg13.sg.uobnet.com:9092
source_agent_pib.channels.kafkaChannel.zookeeperConnect = apledausg02.sg.uobnet.com:2181

source_agent_pib.channels.kafkaChannel.kafka.consumer.security.protocol = SASL_PLAINTEXT
source_agent_pib.channels.kafkaChannel.kafka.consumer.sasl.kerberos.service.name = kafka

source_agent_pib.channels.kafkaChannel.kafka.producer.sasl.kerberos.service.name = kafka
source_agent_pib.channels.kafkaChannel.kafka.producer.security.protocol = SASL_PLAINTEXT
source_agent_pib.channels.kafkaChannel.kafka.producer.ssl.truststore.password=changeit
source_agent_pib.channels.kafkaChannel.generateKeytabFor = $KERBEROS_PRINCIPAL
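One thing worth checking for the junk-prefix symptom: by default the Flume KafkaChannel stores each event wrapped as an Avro-serialized FlumeEvent (`parseAsFlumeEvent = true`), so any consumer reading the topic directly sees the Avro wrapper bytes in front of each message, which can look like a junk character prefix. If your consumers read the raw topic rather than going through another Flume agent, a possible fix (verify the property against your Flume version's Kafka Channel documentation) is:

```
# Store the raw event body instead of an Avro-wrapped FlumeEvent,
# so direct Kafka consumers don't see serialization bytes as a prefix.
source_agent_pib.channels.kafkaChannel.parseAsFlumeEvent = false
```

Note this also changes how the channel deserializes on the consuming side, so it should only be set when the topic is written and read as plain bytes end to end.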