
Spark not picking older Kafka messages



I have a Spark consumer that reads messages from a set of 100 Kafka topics using KafkaUtils.createDirectStream(), processes them, and persists them to the data lake. However, if the consumer crashes and stays down for a few hours or a day, the millions of messages that arrive in those topics during the outage are never picked up when I restart it. How can I get the older messages without impacting performance?

Accepted solution

Re: Spark not picking older Kafka messages

Expert Contributor

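You need to enable checkpointing. With the direct stream, Spark records the Kafka offsets of each batch in the checkpoint directory; when the application is restarted through StreamingContext.getOrCreate, the context is rebuilt from the checkpoint and the stream resumes from the last recorded offsets instead of the latest ones. The Kafka stream and all of its processing must be defined inside the function passed to getOrCreate, so that they are part of the checkpointed state. Here's the code you need: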

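import kafka.serializer.StringDecoder
import org.apache.spark.SparkContext
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils  // assumes the spark-streaming-kafka-0-8 integration
// Create the Kafka stream inside this function so it is restored from the checkpoint.
// kafkaParams: Map[String, String] and topics: Set[String] are defined elsewhere.
def createContext(sc: SparkContext, checkpointDir: String, duration: Int): StreamingContext = {
  val ssc = new StreamingContext(sc, Seconds(duration))
  ssc.checkpoint(checkpointDir)
  val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
  stream.foreachRDD { rdd => /* process the batch and persist it to the data lake */ }
  ssc
}
val ssc = StreamingContext.getOrCreate(checkpointDir, () => createContext(sc, checkpointDir, duration))
ssc.start()
ssc.awaitTermination()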
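To make the restart behaviour concrete, here is a toy model in plain Scala with no Spark or Kafka dependency; the object OffsetResumeSketch, the Offsets alias, and the sample offsets are all hypothetical. It assumes what the direct stream does when there is no checkpoint: start at the latest offsets, which is the default unless auto.offset.reset is set to smallest (earliest in newer Kafka clients). With a checkpoint, each partition resumes from the offset saved with the last completed batch, so the first batch after the restart contains everything produced during the outage.

```scala
object OffsetResumeSketch {
  // Hypothetical stand-in for Kafka offsets: (topic, partition) -> next offset to read.
  type Offsets = Map[(String, Int), Long]

  // Where each partition starts after a restart: the checkpointed offsets if a
  // checkpoint exists, otherwise the latest offsets (models the default reset).
  def startingOffsets(checkpoint: Option[Offsets], latest: Offsets): Offsets =
    checkpoint.getOrElse(latest)

  // Messages waiting for the first batch after the restart, summed over partitions.
  def firstBatchSize(start: Offsets, latest: Offsets): Long =
    latest.iterator.map { case (tp, end) => end - start.getOrElse(tp, end) }.sum

  def main(args: Array[String]): Unit = {
    // Offsets saved with the last batch completed before the crash (made-up numbers).
    val saved: Offsets = Map(("events", 0) -> 1000L, ("events", 1) -> 1200L)
    // Latest offsets after the outage: producers kept writing in the meantime.
    val latest: Offsets = Map(("events", 0) -> 5000L, ("events", 1) -> 6200L)

    val withCheckpoint = firstBatchSize(startingOffsets(Some(saved), latest), latest)
    val withoutCheckpoint = firstBatchSize(startingOffsets(None, latest), latest)
    println(s"with checkpoint: $withCheckpoint, without: $withoutCheckpoint")
  }
}
```

Running main prints `with checkpoint: 9000, without: 0`: the 9000 messages written during the outage are only read when the saved offsets are used. In a real job that backlog arrives in the first batches after the restart; the direct stream's spark.streaming.kafka.maxRatePerPartition setting can cap how many records per second each partition contributes, which keeps those catch-up batches from overwhelming the cluster.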



Re: Spark not picking older Kafka messages

Thanks! Does it then write to the checkpoint directory after processing each batch (i.e. after each streaming RDD)? Is it an HDFS path? Will hitting HDFS every time slow down the process?

Also, if there are many Kafka consumer groups, will this create a separate checkpoint directory for each consumer group?
