Created on 08-23-2013 12:44 PM - edited 09-16-2022 01:47 AM
Hi,
I'm currently a bit confused about the different formats of Flume events and morphline records, and I'm wondering how to modify the Flume event body in the morphline so that the modified text line gets written to HDFS.
I just want to create a workflow consisting of
- source: spoolDir with LINE deserializer; log files (syslog, Apache, Solr, ...) will be dropped into this folder
- morphlineInterceptor: parse the current Flume event (one line of the logs) with grok regexes and send only the matched entries back to Flume
- memory channel
- HDFS sink: write the text output of the morphlineInterceptor to HDFS
The problem is that I don't know how the modified text can be sent back from the morphline pipe to Flume for further processing and writing to HDFS.
I tried a morphline.conf containing just a regex that extracts the matches into some additional header fields of the morphline record. According to the Flume log file the regex is applied correctly, but Flume only gets an empty string back, so the files on HDFS all have a size of 0 bytes.
Is there a morphline command, similar to e.g. "loadSolr", that returns a manually built string? Or, more generally: how can Flume access the output of the morphlineInterceptor pipe in order to write it to HDFS?
A sample morphline.conf (for this morphlineInterceptor) that includes the steps to add some fields to the string being returned to Flume for further processing would be great.
Additionally I would be very happy if someone can explain (or point me to some docs) the format of a Flume event and a morphline record, and the mapping/data transfer between both.
Many thanks in advance, Gerd
Created 08-24-2013 11:52 AM
This question has already been answered by W. Hoschek, and I'll post the whole conversation here for the sake of completeness and to push the usage of the community forum(s). It'd be best to read from the bottom up 😉
=======================================================
On output, the Flume MorphlineInterceptor copies the _attachment_body record field (which must be of type byte[]) into the Flume event body.
The readLine morphline command reads the _attachment_body field and then removes it from the record, which is why the Flume event body becomes empty.
So an example morphline that sets the Flume event body to "foo" is as follows:
…
{ readLine { charset : UTF-8 } }
… some processing of the line goes here
{ setValues { _attachment_body : [foo] } }
{ java { code: """
// convert the String set above into the byte[] that the interceptor expects
record.replaceValues("_attachment_body", record.getFirstValue("_attachment_body").toString().getBytes(Charsets.UTF_8));
return child.process(record);
"""
} }
P.S. Unfortunately there's no toByteArray command available just yet, which is why this example uses a java command to convert the string to a byte array.
Wolfgang.
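For anyone who wants to see the hand-off in isolation, here is a minimal stand-alone sketch in plain Java. It is only a model of the behaviour described in the answer, not the actual Flume or morphline code: a plain Map stands in for the morphline record, the class and method names (AttachmentBodySketch, fromEventBody, readLine, stringBodyToBytes, toEventBody) are made up for illustration, and the "message" field name is an assumption about where readLine puts the decoded line.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Stand-alone model of the hand-off described above. A plain Map stands in
// for the morphline record; no Flume or Kite classes are used (assumption).
final class AttachmentBodySketch {

    static final String ATTACHMENT_BODY = "_attachment_body";

    // Replaces all values of a field with a single value.
    static void setValue(Map<String, List<Object>> record, String field, Object value) {
        List<Object> values = new ArrayList<>();
        values.add(value);
        record.put(field, values);
    }

    // Models the input side: the raw event body is stored as byte[]
    // in the _attachment_body field.
    static Map<String, List<Object>> fromEventBody(byte[] body) {
        Map<String, List<Object>> record = new HashMap<>();
        setValue(record, ATTACHMENT_BODY, body);
        return record;
    }

    // Hypothetical stand-in for readLine: decodes the body into a text field
    // ("message" is an assumed name) and removes _attachment_body.
    static void readLine(Map<String, List<Object>> record) {
        List<Object> body = record.remove(ATTACHMENT_BODY);
        if (body != null && !body.isEmpty()) {
            String line = new String((byte[]) body.get(0), StandardCharsets.UTF_8);
            setValue(record, "message", line);
        }
    }

    // Mirrors the java command in the answer: String value -> UTF-8 byte[].
    static void stringBodyToBytes(Map<String, List<Object>> record) {
        Object first = record.get(ATTACHMENT_BODY).get(0);
        setValue(record, ATTACHMENT_BODY, first.toString().getBytes(StandardCharsets.UTF_8));
    }

    // Hypothetical stand-in for the interceptor's output step: a missing
    // _attachment_body field yields an empty event body.
    static byte[] toEventBody(Map<String, List<Object>> record) {
        List<Object> values = record.get(ATTACHMENT_BODY);
        if (values == null || values.isEmpty()) {
            return new byte[0];
        }
        return (byte[]) values.get(0);
    }

    public static void main(String[] args) {
        Map<String, List<Object>> record =
                fromEventBody("a log line".getBytes(StandardCharsets.UTF_8));

        readLine(record);
        System.out.println("after readLine: " + toEventBody(record).length + " bytes");

        setValue(record, ATTACHMENT_BODY, "foo"); // like setValues { _attachment_body : [foo] }
        stringBodyToBytes(record);                // like the java command
        System.out.println("after rebuild: "
                + new String(toEventBody(record), StandardCharsets.UTF_8));
    }
}
```

Running main shows the two states from the answer: right after the readLine step the body that would go back to Flume is empty, and once _attachment_body has been set again and converted to byte[] the body carries the rebuilt text.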