Getting error while running Flume agent

Rising Star

Hi,

 

I am getting the below error while running the Flume agent.

 

ERROR PollableSourceRunner:156 - Unhandled exception, logging and sleeping for 5000ms
org.apache.flume.ChannelException: Unable to put batch on required channel: FileChannel ch2 { dataDirs: [/var/lib/flume-ng/plugins.d/custom/datac2] }
at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:200)
at org.keedio.flume.source.SQLSource$ChannelWriter.flush(SQLSource.java:195)
at org.keedio.flume.source.SQLSource$ChannelWriter.write(SQLSource.java:190)
at org.keedio.flume.source.SQLSource$ChannelWriter.write(SQLSource.java:190)
at java.io.Writer.write(Writer.java:192)
at java.io.PrintWriter.write(PrintWriter.java:456)
at java.io.PrintWriter.write(PrintWriter.java:473)
at com.opencsv.CSVWriter.writeNext(CSVWriter.java:263)
at com.opencsv.CSVWriter.writeAll(CSVWriter.java:151)
at org.keedio.flume.source.HibernateHelper.executeQuery(HibernateHelper.java:155)
at org.keedio.flume.source.SQLSource.process(SQLSource.java:127)
at org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:139)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flume.ChannelFullException: The channel has reached it's capacity. This might be the result of a sink on the channel having too low of batch size,
a downstream system running slower than normal, or that the channel capacity is just too low. [channel=ch2]
at org.apache.flume.channel.file.FileChannel$FileBackedTransaction.doPut(FileChannel.java:460)
at org.apache.flume.channel.BasicTransactionSemantics.put(BasicTransactionSemantics.java:93)
at org.apache.flume.channel.BasicChannelSemantics.put(BasicChannelSemantics.java:80)
at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:189)

 

The configuration settings that I am using for Flume are as below.

 

agent1.channels.ch1.capacity=100000
agent1.channels.ch2.capacity=100000

agent1.sinks.sink1.morphlineId=morphline1
agent1.sinks.sink1.type=org.apache.flume.sink.solr.morphline.MorphlineSolrSink
agent1.sinks.sink1.channel=ch1
agent1.sinks.sink1.morphlineFile=/var/lib/flume-ng/plugins.d/custom/conf/morphlines.conf
agent1.sinks.sink1.batchSize=100
agent1.sinks.sink1.batchDurationMillis=1000

agent1.sinks.sink2.morphlineId=morphline1
agent1.sinks.sink2.type=org.apache.flume.sink.solr.morphline.MorphlineSolrSink
agent1.sinks.sink2.channel=ch2
agent1.sinks.sink2.morphlineFile=/var/lib/flume-ng/plugins.d/custom/conf/morphlines.conf
agent1.sinks.sink2.batchSize=100
agent1.sinks.sink2.batchDurationMillis=1000

agent1.sources=sql-source ods-sql-source
agent1.sinks=sink1 sink2
agent1.channels=ch1 ch2
#agent1.channels.ch1.type=memory
#agent1.channels.ch2.type=memory

agent1.sources.sql-source.type=org.keedio.flume.source.SQLSource
agent1.sources.ods-sql-source.type=org.keedio.flume.source.SQLSource

agent1.sources.sql-source.channels=ch1
agent1.sources.ods-sql-source.channels=ch2

#use FILE channel
agent1.channels.ch1.type = file
agent1.channels.ch1.transactionCapacity = 1000
agent1.channels.ch1.checkpointDir = /var/lib/flume-ng/plugins.d/custom/checkpointc1
#NOTE: point to your checkpoint directory
agent1.channels.ch1.dataDirs = /var/lib/flume-ng/plugins.d/custom/datac1
#NOTE: point to your data directory

#use FILE channel
agent1.channels.ch2.type = file
agent1.channels.ch2.transactionCapacity = 1000
agent1.channels.ch2.checkpointDir = /var/lib/flume-ng/plugins.d/custom/checkpointc2
#NOTE: point to your checkpoint directory
agent1.channels.ch2.dataDirs = /var/lib/flume-ng/plugins.d/custom/datac2
#NOTE: point to your data directory

Can you please help me with this?

 

Thanks,

Priya

8 REPLIES

Explorer

Hi, I am getting the below error when reading logs from the RabbitMQ server to an HDFS path using Flume.

Exception in thread "PollableSourceRunner-RabbitMQSource-rabbitmq-source1" java.lang.AbstractMethodError: org.apache.flume.source.rabbitmq.RabbitMQSource.getBackOffSleepIncrement()J
at org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:142)
at java.lang.Thread.run(Thread.java:748)

Champion

Based on the error, it looks like your channel ch2 is almost full.

You might want to increase the channel capacity in the property below.

Is there a specific reason you chose a file channel over a memory channel?

agent1.channels.ch2.capacity=100000

 

Rising Star
Thanks

Explorer

Exception in thread "PollableSourceRunner-RabbitMQSource-rabbitmq-source1" java.lang.AbstractMethodError: org.apache.flume.source.rabbitmq.RabbitMQSource.getBackOffSleepIncrement()J
at org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:142)
at java.lang.Thread.run(Thread.java:748)

Can someone please help me resolve this error?

 

# Configure the channel
type = SPILLABLEMEMORY
memoryCapacity = 10000
byteCapacity = 10000000
transactionCapacity = 10000
checkpointInterval = 300000
maxFileSize = 614643507
overflowCapacity = 1000000
minimumRequiredSpace = 324288000
write-timeout = 100
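
For reference, an AbstractMethodError on getBackOffSleepIncrement()J generally means the class was compiled against an older version of an interface than the one loaded at runtime: newer Flume runtimes declare back-off methods on PollableSource that this rabbitmq plugin jar does not implement, so the plugin likely needs to be rebuilt against (or upgraded for) your Flume version. As a minimal sketch only (the class name is hypothetical, not the actual plugin code), a source compiled for a newer Flume would typically extend AbstractPollableSource, which already supplies the back-off methods the runner calls:

import org.apache.flume.Context;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.FlumeException;
import org.apache.flume.source.AbstractPollableSource;

// Hypothetical skeleton -- illustrates the newer source API only.
public class ExampleRabbitMQSource extends AbstractPollableSource {

  @Override
  protected Status doProcess() throws EventDeliveryException {
    // Poll RabbitMQ and put events on the channel here. Returning
    // BACKOFF makes the runner sleep between empty polls, using the
    // inherited getBackOffSleepIncrement()/getMaxBackOffSleepInterval().
    return Status.BACKOFF;
  }

  @Override
  protected void doConfigure(Context context) throws FlumeException {
    // Read connection settings (host, queue, credentials) here.
  }

  @Override
  protected void doStart() throws FlumeException {
    // Open the RabbitMQ connection here.
  }

  @Override
  protected void doStop() throws FlumeException {
    // Close the RabbitMQ connection here.
  }
}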

It seems like your sinks may not be draining fast enough. Do you see any sink errors in your logs?

If you look at the Flume graphs for the event takes per second (by the sinks) vs. the events accepted per second (from the source), do you see any patterns?

-pd
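
If graphs aren't available, one way to see those same rates (assuming a standard flume-ng install; the agent name and port here are placeholders) is Flume's built-in JSON metrics endpoint:

# Start the agent with HTTP monitoring enabled.
flume-ng agent -n a1 -f flume.conf \
  -Dflume.monitoring.type=http -Dflume.monitoring.port=34545

# SOURCE EventAcceptedCount climbing much faster than SINK
# EventDrainSuccessCount means the channel will eventually fill.
curl http://localhost:34545/metrics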

Explorer

This is the only error that I am getting. The Flume agent runs for about 15-20 minutes and pulls the logs from RabbitMQ into the HDFS path. Once it throws the error, no more files are created, so every time we have to stop the agent manually and start it again before it resumes pulling the logs.

 

Exception in thread "PollableSourceRunner-RabbitMQSource-rabbitmq-source1" java.lang.AbstractMethodError: org.apache.flume.source.rabbitmq.RabbitMQSource.getBackOffSleepIncrement()J
at org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:142)
at java.lang.Thread.run(Thread.java:748)

# Configure sink1
a1.sinks.sink1.hdfs.path = <Path>
a1.sinks.sink1.hdfs.fileType = CompressedStream
a1.sinks.sink1.hdfs.codeC = gzip
a1.sinks.sink1.hdfs.filePrefix = XYZ1
a1.sinks.sink1.hdfs.rollInterval = 920
a1.sinks.sink1.hdfs.batchSize = 5000
a1.sinks.sink1.hdfs.rollSize = 1000000000
a1.sinks.sink1.hdfs.rollCount = 100000
a1.sinks.sink1.hdfs.callTimeout = 150000
a1.sinks.sink1.hdfs.idleTimeout = 1000

# Configure sink2
a1.sinks.sink2.hdfs.path = <Path>
a1.sinks.sink2.hdfs.fileType = CompressedStream
a1.sinks.sink2.hdfs.codeC = gzip
a1.sinks.sink2.hdfs.filePrefix = XYZ2
a1.sinks.sink2.hdfs.rollInterval = 920
a1.sinks.sink2.hdfs.batchSize = 5000
a1.sinks.sink2.hdfs.rollSize = 1000000000
a1.sinks.sink2.hdfs.rollCount = 100000
a1.sinks.sink2.hdfs.callTimeout = 150000
a1.sinks.sink2.hdfs.idleTimeout = 1000

a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = sink1 sink2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = random

 

It's still possible that your HDFS sinks are just not able to keep up with the RabbitMQ source. Did you review the graphs to see the rate at which the sinks are draining vs. the rate at which the source is adding events?

Also, you are using sink groups, which makes delivery single-threaded (i.e., one sink delivers at a time). There is really no reason to use sink groups here; if you remove them, the sinks will deliver events in parallel (2x sinks = 2x delivery rate, 4x sinks = 4x delivery rate).

-pd
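
As a sketch of that change (the channel name c1 is a placeholder, since the channel wiring isn't shown in the config above):

# Drop the a1.sinkgroups.* lines entirely. Each sink then gets its own
# runner thread and drains the channel in parallel; the distinct
# filePrefix values above keep their output files from colliding.
a1.sinks = sink1 sink2
a1.sinks.sink1.channel = c1
a1.sinks.sink2.channel = c1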

Explorer

Hello - I am unable to see any other details.

It does pull the files for the first 1-2 hours, but then it stops pulling logs from RabbitMQ into the HDFS path. After running for 1-2 hours it throws the error, after which no files are created, so every time we need to stop the agent and start it again.