Problem Configuring Flume as a Kafka Consumer

New Contributor

My configuration is:

Cloudera Manager 5.12.1

Flume 1.7.0

Kafka 2.2.0-1.2.2.0.p0.68

 

I created a topic "test" in Kafka and would like to configure Flume to act as a consumer that fetches data from this topic and saves it to HDFS.

My Flume configuration is:

#################################

tier1.sources = source1
tier1.channels = channel1
tier1.sinks = sink1

tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.source1.zookeeperConnect = bda03:2182,bda04:2182,bda05:2182
tier1.sources.source1.topic = test
tier1.sources.source1.groupId = flume
tier1.sources.source1.channels = channel1
tier1.sources.source1.interceptors = i1
tier1.sources.source1.interceptors.i1.type = timestamp
tier1.sources.source1.kafka.consumer.timeout.ms = 100

tier1.channels.channel1.type = memory
tier1.channels.channel1.capacity = 10000
tier1.channels.channel1.transactionCapacity = 1000

tier1.sinks.sink1.type = hdfs
tier1.sinks.sink1.hdfs.path = /user/flume/tmp/
tier1.sinks.sink1.hdfs.rollInterval = 5
tier1.sinks.sink1.hdfs.rollSize = 0
tier1.sinks.sink1.hdfs.rollCount = 0
tier1.sinks.sink1.hdfs.fileType = DataStream
tier1.sinks.sink1.channel = channel1

######################################

The configuration was copied directly from https://www.cloudera.com/documentation/kafka/latest/topics/kafka_flume.html

 

After starting the Flume agent, there were no error messages in the Flume or Kafka log files.

 

I tried to put some records into the topic:

$ kafka-console-producer --broker-list bda03:9092,bda04:9092,bda05:9092 --topic test

Hello

This is new message

 

However, there was nothing put into HDFS.

Then I tried to retrieve messages from topic "test" via the command line (with consumer group "flume"):

$ kafka-console-consumer --zookeeper bda03:2182,bda04:2182,bda05:2182 --topic test --consumer-property group.id=flume

The output was:

Hello

This is new message

 

If the Flume agent were consuming messages from the topic, the output of the above command should have been empty, because the messages would already have been consumed by the "flume" group.

This result indicates that the Flume agent is not fetching any data from Kafka.

 

Can anyone help?

Thanks!

 

 

13 REPLIES

New Contributor

The Flume agent log file shows:

2017-10-18 17:01:49,742 INFO org.apache.flume.node.PollingPropertiesFileConfigurationProvider: Configuration provider starting
2017-10-18 17:01:49,760 INFO org.apache.flume.node.PollingPropertiesFileConfigurationProvider: Reloading configuration file:/run/cloudera-scm-agent/process/2727-flume-AGENT/flume.conf
2017-10-18 17:01:49,765 INFO org.apache.flume.conf.FlumeConfiguration: Processing:sink1
2017-10-18 17:01:49,766 INFO org.apache.flume.conf.FlumeConfiguration: Processing:sink1
2017-10-18 17:01:49,766 INFO org.apache.flume.conf.FlumeConfiguration: Processing:sink1
2017-10-18 17:01:49,766 INFO org.apache.flume.conf.FlumeConfiguration: Processing:sink1
2017-10-18 17:01:49,766 INFO org.apache.flume.conf.FlumeConfiguration: Processing:sink1
2017-10-18 17:01:49,766 INFO org.apache.flume.conf.FlumeConfiguration: Processing:sink1
2017-10-18 17:01:49,766 INFO org.apache.flume.conf.FlumeConfiguration: Added sinks: sink1 Agent: tier1
2017-10-18 17:01:49,766 INFO org.apache.flume.conf.FlumeConfiguration: Processing:sink1
2017-10-18 17:01:49,783 INFO org.apache.flume.conf.FlumeConfiguration: Post-validation flume configuration contains configuration for agents: [tier1]
2017-10-18 17:01:49,783 INFO org.apache.flume.node.AbstractConfigurationProvider: Creating channels
2017-10-18 17:01:49,790 INFO org.apache.flume.channel.DefaultChannelFactory: Creating instance of channel channel1 type memory
2017-10-18 17:01:49,798 INFO org.apache.flume.node.AbstractConfigurationProvider: Created channel channel1
2017-10-18 17:01:49,799 INFO org.apache.flume.source.DefaultSourceFactory: Creating instance of source source1, type org.apache.flume.source.kafka.KafkaSource
2017-10-18 17:01:49,835 INFO org.apache.flume.sink.DefaultSinkFactory: Creating instance of sink: sink1, type: hdfs
2017-10-18 17:01:49,847 INFO org.apache.flume.node.AbstractConfigurationProvider: Channel channel1 connected to [source1, sink1]
2017-10-18 17:01:49,854 INFO org.apache.flume.node.Application: Starting new configuration:{ sourceRunners:{source1=PollableSourceRunner: { source:org.apache.flume.source.kafka.KafkaSource{name:source1,state:IDLE} counterGroup:{ name:null counters:{} } }} sinkRunners:{sink1=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@705cdf05 counterGroup:{ name:null counters:{} } }} channels:{channel1=org.apache.flume.channel.MemoryChannel{name: channel1}} }
2017-10-18 17:01:49,855 INFO org.apache.flume.node.Application: Starting Channel channel1
2017-10-18 17:01:49,908 INFO org.apache.flume.instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: channel1: Successfully registered new MBean.
2017-10-18 17:01:49,909 INFO org.apache.flume.instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: channel1 started
2017-10-18 17:01:49,909 INFO org.apache.flume.node.Application: Starting Sink sink1
2017-10-18 17:01:49,910 INFO org.apache.flume.instrumentation.MonitoredCounterGroup: Monitored counter group for type: SINK, name: sink1: Successfully registered new MBean.
2017-10-18 17:01:49,911 INFO org.apache.flume.instrumentation.MonitoredCounterGroup: Component type: SINK, name: sink1 started
2017-10-18 17:01:49,913 INFO org.apache.flume.node.Application: Starting Source source1
2017-10-18 17:01:49,914 INFO org.apache.flume.source.kafka.KafkaSource: Starting org.apache.flume.source.kafka.KafkaSource{name:source1,state:IDLE}...
2017-10-18 17:01:49,941 INFO org.apache.kafka.clients.consumer.ConsumerConfig: ConsumerConfig values:
metric.reporters = []
metadata.max.age.ms = 300000
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
group.id = abcd
partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
reconnect.backoff.ms = 50
sasl.kerberos.ticket.renew.window.factor = 0.8
max.partition.fetch.bytes = 1048576
bootstrap.servers = [bda03.aslsdc.local:9092, bda04.aslsdc.local:9092, bda05.aslsdc.local:9092]
retry.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
ssl.keystore.type = JKS
ssl.trustmanager.algorithm = PKIX
enable.auto.commit = false
ssl.key.password = null
fetch.max.wait.ms = 500
sasl.kerberos.min.time.before.relogin = 60000
connections.max.idle.ms = 540000
ssl.truststore.password = null
session.timeout.ms = 30000
metrics.num.samples = 2
client.id =
ssl.endpoint.identification.algorithm = null
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
ssl.protocol = TLS
check.crcs = true
request.timeout.ms = 40000
ssl.provider = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.keystore.location = null
heartbeat.interval.ms = 3000
auto.commit.interval.ms = 5000
receive.buffer.bytes = 65536
ssl.cipher.suites = null
ssl.truststore.type = JKS
security.protocol = PLAINTEXT
ssl.truststore.location = null
ssl.keystore.password = null
ssl.keymanager.algorithm = SunX509
metrics.sample.window.ms = 30000
fetch.min.bytes = 1
send.buffer.bytes = 131072
auto.offset.reset = latest

2017-10-18 17:01:49,952 INFO org.mortbay.log: Logging to org.slf4j.impl.Log4jLoggerAdapter(org.mortbay.log) via org.mortbay.log.Slf4jLog
2017-10-18 17:01:49,994 INFO org.mortbay.log: jetty-6.1.26.cloudera.4
2017-10-18 17:01:50,016 INFO org.mortbay.log: Started SelectChannelConnector@0.0.0.0:41414
2017-10-18 17:01:50,066 INFO org.apache.kafka.common.utils.AppInfoParser: Kafka version : 0.9.0-kafka-2.0.2
2017-10-18 17:01:50,066 INFO org.apache.kafka.common.utils.AppInfoParser: Kafka commitId : unknown

New Contributor
Did you find any solution? I am facing the exact same issue.

New Contributor
Me too, I am running into the same situation.

New Contributor

Sadly I have no new info to provide insight into this issue, but I also wanted to call out that we are seeing the same thing on our side. No errors, just no data movement on the kafka-source -> Flume side.

The problem is usually that the Kafka consumer is not configured properly and is failing silently while it is running. You can verify whether the Flume consumer group is actually connected to the topic's partitions by running the "kafka-consumer-groups" command.

You could also turn on log4j.logger.org.apache.kafka=DEBUG in the broker logging safety valve and review the messages when Flume tries to connect to Kafka. A lot of "errors" are retryable, meaning they won't throw an exception, but you won't see any output either.
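
As a sketch, the safety valve entry is just that single log4j line (assuming the standard log4j-based agent logging that CDH Flume uses):

log4j.logger.org.apache.kafka=DEBUG

With that in place, the log should show the consumer's connection attempts, group coordination, and any retryable errors that are otherwise swallowed.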

-pd

New Contributor
Great, thank you. I'll take a look at both paths to see if we can narrow down where the issue is occurring.

I'm very confident it's a consumer config issue, but I thought the bare-bones config examples on the web would just work, even if not efficiently.

Thanks again...I'll provide an update on how this goes.

Just realized, the log4j setting should go in the Flume logging safety valve, not the broker's. Also, make sure you can run a kafka-console-consumer and connect to the topic as well, just to make sure it's not something with Kafka.
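
For example, something along these lines should read the topic directly from the brokers, which exercises the same path the Flume source uses (broker hostnames from the posts above; on the 0.9-era client the --new-consumer flag selects the broker-based consumer, newer releases use it by default):

$ kafka-console-consumer --new-consumer --bootstrap-server bda03:9092,bda04:9092,bda05:9092 --topic test --from-beginning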

-pd

New Contributor

So, there were three issues.  First off, you were correct that the group was not attached to the partition.  I'm sure it's obvious, but I'm new to Kafka, so I'm not sure how groups get associated with a partition.  In my case the partition was already tied to a different group that I'm not familiar with, so I'll need to do some research.

 

Second, my current offset was already equal to the log-end offset (no lag left to consume), so I had to reset my offset to 0.
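
For reference, a related knob on the Flume side is the consumer's auto.offset.reset, which the agent log above shows defaulting to "latest". Assuming Flume 1.7's Kafka source passes kafka.consumer.* properties through to the consumer (as the kafka.consumer.timeout.ms line in the original configuration suggests), a sketch for making a brand-new group start from the beginning of the topic would be:

tier1.sources.source1.kafka.consumer.auto.offset.reset = earliest

This only applies when the group has no committed offsets; a group that is already caught up still needs its offsets reset explicitly.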

 

Third, my Java heap was too small.

 

As soon as I tweaked those and switched to the proper group, as seen in the kafka-consumer-groups command, data started flowing.  Now to figure out how the groups work.

 

Thanks so much for your help.

New Contributor

Got the same problem here.

By the way, log4j could not be modified (log4j.logger.org.apache.kafka=DEBUG) in the QuickStart VM 5.13 because it is a read-only file; it doesn't let me save after the change. @pdvorak

 

Any other help would be highly appreciated! Thanks.