How can I consume Kafka offsets based on a timestamp?

Contributor
 
1 ACCEPTED SOLUTION

Super Guru

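If you commit an offset chosen by timestamp, the job can resume consuming from Kafka at that offset on the next batch cycle.

Like this (kafka-python style; end is the offset you want to commit, and the group_id value is only a placeholder, needed because commits are stored per consumer group):

from kafka import KafkaConsumer, TopicPartition

# Commit the last consumed offset
consumer = KafkaConsumer(bootstrap_servers='localhost:9092', group_id='my-group', enable_auto_commit=False)
tp = TopicPartition(topic, partition)
consumer.assign([tp])   # the partition must be assigned before seek()
consumer.seek(tp, end)
consumer.commit()

# When the job restarts at the next batch cycle, read back the committed offset
consumer.assign([tp])
start = consumer.committed(tp)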
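
The answer above does not show how to turn a timestamp into an offset. One way to do that with kafka-python is offsets_for_times, which returns, per partition, the earliest offset whose timestamp is at or after the one given. The following is only a rough sketch: the helper names (to_epoch_ms, seek_to_timestamp, main), the topic 'my-topic', the group 'my-group', and the start date are all made up for illustration and are not from the original post.

```python
from datetime import datetime, timezone


def to_epoch_ms(dt):
    """Convert a timezone-aware datetime to milliseconds since the Unix epoch."""
    return int(dt.timestamp() * 1000)


def seek_to_timestamp(consumer, tp, timestamp_ms):
    """Position `consumer` at the first offset with timestamp >= timestamp_ms.

    Returns the offset that was seeked to, or None when the partition has
    no message at or after that timestamp (in which case nothing is seeked).
    """
    consumer.assign([tp])
    # offsets_for_times maps each partition to an OffsetAndTimestamp, or None.
    found = consumer.offsets_for_times({tp: timestamp_ms}).get(tp)
    if found is None:
        return None
    consumer.seek(tp, found.offset)
    return found.offset


def main():
    # Imported here so the helpers above can be used without kafka installed.
    from kafka import KafkaConsumer, TopicPartition

    consumer = KafkaConsumer(
        bootstrap_servers='localhost:9092',
        group_id='my-group',          # hypothetical group name
        enable_auto_commit=False,
    )
    tp = TopicPartition('my-topic', 0)  # hypothetical topic and partition
    start_ms = to_epoch_ms(datetime(2024, 1, 1, tzinfo=timezone.utc))

    if seek_to_timestamp(consumer, tp, start_ms) is not None:
        # Persist the position so the next batch cycle resumes from it.
        consumer.commit()
```

Calling main() requires a reachable broker at localhost:9092. Keeping the lookup in seek_to_timestamp, separate from the connection setup, makes it possible to try the logic against a stub consumer first.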

