Champion
Posts: 744
Registered: ‎05-16-2016

Flume use of multiplexing channel selector - Clarification Needed

In the configuration below, I am using a multiplexing channel selector with State as the header, mapped to the values OH, NY, and IN.

 

I would like to know how to include an event header in the Flume configuration. I am aware of the static interceptor, but it adds the same header key and value to every event, so it won't suit my scenario. Please guide me on this.

 

spoolDirAgent.sources.source1.selector.type = multiplexing
spoolDirAgent.sources.source1.selector.header = State
spoolDirAgent.sources.source1.selector.mapping.OH = channel1
spoolDirAgent.sources.source1.selector.mapping.NY = channel2
spoolDirAgent.sources.source1.selector.mapping.IN = channel2
spoolDirAgent.sources.source1.selector.default = channel1

Any information is highly appreciated.

Cloudera Employee
Posts: 232
Registered: ‎01-09-2014

Re: Flume use of multiplexing channel selector - Clarification Needed

Can you describe what you are trying to accomplish? You can use a morphline interceptor to add event headers based on data within the event. Here is a thread with a couple of ways of doing that:
https://groups.google.com/a/cloudera.org/forum/#!topic/cdk-dev/6HFtGfGixz8

You can also use the regex extractor interceptor to match data in the event and populate a header with that match:
http://flume.apache.org/FlumeUserGuide.html#regex-extractor-interceptor
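
As a rough sketch of the regex extractor approach, assuming each event body starts with a two-letter state code followed by a comma (the interceptor name i1, the serializer name s1, and the regex itself are illustrative assumptions, not taken from the original configuration):

```
# Assumption: event bodies look like "OH,..." or "NY,..."
spoolDirAgent.sources.source1.interceptors = i1
spoolDirAgent.sources.source1.interceptors.i1.type = regex_extractor
# The first capture group becomes the header value
spoolDirAgent.sources.source1.interceptors.i1.regex = ^(OH|NY|IN),
spoolDirAgent.sources.source1.interceptors.i1.serializers = s1
# Header name must match selector.header
spoolDirAgent.sources.source1.interceptors.i1.serializers.s1.name = State
```

Events whose body does not match the regex get no State header, so the multiplexing selector would send them to its default channel.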

-pd

New Contributor
Posts: 4
Registered: ‎08-25-2018

Re: Flume use of multiplexing channel selector - Clarification Needed

I also tried this method, but it still didn't do multiplexing; it is replicating. Did you solve this?

 

agent1.sinks=hdfs-sink1_1 hdfs-sink1_2 hdfs-sink1_3
agent1.sources=source1_1
agent1.channels=fileChannel1_1 fileChannel1_2 fileChannel1_3

agent1.channels.fileChannel1_1.type=file
agent1.channels.fileChannel1_1.capacity=200000
agent1.channels.fileChannel1_1.transactionCapacity=1000
agent1.channels.fileChannel1_1.checkpointDir=/home/Flume/alpha/001
agent1.channels.fileChannel1_1.dataDirs=/home/Flume/alpha_data
agent1.channels.fileChannel1_1.checkpointOnClose=true
agent1.channels.fileChannel1_1.dataOnClose=true


agent1.sources.source1_1.type=spooldir
agent1.sources.source1_1.spoolDir=/home/ABC/
agent1.sources.source1_1.recursiveDirectorySearch=true
#agent1.sources.source1_1.fileHeader=true
#agent1.sources.source1_1.fileHeaderKey=file
agent1.sources.source1_1.fileSuffix=.COMPLETED
agent1.sources.source1_1.basenameHeader = true
agent1.sources.source1_1.basenameHeaderKey = basename

agent1.sinks.hdfs-sink1_1.type=hdfs
agent1.sinks.hdfs-sink1_1.hdfs.filePrefix = %{basename}
agent1.sinks.hdfs-sink1_1.hdfs.path=hdfs://10.44.209.44:9000/flume_sink/CA
agent1.sinks.hdfs-sink1_1.hdfs.batchSize=1000
agent1.sinks.hdfs-sink1_1.hdfs.rollSize=268435456
agent1.sinks.hdfs-sink1_1.hdfs.rollInterval=0
agent1.sinks.hdfs-sink1_1.hdfs.rollCount=50000000
agent1.sinks.hdfs-sink1_1.hdfs.fileType=DataStream
agent1.sinks.hdfs-sink1_1.hdfs.writeFormat=Text
agent1.sinks.hdfs-sink1_1.hdfs.useLocalTimeStamp=false


agent1.channels.fileChannel1_2.type=file
agent1.channels.fileChannel1_2.capacity=200000
agent1.channels.fileChannel1_2.transactionCapacity=1000
agent1.channels.fileChannel1_2.checkpointDir=/home/Flume/beta/001
agent1.channels.fileChannel1_2.dataDirs=/home/Flume/beta_data
agent1.channels.fileChannel1_2.checkpointOnClose=true
agent1.channels.fileChannel1_2.dataOnClose=true

 

agent1.sinks.hdfs-sink1_2.type=hdfs
agent1.sinks.hdfs-sink1_2.hdfs.filePrefix = %{basename}
agent1.sinks.hdfs-sink1_2.hdfs.path=hdfs://10.44.209.44:9000/flume_sink/AZ
agent1.sinks.hdfs-sink1_2.hdfs.batchSize=1000
agent1.sinks.hdfs-sink1_2.hdfs.rollSize=268435456
agent1.sinks.hdfs-sink1_2.hdfs.rollInterval=0
agent1.sinks.hdfs-sink1_2.hdfs.rollCount=50000000
agent1.sinks.hdfs-sink1_2.hdfs.fileType=DataStream
agent1.sinks.hdfs-sink1_2.hdfs.writeFormat=Text
agent1.sinks.hdfs-sink1_2.hdfs.useLocalTimeStamp=false

agent1.channels.fileChannel1_3.type=file
agent1.channels.fileChannel1_3.capacity=200000
agent1.channels.fileChannel1_3.transactionCapacity=10
agent1.channels.fileChannel1_3.checkpointDir=/home/Flume/gamma/001
agent1.channels.fileChannel1_3.dataDirs=/home/Flume/gamma_data
agent1.channels.fileChannel1_3.checkpointOnClose=true
agent1.channels.fileChannel1_3.dataOnClose=true


agent1.sinks.hdfs-sink1_3.type=hdfs
agent1.sinks.hdfs-sink1_3.hdfs.filePrefix = %{basename}
agent1.sinks.hdfs-sink1_3.hdfs.path=hdfs://10.44.209.44:9000/flume_sink/KT
agent1.sinks.hdfs-sink1_3.hdfs.batchSize=1000
agent1.sinks.hdfs-sink1_3.hdfs.rollSize=268435456
agent1.sinks.hdfs-sink1_3.hdfs.rollInterval=0
agent1.sinks.hdfs-sink1_3.hdfs.rollCount=50000000
agent1.sinks.hdfs-sink1_3.hdfs.fileType=DataStream
agent1.sinks.hdfs-sink1_3.hdfs.writeFormat=Text
agent1.sinks.hdfs-sink1_3.hdfs.useLocalTimeStamp=false


agent1.sources.source1_1.channels=fileChannel1_1 fileChannel1_2 fileChannel1_3

agent1.sinks.hdfs-sink1_1.channel=fileChannel1_1
agent1.sinks.hdfs-sink1_2.channel=fileChannel1_2
agent1.sinks.hdfs-sink1_3.channel=fileChannel1_3


agent1.sources.source1_1.selector.type=replicating
agent1.sources.source1_1.selector.header=basename
agent1.sources.source1_1.selector.mapping.CA=fileChannel1_1
agent1.sources.source1_1.selector.mapping.AZ=fileChannel1_2
agent1.sources.source1_1.selector.default=fileChannel1_3
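
Note that the selector type in the configuration above is set to replicating, which would explain why every event reaches all three channels: a replicating selector does not use the header and mapping properties. A minimal sketch of the multiplexing form follows, assuming the basename header value is exactly CA or AZ. The mapping keys are compared with the whole header value, so a file named, for example, CA_2018.log (a hypothetical name) would fall through to the default channel.

```
# Sketch: route by header value instead of copying to every channel
agent1.sources.source1_1.selector.type = multiplexing
agent1.sources.source1_1.selector.header = basename
# Keys must equal the full basename header value
agent1.sources.source1_1.selector.mapping.CA = fileChannel1_1
agent1.sources.source1_1.selector.mapping.AZ = fileChannel1_2
# Everything else goes here
agent1.sources.source1_1.selector.default = fileChannel1_3
```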

 

New Contributor
Posts: 4
Registered: ‎08-25-2018

Re: Flume use of multiplexing channel selector - Clarification Needed

But the two interceptors above add event headers based on data within the event. Is there any method to do that based on the file name?
Champion
Posts: 744
Registered: ‎05-16-2016

Re: Flume use of multiplexing channel selector - Clarification Needed

@Kala You can achieve this using Apache NiFi.

New Contributor
Posts: 4
Registered: ‎08-25-2018

Re: Flume use of multiplexing channel selector - Clarification Needed

Thank you.
Can you explain how to use Apache NiFi? Is NiFi used instead of Flume or combined with Flume?