01-14-2015 12:00 PM
I have a custom Morphline command and wanted to know if it's possible to set headers in the same command for use with Flume's fan-out flow. I need to redirect certain events based on their content. For example, if the field 'page' is 'Home', I would like to send the event to a separate channel and write it to a different Kite Dataset sink.
Since I'm already parsing the fields, it might be best to do this within the same custom Morphline command.
01-14-2015 01:50 PM
Yes, you can set headers. On input, each Flume header is converted to a morphline record field of the same name, and on output each morphline record field is converted to a Flume header of the same name. The _attachment_body field carries the Flume event body as a byte array, on input as well as on output.
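As a rough sketch of how that could look in the morphline config: the fragment below sets a routing field based on the 'page' field, and that field would then come out as a Flume header. The field name eventname and the values e1/e2 are only placeholders for illustration, and the morphline id is made up. It assumes 'page' has already been extracted by an earlier command and uses the if, equals and setValues commands from the Kite Morphlines standard library, so please check the exact syntax against the reference guide for your version.

```
morphlines : [
  {
    # hypothetical id; use whatever your interceptor's morphlineId points at
    id : routeByPage
    importCommands : ["org.kitesdk.**"]
    commands : [
      # ... your existing parsing commands that populate 'page' go here ...
      {
        if {
          # succeeds only if 'page' holds exactly the single value Home
          conditions : [
            { equals { page : [Home] } }
          ]
          # placeholder header name and values, not anything Flume requires
          then : [
            { setValues { eventname : e1 } }
          ]
          else : [
            { setValues { eventname : e2 } }
          ]
        }
      }
    ]
  }
]
```

A custom command could do the same thing directly by putting the value into the record before passing it to the child command; the config version just avoids extra Java code.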
01-14-2015 11:36 PM
Thanks for the information. So if I understand correctly, I do not need to implement an Interceptor; just setting a new field in the record would result in that field becoming a header? This header can then be used for redirecting the event to a different channel.
01-15-2015 10:43 AM
I'm following this example to attach the Avro record's schema to the Flume event header for the Dataset sink. If I'm routing events to 2 different schemas, would attaching both schemas with Static Interceptors work? For example, I have 2 Kite datasets created, and here are the Flume properties:
# attach the schema to the record, then convert it to avro
tier1.sources.listener.interceptors = attach-schema1 attache-schema2 morphline
# add the schema for our record sink
tier1.sources.listener.interceptors.attach-schema1.type = static
tier1.sources.listener.interceptors.attach-schema1.key = flume.avro.schema.url
tier1.sources.listener.interceptors.attach-schema1.value = file:/etc/flume-ng/schemas/event1.avsc
tier1.sources.listener.interceptors.attach-schema2.type = static
tier1.sources.listener.interceptors.attach-schema2.key = flume.avro.schema.url
tier1.sources.listener.interceptors.attach-schema2.value = file:/etc/flume-ng/schemas/event2.avsc
tier1.sources.listener.selector.type = multiplexing
tier1.sources.listener.selector.header = eventname
tier1.sources.listener.selector.mapping.e1 = eventChannel1
tier1.sources.listener.selector.mapping.e2 = eventChannel2
tier1.sources.listener.selector.default = eventChannel2
tier1.sink.eventSink1.type = org.apache.flume.sink.kite.DatasetSink
tier1.sink.eventSink1.channel = eventChannel1
tier1.sink.eventSink1.kite.dataset.name = event1
tier1.sink.eventSink2.type = org.apache.flume.sink.kite.DatasetSink
tier1.sink.eventSink2.channel = eventChannel2
tier1.sink.eventSink2.kite.dataset.name = event2
Does that look correct?