<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>question Failing Checkpoint Spark Streaming in Archives of Support Questions (Read Only)</title>
    <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/Failing-Checkpoint-Spark-Streaming/m-p/233570#M67334</link>
    <description>&lt;P&gt;Hi,&lt;/P&gt;&lt;P&gt;My program runs fine without checkpoint and when I modified the program to make it fault tolerant , I get the error as attached in the file&lt;/P&gt;&lt;P&gt;The program runs fine when it starts fresh...but if it comes from a checkpoint it fails...not sure where I am i doing wrong. Any help will be appreciated.&lt;/P&gt;&lt;P&gt;&lt;A href="https://community.cloudera.com/legacyfs/online/attachments/38431-error.txt"&gt;error.txt&lt;/A&gt;&lt;/P&gt;&lt;PRE&gt;package ca.twitter2
import org.apache.kafka.clients._
import org.apache.kafka._
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients._
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.log4j._
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.KafkaUtils
import java.util.HashMap
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.log4j.{Level, Logger}


object NGINXLogProcessingWindowedwithcheckpointappl {    
     case class AccessLog(Datetime: String, requesterip: String, httpcode: String, method: String, serverip2: String, responsetime: String, operation: String, application: String)
    val checkpointDir = "hdfs://ln461.alt.calgary.ca:8020/user/NGINXAccesslogcheckpoint"
   
