Expert Contributor
Posts: 139
Registered: ‎07-21-2014

Flume Morphlines Interceptor to handle fan out flow?

I have a custom Morphline command and wanted to know if it's possible to set headers in the same command to use with Flume's fan-out flow. I need to redirect certain events based on their content. For example, if the field 'page' is 'Home', I would like to send the event to a separate channel and write it to a different Kite Dataset sink.

Since I'm parsing the fields already, it might be best if I can do this within the same custom Morphline command.

Thanks!

Cloudera Employee
Posts: 146
Registered: ‎08-21-2013

Re: Flume Morphlines Interceptor to handle fan out flow?

Yes, you can set headers. On input, each Flume header is converted to a morphline record field of the same name, and on output, each morphline record field is converted to a Flume header of the same name. The _attachment_body field carries the Flume event body as a byte[] on both input and output.

Expert Contributor
Posts: 139
Registered: ‎07-21-2014

Re: Flume Morphlines Interceptor to handle fan out flow?

Thanks for the information. So if I understand correctly, I do not need to implement an Interceptor; just setting a new field in the record would result in that field becoming a header? This header can then be used to redirect the event to a different channel.

Thanks!

Cloudera Employee
Posts: 146
Registered: ‎08-21-2013

Re: Flume Morphlines Interceptor to handle fan out flow?

Any morphline field except for the _attachment_body field will become a Flume event header.
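To make the conversion described above concrete, here is a minimal stand-alone sketch in plain Java. It only models the behavior with ordinary maps and does not use the Flume or Kite classes; the class and method names are hypothetical, the header name eventname and the values e1/e2 are borrowed from the selector settings posted later in this thread, and it assumes each field holds a single value.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Stand-alone model of the field/header conversion; NOT the Flume or Kite API. */
final class HeaderMappingSketch {

    // Field that carries the event body, as named in the thread.
    static final String BODY_FIELD = "_attachment_body";

    /** Input side: each header becomes a record field of the same name. */
    static Map<String, Object> toRecord(Map<String, String> headers, byte[] body) {
        Map<String, Object> record = new LinkedHashMap<String, Object>(headers);
        record.put(BODY_FIELD, body);
        return record;
    }

    /** Hypothetical routing rule from the question: page == "Home" gets its own flow. */
    static void tagForRouting(Map<String, Object> record) {
        String route = "Home".equals(record.get("page")) ? "e1" : "e2";
        record.put("eventname", route);
    }

    /** Output side: every field except the body becomes a header of the same name. */
    static Map<String, String> toHeaders(Map<String, Object> record) {
        Map<String, String> headers = new LinkedHashMap<>();
        for (Map.Entry<String, Object> field : record.entrySet()) {
            if (!BODY_FIELD.equals(field.getKey())) {
                headers.put(field.getKey(), String.valueOf(field.getValue()));
            }
        }
        return headers;
    }

    public static void main(String[] args) {
        Map<String, String> in = new LinkedHashMap<>();
        in.put("host", "web01");
        Map<String, Object> record = toRecord(in, "payload".getBytes());
        record.put("page", "Home"); // stands in for a field parsed from the body
        tagForRouting(record);
        System.out.println(toHeaders(record)); // {host=web01, page=Home, eventname=e1}
    }
}
```

In a real custom command the equivalent of tagForRouting would set the field on the morphline record, and a multiplexing channel selector keyed on that header would pick the channel.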

Expert Contributor
Posts: 139
Registered: ‎07-21-2014

Re: Flume Morphlines Interceptor to handle fan out flow?

Thanks!

I'm following this example to attach the Avro record's schema to the Flume event header for the Dataset sink. If I'm routing events to two datasets with different schemas, would attaching both schemas with static interceptors work? For example, I have two Kite datasets created, and here are the Flume properties:

# attach the schema to the record, then convert it to avro
tier1.sources.listener.interceptors = attach-schema1 attach-schema2 morphline

# add the schema for our record sink
tier1.sources.listener.interceptors.attach-schema1.type = static
tier1.sources.listener.interceptors.attach-schema1.key = flume.avro.schema.url
tier1.sources.listener.interceptors.attach-schema1.value = file:/etc/flume-ng/schemas/event1.avsc
tier1.sources.listener.interceptors.attach-schema2.type = static
tier1.sources.listener.interceptors.attach-schema2.key = flume.avro.schema.url
tier1.sources.listener.interceptors.attach-schema2.value = file:/etc/flume-ng/schemas/event2.avsc
tier1.sources.listener.selector.type = multiplexing
tier1.sources.listener.selector.header = eventname
tier1.sources.listener.selector.mapping.e1 = eventChannel1
tier1.sources.listener.selector.mapping.e2 = eventChannel2
tier1.sources.listener.selector.default = eventChannel2
tier1.sinks.eventSink1.type = org.apache.flume.sink.kite.DatasetSink
tier1.sinks.eventSink1.channel = eventChannel1
tier1.sinks.eventSink1.kite.dataset.name = event1
.....
....
tier1.sinks.eventSink2.type = org.apache.flume.sink.kite.DatasetSink
tier1.sinks.eventSink2.channel = eventChannel2
tier1.sinks.eventSink2.kite.dataset.name = event2

Does that look correct?

Expert Contributor
Posts: 139
Registered: ‎07-21-2014

Re: Flume Morphlines Interceptor to handle fan out flow?

Passing in both schemas seems to have worked out fine, thanks!
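One detail that may be worth checking in a setup like this: both static interceptors write to the same key, flume.avro.schema.url. As far as I know, Flume's static interceptor has a preserveExisting option that defaults to true, in which case the second interceptor would leave the first value in place. The sketch below models that rule in plain Java; it is not the Flume class itself, and the class and method names are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Stand-alone model of a static interceptor's header rule; NOT the Flume class. */
final class StaticHeaderSketch {

    /** Sets key=value unless preserveExisting is true and the key is already present. */
    static void apply(Map<String, String> headers, String key, String value,
                      boolean preserveExisting) {
        if (preserveExisting && headers.containsKey(key)) {
            return; // keep whatever an earlier interceptor wrote
        }
        headers.put(key, value);
    }

    public static void main(String[] args) {
        Map<String, String> headers = new LinkedHashMap<>();
        String key = "flume.avro.schema.url";
        // Same order as attach-schema1 and attach-schema2 in the properties above.
        apply(headers, key, "file:/etc/flume-ng/schemas/event1.avsc", true);
        apply(headers, key, "file:/etc/flume-ng/schemas/event2.avsc", true);
        System.out.println(headers.get(key)); // file:/etc/flume-ng/schemas/event1.avsc
    }
}
```

If that assumption holds, every event would carry the event1.avsc URL regardless of channel, and whether that matters depends on how compatible the two schemas are. Since any morphline field other than _attachment_body becomes a header, one alternative might be to set flume.avro.schema.url in the same morphline command that sets eventname.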