Created on 12-09-2015 01:22 PM - edited 09-16-2022 02:52 AM
Hello, I'm trying to split my data depending on a header of the log (JSON). My source is Kafka and my sinks are HDFS folders. When I run Flume, all the data goes to the default channel even when the header matches one of the selector mappings.
Here is the Flume config file:
# Sources, channels, and sinks are defined per
# agent name, in this case flume1.
flume1.sources = kafka-source-1
flume1.channels = hdfs-channel-1 hdfs-channel-2 hdfs-channel-3 hdfs-channel-4 hdfs-channel-5 hdfs-channel-6 hdfs-channel-7
flume1.sinks = hdfs-sink-1 hdfs-sink-2 hdfs-sink-3 hdfs-sink-4 hdfs-sink-5 hdfs-sink-6 hdfs-sink-7

# For each source, channel, and sink, set
# standard properties.
flume1.sources.kafka-source-1.type = org.apache.flume.source.kafka.KafkaSource
flume1.sources.kafka-source-1.zookeeperConnect = 192.168.70.23:2181
flume1.sources.kafka-source-1.topic = kafkatopic
flume1.sources.kafka-source-1.batchSize = 1000
flume1.sources.kafka-source-1.channels = hdfs-channel-1 hdfs-channel-2 hdfs-channel-3 hdfs-channel-4 hdfs-channel-5 hdfs-channel-6 hdfs-channel-7

flume1.sinks.hdfs-sink-1.channel = hdfs-channel-1
flume1.sinks.hdfs-sink-2.channel = hdfs-channel-2
flume1.sinks.hdfs-sink-3.channel = hdfs-channel-3
flume1.sinks.hdfs-sink-4.channel = hdfs-channel-4
flume1.sinks.hdfs-sink-5.channel = hdfs-channel-5
flume1.sinks.hdfs-sink-6.channel = hdfs-channel-6
flume1.sinks.hdfs-sink-7.channel = hdfs-channel-7

flume1.channels.hdfs-channel-1.type = memory
flume1.channels.hdfs-channel-2.type = memory
flume1.channels.hdfs-channel-3.type = memory
flume1.channels.hdfs-channel-4.type = memory
flume1.channels.hdfs-channel-5.type = memory
flume1.channels.hdfs-channel-6.type = memory
flume1.channels.hdfs-channel-7.type = memory
flume1.channels.hdfs-channel-1.capacity = 10000
flume1.channels.hdfs-channel-1.transactionCapacity = 1000
flume1.channels.hdfs-channel-2.capacity = 10000
flume1.channels.hdfs-channel-2.transactionCapacity = 1000
flume1.channels.hdfs-channel-3.capacity = 10000
flume1.channels.hdfs-channel-3.transactionCapacity = 1000
flume1.channels.hdfs-channel-4.capacity = 10000
flume1.channels.hdfs-channel-4.transactionCapacity = 1000
flume1.channels.hdfs-channel-5.capacity = 10000
flume1.channels.hdfs-channel-5.transactionCapacity = 1000
flume1.channels.hdfs-channel-6.capacity = 10000
flume1.channels.hdfs-channel-6.transactionCapacity = 1000
flume1.channels.hdfs-channel-7.capacity = 10000
flume1.channels.hdfs-channel-7.transactionCapacity = 1000

# checkpoint, smgsyslog, sepsyslog, pgp, bluecoat-syslog, bluecoat
# channel selector configuration
flume1.sources.kafka-source-1.selector.type = multiplexing
flume1.sources.kafka-source-1.selector.header = product
flume1.sources.kafka-source-1.selector.mapping.ckeckpoint = hdfs-channel-1
flume1.sources.kafka-source-1.selector.mapping.smgsyslog = hdfs-channel-2
flume1.sources.kafka-source-1.selector.mapping.sepsyslog = hdfs-channel-3
flume1.sources.kafka-source-1.selector.mapping.pgp = hdfs-channel-4
flume1.sources.kafka-source-1.selector.mapping.bluecoat-syslog = hdfs-channel-5
flume1.sources.kafka-source-1.selector.mapping.bluecoat = hdfs-channel-6
flume1.sources.kafka-source-1.selector.default = hdfs-channel-7

# sinks configuration
flume1.sinks.hdfs-sink-1.type = hdfs
flume1.sinks.hdfs-sink-1.hdfs.writeFormat = Text
flume1.sinks.hdfs-sink-1.hdfs.fileType = DataStream
flume1.sinks.hdfs-sink-1.hdfs.filePrefix = test-events
flume1.sinks.hdfs-sink-1.hdfs.useLocalTimeStamp = true
flume1.sinks.hdfs-sink-1.hdfs.path = /user/root/logs/checkpoint
flume1.sinks.hdfs-sink-1.hdfs.rollCount = 1000
flume1.sinks.hdfs-sink-1.hdfs.rollSize = 0

flume1.sinks.hdfs-sink-2.type = hdfs
flume1.sinks.hdfs-sink-2.hdfs.writeFormat = Text
flume1.sinks.hdfs-sink-2.hdfs.fileType = DataStream
flume1.sinks.hdfs-sink-2.hdfs.filePrefix = test-events
flume1.sinks.hdfs-sink-2.hdfs.useLocalTimeStamp = true
flume1.sinks.hdfs-sink-2.hdfs.path = /user/root/logs/smgsyslog
flume1.sinks.hdfs-sink-2.hdfs.rollCount = 1000
flume1.sinks.hdfs-sink-2.hdfs.rollSize = 0

flume1.sinks.hdfs-sink-3.type = hdfs
flume1.sinks.hdfs-sink-3.hdfs.writeFormat = Text
flume1.sinks.hdfs-sink-3.hdfs.fileType = DataStream
flume1.sinks.hdfs-sink-3.hdfs.filePrefix = test-events
flume1.sinks.hdfs-sink-3.hdfs.useLocalTimeStamp = true
flume1.sinks.hdfs-sink-3.hdfs.path = /user/root/logs/sepsyslog
flume1.sinks.hdfs-sink-3.hdfs.rollCount = 1000
flume1.sinks.hdfs-sink-3.hdfs.rollSize = 0

flume1.sinks.hdfs-sink-4.type = hdfs
flume1.sinks.hdfs-sink-4.hdfs.writeFormat = Text
flume1.sinks.hdfs-sink-4.hdfs.fileType = DataStream
flume1.sinks.hdfs-sink-4.hdfs.filePrefix = test-events
flume1.sinks.hdfs-sink-4.hdfs.useLocalTimeStamp = true
flume1.sinks.hdfs-sink-4.hdfs.path = /user/root/logs/pgp
flume1.sinks.hdfs-sink-4.hdfs.rollCount = 1000
flume1.sinks.hdfs-sink-4.hdfs.rollSize = 0

flume1.sinks.hdfs-sink-5.type = hdfs
flume1.sinks.hdfs-sink-5.hdfs.writeFormat = Text
flume1.sinks.hdfs-sink-5.hdfs.fileType = DataStream
flume1.sinks.hdfs-sink-5.hdfs.filePrefix = test-events
flume1.sinks.hdfs-sink-5.hdfs.useLocalTimeStamp = true
flume1.sinks.hdfs-sink-5.hdfs.path = /user/root/logs/bluecoatsyslog
flume1.sinks.hdfs-sink-5.hdfs.rollCount = 1000
flume1.sinks.hdfs-sink-5.hdfs.rollSize = 0

flume1.sinks.hdfs-sink-6.type = hdfs
flume1.sinks.hdfs-sink-6.hdfs.writeFormat = Text
flume1.sinks.hdfs-sink-6.hdfs.fileType = DataStream
flume1.sinks.hdfs-sink-6.hdfs.filePrefix = test-events
flume1.sinks.hdfs-sink-6.hdfs.useLocalTimeStamp = true
flume1.sinks.hdfs-sink-6.hdfs.path = /user/root/log/bluecoat
flume1.sinks.hdfs-sink-6.hdfs.rollCount = 1000
flume1.sinks.hdfs-sink-6.hdfs.rollSize = 0

flume1.sinks.hdfs-sink-7.type = hdfs
flume1.sinks.hdfs-sink-7.hdfs.writeFormat = Text
flume1.sinks.hdfs-sink-7.hdfs.fileType = DataStream
flume1.sinks.hdfs-sink-7.hdfs.filePrefix = test-events
flume1.sinks.hdfs-sink-7.hdfs.useLocalTimeStamp = true
flume1.sinks.hdfs-sink-7.hdfs.path = /user/root/log/otros
flume1.sinks.hdfs-sink-7.hdfs.rollCount = 1000
flume1.sinks.hdfs-sink-7.hdfs.rollSize = 0
and the logs are like this:
{"@version":"1","@timestamp":"2015-12-09T20:09:36.000Z","client":"ccu","product":"smgsyslog","type":"syslog","host":"datos01","path":"/datos/logs/clientes/ccu/antispam/syslog/scanner02_smg_ccu_syslog.log","nombre":["mail2","mail2"],"proceso":["bmserver","bmserver"],"resto":["[Brightmail] (INFO:18607.3833596816): [36087] Spamhunter module: pcre_exec failed, match limit exceeded: regex \"<!--(?:(?:\\\\s*<\\\\/?[a-z]+[^>]+>)+\\\\s*[-\\\\w,;:\\\\. ]+){10}\".","[Brightmail] (INFO:18607.3833596816): [36087] Spamhunter module: pcre_exec failed, match limit exceeded: regex \"<!--(?:(?:\\\\s*<\\\\/?[a-z]+[^>]+>)+\\\\s*[-\\\\w,;:\\\\. ]+){10}\"."]}
So even when the product header matches a mapping, the event goes to the default channel (hdfs-channel-7).
Please help me with this,
Created 12-14-2015 03:18 PM
Yes, I tried that. All the fields are set as headers, but the message body is replaced by:
event.setBody("Message modified by Jsoninterceptor".getBytes());
That makes it useless for me, because I need the body to stay as the original log.
I tried to edit the JsonInterceptor.java inside the .jar with vim, but that can't be done; I think it's because the jar only contains the compiled .class files. I also tried to create a morphline with a java command, but I can't get it to compile correctly:
morphlines : [
  java {
    imports : """
      import java.util.List;
      import java.util.Map;
      import org.apache.flume.Context;
      import org.apache.flume.Event;
      import org.apache.flume.interceptor.Interceptor;
      import org.apache.log4j.Logger;
    """
    code : """
      Map<String, String> headers = event.getHeaders();
      // example: add / remove headers (currently just re-puts existing values)
      if (headers.containsKey("product")) {
        headers.put("product", headers.get("product"));
      }
      if (headers.containsKey("client")) {
        headers.put("client", headers.get("client"));
      }
      return event;
    """
  }
]
Regards,
Created 12-16-2015 06:48 AM
If you wanted to use the morphline interceptor, you could simply use a grok statement to extract the information you need and set it as a new field that becomes a header: http://kitesdk.org/docs/current/morphlines/morphlines-reference-guide.html#grok
Here is a grok debugger that you can use to make sure your grok pattern will match your input string: http://grokdebug.herokuapp.com/
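To make that concrete, here is a rough, untested sketch of what such a morphline might look like for the JSON lines above. Everything in it is an assumption on my part: the morphline id, the inline PRODUCT pattern, the use of readClob to expose the event body as a message field, and the expectation that the morphline interceptor surfaces the resulting record fields as Flume headers.

morphlines : [
  {
    # hypothetical morphline id, referenced from the interceptor configuration
    id : setProductHeader
    importCommands : ["org.kitesdk.**"]
    commands : [
      # the Flume event body arrives as bytes; readClob copies it into a
      # string field named "message" without altering the body itself
      { readClob { charset : UTF-8 } }

      # pull the value of the "product" key out of the JSON text into a
      # field named "product"; PRODUCT is a custom pattern defined inline
      {
        grok {
          dictionaryString : "PRODUCT [A-Za-z0-9_-]+"
          expressions : {
            message : "\"product\":\"%{PRODUCT:product}\""
          }
          findSubstrings : true
          numRequiredMatches : atLeastOnce
        }
      }
    ]
  }
]

If that header mapping assumption holds, the multiplexing selector above can then route on the product header. Verify the pattern in the grok debugger before relying on it.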
Created 12-16-2015 07:11 AM
I solved the problem. I had to create a custom Java interceptor (based on the one you sent me), compile it with Maven, and drop the jar into the flume-ng directory.
Thanks pdvorak for all the help 🙂
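For anyone who finds this thread later: the interceptor source isn't posted here, but a minimal sketch of that kind of interceptor might look like the class below. The package and class names are made up, and it assumes a JSON library such as Gson is available on the Flume classpath. The key point is that it only adds headers and never calls setBody, so the original log line is preserved.

package com.example.flume;  // hypothetical package name

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import com.google.gson.JsonObject;
import com.google.gson.JsonParser;

public class JsonHeaderInterceptor implements Interceptor {

  @Override
  public void initialize() {
    // nothing to initialize
  }

  @Override
  public Event intercept(Event event) {
    try {
      // parse the JSON body and copy selected fields into event headers
      String body = new String(event.getBody(), StandardCharsets.UTF_8);
      JsonObject json = new JsonParser().parse(body).getAsJsonObject();
      Map<String, String> headers = event.getHeaders();
      if (json.has("product")) {
        headers.put("product", json.get("product").getAsString());
      }
      if (json.has("client")) {
        headers.put("client", json.get("client").getAsString());
      }
      // the body is intentionally left untouched, so the original log
      // reaches the HDFS sink unchanged
    } catch (Exception e) {
      // on malformed JSON, leave the event as-is; the multiplexing
      // selector will send it to the default channel
    }
    return event;
  }

  @Override
  public List<Event> intercept(List<Event> events) {
    for (Event event : events) {
      intercept(event);
    }
    return events;
  }

  @Override
  public void close() {
    // nothing to clean up
  }

  public static class Builder implements Interceptor.Builder {
    @Override
    public Interceptor build() {
      return new JsonHeaderInterceptor();
    }

    @Override
    public void configure(Context context) {
      // no configuration needed for this sketch
    }
  }
}

It would then be wired into the Kafka source roughly like this (the interceptor alias is arbitrary), with the compiled jar placed where Flume can load it, for example under plugins.d/<plugin>/lib or the flume-ng lib directory as described above:

flume1.sources.kafka-source-1.interceptors = json-headers
flume1.sources.kafka-source-1.interceptors.json-headers.type = com.example.flume.JsonHeaderInterceptor$Builder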