Champion Alumni
Posts: 196
Registered: ‎11-18-2014
Accepted Solution

Move files from a spooling directory to HDFS with flume

I'm implementing a small Hadoop cluster for a POC at my company. I'm trying to import files into HDFS with Flume. Each file contains JSON objects like this (one "long" line per file):

{ "objectType" : [ { JSON Object } , { JSON Object }, ... ] }

"objectType" is the type of the objects in the array (e.g. events, users, ...).

These files will be processed later by several tasks depending on the "objectType".

I'm using the spoolDir source and the HDFS sink.

My questions are:

  • Is it possible to keep the source filename when Flume writes into HDFS? (Filenames are unique, as they contain a timestamp and a UUID.)

  • Is there a way to set "deserializer.maxLineLength" to an unlimited value (instead of setting a high value)?

  • I really don't want to lose data. Which channel is best, JDBC or File? (My flow does not have high throughput.)

My constraint is that I have to use Flume out of the box (no custom elements) as much as possible.

Thanks for your help!

GHERMAN Alina
Cloudera Employee
Posts: 26
Registered: ‎07-08-2013

Re: Move files from a spooling directory to HDFS with flume

If you want each file to remain whole (one Flume event per file), you can use the BlobDeserializer [1] as the deserializer parameter of the Spooling Directory Source [2]:

 

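a1.channels = c1
a1.sources = src-1

# Spooling directory source, bound to the channel c1 declared above
a1.sources.src-1.type = spooldir
a1.sources.src-1.channels = c1
a1.sources.src-1.spoolDir = /var/log/apache/flumeSpool
# Store the absolute path of the source file in the "file" header
a1.sources.src-1.fileHeader = true
# Emit each file as a single event instead of one event per line
a1.sources.src-1.deserializer = org.apache.flume.sink.solr.morphline.BlobDeserializer$Builder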

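If you need to, set deserializer.maxBlobLength to the maximum file size you'll be picking up. The default is 100 million bytes. This setting takes the place of deserializer.maxLineLength, which only applies to the default line-based deserializer. This approach won't work for very large files, as the entire file contents get buffered in RAM.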
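As a rough sketch of raising that limit: the 200000000 value below is only an assumption about the largest file in the spool directory, not a recommended setting, and the agent's JVM heap has to be large enough to hold a file of that size.

```properties
# Assumption: no spooled file exceeds roughly 200 MB; adjust to your data.
# The whole file is buffered in memory, so size the agent heap accordingly.
a1.sources.src-1.deserializer.maxBlobLength = 200000000
```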

 

The File channel is the best option for reliable data flow.
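A minimal File channel definition might look like the following. The two directory paths are hypothetical placeholders; pick locations on a local disk that survives agent restarts.

```properties
a1.channels = c1
a1.channels.c1.type = file
# Hypothetical paths: the checkpoint and event data are persisted here,
# so events still in the channel are recovered after an agent restart.
a1.channels.c1.checkpointDir = /var/lib/flume/file-channel/checkpoint
a1.channels.c1.dataDirs = /var/lib/flume/file-channel/data
```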

 

If you want the output file to have the same name as the input file, you can set the basenameHeader parameter of the source to true. This sets a header called basename on each Flume event. You can customize the name of the header by setting basenameHeaderKey. Then, in your sink configuration, you can refer to the header value in the filePrefix with something like this:

 

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume/events/
a1.sinks.k1.hdfs.filePrefix = %{basename}-
a1.sinks.k1.hdfs.fileType = DataStream
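For the %{basename} escape in the sink to resolve, the source has to emit that header. A sketch of the source-side lines, assuming the source is named src-1 as in the earlier snippet:

```properties
# Add the file's base name (without its directory) as an event header
a1.sources.src-1.basenameHeader = true
# Optional: this is already the default header name; shown for clarity
a1.sources.src-1.basenameHeaderKey = basename
```

Note that the HDFS sink still appends its own numeric suffix to the prefix, so the output file name starts with the source file name rather than matching it exactly.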

 

HTH,

 

-Joey

 

 

 

[1] http://flume.apache.org/FlumeUserGuide.html#blobdeserializer

[2] http://flume.apache.org/FlumeUserGuide.html#spooling-directory-source
