While debugging Kafka producer slowness, I observed the following steps that a Kafka producer takes when producing a single record to a Kafka broker.
ENV: HDP 2.5
A 2-node Kafka cluster, with a single producer producing a record to 'testtopic', which has 2 partitions with 2 replicas.
The Kafka producer starts with the configured settings and begins adding its metrics sensors.
The producer updates its cluster metadata, which includes cluster information such as broker nodes and partitions, and assigns a version ID to this metadata. The first version contains only the configured brokers and no partitions yet:
Updated cluster metadata version 1 to Cluster(nodes = [Node(-2, rkk2, 6667), Node(-1, rkk1, 6667)], partitions = [])
Set up and start the Kafka producer I/O thread, also known as the Sender thread.
Request metadata update for topic testtopic.
The producer's NetworkClient sends a metadata request to one of the brokers. The request header consists of api_key, api_version, correlation_id, and client_id.
From the response, the producer updates its own copy of the metadata. The response includes broker information along with the topic's partitions and the leader and ISR of each partition.
The producer serializes the key and value and sends them as a produce record to the leader of the target partition. Unless a custom partitioner is configured, the partition is chosen by the default partitioner.
The default partitioning strategy decides the partition as follows:
If a partition is specified in the record, use it.
If no partition is specified but a key is present, choose a partition based on a hash of the key.
If neither a partition nor a key is present, choose a partition in a round-robin fashion.
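The three rules above can be sketched in plain Java as follows. This is a simplified illustration, not the client's actual partitioner: the class name PartitionChooser and the method choose are hypothetical, and Arrays.hashCode stands in for the hash the real client applies to the serialized key, so the partition numbers it produces will not match those of a real producer.

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified model of the default partitioning decision.
final class PartitionChooser {
    // Counter used for round-robin assignment of records without a key.
    private final AtomicInteger counter = new AtomicInteger(0);

    int choose(Integer explicitPartition, byte[] keyBytes, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        // Rule 1: a partition set on the record always wins.
        if (explicitPartition != null) {
            if (explicitPartition < 0 || explicitPartition >= numPartitions) {
                throw new IllegalArgumentException("partition out of range");
            }
            return explicitPartition;
        }
        // Rule 2: hash the serialized key (assumption: Arrays.hashCode as a stand-in hash).
        if (keyBytes != null) {
            int positiveHash = Arrays.hashCode(keyBytes) & 0x7fffffff;
            return positiveHash % numPartitions;
        }
        // Rule 3: no partition and no key, so rotate through the partitions.
        int next = counter.getAndIncrement() & 0x7fffffff;
        return next % numPartitions;
    }
}
```

With the 2 partitions of 'testtopic', records carrying the same key always land on the same partition, while records without a key alternate between partition 0 and partition 1.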
The producer allocates a memory buffer for the topic, with its size configured by batch.size.
The producer wakes up the Sender thread once the buffer is full, once linger.ms has been reached, or when a new batch is created.
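As a rough sketch of where these settings live, the producer configuration could be assembled like this. The class name ProducerSettings is hypothetical, the String serializers and the linger.ms value are assumptions, and batch.size=100000 is only inferred from the cap=100000 buffer visible in the produce request log in this article. The resulting Properties object is what would be handed to the KafkaProducer constructor.

```java
import java.util.Properties;

// Hypothetical helper that collects the settings discussed in this article.
final class ProducerSettings {
    static Properties build() {
        Properties props = new Properties();
        // Brokers taken from the cluster metadata log line (rkk1 and rkk2 on port 6667).
        props.setProperty("bootstrap.servers", "rkk1:6667,rkk2:6667");
        // Assumption: plain String keys and values.
        props.setProperty("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        // acks=1 matches the produce request shown in the log.
        props.setProperty("acks", "1");
        // Upper bound, in bytes, of the per-batch memory buffer.
        props.setProperty("batch.size", "100000");
        // Assumption: wait up to 5 ms for more records before sending a batch.
        props.setProperty("linger.ms", "5");
        return props;
    }
}
```

A larger batch.size or linger.ms generally trades latency for throughput, which is worth checking first when a producer looks slow.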
The Sender thread creates a produce request, tagged with a correlation_id, for the leader of the partition, like this:
Created 1 produce requests: [ClientRequest(expectResponse=true, callback=org.apache.kafka.clients.producer.internals.Sender$1@11b2c43e, request=RequestSend(header={api_key=0,api_version=1,correlation_id=1,client_id=producer-1}, body={acks=1,timeout=30000,topic_data=[{topic=testtopic,data=[{partition=1,record_set=java.nio.HeapByteBuffer[pos=0 lim=76 cap=100000]}]}]}), createdTimeMs=1482047460410, sendTimeMs=0)]
Once the record has been written successfully to the brokers according to the acks setting, the Sender thread receives the response for that correlation_id and the callback is invoked.
Received produce response from node 1002 with correlation id 1
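The way a response is matched back to its request through the correlation_id can be modeled with a small map of pending callbacks. This is only a conceptual sketch in plain Java: the class InFlightRequests and its methods are hypothetical and are not the client's internal classes, and the starting ID is arbitrary.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical model of request/response matching by correlation id.
final class InFlightRequests {
    // Callbacks waiting for a response, keyed by correlation id.
    private final Map<Integer, Consumer<String>> pending = new HashMap<>();
    private int nextCorrelationId = 0;

    // Registers a callback for a new request and returns its correlation id.
    int send(Consumer<String> callback) {
        int id = nextCorrelationId++;
        pending.put(id, callback);
        return id;
    }

    // Invokes and removes the callback for the given id; returns false if the id is unknown.
    boolean complete(int correlationId, String responseBody) {
        Consumer<String> callback = pending.remove(correlationId);
        if (callback == null) {
            return false;
        }
        callback.accept(responseBody);
        return true;
    }

    int pendingCount() {
        return pending.size();
    }
}
```

Each callback fires at most once, and a response carrying an unknown or already completed correlation id is simply reported as unmatched.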