<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>question Re: Flafka selector doesn't work in Archives of Support Questions (Read Only)</title>
    <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/Flafka-selector-doesn-t-work/m-p/35001#M11711</link>
    <description>&lt;P&gt;Hello,&lt;/P&gt;&lt;P&gt;&amp;nbsp; Can you please add a logging channel and logger sink to your flume configuration? &amp;nbsp;This would show, in the solr-cmf logs, exactly what headers are set for the events coming from your kafka source.&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;You would need to add something like this (multiple channels on the default selector will be replicating):&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;PRE&gt;&lt;BR /&gt;flume1.sources.kafka-source-1.selector.default = hdfs-channel-7 logChannel&lt;BR /&gt;flume1.channels.logChannel.type = memory&amp;nbsp;&lt;BR /&gt;flume1.sinks.logSink.type = logger&lt;BR /&gt;flume1.sinks.logSink.channel = logChannel&lt;/PRE&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;</description>
    <pubDate>Wed, 09 Dec 2015 22:21:51 GMT</pubDate>
    <dc:creator>pdvorak</dc:creator>
    <dc:date>2015-12-09T22:21:51Z</dc:date>
    <item>
      <title>Flafka selector doesn't work</title>
      <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/Flafka-selector-doesn-t-work/m-p/34997#M11710</link>
      <description>&lt;P&gt;Hello, I'm trying to split my data depending on the header of the log (json). My source is Kafka and my sinks are HDFS folders. When I run flume all the data goes to de default selector even if there is a header match with the mapper.&amp;nbsp;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;Here is the Flume config file:&lt;/P&gt;&lt;PRE&gt;# Sources, channels, and sinks are defined per
# agent name, in this case flume1.
flume1.sources  = kafka-source-1
flume1.channels = hdfs-channel-1 hdfs-channel-2 hdfs-channel-3 hdfs-channel-4 hdfs-channel-5 hdfs-channel-6 hdfs-channel-7
flume1.sinks    = hdfs-sink-1 hdfs-sink-2 hdfs-sink-3 hdfs-sink-4 hdfs-sink-5 hdfs-sink-6 hdfs-sink-7
 
# For each source, channel, and sink, set
# standard properties.
flume1.sources.kafka-source-1.type = org.apache.flume.source.kafka.KafkaSource
flume1.sources.kafka-source-1.zookeeperConnect = 192.168.70.23:2181
flume1.sources.kafka-source-1.topic = kafkatopic
flume1.sources.kafka-source-1.batchSize = 1000
flume1.sources.kafka-source-1.channels = hdfs-channel-1 hdfs-channel-2 hdfs-channel-3 hdfs-channel-4 hdfs-channel-5 hdfs-channel-6 hdfs-channel-7

flume1.sinks.hdfs-sink-1.channel = hdfs-channel-1
flume1.sinks.hdfs-sink-2.channel = hdfs-channel-2
flume1.sinks.hdfs-sink-3.channel = hdfs-channel-3
flume1.sinks.hdfs-sink-4.channel = hdfs-channel-4
flume1.sinks.hdfs-sink-5.channel = hdfs-channel-5
flume1.sinks.hdfs-sink-6.channel = hdfs-channel-6
flume1.sinks.hdfs-sink-7.channel = hdfs-channel-7
 
flume1.channels.hdfs-channel-1.type   = memory
flume1.channels.hdfs-channel-2.type   = memory
flume1.channels.hdfs-channel-3.type   = memory
flume1.channels.hdfs-channel-4.type   = memory
flume1.channels.hdfs-channel-5.type   = memory
flume1.channels.hdfs-channel-6.type   = memory
flume1.channels.hdfs-channel-7.type   = memory

flume1.channels.hdfs-channel-1.capacity = 10000
flume1.channels.hdfs-channel-1.transactionCapacity = 1000
flume1.channels.hdfs-channel-2.capacity = 10000
flume1.channels.hdfs-channel-2.transactionCapacity = 1000
flume1.channels.hdfs-channel-3.capacity = 10000
flume1.channels.hdfs-channel-3.transactionCapacity = 1000
flume1.channels.hdfs-channel-4.capacity = 10000
flume1.channels.hdfs-channel-4.transactionCapacity = 1000
flume1.channels.hdfs-channel-5.capacity = 10000
flume1.channels.hdfs-channel-5.transactionCapacity = 1000
flume1.channels.hdfs-channel-6.capacity = 10000
flume1.channels.hdfs-channel-6.transactionCapacity = 1000
flume1.channels.hdfs-channel-7.capacity = 10000
flume1.channels.hdfs-channel-7.transactionCapacity = 1000



