01-09-2015
07:24 AM
Thanks for your answer. I tried the memory channel and it works, so that confirms the channel issue. I understand (and already knew) that my use case is an anti-pattern. Usually the events we receive are under 1 MB (and that is already too big). I'm migrating an old workflow to a new one and I can't change the incoming data at the moment. I managed to make it work with the memory channel and it's OK for me now.
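For reference, a minimal sketch of what the memory-channel swap could look like in flume.conf. The values are illustrative assumptions, not taken from this thread; byteCapacity in particular needs enough headroom for the largest event body.

# memory channel sketch (values are illustrative)
sebanalytics.channels = memory-channel
sebanalytics.channels.memory-channel.type = memory
# max number of events held in the channel
sebanalytics.channels.memory-channel.capacity = 100
# events per transaction; keep >= the source/sink batch sizes
sebanalytics.channels.memory-channel.transactionCapacity = 10
# total bytes of event bodies allowed in memory; must exceed the largest blob
sebanalytics.channels.memory-channel.byteCapacity = 2000000000
# point source and sink at the new channel
sebanalytics.sources.spooldir-source.channels = memory-channel
sebanalytics.sinks.hdfs-sink.channel = memory-channel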
01-09-2015
06:59 AM
It seems that this error is caused by the file channel: I changed the sink to a null sink and the error still occurs.

2015-01-09 14:37:28,928 ERROR org.apache.flume.SinkRunner: Unable to deliver event. Exception follows.
java.lang.IllegalStateException: Log is closed
at com.google.common.base.Preconditions.checkState(Preconditions.java:145)
at org.apache.flume.channel.file.Log.rollback(Log.java:701)
at org.apache.flume.channel.file.FileChannel$FileBackedTransaction.doRollback(FileChannel.java:614)
at org.apache.flume.channel.BasicTransactionSemantics.rollback(BasicTransactionSemantics.java:168)
at org.apache.flume.sink.NullSink.process(NullSink.java:104)
at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
at java.lang.Thread.run(Thread.java:745)
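For anyone reproducing this test, a sketch of the sink swap that isolates the channel (assuming the same agent and channel names as above; the null sink simply discards every event it takes from the channel):

# replace the HDFS sink with a null sink to isolate the channel (sketch)
sebanalytics.sinks = null-sink
sebanalytics.sinks.null-sink.type = null
sebanalytics.sinks.null-sink.channel = file-channel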
01-08-2015
08:17 AM
I'm trying to do the following:

[SpoolDir source] --> [Custom Interceptor] --> [FileChannel] --> [HDFS Sink]

My flume.conf looks like this:

# source / channel / sink
sebanalytics.sources = spooldir-source
sebanalytics.channels = file-channel
sebanalytics.sinks = hdfs-sink
# source definition
sebanalytics.sources.spooldir-source.type = spooldir
sebanalytics.sources.spooldir-source.spoolDir = /var/flume/in
sebanalytics.sources.spooldir-source.basenameHeader = true
sebanalytics.sources.spooldir-source.basenameHeaderKey = basename
# Max blob size: 1.5 GB
sebanalytics.sources.spooldir-source.deserializer = org.apache.flume.sink.solr.morphline.BlobDeserializer$Builder
sebanalytics.sources.spooldir-source.deserializer.maxBlobLength = 1610000000
# Attach the interceptor to the source
sebanalytics.sources.spooldir-source.interceptors = json-interceptor
sebanalytics.sources.spooldir-source.interceptors.json-interceptor.type = com.adagyo.flume.interceptor.JsonInterceptor$Builder
# Define the event headers. basenameHeader must be the same as source.basenameHeaderKey (default is basename)
sebanalytics.sources.spooldir-source.interceptors.json-interceptor.basenameHeader = basename
sebanalytics.sources.spooldir-source.interceptors.json-interceptor.resourceHeader = resources
sebanalytics.sources.spooldir-source.interceptors.json-interceptor.ssidHeader = ssid
# channel definition
sebanalytics.channels.file-channel.type = file
sebanalytics.channels.file-channel.checkpointDir = /var/flume/checkpoints
sebanalytics.channels.file-channel.dataDirs = /var/flume/datadirs
# sink definition
sebanalytics.sinks.hdfs-sink.type = hdfs
sebanalytics.sinks.hdfs-sink.hdfs.path = hdfs://xx.xx.xx.xx/user/adagyo/flume/in
sebanalytics.sinks.hdfs-sink.hdfs.filePrefix = %{basename}
sebanalytics.sinks.hdfs-sink.hdfs.fileSuffix = .json
sebanalytics.sinks.hdfs-sink.hdfs.fileType = DataStream
sebanalytics.sinks.hdfs-sink.hdfs.writeFormat = Text
sebanalytics.sinks.hdfs-sink.hdfs.rollInterval = 0
sebanalytics.sinks.hdfs-sink.hdfs.rollSize = 0
sebanalytics.sinks.hdfs-sink.hdfs.rollCount = 0
sebanalytics.sinks.hdfs-sink.hdfs.batchSize = 1
# connect source and sink to channel
sebanalytics.sources.spooldir-source.channels = file-channel
sebanalytics.sinks.hdfs-sink.channel = file-channel

A file in the spoolDir looks like this:

{ "objectType" : [ { JSON Object 1 } , { JSON Object 2 }, ... ] }

My custom interceptor rewrites the event body to this:

{JSON Object 1}
{JSON Object 2}
...

Everything works fine with small files (less than 10 MB). When I try a bigger one (175 MB), the spoolDir source, interceptor and channel all work (the file in the spoolDir is renamed with the .COMPLETED suffix), but I get an error after that:

2015-01-08 15:49:30,351 ERROR org.apache.flume.SinkRunner: Unable to deliver event. Exception follows.
java.lang.IllegalStateException: Log is closed
at com.google.common.base.Preconditions.checkState(Preconditions.java:145)
at org.apache.flume.channel.file.Log.getFlumeEventQueue(Log.java:575)
at org.apache.flume.channel.file.FileChannel$FileBackedTransaction.<init>(FileChannel.java:431)
at org.apache.flume.channel.file.FileChannel.createTransaction(FileChannel.java:351)
at org.apache.flume.channel.BasicChannelSemantics.getTransaction(BasicChannelSemantics.java:122)
at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:376)
at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
at java.lang.Thread.run(Thread.java:745)

Thanks for your help!
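For context, a sketch of the file-channel sizing properties that are commonly reviewed when single events approach the gigabyte range. The values below are illustrative assumptions only and were not confirmed as a fix in this thread:

# file-channel sizing knobs (illustrative values, not a confirmed fix)
sebanalytics.channels.file-channel.capacity = 10000
sebanalytics.channels.file-channel.transactionCapacity = 100
# upper bound on each data file the channel writes; a single very large
# blob plus transaction overhead must fit well under this
sebanalytics.channels.file-channel.maxFileSize = 2000000000
# minimum free space (bytes) required on the data directories
sebanalytics.channels.file-channel.minimumRequiredSpace = 5000000000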
Labels:
- Apache Flume
- Apache Solr
- HDFS