Support Questions

Find answers, ask questions, and share your expertise

Kafka spout not reading from beginning even if the fKakfaSpout.forceFromStart is set to true

avatar
Rising Star

Spout try to query zookeeper to get the state and set the offset to prior location. We have retention policy of three days in kafka. So I guess offset moves back to location which does not exists in kafka and its does not pull anymore data.

Already checked that in the spout config, the Zookeeper configuration is pointing to Kafka ZooKeeper.

Anything else that we're missing here? Thank you.

1 ACCEPTED SOLUTION

avatar
Master Mentor

you need to set

spoutConfig.useStartOffsetTimeIfOffsetOutOfRange = true;

spoutConfig.startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
        KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

View solution in original post

4 REPLIES 4

avatar

Kafka Consumer configs has an option called "auto.offset.reset", which can be used to decide what to do when the requested offset does not exist. Per documentation, it will:

What to do when there is no initial offset in ZooKeeper or if an offset is out of range: * smallest : automatically reset the offset to the smallest offset * largest : automatically reset the offset to the largest offset * anything else: throw exception to the consumer

Therefore, I would try setting it to "smallest".

avatar
Master Mentor

you need to set

spoutConfig.useStartOffsetTimeIfOffsetOutOfRange = true;

spoutConfig.startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
        KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

avatar

@Paul Hargis

any pointers on how to set "auto.offset.reset" in storm ?

@Artem Ervits

if I use "kafka.api.OffsetRequest.EarliestTime();" everytime I restart topology will it consume old data(I mean from beginning) ?

My use case is this:

My topology was down for over 2 weeks, but kafka has only 1 weeks data. so I am thinking that when I start the topology it will not find the offset. Should I use "kafka.api.OffsetRequest.EarliestTime();" ?

update:

tried restarting the topoloy

I am using OpaqueTridentKafkaSpout, getting storm.kafka.UpdateOffsetException (using storm 0.9.4)

checked KafkaConfig it has

public boolean useStartOffsetTimeIfOffsetOutOfRange = true;

public long startOffsetTime = kafka.api.OffsetRequest.EarliestTime();

So what I am not getting is that why it is not setting offset to earliest and failing with UpdateOffsetException instead 😞

Could you guys please help ?

avatar
Master Mentor

@Amber Kulkarni please open this as a new thread.