Support Questions

Find answers, ask questions, and share your expertise
Announcements
Celebrating as our community reaches 100,000 members! Thank you!

Flafka selector doesn't work

avatar
Contributor

Hello, I'm trying to split my data depending on the header of the log (json). My source is Kafka and my sinks are HDFS folders. When I run flume all the data goes to de default selector even if there is a header match with the mapper. 

 

Here is the Flume config file:

# Sources, channels, and sinks are defined per
# agent name, in this case flume1.
flume1.sources  = kafka-source-1
flume1.channels = hdfs-channel-1 hdfs-channel-2 hdfs-channel-3 hdfs-channel-4 hdfs-channel-5 hdfs-channel-6 hdfs-channel-7
flume1.sinks    = hdfs-sink-1 hdfs-sink-2 hdfs-sink-3 hdfs-sink-4 hdfs-sink-5 hdfs-sink-6 hdfs-sink-7
 
# For each source, channel, and sink, set
# standard properties.
flume1.sources.kafka-source-1.type = org.apache.flume.source.kafka.KafkaSource
flume1.sources.kafka-source-1.zookeeperConnect = 192.168.70.23:2181
flume1.sources.kafka-source-1.topic = kafkatopic
flume1.sources.kafka-source-1.batchSize = 1000
flume1.sources.kafka-source-1.channels = hdfs-channel-1 hdfs-channel-2 hdfs-channel-3 hdfs-channel-4 hdfs-channel-5 hdfs-channel-6 hdfs-channel-7

flume1.sinks.hdfs-sink-1.channel = hdfs-channel-1
flume1.sinks.hdfs-sink-2.channel = hdfs-channel-2
flume1.sinks.hdfs-sink-3.channel = hdfs-channel-3
flume1.sinks.hdfs-sink-4.channel = hdfs-channel-4
flume1.sinks.hdfs-sink-5.channel = hdfs-channel-5
flume1.sinks.hdfs-sink-6.channel = hdfs-channel-6
flume1.sinks.hdfs-sink-7.channel = hdfs-channel-7
 
flume1.channels.hdfs-channel-1.type   = memory
flume1.channels.hdfs-channel-2.type   = memory
flume1.channels.hdfs-channel-3.type   = memory
flume1.channels.hdfs-channel-4.type   = memory
flume1.channels.hdfs-channel-5.type   = memory
flume1.channels.hdfs-channel-6.type   = memory
flume1.channels.hdfs-channel-7.type   = memory

flume1.channels.hdfs-channel-1.capacity = 10000
flume1.channels.hdfs-channel-1.transactionCapacity = 1000
flume1.channels.hdfs-channel-2.capacity = 10000
flume1.channels.hdfs-channel-2.transactionCapacity = 1000
flume1.channels.hdfs-channel-3.capacity = 10000
flume1.channels.hdfs-channel-3.transactionCapacity = 1000
flume1.channels.hdfs-channel-4.capacity = 10000
flume1.channels.hdfs-channel-4.transactionCapacity = 1000
flume1.channels.hdfs-channel-5.capacity = 10000
flume1.channels.hdfs-channel-5.transactionCapacity = 1000
flume1.channels.hdfs-channel-6.capacity = 10000
flume1.channels.hdfs-channel-6.transactionCapacity = 1000
flume1.channels.hdfs-channel-7.capacity = 10000
flume1.channels.hdfs-channel-7.transactionCapacity = 1000



#checkpoint,smgsyslog, sepsyslog, pgp, bluecoat-syslog,bluecoat
# channel selector configuration
flume1.sources.kafka-source-1.selector.type = multiplexing
flume1.sources.kafka-source-1.selector.header = product
flume1.sources.kafka-source-1.selector.mapping.ckeckpoint = hdfs-channel-1
flume1.sources.kafka-source-1.selector.mapping.smgsyslog = hdfs-channel-2
flume1.sources.kafka-source-1.selector.mapping.sepsyslog = hdfs-channel-3
flume1.sources.kafka-source-1.selector.mapping.pgp = hdfs-channel-4
flume1.sources.kafka-source-1.selector.mapping.bluecoat-syslog = hdfs-channel-5
flume1.sources.kafka-source-1.selector.mapping.bluecoat = hdfs-channel-6
flume1.sources.kafka-source-1.selector.default = hdfs-channel-7

