Created on 11-20-2019 06:35 AM - last edited on 11-20-2019 05:40 PM by ask_bill_brooks
Hello, I'm developing a use case that uses Flume to save data to a Kafka topic.
I'm trying to tail files from a local data source directory and save all the events from a given file in the same partition of a Kafka topic. I want to use the absolute path of the file as the message key.
I have configured a TAILDIR source that saves the absolute path of the file in the event header, but I can't configure the Kafka sink to use that path as the message key, so that all the events from the same file end up in the same topic partition.
In more detail: when I consume the messages with kafka-console-consumer, I can see that the key is 'null' and the absolute path is in the message body.
I didn't find much information in the Flume documentation on how to deal with this issue. Could someone help me? Thanks 🙂
# Source
agent3a.sources.source3a.type = TAILDIR
agent3a.sources.source3a.filegroups = f1
agent3a.sources.source3a.filegroups.f1 = /path/local/data/source/
agent3a.sources.source3a.channels = channel3a
agent3a.sources.source3a.batchSize = 1
agent3a.sources.source3a.fileHeader = True

# Memory Channel
agent3a.channels.channel3a.type = memory
agent3a.channels.channel3a.transactionCapacity = 100
agent3a.channels.channel3a.capacity = 100

# Sink
agent3a.sinks.sink3a.type = org.apache.flume.sink.kafka.KafkaSink
agent3a.sinks.sink3a.kafka.bootstrap.servers = ########hostname1#####:port, ########hostname2#####:port
agent3a.sinks.sink3a.kafka.topic = topic_test
agent3a.sinks.sink3a.kafka.producer.acks = 1
agent3a.sinks.sink3a.channel = channel3a
agent3a.sinks.sink3a.FlumeBatchSize = 1
agent3a.sinks.sink3a.useFlumeEventFormat = true
agent3a.sinks.sink3a.kafka.producer.batch.size = 10
Created 11-21-2019 07:37 AM
I finally found out how to configure the agent so that the absolute path of each file in the data source is used as the key for partitioning in the Kafka topic.
I'm posting the answer in case someone else needs it.
It is necessary to set the source property 'fileHeaderKey=key'. This way, when the event is passed to the Kafka sink, its header contains the pair key=absolute/path/of/the/file, and the sink uses the value of the 'key' header as the key of the Kafka message. Since messages with the same key go to the same partition, all the events from one file end up in the same partition.
agent3a.sources = source3a
agent3a.channels = channel3a
agent3a.sinks = sink3a

# Source
agent3a.sources.source3a.type = TAILDIR
agent3a.sources.source3a.filegroups = f1
agent3a.sources.source3a.filegroups.f1 = /path/local/data/source/
agent3a.sources.source3a.channels = channel3a
agent3a.sources.source3a.batchSize = 1
agent3a.sources.source3a.fileHeader = True
# Store the file path under the header name 'key', which the Kafka sink uses for partitioning
agent3a.sources.source3a.fileHeaderKey = key

# Memory Channel
agent3a.channels.channel3a.type = memory
agent3a.channels.channel3a.transactionCapacity = 100
agent3a.channels.channel3a.capacity = 100

# Sink
agent3a.sinks.sink3a.type = org.apache.flume.sink.kafka.KafkaSink
agent3a.sinks.sink3a.kafka.bootstrap.servers = ########hostname1#####:port, ########hostname2#####:port
agent3a.sinks.sink3a.kafka.topic = topic_test
agent3a.sinks.sink3a.kafka.producer.acks = 1
agent3a.sinks.sink3a.channel = channel3a
agent3a.sinks.sink3a.flumeBatchSize = 1
agent3a.sinks.sink3a.useFlumeEventFormat = true
agent3a.sinks.sink3a.kafka.producer.batch.size = 10
Alternatively, it is also possible to set the key for partitioning with an interceptor: it is only necessary to set the interceptor's '.key' property to 'key', so that the interceptor writes its value into the 'key' header.
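As a rough sketch of the interceptor variant, assuming Flume's built-in static interceptor is acceptable for the use case: the interceptor name 'i1' and the value 'my_partition_key' below are made up for illustration and are not from my real configuration.

```properties
# Hypothetical interceptor named 'i1' attached to the same TAILDIR source
agent3a.sources.source3a.interceptors = i1
agent3a.sources.source3a.interceptors.i1.type = static
# Name of the header to write; the Kafka sink reads the header called 'key'
agent3a.sources.source3a.interceptors.i1.key = key
# Placeholder value: every event from this source gets this same key
agent3a.sources.source3a.interceptors.i1.value = my_partition_key
```

Note that a static interceptor gives every event the same key, so everything from this source would go to a single partition. To get one key per file, the 'fileHeaderKey' approach above is probably the better fit.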
bye 🙂