Created on 12-09-2015 01:22 PM - edited 09-16-2022 02:52 AM
Hello, I'm trying to split my data depending on a header of the log (JSON). My source is Kafka and my sinks are HDFS folders. When I run Flume, all the data goes to the default channel, even when there is a header value that matches one of the selector mappings.
Here is the Flume config file:
# Sources, channels, and sinks are defined per
# agent name, in this case flume1.
flume1.sources = kafka-source-1
flume1.channels = hdfs-channel-1 hdfs-channel-2 hdfs-channel-3 hdfs-channel-4 hdfs-channel-5 hdfs-channel-6 hdfs-channel-7
flume1.sinks = hdfs-sink-1 hdfs-sink-2 hdfs-sink-3 hdfs-sink-4 hdfs-sink-5 hdfs-sink-6 hdfs-sink-7
# For each source, channel, and sink, set
# standard properties.
flume1.sources.kafka-source-1.type = org.apache.flume.source.kafka.KafkaSource
flume1.sources.kafka-source-1.zookeeperConnect = 192.168.70.23:2181
flume1.sources.kafka-source-1.topic = kafkatopic
flume1.sources.kafka-source-1.batchSize = 1000
flume1.sources.kafka-source-1.channels = hdfs-channel-1 hdfs-channel-2 hdfs-channel-3 hdfs-channel-4 hdfs-channel-5 hdfs-channel-6 hdfs-channel-7
flume1.sinks.hdfs-sink-1.channel = hdfs-channel-1
flume1.sinks.hdfs-sink-2.channel = hdfs-channel-2
flume1.sinks.hdfs-sink-3.channel = hdfs-channel-3
flume1.sinks.hdfs-sink-4.channel = hdfs-channel-4
flume1.sinks.hdfs-sink-5.channel = hdfs-channel-5
flume1.sinks.hdfs-sink-6.channel = hdfs-channel-6
flume1.sinks.hdfs-sink-7.channel = hdfs-channel-7
flume1.channels.hdfs-channel-1.type = memory
flume1.channels.hdfs-channel-2.type = memory
flume1.channels.hdfs-channel-3.type = memory
flume1.channels.hdfs-channel-4.type = memory
flume1.channels.hdfs-channel-5.type = memory
flume1.channels.hdfs-channel-6.type = memory
flume1.channels.hdfs-channel-7.type = memory
flume1.channels.hdfs-channel-1.capacity = 10000
flume1.channels.hdfs-channel-1.transactionCapacity = 1000
flume1.channels.hdfs-channel-2.capacity = 10000
flume1.channels.hdfs-channel-2.transactionCapacity = 1000
flume1.channels.hdfs-channel-3.capacity = 10000
flume1.channels.hdfs-channel-3.transactionCapacity = 1000
flume1.channels.hdfs-channel-4.capacity = 10000
flume1.channels.hdfs-channel-4.transactionCapacity = 1000
flume1.channels.hdfs-channel-5.capacity = 10000
flume1.channels.hdfs-channel-5.transactionCapacity = 1000
flume1.channels.hdfs-channel-6.capacity = 10000
flume1.channels.hdfs-channel-6.transactionCapacity = 1000
flume1.channels.hdfs-channel-7.capacity = 10000
flume1.channels.hdfs-channel-7.transactionCapacity = 1000
#checkpoint, smgsyslog, sepsyslog, pgp, bluecoat-syslog, bluecoat
# channel selector configuration
flume1.sources.kafka-source-1.selector.type = multiplexing
flume1.sources.kafka-source-1.selector.header = product
flume1.sources.kafka-source-1.selector.mapping.ckeckpoint = hdfs-channel-1
flume1.sources.kafka-source-1.selector.mapping.smgsyslog = hdfs-channel-2
flume1.sources.kafka-source-1.selector.mapping.sepsyslog = hdfs-channel-3
flume1.sources.kafka-source-1.selector.mapping.pgp = hdfs-channel-4
flume1.sources.kafka-source-1.selector.mapping.bluecoat-syslog = hdfs-channel-5
flume1.sources.kafka-source-1.selector.mapping.bluecoat = hdfs-channel-6
flume1.sources.kafka-source-1.selector.default = hdfs-channel-7
# sinks configuration
flume1.sinks.hdfs-sink-1.type = hdfs
flume1.sinks.hdfs-sink-1.hdfs.writeFormat = Text
flume1.sinks.hdfs-sink-1.hdfs.fileType = DataStream
flume1.sinks.hdfs-sink-1.hdfs.filePrefix = test-events
flume1.sinks.hdfs-sink-1.hdfs.useLocalTimeStamp = true
flume1.sinks.hdfs-sink-1.hdfs.path = /user/root/logs/checkpoint
flume1.sinks.hdfs-sink-1.hdfs.rollCount=1000
flume1.sinks.hdfs-sink-1.hdfs.rollSize=0
flume1.sinks.hdfs-sink-2.type = hdfs
flume1.sinks.hdfs-sink-2.hdfs.writeFormat = Text
flume1.sinks.hdfs-sink-2.hdfs.fileType = DataStream
flume1.sinks.hdfs-sink-2.hdfs.filePrefix = test-events
flume1.sinks.hdfs-sink-2.hdfs.useLocalTimeStamp = true
flume1.sinks.hdfs-sink-2.hdfs.path = /user/root/logs/smgsyslog
flume1.sinks.hdfs-sink-2.hdfs.rollCount=1000
flume1.sinks.hdfs-sink-2.hdfs.rollSize=0
flume1.sinks.hdfs-sink-3.type = hdfs
flume1.sinks.hdfs-sink-3.hdfs.writeFormat = Text
flume1.sinks.hdfs-sink-3.hdfs.fileType = DataStream
flume1.sinks.hdfs-sink-3.hdfs.filePrefix = test-events
flume1.sinks.hdfs-sink-3.hdfs.useLocalTimeStamp = true
flume1.sinks.hdfs-sink-3.hdfs.path = /user/root/logs/sepsyslog
flume1.sinks.hdfs-sink-3.hdfs.rollCount=1000
flume1.sinks.hdfs-sink-3.hdfs.rollSize=0
flume1.sinks.hdfs-sink-4.type = hdfs
flume1.sinks.hdfs-sink-4.hdfs.writeFormat = Text
flume1.sinks.hdfs-sink-4.hdfs.fileType = DataStream
flume1.sinks.hdfs-sink-4.hdfs.filePrefix = test-events
flume1.sinks.hdfs-sink-4.hdfs.useLocalTimeStamp = true
flume1.sinks.hdfs-sink-4.hdfs.path = /user/root/logs/pgp
flume1.sinks.hdfs-sink-4.hdfs.rollCount=1000
flume1.sinks.hdfs-sink-4.hdfs.rollSize=0
flume1.sinks.hdfs-sink-5.type = hdfs
flume1.sinks.hdfs-sink-5.hdfs.writeFormat = Text
flume1.sinks.hdfs-sink-5.hdfs.fileType = DataStream
flume1.sinks.hdfs-sink-5.hdfs.filePrefix = test-events
flume1.sinks.hdfs-sink-5.hdfs.useLocalTimeStamp = true
flume1.sinks.hdfs-sink-5.hdfs.path = /user/root/logs/bluecoatsyslog
flume1.sinks.hdfs-sink-5.hdfs.rollCount=1000
flume1.sinks.hdfs-sink-5.hdfs.rollSize=0
flume1.sinks.hdfs-sink-6.type = hdfs
flume1.sinks.hdfs-sink-6.hdfs.writeFormat = Text
flume1.sinks.hdfs-sink-6.hdfs.fileType = DataStream
flume1.sinks.hdfs-sink-6.hdfs.filePrefix = test-events
flume1.sinks.hdfs-sink-6.hdfs.useLocalTimeStamp = true
flume1.sinks.hdfs-sink-6.hdfs.path = /user/root/log/bluecoat
flume1.sinks.hdfs-sink-6.hdfs.rollCount=1000
flume1.sinks.hdfs-sink-6.hdfs.rollSize=0
flume1.sinks.hdfs-sink-7.type = hdfs
flume1.sinks.hdfs-sink-7.hdfs.writeFormat = Text
flume1.sinks.hdfs-sink-7.hdfs.fileType = DataStream
flume1.sinks.hdfs-sink-7.hdfs.filePrefix = test-events
flume1.sinks.hdfs-sink-7.hdfs.useLocalTimeStamp = true
flume1.sinks.hdfs-sink-7.hdfs.path = /user/root/log/otros
flume1.sinks.hdfs-sink-7.hdfs.rollCount=1000
flume1.sinks.hdfs-sink-7.hdfs.rollSize=0
and the logs are like this:
{"@version":"1","@timestamp":"2015-12-09T20:09:36.000Z","client":"ccu","product":"smgsyslog","type":"syslog","host":"datos01","path":"/datos/logs/clientes/ccu/antispam/syslog/scanner02_smg_ccu_syslog.log","nombre":["mail2","mail2"],"proceso":["bmserver","bmserver"],"resto":["[Brightmail] (INFO:18607.3833596816): [36087] Spamhunter module: pcre_exec failed, match limit exceeded: regex \"<!--(?:(?:\\\\s*<\\\\/?[a-z]+[^>]+>)+\\\\s*[-\\\\w,;:\\\\. ]+){10}\".","[Brightmail] (INFO:18607.3833596816): [36087] Spamhunter module: pcre_exec failed, match limit exceeded: regex \"<!--(?:(?:\\\\s*<\\\\/?[a-z]+[^>]+>)+\\\\s*[-\\\\w,;:\\\\. ]+){10}\"."]}
So even when the product header matches a mapping, the event goes to the default channel (hdfs-channel-7).
Please help me with this,
Created 12-16-2015 07:11 AM
I solved the problem. I had to create a custom Java interceptor (based on the one you sent me), compile it with Maven, and place it in the flume-ng directory.
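For anyone else hitting this, here is a minimal sketch of what such a JSON-to-header interceptor can look like. This is not my exact code: the package and class names are illustrative, and it assumes a JSON library such as Jackson (com.fasterxml.jackson.databind) is either bundled into the jar or already on the Flume classpath.

package com.example.flume;

import java.util.Iterator;
import java.util.List;
import java.util.Map;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

public class JsonHeaderInterceptor implements Interceptor {

  private final ObjectMapper mapper = new ObjectMapper();

  @Override
  public void initialize() {
    // nothing to initialize
  }

  @Override
  public Event intercept(Event event) {
    try {
      JsonNode root = mapper.readTree(event.getBody());
      Map<String, String> headers = event.getHeaders();
      // Copy every top-level string field into the event headers,
      // so e.g. "product":"smgsyslog" becomes the header product=smgsyslog.
      Iterator<Map.Entry<String, JsonNode>> fields = root.fields();
      while (fields.hasNext()) {
        Map.Entry<String, JsonNode> field = fields.next();
        if (field.getValue().isTextual()) {
          headers.put(field.getKey(), field.getValue().asText());
        }
      }
    } catch (Exception e) {
      // Leave the event untouched if the body is not valid JSON;
      // it will then fall through to the selector's default channel.
    }
    return event;
  }

  @Override
  public List<Event> intercept(List<Event> events) {
    for (Event event : events) {
      intercept(event);
    }
    return events;
  }

  @Override
  public void close() {
    // nothing to clean up
  }

  public static class Builder implements Interceptor.Builder {
    @Override
    public Interceptor build() {
      return new JsonHeaderInterceptor();
    }

    @Override
    public void configure(Context context) {
      // no configuration needed for this sketch
    }
  }
}

The source is then pointed at the builder class instead of a built-in interceptor type, e.g. flume1.sources.kafka-source-1.interceptors.i1.type = com.example.flume.JsonHeaderInterceptor$Builder (again, an illustrative name).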
Thanks pdvorak for all the help 🙂
Created on 12-09-2015 02:05 PM - edited 12-09-2015 02:21 PM
Hello,
Can you please add a logging channel and logger sink to your Flume configuration? This would show, in the Flume agent log, exactly what headers are set for the events coming from your Kafka source.
You would need to add something like this (listing multiple channels on the default selector will replicate events to each of them):
flume1.sources.kafka-source-1.selector.default = hdfs-channel-7 logChannel
flume1.channels.logChannel.type = memory
flume1.sinks.logSink.type = logger
flume1.sinks.logSink.channel = logChannel
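The new channel and sink also have to be registered with the agent and the channel added to the source's channel list, along these lines (matching the names used above):

flume1.channels = hdfs-channel-1 hdfs-channel-2 hdfs-channel-3 hdfs-channel-4 hdfs-channel-5 hdfs-channel-6 hdfs-channel-7 logChannel
flume1.sinks = hdfs-sink-1 hdfs-sink-2 hdfs-sink-3 hdfs-sink-4 hdfs-sink-5 hdfs-sink-6 hdfs-sink-7 logSink
flume1.sources.kafka-source-1.channels = hdfs-channel-1 hdfs-channel-2 hdfs-channel-3 hdfs-channel-4 hdfs-channel-5 hdfs-channel-6 hdfs-channel-7 logChannel
flume1.channels.logChannel.capacity = 10000
flume1.channels.logChannel.transactionCapacity = 1000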
Created 12-10-2015 04:52 AM
This is the result:
2015-12-10 09:38:59,065 INFO org.apache.solr.servlet.SolrDispatchFilter: [admin] webapp=null path=/admin/cores params={action=STATUS&wt=json} status=0 QTime=0
Are the headers status and QTime? And if they are, how can I make a field of the log be read as a header?
Created 12-10-2015 11:44 AM
I added an interceptor that finds the product field in the log and creates a header from it. This is the config, and it is not working. What could be wrong?
#Interceptors setup
flume1.sources.kafka-source-1.interceptors = i1
flume1.sources.kafka-source-1.interceptors.i1.type = regex_extractor
flume1.sources.kafka-source-1.interceptors.i1.regex = "product":"(\\d+)"
flume1.sources.kafka-source-1.interceptors.i1.serializers = ser1
flume1.sources.kafka-source-1.interceptors.i1.serializers.ser1.type = default
flume1.sources.kafka-source-1.interceptors.i1.serializers.ser1.name = product
The product field in the log looks like this:
...,"product":"smgsyslog",...
Created on 12-10-2015 03:11 PM - edited 12-10-2015 03:11 PM
If your events coming from Kafka are in JSON format, you could put together a quick JSON interceptor; that way all of your JSON fields would get populated as Flume headers.
Here are some examples:
http://mmolimar.blogspot.com/2015/01/analyzing-tweets-from-flume-in-kibana.html
https://github.com/szaharici/Flume-Json-Interceptor
If you do stick with just the regex interceptor: you are trying to use (\\d+) to capture a string field, which will not match because \d only matches digits. You'd need to do something like
flume1.sources.kafka-source-1.interceptors.i1.regex = "product":"(\\w+)"
Which will match any word characters: http://www.w3schools.com/jsref/jsref_regexp_wordchar.asp
I would recommend creating a JSON interceptor though, as that will give you the most flexibility, and all of your JSON fields will be populated in the headers.
Created 12-11-2015 05:27 AM
I changed the regex but it is still not working. The whole config file is this:
# Sources, channels, and sinks are defined per
# agent name, in this case flume1.
flume1.sources = kafka-source-1
flume1.channels = hdfs-channel-1 hdfs-channel-2 hdfs-channel-3 hdfs-channel-4 hdfs-channel-5 hdfs-channel-6 hdfs-channel-7 logChannel
flume1.sinks = hdfs-sink-1 hdfs-sink-2 hdfs-sink-3 hdfs-sink-4 hdfs-sink-5 hdfs-sink-6 hdfs-sink-7 logSink
# For each source, channel, and sink, set
# standard properties.
flume1.sources.kafka-source-1.type = org.apache.flume.source.kafka.KafkaSource
flume1.sources.kafka-source-1.zookeeperConnect = 192.168.70.23:2181
flume1.sources.kafka-source-1.topic = kafkatopic
flume1.sources.kafka-source-1.batchSize = 1000
flume1.sources.kafka-source-1.channels = hdfs-channel-1 hdfs-channel-2 hdfs-channel-3 hdfs-channel-4 hdfs-channel-5 hdfs-channel-6 hdfs-channel-7 logChannel
flume1.sinks.hdfs-sink-1.channel = hdfs-channel-1
flume1.sinks.hdfs-sink-2.channel = hdfs-channel-2
flume1.sinks.hdfs-sink-3.channel = hdfs-channel-3
flume1.sinks.hdfs-sink-4.channel = hdfs-channel-4
flume1.sinks.hdfs-sink-5.channel = hdfs-channel-5
flume1.sinks.hdfs-sink-6.channel = hdfs-channel-6
flume1.sinks.hdfs-sink-7.channel = hdfs-channel-7
flume1.sinks.logSink.channel = logChannel
flume1.channels.hdfs-channel-1.type = memory
flume1.channels.hdfs-channel-2.type = memory
flume1.channels.hdfs-channel-3.type = memory
flume1.channels.hdfs-channel-4.type = memory
flume1.channels.hdfs-channel-5.type = memory
flume1.channels.hdfs-channel-6.type = memory
flume1.channels.hdfs-channel-7.type = memory
flume1.channels.logChannel.type = memory
flume1.channels.hdfs-channel-1.capacity = 10000
flume1.channels.hdfs-channel-1.transactionCapacity = 1000
flume1.channels.hdfs-channel-2.capacity = 10000
flume1.channels.hdfs-channel-2.transactionCapacity = 1000
flume1.channels.hdfs-channel-3.capacity = 10000
flume1.channels.hdfs-channel-3.transactionCapacity = 1000
flume1.channels.hdfs-channel-4.capacity = 10000
flume1.channels.hdfs-channel-4.transactionCapacity = 1000
flume1.channels.hdfs-channel-5.capacity = 10000
flume1.channels.hdfs-channel-5.transactionCapacity = 1000
flume1.channels.hdfs-channel-6.capacity = 10000
flume1.channels.hdfs-channel-6.transactionCapacity = 1000
flume1.channels.hdfs-channel-7.capacity = 10000
flume1.channels.hdfs-channel-7.transactionCapacity = 1000
flume1.channels.logChannel.capacity = 10000
flume1.channels.logChannel.transactionCapacity = 1000
#Interceptors setup
flume1.sources.kafka-source-1.interceptors = i1
flume1.sources.kafka-source-1.interceptors.i1.type = regex_extractor
flume1.sources.kafka-source-1.interceptors.i1.regex = "product":"(\\w+)"
flume1.sources.kafka-source-1.interceptors.i1.serializers = ser1
flume1.sources.kafka-source-1.interceptors.i1.serializers.ser1.type = default
flume1.sources.kafka-source-1.interceptors.i1.serializers.ser1.name = product
#checkpoint,smgsyslog, sepsyslog, pgp, bluecoat-syslog,bluecoat
# channel selector configuration
flume1.sources.kafka-source-1.selector.type = multiplexing
flume1.sources.kafka-source-1.selector.header = product
flume1.sources.kafka-source-1.selector.mapping.ckeckpoint = hdfs-channel-1
flume1.sources.kafka-source-1.selector.mapping.smgsyslog = hdfs-channel-2
flume1.sources.kafka-source-1.selector.mapping.sepsyslog = hdfs-channel-3
flume1.sources.kafka-source-1.selector.mapping.pgp = hdfs-channel-4
flume1.sources.kafka-source-1.selector.mapping.bluecoat-syslog = hdfs-channel-5
flume1.sources.kafka-source-1.selector.mapping.bluecoat = hdfs-channel-6
flume1.sources.kafka-source-1.selector.default = hdfs-channel-7 logChannel
# sinks configuration
flume1.sinks.hdfs-sink-1.type = hdfs
flume1.sinks.hdfs-sink-1.hdfs.writeFormat = Text
flume1.sinks.hdfs-sink-1.hdfs.fileType = DataStream
flume1.sinks.hdfs-sink-1.hdfs.filePrefix = test-events
flume1.sinks.hdfs-sink-1.hdfs.useLocalTimeStamp = true
flume1.sinks.hdfs-sink-1.hdfs.path = /user/root/logs/checkpoint
flume1.sinks.hdfs-sink-1.hdfs.rollCount=1000
flume1.sinks.hdfs-sink-1.hdfs.rollSize=0
flume1.sinks.hdfs-sink-2.type = hdfs
flume1.sinks.hdfs-sink-2.hdfs.writeFormat = Text
flume1.sinks.hdfs-sink-2.hdfs.fileType = DataStream
flume1.sinks.hdfs-sink-2.hdfs.filePrefix = test-events
flume1.sinks.hdfs-sink-2.hdfs.useLocalTimeStamp = true
flume1.sinks.hdfs-sink-2.hdfs.path = /user/root/logs/smgsyslog
flume1.sinks.hdfs-sink-2.hdfs.rollCount=1000
flume1.sinks.hdfs-sink-2.hdfs.rollSize=0
flume1.sinks.hdfs-sink-3.type = hdfs
flume1.sinks.hdfs-sink-3.hdfs.writeFormat = Text
flume1.sinks.hdfs-sink-3.hdfs.fileType = DataStream
flume1.sinks.hdfs-sink-3.hdfs.filePrefix = test-events
flume1.sinks.hdfs-sink-3.hdfs.useLocalTimeStamp = true
flume1.sinks.hdfs-sink-3.hdfs.path = /user/root/logs/sepsyslog
flume1.sinks.hdfs-sink-3.hdfs.rollCount=1000
flume1.sinks.hdfs-sink-3.hdfs.rollSize=0
flume1.sinks.hdfs-sink-4.type = hdfs
flume1.sinks.hdfs-sink-4.hdfs.writeFormat = Text
flume1.sinks.hdfs-sink-4.hdfs.fileType = DataStream
flume1.sinks.hdfs-sink-4.hdfs.filePrefix = test-events
flume1.sinks.hdfs-sink-4.hdfs.useLocalTimeStamp = true
flume1.sinks.hdfs-sink-4.hdfs.path = /user/root/logs/pgp
flume1.sinks.hdfs-sink-4.hdfs.rollCount=1000
flume1.sinks.hdfs-sink-4.hdfs.rollSize=0
flume1.sinks.hdfs-sink-5.type = hdfs
flume1.sinks.hdfs-sink-5.hdfs.writeFormat = Text
flume1.sinks.hdfs-sink-5.hdfs.fileType = DataStream
flume1.sinks.hdfs-sink-5.hdfs.filePrefix = test-events
flume1.sinks.hdfs-sink-5.hdfs.useLocalTimeStamp = true
flume1.sinks.hdfs-sink-5.hdfs.path = /user/root/logs/bluecoatsyslog
flume1.sinks.hdfs-sink-5.hdfs.rollCount=1000
flume1.sinks.hdfs-sink-5.hdfs.rollSize=0
flume1.sinks.hdfs-sink-6.type = hdfs
flume1.sinks.hdfs-sink-6.hdfs.writeFormat = Text
flume1.sinks.hdfs-sink-6.hdfs.fileType = DataStream
flume1.sinks.hdfs-sink-6.hdfs.filePrefix = test-events
flume1.sinks.hdfs-sink-6.hdfs.useLocalTimeStamp = true
flume1.sinks.hdfs-sink-6.hdfs.path = /user/root/logs/bluecoat
flume1.sinks.hdfs-sink-6.hdfs.rollCount=1000
flume1.sinks.hdfs-sink-6.hdfs.rollSize=0
flume1.sinks.hdfs-sink-7.type = hdfs
flume1.sinks.hdfs-sink-7.hdfs.writeFormat = Text
flume1.sinks.hdfs-sink-7.hdfs.fileType = DataStream
flume1.sinks.hdfs-sink-7.hdfs.filePrefix = test-events
flume1.sinks.hdfs-sink-7.hdfs.useLocalTimeStamp = true
flume1.sinks.hdfs-sink-7.hdfs.path = /user/root/logs/otros
flume1.sinks.hdfs-sink-7.hdfs.rollCount=1000
flume1.sinks.hdfs-sink-7.hdfs.rollSize=0
flume1.sinks.logSink.type = logger
# Other properties are specific to each type of
# source, channel, or sink. In this case, we
# specify the capacity of the memory channel.
I think there is something wrong with the channels, but I don't know what the problem is. The logger output without the interceptor part shows two headers, timestamp and topic.
Created 12-11-2015 10:47 AM
Problem solved.
Instead of using:
flume1.sources.kafka-source-1.interceptors.i1.serializers.ser1.type = default
I changed it to:
flume1.sources.kafka-source-1.interceptors.i1.serializers.ser1.type = org.apache.flume.interceptor.RegexExtractorInterceptorPassThroughSerializer
And it worked fine.
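For reference, the interceptor stanza that ended up working (the regex from above plus the pass-through serializer) is:

flume1.sources.kafka-source-1.interceptors = i1
flume1.sources.kafka-source-1.interceptors.i1.type = regex_extractor
flume1.sources.kafka-source-1.interceptors.i1.regex = "product":"(\\w+)"
flume1.sources.kafka-source-1.interceptors.i1.serializers = ser1
flume1.sources.kafka-source-1.interceptors.i1.serializers.ser1.type = org.apache.flume.interceptor.RegexExtractorInterceptorPassThroughSerializer
flume1.sources.kafka-source-1.interceptors.i1.serializers.ser1.name = product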
I have two more questions:
1) The - (hyphen) cannot be read as part of a header value, so if the header value contains a -, the event goes to the default channel and not to the corresponding mapping.
2) I want to add a second regex, but how can I map two headers together? For example:
flume1.sources.kafka-source-1.selector.header = header1 header2
flume1.sources.kafka-source-1.selector.mapping.(value1)&(value2) = hdfs-channel-x
Is it possible to do this without programming? Because I'm not a programmer.
Regards,
Created on 12-11-2015 11:15 AM - edited 12-11-2015 11:16 AM
Glad to hear you got it working.
You can't concatenate headers together in the Flume config. However, the morphline interceptor will give you more complex functionality for manipulating headers: http://flume.apache.org/FlumeUserGuide.html#morphline-interceptor
This will allow you to arbitrarily update/delete/modify headers as well as event body, prior to being passed to the channel selector. You can write a morphline that will examine the body and set any headers that you wish.
Here is the morphlines command reference guide to help you get started: http://kitesdk.org/docs/current/morphlines/morphlines-reference-guide.html
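Regarding your first question: if the product header is coming from the regex_extractor, keep in mind that \w does not match a hyphen, so a value like bluecoat-syslog gets cut off at the hyphen. Widening the character class should capture the whole value; a sketch (untested against your config, assuming the values only contain word characters and hyphens):

flume1.sources.kafka-source-1.interceptors.i1.regex = "product":"([\\w-]+)"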
HTH!
Created 12-14-2015 01:10 PM
Hello, I created a Java file with the custom interceptor, but I don't know how to compile it or package it into a jar file properly. I tried the javac and jar commands, but the interceptor builder is not found.
Created 12-14-2015 02:39 PM
Did you try this one: https://github.com/szaharici/Flume-Json-Interceptor
Once you have that compiled, you'll want to put that into the flume /var/lib/flume-ng/plugins.d directory in the proper subdirectory (and with proper permissions for flume to read), following the convention here: http://flume.apache.org/FlumeUserGuide.html#the-plugins-d-directory
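The expected layout under plugins.d looks something like this (the plugin directory name, e.g. json-interceptor, and the jar name are up to you):

/var/lib/flume-ng/plugins.d/json-interceptor/lib/flume-json-interceptor.jar
/var/lib/flume-ng/plugins.d/json-interceptor/libext/   (third-party jars the plugin depends on, if any)
/var/lib/flume-ng/plugins.d/json-interceptor/native/   (native libraries, if any)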