Created 09-22-2018 12:38 PM
I am trying to consume from the __consumer_offsets topic to obtain the offsets for each consumer group on a topic, so that I can calculate total lag. I was told to obtain the log end offset from the JVM using JMX, and that the difference between this value and the group's offset would give me the total lag. I am having difficulty implementing this. I keep getting an error when reading the __consumer_offsets topic. My normal consumer does not decode the content, so I opted for a different route to decode it. I am a little unfamiliar with formatting on this site: each time I include a code box, I cannot post normal text beneath it, so please excuse the format here:
Current code tried and error received:
    try {
        ByteBuffer bb = ByteBuffer.allocate(1056);
        while (true) {
            ConsumerRecords<byte[], byte[]> records = kconsumer.poll(100);
            for (ConsumerRecord<byte[], byte[]> record : records)
                if(record != null){
                    counter += 1;
                    bb = ByteBuffer.wrap(record.key());
                    short version = bb.getShort();
                    String group = (String)Type.STRING.read(bb);
                    String topic = (String)Type.STRING.read(bb);
                    int partition = bb.getInt();
                    long offset = (long)(record.offset());
The error occurs on the following line: String topic = (String)Type.STRING.read(bb);
    java.nio.BufferUnderflowException
        at java.nio.Buffer.nextGetIndex(Unknown Source)
        at java.nio.HeapByteBuffer.getShort(Unknown Source)
        at org.apache.kafka.common.protocol.types.Type$7.read(Type.java:269)
        at org.apache.kafka.common.protocol.types.Type$7.read(Type.java:257)
Can anyone suggest an easier way to go about this? I found a couple of tools created by others, but none were compatible with the latest Kafka version. I am looking into Burrow now, but ideally I am expected to build my own.
Created 09-24-2018 06:35 PM
Hi @Zach, please see my answer on StackOverflow here. https://stackoverflow.com/a/52266219/2308683
Burrow does essentially the same thing, but in Go.
How you read the data and calculate the lag also depends on what is currently being consumed, however, because a consumer's current position is not stored in the offsets topic immediately.
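A possible explanation for the BufferUnderflowException in the question: __consumer_offsets holds more than one kind of record, and only some keys contain a topic and partition. The sketch below assumes that key versions 0 and 1 are offset commits (group, topic, partition) and that any other version is a different record type, such as group metadata, whose key has no topic. It uses only java.nio, and the names OffsetKeyDecoder, OffsetKey, readString and decode are made up for illustration; they are not part of the Kafka API.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class OffsetKeyDecoder {

    /** Decoded offset-commit key: which group committed for which partition. */
    static final class OffsetKey {
        final String group;
        final String topic;
        final int partition;

        OffsetKey(String group, String topic, int partition) {
            this.group = group;
            this.topic = topic;
            this.partition = partition;
        }
    }

    /** Reads a UTF-8 string prefixed with a 2-byte big-endian length. */
    static String readString(ByteBuffer buf) {
        short len = buf.getShort();
        if (len < 0) {
            throw new IllegalArgumentException("negative string length: " + len);
        }
        byte[] bytes = new byte[len];
        buf.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    /**
     * Returns the decoded key, or null when the record is not an offset commit.
     * Assumption: key versions 0 and 1 are offset commits; anything else is
     * another record type and is skipped instead of being parsed.
     */
    static OffsetKey decode(byte[] keyBytes) {
        if (keyBytes == null) {
            return null;
        }
        ByteBuffer buf = ByteBuffer.wrap(keyBytes);
        short version = buf.getShort();
        if (version < 0 || version > 1) {
            return null; // e.g. a group metadata key: no topic or partition to read
        }
        String group = readString(buf);
        String topic = readString(buf);
        int partition = buf.getInt();
        return new OffsetKey(group, topic, partition);
    }
}
```

In the polling loop from the question, this would mean calling decode(record.key()) and skipping the record when it returns null. Note also that record.offset() is the record's position in the __consumer_offsets log, not the committed offset; the committed offset is carried in the record value.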
Created 10-01-2018 10:08 PM
Also consider using AdminClient to query consumer progress rather than consuming from the internal topic.
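As a rough sketch of the lag arithmetic once both sets of offsets are in hand: the committed offsets could come from AdminClient.listConsumerGroupOffsets(groupId) and the log end offsets from KafkaConsumer.endOffsets(partitions). To keep the example self-contained, it takes plain maps keyed by a "topic-partition" string instead of Kafka's TopicPartition type. LagCalculator and its methods are hypothetical names, and treating a partition with no commit as "nothing consumed" is an assumption that may not suit every setup.

```java
import java.util.HashMap;
import java.util.Map;

final class LagCalculator {

    /**
     * Per-partition lag = log end offset - committed offset, floored at zero.
     * Assumption: a partition with no committed offset counts as fully unconsumed
     * (committed offset treated as 0).
     */
    static Map<String, Long> lagPerPartition(Map<String, Long> endOffsets,
                                             Map<String, Long> committedOffsets) {
        Map<String, Long> lag = new HashMap<>();
        for (Map.Entry<String, Long> entry : endOffsets.entrySet()) {
            Long committed = committedOffsets.get(entry.getKey());
            long consumed = (committed == null) ? 0L : committed;
            lag.put(entry.getKey(), Math.max(0L, entry.getValue() - consumed));
        }
        return lag;
    }

    /** Total lag for the group is the sum over all partitions. */
    static long totalLag(Map<String, Long> lagByPartition) {
        long total = 0L;
        for (long value : lagByPartition.values()) {
            total += value;
        }
        return total;
    }
}
```

For example, with end offsets {"t-0": 100, "t-1": 50} and a single commit {"t-0": 90}, the per-partition lag is 10 and 50, for a total of 60.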