Created 02-09-2017 08:29 PM
I have a Spark consumer that reads messages from Kafka using KafkaUtils.createDirectStream() for a set of 100 topics, then processes the messages and persists them in the data lake. However, if my Spark consumer crashes and stays down for a few hours or a day, the millions of messages that arrived in the Kafka topics during that window are never picked up when I restart the consumer. How can I pick up those older messages without impacting performance?
Created 02-09-2017 11:00 PM
You need to enable checkpointing. It will record the last-read Kafka offsets and resume reading from there on restart. Here's the code you need to accomplish that:

def createContext(sc: SparkContext, checkpointDir: String, duration: Int): StreamingContext = {
  val ssc = new StreamingContext(sc, Seconds(duration))
  ssc.checkpoint(checkpointDir)
  ssc
}

val ssc = StreamingContext.getOrCreate(checkpointDir, () => createContext(sc, checkpointDir, duration))
KafkaUtils.createDirectStream(ssc, kafkaParams, topics)
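For completeness, here is a fuller sketch of how the pieces can fit together. This assumes the 0-8 direct-stream API with String keys and values; the checkpoint path, broker list, group id, topic names and batch interval below are placeholders, not values from your setup. Note that the stream creation and output operations live inside the factory function so they are rebuilt from the checkpoint when the context is recovered:

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object CheckpointedKafkaConsumer {
  // Placeholder values -- adjust to your environment
  val checkpointDir = "hdfs:///user/spark/checkpoints/kafka-consumer"
  val kafkaParams = Map(
    "metadata.broker.list" -> "broker1:9092,broker2:9092",
    "group.id"             -> "my-consumer-group")
  val topics = Set("topic1", "topic2") // the full set of 100 topics in practice

  // Everything that defines the streaming computation goes inside the
  // factory so it is restored correctly when recovering from the checkpoint.
  def createContext(): StreamingContext = {
    val conf = new SparkConf().setAppName("kafka-to-datalake")
    val ssc  = new StreamingContext(conf, Seconds(30))
    ssc.checkpoint(checkpointDir)

    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics)

    stream.foreachRDD { rdd =>
      // process the messages and persist them to the data lake here
    }
    ssc
  }

  def main(args: Array[String]): Unit = {
    // Recovers the context (and the stored Kafka offsets) from the checkpoint
    // directory if one exists, otherwise builds a fresh context.
    val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
    ssc.start()
    ssc.awaitTermination()
  }
}

With this in place, a restart after downtime resumes from the offsets stored in the checkpoint rather than only seeing newly arriving messages.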
Created 02-15-2017 10:27 PM
Thanks! Does it then write to the checkpoint dir after parsing every message (i.e., after processing each streaming RDD)? Is it an HDFS path? Will hitting HDFS every time slow down the process?
Also, if there are many Kafka consumer groups, will this create a separate checkpoint dir for each consumer group?