#checkpoint,smgsyslog, sepsyslog, pgp, bluecoat-syslog,bluecoat
# channel selector configuration
flume1.sources.kafka-source-1.selector.type = multiplexing
flume1.sources.kafka-source-1.selector.header = product
flume1.sources.kafka-source-1.selector.mapping.ckeckpoint = hdfs-channel-1
flume1.sources.kafka-source-1.selector.mapping.smgsyslog = hdfs-channel-2
flume1.sources.kafka-source-1.selector.mapping.sepsyslog = hdfs-channel-3
flume1.sources.kafka-source-1.selector.mapping.pgp = hdfs-channel-4
flume1.sources.kafka-source-1.selector.mapping.bluecoat-syslog = hdfs-channel-5
flume1.sources.kafka-source-1.selector.mapping.bluecoat = hdfs-channel-6
flume1.sources.kafka-source-1.selector.default = hdfs-channel-7

# sinks configuration
flume1.sinks.hdfs-sink-1.type = hdfs
flume1.sinks.hdfs-sink-1.hdfs.writeFormat = Text
flume1.sinks.hdfs-sink-1.hdfs.fileType = DataStream
flume1.sinks.hdfs-sink-1.hdfs.filePrefix = test-events
flume1.sinks.hdfs-sink-1.hdfs.useLocalTimeStamp = true
flume1.sinks.hdfs-sink-1.hdfs.path = /user/root/logs/checkpoint
flume1.sinks.hdfs-sink-1.hdfs.rollCount=1000
flume1.sinks.hdfs-sink-1.hdfs.rollSize=0

flume1.sinks.hdfs-sink-2.type = hdfs
flume1.sinks.hdfs-sink-2.hdfs.writeFormat = Text
flume1.sinks.hdfs-sink-2.hdfs.fileType = DataStream
flume1.sinks.hdfs-sink-2.hdfs.filePrefix = test-events
flume1.sinks.hdfs-sink-2.hdfs.useLocalTimeStamp = true
flume1.sinks.hdfs-sink-2.hdfs.path = /user/root/logs/smgsyslog
flume1.sinks.hdfs-sink-2.hdfs.rollCount=1000
flume1.sinks.hdfs-sink-2.hdfs.rollSize=0

flume1.sinks.hdfs-sink-3.type = hdfs
flume1.sinks.hdfs-sink-3.hdfs.writeFormat = Text
flume1.sinks.hdfs-sink-3.hdfs.fileType = DataStream
flume1.sinks.hdfs-sink-3.hdfs.filePrefix = test-events
flume1.sinks.hdfs-sink-3.hdfs.useLocalTimeStamp = true
flume1.sinks.hdfs-sink-3.hdfs.path = /user/root/logs/sepsyslog
flume1.sinks.hdfs-sink-3.hdfs.rollCount=1000
flume1.sinks.hdfs-sink-3.hdfs.rollSize=0

flume1.sinks.hdfs-sink-4.type = hdfs
flume1.sinks.hdfs-sink-4.hdfs.writeFormat = Text
flume1.sinks.hdfs-sink-4.hdfs.fileType = DataStream
flume1.sinks.hdfs-sink-4.hdfs.filePrefix = test-events
flume1.sinks.hdfs-sink-4.hdfs.useLocalTimeStamp = true
flume1.sinks.hdfs-sink-4.hdfs.path = /user/root/logs/pgp
flume1.sinks.hdfs-sink-4.hdfs.rollCount=1000
flume1.sinks.hdfs-sink-4.hdfs.rollSize=0

flume1.sinks.hdfs-sink-5.type = hdfs
flume1.sinks.hdfs-sink-5.hdfs.writeFormat = Text
flume1.sinks.hdfs-sink-5.hdfs.fileType = DataStream
flume1.sinks.hdfs-sink-5.hdfs.filePrefix = test-events
flume1.sinks.hdfs-sink-5.hdfs.useLocalTimeStamp = true
flume1.sinks.hdfs-sink-5.hdfs.path = /user/root/logs/bluecoatsyslog
flume1.sinks.hdfs-sink-5.hdfs.rollCount=1000
flume1.sinks.hdfs-sink-5.hdfs.rollSize=0

flume1.sinks.hdfs-sink-6.type = hdfs
flume1.sinks.hdfs-sink-6.hdfs.writeFormat = Text
flume1.sinks.hdfs-sink-6.hdfs.fileType = DataStream
flume1.sinks.hdfs-sink-6.hdfs.filePrefix = test-events
flume1.sinks.hdfs-sink-6.hdfs.useLocalTimeStamp = true
flume1.sinks.hdfs-sink-6.hdfs.path = /user/root/log/bluecoat
flume1.sinks.hdfs-sink-6.hdfs.rollCount=1000
flume1.sinks.hdfs-sink-6.hdfs.rollSize=0

