<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>question Spark not picking older Kafka messages in Archives of Support Questions (Read Only)</title>
    <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/Spark-not-picking-older-Kafka-messages/m-p/168783#M53949</link>
    <description>&lt;P&gt;I have a Spark consumer that reads messages from Kafka using KafkaUtils.createDirectStream() for a set of 100 topics. It then processes the messages and persists them in the data lake. However, if my Spark consumer crashes and stays down for a few hours or a day, the millions of messages that arrive in the Kafka topics during that time are never picked up when I restart the consumer. How can I get the older messages without impacting performance?&lt;/P&gt;</description>
    <pubDate>Fri, 10 Feb 2017 04:29:29 GMT</pubDate>
    <dc:creator>excitingtimes03</dc:creator>
    <dc:date>2017-02-10T04:29:29Z</dc:date>
    <item>
      <title>Spark not picking older Kafka messages</title>
      <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/Spark-not-picking-older-Kafka-messages/m-p/168783#M53949</link>
      <description>&lt;P&gt;I have a Spark consumer that reads messages from Kafka using KafkaUtils.createDirectStream() for a set of 100 topics. It then processes the messages and persists them in the data lake. However, if my Spark consumer crashes and stays down for a few hours or a day, the millions of messages that arrive in the Kafka topics during that time are never picked up when I restart the consumer. How can I get the older messages without impacting performance?&lt;/P&gt;</description>
      <pubDate>Fri, 10 Feb 2017 04:29:29 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Archives-of-Support-Questions/Spark-not-picking-older-Kafka-messages/m-p/168783#M53949</guid>
      <dc:creator>excitingtimes03</dc:creator>
      <dc:date>2017-02-10T04:29:29Z</dc:date>
    </item>
    <item>
      <title>Re: Spark not picking older Kafka messages</title>
      <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/Spark-not-picking-older-Kafka-messages/m-p/168784#M53950</link>
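      <description>&lt;P&gt;You need to enable checkpointing. Spark then records the last Kafka offsets it read and resumes reading from there after a restart. The stream must be created inside the function passed to StreamingContext.getOrCreate so that it can be restored from the checkpoint. Here's the code you need to accomplish that:&lt;/P&gt;&lt;PRE&gt;import org.apache.spark.SparkContext
import org.apache.spark.streaming.{Seconds, StreamingContext}
// KafkaUtils comes from the spark-streaming-kafka artifact for your Spark and Kafka versions.

// Builds a new context; only called when no checkpoint exists in checkpointDir.
def createContext(sc: SparkContext, checkpointDir: String, duration: Int): StreamingContext = {
  val ssc = new StreamingContext(sc, Seconds(duration))
  ssc.checkpoint(checkpointDir)
  // Create the stream (and its processing) here so it is part of the checkpointed graph.
  KafkaUtils.createDirectStream(ssc, kafkaParams, topics)
  ssc
}

// getOrCreate takes a function, so wrap the call in () =&gt; ...
val ssc = StreamingContext.getOrCreate(checkpointDir, () =&gt; createContext(sc, checkpointDir, duration))
&lt;/PRE&gt;</description>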
      <pubDate>Fri, 10 Feb 2017 07:00:18 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Archives-of-Support-Questions/Spark-not-picking-older-Kafka-messages/m-p/168784#M53950</guid>
      <dc:creator>jwiden</dc:creator>
      <dc:date>2017-02-10T07:00:18Z</dc:date>
    </item>
    <item>
      <title>Re: Spark not picking older Kafka messages</title>
      <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/Spark-not-picking-older-Kafka-messages/m-p/168785#M53951</link>
      <description>&lt;P&gt;Thanks! Does it then write to the checkpoint dir after parsing every message (i.e. after processing each streaming RDD)? Is it an HDFS path? Will hitting HDFS every time slow down the process?&lt;/P&gt;&lt;P&gt;Also, if there are many Kafka consumer groups, will this create a separate checkpoint dir for each consumer group?&lt;/P&gt;</description>
      <pubDate>Thu, 16 Feb 2017 06:27:25 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Archives-of-Support-Questions/Spark-not-picking-older-Kafka-messages/m-p/168785#M53951</guid>
      <dc:creator>excitingtimes03</dc:creator>
      <dc:date>2017-02-16T06:27:25Z</dc:date>
    </item>
  </channel>
</rss>

