
Flafka selector doesn't work

Contributor

Hello, I'm trying to split my data depending on a header of the log (JSON). My source is Kafka and my sinks are HDFS folders. When I run Flume, all the data goes to the default channel even when there is a header value that matches one of the mappings.

 

Here is the Flume config file:

# Sources, channels, and sinks are defined per
# agent name, in this case flume1.
flume1.sources  = kafka-source-1
flume1.channels = hdfs-channel-1 hdfs-channel-2 hdfs-channel-3 hdfs-channel-4 hdfs-channel-5 hdfs-channel-6 hdfs-channel-7
flume1.sinks    = hdfs-sink-1 hdfs-sink-2 hdfs-sink-3 hdfs-sink-4 hdfs-sink-5 hdfs-sink-6 hdfs-sink-7
 
# For each source, channel, and sink, set
# standard properties.
flume1.sources.kafka-source-1.type = org.apache.flume.source.kafka.KafkaSource
flume1.sources.kafka-source-1.zookeeperConnect = 192.168.70.23:2181
flume1.sources.kafka-source-1.topic = kafkatopic
flume1.sources.kafka-source-1.batchSize = 1000
flume1.sources.kafka-source-1.channels = hdfs-channel-1 hdfs-channel-2 hdfs-channel-3 hdfs-channel-4 hdfs-channel-5 hdfs-channel-6 hdfs-channel-7

flume1.sinks.hdfs-sink-1.channel = hdfs-channel-1
flume1.sinks.hdfs-sink-2.channel = hdfs-channel-2
flume1.sinks.hdfs-sink-3.channel = hdfs-channel-3
flume1.sinks.hdfs-sink-4.channel = hdfs-channel-4
flume1.sinks.hdfs-sink-5.channel = hdfs-channel-5
flume1.sinks.hdfs-sink-6.channel = hdfs-channel-6
flume1.sinks.hdfs-sink-7.channel = hdfs-channel-7
 
flume1.channels.hdfs-channel-1.type   = memory
flume1.channels.hdfs-channel-2.type   = memory
flume1.channels.hdfs-channel-3.type   = memory
flume1.channels.hdfs-channel-4.type   = memory
flume1.channels.hdfs-channel-5.type   = memory
flume1.channels.hdfs-channel-6.type   = memory
flume1.channels.hdfs-channel-7.type   = memory

flume1.channels.hdfs-channel-1.capacity = 10000
flume1.channels.hdfs-channel-1.transactionCapacity = 1000
flume1.channels.hdfs-channel-2.capacity = 10000
flume1.channels.hdfs-channel-2.transactionCapacity = 1000
flume1.channels.hdfs-channel-3.capacity = 10000
flume1.channels.hdfs-channel-3.transactionCapacity = 1000
flume1.channels.hdfs-channel-4.capacity = 10000
flume1.channels.hdfs-channel-4.transactionCapacity = 1000
flume1.channels.hdfs-channel-5.capacity = 10000
flume1.channels.hdfs-channel-5.transactionCapacity = 1000
flume1.channels.hdfs-channel-6.capacity = 10000
flume1.channels.hdfs-channel-6.transactionCapacity = 1000
flume1.channels.hdfs-channel-7.capacity = 10000
flume1.channels.hdfs-channel-7.transactionCapacity = 1000



#checkpoint,smgsyslog, sepsyslog, pgp, bluecoat-syslog,bluecoat
# channel selector configuration
flume1.sources.kafka-source-1.selector.type = multiplexing
flume1.sources.kafka-source-1.selector.header = product
flume1.sources.kafka-source-1.selector.mapping.checkpoint = hdfs-channel-1
flume1.sources.kafka-source-1.selector.mapping.smgsyslog = hdfs-channel-2
flume1.sources.kafka-source-1.selector.mapping.sepsyslog = hdfs-channel-3
flume1.sources.kafka-source-1.selector.mapping.pgp = hdfs-channel-4
flume1.sources.kafka-source-1.selector.mapping.bluecoat-syslog = hdfs-channel-5
flume1.sources.kafka-source-1.selector.mapping.bluecoat = hdfs-channel-6
flume1.sources.kafka-source-1.selector.default = hdfs-channel-7

# sinks configuration
flume1.sinks.hdfs-sink-1.type = hdfs
flume1.sinks.hdfs-sink-1.hdfs.writeFormat = Text
flume1.sinks.hdfs-sink-1.hdfs.fileType = DataStream
flume1.sinks.hdfs-sink-1.hdfs.filePrefix = test-events
flume1.sinks.hdfs-sink-1.hdfs.useLocalTimeStamp = true
flume1.sinks.hdfs-sink-1.hdfs.path = /user/root/logs/checkpoint
flume1.sinks.hdfs-sink-1.hdfs.rollCount=1000
flume1.sinks.hdfs-sink-1.hdfs.rollSize=0

flume1.sinks.hdfs-sink-2.type = hdfs
flume1.sinks.hdfs-sink-2.hdfs.writeFormat = Text
flume1.sinks.hdfs-sink-2.hdfs.fileType = DataStream
flume1.sinks.hdfs-sink-2.hdfs.filePrefix = test-events
flume1.sinks.hdfs-sink-2.hdfs.useLocalTimeStamp = true
flume1.sinks.hdfs-sink-2.hdfs.path = /user/root/logs/smgsyslog
flume1.sinks.hdfs-sink-2.hdfs.rollCount=1000
flume1.sinks.hdfs-sink-2.hdfs.rollSize=0

flume1.sinks.hdfs-sink-3.type = hdfs
flume1.sinks.hdfs-sink-3.hdfs.writeFormat = Text
flume1.sinks.hdfs-sink-3.hdfs.fileType = DataStream
flume1.sinks.hdfs-sink-3.hdfs.filePrefix = test-events
flume1.sinks.hdfs-sink-3.hdfs.useLocalTimeStamp = true
flume1.sinks.hdfs-sink-3.hdfs.path = /user/root/logs/sepsyslog
flume1.sinks.hdfs-sink-3.hdfs.rollCount=1000
flume1.sinks.hdfs-sink-3.hdfs.rollSize=0

flume1.sinks.hdfs-sink-4.type = hdfs
flume1.sinks.hdfs-sink-4.hdfs.writeFormat = Text
flume1.sinks.hdfs-sink-4.hdfs.fileType = DataStream
flume1.sinks.hdfs-sink-4.hdfs.filePrefix = test-events
flume1.sinks.hdfs-sink-4.hdfs.useLocalTimeStamp = true
flume1.sinks.hdfs-sink-4.hdfs.path = /user/root/logs/pgp
flume1.sinks.hdfs-sink-4.hdfs.rollCount=1000
flume1.sinks.hdfs-sink-4.hdfs.rollSize=0

flume1.sinks.hdfs-sink-5.type = hdfs
flume1.sinks.hdfs-sink-5.hdfs.writeFormat = Text
flume1.sinks.hdfs-sink-5.hdfs.fileType = DataStream
flume1.sinks.hdfs-sink-5.hdfs.filePrefix = test-events
flume1.sinks.hdfs-sink-5.hdfs.useLocalTimeStamp = true
flume1.sinks.hdfs-sink-5.hdfs.path = /user/root/logs/bluecoatsyslog
flume1.sinks.hdfs-sink-5.hdfs.rollCount=1000
flume1.sinks.hdfs-sink-5.hdfs.rollSize=0

flume1.sinks.hdfs-sink-6.type = hdfs
flume1.sinks.hdfs-sink-6.hdfs.writeFormat = Text
flume1.sinks.hdfs-sink-6.hdfs.fileType = DataStream
flume1.sinks.hdfs-sink-6.hdfs.filePrefix = test-events
flume1.sinks.hdfs-sink-6.hdfs.useLocalTimeStamp = true
flume1.sinks.hdfs-sink-6.hdfs.path = /user/root/log/bluecoat
flume1.sinks.hdfs-sink-6.hdfs.rollCount=1000
flume1.sinks.hdfs-sink-6.hdfs.rollSize=0

flume1.sinks.hdfs-sink-7.type = hdfs
flume1.sinks.hdfs-sink-7.hdfs.writeFormat = Text
flume1.sinks.hdfs-sink-7.hdfs.fileType = DataStream
flume1.sinks.hdfs-sink-7.hdfs.filePrefix = test-events
flume1.sinks.hdfs-sink-7.hdfs.useLocalTimeStamp = true
flume1.sinks.hdfs-sink-7.hdfs.path = /user/root/log/otros
flume1.sinks.hdfs-sink-7.hdfs.rollCount=1000
flume1.sinks.hdfs-sink-7.hdfs.rollSize=0

and the logs are like this:

{"@version":"1","@timestamp":"2015-12-09T20:09:36.000Z","client":"ccu","product":"smgsyslog","type":"syslog","host":"datos01","path":"/datos/logs/clientes/ccu/antispam/syslog/scanner02_smg_ccu_syslog.log","nombre":["mail2","mail2"],"proceso":["bmserver","bmserver"],"resto":["[Brightmail] (INFO:18607.3833596816): [36087] Spamhunter module: pcre_exec failed, match limit exceeded: regex \"<!--(?:(?:\\\\s*<\\\\/?[a-z]+[^>]+>)+\\\\s*[-\\\\w,;:\\\\. ]+){10}\".","[Brightmail] (INFO:18607.3833596816): [36087] Spamhunter module: pcre_exec failed, match limit exceeded: regex \"<!--(?:(?:\\\\s*<\\\\/?[a-z]+[^>]+>)+\\\\s*[-\\\\w,;:\\\\. ]+){10}\"."]}

So even when the product header matches, the events go to the default channel (hdfs-channel-7).

Please help me with this.

 

1 ACCEPTED SOLUTION

Contributor

I solved the problem. I had to create a custom Java interceptor (based on the one you sent me), compile it with Maven, and place the jar in the flume-ng directory.
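For reference, a custom interceptor like that is wired into the Flume config through its Builder class; a sketch with a purely hypothetical class name:

flume1.sources.kafka-source-1.interceptors = i1
flume1.sources.kafka-source-1.interceptors.i1.type = com.example.flume.JsonHeaderInterceptor$Builder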

 

Thanks pdvorak for all the help 🙂


12 REPLIES


Hello,

Can you please add a logging channel and a logger sink to your Flume configuration? This would show, in the solr-cmf logs, exactly which headers are set on the events coming from your Kafka source.

 

You would need to add something like this (with multiple channels on the default selector, events will be replicated to each of them):

 


flume1.sources.kafka-source-1.selector.default = hdfs-channel-7 logChannel
flume1.channels.logChannel.type = memory 
flume1.sinks.logSink.type = logger
flume1.sinks.logSink.channel = logChannel

 

Contributor

This is the result:

 

2015-12-10 09:38:59,065 INFO org.apache.solr.servlet.SolrDispatchFilter: [admin] webapp=null path=/admin/cores params={action=STATUS&wt=json} status=0 QTime=0

Are the headers status and QTime? And if they are, how can I make a field of the log be read as a header?

 

 

Contributor

I added an interceptor that finds the product field in the log and creates a header from it. This is the config, but it is not working. What could be wrong?

 

#Interceptors setup
flume1.sources.kafka-source-1.interceptors = i1
flume1.sources.kafka-source-1.interceptors.i1.type = regex_extractor
flume1.sources.kafka-source-1.interceptors.i1.regex = "product":"(\\d+)"
flume1.sources.kafka-source-1.interceptors.i1.serializers = ser1
flume1.sources.kafka-source-1.interceptors.i1.serializers.ser1.type = default
flume1.sources.kafka-source-1.interceptors.i1.serializers.ser1.name = product

The product field in the log looks like this:

...,"product":"smgsyslog",...


If your events coming from Kafka are in JSON format, you could put together a quick JSON interceptor; that way all of your JSON fields would get populated as Flume headers.

 

Here are some examples:

http://mmolimar.blogspot.com/2015/01/analyzing-tweets-from-flume-in-kibana.html

https://github.com/szaharici/Flume-Json-Interceptor
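For illustration only, a rough sketch of such a JSON-to-header interceptor might look like the following (the package and class names are hypothetical, and it assumes Jackson is available on the Flume classpath; the projects linked above are more complete):

package com.example.flume;

import java.util.List;
import java.util.Map;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import com.fasterxml.jackson.databind.ObjectMapper;

public class JsonHeaderInterceptor implements Interceptor {

    private final ObjectMapper mapper = new ObjectMapper();

    @Override
    public void initialize() { }

    @Override
    public Event intercept(Event event) {
        try {
            // Parse the event body as a flat JSON object and copy each
            // string field (e.g. "product") into the Flume headers.
            Map<String, Object> fields =
                mapper.readValue(event.getBody(), Map.class);
            Map<String, String> headers = event.getHeaders();
            for (Map.Entry<String, Object> e : fields.entrySet()) {
                if (e.getValue() instanceof String) {
                    headers.put(e.getKey(), (String) e.getValue());
                }
            }
        } catch (Exception e) {
            // Leave the event untouched if the body is not valid JSON.
        }
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        for (Event event : events) {
            intercept(event);
        }
        return events;
    }

    @Override
    public void close() { }

    // Flume instantiates interceptors through a Builder,
    // referenced in the config as com.example.flume.JsonHeaderInterceptor$Builder.
    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new JsonHeaderInterceptor();
        }

        @Override
        public void configure(Context context) { }
    }
}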

 

If you do stick with just the regex interceptor: you are trying to use (\\d+) to capture a string field, which will not match because \d only matches digits. You'd need something like

flume1.sources.kafka-source-1.interceptors.i1.regex = "product":"(\\w+)"

Which will match any word characters: http://www.w3schools.com/jsref/jsref_regexp_wordchar.asp
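As a quick standalone check (plain Java, separate from the Flume config; the class name is just for illustration), the pattern captures the product value, but note that \w does not include the hyphen, which matters for values like bluecoat-syslog:

import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Standalone check of the extractor pattern (in the .properties file the
// backslash is doubled; in Java source it is also written as \\w).
public class ProductRegexCheck {
    public static void main(String[] args) {
        Pattern p = Pattern.compile("\"product\":\"(\\w+)\"");

        Matcher ok = p.matcher("...,\"product\":\"smgsyslog\",...");
        System.out.println(ok.find() ? ok.group(1) : "no match");        // smgsyslog

        // \w covers letters, digits and underscore only, so a hyphenated
        // value never reaches the closing quote and the match fails entirely.
        Matcher hyphen = p.matcher("...,\"product\":\"bluecoat-syslog\",...");
        System.out.println(hyphen.find() ? hyphen.group(1) : "no match"); // no match
    }
}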

 

I would recommend creating a JSON interceptor though, as that will give you the most flexibility, and all of your JSON fields will be populated in the headers.

Contributor

I changed the regex but it is still not working. The whole config file is this:

 

 

# Sources, channels, and sinks are defined per
# agent name, in this case flume1.
flume1.sources = kafka-source-1
flume1.channels = hdfs-channel-1 hdfs-channel-2 hdfs-channel-3 hdfs-channel-4 hdfs-channel-5 hdfs-channel-6 hdfs-channel-7 logChannel
flume1.sinks = hdfs-sink-1 hdfs-sink-2 hdfs-sink-3 hdfs-sink-4 hdfs-sink-5 hdfs-sink-6 hdfs-sink-7 logSink

# For each source, channel, and sink, set
# standard properties.
flume1.sources.kafka-source-1.type = org.apache.flume.source.kafka.KafkaSource
flume1.sources.kafka-source-1.zookeeperConnect = 192.168.70.23:2181
flume1.sources.kafka-source-1.topic = kafkatopic
flume1.sources.kafka-source-1.batchSize = 1000
flume1.sources.kafka-source-1.channels = hdfs-channel-1 hdfs-channel-2 hdfs-channel-3 hdfs-channel-4 hdfs-channel-5 hdfs-channel-6 hdfs-channel-7 logChannel

flume1.sinks.hdfs-sink-1.channel = hdfs-channel-1
flume1.sinks.hdfs-sink-2.channel = hdfs-channel-2
flume1.sinks.hdfs-sink-3.channel = hdfs-channel-3
flume1.sinks.hdfs-sink-4.channel = hdfs-channel-4
flume1.sinks.hdfs-sink-5.channel = hdfs-channel-5
flume1.sinks.hdfs-sink-6.channel = hdfs-channel-6
flume1.sinks.hdfs-sink-7.channel = hdfs-channel-7
flume1.sinks.logSink.channel = logChannel

flume1.channels.hdfs-channel-1.type = memory
flume1.channels.hdfs-channel-2.type = memory
flume1.channels.hdfs-channel-3.type = memory
flume1.channels.hdfs-channel-4.type = memory
flume1.channels.hdfs-channel-5.type = memory
flume1.channels.hdfs-channel-6.type = memory
flume1.channels.hdfs-channel-7.type = memory
flume1.channels.logChannel.type = memory

flume1.channels.hdfs-channel-1.capacity = 10000
flume1.channels.hdfs-channel-1.transactionCapacity = 1000
flume1.channels.hdfs-channel-2.capacity = 10000
flume1.channels.hdfs-channel-2.transactionCapacity = 1000
flume1.channels.hdfs-channel-3.capacity = 10000
flume1.channels.hdfs-channel-3.transactionCapacity = 1000
flume1.channels.hdfs-channel-4.capacity = 10000
flume1.channels.hdfs-channel-4.transactionCapacity = 1000
flume1.channels.hdfs-channel-5.capacity = 10000
flume1.channels.hdfs-channel-5.transactionCapacity = 1000
flume1.channels.hdfs-channel-6.capacity = 10000
flume1.channels.hdfs-channel-6.transactionCapacity = 1000
flume1.channels.hdfs-channel-7.capacity = 10000
flume1.channels.hdfs-channel-7.transactionCapacity = 1000
flume1.channels.logChannel.capacity = 10000
flume1.channels.logChannel.transactionCapacity = 1000

#Interceptors setup
flume1.sources.kafka-source-1.interceptors = i1
flume1.sources.kafka-source-1.interceptors.i1.type = regex_extractor
flume1.sources.kafka-source-1.interceptors.i1.regex = "product":"(\\w+)"
flume1.sources.kafka-source-1.interceptors.i1.serializers = ser1
flume1.sources.kafka-source-1.interceptors.i1.serializers.ser1.type = default
flume1.sources.kafka-source-1.interceptors.i1.serializers.ser1.name = product


#checkpoint,smgsyslog, sepsyslog, pgp, bluecoat-syslog,bluecoat
# channel selector configuration
flume1.sources.kafka-source-1.selector.type = multiplexing
flume1.sources.kafka-source-1.selector.header = product
flume1.sources.kafka-source-1.selector.mapping.checkpoint = hdfs-channel-1
flume1.sources.kafka-source-1.selector.mapping.smgsyslog = hdfs-channel-2
flume1.sources.kafka-source-1.selector.mapping.sepsyslog = hdfs-channel-3
flume1.sources.kafka-source-1.selector.mapping.pgp = hdfs-channel-4
flume1.sources.kafka-source-1.selector.mapping.bluecoat-syslog = hdfs-channel-5
flume1.sources.kafka-source-1.selector.mapping.bluecoat = hdfs-channel-6
flume1.sources.kafka-source-1.selector.default = hdfs-channel-7 logChannel

# sinks configuration
flume1.sinks.hdfs-sink-1.type = hdfs
flume1.sinks.hdfs-sink-1.hdfs.writeFormat = Text
flume1.sinks.hdfs-sink-1.hdfs.fileType = DataStream
flume1.sinks.hdfs-sink-1.hdfs.filePrefix = test-events
flume1.sinks.hdfs-sink-1.hdfs.useLocalTimeStamp = true
flume1.sinks.hdfs-sink-1.hdfs.path = /user/root/logs/checkpoint
flume1.sinks.hdfs-sink-1.hdfs.rollCount=1000
flume1.sinks.hdfs-sink-1.hdfs.rollSize=0

flume1.sinks.hdfs-sink-2.type = hdfs
flume1.sinks.hdfs-sink-2.hdfs.writeFormat = Text
flume1.sinks.hdfs-sink-2.hdfs.fileType = DataStream
flume1.sinks.hdfs-sink-2.hdfs.filePrefix = test-events
flume1.sinks.hdfs-sink-2.hdfs.useLocalTimeStamp = true
flume1.sinks.hdfs-sink-2.hdfs.path = /user/root/logs/smgsyslog
flume1.sinks.hdfs-sink-2.hdfs.rollCount=1000
flume1.sinks.hdfs-sink-2.hdfs.rollSize=0

flume1.sinks.hdfs-sink-3.type = hdfs
flume1.sinks.hdfs-sink-3.hdfs.writeFormat = Text
flume1.sinks.hdfs-sink-3.hdfs.fileType = DataStream
flume1.sinks.hdfs-sink-3.hdfs.filePrefix = test-events
flume1.sinks.hdfs-sink-3.hdfs.useLocalTimeStamp = true
flume1.sinks.hdfs-sink-3.hdfs.path = /user/root/logs/sepsyslog
flume1.sinks.hdfs-sink-3.hdfs.rollCount=1000
flume1.sinks.hdfs-sink-3.hdfs.rollSize=0

flume1.sinks.hdfs-sink-4.type = hdfs
flume1.sinks.hdfs-sink-4.hdfs.writeFormat = Text
flume1.sinks.hdfs-sink-4.hdfs.fileType = DataStream
flume1.sinks.hdfs-sink-4.hdfs.filePrefix = test-events
flume1.sinks.hdfs-sink-4.hdfs.useLocalTimeStamp = true
flume1.sinks.hdfs-sink-4.hdfs.path = /user/root/logs/pgp
flume1.sinks.hdfs-sink-4.hdfs.rollCount=1000
flume1.sinks.hdfs-sink-4.hdfs.rollSize=0

flume1.sinks.hdfs-sink-5.type = hdfs
flume1.sinks.hdfs-sink-5.hdfs.writeFormat = Text
flume1.sinks.hdfs-sink-5.hdfs.fileType = DataStream
flume1.sinks.hdfs-sink-5.hdfs.filePrefix = test-events
flume1.sinks.hdfs-sink-5.hdfs.useLocalTimeStamp = true
flume1.sinks.hdfs-sink-5.hdfs.path = /user/root/logs/bluecoatsyslog
flume1.sinks.hdfs-sink-5.hdfs.rollCount=1000
flume1.sinks.hdfs-sink-5.hdfs.rollSize=0

flume1.sinks.hdfs-sink-6.type = hdfs
flume1.sinks.hdfs-sink-6.hdfs.writeFormat = Text
flume1.sinks.hdfs-sink-6.hdfs.fileType = DataStream
flume1.sinks.hdfs-sink-6.hdfs.filePrefix = test-events
flume1.sinks.hdfs-sink-6.hdfs.useLocalTimeStamp = true
flume1.sinks.hdfs-sink-6.hdfs.path = /user/root/logs/bluecoat
flume1.sinks.hdfs-sink-6.hdfs.rollCount=1000
flume1.sinks.hdfs-sink-6.hdfs.rollSize=0

flume1.sinks.hdfs-sink-7.type = hdfs
flume1.sinks.hdfs-sink-7.hdfs.writeFormat = Text
flume1.sinks.hdfs-sink-7.hdfs.fileType = DataStream
flume1.sinks.hdfs-sink-7.hdfs.filePrefix = test-events
flume1.sinks.hdfs-sink-7.hdfs.useLocalTimeStamp = true
flume1.sinks.hdfs-sink-7.hdfs.path = /user/root/logs/otros
flume1.sinks.hdfs-sink-7.hdfs.rollCount=1000
flume1.sinks.hdfs-sink-7.hdfs.rollSize=0

flume1.sinks.logSink.type = logger

# Other properties are specific to each type of
# source, channel, or sink. In this case, we
# specify the capacity of the memory channel.

 

I think there is something wrong with the channels, but I don't know what the problem is. The logger output without the interceptor section shows two headers: timestamp and topic.

 

 

Contributor

Problem solved. 

Instead of using: 

 

flume1.sources.kafka-source-1.interceptors.i1.serializers.ser1.type = default

I changed it to:

 

 

flume1.sources.kafka-source-1.interceptors.i1.serializers.ser1.type = org.apache.flume.interceptor.RegexExtractorInterceptorPassThroughSerializer

And it worked fine.

 

 

I have two more questions:

 

1) The - (hyphen) cannot be read as part of a header value, so if the value of the header contains a -, the event goes to the default channel and not to the corresponding mapping.

 

2) I want to add a second regex, but how can I map two headers together? For example:

 

flume1.sources.kafka-source-1.selector.header = header1 header2

flume1.sources.kafka-source-1.selector.mapping.(value1)&(value2) = hdfs-channel-x

 

Is it possible to do this without programming? Because I'm not a programmer.

 

Regards,

 


Glad to hear you got it working.  

 

You can't concatenate headers together with the Flume config. However, the morphline interceptor will give you more complex functionality for manipulating headers: http://flume.apache.org/FlumeUserGuide.html#morphline-interceptor

 

This will allow you to arbitrarily update/delete/modify headers as well as the event body, before the event is passed to the channel selector. You can write a morphline that will examine the body and set any headers that you wish.

 

Here is the morphlines command reference guide to help you get started: http://kitesdk.org/docs/current/morphlines/morphlines-reference-guide.html

 

HTH!

Contributor

Hello, I created a Java file with the custom interceptor, but I don't know how to compile it or package it into a jar file properly. I tried the javac and jar commands, but the interceptor builder is not found.


Did you try this one: https://github.com/szaharici/Flume-Json-Interceptor

 

Once you have that compiled, you'll want to put it into the Flume /var/lib/flume-ng/plugins.d directory in the proper subdirectory (with proper permissions for Flume to read), following the convention here: http://flume.apache.org/FlumeUserGuide.html#the-plugins-d-directory
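For example, a layout following that convention could look like this (the plugin directory name and jar name below are just placeholders):

/var/lib/flume-ng/plugins.d/
    json-interceptor/
        lib/flume-json-interceptor.jar      (the interceptor jar itself)
        libext/                             (any dependency jars it needs)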