flume1.sinks.hdfs-sink-7.type = hdfs
flume1.sinks.hdfs-sink-7.hdfs.writeFormat = Text
flume1.sinks.hdfs-sink-7.hdfs.fileType = DataStream
flume1.sinks.hdfs-sink-7.hdfs.filePrefix = test-events
flume1.sinks.hdfs-sink-7.hdfs.useLocalTimeStamp = true
flume1.sinks.hdfs-sink-7.hdfs.path = /user/root/log/otros
flume1.sinks.hdfs-sink-7.hdfs.rollCount=1000
flume1.sinks.hdfs-sink-7.hdfs.rollSize=0&lt;/PRE&gt;&lt;P&gt;and&amp;nbsp;the logs are like this:&lt;/P&gt;&lt;PRE&gt;{"@version":"1","@timestamp":"2015-12-09T20:09:36.000Z","client":"ccu","product":"smgsyslog","type":"syslog","host":"datos01","path":"/datos/logs/clientes/ccu/antispam/syslog/scanner02_smg_ccu_syslog.log","nombre":["mail2","mail2"],"proceso":["bmserver","bmserver"],"resto":["[Brightmail] (INFO:18607.3833596816): [36087] Spamhunter module: pcre_exec failed, match limit exceeded: regex \"&amp;lt;!--(?:(?:\\\\s*&amp;lt;\\\\/?[a-z]+[^&amp;gt;]+&amp;gt;)+\\\\s*[-\\\\w,;:\\\\. ]+){10}\".","[Brightmail] (INFO:18607.3833596816): [36087] Spamhunter module: pcre_exec failed, match limit exceeded: regex \"&amp;lt;!--(?:(?:\\\\s*&amp;lt;\\\\/?[a-z]+[^&amp;gt;]+&amp;gt;)+\\\\s*[-\\\\w,;:\\\\. ]+){10}\"."]}&lt;/PRE&gt;&lt;P&gt;So even if the product header match it goes to default selector (channel-7).&lt;/P&gt;&lt;P&gt;Please help me with this,&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;</description>
      <pubDate>Fri, 16 Sep 2022 09:52:11 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Archives-of-Support-Questions/Flafka-selector-doesn-t-work/m-p/34997#M11710</guid>
      <dc:creator>JoaquinS</dc:creator>
      <dc:date>2022-09-16T09:52:11Z</dc:date>
    </item>
    <item>
      <title>Re: Flafka selector doesn't work</title>
      <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/Flafka-selector-doesn-t-work/m-p/35001#M11711</link>
      <description>&lt;P&gt;Hello,&lt;/P&gt;&lt;P&gt;&amp;nbsp; Can you please add a logging channel and logger sink to your flume configuration? &amp;nbsp;This would show, in the solr-cmf logs, exactly what headers are set for the events coming from your kafka source.&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;You would need to add something like this (multiple channels on the default selector will be replicating):&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;PRE&gt;&lt;BR /&gt;flume1.sources.kafka-source-1.selector.default = hdfs-channel-7 logChannel&lt;BR /&gt;flume1.channels.logChannel.type = memory&amp;nbsp;&lt;BR /&gt;flume1.sinks.logSink.type = logger&lt;BR /&gt;flume1.sinks.logSink.channel = logChannel&lt;/PRE&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;</description>
      <pubDate>Wed, 09 Dec 2015 22:21:51 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Archives-of-Support-Questions/Flafka-selector-doesn-t-work/m-p/35001#M11711</guid>
      <dc:creator>pdvorak</dc:creator>
      <dc:date>2015-12-09T22:21:51Z</dc:date>
    </item>
    <item>
      <title>Re: Flafka selector doesn't work</title>
      <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/Flafka-selector-doesn-t-work/m-p/35036#M11712</link>
      <description>&lt;P&gt;This is the result:&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;PRE&gt;2015-12-10 09:38:59,065 INFO org.apache.solr.servlet.SolrDispatchFilter: [admin] webapp=null path=/admin/cores params={action=STATUS&amp;amp;wt=json} status=0 QTime=0&lt;/PRE&gt;&lt;P&gt;&amp;nbsp;The headers are status and Qtime? and if they are, how can I make that a field of a log is read as a header?.&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;</description>
      <pubDate>Thu, 10 Dec 2015 12:52:08 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Archives-of-Support-Questions/Flafka-selector-doesn-t-work/m-p/35036#M11712</guid>
      <dc:creator>JoaquinS</dc:creator>
      <dc:date>2015-12-10T12:52:08Z</dc:date>
    </item>
    <item>
      <title>Re: Flafka selector doesn't work</title>
      <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/Flafka-selector-doesn-t-work/m-p/35062#M11713</link>
      <description>&lt;P&gt;I added an interceptor that finds the field&amp;nbsp;&lt;EM&gt;product&lt;/EM&gt; in the log and creates a header with it. This is the code, and is not working. What could be wrong?&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;PRE&gt;#Interceptors setup
