
Consuming from __consumer_offsets with Java to retrieve topic offsets and calculate lag


I am trying to consume from the __consumer_offsets topic to obtain the offsets of each consumer group for a topic, so that I can calculate total lag. I was told to obtain the LogEndOffset from the JVM using JMX, and that the difference between this value and the consumer group's offset would give me the total lag. I am having difficulty implementing this. I keep getting an error when reading the __consumer_offsets topic. My normal consumer does not decode the content, so I opted for a different route to decode it. I am a little unfamiliar with formatting on this site, as each time I include a code box I cannot then post normal text beneath it, so please excuse the format here:

Current code tried and error received:

        try {
            ByteBuffer bb = ByteBuffer.allocate(1056);
            while (true) {
                ConsumerRecords<byte[], byte[]> records = kconsumer.poll(100);
                for (ConsumerRecord<byte[], byte[]> record : records)
                    if (record != null) {
                        counter += 1;
                        bb = ByteBuffer.wrap(record.key());
                        short version = bb.getShort();
                        // The read calls on the next two lines were lost from the post;
                        // Type.STRING.read(bb) is inferred from the stack trace below.
                        String group = (String) Type.STRING.read(bb);
                        String topic = (String) Type.STRING.read(bb);
                        int partition = bb.getInt();
                        long offset = (long) (record.offset());

The error occurs on the line that assigns String topic:

	at java.nio.Buffer.nextGetIndex(Unknown Source)
	at java.nio.HeapByteBuffer.getShort(Unknown Source)
	at org.apache.kafka.common.protocol.types.Type$
	at org.apache.kafka.common.protocol.types.Type$

Can anyone suggest an easier way to go about this? I found a couple of tools created by others, but none were compatible with the latest Kafka version. I am looking into Burrow now, but ideally I am expected to create my own.


Super Collaborator

Hi @Zach, please see my answer on StackOverflow here.

Burrow does essentially the same thing, but in Go.

However, how you read the data and calculate lag also depends on what is currently being consumed: a consumer's current position is not stored in the offsets topic immediately, only once the consumer commits it.
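As a rough illustration of decoding records from the offsets topic (a sketch under stated assumptions, not the code from the linked answer): the key starts with a 2-byte version. To my knowledge, versions 0 and 1 are offset-commit keys (group string, topic string, partition int), while other versions are other record types such as group metadata, which have no topic field. Trying to read a topic from such a key would likely run off the end of the buffer, which is consistent with the stack trace in the question. Note also that record.offset() is the record's position inside __consumer_offsets, not the committed offset; the committed offset lives in the record value. The class and method names below (OffsetKeyDecoder, OffsetKey, decodeKey, readCommittedOffset) are hypothetical, and readCommittedOffset assumes the offset is the first field after the value's version, which holds for the value versions I am aware of.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class OffsetKeyDecoder {

    /** Hypothetical holder for the fields of an offset-commit key. */
    static final class OffsetKey {
        final String group;
        final String topic;
        final int partition;

        OffsetKey(String group, String topic, int partition) {
            this.group = group;
            this.topic = topic;
            this.partition = partition;
        }
    }

    // Kafka's STRING type: a 2-byte length followed by that many UTF-8 bytes.
    static String readString(ByteBuffer bb) {
        short length = bb.getShort();
        byte[] bytes = new byte[length];
        bb.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    // Returns the decoded key, or null when the key is not an offset commit.
    static OffsetKey decodeKey(byte[] key) {
        ByteBuffer bb = ByteBuffer.wrap(key);
        short version = bb.getShort();
        if (version != 0 && version != 1) {
            return null; // assumed: other versions are not offset commits, so skip them
        }
        String group = readString(bb);
        String topic = readString(bb);
        int partition = bb.getInt();
        return new OffsetKey(group, topic, partition);
    }

    // Assumption: the committed offset is the first field after the 2-byte version.
    // Callers should skip records whose value is null (tombstones).
    static long readCommittedOffset(byte[] value) {
        ByteBuffer bb = ByteBuffer.wrap(value);
        bb.getShort(); // value schema version, unused here
        return bb.getLong();
    }

    public static void main(String[] args) {
        // Build a sample version-1 key by hand instead of reading from a broker.
        ByteBuffer kb = ByteBuffer.allocate(32);
        kb.putShort((short) 1);
        kb.putShort((short) 8).put("my-group".getBytes(StandardCharsets.UTF_8));
        kb.putShort((short) 8).put("my-topic".getBytes(StandardCharsets.UTF_8));
        kb.putInt(0);

        // Build a sample value: version, then the committed offset.
        ByteBuffer vb = ByteBuffer.allocate(10);
        vb.putShort((short) 1).putLong(42L);

        OffsetKey k = decodeKey(kb.array());
        System.out.println(k.group + " " + k.topic + " " + k.partition
                + " " + readCommittedOffset(vb.array()));
    }
}
```

In a real poll loop, decodeKey(record.key()) would replace the manual reads, with records skipped when it returns null or when record.value() is null.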

Cloudera Employee

Also consider using AdminClient to query consumer progress rather than consuming from the internal topic.
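Whichever way the numbers are obtained, the lag arithmetic itself is small. Here is a minimal sketch, assuming you already hold two maps keyed by partition: the log end offsets and the group's committed offsets. In recent client versions these could come from KafkaConsumer.endOffsets(...) and AdminClient.listConsumerGroupOffsets(...) respectively. The class name LagCalculator, its methods, and the "topic-partition" string keys (a stand-in for TopicPartition) are hypothetical, chosen only so the example runs without a broker. Skipping partitions with no committed offset is also an assumption; you may prefer to count them differently.

```java
import java.util.HashMap;
import java.util.Map;

public class LagCalculator {

    // Per-partition lag = log end offset - committed offset.
    // Partitions without a committed offset are skipped (an assumption of this sketch).
    static Map<String, Long> lagPerPartition(Map<String, Long> endOffsets,
                                             Map<String, Long> committedOffsets) {
        Map<String, Long> lag = new HashMap<>();
        for (Map.Entry<String, Long> e : endOffsets.entrySet()) {
            Long committed = committedOffsets.get(e.getKey());
            if (committed == null) {
                continue; // the group has never committed for this partition
            }
            // Floor at zero in case the end offset was sampled before the commit.
            lag.put(e.getKey(), Math.max(0L, e.getValue() - committed));
        }
        return lag;
    }

    // Total lag for the group is the sum over its partitions.
    static long totalLag(Map<String, Long> lagByPartition) {
        long total = 0L;
        for (long l : lagByPartition.values()) {
            total += l;
        }
        return total;
    }

    public static void main(String[] args) {
        Map<String, Long> end = new HashMap<>();
        end.put("my-topic-0", 100L);
        end.put("my-topic-1", 50L);

        Map<String, Long> committed = new HashMap<>();
        committed.put("my-topic-0", 90L);
        committed.put("my-topic-1", 50L);

        Map<String, Long> lag = lagPerPartition(end, committed);
        System.out.println("per-partition lag: " + lag);
        System.out.println("total lag: " + totalLag(lag));
    }
}
```

Because committed offsets only advance when the consumer commits, lag computed this way can overstate how far behind the consumer actually is between commits.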