11-21-2019 07:37 AM
I finally found how to configure the agent so that the absolute path of each file in the data source is used as the partitioning key in the Kafka topic. I am posting the answer in case someone else needs it.

It is necessary to set the property 'fileHeaderKey=key' on the source (together with 'fileHeader=true'). This way, when the event is passed to the Kafka sink, its header contains the pair key=/absolute/path/of/the/file, and Kafka can use it as the key of its message.

agent3a.sources= source3a
agent3a.channels= channel3a
agent3a.sinks= sink3a
source
agent3a.sources.source3a.type = TAILDIR
agent3a.sources.source3a.filegroups= f1
agent3a.sources.source3a.filegroups.f1 = /path/local/data/source/
agent3a.sources.source3a.channels= channel3a
agent3a.sources.source3a.batchSize=1
agent3a.sources.source3a.fileHeader= True
# fileHeaderKey names the header that stores the file path; 'key' is the header used as the key for partitioning
agent3a.sources.source3a.fileHeaderKey= key
Memory Channel
agent3a.channels.channel3a.type = memory
agent3a.channels.channel3a.transactionCapacity = 100
agent3a.channels.channel3a.capacity = 100
Sink
agent3a.sinks.sink3a.type= org.apache.flume.sink.kafka.KafkaSink
agent3a.sinks.sink3a.kafka.bootstrap.servers =########hostname1#####:port, ########hostname2#####:port
agent3a.sinks.sink3a.kafka.topic= topic_test
agent3a.sinks.sink3a.kafka.producer.acks=1
agent3a.sinks.sink3a.channel= channel3a
agent3a.sinks.sink3a.flumeBatchSize=1
agent3a.sinks.sink3a.useFlumeEventFormat = true
agent3a.sinks.sink3a.kafka.producer.batch.size=10

Alternatively, the partitioning key can also be set with an interceptor: it is only necessary to set its property '.key=key'. Bye 🙂
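For the interceptor alternative, here is a minimal sketch, assuming Flume's built-in static interceptor is the one meant. The interceptor name i1 and the value my_partition_key are hypothetical placeholders, not taken from my working setup.

```properties
# Attach one interceptor (hypothetical name: i1) to the source
agent3a.sources.source3a.interceptors = i1
agent3a.sources.source3a.interceptors.i1.type = static
# Header name; the Kafka sink reads the header called 'key' as the message key
agent3a.sources.source3a.interceptors.i1.key = key
# Header value (hypothetical placeholder); it is the same for every event
agent3a.sources.source3a.interceptors.i1.value = my_partition_key
```

Note that a static interceptor writes the same value into every event, so all messages would share one key and should land in the same partition. The fileHeaderKey approach gives one key per file instead. If both are configured, the interceptor's preserveExisting setting decides whether a 'key' header that is already present is kept.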