
Got confused with Flume spoolDir source => morphlineInterceptor => HDFS sink


Hi,

Currently I'm a bit confused about the different formats of Flume events and morphline records, and I'm wondering how to modify the Flume event body in the morphline so that the modified text line gets written to HDFS.

I just want to create a workflow consisting of:

- source: spoolDir with LINE deserializer; logfiles (syslog, apache, solr, ...) will be dropped into this folder

- morphlineInterceptor: parse the current Flume event (one line of the logs) with grok regexes and send just the matched entries back to Flume

- memory channel

- HDFS sink: write the text output of the morphlineInterceptor to HDFS (a config sketch follows below)
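
For reference, a minimal sketch of the agent config I have in mind (the agent/channel/sink names and the spool/HDFS paths are just placeholders; the interceptor settings are the ones from below):

# spoolDir source; the LINE deserializer is the default
agent.sources = spoolSrc
agent.sources.spoolSrc.type = spooldir
agent.sources.spoolSrc.spoolDir = /var/flume/spool
agent.sources.spoolSrc.channels = memCh

# morphline interceptor attached to the source
agent.sources.spoolSrc.interceptors = morphlineinterceptor
agent.sources.spoolSrc.interceptors.morphlineinterceptor.type = org.apache.flume.sink.solr.morphline.MorphlineInterceptor$Builder
agent.sources.spoolSrc.interceptors.morphlineinterceptor.morphlineFile = /etc/flume-ng/conf/morphlineinterceptor.conf
agent.sources.spoolSrc.interceptors.morphlineinterceptor.morphlineId = morphlineSourceInterceptor

# memory channel
agent.channels = memCh
agent.channels.memCh.type = memory

# HDFS sink writing plain text (DataStream uses the TEXT serializer)
agent.sinks = hdfsSink
agent.sinks.hdfsSink.type = hdfs
agent.sinks.hdfsSink.channel = memCh
agent.sinks.hdfsSink.hdfs.path = hdfs://nn/flume-events/%y-%m-%d/%H%M
agent.sinks.hdfsSink.hdfs.fileType = DataStream
agent.sinks.hdfsSink.hdfs.useLocalTimeStamp = true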

 

The problem is that I don't know how the modified text can be sent back from the morphline pipe to Flume for further processing and writing to HDFS.

I tried a morphline.conf containing just a regex that extracts the matches into some additional header fields of the morphline record. The regex is applied correctly (the Flume logfile tells me so), but an empty string comes back to Flume, and so I only get files of size 0 bytes on HDFS.

Is there a morphline command, similar to e.g. "loadSolr", that returns a manually built string, or more generally: how can the output of the morphlineInterceptor pipe be accessed by Flume to write it to HDFS?

 

A sample morphline.conf (for this morphlineInterceptor) including the steps to add some fields to the string being returned for further processing by Flume would be great 🙂

 

Additionally, I would be very happy if someone could explain (or point me to some docs on) the format of a Flume event and a morphline record, and the mapping/data transfer between the two.

 

many thanks in advance....Gerd....

1 ACCEPTED SOLUTION


This question has already been answered by W. Hoschek, and I'll post the whole conversation here for the sake of completeness and to push the usage of the community forum(s). It's best to read from the bottom up 😉

 

=======================================================

 

On output, the Flume MorphlineInterceptor copies the _attachment_body record field (which must be of type byte[]) into the Flume event body.
The readLine morphline command reads the _attachment_body field and then removes it, which is why the Flume event body becomes empty.
So an example morphline that sets the Flume event body to "foo" is as follows:

# read one line of the event body into the "message" field; this consumes _attachment_body
{ readLine { charset : UTF-8 } }

# ... some processing of the line goes here ...

# build the new body as a string
{ setValues { _attachment_body : [foo] } }

# convert the string to byte[], since the interceptor expects _attachment_body to be a byte array
{ java { code: """
    record.replaceValues("_attachment_body",
        record.getFirstValue("_attachment_body").toString().getBytes(Charsets.UTF_8));
    return child.process(record);
  """ } }

P.S. Unfortunately there's no toByteArray command available just yet, which is why this example uses a java command to convert the string to a byte array.

Wolfgang.

 
On Aug 24, 2013, at 8:06 AM, Gerd Koenig wrote:
> Hi Wolfgang,
>
> many thanks for your explanations and the links. The header manipulation stuff sounds evident and works as expected in my simple example, but I probably can't see the forest for the trees when it comes to my initial problem 😉
>
> I tried a veeery simple morphline to test the workflow, but I'm missing an intuition for how to adjust the Flume event body (the text which shall be written to HDFS).
> Here are the details of my simple Flume flow test:
> a) content of the input file (this file will be put into the folder the Flume spoolSrc listens to)
> 2013-08-22 13:52:56,281 INFO user=alfred-e-neumann blub=1 blob=abc blib=100
> b) configured interceptor
> agent.sources.spoolSrc.interceptors.morphlineinterceptor.type = org.apache.flume.sink.solr.morphline.MorphlineInterceptor$Builder
> agent.sources.spoolSrc.interceptors.morphlineinterceptor.morphlineFile = /etc/flume-ng/conf/morphlineinterceptor.conf
> agent.sources.spoolSrc.interceptors.morphlineinterceptor.morphlineId = morphlineSourceInterceptor
> c) content of morphlineinterceptor.conf
> morphlines : [
>   {
>     id : morphlineSourceInterceptor
>     importCommands : ["com.cloudera.**"]
>     commands : [
>       { readLine { charset : UTF-8 } }
>       { logInfo { format : " ############## START ##############" } }
>       { logInfo { format : " input: {}", args : ["@{}"] } }
>       { addValues { myheader : myheader-value } }
>       { logInfo { format : "(1) : {}", args : ["@{}"] } }
>     ]
>   }
> ]
>
> After putting the input file into the spool dir, I would expect to get a file on HDFS containing the original text of the input file, since the incoming record is not modified in the morphline and should therefore be sent back to Flume unmodified (the output record you mentioned is the text which gets piped back into the Flume event body?). But... at the end I have an empty file on HDFS ?!?!
>
> The flume log includes:
> ""
> 2013-08-24 16:18:16,183 INFO com.cloudera.cdk.morphline.stdlib.LogInfoBuilder$LogInfo:  ############## START ##############
> 2013-08-24 16:18:16,183 DEBUG com.cloudera.cdk.morphline.stdlib.LogInfoBuilder$LogInfo: beforeProcess()
> 2013-08-24 16:18:16,186 INFO com.cloudera.cdk.morphline.stdlib.LogInfoBuilder$LogInfo:  input: [{message=[2013-08-22 13:52:56,281 INFO user=alfred-e-neumann blub=1 blob=abc blib=100 ]}]
> 2013-08-24 16:18:16,186 DEBUG com.cloudera.cdk.morphline.stdlib.AddValuesBuilder$AddValues: beforeProcess()
> 2013-08-24 16:18:16,186 DEBUG com.cloudera.cdk.morphline.stdlib.LogInfoBuilder$LogInfo: beforeProcess()
> 2013-08-24 16:18:16,187 INFO com.cloudera.cdk.morphline.stdlib.LogInfoBuilder$LogInfo: (1) : [{message=[2013-08-22 13:52:56,281 INFO user=alfred-e-neumann blub=1 blob=abc blib=100 ], myheader=[myheader-value]}]
> ...
> 2013-08-24 16:18:16,203 INFO org.apache.flume.sink.hdfs.HDFSDataStream: Serializer = TEXT, UseRawLocalFileSystem = false
> 2013-08-24 16:18:16,338 INFO org.apache.flume.sink.hdfs.BucketWriter: Creating hdfs://nn/flume-events/13-08-24/1610/event.1377353896200.tmp
> ""
>
> I have no idea what I'm doing wrong (or didn't understand correctly 🙂) in this simple example ... any hint is highly appreciated.
> But this is just the first and simplest case; what I want to do in general is something like:
> * parse each line of text in the morphline pipe and send just parts of the incoming text line back to flume
> * write those parts into the file on HDFS
> e.g. :
> a) incoming text to morphline: 2013-08-22 13:52:56,281 INFO user=alfred-e-neumann blub=1 blob=abc blib=100
> b) apply some grok regex, for example, and send the string "user,alfred-e-neumann,blob,abc" back to Flume
> c) write "user,alfred-e-neumann,blob,abc" to the file on HDFS
>
> I can access the text of a) via the field "message" and apply a regex on it to extract some information into newly created header fields. And now? What do I do with these header fields to replace the incoming record body (the text line) with a manually created string like the one in c)?
>
> Hopefully I described my problem in an understandable manner 🙂
>
> thanks in advance.....gk......
>
>
> On 23 August 2013 22:41, Wolfgang Hoschek <whoschek@cloudera.com> wrote:
> On input, the MorphlineInterceptor fills the body of the Flume event into the _attachment_body field of the morphline record, and also copies the headers of the Flume event into record fields of the same name.
>
> The output record of the final command in the MorphlineInterceptor is automatically piped back into flume. The interceptor copies the record fields into flume headers of the same name.
>
> Thus, a simple morphline to add a new flume header "foo" with value "bar" to a flume event with MorphlineInterceptor might be:
>
> …
> { addValues { foo : bar } }
>
> Perhaps a unit test can help illustrate it:
>
> https://github.com/apache/flume/blob/trunk/flume-ng-sinks/flume-ng-morphline-solr-sink/src/test/java...
>
> The source code for the mapping is here:
>
> https://github.com/apache/flume/blob/trunk/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java...
>
> Wolfgang.
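
=======================================================

Putting the pieces together for my example above, here's an untested sketch of a morphline that extracts fields via grok and sends "user,...,blob,..." back to Flume as the new event body. The grok pattern, the inline dictionary, and the findSubstrings setting are my own assumptions, not something from the thread; the byte[] conversion at the end is Wolfgang's java snippet from above:

morphlines : [
  {
    id : morphlineSourceInterceptor
    importCommands : ["com.cloudera.**"]
    commands : [
      # read one line of the Flume event body into the "message" field
      { readLine { charset : UTF-8 } }

      # extract "user" and "blob" into record fields (pattern/dictionary are assumptions)
      { grok {
          dictionaryString : """NOTSPACE \S+"""
          expressions : {
            message : """user=%{NOTSPACE:user}.*blob=%{NOTSPACE:blob}"""
          }
          findSubstrings : true
        }
      }

      # build the new event body from the extracted fields, e.g. "user,alfred-e-neumann,blob,abc"
      { setValues { _attachment_body : "user,@{user},blob,@{blob}" } }

      # convert the string body to byte[], as the interceptor expects
      { java { code: """
          record.replaceValues("_attachment_body",
              record.getFirstValue("_attachment_body").toString().getBytes(Charsets.UTF_8));
          return child.process(record);
        """ } }
    ]
  }
]

With this, the HDFS sink's TEXT serializer should write one "user,...,blob,..." line per input line instead of an empty file.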

 
