If you commit the offset that corresponds to your timestamp, the job can resume consuming from Kafka at that offset on the next batch cycle, like this:
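# Commit the last consumed offset (assuming the kafka-python client)
from kafka import KafkaConsumer, TopicPartition
# group_id is required for commits; 'my-group' is a placeholder
consumer = KafkaConsumer(bootstrap_servers='localhost:9092', group_id='my-group', enable_auto_commit=False)
tp = TopicPartition(topic, partition)
consumer.assign([tp])   # the partition must be assigned before seek()
consumer.seek(tp, end)  # end = offset of the next record to read, e.g. the one matching your timestamp
consumer.commit()       # commits the current position for the group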
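# When the job restarts at the next batch cycle, resume from the committed offset
# (the consumer must be created with the same group_id):
consumer.assign([tp])
start = consumer.committed(tp)  # None if nothing has been committed for this group yet
if start is not None:
    consumer.seek(tp, start)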
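
The snippet above leaves open how to get the offset from a timestamp. A minimal sketch of that step, assuming a kafka-python KafkaConsumer that was created with a group_id and already has tp assigned; the helper name commit_offset_for_timestamp and the fall-back to the end of the log are my own choices, not something from the question:

```python
def commit_offset_for_timestamp(consumer, tp, timestamp_ms):
    """Seek to and commit the earliest offset whose timestamp is >= timestamp_ms.

    consumer: a kafka-python KafkaConsumer (assumed) with tp already assigned.
    tp: a kafka.TopicPartition.
    timestamp_ms: epoch time in milliseconds.
    """
    # offsets_for_times maps each partition to an OffsetAndTimestamp,
    # or to None when no record exists at or after the timestamp.
    found = consumer.offsets_for_times({tp: timestamp_ms})[tp]
    if found is None:
        # Assumed fall-back: nothing that new yet, so start from the log end.
        offset = consumer.end_offsets([tp])[tp]
    else:
        offset = found.offset
    consumer.seek(tp, offset)
    consumer.commit()  # commits the position that seek() just set
    return offset
```

You would call it once before the batch ends, e.g. commit_offset_for_timestamp(consumer, tp, 1700000000000), and then read consumer.committed(tp) on the next run.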