Support Questions

Find answers, ask questions, and share your expertise
Announcements
Celebrating as our community reaches 100,000 members! Thank you!

Spark not picking older Kafka messages

avatar

I have a Spark consumer that reads messages from Kafka using KafkaUtils.CreateDirectStream() for a set of 100 topics. Then processes the messages and persists in the data lake. However, if my spark consumer suddenly crashes or dies and isn't up for a few hours or a day then all the millions of messages that have come in the duration in the Kafka topics are never picked up when I restart the consumer. How to get older messages without impacting performance?

1 ACCEPTED SOLUTION

avatar
Super Collaborator

You need to enable checkpointing. It will catalog the last read kafka offset and start reading from there. Heres the code you need to accomplish that:

def createContext(sc: SparkContext, checkpointDir: String, duration: Int): StreamingContext = {
  val ssc = new StreamingContext(sc,Seconds(duration))
  ssc.checkpoint(checkpointDir)
  ssc
}

val ssc = StreamingContext.getOrCreate(checkpointDir, createContext(sc, checkpointDir, duration))
KafkaUtils.createDirectStream(ssc,kafkaParams,topics)

View solution in original post

2 REPLIES 2

avatar
Super Collaborator

You need to enable checkpointing. It will catalog the last read kafka offset and start reading from there. Heres the code you need to accomplish that:

def createContext(sc: SparkContext, checkpointDir: String, duration: Int): StreamingContext = {
  val ssc = new StreamingContext(sc,Seconds(duration))
  ssc.checkpoint(checkpointDir)
  ssc
}

val ssc = StreamingContext.getOrCreate(checkpointDir, createContext(sc, checkpointDir, duration))
KafkaUtils.createDirectStream(ssc,kafkaParams,topics)

avatar

Thanks! Does it then write to the check point dir after parsing every message (i.e after processing each streaming RDD) ? Is it an HDFS path? Will the HDFS hit everytime slow down the process?

Also, If there are many kafka consumer groups then will this create a separate checkpoint dir for each consumer group?