Env: HDP 2.5

A 2-node Kafka cluster with a topic named 'testtopic' that has 2 partitions and a replication factor of 2.

I am running two consumers with the group id 'test'.

1. What happens when a consumer starts fresh

The consumer's NetworkClient requests metadata, and the response returns the cluster information:

2016-12-17 23:21:05 DEBUG clients.NetworkClient:619 - Sending metadata request ClientRequest(expectResponse=true, callback=null, request=RequestSend(header={api_key=3,api_version=0,correlation_id=1,client_id=consumer-1}, body={topics=[testtopic]}), isInitiatedByNetworkClient, createdTimeMs=1481997065894, sendTimeMs=0) to node -2
2016-12-17 23:21:06 DEBUG clients.Metadata:172 - Updated cluster metadata version 2 to Cluster(nodes = [Node(1002, rkk2.hdp.local, 6667), Node(1001, rkk1.hdp.local, 6667)], partitions = [Partition(topic = testtopic, partition = 1, leader = 1002, replicas = [1002,1001,], isr = [1002,1001,], Partition(topic = testtopic, partition = 0, leader = 1001, replicas = [1002,1001,], isr = [1001,1002,]])

The consumer then sends a GroupMetadata request to one of the brokers:

2016-12-17 23:21:06 DEBUG internals.AbstractCoordinator:471 - Issuing group metadata request to broker 1001 

The response identifies the current group coordinator:

2016-12-17 23:21:11 DEBUG internals.AbstractCoordinator:484 - Group metadata response ClientResponse(receivedTimeMs=1481997071648, disconnected=false, request=ClientRequest(expectResponse=true, callback=org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler@31df58ee, request=RequestSend(header={api_key=10,api_version=0,correlation_id=2,client_id=consumer-1}, body={group_id=test}), createdTimeMs=1481997066150, sendTimeMs=1481997071389), responseBody={error_code=0,coordinator={node_id=1002,host=rkk2.hdp.local,port=6667}}) 
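As a rough illustration of why one particular broker comes back as the coordinator: on the broker side, the group id is hashed to a partition of the internal __consumer_offsets topic, and the broker leading that partition acts as the group coordinator. The sketch below mimics that mapping. It assumes the default of 50 offsets-topic partitions (offsets.topic.num.partitions); the actual setting of this cluster is not shown in the article, and the helper names are made up for illustration.

```python
def java_string_hash(s: str) -> int:
    """Reproduce Java's String.hashCode() as a signed 32-bit integer.

    Accurate for characters in the Basic Multilingual Plane (e.g. ASCII group ids).
    """
    h = 0
    for ch in s:
        h = (31 * h + ord(ch)) & 0xFFFFFFFF
    # Convert the unsigned 32-bit value to Java's signed int range.
    return h - 0x100000000 if h >= 0x80000000 else h


def offsets_partition_for(group_id: str, num_partitions: int = 50) -> int:
    """Partition of __consumer_offsets that a group id maps to.

    num_partitions=50 is an assumed default, not a value read from this cluster.
    """
    h = java_string_hash(group_id)
    # Absolute value, with Integer.MIN_VALUE mapped to 0 so the result is never negative.
    non_negative = 0 if h == -2**31 else abs(h)
    return non_negative % num_partitions


print(offsets_partition_for("test"))  # 48 with the assumed 50 partitions
```

Under that assumption, group 'test' maps to partition 48 of __consumer_offsets, so the broker leading that partition would be the one reported as coordinator.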

node_id=1002 is designated as the coordinator, so the consumer now sends a JOIN_GROUP request to it:

2016-12-17 23:21:16 DEBUG internals.AbstractCoordinator:324 - Issuing request (JOIN_GROUP: {group_id=test,session_timeout=10000,member_id=,protocol_type=consumer,group_protocols=[{protocol_name=range,protocol_metadata=java.nio.HeapByteBuffer[pos=0 lim=21 cap=21]}]}) to coordinator 2147482645
2016-12-17 23:21:19 DEBUG internals.AbstractCoordinator:342 - Joined group: {error_code=0,generation_id=1,group_protocol=range,leader_id=consumer-1-b53995e2-ac0b-43dd-a9a3-1f19f03d679c,member_id=consumer-1-b53995e2-ac0b-43dd-a9a3-1f19f03d679c,members=[{member_id=consumer-1-b53995e2-ac0b-43dd-a9a3-1f19f03d679c,member_metadata=java.nio.HeapByteBuffer[pos=0 lim=21 cap=21]}]}
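The value 2147482645 in "to coordinator 2147482645" is not a broker id. The Java client opens a separate connection to the coordinator and labels it with Integer.MAX_VALUE minus the broker's node id, which keeps it distinct from the ordinary connection to the same broker. A small check of that arithmetic (the function names are hypothetical):

```python
JAVA_INT_MAX = 2**31 - 1  # Integer.MAX_VALUE in Java


def coordinator_connection_id(node_id: int) -> int:
    """Id the client uses for its dedicated connection to the coordinator."""
    return JAVA_INT_MAX - node_id


def broker_id_from_connection_id(connection_id: int) -> int:
    """Recover the broker's node id from a coordinator connection id seen in the logs."""
    return JAVA_INT_MAX - connection_id


print(coordinator_connection_id(1002))           # 2147482645
print(broker_id_from_connection_id(2147482645))  # 1002
```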

The first consumer to join the group becomes the group leader. In our case this is leader_id=consumer-1-b53995e2-ac0b-43dd-a9a3-1f19f03d679c. Notice that the leader id and the member id are the same here, because this is the only consumer at this point.

The leader learns about all the consumers from the group coordinator, which returns the member list in the JoinGroup response (the coordinator tracks which consumers are alive through the heartbeat mechanism, which the consumer drives from consumer.poll). Once it has the list of consumers, the leader performs the partition assignment according to the configured policy, which by default is range assignment (see org.apache.kafka.clients.consumer.RangeAssignor to understand how it assigns partitions).

The leader consumer performs the partition assignment:

2016-12-17 23:21:19 DEBUG internals.ConsumerCoordinator:219 - Performing range assignment for subscriptions {consumer-1-b53995e2-ac0b-43dd-a9a3-1f19f03d679c=org.apache.kafka.clients.consumer.internals.PartitionAssignor$Subscription@75bbb4f4}
2016-12-17 23:21:19 DEBUG internals.ConsumerCoordinator:223 - Finished assignment: {consumer-1-b53995e2-ac0b-43dd-a9a3-1f19f03d679c=org.apache.kafka.clients.consumer.internals.PartitionAssignor$Assignment@2919c2af}
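The log prints the assignment only as an object reference, so here is a minimal sketch of what a range strategy does for a single topic: sort the member ids, give each member a contiguous block of partitions, and hand any leftover partitions to the first members. This is a simplified illustration rather than the RangeAssignor source; the function name is hypothetical and it assumes every member subscribes to the topic. The second member id is the one that appears when the second consumer joins in section 2.

```python
def range_assign(member_ids, topic, num_partitions):
    """Range-style assignment of one topic's partitions to the given members."""
    members = sorted(member_ids)  # members are ordered by their ids
    per_member, extra = divmod(num_partitions, len(members))
    assignment = {}
    for i, member in enumerate(members):
        # The first `extra` members each receive one additional partition.
        start = per_member * i + min(i, extra)
        length = per_member + (1 if i < extra else 0)
        assignment[member] = [f"{topic}-{p}" for p in range(start, start + length)]
    return assignment


first = "consumer-1-b53995e2-ac0b-43dd-a9a3-1f19f03d679c"
second = "consumer-1-fd31194d-469c-4d9d-a66e-09ee2db44645"

# Generation 1: only the first consumer is in the group.
generation_1 = range_assign([first], "testtopic", 2)
# Generation 2: both consumers are in the group.
generation_2 = range_assign([first, second], "testtopic", 2)

print(generation_1)
print(generation_2)
```

With one member, both partitions go to it. With two members, the first id in sort order gets testtopic-0 and the second gets testtopic-1, which agrees with the partitions the logs in this article report.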

After the assignment, the leader sends it back to the coordinator in a SyncGroup request, and the coordinator passes each consumer in the group its own partitions. A consumer sees only the partitions assigned to it.

2016-12-17 23:21:19 DEBUG internals.AbstractCoordinator:403 - Issuing leader SyncGroup (SYNC_GROUP: {group_id=test,generation_id=1,member_id=consumer-1-b53995e2-ac0b-43dd-a9a3-1f19f03d679c,group_assignment=[{member_id=consumer-1-b53995e2-ac0b-43dd-a9a3-1f19f03d679c,member_assignment=java.nio.HeapByteBuffer[pos=0 lim=33 cap=33]}]}) to coordinator 2147482645
2016-12-17 23:21:20 DEBUG internals.AbstractCoordinator:429 - Received successful sync group response for group test: {error_code=0,member_assignment=java.nio.HeapByteBuffer[pos=0 lim=33 cap=33]}

Now the consumer starts fetching from its partitions and begins the normal heartbeat process. Because this is the first consumer, all the partitions of the topic are assigned to it (testtopic-1, testtopic-0):

2016-12-17 23:21:20 DEBUG internals.ConsumerCoordinator:185 - Setting newly assigned partitions [testtopic-1, testtopic-0]
2016-12-17 23:21:20 DEBUG internals.ConsumerCoordinator:575 - Fetching committed offsets for partitions: [testtopic-1, testtopic-0]

2. Now let's start the second consumer and see how it behaves

It sends a metadata request:

2016-12-17 23:22:10 DEBUG clients.NetworkClient:619 - Sending metadata request ClientRequest(expectResponse=true, callback=null, request=RequestSend(header={api_key=3,api_version=0,correlation_id=1,client_id=consumer-1}, body={topics=[testtopic]}), isInitiatedByNetworkClient, createdTimeMs=1481997130251, sendTimeMs=0) to node -1
2016-12-17 23:22:10 DEBUG clients.Metadata:172 - Updated cluster metadata version 2 to Cluster(nodes = [Node(1002, rkk2.hdp.local, 6667), Node(1001, rkk1.hdp.local, 6667)], partitions = [Partition(topic = testtopic, partition = 1, leader = 1002, replicas = [1002,1001,], isr = [1002,1001,], Partition(topic = testtopic, partition = 0, leader = 1001, replicas = [1002,1001,], isr = [1001,1002,]])
2016-12-17 23:22:10 DEBUG internals.AbstractCoordinator:471 - Issuing group metadata request to broker 1001

From the response it learns which broker is the coordinator and connects to it:

2016-12-17 23:22:16 DEBUG internals.AbstractCoordinator:484 - Group metadata response ClientResponse(receivedTimeMs=1481997135999, disconnected=false, request=ClientRequest(expectResponse=true, callback=org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler@31df58ee, request=RequestSend(header={api_key=10,api_version=0,correlation_id=2,client_id=consumer-1}, body={group_id=test}), createdTimeMs=1481997130517, sendTimeMs=1481997135760), responseBody={error_code=0,coordinator={node_id=1002,host=rkk2.hdp.local,port=6667}})
2016-12-17 23:22:16 DEBUG clients.NetworkClient:487 - Initiating connection to node 2147482645 at rkk2.hdp.local:6667.

The consumer revokes its previously assigned partitions; in this case there are none:

2016-12-17 23:22:21 DEBUG internals.ConsumerCoordinator:241 - Revoking previously assigned partitions []

It then sends the join group request:

2016-12-17 23:22:21 DEBUG internals.AbstractCoordinator:324 - Issuing request (JOIN_GROUP: {group_id=test,session_timeout=10000,member_id=,protocol_type=consumer,group_protocols=[{protocol_name=range,protocol_metadata=java.nio.HeapByteBuffer[pos=0 lim=21 cap=21]}]}) to coordinator 2147482645

The consumer joins the group as a follower. Notice that it issues a follower SyncGroup, that it has its own member_id, and that leader_id is the member id of the first consumer:

2016-12-17 23:22:23 DEBUG internals.AbstractCoordinator:342 - Joined group: {error_code=0,generation_id=2,group_protocol=range,leader_id=consumer-1-b53995e2-ac0b-43dd-a9a3-1f19f03d679c,member_id=consumer-1-fd31194d-469c-4d9d-a66e-09ee2db44645,members=[]}
2016-12-17 23:22:23 DEBUG internals.AbstractCoordinator:392 - Issuing follower SyncGroup (SYNC_GROUP: {group_id=test,generation_id=2,member_id=consumer-1-fd31194d-469c-4d9d-a66e-09ee2db44645,group_assignment=[]}) to coordinator 2147482645
2016-12-17 23:22:24 DEBUG internals.AbstractCoordinator:429 - Received successful sync group response for group test: {error_code=0,member_assignment=java.nio.HeapByteBuffer[pos=0 lim=29 cap=29]}

After the sync, partition testtopic-1 is assigned to this consumer:

2016-12-17 23:22:24 DEBUG internals.ConsumerCoordinator:185 - Setting newly assigned partitions [testtopic-1]
2016-12-17 23:22:24 DEBUG internals.ConsumerCoordinator:575 - Fetching committed offsets for partitions: [testtopic-1]
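To tie the two walkthroughs together, here is a toy in-memory model of the join and sync exchange seen in the logs. It is only a sketch under simplifying assumptions, not how the broker is implemented: the class and method names are hypothetical, there is no networking, heartbeating or timeout handling, and the leader's sync must be called before a follower's (the real coordinator takes care of that ordering itself). The assignment is passed in by hand.

```python
class ToyGroupCoordinator:
    """Toy stand-in for a group coordinator; not Kafka's implementation."""

    def __init__(self):
        self.generation_id = 0
        self.members = []        # member ids in join order
        self.leader_id = None
        self.assignments = {}    # member id -> partitions for the current generation

    def join(self, member_id):
        """Register a member; every membership change starts a new generation."""
        if member_id not in self.members:
            self.members.append(member_id)
        if self.leader_id is None:
            self.leader_id = member_id  # the first member to join becomes the leader
        self.generation_id += 1
        self.assignments = {}

    def join_response(self, member_id):
        """Only the leader receives the member list; followers get an empty one."""
        is_leader = member_id == self.leader_id
        return {
            "generation_id": self.generation_id,
            "leader_id": self.leader_id,
            "member_id": member_id,
            "members": list(self.members) if is_leader else [],
        }

    def sync(self, member_id, group_assignment=None):
        """The leader uploads the full assignment; each member gets only its own part."""
        if member_id == self.leader_id and group_assignment is not None:
            self.assignments = dict(group_assignment)
        return self.assignments.get(member_id, [])


coordinator = ToyGroupCoordinator()

# Section 1: the first consumer joins, becomes leader and assigns everything to itself.
coordinator.join("c1")
gen1_join = coordinator.join_response("c1")
gen1_c1 = coordinator.sync("c1", {"c1": ["testtopic-0", "testtopic-1"]})

# Section 2: the second consumer joins, which starts generation 2.
coordinator.join("c2")
gen2_follower_join = coordinator.join_response("c2")
gen2_c1 = coordinator.sync("c1", {"c1": ["testtopic-0"], "c2": ["testtopic-1"]})
gen2_c2 = coordinator.sync("c2")  # follower sync carries no assignment of its own

print(gen1_join)
print(gen2_follower_join)
print(gen2_c1, gen2_c2)
```

The follower's join response in the model has an empty members list and a leader_id that points at the first consumer, which is the same shape as the "Joined group" entry logged by the second consumer.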
Comments

Very well explained @Rajkumar Singh !!

Nice and very useful Article @Rajkumar Singh ..