Created 07-26-2016 09:28 AM
Hi,
I'm using Flume to collect data from a Spool Directory. My configuration is as follows:
agent1.sources = source1
agent1.sinks = sink1
agent1.channels = channel2
agent1.sources.source1.channels = channel2
agent1.sinks.sink1.channel = channel2
agent1.sources.source1.type = spooldir
agent1.sources.source1.basenameHeader = true
agent1.sources.source1.spoolDir = /root/flume_example/spooldir
agent1.sinks.sink1.type = hdfs
agent1.sinks.sink1.hdfs.path = /user/root/flume
agent1.sinks.sink1.hdfs.filePrefix = %{basename}
agent1.sinks.sink1.hdfs.fileSuffix = .csv
agent1.sinks.sink1.hdfs.idleTimeout = 5
agent1.sinks.sink1.hdfs.rollSize = 0
agent1.sinks.sink1.hdfs.rollCount = 100000
agent1.sinks.sink1.hdfs.fileType = DataStream
agent1.channels.channel2.type = file
When I place a 43 MB file in the spool directory, Flume starts writing files into the HDFS directory /user/root/flume:
-rw-r--r--   3 root hdfs  7.9 M 2016-07-26 11:10 /user/root/flume/filename.csv.1469524239209.csv
-rw-r--r--   3 root hdfs  7.6 M 2016-07-26 11:11 /user/root/flume/filename.csv.1469524239210.csv
But then a java.lang.OutOfMemoryError: Java heap space is raised:
ERROR channel.ChannelProcessor: Error while writing to required channel: FileChannel channel2 { dataDirs: [/root/.flume/file-channel/data] }
java.lang.OutOfMemoryError: Java heap space
    at java.util.HashMap.resize(HashMap.java:703)
    at java.util.HashMap.putVal(HashMap.java:662)
    at java.util.HashMap.put(HashMap.java:611)
    at org.apache.flume.channel.file.EventQueueBackingStoreFile.put(EventQueueBackingStoreFile.java:338)
    at org.apache.flume.channel.file.FlumeEventQueue.set(FlumeEventQueue.java:287)
    at org.apache.flume.channel.file.FlumeEventQueue.add(FlumeEventQueue.java:317)
    at org.apache.flume.channel.file.FlumeEventQueue.addTail(FlumeEventQueue.java:211)
    at org.apache.flume.channel.file.FileChannel$FileBackedTransaction.doCommit(FileChannel.java:553)
    at org.apache.flume.channel.BasicTransactionSemantics.commit(BasicTransactionSemantics.java:151)
    at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:192)
    at org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:235)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
16/07/26 11:10:59 ERROR source.SpoolDirectorySource: FATAL: Spool Directory source source1: { spoolDir: /root/flume_example/spooldir }: Uncaught exception in SpoolDirectorySource thread. Restart or reconfigure Flume to continue processing.
java.lang.OutOfMemoryError: Java heap space
    at java.util.HashMap.resize(HashMap.java:703)
    at java.util.HashMap.putVal(HashMap.java:662)
    at java.util.HashMap.put(HashMap.java:611)
    at org.apache.flume.channel.file.EventQueueBackingStoreFile.put(EventQueueBackingStoreFile.java:338)
    at org.apache.flume.channel.file.FlumeEventQueue.set(FlumeEventQueue.java:287)
    at org.apache.flume.channel.file.FlumeEventQueue.add(FlumeEventQueue.java:317)
    at org.apache.flume.channel.file.FlumeEventQueue.addTail(FlumeEventQueue.java:211)
    at org.apache.flume.channel.file.FileChannel$FileBackedTransaction.doCommit(FileChannel.java:553)
    at org.apache.flume.channel.BasicTransactionSemantics.commit(BasicTransactionSemantics.java:151)
    at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:192)
    at org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:235)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Any idea how I can fix this issue?
Thanks.
Created 07-26-2016 10:52 AM
Problem solved: I changed the channel type from file to memory:
agent1.channels.channel2.type = memory
Answers about how to make it work with a file channel are still welcome.
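For reference, the memory channel can also be sized explicitly rather than relying on its defaults (capacity = 100, transactionCapacity = 100). The values below are only illustrative, not tuned for this workload:

    agent1.channels.channel2.type = memory
    agent1.channels.channel2.capacity = 10000
    agent1.channels.channel2.transactionCapacity = 1000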
Created 07-26-2016 02:58 PM
The default transactionCapacity for a file channel is 10000; for a memory channel it is 100.
That's why it works for you after the switch. Set a smaller transactionCapacity on your file channel, or increase the memory available to the Flume process (e.g. -Xmx1024m).
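For example, keeping your file channel but bounding the transaction size (the channel name matches your config; the value 1000 is only an illustration, tune it to your event rate):

    agent1.channels.channel2.type = file
    agent1.channels.channel2.transactionCapacity = 1000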
Created 07-26-2016 06:00 PM
Hi @Zaher,
Depending on your data, you should care about which channel you choose. The memory channel is simple and easy, but data is lost when the Flume agent crashes (OutOfMemory is likely, and power or hardware failures happen too). There are channels with higher durability for your data; the file channel is very durable when the underlying storage is redundant as well.
Take a look at the Flume channels and their configuration options.
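As a minimal sketch of a durable file-channel setup (the paths are assumptions, point them at redundant storage):

    agent1.channels.channel2.type = file
    agent1.channels.channel2.checkpointDir = /var/flume/channel2/checkpoint
    agent1.channels.channel2.dataDirs = /var/flume/channel2/data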
For your OutOfMemory problem, you can decrease the transaction and batch capacity, and increase the heap in the flume-env config in Ambari, as @Michael Miklavcic suggests.
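For instance, in flume-env.sh (editable under the Flume config section in Ambari; the 1 GB heap is an assumption, size it to your host):

    export JAVA_OPTS="-Xmx1024m"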
Created 07-27-2016 09:17 AM
Thank you @Michael M and @Alexander Bij for your valuable help.