Expert Contributor
Posts: 121
Registered: ‎08-07-2017

Getting error while running flume agent

Hi,

 

I am getting the error below while running the Flume agent.

 

ERROR PollableSourceRunner:156 - Unhandled exception, logging and sleeping for 5000ms
org.apache.flume.ChannelException: Unable to put batch on required channel: FileChannel ch2 { dataDirs: [/var/lib/flume-ng/plugins.d/custom/datac2] }
at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:200)
at org.keedio.flume.source.SQLSource$ChannelWriter.flush(SQLSource.java:195)
at org.keedio.flume.source.SQLSource$ChannelWriter.write(SQLSource.java:190)
at org.keedio.flume.source.SQLSource$ChannelWriter.write(SQLSource.java:190)
at java.io.Writer.write(Writer.java:192)
at java.io.PrintWriter.write(PrintWriter.java:456)
at java.io.PrintWriter.write(PrintWriter.java:473)
at com.opencsv.CSVWriter.writeNext(CSVWriter.java:263)
at com.opencsv.CSVWriter.writeAll(CSVWriter.java:151)
at org.keedio.flume.source.HibernateHelper.executeQuery(HibernateHelper.java:155)
at org.keedio.flume.source.SQLSource.process(SQLSource.java:127)
at org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:139)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flume.ChannelFullException: The channel has reached it's capacity. This might be the result of a sink on the channel having too low of batch size,
a downstream system running slower than normal, or that the channel capacity is just too low. [channel=ch2]
at org.apache.flume.channel.file.FileChannel$FileBackedTransaction.doPut(FileChannel.java:460)
at org.apache.flume.channel.BasicTransactionSemantics.put(BasicTransactionSemantics.java:93)
at org.apache.flume.channel.BasicChannelSemantics.put(BasicChannelSemantics.java:80)
at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:189)

 

The configuration settings that I am using for Flume are as follows.

 

agent1.channels.ch1.capacity=100000
agent1.channels.ch2.capacity=100000

agent1.sinks.sink1.morphlineId=morphline1
agent1.sinks.sink1.type=org.apache.flume.sink.solr.morphline.MorphlineSolrSink
agent1.sinks.sink1.channel=ch1
agent1.sinks.sink1.morphlineFile=/var/lib/flume-ng/plugins.d/custom/conf/morphlines.conf
agent1.sinks.sink1.batchSize=100
agent1.sinks.sink1.batchDurationMillis=1000

agent1.sinks.sink2.morphlineId=morphline1
agent1.sinks.sink2.type=org.apache.flume.sink.solr.morphline.MorphlineSolrSink
agent1.sinks.sink2.channel=ch2
agent1.sinks.sink2.morphlineFile=/var/lib/flume-ng/plugins.d/custom/conf/morphlines.conf
agent1.sinks.sink2.batchSize=100
agent1.sinks.sink2.batchDurationMillis=1000

agent1.sources=sql-source ods-sql-source
agent1.sinks=sink1 sink2
agent1.channels=ch1 ch2
#agent1.channels.ch1.type=memory
#agent1.channels.ch2.type=memory

agent1.sources.sql-source.type=org.keedio.flume.source.SQLSource
agent1.sources.ods-sql-source.type=org.keedio.flume.source.SQLSource

agent1.sources.sql-source.channels=ch1
agent1.sources.ods-sql-source.channels=ch2

#use FILE channel
agent1.channels.ch1.type = file
agent1.channels.ch1.transactionCapacity = 1000
agent1.channels.ch1.checkpointDir = /var/lib/flume-ng/plugins.d/custom/checkpointc1
#NOTE: point to your checkpoint directory
agent1.channels.ch1.dataDirs = /var/lib/flume-ng/plugins.d/custom/datac1
#NOTE: point to your data directory

#use FILE channel
agent1.channels.ch2.type = file
agent1.channels.ch2.transactionCapacity = 1000
agent1.channels.ch2.checkpointDir = /var/lib/flume-ng/plugins.d/custom/checkpointc2
#NOTE: point to your checkpoint directory
agent1.channels.ch2.dataDirs = /var/lib/flume-ng/plugins.d/custom/datac2
#NOTE: point to your data directory

Can you please help me with this?

 

Thanks,

Priya

New Contributor
Posts: 4
Registered: ‎06-07-2018

Re: Getting error while running flume agent

Hi, I am getting the error below when reading logs from a RabbitMQ server into an HDFS path using Flume.

Exception in thread "PollableSourceRunner-RabbitMQSource-rabbitmq-source1" java.lang.AbstractMethodError: org.apache.flume.source.rabbitmq.RabbitMQSource.getBackOffSleepIncrement()J
at org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:142)
at java.lang.Thread.run(Thread.java:748)

Champion
Posts: 753
Registered: ‎05-16-2016

Re: Getting error while running flume agent

Based on the error, it looks like one of your channels (ch2, according to the stack trace) is full.

You might want to increase the channel capacity by raising the property below.

Is there a specific reason that you want to use a memory channel over a file channel?

agent1.channels.ch2.capacity=100000
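
A rough sketch of what that change could look like for the channel named in the stack trace (ch2) is below. The property names come from the configuration posted above; the numeric values are illustrative assumptions, not tuned recommendations, and raising capacity only helps if the sink can eventually catch up.

```properties
# Sketch only: the values below are assumptions chosen for illustration.
# Give the file channel more headroom before it reports ChannelFullException.
agent1.channels.ch2.capacity = 1000000
# The largest batch a source may put, or a sink may take, in one transaction.
agent1.channels.ch2.transactionCapacity = 1000
# Let the sink take more events per transaction so the channel drains faster.
# Keep batchSize less than or equal to the channel's transactionCapacity.
agent1.sinks.sink2.batchSize = 1000
agent1.sinks.sink2.batchDurationMillis = 1000
```

If the channel still fills up after a change like this, the sink side (here the Solr sink) is probably the bottleneck rather than the channel size.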

 

Expert Contributor
Posts: 121
Registered: ‎08-07-2017

Re: Getting error while running flume agent

Thanks
New Contributor
Posts: 4
Registered: ‎06-07-2018

Re: Getting error while running flume agent

Exception in thread "PollableSourceRunner-RabbitMQSource-rabbitmq-source1" java.lang.AbstractMethodError: org.apache.flume.source.rabbitmq.RabbitMQSource.getBackOffSleepIncrement()J
at org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:142)
at java.lang.Thread.run(Thread.java:748)

 

 

Could someone please help me resolve this error?

 

# Configure the channel
type = SPILLABLEMEMORY
memoryCapacity = 10000
byteCapacity = 10000000
transactionCapacity = 10000
checkpointInterval = 300000
maxFileSize = 614643507
overflowCapacity = 1000000
minimumRequiredSpace = 324288000
write-timeout = 100

Cloudera Employee
Posts: 246
Registered: ‎01-09-2014

Re: Getting error while running flume agent

It seems like your sinks may not be draining fast enough. Do you see any sink errors in your logs?

If you compare the Flume graphs for event takes per second (by the sinks) with events accepted (from the source), do you see any patterns?

-pd
New Contributor
Posts: 4
Registered: ‎06-07-2018

Re: Getting error while running flume agent

This is the only error that I am getting. The Flume agent runs for about 15-20 minutes and pulls the logs from RabbitMQ into the HDFS path. Once it throws the error, no more files are created, so every time we have to stop the agent manually and start it again before it resumes pulling logs.

 

Exception in thread "PollableSourceRunner-RabbitMQSource-rabbitmq-source1" java.lang.AbstractMethodError: org.apache.flume.source.rabbitmq.RabbitMQSource.getBackOffSleepIncrement()J
at org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:142)
at java.lang.Thread.run(Thread.java:748)

 

 

# Configure sink1
a1.sinks.sink1.hdfs.path =<Path>
a1.sinks.sink1.hdfs.fileType = CompressedStream
a1.sinks.sink1.hdfs.codeC = gzip
a1.sinks.sink1.hdfs.filePrefix = XYZ1
a1.sinks.sink1.hdfs.rollInterval = 920
a1.sinks.sink1.hdfs.batchsize = 5000
a1.sinks.sink1.hdfs.rollSize = 1000000000
a1.sinks.sink1.hdfs.rollCount = 100000
a1.sinks.sink1.hdfs.callTimeout = 150000
a1.sinks.sink1.hdfs.idleTimeout = 1000

# Configure sink2
a1.sinks.sink2.hdfs.path = <Path>
a1.sinks.sink2.hdfs.fileType = CompressedStream
a1.sinks.sink2.hdfs.codeC = gzip
a1.sinks.sink2.hdfs.filePrefix = XYZ2
a1.sinks.sink2.hdfs.rollInterval = 920
a1.sinks.sink2.hdfs.batchsize = 5000
a1.sinks.sink2.hdfs.rollSize = 1000000000
a1.sinks.sink2.hdfs.rollCount = 100000
a1.sinks.sink2.hdfs.callTimeout = 150000
a1.sinks.sink1.hdfs.idleTimeout = 1000

a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = sink1 sink2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = random

 

Cloudera Employee
Posts: 246
Registered: ‎01-09-2014

Re: Getting error while running flume agent

It's still possible that your HDFS sinks are just not able to keep up with the RabbitMQ source. Did you review the graphs to see the rate at which the sinks are draining versus the rate at which the source is adding?

Also, you are using sinkgroups, which makes delivery single-threaded (i.e. one sink at a time). There is really no reason to use sinkgroups here. If you remove them, the sinks will deliver events in parallel (2x sinks = 2x delivery rate, 4x sinks = 4x delivery rate).
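
To illustrate the suggestion about dropping the sink group, here is a minimal sketch in which both HDFS sinks read from the same channel directly. The channel name `ch1` is a placeholder assumption, since the posted configuration does not show which channel the sinks use; the other property names are taken from the config above.

```properties
# Sketch only: "ch1" is a hypothetical channel name, not taken from the post.
a1.sinks = sink1 sink2

a1.sinks.sink1.type = hdfs
a1.sinks.sink1.channel = ch1
a1.sinks.sink1.hdfs.filePrefix = XYZ1

a1.sinks.sink2.type = hdfs
a1.sinks.sink2.channel = ch1
a1.sinks.sink2.hdfs.filePrefix = XYZ2

# Note: the Flume user guide spells this property hdfs.batchSize (capital S).
# The posted config uses hdfs.batchsize, which may not be picked up.
a1.sinks.sink1.hdfs.batchSize = 5000
a1.sinks.sink2.hdfs.batchSize = 5000

# No a1.sinkgroups.* entries: without a sink group each sink runs in its
# own runner thread and takes events from the channel independently.
```

Keeping a distinct `hdfs.filePrefix` per sink, as the original config already does, avoids both sinks writing to the same file names.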

-pd
New Contributor
Posts: 4
Registered: ‎06-07-2018

Re: Getting error while running flume agent

Hello - I am unable to see any other details.

The agent pulls logs for the first 1-2 hours and then stops pulling logs from RabbitMQ into the HDFS path. After running for 1-2 hours, it throws the error and no more files are created, so every time we need to stop the agent and start it again.
