Flume Morphlines Interceptor to handle fan out flow?

Expert Contributor

I have a custom Morphline command and wanted to know if it's possible to set headers in the same command to use with Flume's fan-out flow. I need to redirect certain events based on their content. For example, if the field 'page' is 'Home', I would like to send the event to a separate channel and write it to a different Kite Dataset sink.

 

Since I'm parsing the fields already, it might be best if I can do this within the same custom Morphline command.

 

Thanks!

5 REPLIES

Re: Flume Morphlines Interceptor to handle fan out flow?

Expert Contributor

Yes, you can set headers. On input, each Flume header is converted to a morphline record field of the same name, and on output, each morphline record field is converted to a Flume header of the same name. The _attachment_body field carries the Flume event body as a byte[] on both input and output.

Re: Flume Morphlines Interceptor to handle fan out flow?

Expert Contributor

Thanks for the information. So if I understand correctly, I do not need to implement my own Interceptor; just setting a new field in the record would result in that field becoming a header? This header can then be used to redirect the event to a different channel.

 

Thanks!

Re: Flume Morphlines Interceptor to handle fan out flow?

Expert Contributor

Any morphline field except the _attachment_body field will become a Flume event header.
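
As a rough sketch of how that could look for the 'page' example in the question, the morphline below sets a routing field with the standard Kite `if`, `equals`, and `setValues` commands. The field name `eventname` and the values `e1`/`e2` are only illustrative choices for the header that a multiplexing channel selector would read, the morphline id `morphline1` is hypothetical, and the mapping of 'Home' to `e1` is an assumption. The same field could equally be set from inside the custom command's Java code.

```
morphlines : [
  {
    # hypothetical id; it must match the morphlineId configured on the interceptor
    id : morphline1
    # add the package of the custom command here as well
    importCommands : ["org.kitesdk.**"]
    commands : [
      # ... the custom parsing command that populates the 'page' field goes here ...

      # set a routing field; on output it becomes a Flume header of the same name
      {
        if {
          conditions : [
            { equals { page : [Home] } }
          ]
          then : [
            { setValues { eventname : e1 } }
          ]
          else : [
            { setValues { eventname : e2 } }
          ]
        }
      }
    ]
  }
]
```

With something like this in place, a multiplexing selector configured with `selector.header = eventname` can map `e1` and `e2` to different channels.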

Re: Flume Morphlines Interceptor to handle fan out flow?

Expert Contributor

Thanks!

 

I'm following this example to attach the Avro record's schema to the Flume event header for the Dataset sink. If I'm routing events to 2 different schemas, would attaching both schemas with static interceptors work? For example, I have 2 Kite datasets created, and here are the Flume properties:

 

# attach the schema to the record, then convert it to avro
tier1.sources.listener.interceptors = attach-schema1 attach-schema2 morphline

# add the schema for our record sink
tier1.sources.listener.interceptors.attach-schema1.type = static
tier1.sources.listener.interceptors.attach-schema1.key = flume.avro.schema.url
tier1.sources.listener.interceptors.attach-schema1.value = file:/etc/flume-ng/schemas/event1.avsc
tier1.sources.listener.interceptors.attach-schema2.type = static
tier1.sources.listener.interceptors.attach-schema2.key = flume.avro.schema.url
tier1.sources.listener.interceptors.attach-schema2.value = file:/etc/flume-ng/schemas/event2.avsc
tier1.sources.listener.selector.type = multiplexing
tier1.sources.listener.selector.header = eventname
tier1.sources.listener.selector.mapping.e1 = eventChannel1
tier1.sources.listener.selector.mapping.e2 = eventChannel2
tier1.sources.listener.selector.default = eventChannel2
tier1.sinks.eventSink1.type = org.apache.flume.sink.kite.DatasetSink
tier1.sinks.eventSink1.channel = eventChannel1
tier1.sinks.eventSink1.kite.dataset.name = event1
.....
....
tier1.sinks.eventSink2.type = org.apache.flume.sink.kite.DatasetSink
tier1.sinks.eventSink2.channel = eventChannel2
tier1.sinks.eventSink2.kite.dataset.name = event2

Does that look correct?

 

 

Re: Flume Morphlines Interceptor to handle fan out flow?

Expert Contributor

Passing in both schemas seems to have worked out fine, thanks!