Contributor
Posts: 90
Registered: ‎11-12-2015
Accepted Solution

Flafka selector doesn't work

Hello, I'm trying to split my data depending on a header of the log (JSON). My source is Kafka and my sinks are HDFS folders. When I run Flume, all the data goes to the default channel even if the header matches one of the mappings.

 

Here is the Flume config file:

# Sources, channels, and sinks are defined per
# agent name, in this case flume1.
flume1.sources  = kafka-source-1
flume1.channels = hdfs-channel-1 hdfs-channel-2 hdfs-channel-3 hdfs-channel-4 hdfs-channel-5 hdfs-channel-6 hdfs-channel-7
flume1.sinks    = hdfs-sink-1 hdfs-sink-2 hdfs-sink-3 hdfs-sink-4 hdfs-sink-5 hdfs-sink-6 hdfs-sink-7
 
# For each source, channel, and sink, set
# standard properties.
flume1.sources.kafka-source-1.type = org.apache.flume.source.kafka.KafkaSource
flume1.sources.kafka-source-1.zookeeperConnect = 192.168.70.23:2181
flume1.sources.kafka-source-1.topic = kafkatopic
flume1.sources.kafka-source-1.batchSize = 1000
flume1.sources.kafka-source-1.channels = hdfs-channel-1 hdfs-channel-2 hdfs-channel-3 hdfs-channel-4 hdfs-channel-5 hdfs-channel-6 hdfs-channel-7

flume1.sinks.hdfs-sink-1.channel = hdfs-channel-1
flume1.sinks.hdfs-sink-2.channel = hdfs-channel-2
flume1.sinks.hdfs-sink-3.channel = hdfs-channel-3
flume1.sinks.hdfs-sink-4.channel = hdfs-channel-4
flume1.sinks.hdfs-sink-5.channel = hdfs-channel-5
flume1.sinks.hdfs-sink-6.channel = hdfs-channel-6
flume1.sinks.hdfs-sink-7.channel = hdfs-channel-7
 
flume1.channels.hdfs-channel-1.type   = memory
flume1.channels.hdfs-channel-2.type   = memory
flume1.channels.hdfs-channel-3.type   = memory
flume1.channels.hdfs-channel-4.type   = memory
flume1.channels.hdfs-channel-5.type   = memory
flume1.channels.hdfs-channel-6.type   = memory
flume1.channels.hdfs-channel-7.type   = memory

flume1.channels.hdfs-channel-1.capacity = 10000
flume1.channels.hdfs-channel-1.transactionCapacity = 1000
flume1.channels.hdfs-channel-2.capacity = 10000
flume1.channels.hdfs-channel-2.transactionCapacity = 1000
flume1.channels.hdfs-channel-3.capacity = 10000
flume1.channels.hdfs-channel-3.transactionCapacity = 1000
flume1.channels.hdfs-channel-4.capacity = 10000
flume1.channels.hdfs-channel-4.transactionCapacity = 1000
flume1.channels.hdfs-channel-5.capacity = 10000
flume1.channels.hdfs-channel-5.transactionCapacity = 1000
flume1.channels.hdfs-channel-6.capacity = 10000
flume1.channels.hdfs-channel-6.transactionCapacity = 1000
flume1.channels.hdfs-channel-7.capacity = 10000
flume1.channels.hdfs-channel-7.transactionCapacity = 1000



#checkpoint,smgsyslog, sepsyslog, pgp, bluecoat-syslog,bluecoat
# channel selector configuration
flume1.sources.kafka-source-1.selector.type = multiplexing
flume1.sources.kafka-source-1.selector.header = product
flume1.sources.kafka-source-1.selector.mapping.checkpoint = hdfs-channel-1
flume1.sources.kafka-source-1.selector.mapping.smgsyslog = hdfs-channel-2
flume1.sources.kafka-source-1.selector.mapping.sepsyslog = hdfs-channel-3
flume1.sources.kafka-source-1.selector.mapping.pgp = hdfs-channel-4
flume1.sources.kafka-source-1.selector.mapping.bluecoat-syslog = hdfs-channel-5
flume1.sources.kafka-source-1.selector.mapping.bluecoat = hdfs-channel-6
flume1.sources.kafka-source-1.selector.default = hdfs-channel-7

# sinks configuration
flume1.sinks.hdfs-sink-1.type = hdfs
flume1.sinks.hdfs-sink-1.hdfs.writeFormat = Text
flume1.sinks.hdfs-sink-1.hdfs.fileType = DataStream
flume1.sinks.hdfs-sink-1.hdfs.filePrefix = test-events
flume1.sinks.hdfs-sink-1.hdfs.useLocalTimeStamp = true
flume1.sinks.hdfs-sink-1.hdfs.path = /user/root/logs/checkpoint
flume1.sinks.hdfs-sink-1.hdfs.rollCount=1000
flume1.sinks.hdfs-sink-1.hdfs.rollSize=0

flume1.sinks.hdfs-sink-2.type = hdfs
flume1.sinks.hdfs-sink-2.hdfs.writeFormat = Text
flume1.sinks.hdfs-sink-2.hdfs.fileType = DataStream
flume1.sinks.hdfs-sink-2.hdfs.filePrefix = test-events
flume1.sinks.hdfs-sink-2.hdfs.useLocalTimeStamp = true
flume1.sinks.hdfs-sink-2.hdfs.path = /user/root/logs/smgsyslog
flume1.sinks.hdfs-sink-2.hdfs.rollCount=1000
flume1.sinks.hdfs-sink-2.hdfs.rollSize=0

flume1.sinks.hdfs-sink-3.type = hdfs
flume1.sinks.hdfs-sink-3.hdfs.writeFormat = Text
flume1.sinks.hdfs-sink-3.hdfs.fileType = DataStream
flume1.sinks.hdfs-sink-3.hdfs.filePrefix = test-events
flume1.sinks.hdfs-sink-3.hdfs.useLocalTimeStamp = true
flume1.sinks.hdfs-sink-3.hdfs.path = /user/root/logs/sepsyslog
flume1.sinks.hdfs-sink-3.hdfs.rollCount=1000
flume1.sinks.hdfs-sink-3.hdfs.rollSize=0

flume1.sinks.hdfs-sink-4.type = hdfs
flume1.sinks.hdfs-sink-4.hdfs.writeFormat = Text
flume1.sinks.hdfs-sink-4.hdfs.fileType = DataStream
flume1.sinks.hdfs-sink-4.hdfs.filePrefix = test-events
flume1.sinks.hdfs-sink-4.hdfs.useLocalTimeStamp = true
flume1.sinks.hdfs-sink-4.hdfs.path = /user/root/logs/pgp
flume1.sinks.hdfs-sink-4.hdfs.rollCount=1000
flume1.sinks.hdfs-sink-4.hdfs.rollSize=0

flume1.sinks.hdfs-sink-5.type = hdfs
flume1.sinks.hdfs-sink-5.hdfs.writeFormat = Text
flume1.sinks.hdfs-sink-5.hdfs.fileType = DataStream
flume1.sinks.hdfs-sink-5.hdfs.filePrefix = test-events
flume1.sinks.hdfs-sink-5.hdfs.useLocalTimeStamp = true
flume1.sinks.hdfs-sink-5.hdfs.path = /user/root/logs/bluecoatsyslog
flume1.sinks.hdfs-sink-5.hdfs.rollCount=1000
flume1.sinks.hdfs-sink-5.hdfs.rollSize=0

flume1.sinks.hdfs-sink-6.type = hdfs
flume1.sinks.hdfs-sink-6.hdfs.writeFormat = Text
flume1.sinks.hdfs-sink-6.hdfs.fileType = DataStream
flume1.sinks.hdfs-sink-6.hdfs.filePrefix = test-events
flume1.sinks.hdfs-sink-6.hdfs.useLocalTimeStamp = true
flume1.sinks.hdfs-sink-6.hdfs.path = /user/root/logs/bluecoat
flume1.sinks.hdfs-sink-6.hdfs.rollCount=1000
flume1.sinks.hdfs-sink-6.hdfs.rollSize=0

flume1.sinks.hdfs-sink-7.type = hdfs
flume1.sinks.hdfs-sink-7.hdfs.writeFormat = Text
flume1.sinks.hdfs-sink-7.hdfs.fileType = DataStream
flume1.sinks.hdfs-sink-7.hdfs.filePrefix = test-events
flume1.sinks.hdfs-sink-7.hdfs.useLocalTimeStamp = true
flume1.sinks.hdfs-sink-7.hdfs.path = /user/root/logs/otros
flume1.sinks.hdfs-sink-7.hdfs.rollCount=1000
flume1.sinks.hdfs-sink-7.hdfs.rollSize=0

And the logs look like this:

{"@version":"1","@timestamp":"2015-12-09T20:09:36.000Z","client":"ccu","product":"smgsyslog","type":"syslog","host":"datos01","path":"/datos/logs/clientes/ccu/antispam/syslog/scanner02_smg_ccu_syslog.log","nombre":["mail2","mail2"],"proceso":["bmserver","bmserver"],"resto":["[Brightmail] (INFO:18607.3833596816): [36087] Spamhunter module: pcre_exec failed, match limit exceeded: regex \"<!--(?:(?:\\\\s*<\\\\/?[a-z]+[^>]+>)+\\\\s*[-\\\\w,;:\\\\. ]+){10}\".","[Brightmail] (INFO:18607.3833596816): [36087] Spamhunter module: pcre_exec failed, match limit exceeded: regex \"<!--(?:(?:\\\\s*<\\\\/?[a-z]+[^>]+>)+\\\\s*[-\\\\w,;:\\\\. ]+){10}\"."]}

So even when the product field matches a mapping, the event goes to the default channel (hdfs-channel-7).

Please help me with this,

 

Cloudera Employee
Posts: 275
Registered: ‎01-09-2014

Re: Flafka selector doesn't work


Hello,

Can you please add a logging channel and a logger sink to your Flume configuration? This would show, in the solr-cmf logs, exactly which headers are set on the events coming from your Kafka source.

 

You would need to add something like this (when several channels are listed on the default selector, events are replicated to each of them):

 


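# Also append logChannel to flume1.channels and to flume1.sources.kafka-source-1.channels,
# and logSink to flume1.sinks, so that Flume picks them up.
flume1.sources.kafka-source-1.selector.default = hdfs-channel-7 logChannel
flume1.channels.logChannel.type = memory
flume1.sinks.logSink.type = logger
flume1.sinks.logSink.channel = logChannel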

 

Contributor
Posts: 90
Registered: ‎11-12-2015

Re: Flafka selector doesn't work

This is the result:

 

2015-12-10 09:38:59,065 INFO org.apache.solr.servlet.SolrDispatchFilter: [admin] webapp=null path=/admin/cores params={action=STATUS&wt=json} status=0 QTime=0

Are the headers status and QTime? If they are, how can I make a field of the log be read as a header?

 

 

Contributor
Posts: 90
Registered: ‎11-12-2015

Re: Flafka selector doesn't work

I added an interceptor that finds the product field in the log and creates a header from it. This is the config, and it is not working. What could be wrong?

 

#Interceptors setup
flume1.sources.kafka-source-1.interceptors = i1
flume1.sources.kafka-source-1.interceptors.i1.type = regex_extractor
flume1.sources.kafka-source-1.interceptors.i1.regex = "product":"(\\d+)"
flume1.sources.kafka-source-1.interceptors.i1.serializers = ser1
flume1.sources.kafka-source-1.interceptors.i1.serializers.ser1.type = default
flume1.sources.kafka-source-1.interceptors.i1.serializers.ser1.name = product

The product field in the log looks like this:

...,"product":"smgsyslog",...
Cloudera Employee
Posts: 275
Registered: ‎01-09-2014

Re: Flafka selector doesn't work


If the events coming from Kafka are in JSON format, you could put together a quick JSON interceptor, so that all the fields in your JSON get populated as Flume headers.

 

Here are some examples:

http://mmolimar.blogspot.com/2015/01/analyzing-tweets-from-flume-in-kibana.html

https://github.com/szaharici/Flume-Json-Interceptor

 

If you do stick with just the regex interceptor: you are using (\\d+) to capture a string field, which will not match because \d only matches digits. You'd need to do something like:

flume1.sources.kafka-source-1.interceptors.i1.regex = "product":"(\\w+)"

This will match any word characters (letters, digits, and underscore): http://www.w3schools.com/jsref/jsref_regexp_wordchar.asp
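
To make the difference between the two patterns concrete, here is a small standalone Java sketch (Flume compiles this property as a Java regular expression). It is not Flume code: the class name, the `extract` helper, the shortened sample bodies, and the hyphenated `bluecoat-syslog` event are assumptions for illustration. It also shows that `\w` does not cover a hyphen, so a value such as `bluecoat-syslog` needs a character class like `[\w-]`.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class ProductRegexDemo {

    // Returns capture group 1 of the first match of `regex` in `body`,
    // or null when the pattern does not match anywhere in the body.
    public static String extract(String regex, String body) {
        Matcher m = Pattern.compile(regex).matcher(body);
        return m.find() ? m.group(1) : null;
    }

    public static void main(String[] args) {
        // Shortened version of the sample log line from this thread.
        String smg = "{\"client\":\"ccu\",\"product\":\"smgsyslog\",\"type\":\"syslog\"}";
        // Hypothetical event whose product value contains a hyphen.
        String blue = "{\"client\":\"ccu\",\"product\":\"bluecoat-syslog\",\"type\":\"syslog\"}";

        String digits = "\"product\":\"(\\d+)\"";          // digits only
        String word = "\"product\":\"(\\w+)\"";            // letters, digits, underscore
        String wordOrHyphen = "\"product\":\"([\\w-]+)\""; // word characters or hyphen

        System.out.println(extract(digits, smg));        // null
        System.out.println(extract(word, smg));          // smgsyslog
        System.out.println(extract(word, blue));         // null
        System.out.println(extract(wordOrHyphen, blue)); // bluecoat-syslog
    }
}
```

In the Flume properties file the last pattern would be written as `"product":"([\\w-]+)"`, with the same doubled backslash used elsewhere in this thread.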

 

I would recommend creating a JSON interceptor though, as that will give you the most flexibility, and all your JSON fields will be populated in the headers.

Contributor
Posts: 90
Registered: ‎11-12-2015

Re: Flafka selector doesn't work

I changed the regex but it is still not working. The whole config file is this:

 

 

# Sources, channels, and sinks are defined per
# agent name, in this case flume1.
flume1.sources = kafka-source-1
flume1.channels = hdfs-channel-1 hdfs-channel-2 hdfs-channel-3 hdfs-channel-4 hdfs-channel-5 hdfs-channel-6 hdfs-channel-7 logChannel
flume1.sinks = hdfs-sink-1 hdfs-sink-2 hdfs-sink-3 hdfs-sink-4 hdfs-sink-5 hdfs-sink-6 hdfs-sink-7 logSink

# For each source, channel, and sink, set
# standard properties.
flume1.sources.kafka-source-1.type = org.apache.flume.source.kafka.KafkaSource
flume1.sources.kafka-source-1.zookeeperConnect = 192.168.70.23:2181
flume1.sources.kafka-source-1.topic = kafkatopic
flume1.sources.kafka-source-1.batchSize = 1000
flume1.sources.kafka-source-1.channels = hdfs-channel-1 hdfs-channel-2 hdfs-channel-3 hdfs-channel-4 hdfs-channel-5 hdfs-channel-6 hdfs-channel-7 logChannel

flume1.sinks.hdfs-sink-1.channel = hdfs-channel-1
flume1.sinks.hdfs-sink-2.channel = hdfs-channel-2
flume1.sinks.hdfs-sink-3.channel = hdfs-channel-3
flume1.sinks.hdfs-sink-4.channel = hdfs-channel-4
flume1.sinks.hdfs-sink-5.channel = hdfs-channel-5
flume1.sinks.hdfs-sink-6.channel = hdfs-channel-6
flume1.sinks.hdfs-sink-7.channel = hdfs-channel-7
flume1.sinks.logSink.channel = logChannel

flume1.channels.hdfs-channel-1.type = memory
flume1.channels.hdfs-channel-2.type = memory
flume1.channels.hdfs-channel-3.type = memory
flume1.channels.hdfs-channel-4.type = memory
flume1.channels.hdfs-channel-5.type = memory
flume1.channels.hdfs-channel-6.type = memory
flume1.channels.hdfs-channel-7.type = memory
flume1.channels.logChannel.type = memory

flume1.channels.hdfs-channel-1.capacity = 10000
flume1.channels.hdfs-channel-1.transactionCapacity = 1000
flume1.channels.hdfs-channel-2.capacity = 10000
flume1.channels.hdfs-channel-2.transactionCapacity = 1000
flume1.channels.hdfs-channel-3.capacity = 10000
flume1.channels.hdfs-channel-3.transactionCapacity = 1000
flume1.channels.hdfs-channel-4.capacity = 10000
flume1.channels.hdfs-channel-4.transactionCapacity = 1000
flume1.channels.hdfs-channel-5.capacity = 10000
flume1.channels.hdfs-channel-5.transactionCapacity = 1000
flume1.channels.hdfs-channel-6.capacity = 10000
flume1.channels.hdfs-channel-6.transactionCapacity = 1000
flume1.channels.hdfs-channel-7.capacity = 10000
flume1.channels.hdfs-channel-7.transactionCapacity = 1000
flume1.channels.logChannel.capacity = 10000
flume1.channels.logChannel.transactionCapacity = 1000

#Interceptors setup
flume1.sources.kafka-source-1.interceptors = i1
flume1.sources.kafka-source-1.interceptors.i1.type = regex_extractor
flume1.sources.kafka-source-1.interceptors.i1.regex = "product":"(\\w+)"
flume1.sources.kafka-source-1.interceptors.i1.serializers = ser1
flume1.sources.kafka-source-1.interceptors.i1.serializers.ser1.type = default
flume1.sources.kafka-source-1.interceptors.i1.serializers.ser1.name = product


#checkpoint,smgsyslog, sepsyslog, pgp, bluecoat-syslog,bluecoat
# channel selector configuration
flume1.sources.kafka-source-1.selector.type = multiplexing
flume1.sources.kafka-source-1.selector.header = product
flume1.sources.kafka-source-1.selector.mapping.checkpoint = hdfs-channel-1
flume1.sources.kafka-source-1.selector.mapping.smgsyslog = hdfs-channel-2
flume1.sources.kafka-source-1.selector.mapping.sepsyslog = hdfs-channel-3
flume1.sources.kafka-source-1.selector.mapping.pgp = hdfs-channel-4
flume1.sources.kafka-source-1.selector.mapping.bluecoat-syslog = hdfs-channel-5
flume1.sources.kafka-source-1.selector.mapping.bluecoat = hdfs-channel-6
flume1.sources.kafka-source-1.selector.default = hdfs-channel-7 logChannel

# sinks configuration
flume1.sinks.hdfs-sink-1.type = hdfs
flume1.sinks.hdfs-sink-1.hdfs.writeFormat = Text
flume1.sinks.hdfs-sink-1.hdfs.fileType = DataStream
flume1.sinks.hdfs-sink-1.hdfs.filePrefix = test-events
flume1.sinks.hdfs-sink-1.hdfs.useLocalTimeStamp = true
flume1.sinks.hdfs-sink-1.hdfs.path = /user/root/logs/checkpoint
flume1.sinks.hdfs-sink-1.hdfs.rollCount=1000
flume1.sinks.hdfs-sink-1.hdfs.rollSize=0

flume1.sinks.hdfs-sink-2.type = hdfs
flume1.sinks.hdfs-sink-2.hdfs.writeFormat = Text
flume1.sinks.hdfs-sink-2.hdfs.fileType = DataStream
flume1.sinks.hdfs-sink-2.hdfs.filePrefix = test-events
flume1.sinks.hdfs-sink-2.hdfs.useLocalTimeStamp = true
flume1.sinks.hdfs-sink-2.hdfs.path = /user/root/logs/smgsyslog
flume1.sinks.hdfs-sink-2.hdfs.rollCount=1000
flume1.sinks.hdfs-sink-2.hdfs.rollSize=0

flume1.sinks.hdfs-sink-3.type = hdfs
flume1.sinks.hdfs-sink-3.hdfs.writeFormat = Text
flume1.sinks.hdfs-sink-3.hdfs.fileType = DataStream
flume1.sinks.hdfs-sink-3.hdfs.filePrefix = test-events
flume1.sinks.hdfs-sink-3.hdfs.useLocalTimeStamp = true
flume1.sinks.hdfs-sink-3.hdfs.path = /user/root/logs/sepsyslog
flume1.sinks.hdfs-sink-3.hdfs.rollCount=1000
flume1.sinks.hdfs-sink-3.hdfs.rollSize=0

flume1.sinks.hdfs-sink-4.type = hdfs
flume1.sinks.hdfs-sink-4.hdfs.writeFormat = Text
flume1.sinks.hdfs-sink-4.hdfs.fileType = DataStream
flume1.sinks.hdfs-sink-4.hdfs.filePrefix = test-events
flume1.sinks.hdfs-sink-4.hdfs.useLocalTimeStamp = true
flume1.sinks.hdfs-sink-4.hdfs.path = /user/root/logs/pgp
flume1.sinks.hdfs-sink-4.hdfs.rollCount=1000
flume1.sinks.hdfs-sink-4.hdfs.rollSize=0

flume1.sinks.hdfs-sink-5.type = hdfs
flume1.sinks.hdfs-sink-5.hdfs.writeFormat = Text
flume1.sinks.hdfs-sink-5.hdfs.fileType = DataStream
flume1.sinks.hdfs-sink-5.hdfs.filePrefix = test-events
flume1.sinks.hdfs-sink-5.hdfs.useLocalTimeStamp = true
flume1.sinks.hdfs-sink-5.hdfs.path = /user/root/logs/bluecoatsyslog
flume1.sinks.hdfs-sink-5.hdfs.rollCount=1000
flume1.sinks.hdfs-sink-5.hdfs.rollSize=0

flume1.sinks.hdfs-sink-6.type = hdfs
flume1.sinks.hdfs-sink-6.hdfs.writeFormat = Text
flume1.sinks.hdfs-sink-6.hdfs.fileType = DataStream
flume1.sinks.hdfs-sink-6.hdfs.filePrefix = test-events
flume1.sinks.hdfs-sink-6.hdfs.useLocalTimeStamp = true
flume1.sinks.hdfs-sink-6.hdfs.path = /user/root/logs/bluecoat
flume1.sinks.hdfs-sink-6.hdfs.rollCount=1000
flume1.sinks.hdfs-sink-6.hdfs.rollSize=0

flume1.sinks.hdfs-sink-7.type = hdfs
flume1.sinks.hdfs-sink-7.hdfs.writeFormat = Text
flume1.sinks.hdfs-sink-7.hdfs.fileType = DataStream
flume1.sinks.hdfs-sink-7.hdfs.filePrefix = test-events
flume1.sinks.hdfs-sink-7.hdfs.useLocalTimeStamp = true
flume1.sinks.hdfs-sink-7.hdfs.path = /user/root/logs/otros
flume1.sinks.hdfs-sink-7.hdfs.rollCount=1000
flume1.sinks.hdfs-sink-7.hdfs.rollSize=0

flume1.sinks.logSink.type = logger

# Other properties are specific to each type of
# source, channel, or sink. In this case, we
# specify the capacity of the memory channel.

 

I think there is something wrong with the channels, but I don't know what the problem is. The logger output without the interceptor part has two headers, timestamp and topic.

 

 

Contributor
Posts: 90
Registered: ‎11-12-2015

Re: Flafka selector doesn't work

Problem solved. 

Instead of using: 

 

flume1.sources.kafka-source-1.interceptors.i1.serializers.ser1.type = default

I changed it to:

 

 

flume1.sources.kafka-source-1.interceptors.i1.serializers.ser1.type = org.apache.flume.interceptor.RegexExtractorInterceptorPassThroughSerializer

And it worked fine.

 

 

I have two more questions:

 

1) The - (hyphen) cannot be read as part of a header, so if the value of the header contains a -, the event goes to the default channel and not to the corresponding mapping.

 

2) I want to add a second regex, but how can I map two headers together? For example:

 

flume1.sources.kafka-source-1.selector.header = header1 header2

flume1.sources.kafka-source-1.selector.mapping.(value1)&(value2) = hdfs-channel-x

 

Is it possible to do this without programming? I'm not a programmer.

 

Regards,

 

Cloudera Employee
Posts: 275
Registered: ‎01-09-2014

Re: Flafka selector doesn't work


Glad to hear you got it working.  

 

You can't concatenate headers together with the flume config. However, the morphline interceptor will allow you more complex functionality for manipulating headers:  http://flume.apache.org/FlumeUserGuide.html#morphline-interceptor

 

This will allow you to arbitrarily update/delete/modify headers as well as event body, prior to being passed to the channel selector.  You can write a morphline that will examine the body and set any headers that you wish.

 

Here is the morphlines command reference guide to help you get started: http://kitesdk.org/docs/current/morphlines/morphlines-reference-guide.html
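
Since the multiplexing selector keys on a single header, one way to route on two values is to have an interceptor (a morphline or a custom one) write a combined header before the selector runs. The following is only a sketch of that combining step in plain Java, without the Flume API: the class, the `addCombinedHeader` helper, the header names `client`, `product`, and `route`, and the `_` separator are all assumptions for illustration.

```java
import java.util.HashMap;
import java.util.Map;

public class CombinedHeaderDemo {

    // Hypothetical helper: joins the values of two existing headers with "_"
    // and stores the result under `target`. Returns false, leaving the map
    // unchanged, when either source header is missing.
    public static boolean addCombinedHeader(Map<String, String> headers,
                                            String first, String second, String target) {
        String a = headers.get(first);
        String b = headers.get(second);
        if (a == null || b == null) {
            return false;
        }
        headers.put(target, a + "_" + b);
        return true;
    }

    public static void main(String[] args) {
        // Headers as they might look after two regex extractions.
        Map<String, String> headers = new HashMap<>();
        headers.put("client", "ccu");
        headers.put("product", "smgsyslog");

        addCombinedHeader(headers, "client", "product", "route");
        System.out.println(headers.get("route")); // ccu_smgsyslog
    }
}
```

With a header like this in place, the selector could point at it (selector.header = route) and map the combined value (selector.mapping.ccu_smgsyslog = hdfs-channel-x); events missing either field would still fall through to the default channel.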

 

HTH!

Contributor
Posts: 90
Registered: ‎11-12-2015

Re: Flafka selector doesn't work

Hello, I created a Java file with the custom interceptor, but I don't know how to compile it or package it into a jar file properly. I tried the javac and jar commands, but the interceptor builder is not found.

Cloudera Employee
Posts: 275
Registered: ‎01-09-2014

Re: Flafka selector doesn't work

Did you try this one: https://github.com/szaharici/Flume-Json-Interceptor

 

Once you have that compiled, you'll want to put that into the flume /var/lib/flume-ng/plugins.d directory in the proper subdirectory (and with proper permissions for flume to read), following the convention here: http://flume.apache.org/FlumeUserGuide.html#the-plugins-d-directory

 
