
While debugging Kafka producer slowness, I observed the following steps that a Kafka producer takes when producing a single record to a Kafka broker.

ENV: HDP 2.5

A 2-node Kafka cluster, with a single producer producing a record to 'testtopic', which has 2 partitions with 2 replicas each.

The Kafka producer starts with the configured settings and begins registering its metrics sensors.
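For reference, the configured settings for a run like this one could look like the sketch below. The broker addresses, acks=1 and the 100000-byte batch size are inferred from the log lines quoted in this article; the string serializers and linger.ms=0 are assumptions, since the article does not state them. The class name ProducerSettingsSketch is hypothetical.

```java
import java.util.Properties;

class ProducerSettingsSketch {
    // Builds producer settings similar to the run described in this article.
    static Properties producerSettings() {
        Properties props = new Properties();
        // Bootstrap brokers as they appear in cluster metadata version 1.
        props.put("bootstrap.servers", "rkk1:6667,rkk2:6667");
        // acks=1 matches the produce request body quoted in the article.
        props.put("acks", "1");
        // cap=100000 in the produce request suggests this batch size (inferred).
        props.put("batch.size", "100000");
        // Assumptions: the article does not say which linger.ms or serializers were used.
        props.put("linger.ms", "0");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        producerSettings().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

With the kafka-clients library on the classpath, these properties would be passed to the KafkaProducer constructor.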

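The producer then updates its cluster metadata, which holds cluster information such as broker nodes and partitions, and assigns a version id to this copy of the metadata. At this point it contains only the bootstrap brokers (with negative placeholder node ids) and no partitions.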

Updated cluster metadata version 1 to Cluster(nodes = [Node(-2, rkk2, 6667), Node(-1, rkk1, 6667)], partitions = [])

The producer sets up and starts its I/O thread, also known as the Sender thread.

The producer requests a metadata update for topic testtopic.

The producer's NetworkClient sends a metadata request to one of the brokers. The request header consists of api_key, api_version, correlation_id and client_id.

Sending metadata request ClientRequest(expectResponse=true, callback=null, request=RequestSend(header={api_key=3,api_version=0,correlation_id=0,client_id=producer-1}, body={topics=[testtopic]}), isInitiatedByNetworkClient, createdTimeMs=1482047450018, sendTimeMs=0) to node -1

From the response, the producer gets the cluster metadata and updates its own copy. The response includes broker information along with the topic's partitions, their leaders and their ISRs.

Updated cluster metadata version 2 to Cluster(nodes = [Node(1002, rkk2.hdp.local, 6667), Node(1001, rkk1.hdp.local, 6667)], partitions = [Partition(topic = testtopic, partition = 1, leader = 1002, replicas = [1002,1001,], isr = [1002,1001,], Partition(topic = testtopic, partition = 0, leader = 1001, replicas = [1002,1001,], isr = [1001,1002,]])

The producer serializes the key and value and sends them as a produce record to the leader of the target partition. The partition is chosen by the default partitioner unless a custom partitioner is configured.

The default partitioning strategy decides the partition as follows:

If a partition is specified in the record, use it.

If no partition is specified but a key is present, choose a partition based on a hash of the key.

If neither a partition nor a key is present, choose a partition in a round-robin fashion.
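The three rules above can be sketched in plain Java as follows. This is a simplified illustration, not Kafka's partitioner class: Kafka's own default partitioner hashes the serialized key with murmur2, whereas this sketch uses Arrays.hashCode as a stand-in, and the class and method names are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

class PartitionChoiceSketch {
    // Counter backing the round-robin case.
    private final AtomicInteger counter = new AtomicInteger(0);

    // explicitPartition and keyBytes may be null, mirroring an optional partition and key.
    int choosePartition(Integer explicitPartition, byte[] keyBytes, int numPartitions) {
        if (explicitPartition != null) {
            return explicitPartition; // rule 1: partition given in the record
        }
        if (keyBytes != null) {
            // rule 2: hash of the key (stand-in hash, NOT Kafka's murmur2)
            int hash = Arrays.hashCode(keyBytes);
            return (hash & 0x7fffffff) % numPartitions; // mask the sign bit to stay non-negative
        }
        // rule 3: no partition and no key, so rotate through the partitions
        return (counter.getAndIncrement() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        PartitionChoiceSketch sketch = new PartitionChoiceSketch();
        byte[] key = "user-42".getBytes(StandardCharsets.UTF_8);
        System.out.println("explicit: " + sketch.choosePartition(1, null, 2));
        System.out.println("keyed:    " + sketch.choosePartition(null, key, 2));
        System.out.println("no key:   " + sketch.choosePartition(null, null, 2));
    }
}
```

The point of the keyed case is that the same key always maps to the same partition as long as the partition count does not change, which is what preserves per-key ordering.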

The producer allocates a memory buffer for each topic partition, sized by batch.size. The producer wakes up the Sender thread once the batch is full, once linger.ms has elapsed, or when a new batch is created.
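The interplay of batch.size and linger.ms can be illustrated with a small model. This is a simplified sketch and not Kafka's internal accumulator: the class BatchReadinessSketch and its methods are hypothetical, the 76-byte record mirrors lim=76 in the produce request quoted in this article, and the 5 ms linger value is an assumption chosen for illustration.

```java
class BatchReadinessSketch {
    final int batchSizeBytes; // stands in for batch.size
    final long lingerMs;      // stands in for linger.ms
    final long createdAtMs;   // time the batch was created
    int usedBytes = 0;

    BatchReadinessSketch(int batchSizeBytes, long lingerMs, long createdAtMs) {
        this.batchSizeBytes = batchSizeBytes;
        this.lingerMs = lingerMs;
        this.createdAtMs = createdAtMs;
    }

    // Appends a record if it fits; returns false when a new batch would be needed.
    boolean tryAppend(int recordBytes) {
        if (usedBytes + recordBytes > batchSizeBytes) {
            return false;
        }
        usedBytes += recordBytes;
        return true;
    }

    boolean isFull() {
        return usedBytes >= batchSizeBytes;
    }

    // A batch is sendable when it is full or has waited at least lingerMs.
    boolean readyToSend(long nowMs) {
        return isFull() || nowMs - createdAtMs >= lingerMs;
    }

    public static void main(String[] args) {
        BatchReadinessSketch batch = new BatchReadinessSketch(100000, 5, 0L);
        batch.tryAppend(76);
        System.out.println("ready after 1 ms: " + batch.readyToSend(1L));
        System.out.println("ready after 5 ms: " + batch.readyToSend(5L));
    }
}
```

In this model a single small record is held back until the linger time has passed, which is one place to look when a producer that sends few records appears slow.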

The Sender thread creates a produce request for the record, addressed to the leader of the partition and tagged with a correlation_id, like this:

Created 1 produce requests: [ClientRequest(expectResponse=true, callback=org.apache.kafka.clients.producer.internals.Sender$1@11b2c43e, request=RequestSend(header={api_key=0,api_version=1,correlation_id=1,client_id=producer-1}, body={acks=1,timeout=30000,topic_data=[{topic=testtopic,data=[{partition=1,record_set=java.nio.HeapByteBuffer[pos=0 lim=76 cap=100000]}]}]}), createdTimeMs=1482047460410, sendTimeMs=0)]

Once the record has been written successfully to the brokers, as required by the acks setting, the Sender thread receives the response for that correlation_id and the callback is invoked.

Received produce response from node 1002 with correlation id 1
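The way a response is tied back to its request through the correlation id can be sketched as a map of in-flight requests. This is an illustrative model only, not the NetworkClient implementation: the class InFlightRequestsSketch, its methods and the String result type are hypothetical. The ids 0 and 1 mirror the metadata and produce requests in the logs above.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

class InFlightRequestsSketch {
    // Callbacks waiting for a response, keyed by correlation id.
    private final Map<Integer, Consumer<String>> pending = new HashMap<>();
    private int nextCorrelationId = 0;

    // Registers a callback and returns the correlation id to send with the request.
    int send(Consumer<String> callback) {
        int id = nextCorrelationId++;
        pending.put(id, callback);
        return id;
    }

    // Finds the callback for a response and invokes it; returns false for unknown ids.
    boolean onResponse(int correlationId, String result) {
        Consumer<String> callback = pending.remove(correlationId);
        if (callback == null) {
            return false;
        }
        callback.accept(result);
        return true;
    }

    int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        InFlightRequestsSketch client = new InFlightRequestsSketch();
        int metadataId = client.send(r -> System.out.println("metadata response: " + r));
        int produceId = client.send(r -> System.out.println("produce response: " + r));
        client.onResponse(metadataId, "cluster metadata");
        client.onResponse(produceId, "ack from node 1002");
    }
}
```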
Last update: 12-18-2016 12:29 PM