Flume: HDFS sink: Can't write large files

Explorer

I'm trying to do the following: [SpoolDir source] --> [Custom Interceptor] --> [FileChannel] --> [HDFS Sink]

My flume.conf looks like this:

# source / channel / sink
sebanalytics.sources = spooldir-source
sebanalytics.channels = file-channel
sebanalytics.sinks = hdfs-sink

# source definition
sebanalytics.sources.spooldir-source.type = spooldir
sebanalytics.sources.spooldir-source.spoolDir = /var/flume/in
sebanalytics.sources.spooldir-source.basenameHeader = true
sebanalytics.sources.spooldir-source.basenameHeaderKey = basename
# Max blob size: 1.5 GB
sebanalytics.sources.spooldir-source.deserializer = org.apache.flume.sink.solr.morphline.BlobDeserializer$Builder
sebanalytics.sources.spooldir-source.deserializer.maxBlobLength = 1610000000
# Attach the interceptor to the source
sebanalytics.sources.spooldir-source.interceptors = json-interceptor
sebanalytics.sources.spooldir-source.interceptors.json-interceptor.type = com.adagyo.flume.interceptor.JsonInterceptor$Builder
# Define the event headers. basenameHeader must be the same as source.basenameHeaderKey (default is basename)
sebanalytics.sources.spooldir-source.interceptors.json-interceptor.basenameHeader = basename
sebanalytics.sources.spooldir-source.interceptors.json-interceptor.resourceHeader = resources
sebanalytics.sources.spooldir-source.interceptors.json-interceptor.ssidHeader = ssid

# channel definition
sebanalytics.channels.file-channel.type = file
sebanalytics.channels.file-channel.checkpointDir = /var/flume/checkpoints
sebanalytics.channels.file-channel.dataDirs = /var/flume/datadirs

# sink definition
sebanalytics.sinks.hdfs-sink.type = hdfs
sebanalytics.sinks.hdfs-sink.hdfs.path = hdfs://xx.xx.xx.xx/user/adagyo/flume/in
sebanalytics.sinks.hdfs-sink.hdfs.filePrefix = %{basename}
sebanalytics.sinks.hdfs-sink.hdfs.fileSuffix = .json
sebanalytics.sinks.hdfs-sink.hdfs.fileType = DataStream
sebanalytics.sinks.hdfs-sink.hdfs.writeFormat = Text
sebanalytics.sinks.hdfs-sink.hdfs.rollInterval = 0
sebanalytics.sinks.hdfs-sink.hdfs.rollSize = 0
sebanalytics.sinks.hdfs-sink.hdfs.rollCount = 0
sebanalytics.sinks.hdfs-sink.hdfs.batchSize = 1

# connect source and sink to channel
sebanalytics.sources.spooldir-source.channels = file-channel
sebanalytics.sinks.hdfs-sink.channel = file-channel

A file in the spoolDir looks like this:

{ "objectType" : [ { JSON Object 1 } , { JSON Object 2 }, ... ] }

My custom interceptor modifies the event body into this:

{JSON Object 1}
{JSON Object 2}
...

Everything works fine with small files (less than 10 MB).

When I try with a bigger one (175 MB), the spoolDir source, interceptor, and channel work (the file in the spoolDir is renamed with the .COMPLETED suffix), but I get an error after that:

2015-01-08 15:49:30,351 ERROR org.apache.flume.SinkRunner: Unable to deliver event. Exception follows.
java.lang.IllegalStateException: Log is closed
	at com.google.common.base.Preconditions.checkState(Preconditions.java:145)
	at org.apache.flume.channel.file.Log.getFlumeEventQueue(Log.java:575)
	at org.apache.flume.channel.file.FileChannel$FileBackedTransaction.<init>(FileChannel.java:431)
	at org.apache.flume.channel.file.FileChannel.createTransaction(FileChannel.java:351)
	at org.apache.flume.channel.BasicChannelSemantics.getTransaction(BasicChannelSemantics.java:122)
	at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:376)
	at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
	at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
	at java.lang.Thread.run(Thread.java:745)

Thanks for your help!

4 REPLIES

Explorer

It seems that this error is caused by the file channel: I changed the sink to a null sink and the error still occurs (the sink swap I used for this test is sketched after the stack trace):

2015-01-09 14:37:28,928 ERROR org.apache.flume.SinkRunner: Unable to deliver event. Exception follows.
java.lang.IllegalStateException: Log is closed
	at com.google.common.base.Preconditions.checkState(Preconditions.java:145)
	at org.apache.flume.channel.file.Log.rollback(Log.java:701)
	at org.apache.flume.channel.file.FileChannel$FileBackedTransaction.doRollback(FileChannel.java:614)
	at org.apache.flume.channel.BasicTransactionSemantics.rollback(BasicTransactionSemantics.java:168)
	at org.apache.flume.sink.NullSink.process(NullSink.java:104)
	at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
	at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
	at java.lang.Thread.run(Thread.java:745)
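
For reference, the sink swap is only a couple of lines in flume.conf. This is just a sketch, reusing the agent and channel names from my configuration above:

# Temporarily replace the HDFS sink with a null sink to isolate the channel
sebanalytics.sinks = null-sink
sebanalytics.sinks.null-sink.type = null
sebanalytics.sinks.null-sink.channel = file-channel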

Rising Star (accepted solution)

You probably need to adjust the maxFileSize and minimumRequiredSpace settings on the file channel[1].
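
For example, a minimal sketch of those two properties added next to the existing file-channel definition (the values are illustrative only; see the File Channel table in the user guide[1] for the defaults):

# Max size, in bytes, of a single file-channel data file (default is roughly 2 GB)
sebanalytics.channels.file-channel.maxFileSize = 2146435071
# Minimum free disk space, in bytes, before the channel stops accepting events (default is 500 MB)
sebanalytics.channels.file-channel.minimumRequiredSpace = 268435456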

FWIW, transferring large files with Flume is an anti-pattern. Flume is designed for event/log transport, not large file transport. You might want to check out a new Apache project called Apache NiFi[2] that is better suited to large file transfer. There's a quick how-to blog post available here to get you started:

http://ingest.tips/2014/12/22/getting-started-with-apache-nifi/

-Joey

[1] http://flume.apache.org/FlumeUserGuide.html#file-channel

[2] http://nifi.incubator.apache.org

Explorer

Thanks for your answer. I tried the memory channel and it works, so I can confirm the issue is with the file channel.

I understand (and already knew) that my use case is an anti-pattern. Usually the events we receive are below 1 MB (and that is already too big). I'm migrating an old workflow to a new one and I can't change the incoming data at the moment.

I managed to make it work with the memory channel and it's OK for me now (a sketch of the swap is below).
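
A minimal sketch of that memory-channel swap, with illustrative capacity values and the agent, source, and sink names from my original flume.conf:

# Swap the file channel for an in-memory channel
sebanalytics.channels = memory-channel
sebanalytics.channels.memory-channel.type = memory
# Number of events held in the channel; kept small because each event is a whole file
sebanalytics.channels.memory-channel.capacity = 10
sebanalytics.channels.memory-channel.transactionCapacity = 10
# Upper bound, in bytes, on the heap used by events sitting in the channel
sebanalytics.channels.memory-channel.byteCapacity = 2000000000
# Re-point the source and sink at the new channel
sebanalytics.sources.spooldir-source.channels = memory-channel
sebanalytics.sinks.hdfs-sink.channel = memory-channel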

Rising Star

Keep in mind that with the MemoryChannel you lose any records in the channel if Flume crashes or the system reboots.