Support Questions

Find answers, ask questions, and share your expertise

Flafka: assign key message to kafka sink from taildir source header

avatar
New Contributor

Hello, I'm  developing a use case using flume to save data in kafka topic.

I'm traing to tail  files from a local data source  directory and  save them in the same  partition of a kafka topic. I want to use as a key of the message the absolute path of the file.

I have configured a TailDir source that save the absolute path of a file but I can't configure the kafka sink in order to assign the path as the key of the message in order to save all the events from the same file in the same kafka topic partition.

In more details, when I consume the message with the kafka-console-consumer, I can see that the key is 'null' and the absolute path is in the body message.

In the flume documentation I didn't find much information on how to deal with this issue, could someone halps me? tnks 🙂

source
agent3a.sources.source3a.type = TAILDIR
agent3a.sources.source3a.filegroups= f1
agent3a.sources.source3a.filegroups.f1 = /path/local/data/source/
agent3a.sources.source3a.channels= channel3a
agent3a.sources.source3a.batchSize=1
agent3a.sources.source3a.fileHeader= True

Memory Channel
agent3a.channels.channel3a.type = memory
agent3a.channels.channel3a.transactionCapacity = 100
agent3a.channels.channel3a.capacity = 100

Sink
agent3a.sinks.sink3a.type= org.apache.flume.sink.kafka.KafkaSink 
agent3a.sinks.sink3a.kafka.bootstrap.servers =########hostname1#####:port, ########hostname2#####:port
agent3a.sinks.sink3a.kafka.topic= topic_test
agent3a.sinks.sink3a.kafka.producer.acks=1
agent3a.sinks.sink3a.channel= channel3a
agent3a.sinks.sink3a.FlumeBatchSize=1
agent3a.sinks.sink3a.useFlumeEventFormat = true
agent3a.sinks.sink3a.kafka.producer.batch.size=10 

 

1 ACCEPTED SOLUTION

avatar
New Contributor

Finally I found how to configure the agent in order to assign the key for partitionin in kafka topic using the absolute path of the files in the data source.

I post the answer in case someone will need it.

It is necessary to set the property 'fileHeaderKey=key". In this way, when the event is passed to kafka sink the header contains the pairs key =absolute/path/of /the/file and kafka can use it as the key in its message.

agent3a.sources= source3a 
agent3a.channels= channel3a
agent3a.sinkss= sink3a

source
agent3a.sources.source3a.type = TAILDIR
agent3a.sources.source3a.filegroups= f1
agent3a.sources.source3a.filegroups.f1 = /path/local/data/source/
agent3a.sources.source3a.channels= channel3a
agent3a.sources.source3a.batchSize=1
agent3a.sources.source3a.fileHeader= True
agent3a.sources.source3a.fileHeaderKey= key #####property to set the fileHeader the 
                                                  key for partitioning###

Memory Channel
agent3a.channels.channel3a.type = memory
agent3a.channels.channel3a.transactionCapacity = 100
agent3a.channels.channel3a.capacity = 100

Sink
agent3a.sinks.sink3a.type= org.apache.flume.sink.kafka.KafkaSink 
agent3a.sinks.sink3a.kafka.bootstrap.servers =########hostname1#####:port, ########hostname2#####:port
agent3a.sinks.sink3a.kafka.topic= topic_test
agent3a.sinks.sink3a.kafka.producer.acks=1
agent3a.sinks.sink3a.channel= channel3a
agent3a.sinks.sink3a.FlumeBatchSize=1
agent3a.sinks.sink3a.useFlumeEventFormat = true
agent3a.sinks.sink3a.kafka.producer.batch.size=10

 It is, alternatively possibile to set the key for partitioning also using interceptors: it is only necessary to set its property '.key=key'

bye 🙂

View solution in original post

1 REPLY 1

avatar
New Contributor

Finally I found how to configure the agent in order to assign the key for partitionin in kafka topic using the absolute path of the files in the data source.

I post the answer in case someone will need it.

It is necessary to set the property 'fileHeaderKey=key". In this way, when the event is passed to kafka sink the header contains the pairs key =absolute/path/of /the/file and kafka can use it as the key in its message.

agent3a.sources= source3a 
agent3a.channels= channel3a
agent3a.sinkss= sink3a

source
agent3a.sources.source3a.type = TAILDIR
agent3a.sources.source3a.filegroups= f1
agent3a.sources.source3a.filegroups.f1 = /path/local/data/source/
agent3a.sources.source3a.channels= channel3a
agent3a.sources.source3a.batchSize=1
agent3a.sources.source3a.fileHeader= True
agent3a.sources.source3a.fileHeaderKey= key #####property to set the fileHeader the 
                                                  key for partitioning###

Memory Channel
agent3a.channels.channel3a.type = memory
agent3a.channels.channel3a.transactionCapacity = 100
agent3a.channels.channel3a.capacity = 100

Sink
agent3a.sinks.sink3a.type= org.apache.flume.sink.kafka.KafkaSink 
agent3a.sinks.sink3a.kafka.bootstrap.servers =########hostname1#####:port, ########hostname2#####:port
agent3a.sinks.sink3a.kafka.topic= topic_test
agent3a.sinks.sink3a.kafka.producer.acks=1
agent3a.sinks.sink3a.channel= channel3a
agent3a.sinks.sink3a.FlumeBatchSize=1
agent3a.sinks.sink3a.useFlumeEventFormat = true
agent3a.sinks.sink3a.kafka.producer.batch.size=10

 It is, alternatively possibile to set the key for partitioning also using interceptors: it is only necessary to set its property '.key=key'

bye 🙂