Member since: 09-03-2015

- 50 Posts
- 8 Kudos Received
- 1 Solution

        My Accepted Solutions

| Title | Views | Posted |
|---|---|---|
|  | 3718 | 09-12-2017 07:24 PM |

12-14-2018 08:39 PM

Thanks Eric Wohlstadter. Do we add this as a custom spark2-defaults property in the config?

12-14-2018 08:24 PM

Where is the setting "spark.security.credentials.hiveserver2.enabled" updated: in the Spark config in Ambari, or in Hive?

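For reference, a minimal sketch of how a property like this could be supplied per application from Spark code. The property name comes from the posts above; the application name and the value shown are only illustrative, and on an Ambari-managed cluster the setting would more commonly live in a custom spark2-defaults entry rather than in code.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical example: supply the property when building the session.
// Whether it should be true or false depends on how Spark authenticates to HiveServer2.
val spark = SparkSession.builder()
  .appName("hs2-credentials-example")                                   // illustrative app name
  .config("spark.security.credentials.hiveserver2.enabled", "false")    // property named in the post above
  .getOrCreate()
```
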
				
			
			
			
			
			
			
			
			
			
		
			
    
	
		
		
11-03-2017 07:42 PM

Hi @Greg Keys, thanks for the post. Row filtering works based on column values that are not at the end, but I am not sure how to filter rows based on the last column's value. Can you please let me know? Thanks.

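As a generic illustration only (not necessarily the mechanism the referenced post uses), filtering on whatever happens to be the last column of a Spark DataFrame could look like the sketch below; the table name and value are hypothetical.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder().appName("last-column-filter").getOrCreate()

val df = spark.table("example_table")                       // hypothetical table
val lastColumn = df.columns.last                            // name of the final column
val filtered = df.filter(col(lastColumn) === "some_value")  // keep only rows matching on that column
filtered.show()
```
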
				
			
			
			
			
			
			
			
			
			
		
			
    
	
		
		
11-03-2017 04:17 PM

Thanks very much. I see now what's going on. I tried both of your suggestions and they seem to work well.

11-02-2017 04:11 PM

Hi, I have a Spark Streaming application that analyzes and processes log files and eventually dumps the results into an internal Hive table. The problem is that when Spark loads the data it creates small files, and even though all of the Hive configuration options related to merging are set to true, the merge still isn't happening. Please check the attached image of the config parameters. Any help will be greatly appreciated. Thanks, Chandra

- Labels:
  - Apache Hive
  - Apache Spark

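One common workaround (an assumption on my part, not a confirmed fix from this thread) is that Hive's merge settings only apply to files written by Hive's own jobs, so the file count has to be reduced on the Spark side before the insert; a minimal sketch with hypothetical table names:

```scala
import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder()
  .appName("hive-small-files")
  .enableHiveSupport()
  .getOrCreate()

// Stand-in for the processed streaming results described in the post.
val processed = spark.table("staging_logs")

processed
  .coalesce(4)                         // fewer, larger files per write instead of many small ones
  .write
  .mode(SaveMode.Append)
  .insertInto("default.access_logs")   // hypothetical internal Hive table
```

Coalescing trades write parallelism for file count, so the right number of partitions depends on the size of each micro-batch.
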
 
			
    
	
		
		
09-12-2017 07:24 PM

Figured out that I had to use a Dataset to make sure checkpointing works.

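The post does not say exactly what changed, but a minimal sketch of converting each micro-batch to a typed Dataset (the case class and DStream mirror the code in the question further down; everything else is an assumption) might look like:

```scala
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.streaming.dstream.DStream

// Simplified stand-in for the AccessLog case class used in the question below.
case class AccessLog(datetime: String, requesterIp: String, httpCode: String)

def processAsDataset(spark: SparkSession, logs: DStream[AccessLog]): Unit = {
  import spark.implicits._
  logs.foreachRDD { rdd =>
    if (!rdd.isEmpty()) {
      val batch: Dataset[AccessLog] = rdd.toDS()   // typed Dataset instead of untyped rows
      batch.createOrReplaceTempView("Log")
      spark.sql("SELECT count(*) FROM Log").show()
    }
  }
}
```
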
				
			
			
			
			
			
			
			
			
			
		
			
    
	
		
		
08-25-2017 09:08 PM

Hi, my program runs fine without checkpointing, but when I modified it to make it fault tolerant I get the error attached in error.txt. The program runs fine when it starts fresh, but when it recovers from a checkpoint it fails, and I am not sure what I am doing wrong. Any help will be appreciated.

```scala
package ca.twitter2

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.spark._
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka010._

object NGINXLogProcessingWindowedwithcheckpointappl {

  case class AccessLog(Datetime: String, requesterip: String, httpcode: String, method: String,
                       serverip2: String, responsetime: String, operation: String, application: String)

  val checkpointDir = "hdfs://ln461.alt.calgary.ca:8020/user/NGINXAccesslogcheckpoint"

  def creatingFunc(): StreamingContext = {
    println("Creating new context")
    val sparkConf = new SparkConf()
      .setAppName("NGINXLogAnalysiswindowedwithcheckpoint")
      .setMaster("local")
    val ssc = new StreamingContext(sparkConf, Seconds(120))
    ssc.checkpoint(checkpointDir)
    sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true")

    val spark = SparkSession.builder().getOrCreate()

    val topics = List("REST").toSet
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "10.24.18.36:6667",
      "group.id" -> "2",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val WINDOW_LENGTH = Seconds(43200)
    val SLIDE_INTERVAL = Seconds(120)

    // Create the direct stream with the Kafka parameters and topics.
    val consumerStrategy = ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
    val kafkaStream = KafkaUtils.createDirectStream[String, String](ssc, LocationStrategies.PreferConsistent, consumerStrategy)

    // Parse each CSV line of the NGINX access log into an AccessLog record over a sliding window.
    val lines = kafkaStream.map(_.value()).repartition(4)
    val lineswindowed = lines.window(WINDOW_LENGTH, SLIDE_INTERVAL)
    val lines2 = lineswindowed.map(_.split(","))
    val lines4slide = lines2.map(p => AccessLog(p(0), p(2), p(4), p(3), p(8), p(7), p(10), p(12)))

    lines4slide.foreachRDD { rdd2 =>
      if (!rdd2.isEmpty) {
        val count = rdd2.count
        println("count received " + count)
        import spark.implicits._

        val LogDF = rdd2.toDF()
        LogDF.createOrReplaceTempView("Log")

        // Split the request and operation fields into their components.
        val LogDFslide = LogDF.select($"Datetime", $"requesterip".cast("string"), $"httpcode",
          expr("(split(method, ' '))[1]").cast("string").as("request"),
          expr("(split(method, ' '))[2]").cast("string").as("webserviceurl"),
          expr("(split(method, ' '))[3]").cast("string").as("protocol"),
          $"serverip2", $"responsetime",
          expr("(split(operation, '/'))[4]").cast("string").as("operationtype"),
          $"application".cast("string"))
        LogDFslide.createOrReplaceTempView("LogDFslide")

        val Log2DFslide = spark.sql("SELECT Datetime, requesterip, httpcode, substring(request, 2, length(request)) AS request2, webserviceurl, protocol, serverip2, split(webserviceurl, '/')[3] AS webservice3, responsetime, substring(operationtype, 1, length(operationtype) - 4) AS httpsoapaction, application FROM LogDFslide")
        Log2DFslide.createOrReplaceTempView("Log2DFslide")

        val Log2DFslideoutput = spark.sql("SELECT Datetime, requesterip, httpcode, request2, webserviceurl, protocol, serverip2, split(webservice3, '[?]')[0] AS webservice, responsetime, httpsoapaction, application FROM Log2DFslide")
        // Log2DFslideoutput.write.mode(SaveMode.Overwrite).orc("hdfs://ln461.alt.calgary.ca:8020/user/NGINXAccessLogWindowedcheckpointed")

        // Drop rows without a usable response time, then report the average response time per web service.
        val log2DFFilter = spark.sql("SELECT Datetime, requesterip, httpcode, request2, webserviceurl, protocol, serverip2, split(webservice3, '[?]')[0] AS webservice2, responsetime, httpsoapaction, application FROM Log2DFslide WHERE responsetime <> '-' AND responsetime <> ''")
        log2DFFilter.createOrReplaceTempView("log2DFFilter")
        log2DFFilter.show

        val Log3DFslide = spark.sql("SELECT initcap(webservice2) AS webservice, round(avg(responsetime), 4) AS Averageresponsetime FROM log2DFFilter WHERE webservice2 <> '' GROUP BY initcap(webservice2)")
        Log3DFslide.createOrReplaceTempView("Log3DFslide")
        Log3DFslide.show
        // Log3DFslide.write.mode(SaveMode.Overwrite).orc("hdfs://ln461.alt.calgary.ca:8020/user/NGINXAccessLogstatistics")
      }
    }
    ssc
  }

  def main(args: Array[String]): Unit = {
    val stopActiveContext = true
    if (stopActiveContext) {
      // Stop any active streaming context, then get or create one from the checkpoint directory.
      StreamingContext.getActive.foreach { _.stop(stopSparkContext = false) }
      val ssc = StreamingContext.getActiveOrCreate(checkpointDir, creatingFunc)
      ssc.start()
      ssc.awaitTermination()
    }
  }
}
```

- Labels:
  - Apache Spark

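One detail worth noting about the code above (an observation, not a confirmed fix from this thread): spark.streaming.receiver.writeAheadLog.enable is set on sparkConf after the StreamingContext has already been built from that conf, so it will not take effect. A sketch of setting it before construction:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val checkpointDir = "hdfs://ln461.alt.calgary.ca:8020/user/NGINXAccesslogcheckpoint"

val sparkConf = new SparkConf()
  .setAppName("NGINXLogAnalysiswindowedwithcheckpoint")
  .setMaster("local")
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")  // set before the context is created

val ssc = new StreamingContext(sparkConf, Seconds(120))
ssc.checkpoint(checkpointDir)
```

That flag only matters for receiver-based input streams; the direct Kafka stream used in the post tracks offsets through the checkpoint instead, so this ordering issue may be unrelated to the recovery failure itself.
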
 
			
    
	
		
		
08-02-2017 09:11 PM

Hi, we have an on-premise Hadoop cluster and are planning to integrate Spark with scikit-learn using the spark-sklearn package. Can you please let me know whether we need to install the sklearn and spark-sklearn packages on all nodes, or only on the node where the Spark2 History Server is installed? We will be using YARN for resource allocation. Thanks, Chandra

- Labels:
  - Apache Spark

07-28-2017 03:23 PM

Thanks. If I don't use a window and instead stream the data onto HDFS, could you suggest how to keep only one week's worth of data? Should I create a cron job to delete HDFS files older than a week? Please let me know if you have any other suggestions.

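For the cron-job idea, a minimal sketch of a retention pass over a hypothetical HDFS directory (the path, the seven-day cutoff, and running it on a schedule are all assumptions):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object HdfsRetention {
  def main(args: Array[String]): Unit = {
    val dir    = new Path("/user/streaming/nginx-logs")                  // hypothetical target directory
    val cutoff = System.currentTimeMillis() - 7L * 24 * 60 * 60 * 1000   // seven days ago, in millis

    val fs = FileSystem.get(new Configuration())
    fs.listStatus(dir)
      .filter(status => status.isFile && status.getModificationTime < cutoff)
      .foreach(status => fs.delete(status.getPath, false))               // non-recursive delete per file
    fs.close()
  }
}
```
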