   def creatingFunc(): StreamingContext = {
   println("Creating new context")
   val sparkConf = new SparkConf().setAppName("NGINXLogAnalysiswindowedwithcheckpoint")
   
  .setMaster("local")
   val ssc = new StreamingContext(sparkConf, Seconds(120))
  
	 ssc.checkpoint(checkpointDir)
	 sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
	 val spark = SparkSession
                   .builder()                  
                   .getOrCreate()
                   
       val topics = List("REST").toSet
       // Logger.getLogger("org").setLevel(Level.ERROR)
        //Logger.getLogger("akka").setLevel(Level.ERROR)
       val kafkaParams = Map[String, Object](
                         "bootstrap.servers" -&amp;gt; "10.24.18.36:6667",
                          //"bootstrap.servers" -&amp;gt; "10.71.52.119:9092",
          // "bootstrap.servers" -&amp;gt; "192.168.123.36:6667",
                         "group.id" -&amp;gt; "2",
                          ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -&amp;gt;"org.apache.kafka.common.serialization.StringDeserializer",
                          ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -&amp;gt; "org.apache.kafka.common.serialization.StringDeserializer",
                          "auto.offset.reset" -&amp;gt; "latest",
                          "enable.auto.commit" -&amp;gt; (false: java.lang.Boolean)
                                             )
      val WINDOW_LENGTH = Seconds(43200)
      val SLIDE_INTERVAL = Seconds(120)
               // Create the direct stream with the Kafka parameters and topics
      val consumerStrategy = ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
      val  kafkaStream = KafkaUtils.createDirectStream[String, String](ssc, LocationStrategies.PreferConsistent, consumerStrategy)
      val lines = kafkaStream.map(_.value()).repartition(4)
      val lineswindowed =lines.window(WINDOW_LENGTH, SLIDE_INTERVAL)
      val lines2= lineswindowed.map(_.split(","))
      val lines4slide= lines2.map(p =&amp;gt; AccessLog(p(0),p(2).toString,p(4).toString,p(3).toString, p(8).toString, p(7).toString, p(10), p(12)))
            lines4slide.foreachRDD { rdd2 =&amp;gt;
               if (!rdd2.isEmpty) {
                    val count = rdd2.count
                    println("count received " + count)
                    import org.apache.spark.sql.functions._
                    import spark.implicits._
                    val LogDF = rdd2.toDF()
                               
                    LogDF.createOrReplaceTempView("Log")
                    val LogDFslide = LogDF.select($"Datetime",$"requesterip".cast("string"),$"httpcode",expr("(split(method, ' '))[1]").cast("string").as("request"),expr("(split(method, ' '))[2]").cast("string").as("webserviceurl"),expr("(split(method, ' '))[3]").cast("string").as("protocol"), $"serverip2", $"responsetime",expr("(split(operation, '/'))[4]").cast("string").as("operationtype"), $"application".cast("string"))
                    LogDFslide.createOrReplaceTempView("LogDFslide")
                    //LogDFslide.printSchema()
                    //LogDFslide.show
                    val Log2DFslide = spark.sql("SELECT Datetime,requesterip,httpcode, substring(request,2,length(request))as request2,webserviceurl, protocol, serverip2, split(webserviceurl, '/')[3] as webservice3, responsetime, substring(operationtype,1,length(operationtype)-4) as httpsoapaction, application FROM LogDFslide")
                    Log2DFslide.createOrReplaceTempView("Log2DFslide")
                    val Log2DFslideoutput = spark.sql("SELECT Datetime,requesterip,httpcode, request2,webserviceurl, protocol, serverip2, split(webservice3, '[?]')[0] as webservice, responsetime, httpsoapaction, application FROM Log2DFslide")                    // Log2DFslide.show   
                    //println("printing line3")
                    //Log2DFslideoutput.show
                   // Log2DFslideoutput.write.mode(SaveMode.Overwrite).orc("hdfs://ln461.alt.calgary.ca:8020/user/NGINXAccessLogWindowedcheckpointed");
                    val   log2DFFilter = spark.sql("SELECT Datetime,requesterip,httpcode, request2,webserviceurl, protocol, serverip2, split(webservice3, '[?]')[0] as webservice2, responsetime, httpsoapaction, application from  Log2DFslide where responsetime &amp;lt;&amp;gt;'-' and responsetime &amp;lt;&amp;gt;'' ")  
                    log2DFFilter.createOrReplaceTempView("log2DFFilter")
                    //log2DFFilter.printSchema()
                    log2DFFilter.show
                    val Log3DFslide = spark.sql( "Select initcap(webservice2) as webservice, round(avg(responsetime),4) as Averageresponsetime from log2DFFilter where webservice2 &amp;lt;&amp;gt;'' group by initcap(webservice2) ")


                    // val  Log3DFslide =  log2DFFilter.select(expr("initcap(webservice2)"), expr("round(avg(responsetime),4)").as("Averageresponsetime")  ).groupBy(expr("initcap(webservice2)"))
                
                    // Log3DFslide.printSchema()
                    Log3DFslide.createOrReplaceTempView("Log3DFslide")
                            
                     Log3DFslide.show
                    //Log3DFslide.write.mode(SaveMode.Overwrite).orc("hdfs://ln461.alt.calgary.ca:8020/user/NGINXAccessLogstatistics");
                            
                                 }
                                  } 
								  ssc
   
      } 
   def main(args: Array[String]) { 
        val stopActiveContext = true
        if (stopActiveContext) 
     {    
        StreamingContext.getActive.foreach { _.stop(stopSparkContext = false) }
        // Get or create a streaming context.
        val ssc = StreamingContext.getActiveOrCreate(checkpointDir, creatingFunc)
       
       // val ssc = StreamingContext.getOrCreate(checkpointDir, () =&amp;gt; creatingFunc(checkpointDir))
        ssc.start() 
        ssc.awaitTermination() 
          } 
             
    }
}
&lt;/PRE&gt;</description>
    <pubDate>Sat, 26 Aug 2017 04:08:23 GMT</pubDate>
    <dc:creator>Chandra</dc:creator>
    <dc:date>2017-08-26T04:08:23Z</dc:date>
    <item>
      <title>Failing Checkpoint Spark Streaming</title>
      <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/Failing-Checkpoint-Spark-Streaming/m-p/233570#M67334</link>
      <description>&lt;P&gt;Hi,&lt;/P&gt;&lt;P&gt;My program runs fine without checkpoint and when I modified the program to make it fault tolerant , I get the error as attached in the file&lt;/P&gt;&lt;P&gt;The program runs fine when it starts fresh...but if it comes from a checkpoint it fails...not sure where I am i doing wrong. Any help will be appreciated.&lt;/P&gt;&lt;P&gt;&lt;A href="https://community.cloudera.com/legacyfs/online/attachments/38431-error.txt"&gt;error.txt&lt;/A&gt;&lt;/P&gt;&lt;PRE&gt;package ca.twitter2
