Created on 01-08-2015 08:17 AM - edited 09-16-2022 02:18 AM
I'm trying to do the following: [SpoolDir source] --> [Custom Interceptor] --> [FileChannel] --> [HDFS Sink]
My flume.conf looks like this:
# source / channel / sink
sebanalytics.sources = spooldir-source
sebanalytics.channels = file-channel
sebanalytics.sinks = hdfs-sink

# source definition
sebanalytics.sources.spooldir-source.type = spooldir
sebanalytics.sources.spooldir-source.spoolDir = /var/flume/in
sebanalytics.sources.spooldir-source.basenameHeader = true
sebanalytics.sources.spooldir-source.basenameHeaderKey = basename

# Max blob size: 1.5GB
sebanalytics.sources.spooldir-source.deserializer = org.apache.flume.sink.solr.morphline.BlobDeserializer$Builder
sebanalytics.sources.spooldir-source.deserializer.maxBlobLength = 1610000000

# Attach the interceptor to the source
sebanalytics.sources.spooldir-source.interceptors = json-interceptor
sebanalytics.sources.spooldir-source.interceptors.json-interceptor.type = com.adagyo.flume.interceptor.JsonInterceptor$Builder

# Define the event headers. basenameHeader must match source.basenameHeaderKey (default is basename)
sebanalytics.sources.spooldir-source.interceptors.json-interceptor.basenameHeader = basename
sebanalytics.sources.spooldir-source.interceptors.json-interceptor.resourceHeader = resources
sebanalytics.sources.spooldir-source.interceptors.json-interceptor.ssidHeader = ssid

# channel definition
sebanalytics.channels.file-channel.type = file
sebanalytics.channels.file-channel.checkpointDir = /var/flume/checkpoints
sebanalytics.channels.file-channel.dataDirs = /var/flume/datadirs

# sink definition
sebanalytics.sinks.hdfs-sink.type = hdfs
sebanalytics.sinks.hdfs-sink.hdfs.path = hdfs://xx.xx.xx.xx/user/adagyo/flume/in
sebanalytics.sinks.hdfs-sink.hdfs.filePrefix = %{basename}
sebanalytics.sinks.hdfs-sink.hdfs.fileSuffix = .json
sebanalytics.sinks.hdfs-sink.hdfs.fileType = DataStream
sebanalytics.sinks.hdfs-sink.hdfs.writeFormat = Text
sebanalytics.sinks.hdfs-sink.hdfs.rollInterval = 0
sebanalytics.sinks.hdfs-sink.hdfs.rollSize = 0
sebanalytics.sinks.hdfs-sink.hdfs.rollCount = 0
sebanalytics.sinks.hdfs-sink.hdfs.batchSize = 1

# connect source and sink to channel
sebanalytics.sources.spooldir-source.channels = file-channel
sebanalytics.sinks.hdfs-sink.channel = file-channel
A file in the spoolDir looks like this:
{ "objectType" : [ { JSON Object 1 } , { JSON Object 2 }, ... ] }
My custom interceptor modifies the event body to this:
{JSON Object 1}
{JSON Object 2}
...
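The actual interceptor code isn't posted in this thread, but a minimal sketch of that kind of body-rewriting interceptor might look like the following. It assumes Jackson is available for JSON parsing, and the package and class names here are hypothetical, not the real com.adagyo.flume.interceptor.JsonInterceptor:

// Hypothetical sketch only -- not the actual JsonInterceptor used above.
package com.example.flume.interceptor;

import java.nio.charset.StandardCharsets;
import java.util.List;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class JsonFlattenInterceptor implements Interceptor {

    private final ObjectMapper mapper = new ObjectMapper();

    @Override
    public void initialize() {
        // nothing to set up
    }

    @Override
    public Event intercept(Event event) {
        try {
            // Body arrives as: { "objectType" : [ { ... }, { ... }, ... ] }
            JsonNode root = mapper.readTree(new String(event.getBody(), StandardCharsets.UTF_8));
            // Assume a single top-level field whose value is the array of objects
            JsonNode array = root.elements().next();

            // Rewrite the body as newline-delimited JSON: one object per line
            StringBuilder out = new StringBuilder();
            for (JsonNode obj : array) {
                out.append(mapper.writeValueAsString(obj)).append('\n');
            }
            event.setBody(out.toString().getBytes(StandardCharsets.UTF_8));
        } catch (Exception e) {
            // On malformed input, let the event pass through unchanged
        }
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        for (Event event : events) {
            intercept(event);
        }
        return events;
    }

    @Override
    public void close() {
        // nothing to clean up
    }

    // Flume instantiates interceptors through a nested Builder, referenced in
    // flume.conf as <fully.qualified.ClassName>$Builder
    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new JsonFlattenInterceptor();
        }

        @Override
        public void configure(Context context) {
            // The header names from flume.conf (basenameHeader, resourceHeader, ssidHeader)
            // could be read here with context.getString(...)
        }
    }
}

Note that with the BlobDeserializer each spooled file becomes a single event, so this interceptor rewrites the whole file content in memory at once.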
Everything works fine with small files (less than 10MB).
When I try with a bigger one (175MB), the spooldir source, interceptor, and channel all work (the file in the spoolDir is renamed with the .COMPLETED suffix), but then I get this error:
2015-01-08 15:49:30,351 ERROR org.apache.flume.SinkRunner: Unable to deliver event. Exception follows.
java.lang.IllegalStateException: Log is closed
    at com.google.common.base.Preconditions.checkState(Preconditions.java:145)
    at org.apache.flume.channel.file.Log.getFlumeEventQueue(Log.java:575)
    at org.apache.flume.channel.file.FileChannel$FileBackedTransaction.<init>(FileChannel.java:431)
    at org.apache.flume.channel.file.FileChannel.createTransaction(FileChannel.java:351)
    at org.apache.flume.channel.BasicChannelSemantics.getTransaction(BasicChannelSemantics.java:122)
    at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:376)
    at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
    at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
    at java.lang.Thread.run(Thread.java:745)
Thanks for your help!
Created 01-09-2015 06:59 AM
It seems that this error is caused by the file channel: I changed the sink to a null sink and the error still occurs.
2015-01-09 14:37:28,928 ERROR org.apache.flume.SinkRunner: Unable to deliver event. Exception follows.
java.lang.IllegalStateException: Log is closed
    at com.google.common.base.Preconditions.checkState(Preconditions.java:145)
    at org.apache.flume.channel.file.Log.rollback(Log.java:701)
    at org.apache.flume.channel.file.FileChannel$FileBackedTransaction.doRollback(FileChannel.java:614)
    at org.apache.flume.channel.BasicTransactionSemantics.rollback(BasicTransactionSemantics.java:168)
    at org.apache.flume.sink.NullSink.process(NullSink.java:104)
    at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
    at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
    at java.lang.Thread.run(Thread.java:745)
Created 01-09-2015 07:09 AM
You probably need to adjust the maxFileSize and minimumRequiredSpace settings on the file channel[1] (an illustrative snippet follows below).
FWIW, transferring large files with Flume is an anti-pattern. Flume is designed for event/log transport, not large-file transport. You might want to check out a new Apache project called Apache NiFi[2] that is better suited to large-file transfer. There's a quick how-to blog post available here to get you started:
http://ingest.tips/2014/12/22/getting-started-with-apache-nifi/
-Joey
[1] http://flume.apache.org/FlumeUserGuide.html#file-channel
[2] https://nifi.apache.org/
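For reference, the tuning suggested above might look something like this in the channel definition; the values are purely illustrative, not recommendations:

# Illustrative only: allow larger per-log data files (the default cap is roughly 2 GB)
sebanalytics.channels.file-channel.maxFileSize = 3221225472
# Illustrative only: require ~1 GB free disk space so a large event being written
# does not exhaust the volume (the default is roughly 500 MB)
sebanalytics.channels.file-channel.minimumRequiredSpace = 1073741824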
Created 01-09-2015 07:24 AM
Thanks for your answer. I tried the memory channel and it works, so I can confirm the channel issue.
I understand (and already knew) that my use case is an anti-pattern. Usually the events we receive are under 1 MB (and that is already too big). I'm migrating an old workflow to a new one and I can't change the incoming data at the moment.
I managed to make it work with the memory channel and it's OK for me for now.
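For anyone following along, the memory-channel swap would look roughly like this (channel name is illustrative; keep in mind the large events still have to fit in the agent's JVM heap):

# Illustrative only: replace the file channel with an in-memory channel
sebanalytics.channels = memory-channel
sebanalytics.channels.memory-channel.type = memory
# The default capacity/transactionCapacity are enough here, since each spooled file
# becomes a single event with the BlobDeserializer

# Rewire source and sink to the new channel
sebanalytics.sources.spooldir-source.channels = memory-channel
sebanalytics.sinks.hdfs-sink.channel = memory-channel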
Created 01-09-2015 07:28 AM
Keep in mind that with the MemoryChannel you lose any records in the channel if Flume crashes or the system reboots.