Move files from a spooling directory to HDFS with Flume
Labels: Apache Flume, Apache Hadoop, HDFS
Created on 12-29-2014 01:27 AM - edited 09-16-2022 02:16 AM
I'm implementing a small Hadoop cluster for a POC at my company, and I'm trying to import files into HDFS with Flume. Each file contains JSON objects like this (one "long" line per file):
{ "objectType" : [ { JSON Object } , { JSON Object }, ... ] }
"objectType" is the type the objects in the array (ex: events, users, ...).
These files will be processed later by several tasks depending on the "objectType".
I'm using the spoolDir source and the HDFS sink.
My questions are:
- Is it possible to keep the source filename when Flume writes into HDFS? (Filenames are unique, as they contain a timestamp and a UUID.)
- Is there a way to set "deserializer.maxLineLength" to an unlimited value (instead of setting a high value)?
- I really don't want to lose data. Which channel is the best, JDBC or File? (I do not have a high-throughput flow.)
My constraint is that I have to use Flume out of the box (no custom elements) as much as possible.
Thanks for your help!
Created 12-29-2014 09:57 AM
If you want each file to remain whole, you can use the BlobDeserializer[1] as the deserializer parameter of the SpoolingDirectorySource[2]:
a1.channels = c1
a1.sources = src-1
a1.sources.src-1.type = spooldir
a1.sources.src-1.channels = c1
a1.sources.src-1.spoolDir = /var/log/apache/flumeSpool
a1.sources.src-1.fileHeader = true
a1.sources.src-1.deserializer = org.apache.flume.sink.solr.morphline.BlobDeserializer$Builder
If you need to, set deserializer.maxBlobLength to the maximum file size you'll be picking up. The default is 100 million bytes. This won't work for very large files, as the entire file contents are buffered into RAM.
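For example, to raise the limit to roughly 500 MB (the value here is only an illustration; size it to your largest expected file):
a1.sources.src-1.deserializer.maxBlobLength = 500000000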
The File channel is the best option for reliable data flow.
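A minimal File channel definition looks something like this (the checkpoint and data directory paths are placeholders; point them at local disks on the agent host):
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /var/lib/flume/checkpoint
a1.channels.c1.dataDirs = /var/lib/flume/data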
If you want the output file to have the same name as the input file, you can set the basenameHeader parameter to true. This will set a header in the Flume event called basename. You can customize the name of the header by setting basenameHeaderKey. Then, in your sink configuration, you can refer to the header value in hdfs.filePrefix with something like this:
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume/events/
a1.sinks.k1.hdfs.filePrefix = %{basename}-
a1.sinks.k1.hdfs.fileType = DataStream
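For the %{basename} escape to resolve, the spooling directory source also has to be told to add that header. A short sketch, reusing the src-1 source from above:
a1.sources.src-1.basenameHeader = true
# basenameHeaderKey defaults to "basename"; change it only if you also change %{basename} in the sink
a1.sources.src-1.basenameHeaderKey = basename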
HTH,
-Joey
[1] http://flume.apache.org/FlumeUserGuide.html#blobdeserializer
[2] http://flume.apache.org/FlumeUserGuide.html#spooling-directory-source
