Support Questions
Find answers, ask questions, and share your expertise
Announcements
Alert: Welcome to the Unified Cloudera Community. Former HCC members be sure to read and learn how to activate your account here.

Failing Checkpoint Spark Streaming

Solved Go to solution

Failing Checkpoint Spark Streaming

Contributor

Hi,

My program runs fine without checkpointing, but after I modified it to make it fault tolerant, I get the error shown in the attached file.

The program runs fine when it starts fresh, but it fails when it is restored from a checkpoint. I am not sure what I am doing wrong. Any help will be appreciated.

error.txt

package ca.twitter2
import org.apache.kafka.clients._
import org.apache.kafka._
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients._
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.log4j._
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.KafkaUtils
import java.util.HashMap
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.log4j.{Level, Logger}


/**
 * Checkpointed Spark Streaming job over NGINX access-log lines read from the Kafka topic "REST".
 *
 * Every 120 seconds the job looks at a 43200-second sliding window of comma-separated log lines,
 * parses them into [[AccessLog]] records and prints the average response time per web service.
 *
 * The streaming context is created (or restored) through the checkpoint directory
 * [[checkpointDir]]. Everything referenced from inside the DStream closures must therefore be
 * usable after the DStream graph has been deserialized from a checkpoint; in particular the
 * SparkSession is obtained inside `foreachRDD` rather than captured from the enclosing method.
 */
object NGINXLogProcessingWindowedwithcheckpointappl {

  /** One access-log record; every field is the raw string taken from the comma-separated line. */
  case class AccessLog(Datetime: String, requesterip: String, httpcode: String, method: String, serverip2: String, responsetime: String, operation: String, application: String)

  /** HDFS directory used both for DStream checkpoints and for restoring the context on restart. */
  val checkpointDir = "hdfs://ln461.alt.calgary.ca:8020/user/NGINXAccesslogcheckpoint"

  /**
   * Builds a brand-new streaming context together with the whole DStream graph.
   *
   * This is only invoked when no usable checkpoint exists; on recovery Spark rebuilds the graph
   * from the checkpoint and this method is NOT run again.
   *
   * @return a configured, not-yet-started StreamingContext with checkpointing enabled
   */
  def creatingFunc(): StreamingContext = {
    println("Creating new context")

    // All settings must be applied before the context is created: the configuration is
    // cloned when it is handed to Spark, so a later `set` would be silently ignored.
    val sparkConf = new SparkConf()
      .setAppName("NGINXLogAnalysiswindowedwithcheckpoint")
      .setMaster("local")
      .set("spark.streaming.receiver.writeAheadLog.enable", "true")

    val ssc = new StreamingContext(sparkConf, Seconds(120))
    ssc.checkpoint(checkpointDir)

    val topics = Set("REST")
    // Logger.getLogger("org").setLevel(Level.ERROR)
    // Logger.getLogger("akka").setLevel(Level.ERROR)
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "10.24.18.36:6667",
      // "bootstrap.servers" -> "10.71.52.119:9092",
      // "bootstrap.servers" -> "192.168.123.36:6667",
      "group.id" -> "2",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val windowLength = Seconds(43200)
    val slideInterval = Seconds(120)
    // The parser below reads column indices up to 12, so a usable line needs at least 13 fields.
    val requiredFields = 13

    // Create the direct stream with the Kafka parameters and topics.
    val consumerStrategy = ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
    val kafkaStream = KafkaUtils.createDirectStream[String, String](ssc, LocationStrategies.PreferConsistent, consumerStrategy)

    val accessLogs = kafkaStream
      .map(_.value())
      .repartition(4)
      .window(windowLength, slideInterval)
      .map(_.split(","))
      // Drop short/malformed lines instead of failing the whole batch with
      // ArrayIndexOutOfBoundsException.
      .filter(_.length >= requiredFields)
      .map(p => AccessLog(p(0), p(2), p(4), p(3), p(8), p(7), p(10), p(12)))

    accessLogs.foreachRDD { rdd =>
      if (!rdd.isEmpty) {
        // Look the session up per batch from the RDD's own SparkContext. A session captured
        // from the enclosing method is not valid after the graph is restored from a checkpoint,
        // which is what made the job fail on restart.
        val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
        import spark.implicits._

        println("count received " + rdd.count)

        val logDF = rdd.toDF()
        logDF.createOrReplaceTempView("Log")

        // Split the request line into verb / URL / protocol and pull the operation type out
        // of the operation path.
        val logDFslide = logDF.select($"Datetime",$"requesterip".cast("string"),$"httpcode",expr("(split(method, ' '))[1]").cast("string").as("request"),expr("(split(method, ' '))[2]").cast("string").as("webserviceurl"),expr("(split(method, ' '))[3]").cast("string").as("protocol"), $"serverip2", $"responsetime",expr("(split(operation, '/'))[4]").cast("string").as("operationtype"), $"application".cast("string"))
        logDFslide.createOrReplaceTempView("LogDFslide")

        val log2DFslide = spark.sql("SELECT Datetime,requesterip,httpcode, substring(request,2,length(request))as request2,webserviceurl, protocol, serverip2, split(webserviceurl, '/')[3] as webservice3, responsetime, substring(operationtype,1,length(operationtype)-4) as httpsoapaction, application FROM LogDFslide")
        log2DFslide.createOrReplaceTempView("Log2DFslide")
        // log2DFslide.write.mode(SaveMode.Overwrite).orc("hdfs://ln461.alt.calgary.ca:8020/user/NGINXAccessLogWindowedcheckpointed")

        // Keep only rows that carry a response time, and strip the query string from the
        // web service name.
        val log2DFFilter = spark.sql("SELECT Datetime,requesterip,httpcode, request2,webserviceurl, protocol, serverip2, split(webservice3, '[?]')[0] as webservice2, responsetime, httpsoapaction, application from  Log2DFslide where responsetime <>'-' and responsetime <>'' ")
        log2DFFilter.createOrReplaceTempView("log2DFFilter")
        log2DFFilter.show

        // Average response time per (capitalised) web service name.
        val log3DFslide = spark.sql( "Select initcap(webservice2) as webservice, round(avg(responsetime),4) as Averageresponsetime from log2DFFilter where webservice2 <>'' group by initcap(webservice2) ")
        log3DFslide.createOrReplaceTempView("Log3DFslide")
        log3DFslide.show
        // log3DFslide.write.mode(SaveMode.Overwrite).orc("hdfs://ln461.alt.calgary.ca:8020/user/NGINXAccessLogstatistics")
      }
    }

    ssc
  }

  /**
   * Entry point: stops any streaming context that is still active (leaving the SparkContext
   * running), then restores the context from [[checkpointDir]] or creates a new one via
   * [[creatingFunc]], starts it and blocks until termination.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    StreamingContext.getActive.foreach { _.stop(stopSparkContext = false) }

    // Get or create a streaming context.
    val ssc = StreamingContext.getActiveOrCreate(checkpointDir, () => creatingFunc())
    ssc.start()
    ssc.awaitTermination()
  }
}
1 ACCEPTED SOLUTION

Accepted Solutions
Highlighted

Re: Failing Checkpoint Spark Streaming

Contributor

Figured out that I had to use dataset to make sure checkpointing works....

View solution in original post

1 REPLY 1
Highlighted

Re: Failing Checkpoint Spark Streaming

Contributor

Figured out that I had to use dataset to make sure checkpointing works....

View solution in original post

Don't have an account?
Coming from Hortonworks? Activate your account here