
Kafka consumer group lag in one or two partitions even without data ingestion into Kafka

Explorer

Hi,

 

I have a strange problem with a Kafka channel topic: consumer group lag of around 1.5 million (15 lakh) events in only one or two partitions. I'll give a little background about the problem.

 

Please find the data flow through the system below:

data ingestion ==> Kafka ABC (topic with 3 partitions) ==> Flume source (interceptor) ==> Kafka DEF (topic with 6 partitions) ==> Hive (Parquet files)

 

We have three edge nodes; Kafka and Flume run on all of them. The data lake (HDFS, Hive, HBase, Cloudera Manager, Phoenix client) runs on a separate cluster.

Component versions:

Kafka ==> 0.10.1.1

Flume ==> 1.7

CDH ==> 5.14

 

 

We stopped all Flume agents for an hour or two and started data ingestion at around 1.2 million (12 lakh) events per minute into the Kafka ABC topic. We then stopped data ingestion and started all Flume agents (on all three nodes), and data began arriving in the Hive tables. After a minute or two, we observed that the LAG on one, two, or three partitions (say 1, 4, and 6) of the DEF topic was increasing constantly, compared with the Flume source side. Strangely, the Kafka ABC topic (three partitions) was working fine, with lag spread evenly across partitions.

 

The lag on those partitions of the DEF topic kept increasing and was never cleared, while the remaining partitions showed zero lag. The Kafka ABC topic's data was cleared, but partitions 1, 4, and 6 of the DEF topic were not, even though no data was coming into the system. The lag did not clear over the next day or two either. We restarted Kafka and Flume a couple of times; sometimes the lag on one or two partitions cleared, but never on all of them.

 

Please find the Flume configuration below:

------------------------------------------

# Sources
agent.sources = flume-agent

# Channels
agent.channels = kafkachannel

# Sinks
agent.sinks = kite-sink-1 kite-sink-2

 

# Sources
agent.sources.flume-agent.channels = kafkachannel
agent.sources.flume-agent.type = org.apache.flume.source.kafka.KafkaSource
agent.sources.flume-agent.kafka.bootstrap.servers = example1.host.com:9092,example2.host.com:9092,example3.host.com:9092
agent.sources.flume-agent.kafka.num.consumer.fetchers = 10
agent.sources.flume-agent.kafka.topics = abc
agent.sources.flume-agent.interceptors = abc-parameters-interceptor abcinterceptor
agent.sources.flume-agent.interceptors.abc-parameters-interceptor.type = static
agent.sources.flume-agent.interceptors.abc-parameters-interceptor.key = flume.avro.schema.url
agent.sources.flume-agent.interceptors.abc-parameters-interceptor.value = hdfs://myhadoop/abc.avsc
agent.sources.flume-agent.interceptors.abc-parameters-interceptor.threadNum = 10
agent.sources.flume-agent.interceptors.abcinterceptor.type = com.abc.flume.interceptor.ABCinterceptor$Builder
agent.sources.flume-agent.kafka.consumer.security.protocol = PLAINTEXT
agent.sources.flume-agent.kafka.consumer.group.id = my-group

# Sinks
agent.sinks.kite-sink-1.channel = kafkachannel
agent.sinks.kite-sink-1.type = org.apache.flume.sink.kite.DatasetSink
agent.sinks.kite-sink-1.kite.repo.uri = repo:hive
agent.sinks.kite-sink-1.kite.dataset.name = abc
agent.sinks.kite-sink-1.kite.batchSize = 100000
agent.sinks.kite-sink-1.kite.rollInterval = 30
agent.sinks.kite-sink-2.channel = kafkachannel
agent.sinks.kite-sink-2.type = org.apache.flume.sink.kite.DatasetSink
agent.sinks.kite-sink-2.kite.repo.uri = repo:hive
agent.sinks.kite-sink-2.kite.dataset.name = abc
agent.sinks.kite-sink-2.kite.batchSize = 100000
agent.sinks.kite-sink-2.kite.rollInterval = 30


# Channels
agent.channels.kafkachannel.type = org.apache.flume.channel.kafka.KafkaChannel
agent.channels.kafkachannel.brokerList = example1.host.com:9092,example2.host.com:9092,example3.host.com:9092
agent.channels.kafkachannel.kafka.topic = def
agent.channels.kafkachannel.kafka.consumer.group.id = my-group-kite
agent.channels.kafkachannel.parseAsFlumeEvent = true
agent.channels.kafkachannel.kafka.consumer.session.timeout.ms = 60000
agent.channels.kafkachannel.kafka.consumer.request.timeout.ms = 70000
agent.channels.kafkachannel.kafka.consumer.max.poll.records = 100000
agent.channels.kafkachannel.kafka.num.consumer.fetchers = 10

 

 

Kafka server.properties settings changed from the defaults:

 

zookeeper.session.timeout.ms=9000

group.max.session.timeout.ms=60000

group.min.session.timeout.ms=6000

inter.broker.protocol.version=0.10.1.1

 

Kindly help me: what is wrong in my configuration?

 

 


4 REPLIES

Explorer

I did not observe any error messages in the Kafka or Flume logs, but I did see the messages below in the Flume logs:

 

2018-08-24 04:35:47 DEBUG Fetcher:677 - Discarding stale fetch response for partition def-3 since its offset 395690 does not match the expected offset 394118
2018-08-24 04:35:47 DEBUG Fetcher:677 - Discarding stale fetch response for partition def-0 since its offset 81880195 does not match the expected offset 81878380
2018-08-24 04:35:47 DEBUG Fetcher:671 - Ignoring fetched records for partition def-4 since it is no longer fetchable

2018-08-24 04:35:47 DEBUG Fetcher:671 - Ignoring fetched records for partition def-5 since it is no longer fetchable
2018-08-24 04:35:47 DEBUG Fetcher:671 - Ignoring fetched records for partition def-1 since it is no longer fetchable

2018-08-24 04:35:47 DEBUG Fetcher:677 - Discarding stale fetch response for partition def-3 since its offset 395690 does not match the expected offset 394118
2018-08-24 04:35:47 DEBUG Fetcher:677 - Discarding stale fetch response for partition def-0 since its offset 81880195 does not match the expected offset 81878380
2018-08-24 06:11:03 DEBUG Fetcher:677 - Discarding stale fetch response for partition def-4 since its offset 482480 does not match the expected offset 482450

 

There were no errors or exceptions apart from the messages above.

 

Sometimes all of the data reaches the Hive tables, but the lag remains the same. Why is the offset consumed by the Flume sink not being updated at the group coordinator level?

New Contributor

 

I observed strange behaviour after ingesting around 50 records into the system while we had lag on the Kafka channel topic. Strangely, the lagging data was flushed, but the remaining partitions reset their current offset back to an earlier offset even though there was no data in those particular partitions. How is it possible for the consumer's current offset to be reset when we are only using Flume sinks? Please see the offset reset in the screenshots below.

[Screenshots: when_initially_lag.PNG, offset_reset_after_ingest_50records.PNG]

Why would the current offset reset to a previous/earlier position without any restart of the Flume component (i.e. the consumer)?

Explorer (Accepted Solution)

This is a bug in Flume 1.7 (https://issues.apache.org/jira/browse/FLUME-3027). The issue is resolved by adding an offsets.clear() call to the method below, after the consumer.commitSync call; otherwise the stale offsets map causes problems at consumer rebalancing time.

 

// Flume 1.7 KafkaChannel, before the fix: the offsets map is never cleared
void commitOffsets() {
    this.consumer.commitSync(offsets);
}

Either upgrade to Flume 1.8 or add the offsets.clear() call at the end of the method, as sketched below.
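
For reference, a minimal sketch of the patched method, following the fix described above (offsets is KafkaChannel's internal map of partition offsets pending commit):

void commitOffsets() {
    this.consumer.commitSync(offsets);
    // FLUME-3027 fix: clear the already-committed offsets so a later
    // rebalance cannot re-commit stale positions and move the group backwards.
    offsets.clear();
}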

 

Thanks

Srinivas

Accepted Solution

FLUME-3027 has been backported to CDH 5.11.0 and above, so if you are able to upgrade, it would prevent the issue of offsets bouncing back and forth.

One thing you may want to consider: if you are getting rebalances, it may be because your sink is taking too long to deliver before polling Kafka again. You may want to lower your sink batch size so that messages are delivered and acked in a timely fashion.

Additionally, if you upgrade to CDH 5.14 or higher, the Flume Kafka client is 0.10.2, and you would be able to set max.poll.records to match the batchSize you are using for the Flume sink. You could also increase max.poll.interval.ms, which has been decoupled from session.timeout.ms since Kafka 0.10.1 (KIP-62). This would prevent the rebalancing, since the client heartbeats in the background and no longer has to poll for more records before session.timeout.ms expires. A sketch of that tuning follows below.
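
As a rough sketch only (the values are illustrative assumptions against the configuration posted above, not tested recommendations), that tuning could look like this in the agent file once on the 0.10.2 client:

# Smaller sink batches so each deliver/ack cycle completes quickly
agent.sinks.kite-sink-1.kite.batchSize = 10000
agent.sinks.kite-sink-2.kite.batchSize = 10000

# Match the channel consumer's poll size to the sink batch size
agent.channels.kafkachannel.kafka.consumer.max.poll.records = 10000

# Allow longer gaps between polls before a rebalance is triggered
# (decoupled from session.timeout.ms since Kafka 0.10.1 / KIP-62)
agent.channels.kafkachannel.kafka.consumer.max.poll.interval.ms = 600000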

-pd