Created 11-03-2015 11:21 PM
The spout tries to query ZooKeeper for its state and set the offset to the prior location. We have a retention policy of three days in Kafka, so I guess the offset moves back to a location that no longer exists in Kafka, and the spout does not pull any more data.
We already checked that, in the spout config, the ZooKeeper configuration points to the Kafka ZooKeeper.
Anything else that we're missing here? Thank you.
Created 11-04-2015 04:27 PM
You need to set:
spoutConfig.useStartOffsetTimeIfOffsetOutOfRange = true;
spoutConfig.startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
Created 11-04-2015 01:19 AM
The Kafka consumer configuration has an option called "auto.offset.reset", which decides what to do when the requested offset does not exist. Per the documentation:
What to do when there is no initial offset in ZooKeeper or if an offset is out of range:
* smallest : automatically reset the offset to the smallest offset
* largest : automatically reset the offset to the largest offset
* anything else: throw exception to the consumer
Therefore, I would try setting it to "smallest".
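To make those three cases concrete, here is a minimal sketch in plain Java that only mimics the rules quoted above. It does not call Kafka or Storm; the class OffsetResetSketch, the method resolveStartOffset, and the offset numbers are all made up for illustration, and the "in range" check is my own assumption about what "out of range" means.

```java
// Hypothetical helper that mirrors the documented auto.offset.reset rules.
// It is NOT part of Kafka or Storm; names and numbers are illustrative only.
final class OffsetResetSketch {

    /**
     * Picks the offset a consumer would start from.
     *
     * @param storedOffset offset saved in ZooKeeper, or null if none was saved
     * @param earliest     smallest offset still retained by the broker
     * @param latest       largest offset known to the broker
     * @param resetPolicy  value of auto.offset.reset
     */
    static long resolveStartOffset(Long storedOffset, long earliest, long latest, String resetPolicy) {
        // Assumption: an offset is usable when it lies within [earliest, latest].
        boolean inRange = storedOffset != null
                && storedOffset >= earliest
                && storedOffset <= latest;
        if (inRange) {
            return storedOffset; // resume where the consumer left off
        }
        if ("smallest".equals(resetPolicy)) {
            return earliest;     // jump to the oldest retained message
        }
        if ("largest".equals(resetPolicy)) {
            return latest;       // skip ahead to the newest position
        }
        // "anything else": surface the problem to the caller
        throw new IllegalStateException("Offset " + storedOffset + " is out of range ["
                + earliest + ", " + latest + "] and auto.offset.reset=" + resetPolicy);
    }

    public static void main(String[] args) {
        // Retention has deleted everything below 5000, so stored offset 1200 is gone.
        System.out.println(resolveStartOffset(1200L, 5000L, 9000L, "smallest")); // prints 5000
        System.out.println(resolveStartOffset(1200L, 5000L, 9000L, "largest"));  // prints 9000
        // A stored offset that still exists is used as-is, whatever the policy.
        System.out.println(resolveStartOffset(7000L, 5000L, 9000L, "smallest")); // prints 7000
    }
}
```

With "smallest" the consumer picks up again from the oldest message that retention has not yet removed, which is why I would try that value for the case described in the question.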
Created 01-30-2017 06:27 AM
Any pointers on how to set "auto.offset.reset" in Storm?
If I use "kafka.api.OffsetRequest.EarliestTime();", will the topology consume old data (I mean from the beginning) every time I restart it?
My use case is this:
My topology was down for over 2 weeks, but Kafka has only 1 week of data, so I think that when I start the topology it will not find the offset. Should I use "kafka.api.OffsetRequest.EarliestTime();"?
Update:
I tried restarting the topology.
I am using OpaqueTridentKafkaSpout and getting storm.kafka.UpdateOffsetException (using Storm 0.9.4).
I checked KafkaConfig; it has:
public boolean useStartOffsetTimeIfOffsetOutOfRange = true;
public long startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
So what I don't understand is why it is not setting the offset to the earliest one and is failing with UpdateOffsetException instead 😞
Could you please help?
Created 01-30-2017 02:09 PM
@Amber Kulkarni please open this as a new thread.