import org.apache.kafka.clients._
import org.apache.kafka._
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients._
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.log4j._
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.KafkaUtils
import java.util.HashMap
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.log4j.{Level, Logger}


object NGINXLogProcessingWindowedwithcheckpointappl {    
     case class AccessLog(Datetime: String, requesterip: String, httpcode: String, method: String, serverip2: String, responsetime: String, operation: String, application: String)
    val checkpointDir = "hdfs://ln461.alt.calgary.ca:8020/user/NGINXAccesslogcheckpoint"
   
   def creatingFunc(): StreamingContext = {
   println("Creating new context")
   val sparkConf = new SparkConf().setAppName("NGINXLogAnalysiswindowedwithcheckpoint")
   
  .setMaster("local")
   val ssc = new StreamingContext(sparkConf, Seconds(120))
  
	 ssc.checkpoint(checkpointDir)
	 sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
	 val spark = SparkSession
                   .builder()                  
                   .getOrCreate()
                   
       val topics = List("REST").toSet
       // Logger.getLogger("org").setLevel(Level.ERROR)
        //Logger.getLogger("akka").setLevel(Level.ERROR)
       val kafkaParams = Map[String, Object](
                         "bootstrap.servers" -&amp;gt; "10.24.18.36:6667",
                          //"bootstrap.servers" -&amp;gt; "10.71.52.119:9092",
          // "bootstrap.servers" -&amp;gt; "192.168.123.36:6667",
                         "group.id" -&amp;gt; "2",
                          ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -&amp;gt;"org.apache.kafka.common.serialization.StringDeserializer",
                          ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -&amp;gt; "org.apache.kafka.common.serialization.StringDeserializer",
                          "auto.offset.reset" -&amp;gt; "latest",
                          "enable.auto.commit" -&amp;gt; (false: java.lang.Boolean)
                                             )
      val WINDOW_LENGTH = Seconds(43200)
      val SLIDE_INTERVAL = Seconds(120)
               // Create the direct stream with the Kafka parameters and topics
      val consumerStrategy = ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
      val  kafkaStream = KafkaUtils.createDirectStream[String, String](ssc, LocationStrategies.PreferConsistent, consumerStrategy)
      val lines = kafkaStream.map(_.value()).repartition(4)
      val lineswindowed =lines.window(WINDOW_LENGTH, SLIDE_INTERVAL)
      val lines2= lineswindowed.map(_.split(","))
      val lines4slide= lines2.map(p =&amp;gt; AccessLog(p(0),p(2).toString,p(4).toString,p(3).toString, p(8).toString, p(7).toString, p(10), p(12)))
            lines4slide.foreachRDD { rdd2 =&amp;gt;
               if (!rdd2.isEmpty) {
                    val count = rdd2.count
                    println("count received " + count)
                    import org.apache.spark.sql.functions._
                    import spark.implicits._
                    val LogDF = rdd2.toDF()
                               
                    LogDF.createOrReplaceTempView("Log")
                    val LogDFslide = LogDF.select($"Datetime",$"requesterip".cast("string"),$"httpcode",expr("(split(method, ' '))[1]").cast("string").as("request"),expr("(split(method, ' '))[2]").cast("string").as("webserviceurl"),expr("(split(method, ' '))[3]").cast("string").as("protocol"), $"serverip2", $"responsetime",expr("(split(operation, '/'))[4]").cast("string").as("operationtype"), $"application".cast("string"))
                    LogDFslide.createOrReplaceTempView("LogDFslide")
                    //LogDFslide.printSchema()
                    //LogDFslide.show
                    val Log2DFslide = spark.sql("SELECT Datetime,requesterip,httpcode, substring(request,2,length(request))as request2,webserviceurl, protocol, serverip2, split(webserviceurl, '/')[3] as webservice3, responsetime, substring(operationtype,1,length(operationtype)-4) as httpsoapaction, application FROM LogDFslide")
                    Log2DFslide.createOrReplaceTempView("Log2DFslide")
                    val Log2DFslideoutput = spark.sql("SELECT Datetime,requesterip,httpcode, request2,webserviceurl, protocol, serverip2, split(webservice3, '[?]')[0] as webservice, responsetime, httpsoapaction, application FROM Log2DFslide")                    // Log2DFslide.show   
                    //println("printing line3")
                    //Log2DFslideoutput.show
                   // Log2DFslideoutput.write.mode(SaveMode.Overwrite).orc("hdfs://ln461.alt.calgary.ca:8020/user/NGINXAccessLogWindowedcheckpointed");
                    val   log2DFFilter = spark.sql("SELECT Datetime,requesterip,httpcode, request2,webserviceurl, protocol, serverip2, split(webservice3, '[?]')[0] as webservice2, responsetime, httpsoapaction, application from  Log2DFslide where responsetime &amp;lt;&amp;gt;'-' and responsetime &amp;lt;&amp;gt;'' ")  
                    log2DFFilter.createOrReplaceTempView("log2DFFilter")
                    //log2DFFilter.printSchema()
                    log2DFFilter.show
                    val Log3DFslide = spark.sql( "Select initcap(webservice2) as webservice, round(avg(responsetime),4) as Averageresponsetime from log2DFFilter where webservice2 &amp;lt;&amp;gt;'' group by initcap(webservice2) ")


                    // val  Log3DFslide =  log2DFFilter.select(expr("initcap(webservice2)"), expr("round(avg(responsetime),4)").as("Averageresponsetime")  ).groupBy(expr("initcap(webservice2)"))
                
                    // Log3DFslide.printSchema()
                    Log3DFslide.createOrReplaceTempView("Log3DFslide")
                            
                     Log3DFslide.show
                    //Log3DFslide.write.mode(SaveMode.Overwrite).orc("hdfs://ln461.alt.calgary.ca:8020/user/NGINXAccessLogstatistics");
                            
                                 }
                                  } 
								  ssc
   
      } 
   def main(args: Array[String]) { 
        val stopActiveContext = true
        if (stopActiveContext) 
     {    
        StreamingContext.getActive.foreach { _.stop(stopSparkContext = false) }
        // Get or create a streaming context.
        val ssc = StreamingContext.getActiveOrCreate(checkpointDir, creatingFunc)
       
       // val ssc = StreamingContext.getOrCreate(checkpointDir, () =&amp;gt; creatingFunc(checkpointDir))
        ssc.start() 
        ssc.awaitTermination() 
          } 
             
    }
}
&lt;/PRE&gt;</description>
      <pubDate>Sat, 26 Aug 2017 04:08:23 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Archives-of-Support-Questions/Failing-Checkpoint-Spark-Streaming/m-p/233570#M67334</guid>
      <dc:creator>Chandra</dc:creator>
      <dc:date>2017-08-26T04:08:23Z</dc:date>
    </item>
    <item>
      <title>Re: Failing Checkpoint Spark Streaming</title>
      <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/Failing-Checkpoint-Spark-Streaming/m-p/233571#M67335</link>
      <description>&lt;P&gt;Figured out that I had to use dataset to make sure checkpointing works....&lt;/P&gt;</description>
      <pubDate>Wed, 13 Sep 2017 02:24:52 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Archives-of-Support-Questions/Failing-Checkpoint-Spark-Streaming/m-p/233571#M67335</guid>
      <dc:creator>Chandra</dc:creator>
      <dc:date>2017-09-13T02:24:52Z</dc:date>
    </item>
  </channel>
</rss>

