Flume Kafka Sink specify partition to send data to

Expert Contributor

Hi guys,

I am using Flume 1.7 and Kafka 0.10.0. I have one topic with 2 partitions and 2 Flume sources, and I want each source to send its messages to a specific partition ID and to have its own client.id. Below is my configuration:

 

a1.sources = r1 r2
a1.channels = c1 c2
a1.sinks = k1 k2
 
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000
a1.channels.c1.byteCapacityBufferPercentage = 20
a1.channels.c1.byteCapacity = 800000


a1.channels.c2.type = memory
a1.channels.c2.capacity = 10000
a1.channels.c2.transactionCapacity = 10000
a1.channels.c2.byteCapacityBufferPercentage = 20
a1.channels.c2.byteCapacity = 800000

a1.sources.r1.type = taildir
a1.sources.r1.channels = c1
a1.sources.r1.positionFile = /home/cloudera/FlumeDir/taildir_position2.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /home/cloudera/Public/another.*

a1.sources.r2.type = taildir
a1.sources.r2.channels = c2
a1.sources.r2.positionFile = /home/cloudera/FlumeDir/taildir_position3.json
a1.sources.r2.filegroups = f2
a1.sources.r2.filegroups.f2 = /home/cloudera/Public/file_log.log

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.channel = c1
a1.sinks.k1.kafka.topic = multiple_partitions
a1.sinks.k1.kafka.client.id = agent0
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.bootstrap.servers = 192.168.184.132:9092
a1.sinks.k1.kafka.flumeBatchSize = 1000
a1.sinks.k1.kafka.defaultPartitionId = 0
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.k1.kafka.producer.compression.type = snappy

a1.sinks.k2.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k2.channel = c2
a1.sinks.k2.kafka.topic = multiple_partitions
a1.sinks.k2.kafka.defaultPartitionId = 1
a1.sinks.k2.kafka.client.id = agent1
a1.sinks.k2.kafka.bootstrap.servers = 192.168.184.132:9092
a1.sinks.k2.kafka.flumeBatchSize = 1000
a1.sinks.k2.kafka.producer.acks = 1
a1.sinks.k2.kafka.producer.linger.ms = 1
a1.sinks.k2.kafka.producer.compression.type = snappy

When I start the Flume agent, it picks up neither the client.id nor the partition ID:

 

INFO producer.ProducerConfig: ProducerConfig values: 
	compression.type = snappy
	metric.reporters = []
	metadata.max.age.ms = 300000
	metadata.fetch.timeout.ms = 60000
	reconnect.backoff.ms = 50
	sasl.kerberos.ticket.renew.window.factor = 0.8
	bootstrap.servers = [192.168.184.132:9092]
	retry.backoff.ms = 100
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	buffer.memory = 33554432
	timeout.ms = 30000
	key.serializer = class org.apache.kafka.common.serialization.StringSerializer
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	ssl.keystore.type = JKS
	ssl.trustmanager.algorithm = PKIX
	block.on.buffer.full = false
	ssl.key.password = null
	max.block.ms = 60000
	sasl.kerberos.min.time.before.relogin = 60000
	connections.max.idle.ms = 540000
	ssl.truststore.password = null
	max.in.flight.requests.per.connection = 5
	metrics.num.samples = 2
	client.id = 
	ssl.endpoint.identification.algorithm = null
	ssl.protocol = TLS
	request.timeout.ms = 30000
	ssl.provider = null
	ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
	acks = 1
	batch.size = 16384
	ssl.keystore.location = null
	receive.buffer.bytes = 32768
	ssl.cipher.suites = null
	ssl.truststore.type = JKS
	security.protocol = PLAINTEXT
	retries = 0
	max.request.size = 1048576
	value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
	ssl.truststore.location = null
	ssl.keystore.password = null
	ssl.keymanager.algorithm = SunX509
	metrics.sample.window.ms = 30000
	partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
	send.buffer.bytes = 131072
	linger.ms = 1

In the configuration file I assigned "agent1" to send data to partition ID 1 and "agent0" to partition ID 0, but when I write through "agent1" I get half the data in partition 1 and the other half in partition 0, which means the "kafka.defaultPartitionId = 1" setting is being overridden by the default round-robin partitioning.
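For reference, the Flume 1.7 user guide lists defaultPartitionId as a sink-level property with no kafka. prefix, and pass-through producer properties such as client.id under the kafka.producer. prefix. A sketch in that documented form (untested here) would be:

# per the Flume 1.7 docs: defaultPartitionId is a plain sink property,
# and anything under kafka.producer. is handed to the Kafka producer
a1.sinks.k1.defaultPartitionId = 0
a1.sinks.k1.kafka.producer.client.id = agent0
a1.sinks.k2.defaultPartitionId = 1
a1.sinks.k2.kafka.producer.client.id = agent1

Unrecognized sink properties appear to be ignored silently, which would be consistent with the empty client.id and the DefaultPartitioner shown in the log above.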

3 REPLIES

Re: Flume Kafka Sink specify partition to send data to

Super Collaborator

I have not used Flume, so take my answer with all the care it needs.

 

When a message is "produced" into Kafka, the producer determines which partition the message will be written to.

There are 3 different behaviours:

- You don't specify a key with the message: messages are distributed between the partitions in a round-robin fashion.

- You specify a key with the message: all messages with the same key are put into the same partition (the default partitioner derives the partition from a hash of the key, roughly hash(key) % numPartitions).

- You specify a key AND you override the partitioner logic yourself.

 

From your use case, I think you are in the first case. Your messages do not have a key attached, so they are distributed "evenly" between the partitions.

This is unlike the consumer side, where a "subscriber" can be assigned to read directly from a specific partition.

 

If you really want each Flume source to write to a single partition, then you need to determine a static key value per Flume source and attach it to each message.
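A minimal sketch of that idea, assuming Flume's static interceptor and assuming the Kafka sink reads the event header named "key" as the Kafka message key (the interceptor name i1 and the value source-r1 are illustrative):

# the static interceptor stamps a constant "key" header on every
# event from this source; i1 and source-r1 are arbitrary names
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = key
a1.sources.r1.interceptors.i1.value = source-r1

With a fixed key per source, all of that source's messages hash to the same partition, although you cannot choose which partition that will be.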

 

I don't know if this is something the sink can take care of.

I also don't know whether the Flume KafkaSink "overrides" this normal Kafka behaviour in this matter.
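For what it's worth, the Flume 1.7 user guide does describe a partitionIdHeader sink property that routes each event to the partition number found in a named event header. A sketch combining it with a static interceptor (the header name partition-header is an arbitrary choice):

# the interceptor stamps partition-header=0 on every event from r1,
# and the sink sends each event to the partition named in that header
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = partition-header
a1.sources.r1.interceptors.i1.value = 0
a1.sinks.k1.partitionIdHeader = partition-header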

 

 

https://simplydistributed.wordpress.com/2016/12/13/kafka-partitioning/

See the chapter "Data and Partitions: the relationship".

Re: Flume Kafka Sink specify partition to send data to

Expert Contributor
Regarding "You specify a KEY AND you override the partitioner logic yourself": actually I didn't specify a key for the message, I just assigned it to a certain partition. My question is about how to set the client ID and receive it on the consumer side. I found that it should be possible using an interceptor, but I still don't receive it in the event header.

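One possible explanation, assuming the sink's default behaviour: the Kafka sink writes only the event body to the topic, so interceptor-set headers are dropped unless the sink is told to keep them. The Flume 1.7 user guide describes a useFlumeEventFormat property that writes events as Avro FlumeEvents with headers preserved (the consumer then has to decode the Avro wrapper). A sketch:

# headers survive only if events are written in Avro FlumeEvent format;
# consumers must then parse that format instead of raw bytes
a1.sinks.k1.useFlumeEventFormat = true
a1.sinks.k2.useFlumeEventFormat = true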
Re: Flume Kafka Sink specify partition to send data to

Super Collaborator
You need to put client.id under the producer-properties prefix in the Flume config, since the sink is a Kafka producer:

a1.sinks.k2.kafka.producer.client.id = agent1
a1.sinks.k1.kafka.producer.client.id = agent0

-pd