Created 08-25-2017 09:08 PM
Hi,
My program runs fine without checkpointing, but when I modified it to make it fault tolerant, I get the error attached in the file.
The program runs fine when it starts fresh, but when it recovers from a checkpoint it fails, and I am not sure where I am going wrong. Any help will be appreciated.
package ca.twitter2

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka010._
import org.apache.spark.sql._
import org.apache.spark.sql.functions._

object NGINXLogProcessingWindowedwithcheckpointappl {

  case class AccessLog(Datetime: String, requesterip: String, httpcode: String, method: String,
                       serverip2: String, responsetime: String, operation: String, application: String)

  val checkpointDir = "hdfs://ln461.alt.calgary.ca:8020/user/NGINXAccesslogcheckpoint"

  def creatingFunc(): StreamingContext = {
    println("Creating new context")

    val sparkConf = new SparkConf()
      .setAppName("NGINXLogAnalysiswindowedwithcheckpoint")
      .setMaster("local")
      // Must be set before the StreamingContext is created; setting it afterwards has no effect.
      // (This flag only affects receiver-based streams; the direct Kafka stream below has no receiver.)
      .set("spark.streaming.receiver.writeAheadLog.enable", "true")

    val ssc = new StreamingContext(sparkConf, Seconds(120))
    ssc.checkpoint(checkpointDir)

    val spark = SparkSession.builder().getOrCreate()

    val topics = Set("REST")
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "10.24.18.36:6667",
      "group.id" -> "2",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    // 12-hour window, recomputed every 2 minutes
    val WINDOW_LENGTH = Seconds(43200)
    val SLIDE_INTERVAL = Seconds(120)

    // Create the direct stream with the Kafka parameters and topics
    val consumerStrategy = ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
    val kafkaStream = KafkaUtils.createDirectStream[String, String](
      ssc, LocationStrategies.PreferConsistent, consumerStrategy)

    // Parse each comma-separated NGINX access-log line into an AccessLog record
    val lines = kafkaStream.map(_.value()).repartition(4)
    val lineswindowed = lines.window(WINDOW_LENGTH, SLIDE_INTERVAL)
    val lines2 = lineswindowed.map(_.split(","))
    val lines4slide = lines2.map(p => AccessLog(p(0), p(2), p(4), p(3), p(8), p(7), p(10), p(12)))

    lines4slide.foreachRDD { rdd2 =>
      if (!rdd2.isEmpty) {
        val count = rdd2.count
        println("count received " + count)
        import spark.implicits._

        val LogDF = rdd2.toDF()
        LogDF.createOrReplaceTempView("Log")

        // Split the request line and operation into individual columns
        val LogDFslide = LogDF.select(
          $"Datetime",
          $"requesterip".cast("string"),
          $"httpcode",
          expr("(split(method, ' '))[1]").cast("string").as("request"),
          expr("(split(method, ' '))[2]").cast("string").as("webserviceurl"),
          expr("(split(method, ' '))[3]").cast("string").as("protocol"),
          $"serverip2",
          $"responsetime",
          expr("(split(operation, '/'))[4]").cast("string").as("operationtype"),
          $"application".cast("string"))
        LogDFslide.createOrReplaceTempView("LogDFslide")

        val Log2DFslide = spark.sql(
          """SELECT Datetime, requesterip, httpcode,
            |       substring(request, 2, length(request)) AS request2,
            |       webserviceurl, protocol, serverip2,
            |       split(webserviceurl, '/')[3] AS webservice3,
            |       responsetime,
            |       substring(operationtype, 1, length(operationtype) - 4) AS httpsoapaction,
            |       application
            |FROM LogDFslide""".stripMargin)
        Log2DFslide.createOrReplaceTempView("Log2DFslide")

        val Log2DFslideoutput = spark.sql(
          """SELECT Datetime, requesterip, httpcode, request2, webserviceurl, protocol, serverip2,
            |       split(webservice3, '[?]')[0] AS webservice,
            |       responsetime, httpsoapaction, application
            |FROM Log2DFslide""".stripMargin)
        // Log2DFslideoutput.write.mode(SaveMode.Overwrite).orc("hdfs://ln461.alt.calgary.ca:8020/user/NGINXAccessLogWindowedcheckpointed")

        // Drop rows with empty or missing response times before averaging
        val log2DFFilter = spark.sql(
          """SELECT Datetime, requesterip, httpcode, request2, webserviceurl, protocol, serverip2,
            |       split(webservice3, '[?]')[0] AS webservice2,
            |       responsetime, httpsoapaction, application
            |FROM Log2DFslide
            |WHERE responsetime <> '-' AND responsetime <> ''""".stripMargin)
        log2DFFilter.createOrReplaceTempView("log2DFFilter")
        log2DFFilter.show

        // Average response time per web service over the window
        val Log3DFslide = spark.sql(
          """SELECT initcap(webservice2) AS webservice,
            |       round(avg(responsetime), 4) AS Averageresponsetime
            |FROM log2DFFilter
            |WHERE webservice2 <> ''
            |GROUP BY initcap(webservice2)""".stripMargin)
        Log3DFslide.createOrReplaceTempView("Log3DFslide")
        Log3DFslide.show
        // Log3DFslide.write.mode(SaveMode.Overwrite).orc("hdfs://ln461.alt.calgary.ca:8020/user/NGINXAccessLogstatistics")
      }
    }
    ssc
  }

  def main(args: Array[String]): Unit = {
    // Stop any active context (keeping the SparkContext), then recover from the
    // checkpoint if one exists, or build a fresh context via creatingFunc otherwise.
    StreamingContext.getActive.foreach { _.stop(stopSparkContext = false) }
    val ssc = StreamingContext.getActiveOrCreate(checkpointDir, creatingFunc)
    ssc.start()
    ssc.awaitTermination()
  }
}
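One thing worth checking while debugging this: the Spark Streaming programming guide recommends not capturing a SparkSession created in the context-creating function inside foreachRDD, because that closure is serialized into the checkpoint and the captured session is not restored on recovery. Instead, the session is obtained inside foreachRDD from the RDD's own SparkContext. A minimal sketch of that documented pattern (lines4slide and AccessLog refer to the code above; this is not necessarily the fix for this exact error):

    lines4slide.foreachRDD { rdd2 =>
      if (!rdd2.isEmpty) {
        // Get (or recreate) the session from the RDD's SparkContext so it can be
        // rebuilt after the driver restarts from the checkpoint.
        val spark = SparkSession.builder.config(rdd2.sparkContext.getConf).getOrCreate()
        import spark.implicits._
        val logDF = rdd2.toDF()
        logDF.createOrReplaceTempView("LogDFslide")
        // ... same per-batch SQL as above ...
      }
    }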
Created 09-12-2017 07:24 PM
Figured out that I had to use a Dataset to make checkpointing work.
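For anyone who hits the same issue: the fixed code was not posted, so the following is only a guess at what "use a Dataset" means here. A sketch that builds a typed Dataset[AccessLog] from the per-batch RDD with the case-class encoder, instead of calling toDF() (lines4slide and AccessLog are from the question's code):

    import org.apache.spark.sql.{Dataset, SparkSession}

    lines4slide.foreachRDD { rdd2 =>
      if (!rdd2.isEmpty) {
        // Session taken from the RDD's SparkContext, as in the programming guide
        val spark = SparkSession.builder.config(rdd2.sparkContext.getConf).getOrCreate()
        import spark.implicits._  // brings the AccessLog encoder into scope
        val logDS: Dataset[AccessLog] = spark.createDataset(rdd2)
        logDS.createOrReplaceTempView("LogDFslide")
        // ... same SQL queries as in the original program ...
      }
    }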