Member since
11-20-2019
2
Posts
0
Kudos Received
1
Solution
My Accepted Solutions
Title | Views | Posted |
---|---|---|
1041 | 11-21-2019 07:37 AM |
11-21-2019
07:37 AM
Finally I found how to configure the agent in order to assign the key for partitionin in kafka topic using the absolute path of the files in the data source. I post the answer in case someone will need it. It is necessary to set the property 'fileHeaderKey=key". In this way, when the event is passed to kafka sink the header contains the pairs key =absolute/path/of /the/file and kafka can use it as the key in its message. agent3a.sources= source3a
agent3a.channels= channel3a
agent3a.sinkss= sink3a
source
agent3a.sources.source3a.type = TAILDIR
agent3a.sources.source3a.filegroups= f1
agent3a.sources.source3a.filegroups.f1 = /path/local/data/source/
agent3a.sources.source3a.channels= channel3a
agent3a.sources.source3a.batchSize=1
agent3a.sources.source3a.fileHeader= True
agent3a.sources.source3a.fileHeaderKey= key #####property to set the fileHeader the
key for partitioning###
Memory Channel
agent3a.channels.channel3a.type = memory
agent3a.channels.channel3a.transactionCapacity = 100
agent3a.channels.channel3a.capacity = 100
Sink
agent3a.sinks.sink3a.type= org.apache.flume.sink.kafka.KafkaSink
agent3a.sinks.sink3a.kafka.bootstrap.servers =########hostname1#####:port, ########hostname2#####:port
agent3a.sinks.sink3a.kafka.topic= topic_test
agent3a.sinks.sink3a.kafka.producer.acks=1
agent3a.sinks.sink3a.channel= channel3a
agent3a.sinks.sink3a.FlumeBatchSize=1
agent3a.sinks.sink3a.useFlumeEventFormat = true
agent3a.sinks.sink3a.kafka.producer.batch.size=10 It is, alternatively possibile to set the key for partitioning also using interceptors: it is only necessary to set its property '.key=key' bye 🙂
... View more
11-20-2019
06:35 AM
Hello, I'm developing a use case using flume to save data in kafka topic.
I'm traing to tail files from a local data source directory and save them in the same partition of a kafka topic. I want to use as a key of the message the absolute path of the file.
I have configured a TailDir source that save the absolute path of a file but I can't configure the kafka sink in order to assign the path as the key of the message in order to save all the events from the same file in the same kafka topic partition.
In more details, when I consume the message with the kafka-console-consumer, I can see that the key is 'null' and the absolute path is in the body message.
In the flume documentation I didn't find much information on how to deal with this issue, could someone halps me? tnks 🙂
source
agent3a.sources.source3a.type = TAILDIR
agent3a.sources.source3a.filegroups= f1
agent3a.sources.source3a.filegroups.f1 = /path/local/data/source/
agent3a.sources.source3a.channels= channel3a
agent3a.sources.source3a.batchSize=1
agent3a.sources.source3a.fileHeader= True
Memory Channel
agent3a.channels.channel3a.type = memory
agent3a.channels.channel3a.transactionCapacity = 100
agent3a.channels.channel3a.capacity = 100
Sink
agent3a.sinks.sink3a.type= org.apache.flume.sink.kafka.KafkaSink
agent3a.sinks.sink3a.kafka.bootstrap.servers =########hostname1#####:port, ########hostname2#####:port
agent3a.sinks.sink3a.kafka.topic= topic_test
agent3a.sinks.sink3a.kafka.producer.acks=1
agent3a.sinks.sink3a.channel= channel3a
agent3a.sinks.sink3a.FlumeBatchSize=1
agent3a.sinks.sink3a.useFlumeEventFormat = true
agent3a.sinks.sink3a.kafka.producer.batch.size=10
... View more
Labels:
- Labels:
-
Apache Flume
-
Apache Kafka