flume1.sources.kafka-source-1.interceptors = i1
flume1.sources.kafka-source-1.interceptors.i1.type = regex_extractor
flume1.sources.kafka-source-1.interceptors.i1.regex = "product":"(\\d+)"
flume1.sources.kafka-source-1.interceptors.i1.serializers = ser1
flume1.sources.kafka-source-1.interceptors.i1.serializers.ser1.type = default
flume1.sources.kafka-source-1.interceptors.i1.serializers.ser1.name = product&lt;/PRE&gt;&lt;P&gt;the field product in the log is like this&lt;/P&gt;&lt;PRE&gt;...,"product":"smgsyslog",...&lt;/PRE&gt;</description>
      <pubDate>Thu, 10 Dec 2015 19:44:36 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Archives-of-Support-Questions/Flafka-selector-doesn-t-work/m-p/35062#M11713</guid>
      <dc:creator>JoaquinS</dc:creator>
      <dc:date>2015-12-10T19:44:36Z</dc:date>
    </item>
    <item>
      <title>Re: Flafka selector doesn't work</title>
      <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/Flafka-selector-doesn-t-work/m-p/35073#M11714</link>
      <description>&lt;P&gt;If your events coming from kafka are in json format, you could put together a quick json interceptor, and that way all your fields in json would get populated as flume headers.&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;Here are some examples:&lt;/P&gt;&lt;P&gt;&lt;A href="http://mmolimar.blogspot.com/2015/01/analyzing-tweets-from-flume-in-kibana.html" target="_blank"&gt;http://mmolimar.blogspot.com/2015/01/analyzing-tweets-from-flume-in-kibana.html&lt;/A&gt;&lt;/P&gt;&lt;P&gt;&lt;A href="https://github.com/szaharici/Flume-Json-Interceptor" target="_blank"&gt;https://github.com/szaharici/Flume-Json-Interceptor&lt;/A&gt;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;If you do stick to just the regex interceptor, you are trying to use the (\\d+) to capture a string field, which will not match as the \d is for digits. &amp;nbsp;You'd need to do something like&lt;/P&gt;&lt;PRE&gt;flume1.sources.kafka-source-1.interceptors.i1.regex = "product":"(\\w+)"&lt;/PRE&gt;&lt;P&gt;Which will match any word characters: &lt;A href="http://www.w3schools.com/jsref/jsref_regexp_wordchar.asp" target="_blank"&gt;http://www.w3schools.com/jsref/jsref_regexp_wordchar.asp&lt;/A&gt;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;I would recommend creating a json interceptor though, as that will give you the most flexibility, and all your json fields will be populated in the headers&amp;nbsp;&lt;/P&gt;</description>
      <pubDate>Thu, 10 Dec 2015 23:11:25 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Archives-of-Support-Questions/Flafka-selector-doesn-t-work/m-p/35073#M11714</guid>
      <dc:creator>pdvorak</dc:creator>
      <dc:date>2015-12-10T23:11:25Z</dc:date>
    </item>
    <item>
      <title>Re: Flafka selector doesn't work</title>
      <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/Flafka-selector-doesn-t-work/m-p/35088#M11715</link>
      <description>&lt;P&gt;I changed the regex but still not working. the whole config file is this:&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;PRE&gt;# Sources, channels, and sinks are defined per&lt;BR /&gt;# agent name, in this case flume1.&lt;BR /&gt;flume1.sources = kafka-source-1&lt;BR /&gt;flume1.channels = hdfs-channel-1 hdfs-channel-2 hdfs-channel-3 hdfs-channel-4 hdfs-channel-5 hdfs-channel-6 hdfs-channel-7 logChannel &lt;BR /&gt;flume1.sinks = hdfs-sink-1 hdfs-sink-2 hdfs-sink-3 hdfs-sink-4 hdfs-sink-5 hdfs-sink-6 hdfs-sink-7 logSink&lt;BR /&gt; &lt;BR /&gt;# For each source, channel, and sink, set&lt;BR /&gt;# standard properties.&lt;BR /&gt;flume1.sources.kafka-source-1.type = org.apache.flume.source.kafka.KafkaSource&lt;BR /&gt;flume1.sources.kafka-source-1.zookeeperConnect = 192.168.70.23:2181&lt;BR /&gt;flume1.sources.kafka-source-1.topic = kafkatopic&lt;BR /&gt;flume1.sources.kafka-source-1.batchSize = 1000&lt;BR /&gt;flume1.sources.kafka-source-1.channels = hdfs-channel-1 hdfs-channel-2 hdfs-channel-3 hdfs-channel-4 hdfs-channel-5 hdfs-channel-6 hdfs-channel-7 logChannel&lt;BR /&gt;&lt;BR /&gt;flume1.sinks.hdfs-sink-1.channel = hdfs-channel-1&lt;BR /&gt;flume1.sinks.hdfs-sink-2.channel = hdfs-channel-2&lt;BR /&gt;flume1.sinks.hdfs-sink-3.channel = hdfs-channel-3&lt;BR /&gt;flume1.sinks.hdfs-sink-4.channel = hdfs-channel-4&lt;BR /&gt;flume1.sinks.hdfs-sink-5.channel = hdfs-channel-5&lt;BR /&gt;flume1.sinks.hdfs-sink-6.channel = hdfs-channel-6&lt;BR /&gt;flume1.sinks.hdfs-sink-7.channel = hdfs-channel-7&lt;BR /&gt;flume1.sinks.logSink.channel = logChannel&lt;BR /&gt; &lt;BR /&gt;flume1.channels.hdfs-channel-1.type = memory&lt;BR /&gt;flume1.channels.hdfs-channel-2.type = memory&lt;BR /&gt;flume1.channels.hdfs-channel-3.type = memory&lt;BR /&gt;flume1.channels.hdfs-channel-4.type = memory&lt;BR /&gt;flume1.channels.hdfs-channel-5.type = memory&lt;BR /&gt;flume1.channels.hdfs-channel-6.type = memory&lt;BR 
