
How to handle kafka.common.OffsetOutOfRangeException in Storm?


I am getting the following error: the Storm topology fails to read from the Kafka topic because data gets deleted based on the (size-based) retention policy. Ideally we want Storm to move (fast-forward) to the first available message. Is there any way to do this in Storm?

[2015-10-13 14:12:28,204] ERROR [KafkaApi-2] Error when processing fetch request for partition [<topic_name>,0] offset 4231749539 from consumer with correlation id 0 (kafka.server.KafkaApis)
kafka.common.OffsetOutOfRangeException: Request for offset 4231749539 but we only have log segments in the range 4255773954 to 4376049622.
at kafka.log.Log.read(Log.scala:380)

1 ACCEPTED SOLUTION


Make sure you set the following config in the KafkaSpout's SpoutConfig:

spoutConfig.startOffsetTime = kafka.api.OffsetRequest.EarliestTime();

https://github.com/apache/storm/tree/master/external/storm-kafka
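For context, here is a minimal sketch of wiring this into a topology. The ZooKeeper host, topic name, zkRoot, and id are placeholders for your own values, and the imports are the pre-1.0 storm-kafka packages (Storm 1.0+ renamed them to org.apache.storm.kafka):

import backtype.storm.topology.TopologyBuilder;
import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.ZkHosts;

// ZooKeeper ensemble used by the Kafka brokers (placeholder host)
BrokerHosts hosts = new ZkHosts("zkhost1:2181");
// zkRoot ("/zkroot") and id ("id") determine where the spout commits offsets
SpoutConfig spoutConfig = new SpoutConfig(hosts, "topic_name", "/zkroot", "id");
// start from the earliest retained offset when no offset has been committed yet
spoutConfig.startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
// when a previously committed offset is out of range, fall back to
// startOffsetTime instead of failing (defaults to true in recent versions)
spoutConfig.useStartOffsetTimeIfOffsetOutOfRange = true;

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("kafka-spout", new KafkaSpout(spoutConfig), 1);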

Apart from that:

1. Make sure log.retention.hours is long enough to retain the topic data (see the broker config sketch below).
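A sketch of the relevant settings in the broker's server.properties; the values shown are illustrative (168 hours is the Kafka default), and since your deletes are size-based, log.retention.bytes is worth checking too:

# server.properties on each broker (values are illustrative)
log.retention.hours=168
# size-based retention also triggers deletes; raise or disable (-1) if the spout cannot keep up
log.retention.bytes=107374182400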

2. Check the Kafka topic offsets:

bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list hostname:6667 --topic topic_name --time -1

The above command gives you the latest offset in the Kafka topic; now you need to check whether the Storm KafkaSpout is catching up.
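For the other end of the range, the same tool reports the earliest offset still retained on the broker when you pass --time -2 (broker list and topic name as above):

# earliest retained offset (-2); anything older has been deleted by retention
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list hostname:6667 --topic topic_name --time -2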

2.1 Log into the ZooKeeper shell.

2.2 Run ls /zkroot/id (zkroot and id are the values configured in the SpoutConfig).

2.3 get /zkroot/id/topic_name/part_0 will return a JSON structure with the key "offset"; this tells you how far you have read into the topic and how far behind the latest data you are.

If the gap is too large and the log.retention.hours limit has been hit, the KafkaSpout might be requesting an older offset that has already been deleted.
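Putting steps 2.1 through 2.3 together, a minimal ZooKeeper session might look like this. The host, zkroot, id, and the output shown are illustrative; the exact JSON fields depend on the storm-kafka version:

# 2.1 open a ZooKeeper shell
bin/zkCli.sh -server zkhost1:2181
# 2.2 list the spout's committed state nodes (zkroot and id from SpoutConfig)
ls /zkroot/id
# 2.3 read the committed state for partition 0
get /zkroot/id/topic_name/part_0
# illustrative output; compare "offset" against the GetOffsetShell results above
{"topology":{"id":"...","name":"..."},"offset":4231749539,"partition":0,"broker":{"host":"...","port":6667},"topic":"topic_name"}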
