Member since
09-03-2015
50
Posts
8
Kudos Received
1
Solution
My Accepted Solutions
Title | Views | Posted |
---|---|---|
2915 | 09-12-2017 07:24 PM |
12-14-2018
08:39 PM
Thanks Eric Wohlstadter. Do we add this as a custom spark2 defaults propoerty in the config
... View more
12-14-2018
08:24 PM
where is the setting "spark.security.credentials.hiveserver2.enabled" updated, Spark config in Ambari or Hive?
... View more
11-03-2017
07:42 PM
Hi @Greg Keys, Thanks for the post. Row filtering works based on the column values which is not in the end. But I am not sure how to filter the rows based on the last column value. Can you please let me know. Thanks
... View more
11-03-2017
04:17 PM
Thanks very much. I see now whats going on. I tried both of your suggestions and seem to work well
... View more
11-02-2017
04:11 PM
Hi, I have a spark streaming application which analysis log files and processes them. Eventually it dumps the processed results in a Hive Table (Internal). But the problem with this is that when spark loads the data, it creates small files and I have all the options in Hive configuration with regards to merging set to True. But still merging isnt happening. Please check the image of the config parameters attached. Any help will be greatly appreciated. Thanks, Chandra
... View more
Labels:
- Labels:
-
Apache Hive
-
Apache Spark
09-12-2017
07:24 PM
Figured out that I had to use dataset to make sure checkpointing works....
... View more
08-25-2017
09:08 PM
Hi, My program runs fine without checkpoint and when I modified the program to make it fault tolerant , I get the error as attached in the file The program runs fine when it starts fresh...but if it comes from a checkpoint it fails...not sure where I am i doing wrong. Any help will be appreciated. error.txt package ca.twitter2
import org.apache.kafka.clients._
import org.apache.kafka._
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients._
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.log4j._
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.KafkaUtils
import java.util.HashMap
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.log4j.{Level, Logger}
object NGINXLogProcessingWindowedwithcheckpointappl {
case class AccessLog(Datetime: String, requesterip: String, httpcode: String, method: String, serverip2: String, responsetime: String, operation: String, application: String)
val checkpointDir = "hdfs://ln461.alt.calgary.ca:8020/user/NGINXAccesslogcheckpoint"
def creatingFunc(): StreamingContext = {
println("Creating new context")
val sparkConf = new SparkConf().setAppName("NGINXLogAnalysiswindowedwithcheckpoint")
.setMaster("local")
val ssc = new StreamingContext(sparkConf, Seconds(120))
ssc.checkpoint(checkpointDir)
sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
val spark = SparkSession
.builder()
.getOrCreate()
val topics = List("REST").toSet
// Logger.getLogger("org").setLevel(Level.ERROR)
//Logger.getLogger("akka").setLevel(Level.ERROR)
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "10.24.18.36:6667",
//"bootstrap.servers" -> "10.71.52.119:9092",
// "bootstrap.servers" -> "192.168.123.36:6667",
"group.id" -> "2",
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG ->"org.apache.kafka.common.serialization.StringDeserializer",
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
val WINDOW_LENGTH = Seconds(43200)
val SLIDE_INTERVAL = Seconds(120)
// Create the direct stream with the Kafka parameters and topics
val consumerStrategy = ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
val kafkaStream = KafkaUtils.createDirectStream[String, String](ssc, LocationStrategies.PreferConsistent, consumerStrategy)
val lines = kafkaStream.map(_.value()).repartition(4)
val lineswindowed =lines.window(WINDOW_LENGTH, SLIDE_INTERVAL)
val lines2= lineswindowed.map(_.split(","))
val lines4slide= lines2.map(p => AccessLog(p(0),p(2).toString,p(4).toString,p(3).toString, p(8).toString, p(7).toString, p(10), p(12)))
lines4slide.foreachRDD { rdd2 =>
if (!rdd2.isEmpty) {
val count = rdd2.count
println("count received " + count)
import org.apache.spark.sql.functions._
import spark.implicits._
val LogDF = rdd2.toDF()
LogDF.createOrReplaceTempView("Log")
val LogDFslide = LogDF.select($"Datetime",$"requesterip".cast("string"),$"httpcode",expr("(split(method, ' '))[1]").cast("string").as("request"),expr("(split(method, ' '))[2]").cast("string").as("webserviceurl"),expr("(split(method, ' '))[3]").cast("string").as("protocol"), $"serverip2", $"responsetime",expr("(split(operation, '/'))[4]").cast("string").as("operationtype"), $"application".cast("string"))
LogDFslide.createOrReplaceTempView("LogDFslide")
//LogDFslide.printSchema()
//LogDFslide.show
val Log2DFslide = spark.sql("SELECT Datetime,requesterip,httpcode, substring(request,2,length(request))as request2,webserviceurl, protocol, serverip2, split(webserviceurl, '/')[3] as webservice3, responsetime, substring(operationtype,1,length(operationtype)-4) as httpsoapaction, application FROM LogDFslide")
Log2DFslide.createOrReplaceTempView("Log2DFslide")
val Log2DFslideoutput = spark.sql("SELECT Datetime,requesterip,httpcode, request2,webserviceurl, protocol, serverip2, split(webservice3, '[?]')[0] as webservice, responsetime, httpsoapaction, application FROM Log2DFslide") // Log2DFslide.show
//println("printing line3")
//Log2DFslideoutput.show
// Log2DFslideoutput.write.mode(SaveMode.Overwrite).orc("hdfs://ln461.alt.calgary.ca:8020/user/NGINXAccessLogWindowedcheckpointed");
val log2DFFilter = spark.sql("SELECT Datetime,requesterip,httpcode, request2,webserviceurl, protocol, serverip2, split(webservice3, '[?]')[0] as webservice2, responsetime, httpsoapaction, application from Log2DFslide where responsetime <>'-' and responsetime <>'' ")
log2DFFilter.createOrReplaceTempView("log2DFFilter")
//log2DFFilter.printSchema()
log2DFFilter.show
val Log3DFslide = spark.sql( "Select initcap(webservice2) as webservice, round(avg(responsetime),4) as Averageresponsetime from log2DFFilter where webservice2 <>'' group by initcap(webservice2) ")
// val Log3DFslide = log2DFFilter.select(expr("initcap(webservice2)"), expr("round(avg(responsetime),4)").as("Averageresponsetime") ).groupBy(expr("initcap(webservice2)"))
// Log3DFslide.printSchema()
Log3DFslide.createOrReplaceTempView("Log3DFslide")
Log3DFslide.show
//Log3DFslide.write.mode(SaveMode.Overwrite).orc("hdfs://ln461.alt.calgary.ca:8020/user/NGINXAccessLogstatistics");
}
}
ssc
}
def main(args: Array[String]) {
val stopActiveContext = true
if (stopActiveContext)
{
StreamingContext.getActive.foreach { _.stop(stopSparkContext = false) }
// Get or create a streaming context.
val ssc = StreamingContext.getActiveOrCreate(checkpointDir, creatingFunc)
// val ssc = StreamingContext.getOrCreate(checkpointDir, () => creatingFunc(checkpointDir))
ssc.start()
ssc.awaitTermination()
}
}
}
... View more
Labels:
- Labels:
-
Apache Spark
08-02-2017
09:11 PM
Hi, We have a Hadoop on-premise cluster and are planning to integrate spark with scikit learn using the spark-sklearn package. Can you please let me know if we need to install sklearn and spark-sklearn package in all nodes or just the node where spark2-history server has been installed. We will be using yarn for resource allocation. Thanks, Chandra
... View more
Labels:
- Labels:
-
Apache Spark
07-28-2017
03:23 PM
Thanks. If i not use Window and choose to use Streaming the data on to HDFS, could you suggest how to only store 1 week worth of data. Should i create a cron job to delete HDFS files older than a week. PLease let me know if you have any other suggestions
... View more