Failing Checkpoint Spark Streaming

Expert Contributor

Hi,

My program runs fine without checkpointing, but when I modified it to be fault tolerant I get the error attached in the file.

The program runs fine when it starts fresh, but when it recovers from a checkpoint it fails, and I am not sure where I am going wrong. Any help will be appreciated.

error.txt
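
(For context, the recovery path involved here is Spark Streaming's getOrCreate contract: the factory function only runs when no checkpoint data exists, and on a restart the previously serialized DStream graph is rebuilt from the checkpoint directory, so everything the stream operations reference has to be recoverable from it. A minimal sketch of that contract, with placeholder names and paths rather than the ones from this post, follows.)

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object CheckpointRecoverySketch {
  // Placeholder checkpoint location, not the path from this thread.
  val checkpointDir = "hdfs://namenode:8020/user/exampleCheckpoint"

  def createContext(): StreamingContext = {
    // Only invoked when no checkpoint data is found in checkpointDir.
    val conf = new SparkConf().setAppName("checkpoint-recovery-sketch").setMaster("local[*]")
    val ssc = new StreamingContext(conf, Seconds(120))
    ssc.checkpoint(checkpointDir)
    // ... define the DStream operations here; on restart they are restored
    // from the checkpoint instead of being re-declared by this function ...
    ssc
  }

  def main(args: Array[String]): Unit = {
    val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
    ssc.start()
    ssc.awaitTermination()
  }
}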

package ca.twitter2

import org.apache.kafka._
import org.apache.kafka.clients._
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.log4j.{Level, Logger}
import java.util.HashMap


object NGINXLogProcessingWindowedwithcheckpointappl {

  case class AccessLog(Datetime: String, requesterip: String, httpcode: String, method: String, serverip2: String, responsetime: String, operation: String, application: String)

  val checkpointDir = "hdfs://ln461.alt.calgary.ca:8020/user/NGINXAccesslogcheckpoint"

  def creatingFunc(): StreamingContext = {
    println("Creating new context")
    val sparkConf = new SparkConf()
      .setAppName("NGINXLogAnalysiswindowedwithcheckpoint")
      .setMaster("local")
    // Conf settings must be applied before the StreamingContext is constructed;
    // values set on sparkConf afterwards are not picked up by the running context.
    sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
    val ssc = new StreamingContext(sparkConf, Seconds(120))
    ssc.checkpoint(checkpointDir)
    val spark = SparkSession
      .builder()
      .getOrCreate()
                   
    val topics = List("REST").toSet
    // Logger.getLogger("org").setLevel(Level.ERROR)
    // Logger.getLogger("akka").setLevel(Level.ERROR)
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "10.24.18.36:6667",
      // "bootstrap.servers" -> "10.71.52.119:9092",
      // "bootstrap.servers" -> "192.168.123.36:6667",
      "group.id" -> "2",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    val WINDOW_LENGTH = Seconds(43200)
    val SLIDE_INTERVAL = Seconds(120)

    // Create the direct stream with the Kafka parameters and topics
    val consumerStrategy = ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
    val kafkaStream = KafkaUtils.createDirectStream[String, String](ssc, LocationStrategies.PreferConsistent, consumerStrategy)
    val lines = kafkaStream.map(_.value()).repartition(4)
    val lineswindowed = lines.window(WINDOW_LENGTH, SLIDE_INTERVAL)
    val lines2 = lineswindowed.map(_.split(","))
    val lines4slide = lines2.map(p => AccessLog(p(0), p(2).toString, p(4).toString, p(3).toString, p(8).toString, p(7).toString, p(10), p(12)))
    lines4slide.foreachRDD { rdd2 =>
      if (!rdd2.isEmpty) {
        val count = rdd2.count
        println("count received " + count)
        import org.apache.spark.sql.functions._
        import spark.implicits._
        val LogDF = rdd2.toDF()

        LogDF.createOrReplaceTempView("Log")
        val LogDFslide = LogDF.select(
          $"Datetime",
          $"requesterip".cast("string"),
          $"httpcode",
          expr("(split(method, ' '))[1]").cast("string").as("request"),
          expr("(split(method, ' '))[2]").cast("string").as("webserviceurl"),
          expr("(split(method, ' '))[3]").cast("string").as("protocol"),
          $"serverip2",
          $"responsetime",
          expr("(split(operation, '/'))[4]").cast("string").as("operationtype"),
          $"application".cast("string"))
        LogDFslide.createOrReplaceTempView("LogDFslide")
        // LogDFslide.printSchema()
        // LogDFslide.show
        val Log2DFslide = spark.sql("SELECT Datetime,requesterip,httpcode, substring(request,2,length(request))as request2,webserviceurl, protocol, serverip2, split(webserviceurl, '/')[3] as webservice3, responsetime, substring(operationtype,1,length(operationtype)-4) as httpsoapaction, application FROM LogDFslide")
        Log2DFslide.createOrReplaceTempView("Log2DFslide")
        val Log2DFslideoutput = spark.sql("SELECT Datetime,requesterip,httpcode, request2,webserviceurl, protocol, serverip2, split(webservice3, '[?]')[0] as webservice, responsetime, httpsoapaction, application FROM Log2DFslide")
        // println("printing line3")
        // Log2DFslideoutput.show
        // Log2DFslideoutput.write.mode(SaveMode.Overwrite).orc("hdfs://ln461.alt.calgary.ca:8020/user/NGINXAccessLogWindowedcheckpointed")
        val log2DFFilter = spark.sql("SELECT Datetime,requesterip,httpcode, request2,webserviceurl, protocol, serverip2, split(webservice3, '[?]')[0] as webservice2, responsetime, httpsoapaction, application from  Log2DFslide where responsetime <>'-' and responsetime <>'' ")
        log2DFFilter.createOrReplaceTempView("log2DFFilter")
        // log2DFFilter.printSchema()
        log2DFFilter.show
        val Log3DFslide = spark.sql("Select initcap(webservice2) as webservice, round(avg(responsetime),4) as Averageresponsetime from log2DFFilter where webservice2 <>'' group by initcap(webservice2) ")
        // val Log3DFslide = log2DFFilter.select(expr("initcap(webservice2)"), expr("round(avg(responsetime),4)").as("Averageresponsetime")).groupBy(expr("initcap(webservice2)"))
        // Log3DFslide.printSchema()
        Log3DFslide.createOrReplaceTempView("Log3DFslide")
        Log3DFslide.show
        // Log3DFslide.write.mode(SaveMode.Overwrite).orc("hdfs://ln461.alt.calgary.ca:8020/user/NGINXAccessLogstatistics")
      }
    }
    ssc
  }
  def main(args: Array[String]) {
    val stopActiveContext = true
    if (stopActiveContext) {
      StreamingContext.getActive.foreach { _.stop(stopSparkContext = false) }
      // Get or create a streaming context.
      val ssc = StreamingContext.getActiveOrCreate(checkpointDir, creatingFunc)
      // val ssc = StreamingContext.getOrCreate(checkpointDir, () => creatingFunc(checkpointDir))
      ssc.start()
      ssc.awaitTermination()
    }
  }
}
1 ACCEPTED SOLUTION

Expert Contributor

Figured out that I had to use a Dataset to make sure checkpointing works.
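
For anyone who lands here later, a minimal sketch of the kind of restructuring this points at, assuming the recovery failure came from DataFrame/SparkSession objects captured in the checkpointed foreachRDD closure (that cause is an assumption, not confirmed in this thread). The record type, stream, and query below are placeholders: the idea is to get the SparkSession lazily from the recovered RDD's conf and work with a typed Dataset, so nothing built outside the factory function needs to survive recovery.

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.dstream.DStream

// Placeholder record type, standing in for the thread's AccessLog.
case class LogRecord(datetime: String, requesterip: String, httpcode: String)

def processWithDataset(parsed: DStream[LogRecord]): Unit = {
  parsed.foreachRDD { (rdd: RDD[LogRecord]) =>
    if (!rdd.isEmpty) {
      // Re-acquire the session from the RDD's own SparkConf instead of
      // capturing a session created outside the checkpointed graph.
      val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
      import spark.implicits._
      val logDS = rdd.toDS()                 // typed Dataset[LogRecord]
      logDS.createOrReplaceTempView("Log")
      spark.sql("SELECT httpcode, count(*) AS hits FROM Log GROUP BY httpcode").show()
    }
  }
}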
