Created on 12-29-2014 01:27 AM - edited 09-16-2022 02:16 AM
I'm implementing a small Hadoop cluster for a POC at my company. I'm trying to import files into HDFS with Flume. Each file contains JSON objects like this (one "long" line per file):
{ "objectType" : [ { JSON Object } , { JSON Object }, ... ] }
"objectType" is the type the objects in the array (ex: events, users, ...).
These files will be processed later by several tasks depending on the "objectType".
I'm using the spoolDir source and the HDFS sink.
My questions are:
Is it possible to keep the source filename when Flume writes into HDFS? (Filenames are unique, as they contain a timestamp and a UUID.)
Is there a way to set "deserializer.maxLineLength" to an unlimited value (instead of setting a high value)?
I really don't want to lose data. Which channel is best, JDBC or File? (I do not have a flow with high throughput.)
My constraint is that I have to use flume out-of-the-box (no custom elements) as much as possible.
Thanks for your help!
Created 12-29-2014 09:57 AM
If you want each file to remain whole, you can use the BlobDeserializer [1] for the deserializer parameter of the SpoolingDirectorySource [2]:
a1.channels = c1
a1.sources = src-1
a1.sources.src-1.type = spooldir
a1.sources.src-1.channels = c1
a1.sources.src-1.spoolDir = /var/log/apache/flumeSpool
a1.sources.src-1.fileHeader = true
a1.sources.src-1.deserializer = org.apache.flume.sink.solr.morphline.BlobDeserializer$Builder
If you need to, set deserializer.maxBlobLength to the maximum file size you'll be picking up. The default is 100 million bytes. This won't work for very large files, as the entire file contents get buffered into RAM.
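For example, to allow files up to roughly 500 MB (the value here is just an illustration; size it to your largest expected file):
a1.sources.src-1.deserializer.maxBlobLength = 500000000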
The File channel is the best option for reliable data flow.
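A minimal file channel definition might look like this (the checkpoint and data directories are placeholder paths; point them at local disk with enough space):
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /var/lib/flume/checkpoint
a1.channels.c1.dataDirs = /var/lib/flume/data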
If you want the output file to have the same name as the input file, you can set the basenameHeader parameter to true on the source (see the source-side line after the sink example below). This adds a header called basename to each Flume event. You can customize the header name by setting basenameHeaderKey. Then, in your sink configuration, you can refer to the header value in the filePrefix with something like this:
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume/events/
a1.sinks.k1.hdfs.filePrefix = %{basename}-
a1.sinks.k1.hdfs.fileType = DataStream
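On the source side, the corresponding setting would be (a sketch, extending the spooldir config above):
a1.sources.src-1.basenameHeader = true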
HTH,
-Joey
[1] http://flume.apache.org/FlumeUserGuide.html#blobdeserializer
[2] http://flume.apache.org/FlumeUserGuide.html#spooling-directory-source