/&gt;flume1.channels.hdfs-channel-7.type = memory&lt;BR /&gt;flume1.channels.logChannel.type = memory&lt;BR /&gt;&lt;BR /&gt;flume1.channels.hdfs-channel-1.capacity = 10000&lt;BR /&gt;flume1.channels.hdfs-channel-1.transactionCapacity = 1000&lt;BR /&gt;flume1.channels.hdfs-channel-2.capacity = 10000&lt;BR /&gt;flume1.channels.hdfs-channel-2.transactionCapacity = 1000&lt;BR /&gt;flume1.channels.hdfs-channel-3.capacity = 10000&lt;BR /&gt;flume1.channels.hdfs-channel-3.transactionCapacity = 1000&lt;BR /&gt;flume1.channels.hdfs-channel-4.capacity = 10000&lt;BR /&gt;flume1.channels.hdfs-channel-4.transactionCapacity = 1000&lt;BR /&gt;flume1.channels.hdfs-channel-5.capacity = 10000&lt;BR /&gt;flume1.channels.hdfs-channel-5.transactionCapacity = 1000&lt;BR /&gt;flume1.channels.hdfs-channel-6.capacity = 10000&lt;BR /&gt;flume1.channels.hdfs-channel-6.transactionCapacity = 1000&lt;BR /&gt;flume1.channels.hdfs-channel-7.capacity = 10000&lt;BR /&gt;flume1.channels.hdfs-channel-7.transactionCapacity = 1000&lt;BR /&gt;flume1.channels.logChannel.capacity = 10000&lt;BR /&gt;flume1.channels.logChannel.transactionCapacity = 1000&lt;BR /&gt;&lt;BR /&gt;#Interceptors setup&lt;BR /&gt;flume1.sources.kafka-source-1.interceptors = i1&lt;BR /&gt;flume1.sources.kafka-source-1.interceptors.i1.type = regex_extractor&lt;BR /&gt;flume1.sources.kafka-source-1.interceptors.i1.regex = "product":"(\\w+)"&lt;BR /&gt;flume1.sources.kafka-source-1.interceptors.i1.serializers = ser1&lt;BR /&gt;flume1.sources.kafka-source-1.interceptors.i1.serializers.ser1.type = default&lt;BR /&gt;flume1.sources.kafka-source-1.interceptors.i1.serializers.ser1.name = product&lt;BR /&gt;&lt;BR /&gt;&lt;BR /&gt;#checkpoint,smgsyslog, sepsyslog, pgp, bluecoat-syslog,bluecoat&lt;BR /&gt;# channel selector configuration&lt;BR /&gt;flume1.sources.kafka-source-1.selector.type = multiplexing&lt;BR /&gt;flume1.sources.kafka-source-1.selector.header = product&lt;BR /&gt;flume1.sources.kafka-source-1.selector.mapping.ckeckpoint 
= hdfs-channel-1&lt;BR /&gt;flume1.sources.kafka-source-1.selector.mapping.smgsyslog = hdfs-channel-2&lt;BR /&gt;flume1.sources.kafka-source-1.selector.mapping.sepsyslog = hdfs-channel-3&lt;BR /&gt;flume1.sources.kafka-source-1.selector.mapping.pgp = hdfs-channel-4&lt;BR /&gt;flume1.sources.kafka-source-1.selector.mapping.bluecoat-syslog = hdfs-channel-5&lt;BR /&gt;flume1.sources.kafka-source-1.selector.mapping.bluecoat = hdfs-channel-6&lt;BR /&gt;flume1.sources.kafka-source-1.selector.default = hdfs-channel-7 logChannel&lt;BR /&gt;&lt;BR /&gt;# sinks configuration&lt;BR /&gt;flume1.sinks.hdfs-sink-1.type = hdfs&lt;BR /&gt;flume1.sinks.hdfs-sink-1.hdfs.writeFormat = Text&lt;BR /&gt;flume1.sinks.hdfs-sink-1.hdfs.fileType = DataStream&lt;BR /&gt;flume1.sinks.hdfs-sink-1.hdfs.filePrefix = test-events&lt;BR /&gt;flume1.sinks.hdfs-sink-1.hdfs.useLocalTimeStamp = true&lt;BR /&gt;flume1.sinks.hdfs-sink-1.hdfs.path = /user/root/logs/checkpoint&lt;BR /&gt;flume1.sinks.hdfs-sink-1.hdfs.rollCount=1000&lt;BR /&gt;flume1.sinks.hdfs-sink-1.hdfs.rollSize=0&lt;BR /&gt;&lt;BR /&gt;flume1.sinks.hdfs-sink-2.type = hdfs&lt;BR /&gt;flume1.sinks.hdfs-sink-2.hdfs.writeFormat = Text&lt;BR /&gt;flume1.sinks.hdfs-sink-2.hdfs.fileType = DataStream&lt;BR /&gt;flume1.sinks.hdfs-sink-2.hdfs.filePrefix = test-events&lt;BR /&gt;flume1.sinks.hdfs-sink-2.hdfs.useLocalTimeStamp = true&lt;BR /&gt;flume1.sinks.hdfs-sink-2.hdfs.path = /user/root/logs/smgsyslog&lt;BR /&gt;flume1.sinks.hdfs-sink-2.hdfs.rollCount=1000&lt;BR /&gt;flume1.sinks.hdfs-sink-2.hdfs.rollSize=0&lt;BR /&gt;&lt;BR /&gt;flume1.sinks.hdfs-sink-3.type = hdfs&lt;BR /&gt;flume1.sinks.hdfs-sink-3.hdfs.writeFormat = Text&lt;BR /&gt;flume1.sinks.hdfs-sink-3.hdfs.fileType = DataStream&lt;BR /&gt;flume1.sinks.hdfs-sink-3.hdfs.filePrefix = test-events&lt;BR /&gt;flume1.sinks.hdfs-sink-3.hdfs.useLocalTimeStamp = true&lt;BR /&gt;flume1.sinks.hdfs-sink-3.hdfs.path = /user/root/logs/sepsyslog&lt;BR 
/&gt;flume1.sinks.hdfs-sink-3.hdfs.rollCount=1000&lt;BR /&gt;flume1.sinks.hdfs-sink-3.hdfs.rollSize=0&lt;BR /&gt;&lt;BR /&gt;flume1.sinks.hdfs-sink-4.type = hdfs&lt;BR /&gt;flume1.sinks.hdfs-sink-4.hdfs.writeFormat = Text&lt;BR /&gt;flume1.sinks.hdfs-sink-4.hdfs.fileType = DataStream&lt;BR /&gt;flume1.sinks.hdfs-sink-4.hdfs.filePrefix = test-events&lt;BR /&gt;flume1.sinks.hdfs-sink-4.hdfs.useLocalTimeStamp = true&lt;BR /&gt;flume1.sinks.hdfs-sink-4.hdfs.path = /user/root/logs/pgp&lt;BR /&gt;flume1.sinks.hdfs-sink-4.hdfs.rollCount=1000&lt;BR /&gt;flume1.sinks.hdfs-sink-4.hdfs.rollSize=0&lt;BR /&gt;&lt;BR /&gt;flume1.sinks.hdfs-sink-5.type = hdfs&lt;BR /&gt;flume1.sinks.hdfs-sink-5.hdfs.writeFormat = Text&lt;BR /&gt;flume1.sinks.hdfs-sink-5.hdfs.fileType = DataStream&lt;BR /&gt;flume1.sinks.hdfs-sink-5.hdfs.filePrefix = test-events&lt;BR /&gt;flume1.sinks.hdfs-sink-5.hdfs.useLocalTimeStamp = true&lt;BR /&gt;flume1.sinks.hdfs-sink-5.hdfs.path = /user/root/logs/bluecoatsyslog&lt;BR /&gt;flume1.sinks.hdfs-sink-5.hdfs.rollCount=1000&lt;BR /&gt;flume1.sinks.hdfs-sink-5.hdfs.rollSize=0&lt;BR /&gt;&lt;BR /&gt;flume1.sinks.hdfs-sink-6.type = hdfs&lt;BR /&gt;flume1.sinks.hdfs-sink-6.hdfs.writeFormat = Text&lt;BR /&gt;flume1.sinks.hdfs-sink-6.hdfs.fileType = DataStream&lt;BR /&gt;flume1.sinks.hdfs-sink-6.hdfs.filePrefix = test-events&lt;BR /&gt;flume1.sinks.hdfs-sink-6.hdfs.useLocalTimeStamp = true&lt;BR /&gt;flume1.sinks.hdfs-sink-6.hdfs.path = /user/root/logs/bluecoat&lt;BR /&gt;flume1.sinks.hdfs-sink-6.hdfs.rollCount=1000&lt;BR /&gt;flume1.sinks.hdfs-sink-6.hdfs.rollSize=0&lt;BR /&gt;&lt;BR /&gt;flume1.sinks.hdfs-sink-7.type = hdfs&lt;BR /&gt;flume1.sinks.hdfs-sink-7.hdfs.writeFormat = Text&lt;BR /&gt;flume1.sinks.hdfs-sink-7.hdfs.fileType = DataStream&lt;BR /&gt;flume1.sinks.hdfs-sink-7.hdfs.filePrefix = test-events&lt;BR /&gt;flume1.sinks.hdfs-sink-7.hdfs.useLocalTimeStamp = true&lt;BR /&gt;flume1.sinks.hdfs-sink-7.hdfs.path = /user/root/logs/otros&lt;BR 
/&gt;flume1.sinks.hdfs-sink-7.hdfs.rollCount=1000&lt;BR /&gt;flume1.sinks.hdfs-sink-7.hdfs.rollSize=0&lt;BR /&gt;&lt;BR /&gt;flume1.sinks.logSink.type = logger&lt;BR /&gt; &lt;BR /&gt;# Other properties are specific to each type of&lt;BR /&gt;# source, channel, or sink. In this case, we&lt;BR /&gt;# specify the capacity of the memory channel.&lt;/PRE&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;I think that is somthing wrong with the channels but i dont know what is the problem. The logger output without the interceptor part has two headers,&amp;nbsp;&lt;STRONG&gt;timestamp&amp;nbsp;&lt;/STRONG&gt;and&amp;nbsp;&lt;STRONG&gt;topic&amp;nbsp;&lt;/STRONG&gt;.&amp;nbsp;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;</description>
      <pubDate>Fri, 11 Dec 2015 13:27:57 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Archives-of-Support-Questions/Flafka-selector-doesn-t-work/m-p/35088#M11715</guid>
      <dc:creator>JoaquinS</dc:creator>
      <dc:date>2015-12-11T13:27:57Z</dc:date>
    </item>
    <item>
      <title>Re: Flafka selector doesn't work</title>
      <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/Flafka-selector-doesn-t-work/m-p/35104#M11716</link>
      <description>&lt;P&gt;Problem solved.&amp;nbsp;&lt;/P&gt;&lt;P&gt;Instead of using:&amp;nbsp;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;PRE&gt;flume1.sources.kafka-source-1.interceptors.i1.serializers.ser1.type = default&lt;/PRE&gt;&lt;P&gt;changed it for:&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;PRE&gt;flume1.sources.kafka-source-1.interceptors.i1.serializers.ser1.type = org.apache.flume.interceptor.RegexExtractorInterceptorPassThroughSerializer&lt;/PRE&gt;&lt;P&gt;And it worked fine.&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;I have two more questions:&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;1) the - (hyphen) cannot be read as part of a header, so if the value of the header&amp;nbsp;has - it goes to the default and not to the corresponding mapper.&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;2) I wanna add a second regex but how can I map two headers together, for example:&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;flume1.sources.kafka-source-1.selector.header = header1 header2&lt;/P&gt;&lt;P&gt;flume1.sources.kafka-source-1.selector.mapping.(value1)&amp;amp;(value2)&amp;nbsp;= hdfs-channel-x&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;Is it possible by doing it without programming it? Because I'm not a programmer.&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;Regards,&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;</description>
      <pubDate>Fri, 11 Dec 2015 18:47:02 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Archives-of-Support-Questions/Flafka-selector-doesn-t-work/m-p/35104#M11716</guid>
      <dc:creator>JoaquinS</dc:creator>
      <dc:date>2015-12-11T18:47:02Z</dc:date>
    </item>
    <item>
      <title>Re: Flafka selector doesn't work</title>
      <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/Flafka-selector-doesn-t-work/m-p/35105#M11717</link>
      <description>&lt;P&gt;Glad to hear you got it working. &amp;nbsp;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;You can't concatenate headers together with the flume config. However, the morphline interceptor will allow you more complex functionality for manipulating headers: &amp;nbsp;&lt;A href="http://flume.apache.org/FlumeUserGuide.html#morphline-interceptor" target="_blank"&gt;http://flume.apache.org/FlumeUserGuide.html#morphline-interceptor&lt;/A&gt;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;This will allow you to arbitrarily update/delete/modify headers as well as event body, prior to being passed to the channel selector. &amp;nbsp;You can write a morphline that will examine the body and set any headers that you wish.&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;Here is the morphlines command reference guide to help you get started:&amp;nbsp;&lt;A href="http://kitesdk.org/docs/current/morphlines/morphlines-reference-guide.html" target="_blank"&gt;http://kitesdk.org/docs/current/morphlines/morphlines-reference-guide.html&lt;/A&gt;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;HTH!&lt;/P&gt;</description>
      <pubDate>Fri, 11 Dec 2015 19:16:39 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Archives-of-Support-Questions/Flafka-selector-doesn-t-work/m-p/35105#M11717</guid>
      <dc:creator>pdvorak</dc:creator>
      <dc:date>2015-12-11T19:16:39Z</dc:date>
    </item>
    <item>
      <title>Re: Flafka selector doesn't work</title>
      <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/Flafka-selector-doesn-t-work/m-p/35221#M11718</link>
      <description>&lt;P&gt;Hello, I &amp;nbsp;created a java file with the custom interceptor, but I don't know how to compile it or transform it to a jar file properly. I tested the &lt;STRONG&gt;javac&amp;nbsp;&lt;/STRONG&gt;and the&amp;nbsp;&lt;STRONG&gt;jar&lt;/STRONG&gt; command, but&amp;nbsp;the interceptor builder is not found.&amp;nbsp;&lt;/P&gt;</description>
      <pubDate>Mon, 14 Dec 2015 21:10:57 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Archives-of-Support-Questions/Flafka-selector-doesn-t-work/m-p/35221#M11718</guid>
      <dc:creator>JoaquinS</dc:creator>
      <dc:date>2015-12-14T21:10:57Z</dc:date>
    </item>
    <item>
      <title>Re: Flafka selector doesn't work</title>
      <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/Flafka-selector-doesn-t-work/m-p/35226#M11719</link>
      <description>&lt;P&gt;Did you try this one:&amp;nbsp;&lt;A href="https://github.com/szaharici/Flume-Json-Interceptor" target="_blank"&gt;https://github.com/szaharici/Flume-Json-Interceptor&lt;/A&gt;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;Once you have that compiled, you'll want to put that into the flume /var/lib/flume-ng/plugins.d directory in the proper subdirectory (and with proper permissions for flume to read), following the convention here:&amp;nbsp;&lt;A href="http://flume.apache.org/FlumeUserGuide.html#the-plugins-d-directory" target="_blank"&gt;http://flume.apache.org/FlumeUserGuide.html#the-plugins-d-directory&lt;/A&gt;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;</description>
      <pubDate>Mon, 14 Dec 2015 22:39:51 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Archives-of-Support-Questions/Flafka-selector-doesn-t-work/m-p/35226#M11719</guid>
      <dc:creator>pdvorak</dc:creator>
      <dc:date>2015-12-14T22:39:51Z</dc:date>
    </item>
    <item>
      <title>Re: Flafka selector doesn't work</title>
      <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/Flafka-selector-doesn-t-work/m-p/35228#M11720</link>
      <description>&lt;P&gt;Yes I tried that. All the fields are set as headers but the message is transformed by:&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;PRE&gt;event.setBody("Message modified by Jsoninterceptor".getBytes());&lt;/PRE&gt;&lt;P&gt;And it becomes useless because I need the log as the original.&lt;/P&gt;&lt;P&gt;I tried to change the &lt;STRONG&gt;JsonIntersepter.java&lt;/STRONG&gt; file in the &lt;STRONG&gt;.jar&amp;nbsp;&lt;/STRONG&gt;using &lt;STRONG&gt;vim&lt;/STRONG&gt; but it can't be done, I think that is because the &lt;STRONG&gt;.class&lt;/STRONG&gt; file. Also tried to create a java morphline but I can't get it to compile correctly.&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;PRE&gt;morphlines : [
    java {
        imports : """
            import java.util.List;
            import java.util.Map;
            import org.apache.flume.Context;
            import org.apache.flume.Event;
            import org.apache.flume.interceptor.Interceptor;
            import org.apache.log4j.Logger;
        """
    code: """ 
         Map&amp;lt;String, String&amp;gt; headers = event.getHeaders();
     
         // example: add / remove headers
         if (headers.containsKey("product")) {
        headers.put("product", headers.get("product"));
         }
         if (headers.containKey("client")){
        headers.put("client", headers.get("client"));  
         }
     
         return event;
      
      """
    }
]&lt;/PRE&gt;&lt;P&gt;Regards,&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;</description>
      <pubDate>Mon, 14 Dec 2015 23:18:03 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Archives-of-Support-Questions/Flafka-selector-doesn-t-work/m-p/35228#M11720</guid>
      <dc:creator>JoaquinS</dc:creator>
      <dc:date>2015-12-14T23:18:03Z</dc:date>
    </item>
    <item>
      <title>Re: Flafka selector doesn't work</title>
      <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/Flafka-selector-doesn-t-work/m-p/35325#M11721</link>
      <description>&lt;P&gt;If you wanted to use the morphline interceptor, you could simply use a grok statement to extract the information you need and set it as a new field that becomes a header:&amp;nbsp;&lt;A href="http://kitesdk.org/docs/current/morphlines/morphlines-reference-guide.html#grok" target="_blank"&gt;http://kitesdk.org/docs/current/morphlines/morphlines-reference-guide.html#grok&lt;/A&gt;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;Here is a grok debugger that you can use to make sure your grok pattern will match your input string:&amp;nbsp;&lt;A href="http://grokdebug.herokuapp.com/" target="_blank"&gt;http://grokdebug.herokuapp.com/&lt;/A&gt;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;</description>
      <pubDate>Wed, 16 Dec 2015 14:48:05 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Archives-of-Support-Questions/Flafka-selector-doesn-t-work/m-p/35325#M11721</guid>
      <dc:creator>pdvorak</dc:creator>
      <dc:date>2015-12-16T14:48:05Z</dc:date>
    </item>
    <item>
      <title>Re: Flafka selector doesn't work</title>
      <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/Flafka-selector-doesn-t-work/m-p/35326#M11722</link>
      <description>&lt;P&gt;I solved the problem. I had to create a java custom interceptor (based on the one you&amp;nbsp;sent me), compile it with maven and paste it in the flume-ng dir.&amp;nbsp;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;Thanks pdvorak for all the help &lt;span class="lia-unicode-emoji" title=":slightly_smiling_face:"&gt;🙂&lt;/span&gt;&lt;/P&gt;</description>
      <pubDate>Wed, 16 Dec 2015 15:11:52 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Archives-of-Support-Questions/Flafka-selector-doesn-t-work/m-p/35326#M11722</guid>
      <dc:creator>JoaquinS</dc:creator>
      <dc:date>2015-12-16T15:11:52Z</dc:date>
    </item>
  </channel>
</rss>

