Support Questions

Find answers, ask questions, and share your expertise

Who agreed with this topic

Problem about Configuring Flume as Kafka Consumer

avatar
New Contributor

My Configuration is

Cloudera Manager 5.12.1

Flume 1.7.0

Kakfa 2.2.0-1.2.2.0.p0.68

 

I created topic "test" in kafka, and would like to configure flume to act as consumer to fetch data from this topic and save it to HDFS.

My flume configuration is

#################################

tier1.sources = source1
tier1.channels = channel1
tier1.sinks = sink1

tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.source1.zookeeperConnect = bda03:2182, bda04:2182,bda05:2182
tier1.sources.source1.topic = test
tier1.sources.source1.groupId = flume
tier1.sources.source1.channels = channel1
tier1.sources.source1.interceptors = i1
tier1.sources.source1.interceptors.i1.type = timestamp
tier1.sources.source1.kafka.consumer.timeout.ms = 100

tier1.channels.channel1.type = memory
tier1.channels.channel1.capacity = 10000
tier1.channels.channel1.transactionCapacity = 1000

tier1.sinks.sink1.type = hdfs
tier1.sinks.sink1.hdfs.path = /user/flume/tmp/
tier1.sinks.sink1.hdfs.rollInterval = 5
tier1.sinks.sink1.hdfs.rollSize = 0
tier1.sinks.sink1.hdfs.rollCount = 0
tier1.sinks.sink1.hdfs.fileType = DataStream
tier1.sinks.sink1.channel = channel1

######################################

The configuration was just copied from https://www.cloudera.com/documentation/kafka/latest/topics/kafka_flume.html

 

After starting the flume agent, there was no error message in flume and kafka log file.

 

I tried to put some records to topics:

$ kafka-console-producer --broker-list bda03:9092,bda04:9092,bda05:9092 --topic test

Hello

This is new message

 

However, there was nothing put into HDFS.

Then, I tried to retreive message from topic "test" via command line (with consumer group flume)

$ kafka-console-consumer --zookeeper bda03:2182,bda04:2182,bda05:2182 --topic test --consumer-property group.id=flume

The output was:

Hello

This is new message

 

If the flume agent could fetch message from the topic, then the output of the above command should be empty.

The result indicated that the flume agent could not get any data from Kafka.

 

Can anyone help?

Thanks!

 

 

Who agreed with this topic