Contributor

Flume Kafka Sink specify partition to send data to

Hi guys,

 

I am using Flume 1.7 and Kafka 0.10.0. I have one topic with two partitions and two Flume sources, and I want each source to send its messages to a specific partition ID, and also to assign a client.id to each source. My configuration is below:

 

a1.sources = r1 r2
a1.channels = c1 c2
a1.sinks = k1 k2
 
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000
a1.channels.c1.byteCapacityBufferPercentage = 20
a1.channels.c1.byteCapacity = 800000


a1.channels.c2.type = memory
a1.channels.c2.capacity = 10000
a1.channels.c2.transactionCapacity = 10000
a1.channels.c2.byteCapacityBufferPercentage = 20
a1.channels.c2.byteCapacity = 800000

a1.sources.r1.type = taildir
a1.sources.r1.channels = c1
a1.sources.r1.positionFile = /home/cloudera/FlumeDir/taildir_position2.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /home/cloudera/Public/another.*

a1.sources.r2.type = taildir
a1.sources.r2.channels = c2
a1.sources.r2.positionFile = /home/cloudera/FlumeDir/taildir_position3.json
a1.sources.r2.filegroups = f2
a1.sources.r2.filegroups.f2 = /home/cloudera/Public/file_log.log

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.channel = c1
a1.sinks.k1.kafka.topic = multiple_partitions
a1.sinks.k1.kafka.client.id = agent0
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.bootstrap.servers = 192.168.184.132:9092
a1.sinks.k1.kafka.flumeBatchSize = 1000
a1.sinks.k1.kafka.defaultPartitionId = 0
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.k1.kafka.producer.compression.type = snappy

a1.sinks.k2.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k2.channel = c2
a1.sinks.k2.kafka.topic = multiple_partitions
a1.sinks.k2.kafka.defaultPartitionId = 1
a1.sinks.k2.kafka.client.id = agent1
a1.sinks.k2.kafka.bootstrap.servers = 192.168.184.132:9092
a1.sinks.k2.kafka.flumeBatchSize = 1000
a1.sinks.k2.kafka.producer.acks = 1
a1.sinks.k2.kafka.producer.linger.ms = 1
a1.sinks.k2.kafka.producer.compression.type = snappy

When I start the Flume agent, it picks up neither the client.id nor the partition ID:

 

INFO producer.ProducerConfig: ProducerConfig values: 
	compression.type = snappy
	metric.reporters = []
	metadata.max.age.ms = 300000
	metadata.fetch.timeout.ms = 60000
	reconnect.backoff.ms = 50
	sasl.kerberos.ticket.renew.window.factor = 0.8
	bootstrap.servers = [192.168.184.132:9092]
	retry.backoff.ms = 100
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	buffer.memory = 33554432
	timeout.ms = 30000
	key.serializer = class org.apache.kafka.common.serialization.StringSerializer
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	ssl.keystore.type = JKS
	ssl.trustmanager.algorithm = PKIX
	block.on.buffer.full = false
	ssl.key.password = null
	max.block.ms = 60000
	sasl.kerberos.min.time.before.relogin = 60000
	connections.max.idle.ms = 540000
	ssl.truststore.password = null
	max.in.flight.requests.per.connection = 5
	metrics.num.samples = 2
	client.id = 
	ssl.endpoint.identification.algorithm = null
	ssl.protocol = TLS
	request.timeout.ms = 30000
	ssl.provider = null
	ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
	acks = 1
	batch.size = 16384
	ssl.keystore.location = null
	receive.buffer.bytes = 32768
	ssl.cipher.suites = null
	ssl.truststore.type = JKS
	security.protocol = PLAINTEXT
	retries = 0
	max.request.size = 1048576
	value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
	ssl.truststore.location = null
	ssl.keystore.password = null
	ssl.keymanager.algorithm = SunX509
	metrics.sample.window.ms = 30000
	partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
	send.buffer.bytes = 131072
	linger.ms = 1

From the configuration file, I assigned agent1 to send data to partition 1 and agent0 to send data to partition 0. But when I write through agent1, half the data ends up in partition 1 and the other half in partition 0, which means the "kafka.defaultPartitionId = 1" setting is being overridden by the default round-robin partitioning.
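Rereading the Flume 1.7 user guide, it seems that defaultPartitionId and flumeBatchSize are sink-level properties with no kafka. prefix, and that producer properties such as client.id go through the kafka.producer. pass-through, so maybe the intended form is the following, though I haven't confirmed it:

# Hedged sketch based on my reading of the Flume 1.7 docs (untested):
# defaultPartitionId and flumeBatchSize are not prefixed with kafka.,
# and producer properties such as client.id use the kafka.producer. prefix.
a1.sinks.k1.defaultPartitionId = 0
a1.sinks.k1.flumeBatchSize = 1000
a1.sinks.k1.kafka.producer.client.id = agent0

a1.sinks.k2.defaultPartitionId = 1
a1.sinks.k2.flumeBatchSize = 1000
a1.sinks.k2.kafka.producer.client.id = agent1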

Re: Flume Kafka Sink specify partition to send data to

I have not used Flume, so take my answer with due care.

 

When a message is produced into Kafka, the producer determines which partition the message will be written to.

There are three different behaviours:

- You don't specify a key with the message: messages are distributed between the partitions in a round-robin fashion.

- You specify a key with the message: all messages with the same key are put into the same partition (according to the logic of the default partitioner).

- You specify a key AND you override the partitioner logic yourself (see the sketch below).
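A hedged sketch of that third option in Flume terms: partitioner.class is a standard Kafka producer property, so it should pass through the sink's kafka.producer. prefix; the class name below is a placeholder, and I have not tested this:

# Hypothetical custom partitioner wired through the kafka.producer. pass-through.
# com.example.SourcePartitioner is a placeholder, not a real class.
a1.sinks.k1.kafka.producer.partitioner.class = com.example.SourcePartitioner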

 

From your use case, I think you are in the first situation: your messages do not have a key attached, so they are distributed "evenly" between the partitions.

This is not like the consumer side, where a "subscriber" can read directly from a specific partition.

 

If you really want to write to one partition per Flume source, then you need to determine a static key value per source and attach it to each message.

 

I don't know if this is something the sink can take care of.

I also don't know whether the Flume KafkaSink overrides this normal Kafka behaviour in this matter.
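That said, from the Flume docs it looks like a static interceptor can stamp a fixed header on every event from a source, and the KafkaSink is said to use the event header named "key" as the Kafka message key. A hedged, untested sketch:

# Hedged sketch: give each source a fixed Kafka key via a static interceptor.
# Assumes the KafkaSink reads the "key" event header as the message key.
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = key
a1.sources.r1.interceptors.i1.value = source1

a1.sources.r2.interceptors = i1
a1.sources.r2.interceptors.i1.type = static
a1.sources.r2.interceptors.i1.key = key
a1.sources.r2.interceptors.i1.value = source2

Note that with only two partitions, two distinct key values are not guaranteed to hash to different partitions.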

 

 

https://simplydistributed.wordpress.com/2016/12/13/kafka-partitioning/

Chapter "Data and Partitions: the relationship"

Contributor

Re: Flume Kafka Sink specify partition to send data to

" You specify a KEY AND you override the partitionner logic yourself" actually i didn't specify a key for the message i just assigned it to certain partition, my question is about how to set the client ID and receive it in the consumer side, i found that it possible using interceptor but still i don't receive that in the event header
Cloudera Employee

Re: Flume Kafka Sink specify partition to send data to

You need to use the kafka.consumer prefix in the Flume config for consumer properties:

a1.sinks.k2.kafka.consumer.client.id = agent1
a1.sinks.k1.kafka.consumer.client.id = agent0
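Whichever prefix applies, you can confirm it took effect from the ProducerConfig dump at agent startup, which should then show the value instead of an empty string, e.g.:

	client.id = agent0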

-pd