Member since 09-03-2015
50 Posts
8 Kudos Received
1 Solution
My Accepted Solutions
Title | Views | Posted
---|---|---
 | 723 | 09-12-2017 07:24 PM
12-14-2018
08:39 PM
Thanks, Eric Wohlstadter. Do we add this as a custom spark2-defaults property in the config?
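For reference, a minimal sketch of what a per-application override could look like (my own assumption: the property name comes from this thread, the value is a placeholder, and whether a builder-level setting is honored for credential handling, or whether it has to live in Custom spark2-defaults in Ambari or be passed at submit time, is exactly the open question here):
import org.apache.spark.sql.SparkSession

object HiveServer2CredentialsExample {
  def main(args: Array[String]): Unit = {
    // Placeholder value; the cluster-wide equivalent would be an entry in
    // Custom spark2-defaults in Ambari rather than application code.
    val spark = SparkSession.builder()
      .appName("HiveServer2CredentialsExample")
      .config("spark.security.credentials.hiveserver2.enabled", "false")
      .getOrCreate()

    println(spark.version)
    spark.stop()
  }
}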
12-14-2018
08:24 PM
Where is the setting "spark.security.credentials.hiveserver2.enabled" updated: in the Spark config in Ambari, or in Hive?
10-19-2018
03:01 PM
Hi, we are on HDP 3.0. SELECT queries on Hive work fine, but if I try to insert records into a Hive table I get the following error, both from NiFi and from the command line. I have researched solutions but had no luck so far, and I have made sure all the HDFS folders have full read, write, and execute permissions. Any help would be greatly appreciated.
0: jdbc:hive2://diaihdpmaster1b-p.coc.ca:2181> Insert into nginxlogprocessing.NGINX_ERROR_LOG
. . . . . . . . . . . . . . . . . . . . . . .> SELECT dateanderror, client, server, request, null as upstream, host,
. . . . . . . . . . . . . . . . . . . . . . .> null as referrer, environment FROM nginxlogprocessing.host_nginx_error_log_temp;
INFO : Compiling command(queryId=hive_20181018123706_cf101d3f-8d22-4b65-9020-85c415df7466): Insert into nginxlogprocessing.NGINX_ERROR_LOG SELECT dateanderror, client, server, request, null as upstream, host, null as referrer, environment FROM nginxlogprocessing.host_nginx_error_log_temp
INFO : Semantic Analysis Completed (retrial = false)
INFO : Returning Hive schema: Schema(fieldSchemas:[FieldSchema(name:_col0, type:string, comment:null), FieldSchema(name:_col1, type:string, comment:null), FieldSchema(name:_col2, type:string, comment:null), FieldSchema(name:_col3, type:string, comment:null), FieldSchema(name:_col4, type:string, comment:null), FieldSchema(name:_col5, type:string, comment:null), FieldSchema(name:_col6, type:string, comment:null), FieldSchema(name:_col7, type:string, comment:null)], properties:null)
INFO : Completed compiling command(queryId=hive_20181018123706_cf101d3f-8d22-4b65-9020-85c415df7466); Time taken: 0.967 seconds
INFO : Executing command(queryId=hive_20181018123706_cf101d3f-8d22-4b65-9020-85c415df7466): Insert into nginxlogprocessing.NGINX_ERROR_LOG SELECT dateanderror, client, server, request, null as upstream, host, null as referrer, environment FROM nginxlogprocessing.host_nginx_error_log_temp
INFO : Query ID = hive_20181018123706_cf101d3f-8d22-4b65-9020-85c415df7466
INFO : Total jobs = 3
INFO : Launching Job 1 out of 3
INFO : Starting task [Stage-1:MAPRED] in serial mode
INFO : Subscribed to counters: [] for queryId: hive_20181018123706_cf101d3f-8d22-4b65-9020-85c415df7466
INFO : Tez session hasn't been created yet. Opening session
ERROR : Failed to execute tez graph.
java.io.IOException: Previous writer likely failed to write hdfs://diaihdpmaster1a-p.coc.ca:8020/user/hive/.hiveJars/hive-exec-3.1.0.3.0.0.0-1634-910a4521e48e1f226fc0a0dffdb1a77bc247011285140107e7035089dc876cb6.jar. Failing because I am unlikely to write too.
  at org.apache.hadoop.hive.ql.exec.tez.DagUtils.localizeResource(DagUtils.java:1286) ~[hive-exec-3.1.0.3.0.0.0-1634.jar:3.1.0.3.0.0.0-1634]
  at org.apache.hadoop.hive.ql.exec.tez.TezSessionState.createJarLocalResource(TezSessionState.java:917) ~[hive-exec-3.1.0.3.0.0.0-1634.jar:3.1.0.3.0.0.0-1634]
  at org.apache.hadoop.hive.ql.exec.tez.TezSessionState.makeCombinedJarMap(TezSessionState.java:349) ~[hive-exec-3.1.0.3.0.0.0-1634.jar:3.1.0.3.0.0.0-1634]
  at org.apache.hadoop.hive.ql.exec.tez.TezSessionState.openInternal(TezSessionState.java:418) ~[hive-exec-3.1.0.3.0.0.0-1634.jar:3.1.0.3.0.0.0-1634]
  at org.apache.hadoop.hive.ql.exec.tez.TezSessionPoolSession.openInternal(TezSessionPoolSession.java:124) ~[hive-exec-3.1.0.3.0.0.0-1634.jar:3.1.0.3.0.0.0-1634]
  at org.apache.hadoop.hive.ql.exec.tez.TezSessionState.open(TezSessionState.java:373) ~[hive-exec-3.1.0.3.0.0.0-1634.jar:3.1.0.3.0.0.0-1634]
  at org.apache.hadoop.hive.ql.exec.tez.TezTask.ensureSessionHasResources(TezTask.java:368) ~[hive-exec-3.1.0.3.0.0.0-1634.jar:3.1.0.3.0.0.0-1634]
  at org.apache.hadoop.hive.ql.exec.tez.TezTask.execute(TezTask.java:195) ~[hive-exec-3.1.0.3.0.0.0-1634.jar:3.1.0.3.0.0.0-1634]
  at org.apache.hadoop.hive.ql.exec.Task.executeTask(Task.java:205) ~[hive-exec-3.1.0.3.0.0.0-1634.jar:3.1.0.3.0.0.0-1634]
  at org.apache.hadoop.hive.ql.exec.TaskRunner.runSequential(TaskRunner.java:97) ~[hive-exec-3.1.0.3.0.0.0-1634.jar:3.1.0.3.0.0.0-1634]
  at org.apache.hadoop.hive.ql.Driver.launchTask(Driver.java:2668) ~[hive-exec-3.1.0.3.0.0.0-1634.jar:3.1.0.3.0.0.0-1634]
  at org.apache.hadoop.hive.ql.Driver.execute(Driver.java:2339) ~[hive-exec-3.1.0.3.0.0.0-1634.jar:3.1.0.3.0.0.0-1634]
  at org.apache.hadoop.hive.ql.Driver.runInternal(Driver.java:2015) ~[hive-exec-3.1.0.3.0.0.0-1634.jar:3.1.0.3.0.0.0-1634]
  at org.apache.hadoop.hive.ql.Driver.run(Driver.java:1713) ~[hive-exec-3.1.0.3.0.0.0-1634.jar:3.1.0.3.0.0.0-1634]
  at org.apache.hadoop.hive.ql.Driver.run(Driver.java:1707) ~[hive-exec-3.1.0.3.0.0.0-1634.jar:3.1.0.3.0.0.0-1634]
  at org.apache.hadoop.hive.ql.reexec.ReExecDriver.run(ReExecDriver.java:157) ~[hive-exec-3.1.0.3.0.0.0-1634.jar:3.1.0.3.0.0.0-1634]
  at org.apache.hive.service.cli.operation.SQLOperation.runQuery(SQLOperation.java:224) ~[hive-service-3.1.0.3.0.0.0-1634.jar:3.1.0.3.0.0.0-1634]
  at org.apache.hive.service.cli.operation.SQLOperation.access$700(SQLOperation.java:87) ~[hive-service-3.1.0.3.0.0.0-1634.jar:3.1.0.3.0.0.0-1634]
  at org.apache.hive.service.cli.operation.SQLOperation$BackgroundWork$1.run(SQLOperation.java:316) ~[hive-service-3.1.0.3.0.0.0-1634.jar:3.1.0.3.0.0.0-1634]
  at java.security.AccessController.doPrivileged(Native Method) ~[?:1.8.0_112]
  at javax.security.auth.Subject.doAs(Subject.java:422) ~[?:1.8.0_112]
  at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1688) ~[hadoop-common-3.1.0.3.0.0.0-1634.jar:?]
  at org.apache.hive.service.cli.operation.SQLOperation$BackgroundWork.run(SQLOperation.java:329) ~[hive-service-3.1.0.3.0.0.0-1634.jar:3.1.0.3.0.0.0-1634]
  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_112]
  at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_112]
  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_112]
  at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_112]
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) ~[?:1.8.0_112]
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) ~[?:1.8.0_112]
  at java.lang.Thread.run(Thread.java:745) [?:1.8.0_112]
ERROR : FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.tez.TezTask
INFO : Completed executing command(queryId=hive_20181018123706_cf101d3f-8d22-4b65-9020-85c415df7466); Time taken: 25.087 seconds
Error: Error while processing statement: FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.tez.TezTask (state=08S01,code=1)
Thanks,
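In case it helps with triage, here is a small diagnostic sketch of my own (an assumption, not a verified fix): it uses the Hadoop FileSystem API to list the .hiveJars directory named in the error so the owner, permissions, and size of the localized hive-exec jar can be checked. The NameNode URI and path are copied from the stack trace above, and the hadoop-client libraries are assumed to be on the classpath.
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object HiveJarsCheck {
  def main(args: Array[String]): Unit = {
    // NameNode URI and jar directory taken from the error message above.
    val conf = new Configuration()
    conf.set("fs.defaultFS", "hdfs://diaihdpmaster1a-p.coc.ca:8020")
    val fs = FileSystem.get(conf)
    val jarsDir = new Path("/user/hive/.hiveJars")

    if (fs.exists(jarsDir)) {
      // Print permissions, owner, group, size, and path for each localized jar.
      fs.listStatus(jarsDir).foreach { st =>
        println(s"${st.getPermission} ${st.getOwner}:${st.getGroup} ${st.getLen} ${st.getPath}")
      }
    } else {
      println(s"$jarsDir does not exist")
    }
    fs.close()
  }
}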
11-03-2017
07:42 PM
Hi @Greg Keys, thanks for the post. Row filtering works based on column values that are not at the end of the row, but I am not sure how to filter rows based on the value of the last column. Can you please let me know? Thanks
11-03-2017
04:17 PM
Thanks very much. I see now what's going on. I tried both of your suggestions and they seem to work well.
11-02-2017
04:11 PM
Hi, I have a Spark Streaming application which analyzes log files, processes them, and eventually dumps the processed results into an internal Hive table. The problem is that when Spark loads the data it creates small files. All of the Hive configuration options related to merging are set to true, but the merge still isn't happening. Please check the attached image of the config parameters. Any help will be greatly appreciated. Thanks, Chandra
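As an aside, a common Spark-side workaround for the small-files problem (a sketch under my own assumptions about the source and target table names, not the Hive merge settings asked about here) is to coalesce each batch to a few partitions before inserting into the Hive table:
import org.apache.spark.sql.{SaveMode, SparkSession}

object SmallFilesWorkaround {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("SmallFilesWorkaround")
      .enableHiveSupport()
      .getOrCreate()

    // Hypothetical staging table holding one batch of processed log records.
    val batch = spark.table("processed_logs_staging")

    // Write a handful of larger files per batch instead of many tiny ones,
    // then append into the (hypothetical) target Hive table.
    batch.coalesce(2)
      .write
      .mode(SaveMode.Append)
      .insertInto("processed_logs")

    spark.stop()
  }
}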
09-18-2017
09:29 PM
Hi, does anyone have the steps to integrate Spark on HDP with the Databricks Deep Learning Pipelines? I couldn't find anything in their documentation. Thanks, Chandra
09-12-2017
07:24 PM
Figured out that I had to use a Dataset to make sure checkpointing works.
08-25-2017
09:08 PM
Hi, my program runs fine without checkpointing, but when I modified it to make it fault tolerant I get the error attached in error.txt. The program runs fine when it starts fresh, but if it comes back from a checkpoint it fails, and I am not sure what I am doing wrong. Any help will be appreciated. The code is below.
package ca.twitter2
import org.apache.kafka.clients._
import org.apache.kafka._
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients._
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.log4j._
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.KafkaUtils
import java.util.HashMap
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.log4j.{Level, Logger}
object NGINXLogProcessingWindowedwithcheckpointappl {
case class AccessLog(Datetime: String, requesterip: String, httpcode: String, method: String, serverip2: String, responsetime: String, operation: String, application: String)
val checkpointDir = "hdfs://ln461.alt.calgary.ca:8020/user/NGINXAccesslogcheckpoint"
def creatingFunc(): StreamingContext = {
println("Creating new context")
val sparkConf = new SparkConf().setAppName("NGINXLogAnalysiswindowedwithcheckpoint")
.setMaster("local")
val ssc = new StreamingContext(sparkConf, Seconds(120))
ssc.checkpoint(checkpointDir)
sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
val spark = SparkSession
.builder()
.getOrCreate()
val topics = List("REST").toSet
// Logger.getLogger("org").setLevel(Level.ERROR)
//Logger.getLogger("akka").setLevel(Level.ERROR)
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "10.24.18.36:6667",
//"bootstrap.servers" -> "10.71.52.119:9092",
// "bootstrap.servers" -> "192.168.123.36:6667",
"group.id" -> "2",
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG ->"org.apache.kafka.common.serialization.StringDeserializer",
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
val WINDOW_LENGTH = Seconds(43200)
val SLIDE_INTERVAL = Seconds(120)
// Create the direct stream with the Kafka parameters and topics
val consumerStrategy = ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
val kafkaStream = KafkaUtils.createDirectStream[String, String](ssc, LocationStrategies.PreferConsistent, consumerStrategy)
val lines = kafkaStream.map(_.value()).repartition(4)
val lineswindowed =lines.window(WINDOW_LENGTH, SLIDE_INTERVAL)
val lines2= lineswindowed.map(_.split(","))
val lines4slide= lines2.map(p => AccessLog(p(0),p(2).toString,p(4).toString,p(3).toString, p(8).toString, p(7).toString, p(10), p(12)))
lines4slide.foreachRDD { rdd2 =>
if (!rdd2.isEmpty) {
val count = rdd2.count
println("count received " + count)
import org.apache.spark.sql.functions._
import spark.implicits._
val LogDF = rdd2.toDF()
LogDF.createOrReplaceTempView("Log")
val LogDFslide = LogDF.select($"Datetime",$"requesterip".cast("string"),$"httpcode",expr("(split(method, ' '))[1]").cast("string").as("request"),expr("(split(method, ' '))[2]").cast("string").as("webserviceurl"),expr("(split(method, ' '))[3]").cast("string").as("protocol"), $"serverip2", $"responsetime",expr("(split(operation, '/'))[4]").cast("string").as("operationtype"), $"application".cast("string"))
LogDFslide.createOrReplaceTempView("LogDFslide")
//LogDFslide.printSchema()
//LogDFslide.show
val Log2DFslide = spark.sql("SELECT Datetime,requesterip,httpcode, substring(request,2,length(request))as request2,webserviceurl, protocol, serverip2, split(webserviceurl, '/')[3] as webservice3, responsetime, substring(operationtype,1,length(operationtype)-4) as httpsoapaction, application FROM LogDFslide")
Log2DFslide.createOrReplaceTempView("Log2DFslide")
val Log2DFslideoutput = spark.sql("SELECT Datetime,requesterip,httpcode, request2,webserviceurl, protocol, serverip2, split(webservice3, '[?]')[0] as webservice, responsetime, httpsoapaction, application FROM Log2DFslide") // Log2DFslide.show
//println("printing line3")
//Log2DFslideoutput.show
// Log2DFslideoutput.write.mode(SaveMode.Overwrite).orc("hdfs://ln461.alt.calgary.ca:8020/user/NGINXAccessLogWindowedcheckpointed");
val log2DFFilter = spark.sql("SELECT Datetime,requesterip,httpcode, request2,webserviceurl, protocol, serverip2, split(webservice3, '[?]')[0] as webservice2, responsetime, httpsoapaction, application from Log2DFslide where responsetime <>'-' and responsetime <>'' ")
log2DFFilter.createOrReplaceTempView("log2DFFilter")
//log2DFFilter.printSchema()
log2DFFilter.show
val Log3DFslide = spark.sql( "Select initcap(webservice2) as webservice, round(avg(responsetime),4) as Averageresponsetime from log2DFFilter where webservice2 <>'' group by initcap(webservice2) ")
// val Log3DFslide = log2DFFilter.select(expr("initcap(webservice2)"), expr("round(avg(responsetime),4)").as("Averageresponsetime") ).groupBy(expr("initcap(webservice2)"))
// Log3DFslide.printSchema()
Log3DFslide.createOrReplaceTempView("Log3DFslide")
Log3DFslide.show
//Log3DFslide.write.mode(SaveMode.Overwrite).orc("hdfs://ln461.alt.calgary.ca:8020/user/NGINXAccessLogstatistics");
}
}
ssc
}
def main(args: Array[String]) {
val stopActiveContext = true
if (stopActiveContext)
{
StreamingContext.getActive.foreach { _.stop(stopSparkContext = false) }
// Get or create a streaming context.
val ssc = StreamingContext.getActiveOrCreate(checkpointDir, creatingFunc)
// val ssc = StreamingContext.getOrCreate(checkpointDir, () => creatingFunc(checkpointDir))
ssc.start()
ssc.awaitTermination()
}
}
}
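For anyone comparing notes, a commonly suggested pattern for checkpoint recovery (a sketch under my own assumptions; it is not necessarily the fix for the attached error) is to avoid capturing a driver-side SparkSession inside the foreachRDD closure and instead rebuild it from the RDD's SparkContext, so the session can be recreated when the context is restored from a checkpoint:
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.dstream.DStream

object CheckpointSafeSession {
  // Hypothetical helper: process one micro-batch of raw access-log lines.
  def process(lines: DStream[String]): Unit = {
    lines.foreachRDD { rdd =>
      if (!rdd.isEmpty) {
        // Re-derive the session from the RDD's SparkContext so it is rebuilt
        // correctly when the StreamingContext is recovered from the checkpoint.
        val spark = SparkSession.builder()
          .config(rdd.sparkContext.getConf)
          .getOrCreate()
        import spark.implicits._

        val df = rdd.toDF("line")
        df.createOrReplaceTempView("log_lines")
        spark.sql("SELECT count(*) FROM log_lines").show()
      }
    }
  }
}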
08-25-2017
02:20 PM
Hi, my program runs fine without checkpointing, but when I modified it to make it fault tolerant I get the error attached in the file. The program runs fine when it starts fresh, but if it comes back from a checkpoint it fails, and I am not sure what I am doing wrong. Any help will be appreciated. The code is below.
package ca.twitter2
import org.apache.kafka.clients._
import org.apache.kafka._
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients._
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.log4j._
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.KafkaUtils
import java.util.HashMap
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.log4j.{Level, Logger}
object NGINXLogProcessingWindowedwithcheckpointappl {
case class AccessLog(Datetime: String, requesterip: String, httpcode: String, method: String, serverip2: String, responsetime: String, operation: String, application: String)
val checkpointDir = "hdfs://ln461.alt.calgary.ca:8020/user/NGINXAccesslogcheckpoint"
def creatingFunc(): StreamingContext = {
println("Creating new context")
val sparkConf = new SparkConf().setAppName("NGINXLogAnalysiswindowedwithcheckpoint")
.setMaster("local")
val ssc = new StreamingContext(sparkConf, Seconds(120))
ssc.checkpoint(checkpointDir)
sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
val spark = SparkSession
.builder()
.getOrCreate()
val topics = List("REST").toSet
// Logger.getLogger("org").setLevel(Level.ERROR)
//Logger.getLogger("akka").setLevel(Level.ERROR)
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "10.24.18.36:6667",
//"bootstrap.servers" -> "10.71.52.119:9092",
// "bootstrap.servers" -> "192.168.123.36:6667",
"group.id" -> "2",
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG ->"org.apache.kafka.common.serialization.StringDeserializer",
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
val WINDOW_LENGTH = Seconds(43200)
val SLIDE_INTERVAL = Seconds(120)
// Create the direct stream with the Kafka parameters and topics
val consumerStrategy = ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
val kafkaStream = KafkaUtils.createDirectStream[String, String](ssc, LocationStrategies.PreferConsistent, consumerStrategy)
val lines = kafkaStream.map(_.value()).repartition(4)
val lineswindowed =lines.window(WINDOW_LENGTH, SLIDE_INTERVAL)
val lines2= lineswindowed.map(_.split(","))
val lines4slide= lines2.map(p => AccessLog(p(0),p(2).toString,p(4).toString,p(3).toString, p(8).toString, p(7).toString, p(10), p(12)))
lines4slide.foreachRDD { rdd2 =>
if (!rdd2.isEmpty) {
val count = rdd2.count
println("count received " + count)
import org.apache.spark.sql.functions._
import spark.implicits._
val LogDF = rdd2.toDF()
LogDF.createOrReplaceTempView("Log")
val LogDFslide = LogDF.select($"Datetime",$"requesterip".cast("string"),$"httpcode",expr("(split(method, ' '))[1]").cast("string").as("request"),expr("(split(method, ' '))[2]").cast("string").as("webserviceurl"),expr("(split(method, ' '))[3]").cast("string").as("protocol"), $"serverip2", $"responsetime",expr("(split(operation, '/'))[4]").cast("string").as("operationtype"), $"application".cast("string"))
LogDFslide.createOrReplaceTempView("LogDFslide")
//LogDFslide.printSchema()
//LogDFslide.show
val Log2DFslide = spark.sql("SELECT Datetime,requesterip,httpcode, substring(request,2,length(request))as request2,webserviceurl, protocol, serverip2, split(webserviceurl, '/')[3] as webservice3, responsetime, substring(operationtype,1,length(operationtype)-4) as httpsoapaction, application FROM LogDFslide")
Log2DFslide.createOrReplaceTempView("Log2DFslide")
val Log2DFslideoutput = spark.sql("SELECT Datetime,requesterip,httpcode, request2,webserviceurl, protocol, serverip2, split(webservice3, '[?]')[0] as webservice, responsetime, httpsoapaction, application FROM Log2DFslide") // Log2DFslide.show
//println("printing line3")
//Log2DFslideoutput.show
// Log2DFslideoutput.write.mode(SaveMode.Overwrite).orc("hdfs://ln461.alt.calgary.ca:8020/user/NGINXAccessLogWindowedcheckpointed");
val log2DFFilter = spark.sql("SELECT Datetime,requesterip,httpcode, request2,webserviceurl, protocol, serverip2, split(webservice3, '[?]')[0] as webservice2, responsetime, httpsoapaction, application from Log2DFslide where responsetime <>'-' and responsetime <>'' ")
log2DFFilter.createOrReplaceTempView("log2DFFilter")
//log2DFFilter.printSchema()
log2DFFilter.show
val Log3DFslide = spark.sql( "Select initcap(webservice2) as webservice, round(avg(responsetime),4) as Averageresponsetime from log2DFFilter where webservice2 <>'' group by initcap(webservice2) ")
// val Log3DFslide = log2DFFilter.select(expr("initcap(webservice2)"), expr("round(avg(responsetime),4)").as("Averageresponsetime") ).groupBy(expr("initcap(webservice2)"))
// Log3DFslide.printSchema()
Log3DFslide.createOrReplaceTempView("Log3DFslide")
Log3DFslide.show
//Log3DFslide.write.mode(SaveMode.Overwrite).orc("hdfs://ln461.alt.calgary.ca:8020/user/NGINXAccessLogstatistics");
}
}
ssc
}
def main(args: Array[String]) {
val stopActiveContext = true
if (stopActiveContext)
{
StreamingContext.getActive.foreach { _.stop(stopSparkContext = false) }
// Get or create a streaming context.
val ssc = StreamingContext.getActiveOrCreate(checkpointDir, creatingFunc)
// val ssc = StreamingContext.getOrCreate(checkpointDir, () => creatingFunc(checkpointDir))
ssc.start()
ssc.awaitTermination()
}
}
}
08-02-2017
09:11 PM
Hi, we have an on-premise Hadoop cluster and are planning to integrate Spark with scikit-learn using the spark-sklearn package. Can you please let me know whether we need to install the scikit-learn and spark-sklearn packages on all nodes, or only on the node where the Spark2 History Server is installed? We will be using YARN for resource allocation. Thanks, Chandra
07-28-2017
03:23 PM
Thanks. If I don't use a window and instead stream the data onto HDFS, could you suggest how to keep only one week's worth of data? Should I create a cron job to delete HDFS files older than a week? Please let me know if you have any other suggestions; one possible sketch is below.
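For example, a minimal sketch of such a cleanup job (my own assumption about the landing directory path; a cron-driven shell script over hdfs dfs would work just as well) using the Hadoop FileSystem API to delete files older than seven days:
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object HdfsRetentionCleanup {
  def main(args: Array[String]): Unit = {
    val fs = FileSystem.get(new Configuration())
    // Hypothetical landing directory for the streamed log data.
    val dir = new Path("/user/NGINXAccessLogs")
    val cutoff = System.currentTimeMillis() - 7L * 24 * 60 * 60 * 1000

    // Delete any file whose modification time is older than the cutoff.
    fs.listStatus(dir)
      .filter(st => st.isFile && st.getModificationTime < cutoff)
      .foreach(st => fs.delete(st.getPath, false))
    fs.close()
  }
}
It could be scheduled once a day with cron or Oozie.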
07-27-2017
04:56 PM
Hi, I was just wondering whether it is OK to perform window operations on DStreams with one week as the window length. Please let me know if there are any major concerns. Thanks
07-19-2017
10:28 PM
Does that mean that NiFi has Apache Tika built in, or should we install Apache Tika externally?
06-06-2017
09:59 PM
Hi, I am getting the error "Queries with streaming sources must be executed with writeStream.start();" while running the code shown below. Any help will be greatly appreciated.
package ca.twitter2
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import kafka.serializer.StringDecoder
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.log4j._
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
//import org.apache.spark.streaming.kafka010._
//import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
//import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
//import kafka.serializer.StringDecoder
import java.util.HashMap
import org.apache.spark.SparkConf
import org.apache.spark._
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.sql._
import org.apache.spark.sql.kafka010
//import com.datastax.spark.connector._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.{explode, split}
object kafkatest3 {
def main(args: Array[String]) {
val spark = SparkSession
.builder()
.appName("kafkatest3")
.master("local[*]")
.getOrCreate()
val topics = Array("twitter")
val ds1 = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "siaihdf1a.coc.ca:6667")
.option("subscribe", "twitter")
.option("startingOffsets", "earliest")
.load()
import spark.implicits._
val df = ds1.selectExpr("CAST(key AS STRING)", "CAST( value AS STRING)").as[(String, String)]
ds1.printSchema()
df.createOrReplaceTempView("df");
val records = spark.sql ("SELECT count(*) from df GROUP BY key")
records.show()
val query = records.writeStream.outputMode("complete").format("console").start()
query.awaitTermination()
spark.stop()
}
}
Thanks, Chandra
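For what it is worth, this error is usually raised when a batch action such as show() is called on a streaming DataFrame before start(). A minimal streaming-only sketch of the same job (assuming the same broker and topic as above) might look like this:
package ca.twitter2

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.count

object kafkatest3streamingonly {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("kafkatest3streamingonly")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    val ds1 = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "siaihdf1a.coc.ca:6667")
      .option("subscribe", "twitter")
      .option("startingOffsets", "earliest")
      .load()

    // Keep everything as streaming transformations: no show()/collect() on the
    // streaming DataFrame before writeStream.start().
    val records = ds1
      .selectExpr("CAST(key AS STRING) AS key", "CAST(value AS STRING) AS value")
      .groupBy($"key")
      .agg(count("*"))

    val query = records.writeStream
      .outputMode("complete")
      .format("console")
      .start()

    query.awaitTermination()
  }
}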
05-31-2017
10:18 PM
Hi, I am trying to integrate Spark Streaming with Twitter and get the following error. I have tried to update cacerts with the certificates, but no luck. Has anyone faced a similar issue?
17/05/31 15:58:09 INFO ReceiverSupervisorImpl: Stopping receiver with message: Restarting receiver with delay 2000ms: Error receiving tweets: sun.security.validator.ValidatorException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
Relevant discussions can be found on the Internet at:
http://www.google.co.jp/search?q=1169356e or
http://www.google.co.jp/search?q=c04b39f0
TwitterException{exceptionCode=[1169356e-c04b39f0 1169356e-c04b39c6 1169356e-c04b39c6 1169356e-c04b39c6], statusCode=-1, message=null, code=-1, retryAfter=-1, rateLimitStatus=null, version=4.0.4}
17/05/31 15:58:09 INFO TwitterReceiver: Twitter receiver stopped
17/05/31 15:58:09 INFO ReceiverSupervisorImpl: Called receiver onStop
17/05/31 15:58:09 INFO ReceiverSupervisorImpl: Deregistering receiver 0
17/05/31 15:58:09 ERROR ReceiverTracker: Deregistered receiver for stream 0: Restarting receiver with delay 2000ms: Error receiving tweets - sun.security.validator.ValidatorException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
Relevant discussions can be found on the Internet at:
http://www.google.co.jp/search?q=1169356e or
http://www.google.co.jp/search?q=c04b39f0
TwitterException{exceptionCode=[1169356e-c04b39f0 1169356e-c04b39c6 1169356e-c04b39c6 1169356e-c04b39c6], statusCode=-1, message=null, code=-1, retryAfter=-1, rateLimitStatus=null, version=4.0.4}
at twitter4j.HttpClientImpl.handleRequest(HttpClientImpl.java:179)
at twitter4j.HttpClientBase.request(HttpClientBase.java:57)
at twitter4j.HttpClientBase.post(HttpClientBase.java:86)
at twitter4j.TwitterStreamImpl.getFilterStream(TwitterStreamImpl.java:346)
at twitter4j.TwitterStreamImpl$8.getStream(TwitterStreamImpl.java:322)
at twitter4j.TwitterStreamImpl$TwitterStreamConsumer.run(TwitterStreamImpl.java:552)
Caused by: javax.net.ssl.SSLHandshakeException: sun.security.validator.ValidatorException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
at sun.security.ssl.Alerts.getSSLException(Unknown Source)
at sun.security.ssl.SSLSocketImpl.fatal(Unknown Source)
at sun.security.ssl.Handshaker.fatalSE(Unknown Source)
at sun.security.ssl.Handshaker.fatalSE(Unknown Source)
at sun.security.ssl.ClientHandshaker.serverCertificate(Unknown Source)
at sun.security.ssl.ClientHandshaker.processMessage(Unknown Source)
at sun.security.ssl.Handshaker.processLoop(Unknown Source)
at sun.security.ssl.Handshaker.process_record(Unknown Source)
at sun.security.ssl.SSLSocketImpl.readRecord(Unknown Source)
at sun.security.ssl.SSLSocketImpl.performInitialHandshake(Unknown Source)
at sun.security.ssl.SSLSocketImpl.startHandshake(Unknown Source)
at sun.security.ssl.SSLSocketImpl.startHandshake(Unknown Source)
at sun.net.www.protocol.https.HttpsClient.afterConnect(Unknown Source)
at sun.net.www.protocol.https.AbstractDelegateHttpsURLConnection.connect(Unknown Source)
at sun.net.www.protocol.http.HttpURLConnection.getOutputStream0(Unknown Source)
at sun.net.www.protocol.http.HttpURLConnection.getOutputStream(Unknown Source)
at sun.net.www.protocol.https.HttpsURLConnectionImpl.getOutputStream(Unknown Source)
at twitter4j.HttpClientImpl.handleRequest(HttpClientImpl.java:137)
... 5 more
package ca.twitter
//import java.io.File
import org.apache.spark.streaming.twitter.TwitterUtils
//import com.google.gson.Gson
import org.apache.spark.SparkConf
import org.apache.spark._
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.streaming.twitter._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
object twittersentiment {
def main(args: Array[String])
{
if (args.length < 4) {
System.err.println("Usage: TwitterSentimentAnalysis <consumer key> <consumer secret> " +
"<access token> <access token secret> [<filters>]")
System.exit(1)
}
val Array(consumerKey, consumerSecret, accessToken, accessTokenSecret) = args.take(4)
val filters = args.takeRight(args.length - 4)
// Set the system properties so that Twitter4j library used by twitter stream
// can use them to generate OAuth credentials
System.setProperty("twitter4j.oauth.consumerKey", consumerKey)
System.setProperty("twitter4j.oauth.consumerSecret", consumerSecret)
System.setProperty("twitter4j.oauth.accessToken", accessToken)
System.setProperty("twitter4j.oauth.accessTokenSecret", accessTokenSecret)
val conf = new SparkConf().setAppName("twittersentiment")
.setMaster("local")
// conf.set("es.nodes", conf.get("spark.es.nodes"))
val ssc = new StreamingContext(conf, Seconds(1))
val tweets = TwitterUtils.createStream(ssc, None, filters)
tweets.print()
//tweets.foreachRDD{(rdd, time) =>
// rdd.map(t => {
// Map(
// "user"-> t.getUser.getScreenName,
// "created_at" -> t.getCreatedAt.toInstant.toString,
// "location" -> Option(t.getGeoLocation).map(geo => { s"${geo.getLatitude},${geo.getLongitude}" }),
// "text" -> t.getText,
// "hashtags" -> t.getHashtagEntities.map(_.getText),
// "retweet" -> t.getRetweetCount,
// "language" -> detectLanguage(t.getText),
// "sentiment" -> detectSentiment(t.getText).toString
// )
// }).saveToEs("twitter/tweet")
ssc.start()
ssc.awaitTermination()
}
}
Any help will be greatly appreciated. Thanks, Chandra
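One approach that sometimes resolves PKIX failures like this (a sketch based on my own assumptions about the truststore path and password; the Twitter endpoint's certificate still has to be imported into that truststore first, for example with keytool) is to point the JVM at the truststore before the stream is created:
object TruststoreSetup {
  // Hypothetical truststore location and password; set these before any
  // HTTPS connection to the Twitter API is opened.
  def configureTruststore(): Unit = {
    System.setProperty("javax.net.ssl.trustStore", "/etc/pki/java/custom-cacerts")
    System.setProperty("javax.net.ssl.trustStorePassword", "changeit")
  }
}
configureTruststore() would be called at the top of main(), before TwitterUtils.createStream.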
11-22-2016
09:48 PM
Yes, I was able to view the messages and the topic from the console/command line on the server.
11-22-2016
08:34 PM
I am running Kafka in the Sandbox VM and using the Spark install on my Windows machine.
11-22-2016
08:10 PM
Thanks, but I already had that in my code.
11-22-2016
07:42 PM
Thanks. I tried this and got the following:
[root@sandbox bin]# ./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list "192.168.30.130:6667" --topic "REST" --time -1
{metadata.broker.list=192.168.30.130:6667, request.timeout.ms=1000, client.id=GetOffsetShell, security.protocol=PLAINTEXT}
REST:0:446
How do I alter my code to consume from there? Sorry, I am a newbie at this; any guidance will be really helpful. Thanks
11-22-2016
06:27 PM
Hi, I am trying to run the attached program in Scala IDE, which grabs messages from a Kafka topic, but I get the following Spark exception and am not sure how to proceed. I am able to retrieve the messages from the command line, but the program doesn't seem to run. Any help will be greatly appreciated.
16/11/22 11:20:56 INFO SimpleConsumer: Reconnect due to socket error: java.nio.channels.ClosedChannelException
Exception in thread "main" org.apache.spark.SparkException: java.nio.channels.ClosedChannelException
org.apache.spark.SparkException: Couldn't find leader offsets for Set([REST,0])
at org.apache.spark.streaming.kafka.KafkaCluster$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
at org.apache.spark.streaming.kafka.KafkaCluster$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
at scala.util.Either.fold(Either.scala:97)
at org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365)
at org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:222)
at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484)
at ca.SparkCassandra.Kafka$.main(Kafka.scala:32)
at ca.SparkCassandra.Kafka.main(Kafka.scala)
16/11/22 11:20:57 INFO SparkContext: Invoking stop() from shutdown hook
16/11/22 11:20:57 INFO SparkUI: Stopped Spark web UI at http://10.63.48.172:4040
16/11/22 11:20:57 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
package ca.SparkCassandra
import kafka.serializer.StringDecoder
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.log4j._
import org.apache.spark.streaming.kafka._
import org.apache.spark.streaming.kafka.KafkaUtils
object Kafka {
def main(args: Array[String]) {
val ssc = new StreamingContext("local[*]","Kafka", Seconds(5))
val WINDOW_LENGTH = new Duration( 1000)
val SLIDE_INTERVAL = new Duration(900)
val kafkaParams = Map("metadata.broker.list" -> "192.168.30.130:6667", "auto.offset.reset" -> "smallest" )
val topics = List("REST").toSet
//val topicpMap = topics.split(",").map((_, numThreads.toInt)).toMap
// Create the direct stream with the Kafka parameters and topics
val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics).map(_._2)
kafkaStream.print()
ssc.checkpoint("c:/checkpoint/")
ssc.start()
ssc.awaitTermination()
}
}
10-27-2016
10:42 PM
Hi, I have downloaded the VMware version of the HDP 2.5 sandbox, started it successfully, and am able to access Ambari, etc. But when I log into the server over SSH as root, I am unable to see any of the HDP directories under /usr/, so I am not sure if I am missing something. I am also unable to change the Ambari admin password using the instructions provided; can someone shed some light on that? I have used the HDP 2.4 sandbox before and am quite familiar with the folder structure and the various components. Thanks,
10-06-2016
09:03 PM
Thanks, cduby. So do you mean that when we need to send real-time alerts, we should always write the enriched events back to Kafka from Spark Streaming rather than sending them directly from Spark Streaming?
10-06-2016
08:25 PM
1 Kudo
I was just wondering whether Spark Streaming can send enriched events back to Kafka, or whether it can send JMS alerts directly. I may have a scenario where users get an alert for a specific anomaly; a sketch of the Kafka write-back option is below.
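As an illustration of the first option, here is a minimal sketch (my own assumptions about the broker list, topic name, and the shape of the enriched events) of writing enriched events back to Kafka from inside the stream:
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.spark.streaming.dstream.DStream

object AlertPublisher {
  // Hypothetical helper: publish already-enriched alert strings to a Kafka topic.
  def publish(alerts: DStream[String], brokers: String, topic: String): Unit = {
    alerts.foreachRDD { rdd =>
      rdd.foreachPartition { records =>
        // Create the producer on the executor, inside the partition, so nothing
        // non-serializable is shipped from the driver.
        val props = new Properties()
        props.put("bootstrap.servers", brokers)
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        val producer = new KafkaProducer[String, String](props)
        records.foreach(r => producer.send(new ProducerRecord[String, String](topic, r)))
        producer.close()
      }
    }
  }
}
A downstream consumer (or a JMS bridge) would then pick the alerts up from that topic.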
10-04-2016
07:50 PM
1 Kudo
Hi, I am doing a prototype of IoT in our organisation and am in the process of charting out the architecture. It would be really appreciated if someone could help me choose a stream processor: Storm or Spark Streaming. I am not sure which one to go with. Basically, we are planning to record sensor events from a fleet, and we are OK with occasional message loss. We also prefer something that is easy to implement, and I am not sure which one is easier to implement either. We are also planning to use the Lambda architecture: one path for batch and the other for real-time information. Thanks
09-29-2016
07:34 PM
Hi, we are planning a prototype of Hadoop log analysis and are not sure which data ingestion tool to select: NiFi or Flume. Can anyone suggest which one to choose and why, with pros and cons? Thanks, Chandra
09-02-2016
09:19 PM
Also, what is the need to run Hive queries on Spark SQL when Hive on Tez can run much faster?