# sinks configuration
flume1.sinks.hdfs-sink-1.type = hdfs
flume1.sinks.hdfs-sink-1.hdfs.writeFormat = Text
flume1.sinks.hdfs-sink-1.hdfs.fileType = DataStream
flume1.sinks.hdfs-sink-1.hdfs.filePrefix = test-events
flume1.sinks.hdfs-sink-1.hdfs.useLocalTimeStamp = true
flume1.sinks.hdfs-sink-1.hdfs.path = /user/root/logs/checkpoint
flume1.sinks.hdfs-sink-1.hdfs.rollCount=1000
flume1.sinks.hdfs-sink-1.hdfs.rollSize=0

flume1.sinks.hdfs-sink-2.type = hdfs
flume1.sinks.hdfs-sink-2.hdfs.writeFormat = Text
flume1.sinks.hdfs-sink-2.hdfs.fileType = DataStream
flume1.sinks.hdfs-sink-2.hdfs.filePrefix = test-events
flume1.sinks.hdfs-sink-2.hdfs.useLocalTimeStamp = true
flume1.sinks.hdfs-sink-2.hdfs.path = /user/root/logs/smgsyslog
flume1.sinks.hdfs-sink-2.hdfs.rollCount=1000
flume1.sinks.hdfs-sink-2.hdfs.rollSize=0

flume1.sinks.hdfs-sink-3.type = hdfs
flume1.sinks.hdfs-sink-3.hdfs.writeFormat = Text
flume1.sinks.hdfs-sink-3.hdfs.fileType = DataStream
flume1.sinks.hdfs-sink-3.hdfs.filePrefix = test-events
flume1.sinks.hdfs-sink-3.hdfs.useLocalTimeStamp = true
flume1.sinks.hdfs-sink-3.hdfs.path = /user/root/logs/sepsyslog
flume1.sinks.hdfs-sink-3.hdfs.rollCount=1000
flume1.sinks.hdfs-sink-3.hdfs.rollSize=0

flume1.sinks.hdfs-sink-4.type = hdfs
flume1.sinks.hdfs-sink-4.hdfs.writeFormat = Text
flume1.sinks.hdfs-sink-4.hdfs.fileType = DataStream
flume1.sinks.hdfs-sink-4.hdfs.filePrefix = test-events
flume1.sinks.hdfs-sink-4.hdfs.useLocalTimeStamp = true
flume1.sinks.hdfs-sink-4.hdfs.path = /user/root/logs/pgp
flume1.sinks.hdfs-sink-4.hdfs.rollCount=1000
flume1.sinks.hdfs-sink-4.hdfs.rollSize=0

flume1.sinks.hdfs-sink-5.type = hdfs
flume1.sinks.hdfs-sink-5.hdfs.writeFormat = Text
flume1.sinks.hdfs-sink-5.hdfs.fileType = DataStream
flume1.sinks.hdfs-sink-5.hdfs.filePrefix = test-events
flume1.sinks.hdfs-sink-5.hdfs.useLocalTimeStamp = true
flume1.sinks.hdfs-sink-5.hdfs.path = /user/root/logs/bluecoatsyslog
flume1.sinks.hdfs-sink-5.hdfs.rollCount=1000
flume1.sinks.hdfs-sink-5.hdfs.rollSize=0

