import json
from kafka import KafkaConsumer

consumer = KafkaConsumer("topic", bootstrap_servers=self.kafka_server + ":" + str(self.kafka_port))
for msg in consumer:
    data = json.loads(msg.value.decode('UTF-8'))
    self.lots_of_work(data["id"])  # this can last for 5-15 minutes
I start my consumer and it begins processing messages. It handles a few, but as soon as one message takes longer than the default 10 s session_timeout_ms, it raises an error saying the consumer is being kicked out of the group. If I increase session_timeout_ms, it works (until a message runs past the new, longer timeout). Shouldn't the heartbeat run in its own thread, started by the KafkaConsumer call? Where is the heartbeat thread started and maintained, and what can I check to figure out why the heartbeat is timing out? I could completely understand having to increase max_poll_interval_ms, since that covers "my" thread (i.e. lots_of_work), but I don't see why session_timeout_ms would need to change.
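To make the expected split between the two timeouts concrete, here is a minimal sketch of the settings as they would look if heartbeats really do run in a background thread. It assumes kafka-python 1.4.0 or later, which is, as far as I know, where the background heartbeat thread and `max_poll_interval_ms` were introduced; on older versions heartbeats are tied to `poll()`, which could explain the behaviour above. The helper `consumer_timeouts`, the 20% headroom, and the group name in the closing comment are hypothetical and only for illustration.

```python
def consumer_timeouts(max_work_seconds, session_timeout_ms=10_000):
    """Build timeout kwargs for KafkaConsumer (hypothetical helper, not part of kafka-python).

    max_work_seconds is the longest time one message is expected to take.
    """
    if max_work_seconds <= 0:
        raise ValueError("max_work_seconds must be positive")
    return {
        # Process liveness: expected to be satisfied by the heartbeat thread,
        # so it should not need to grow with the processing time.
        "session_timeout_ms": session_timeout_ms,
        # Kafka's guidance is to heartbeat at no more than a third of the session timeout.
        "heartbeat_interval_ms": session_timeout_ms // 3,
        # Longest allowed gap between poll() calls: 1000 ms per second
        # plus 20% headroom (an arbitrary margin chosen for this sketch).
        "max_poll_interval_ms": int(max_work_seconds * 1200),
        # One record per poll, so the interval only has to cover a single message.
        "max_poll_records": 1,
    }


# Worst case from the question: 15 minutes per message.
timeouts = consumer_timeouts(max_work_seconds=15 * 60)
print(timeouts)

# Intended use (needs kafka-python and a reachable broker; "my-group" is a placeholder):
# consumer = KafkaConsumer("topic", group_id="my-group", **timeouts)
```

If the installed version predates the background heartbeat thread, `max_poll_interval_ms` would likely be rejected as an unknown configuration key, so printing `kafka.__version__` is a cheap first check.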