Archives of Support Questions (Read Only)

This is an archived board for historical reference. Information and links may no longer be available or relevant.

How can I consume Kafka offsets based on a timestamp?

1 ACCEPTED SOLUTION

Super Guru

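If you commit the offset that corresponds to your timestamp, the job can start consuming from Kafka at that offset on the next batch cycle.

Like this (the calls look like kafka-python):

from kafka import KafkaConsumer, TopicPartition

# Commit the last consumed offset. Committing requires a group_id; 'my-group' is a placeholder.
consumer = KafkaConsumer(bootstrap_servers='localhost:9092', group_id='my-group')
tp = TopicPartition(topic, partition)
consumer.assign([tp])   # the partition must be assigned before seek()
consumer.seek(tp, end)  # end is the offset to resume from, e.g. the one resolved for your timestamp
consumer.commit()

# When the job restarts at the next batch cycle, read back the committed offset:
consumer.assign([tp])
start = consumer.committed(tp)  # None if nothing was committed for this group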
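
The snippet above commits an offset but does not show how to get from a timestamp to that offset. A minimal sketch of that step, assuming kafka-python, where KafkaConsumer.offsets_for_times() takes timestamps in milliseconds and returns the earliest offset whose timestamp is greater than or equal to the one requested. The helper name seek_to_timestamp is made up for illustration and is not part of the library.

```python
def seek_to_timestamp(consumer, tp, timestamp_ms):
    """Position `consumer` on partition `tp` at the first message whose
    timestamp is >= timestamp_ms (milliseconds since the epoch).

    `consumer` is expected to behave like kafka-python's KafkaConsumer and
    `tp` like its TopicPartition. Returns the offset that was seeked to,
    or None when the partition has no message at or after the timestamp.
    """
    # Manual assignment: seek() only works on assigned partitions.
    consumer.assign([tp])

    # Ask the broker which offset corresponds to the timestamp.
    found = consumer.offsets_for_times({tp: timestamp_ms})
    entry = found.get(tp)
    if entry is None:
        # No message at or after that timestamp; leave the position alone.
        return None

    # The next poll() on this consumer starts reading from this offset.
    consumer.seek(tp, entry.offset)
    return entry.offset
```

With a real consumer this would be called as seek_to_timestamp(consumer, TopicPartition(topic, partition), ts_ms). Calling consumer.commit() afterwards should persist that position for the consumer's group, so the next batch cycle can pick it up through consumer.committed(tp) as in the snippet above.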
