Created on 01-04-2018 01:19 AM - edited 09-16-2022 05:42 AM
Hi,
I am getting the below error while running the Flume agent.
ERROR PollableSourceRunner:156 - Unhandled exception, logging and sleeping for 5000ms
org.apache.flume.ChannelException: Unable to put batch on required channel: FileChannel ch2 { dataDirs: [/var/lib/flume-ng/plugins.d/custom/datac2] }
at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:200)
at org.keedio.flume.source.SQLSource$ChannelWriter.flush(SQLSource.java:195)
at org.keedio.flume.source.SQLSource$ChannelWriter.write(SQLSource.java:190)
at org.keedio.flume.source.SQLSource$ChannelWriter.write(SQLSource.java:190)
at java.io.Writer.write(Writer.java:192)
at java.io.PrintWriter.write(PrintWriter.java:456)
at java.io.PrintWriter.write(PrintWriter.java:473)
at com.opencsv.CSVWriter.writeNext(CSVWriter.java:263)
at com.opencsv.CSVWriter.writeAll(CSVWriter.java:151)
at org.keedio.flume.source.HibernateHelper.executeQuery(HibernateHelper.java:155)
at org.keedio.flume.source.SQLSource.process(SQLSource.java:127)
at org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:139)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flume.ChannelFullException: The channel has reached it's capacity. This might be the result of a sink on the channel having too low of batch size,
a downstream system running slower than normal, or that the channel capacity is just too low. [channel=ch2]
at org.apache.flume.channel.file.FileChannel$FileBackedTransaction.doPut(FileChannel.java:460)
at org.apache.flume.channel.BasicTransactionSemantics.put(BasicTransactionSemantics.java:93)
at org.apache.flume.channel.BasicChannelSemantics.put(BasicChannelSemantics.java:80)
at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:189)
The configuration settings that I am using for Flume are as below.
agent1.channels.ch1.capacity=100000
agent1.channels.ch2.capacity=100000
agent1.sinks.sink1.morphlineId=morphline1
agent1.sinks.sink1.type=org.apache.flume.sink.solr.morphline.MorphlineSolrSink
agent1.sinks.sink1.channel=ch1
agent1.sinks.sink1.morphlineFile=/var/lib/flume-ng/plugins.d/custom/conf/morphlines.conf
agent1.sinks.sink1.batchSize=100
agent1.sinks.sink1.batchDurationMillis=1000
agent1.sinks.sink2.morphlineId=morphline1
agent1.sinks.sink2.type=org.apache.flume.sink.solr.morphline.MorphlineSolrSink
agent1.sinks.sink2.channel=ch2
agent1.sinks.sink2.morphlineFile=/var/lib/flume-ng/plugins.d/custom/conf/morphlines.conf
agent1.sinks.sink2.batchSize=100
agent1.sinks.sink2.batchDurationMillis=1000
agent1.sources=sql-source ods-sql-source
agent1.sinks=sink1 sink2
agent1.channels=ch1 ch2
#agent1.channels.ch1.type=memory
#agent1.channels.ch2.type=memory
agent1.sources.sql-source.type=org.keedio.flume.source.SQLSource
agent1.sources.ods-sql-source.type=org.keedio.flume.source.SQLSource
agent1.sources.sql-source.channels=ch1
agent1.sources.ods-sql-source.channels=ch2
#use FILE channel
agent1.channels.ch1.type = file
agent1.channels.ch1.transactionCapacity = 1000
agent1.channels.ch1.checkpointDir = /var/lib/flume-ng/plugins.d/custom/checkpointc1
#NOTE: point to your checkpoint directory
agent1.channels.ch1.dataDirs = /var/lib/flume-ng/plugins.d/custom/datac1
#NOTE: point to your data directory
#use FILE channel
agent1.channels.ch2.type = file
agent1.channels.ch2.transactionCapacity = 1000
agent1.channels.ch2.checkpointDir = /var/lib/flume-ng/plugins.d/custom/checkpointc2
#NOTE: point to your checkpoint directory
agent1.channels.ch2.dataDirs = /var/lib/flume-ng/plugins.d/custom/datac2
#NOTE: point to your data directory
Can you please help me with this?
Thanks,
Priya
Created 06-07-2018 08:56 AM
Hi, I am getting the below error when reading logs from a RabbitMQ server into an HDFS path using Flume:
Exception in thread "PollableSourceRunner-RabbitMQSource-rabbitmq-source1" java.lang.AbstractMethodError: org.apache.flume.source.rabbitmq.RabbitMQSource.getBackOffSleepIncrement()J
at org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:142)
at java.lang.Thread.run(Thread.java:748)
Created 06-08-2018 11:32 PM
Based on the error it looks like one of your channels, ch2, is almost full.
You might want to increase the channel capacity via the below property.
Is there a specific reason that you want to use the file channel over the memory channel?
agent1.channels.ch2.capacity=100000
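If the sink genuinely cannot keep up, one option is to give ch2 more headroom and let each sink transaction drain more events. A sketch with illustrative (untuned) numbers, reusing the agent1 names from the config above; note that the sink's batchSize must stay at or below the channel's transactionCapacity:

```properties
# Illustrative values only - tune to your event size and Solr throughput.
agent1.channels.ch2.capacity = 1000000
agent1.channels.ch2.transactionCapacity = 1000
# Drain bigger batches per sink transaction (must be <= transactionCapacity).
agent1.sinks.sink2.batchSize = 1000
```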
Created 06-28-2018 10:46 AM
Exception in thread "PollableSourceRunner-RabbitMQSource-rabbitmq-source1" java.lang.AbstractMethodError: org.apache.flume.source.rabbitmq.RabbitMQSource.getBackOffSleepIncrement()J
at org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:142)
at java.lang.Thread.run(Thread.java:748)
Can someone please help me resolve this error?
# Configure the channel
type = SPILLABLEMEMORY
memoryCapacity = 10000
byteCapacity = 10000000
transactionCapacity = 10000
checkpointInterval = 300000
maxFileSize = 614643507
overflowCapacity = 1000000
minimumRequiredSpace = 324288000
write-timeout = 100
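For context, a java.lang.AbstractMethodError on getBackOffSleepIncrement() usually means the RabbitMQ source plugin was compiled against a Flume release older than 1.6, which added the backoff methods to the PollableSource interface; rebuilding the plugin against your cluster's Flume version (or using a build that implements these methods) is the usual fix. A minimal self-contained sketch of the idea, where the interface is a stand-in for the real org.apache.flume.PollableSource, not the actual Flume class:

```java
// Why the AbstractMethodError appears: Flume 1.6 added backoff methods to the
// PollableSource interface, so a plugin jar compiled against an older Flume API
// has no implementation for them at runtime. This interface is a minimal
// stand-in for org.apache.flume.PollableSource (illustration only).
interface PollableSource {
    long getBackOffSleepIncrement();   // added in Flume 1.6
    long getMaxBackOffSleepInterval(); // added in Flume 1.6
}

// A source built against the newer interface must provide both methods;
// the values below mirror Flume's usual defaults (1 s increment, 5 s cap).
class PatchedRabbitMQSource implements PollableSource {
    @Override
    public long getBackOffSleepIncrement() { return 1000L; }
    @Override
    public long getMaxBackOffSleepInterval() { return 5000L; }
}

public class BackoffDemo {
    public static void main(String[] args) {
        PollableSource source = new PatchedRabbitMQSource();
        System.out.println(source.getBackOffSleepIncrement()); // prints 1000
    }
}
```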
Created 06-29-2018 08:12 AM
This is the only error that I am getting. The Flume agent runs for about 15-20 minutes and pulls the logs from RabbitMQ into the HDFS path. Once it throws the error, no further files are created, so every time we have to stop the agent manually and start it again to resume pulling the logs.
Exception in thread "PollableSourceRunner-RabbitMQSource-rabbitmq-source1" java.lang.AbstractMethodError: org.apache.flume.source.rabbitmq.RabbitMQSource.getBackOffSleepIncrement()J
at org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:142)
at java.lang.Thread.run(Thread.java:748)
# Configure sink1
a1.sinks.sink1.hdfs.path =<Path>
a1.sinks.sink1.hdfs.fileType = CompressedStream
a1.sinks.sink1.hdfs.codeC = gzip
a1.sinks.sink1.hdfs.filePrefix = XYZ1
a1.sinks.sink1.hdfs.rollInterval = 920
a1.sinks.sink1.hdfs.batchSize = 5000
a1.sinks.sink1.hdfs.rollSize = 1000000000
a1.sinks.sink1.hdfs.rollCount = 100000
a1.sinks.sink1.hdfs.callTimeout = 150000
a1.sinks.sink1.hdfs.idleTimeout = 1000
# Configure sink2
a1.sinks.sink2.hdfs.path = <Path>
a1.sinks.sink2.hdfs.fileType = CompressedStream
a1.sinks.sink2.hdfs.codeC = gzip
a1.sinks.sink2.hdfs.filePrefix = XYZ2
a1.sinks.sink2.hdfs.rollInterval = 920
a1.sinks.sink2.hdfs.batchSize = 5000
a1.sinks.sink2.hdfs.rollSize = 1000000000
a1.sinks.sink2.hdfs.rollCount = 100000
a1.sinks.sink2.hdfs.callTimeout = 150000
a1.sinks.sink2.hdfs.idleTimeout = 1000
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = sink1 sink2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = random
Created 07-20-2018 09:24 AM
Hello - I am unable to see any other details.
It does pull files for the first 1-2 hours, but then stops pulling logs from RabbitMQ into the HDFS path. After running for 1-2 hours it throws the error and no further files are created, so every time we need to stop the agent and start it again.