New Contributor

Flume cannot output data from kafka to HDFS

I have a problem: immediately after I start the Flume agent, it logs this error:

2019-01-28 14:28:52,406 ERROR org.apache.flume.source.kafka.KafkaSource: KafkaSource EXCEPTION, {}
org.apache.flume.ChannelException: Unable to put batch on required channel: org.apache.flume.channel.MemoryChannel{name: ct103}
at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:200)
at org.apache.flume.source.kafka.KafkaSource.doProcess(KafkaSource.java:132)
at org.apache.flume.source.AbstractPollableSource.process(AbstractPollableSource.java:58)
at org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:137)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flume.ChannelException: Put queue for MemoryTransaction of capacity 100 full, consider committing more frequently, increasing capacity or increasing thread count
at org.apache.flume.channel.MemoryChannel$MemoryTransaction.doPut(MemoryChannel.java:84)
at org.apache.flume.channel.BasicTransactionSemantics.put(BasicTransactionSemantics.java:93)
at org.apache.flume.channel.BasicChannelSemantics.put(BasicChannelSemantics.java:80)
at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:189)
... 4 more

My flume configuration:

tier1.sources= rt103
tier1.channels= ct103
tier1.sinks= kt103

# Configure source
tier1.sources.rt103.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.rt103.channels = ct103
tier1.sources.rt103.zookeeperConnect =10.231.35.6:2181,10.231.35.15:2181,10.231.35.122:2181
tier1.sources.rt103.topic = zk103
tier1.sources.rt103.groupId = flume21a22
tier1.sources.rt103.kafka.consumer.timeout.ms = 100
tier1.sources.rt103.kafka.fetch.message.max.bytes = 512000000

# Configure channel

tier1.channels.ct103.type=memory
tier1.channels.ct103.capacity=20000000
tier1.channels.ct103.transactioncapacity=2000000
tier1.channels.ct103.byteCapacityBufferPercentage = 20
tier1.channels.ct103.byteCapacity = 8589934592
tier1.channels.ct103.threads=20

#Configure sink

tier1.sinks.kt103.channel = ct103
tier1.sinks.kt103.type = hdfs
tier1.sinks.kt103.hdfs.useLocalTimeStamp = true
tier1.sinks.kt103.hdfs.kerberosPrincipal = bdi
tier1.sinks.kt103.hdfs.kerberosKeytab = /bdi/conf/bdi.keytab
tier1.sinks.kt103.hdfs.path = hdfs://nameservice-newremote1/user/bdi/4gdpi/11/103/%Y%m%d%H
tier1.sinks.kt103.hdfs.filePrefix = 4gdpi_ip21_flume1_kt103_%Y%m%d%H%M%S
tier1.sinks.kt103.hdfs.fileSuffix = .txt
tier1.sinks.kt103.hdfs.rollInterval = 0
tier1.sinks.kt103.hdfs.rollSize = 134217728
tier1.sinks.kt103.hdfs.rollCount = 0
tier1.sinks.kt103.hdfs.idleTimeout = 60
tier1.sinks.kt103.hdfs.round = true
tier1.sinks.kt103.hdfs.roundValue = 10
tier1.sinks.kt103.hdfs.callTimeout = 600000
tier1.sinks.kt103.hdfs.roundUnit = minute
tier1.sinks.kt103.hdfs.writeFormat = Text
tier1.sinks.kt103.hdfs.fileType = DataStream
tier1.sinks.kt103.hdfs.batchSize = 10000
tier1.sinks.kt103.hdfs.maxOpenFiles = 5000

Has anyone encountered this problem?

Cloudera Employee

Re: Flume cannot output data from kafka to HDFS

Hi,

 

It looks like the put queue fills up immediately because data is not being committed to the sink as fast as it is arriving on the channel.

 

Can you try reducing hdfs.batchSize to 100 and see if that helps?
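For reference, a minimal sketch of that change against the configuration posted above (everything else in the sink block stays the same):

```properties
# Sketch: flush smaller batches to HDFS so each channel transaction
# completes sooner, relieving pressure on the memory channel's put queue.
# 100 events per flush instead of the original 10000.
tier1.sinks.kt103.hdfs.batchSize = 100
```

Smaller sink batches mean each take transaction against the memory channel finishes faster, so the source is less likely to find the put queue full.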

 

Regards,
Bimal
