Kafka spout not reading from the beginning even though KafkaSpout.forceFromStart is set to true
- Labels: Apache Kafka
Created ‎11-03-2015 11:21 PM
The spout queries ZooKeeper to get its state and sets the offset to the previously committed location. We have a retention policy of three days in Kafka, so I guess the offset moves back to a location that no longer exists in Kafka, and the spout does not pull any more data.
I have already checked that the ZooKeeper configuration in the spout config points to the Kafka ZooKeeper.
Is there anything else that we're missing here? Thank you.
Created ‎11-04-2015 04:27 PM
You need to set:
spoutConfig.useStartOffsetTimeIfOffsetOutOfRange = true;
spoutConfig.startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
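To make the effect of those two settings concrete, here is a minimal sketch of the decision they seem to control. It is a simplified model written for this thread, not the storm-kafka source: the class `OutOfRangeSketch`, the method `resolveFetchOffset`, and the offset numbers are all hypothetical, and only the flag name `useStartOffsetTimeIfOffsetOutOfRange` comes from the settings above.

```java
// Simplified model of out-of-range handling; NOT the actual storm-kafka code.
final class OutOfRangeSketch {

    /**
     * Picks the offset to fetch from next.
     *
     * @param committed   offset restored from ZooKeeper
     * @param earliest    smallest offset still retained by the broker
     * @param latest      largest offset currently available on the broker
     * @param useStartOffsetTimeIfOffsetOutOfRange mirrors the spout config flag
     * @param startOffset offset that startOffsetTime resolves to (assumed already looked up)
     */
    static long resolveFetchOffset(long committed, long earliest, long latest,
                                   boolean useStartOffsetTimeIfOffsetOutOfRange,
                                   long startOffset) {
        boolean outOfRange = committed < earliest || committed > latest;
        if (!outOfRange) {
            return committed; // the stored offset is still valid, resume from it
        }
        if (useStartOffsetTimeIfOffsetOutOfRange) {
            return startOffset; // fall back to the configured start position
        }
        throw new IllegalStateException("committed offset " + committed
                + " is outside [" + earliest + ", " + latest + "]");
    }

    public static void main(String[] args) {
        // Retention has deleted everything below 5000, but ZooKeeper still says 1200.
        long next = resolveFetchOffset(1200L, 5000L, 9000L, true, 5000L);
        System.out.println(next); // prints 5000
    }
}
```

With the flag off, the same call fails instead of silently skipping ahead, which is one way to read the "does not pull any more data" symptom in the question.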
Created ‎11-04-2015 01:19 AM
The Kafka consumer configuration has an option called "auto.offset.reset", which decides what to do when the requested offset does not exist. Per the documentation:
What to do when there is no initial offset in ZooKeeper or if an offset is out of range:
* smallest: automatically reset the offset to the smallest offset
* largest: automatically reset the offset to the largest offset
* anything else: throw exception to the consumer
Therefore, I would try setting it to "smallest".
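As a rough illustration, the property can be set on a plain `java.util.Properties` object, and the three documented outcomes can be modeled in a few lines. This is only a sketch: the class `AutoOffsetResetSketch` and its methods are hypothetical names, the offsets are made up, and nothing in this thread confirms that the Storm spout reads this consumer property at all.

```java
import java.util.Properties;

// Models the documented auto.offset.reset outcomes; not Kafka's own implementation.
final class AutoOffsetResetSketch {

    /** Builds consumer properties with the reset policy suggested above. */
    static Properties consumerProps() {
        Properties props = new Properties();
        props.setProperty("auto.offset.reset", "smallest");
        return props;
    }

    /** Returns the offset to restart from when the requested one does not exist. */
    static long resolveReset(Properties props, long smallest, long largest) {
        // "largest" is assumed here as the default when the property is unset.
        String policy = props.getProperty("auto.offset.reset", "largest");
        switch (policy) {
            case "smallest":
                return smallest;
            case "largest":
                return largest;
            default:
                // "anything else: throw exception to the consumer"
                throw new IllegalStateException("no reset policy for value: " + policy);
        }
    }

    public static void main(String[] args) {
        System.out.println(resolveReset(consumerProps(), 5000L, 9000L)); // prints 5000
    }
}
```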
Created ‎01-30-2017 06:27 AM
Any pointers on how to set "auto.offset.reset" in Storm?
If I use "kafka.api.OffsetRequest.EarliestTime();", will the topology consume old data (I mean from the beginning) every time I restart it?
My use case is this:
My topology was down for over 2 weeks, but Kafka has only 1 week of data, so I think that when I start the topology it will not find the offset. Should I use "kafka.api.OffsetRequest.EarliestTime();"?
Update:
I tried restarting the topology.
I am using OpaqueTridentKafkaSpout and am getting storm.kafka.UpdateOffsetException (using Storm 0.9.4).
I checked KafkaConfig; it has:
public boolean useStartOffsetTimeIfOffsetOutOfRange = true;
public long startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
So what I don't understand is why it is not setting the offset to the earliest one and is failing with UpdateOffsetException instead 😞
Could you please help?
Created ‎01-30-2017 02:09 PM
@Amber Kulkarni please open this as a new thread.
