Member since: 09-03-2015
Posts: 50
Kudos Received: 8
Solutions: 1

My Accepted Solutions

Title | Views | Posted
---|---|---
 | 1597 | 09-12-2017 07:24 PM
12-14-2018
08:39 PM
Thanks Eric Wohlstadter. Do we add this as a custom spark2-defaults property in the config?
12-14-2018
08:24 PM
Where is the setting "spark.security.credentials.hiveserver2.enabled" updated: in the Spark config in Ambari, or in the Hive config?
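For reference, a minimal sketch of one way such a flag can be supplied; this is an assumption about usage, not confirmation of where it belongs on a given cluster. In Ambari it would typically be added as a custom property under the Spark2 configs, while programmatically it can be passed when the session is built. The app name and the chosen value below are hypothetical.
import org.apache.spark.sql.SparkSession
// Sketch only: pass the property as a Spark config when the job starts.
// Whether "true" or "false" is appropriate depends on how HiveServer2 security is set up.
val spark = SparkSession.builder()
  .appName("HiveCredentialsFlagExample")
  .config("spark.security.credentials.hiveserver2.enabled", "false")
  .enableHiveSupport()
  .getOrCreate()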
11-03-2017
07:42 PM
Hi @Greg Keys, thanks for the post. Row filtering works based on column values when the column is not at the end, but I am not sure how to filter rows based on the last column's value. Can you please let me know? Thanks
11-03-2017
04:17 PM
Thanks very much. I see now what's going on. I tried both of your suggestions and they seem to work well.
11-02-2017
04:11 PM
Hi, I have a Spark Streaming application which analyses log files and processes them. Eventually it dumps the processed results into an internal Hive table. The problem is that when Spark loads the data, it creates small files, and even though I have all the Hive configuration options related to merging set to true, merging still isn't happening. Please check the attached image of the config parameters. Any help will be greatly appreciated. Thanks, Chandra
Labels:
- Apache Hive
- Apache Spark
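One common workaround for the small-files question above, shown only as a sketch and not as the author's setup: Hive's merge settings generally apply to files written by Hive's own jobs, so an alternative is to reduce the number of files Spark writes per batch by coalescing before the insert. The table names below are hypothetical.
import org.apache.spark.sql.{SaveMode, SparkSession}
// Sketch: coalesce to a few partitions before inserting, so each batch
// produces a small number of larger files instead of many small ones.
val spark = SparkSession.builder().appName("CoalesceBeforeInsert").enableHiveSupport().getOrCreate()
val processed = spark.table("staging_logs")      // hypothetical source of processed results
processed
  .coalesce(4)
  .write
  .mode(SaveMode.Append)
  .insertInto("processed_logs")                  // hypothetical internal Hive table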
09-12-2017
07:24 PM
Figured out that I had to use a Dataset to make sure checkpointing works....
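The accepted fix itself is truncated above. Purely as an illustration, and an assumption rather than the author's exact change: one checkpoint-friendly pattern is to convert each micro-batch to a typed Dataset using a SparkSession re-obtained from the RDD's own SparkContext, instead of a session captured when the context was first built. The names lines4slide and AccessLog come from the original code in the 08-25-2017 post below.
// Sketch only, inside the existing foreachRDD of the job shown below.
lines4slide.foreachRDD { rdd =>
  if (!rdd.isEmpty) {
    // Re-obtain the session from the RDD's SparkContext so nothing stale is
    // captured in the checkpointed closure.
    val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
    import spark.implicits._
    val logDS = rdd.toDS()              // typed Dataset[AccessLog]
    logDS.createOrReplaceTempView("Log")
  }
}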
08-25-2017
09:08 PM
Hi, my program runs fine without checkpointing, but when I modified it to make it fault tolerant, I get the error attached in the file error.txt. The program runs fine when it starts fresh, but if it recovers from a checkpoint it fails; I am not sure what I am doing wrong. Any help will be appreciated.
package ca.twitter2
import org.apache.kafka.clients._
import org.apache.kafka._
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.log4j._
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.KafkaUtils
import java.util.HashMap
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.log4j.{Level, Logger}

object NGINXLogProcessingWindowedwithcheckpointappl {

  case class AccessLog(Datetime: String, requesterip: String, httpcode: String, method: String,
                       serverip2: String, responsetime: String, operation: String, application: String)

  val checkpointDir = "hdfs://ln461.alt.calgary.ca:8020/user/NGINXAccesslogcheckpoint"

  // Builds a brand-new StreamingContext; called only when there is no active context
  // and no usable checkpoint to recover from.
  def creatingFunc(): StreamingContext = {
    println("Creating new context")
    val sparkConf = new SparkConf().setAppName("NGINXLogAnalysiswindowedwithcheckpoint")
      .setMaster("local")
    val ssc = new StreamingContext(sparkConf, Seconds(120))
    ssc.checkpoint(checkpointDir)
    // Note: this is set after the StreamingContext has already been created from sparkConf,
    // so it does not take effect for this context.
    sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
    val spark = SparkSession
      .builder()
      .getOrCreate()
    val topics = List("REST").toSet
    // Logger.getLogger("org").setLevel(Level.ERROR)
    // Logger.getLogger("akka").setLevel(Level.ERROR)
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "10.24.18.36:6667",
      // "bootstrap.servers" -> "10.71.52.119:9092",
      // "bootstrap.servers" -> "192.168.123.36:6667",
      "group.id" -> "2",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    val WINDOW_LENGTH = Seconds(43200)
    val SLIDE_INTERVAL = Seconds(120)
    // Create the direct stream with the Kafka parameters and topics
    val consumerStrategy = ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
    val kafkaStream = KafkaUtils.createDirectStream[String, String](ssc, LocationStrategies.PreferConsistent, consumerStrategy)
    val lines = kafkaStream.map(_.value()).repartition(4)
    val lineswindowed = lines.window(WINDOW_LENGTH, SLIDE_INTERVAL)
    val lines2 = lineswindowed.map(_.split(","))
    val lines4slide = lines2.map(p => AccessLog(p(0), p(2).toString, p(4).toString, p(3).toString, p(8).toString, p(7).toString, p(10), p(12)))
    lines4slide.foreachRDD { rdd2 =>
      if (!rdd2.isEmpty) {
        val count = rdd2.count
        println("count received " + count)
        import org.apache.spark.sql.functions._
        import spark.implicits._
        val LogDF = rdd2.toDF()
        LogDF.createOrReplaceTempView("Log")
        val LogDFslide = LogDF.select(
          $"Datetime",
          $"requesterip".cast("string"),
          $"httpcode",
          expr("(split(method, ' '))[1]").cast("string").as("request"),
          expr("(split(method, ' '))[2]").cast("string").as("webserviceurl"),
          expr("(split(method, ' '))[3]").cast("string").as("protocol"),
          $"serverip2",
          $"responsetime",
          expr("(split(operation, '/'))[4]").cast("string").as("operationtype"),
          $"application".cast("string"))
        LogDFslide.createOrReplaceTempView("LogDFslide")
        // LogDFslide.printSchema()
        // LogDFslide.show
        val Log2DFslide = spark.sql("SELECT Datetime,requesterip,httpcode, substring(request,2,length(request))as request2,webserviceurl, protocol, serverip2, split(webserviceurl, '/')[3] as webservice3, responsetime, substring(operationtype,1,length(operationtype)-4) as httpsoapaction, application FROM LogDFslide")
        Log2DFslide.createOrReplaceTempView("Log2DFslide")
        val Log2DFslideoutput = spark.sql("SELECT Datetime,requesterip,httpcode, request2,webserviceurl, protocol, serverip2, split(webservice3, '[?]')[0] as webservice, responsetime, httpsoapaction, application FROM Log2DFslide") // Log2DFslide.show
        // println("printing line3")
        // Log2DFslideoutput.show
        // Log2DFslideoutput.write.mode(SaveMode.Overwrite).orc("hdfs://ln461.alt.calgary.ca:8020/user/NGINXAccessLogWindowedcheckpointed");
        val log2DFFilter = spark.sql("SELECT Datetime,requesterip,httpcode, request2,webserviceurl, protocol, serverip2, split(webservice3, '[?]')[0] as webservice2, responsetime, httpsoapaction, application from Log2DFslide where responsetime <>'-' and responsetime <>'' ")
        log2DFFilter.createOrReplaceTempView("log2DFFilter")
        // log2DFFilter.printSchema()
        log2DFFilter.show
        val Log3DFslide = spark.sql("Select initcap(webservice2) as webservice, round(avg(responsetime),4) as Averageresponsetime from log2DFFilter where webservice2 <>'' group by initcap(webservice2) ")
        // val Log3DFslide = log2DFFilter.select(expr("initcap(webservice2)"), expr("round(avg(responsetime),4)").as("Averageresponsetime") ).groupBy(expr("initcap(webservice2)"))
        // Log3DFslide.printSchema()
        Log3DFslide.createOrReplaceTempView("Log3DFslide")
        Log3DFslide.show
        // Log3DFslide.write.mode(SaveMode.Overwrite).orc("hdfs://ln461.alt.calgary.ca:8020/user/NGINXAccessLogstatistics");
      }
    }
    ssc
  }

  def main(args: Array[String]) {
    val stopActiveContext = true
    if (stopActiveContext) {
      StreamingContext.getActive.foreach { _.stop(stopSparkContext = false) }
      // Get or create a streaming context: recover from the checkpoint if one exists,
      // otherwise build a fresh context with creatingFunc.
      val ssc = StreamingContext.getActiveOrCreate(checkpointDir, creatingFunc)
      // val ssc = StreamingContext.getOrCreate(checkpointDir, () => creatingFunc(checkpointDir))
      ssc.start()
      ssc.awaitTermination()
    }
  }
}
Labels:
- Apache Spark
08-02-2017
09:11 PM
Hi, we have an on-premise Hadoop cluster and are planning to integrate Spark with scikit-learn using the spark-sklearn package. Can you please let me know whether we need to install the sklearn and spark-sklearn packages on all nodes, or only on the node where the Spark2 History Server is installed? We will be using YARN for resource allocation. Thanks, Chandra
Labels:
- Apache Spark
07-28-2017
03:23 PM
Thanks. If I do not use a window and instead choose to stream the data onto HDFS, could you suggest how to store only one week's worth of data? Should I create a cron job to delete HDFS files older than a week? Please let me know if you have any other suggestions.
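One possible approach to the retention question above, given only as a sketch under assumptions: a small cleanup job using the Hadoop FileSystem API that drops directories older than seven days, which could be scheduled from cron instead of shell commands. The landing path below is hypothetical.
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object HdfsRetention {
  def main(args: Array[String]): Unit = {
    val fs = FileSystem.get(new Configuration())
    val retentionMs = 7L * 24 * 60 * 60 * 1000
    val cutoff = System.currentTimeMillis() - retentionMs
    // Hypothetical landing directory for the streamed log data.
    val root = new Path("/user/streaming/nginx_logs")
    fs.listStatus(root)
      .filter(s => s.isDirectory && s.getModificationTime < cutoff)
      .foreach(s => fs.delete(s.getPath, true)) // recursive delete of expired directories
  }
}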
07-27-2017
04:56 PM
Hi, I was just wondering whether it is OK to perform window operations on DStreams with one week as the window length. Please let me know if there are any major concerns. Thanks
Labels:
- Apache Spark
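For concreteness, this is the kind of window being asked about, as a sketch only; the two-minute slide interval is an assumption, and `lines` stands for the DStream of parsed log lines as in the code earlier in this feed.
import org.apache.spark.streaming.Minutes
// A one-week window sliding every two minutes means Spark must retain
// (and, on recovery, re-read) a full week of batch data, which is the
// main practical concern with windows this long.
val weekly = lines.window(Minutes(7 * 24 * 60), Minutes(2))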
07-19-2017
10:28 PM
Does that mean that NiFi has Apache Tika built into it, or should we install Apache Tika externally?
06-06-2017
09:59 PM
Hi, I am getting the error "Queries with streaming sources must be executed with writeStream.start();" while running the code shown below. Any help will be greatly appreciated.
package ca.twitter2
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import kafka.serializer.StringDecoder
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.log4j._
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
//import org.apache.spark.streaming.kafka010._
//import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
//import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
//import kafka.serializer.StringDecoder
import java.util.HashMap
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.sql._
import org.apache.spark.sql.kafka010
//import com.datastax.spark.connector._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.{explode, split}

object kafkatest3 {
  def main(args: Array[String]) {
    val spark = SparkSession
      .builder()
      .appName("kafkatest3")
      .master("local[*]")
      .getOrCreate()
    val topics = Array("twitter")
    val ds1 = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "siaihdf1a.coc.ca:6667")
      .option("subscribe", "twitter")
      .option("startingOffsets", "earliest")
      .load()
    import spark.implicits._
    val df = ds1.selectExpr("CAST(key AS STRING)", "CAST( value AS STRING)").as[(String, String)]
    ds1.printSchema()
    df.createOrReplaceTempView("df");
    val records = spark.sql("SELECT count(*) from df GROUP BY key")
    records.show()
    val query = records.writeStream.outputMode("complete").format("console").start()
    query.awaitTermination()
    spark.stop()
  }
}
Thanks, Chandra
Labels:
- Apache Kafka
- Apache Spark
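For context on the error quoted above, and not necessarily the resolution the author reached: a streaming aggregation such as `records` cannot be materialized with a direct `show()`; it has to be emitted through a streaming sink, like the console sink already present in the code. A minimal sketch:
// Sketch: drop the direct records.show() and let the streaming sink print each trigger.
val query = records
  .writeStream
  .outputMode("complete")   // complete mode because the query is an aggregation
  .format("console")
  .start()
query.awaitTermination()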
10-04-2016
07:50 PM
1 Kudo
Hi, I am in the process of doing a prototype of IoT in our organisation and am charting out the architecture. It would be really appreciated if someone could help me choose the stream processor: Storm or Spark Streaming. I am not sure which one to go with. Basically, we are planning to record sensor events from a fleet, and we are OK with occasional message loss. We also prefer something that is easy to implement, and I am not sure which one is easier to implement either. We are also planning to utilize the lambda architecture: one path for batch and the other for real-time information. Thanks
Labels:
- Apache Spark
- Apache Storm
09-29-2016
07:34 PM
Hi, we are planning to do a prototype of Hadoop log analysis and are not sure which data ingestion tool we should select: NiFi or Flume. Can anyone suggest which one to select and why (pros and cons)? Thanks, Chandra
Labels:
- Apache Flume
- Apache NiFi
09-02-2016
09:19 PM
Also, what is the need to run Hive queries through Spark SQL when Hive on Tez can run much faster....
09-02-2016
08:16 PM
Thanks for your valuable information. So your recommendation is to go with Hive on LLAP rather than Spark SQL. Please correct me if I am wrong.
09-02-2016
07:44 PM
1 Kudo
Hi, can you please let me know which one is faster: Hive on Tez, or accessing Hive using Spark SQL? Thanks, Chandra
Labels:
- Apache Hive
- Apache Spark
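For clarity, "accessing Hive using Spark SQL" in the question above means something like the following sketch; the table and column names are hypothetical.
import org.apache.spark.sql.SparkSession
// Sketch: query an existing Hive table through Spark SQL via the Hive metastore.
val spark = SparkSession.builder()
  .appName("HiveViaSparkSQL")
  .enableHiveSupport()
  .getOrCreate()
val top = spark.sql("SELECT application, count(*) AS hits FROM access_logs GROUP BY application")
top.show()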
06-15-2016
06:01 PM
Thanks for your answer. If we have multiple directories, will the HDFS files be stored multiple times in those directories? Sorry, I am a newbie, hence I need to get this clarified.
06-15-2016
05:45 PM
2 Kudos
directories.png Hi, I see that by default, after the automated install using Ambari, there are a bunch of directories under the NameNode and DataNode settings in the configuration. Can you tell me whether that is the best practice, or whether we should remove some of those directories and keep only one in each? Also, please let me know whether removing them would affect the HDFS services. Thanks, Chandra
Labels:
- Apache Hadoop
06-15-2016
04:35 PM
My ZooKeeper version is 3.4.6.2.4.
06-14-2016
09:59 PM
Don't know what I did, but I tried restarting each service manually using the command line and it started working. One weird thing is that if I try to start all the services from the Ambari dashboard, it does not work, but if I start them from the hosts page ("Start All Host Components"), it works. Also, the ZooKeeper service keeps stopping all the time; other than that, all the services are running fine. Any suggestions will be greatly appreciated.
06-14-2016
04:25 PM
I am seeing this in the ZooKeeper log... any idea what this means?
2016-06-14 02:55:01,693 - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@1007] - Closed socket connection for client /10.71.34.225:56032 (no session established for client)
2016-06-14 02:56:01,723 - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxnFactory@197] - Accepted socket connection from /10.71.34.225:56208
2016-06-14 02:56:01,723 - WARN [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@357] - caught end of stream exception
EndOfStreamException: Unable to read additional data from client sessionid 0x0, likely client has closed socket
at org.apache.zookeeper.server.NIOServerCnxn.doIO(NIOServerCnxn.java:228)
at org.apache.zookeeper.server.NIOServerCnxnFactory.run(NIOServerCnxnFactory.java:208)
at java.lang.Thread.run(Thread.java:745)
06-14-2016
03:24 PM
Hi, we installed HDP using the root account successfully, and the services came up fine after the install with no issues. But now all services are down and we see the message "Connection failed: [Errno 111] Connection refused to sibi1b.gov.calgary.ab.ca:". Not sure what is going on... has anyone had this issue, and what was the resolution? Any help will be greatly appreciated. Rebooting the server didn't help either. Thanks, Chandra
Labels:
- Hortonworks Data Platform (HDP)
06-13-2016
05:51 PM
1 Kudo
Hi, since the largest folder in HDP is under /usr/hdp, we are planning to mount it in advance as a 50 GB volume prior to running the installation. Will the installer delete and recreate the folder while running? This was the recommendation from the Unix team. Please let me know. Thanks
Labels:
- Hortonworks Data Platform (HDP)
06-09-2016
05:43 PM
Thank you very much.
06-08-2016
09:41 PM
Also, can you tell me the location where it is installed? We need to allocate proper space to that specific location.