flume1.sinks.hdfs-sink-6.type = hdfs
flume1.sinks.hdfs-sink-6.hdfs.writeFormat = Text
flume1.sinks.hdfs-sink-6.hdfs.fileType = DataStream
flume1.sinks.hdfs-sink-6.hdfs.filePrefix = test-events
flume1.sinks.hdfs-sink-6.hdfs.useLocalTimeStamp = true
flume1.sinks.hdfs-sink-6.hdfs.path = /user/root/log/bluecoat
flume1.sinks.hdfs-sink-6.hdfs.rollCount=1000
flume1.sinks.hdfs-sink-6.hdfs.rollSize=0

flume1.sinks.hdfs-sink-7.type = hdfs
flume1.sinks.hdfs-sink-7.hdfs.writeFormat = Text
flume1.sinks.hdfs-sink-7.hdfs.fileType = DataStream
flume1.sinks.hdfs-sink-7.hdfs.filePrefix = test-events
flume1.sinks.hdfs-sink-7.hdfs.useLocalTimeStamp = true
flume1.sinks.hdfs-sink-7.hdfs.path = /user/root/log/otros
flume1.sinks.hdfs-sink-7.hdfs.rollCount=1000
flume1.sinks.hdfs-sink-7.hdfs.rollSize=0

and the logs are like this:

{"@version":"1","@timestamp":"2015-12-09T20:09:36.000Z","client":"ccu","product":"smgsyslog","type":"syslog","host":"datos01","path":"/datos/logs/clientes/ccu/antispam/syslog/scanner02_smg_ccu_syslog.log","nombre":["mail2","mail2"],"proceso":["bmserver","bmserver"],"resto":["[Brightmail] (INFO:18607.3833596816): [36087] Spamhunter module: pcre_exec failed, match limit exceeded: regex \"<!--(?:(?:\\\\s*<\\\\/?[a-z]+[^>]+>)+\\\\s*[-\\\\w,;:\\\\. ]+){10}\".","[Brightmail] (INFO:18607.3833596816): [36087] Spamhunter module: pcre_exec failed, match limit exceeded: regex \"<!--(?:(?:\\\\s*<\\\\/?[a-z]+[^>]+>)+\\\\s*[-\\\\w,;:\\\\. ]+){10}\"."]}

So even if the product header match it goes to default selector (channel-7).

Please help me with this,

 

1 ACCEPTED SOLUTION

avatar
Contributor

I solved the problem. I had to created a java custom interceptor (based in the one you sent me), compile it with maven and paste it in the flume-ng dir. 

 

Thanks pdvorak for all the help 🙂

View solution in original post

12 REPLIES 12

avatar
Contributor

Yes I tried that. All the fields are set as headers but the message is transformed by:

 

event.setBody("Message modified by Jsoninterceptor".getBytes());

And it becomes unusefull because I need the log as the original.

I tried to change the JsonIntersepter.java file in the .jar using vim but it can't be done, I think that is because the .class file. Also tried to create a java morphline but i can't get it compile correctly.

 

morphlines : [
    java {
        imports : """
            import java.util.List;
            import java.util.Map;
            import org.apache.flume.Context;
            import org.apache.flume.Event;
            import org.apache.flume.interceptor.Interceptor;
            import org.apache.log4j.Logger;
        """
    code: """ 
         Map<String, String> headers = event.getHeaders();
     
         // example: add / remove headers
         if (headers.containsKey("product")) {
        headers.put("product", headers.get("product"));
         }
         if (headers.containKey("client")){
        headers.put("client", headers.get("client"));  
         }
     
         return event;
      
      """
    }
]

Regards,

 

 

 

 

 

 

avatar

If you wanted to use the morphline interceptor, you could simply use a grok statement to extract the information you need and set it as a new field that becomes a header: http://kitesdk.org/docs/current/morphlines/morphlines-reference-guide.html#grok

 

Here is a grok debugger that you can use to make sure your grok pattern will match your input string: http://grokdebug.herokuapp.com/

 

 

avatar
Contributor

I solved the problem. I had to created a java custom interceptor (based in the one you sent me), compile it with maven and paste it in the flume-ng dir. 

 

Thanks pdvorak for all the help 🙂