- Subscribe to RSS Feed
- Mark as New
- Mark as Read
- Bookmark
- Subscribe
- Printer Friendly Page
- Report Inappropriate Content
Created on 12-20-2016 01:59 PM
These are the steps to build and run a Spark Streaming application; it was built and tested on HDP-2.5.
setup:
ENV: HDP2.5
scala : 2.10.4
sbt: 0.13.11
mkdir spark-streaming-example && cd spark-streaming-example/ && mkdir -p src/main/scala && cd src/main/scala
sample code:
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{SQLContext, SaveMode}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}

/**
 * Spark Streaming word count that converts each micro-batch of words to a
 * DataFrame via Spark SQL and appends it to a Parquet dataset.
 *
 * Usage: SqlNetworkWordCount <hostname> <port>
 */
object SqlNetworkWordCount {
  def main(args: Array[String]): Unit = {
    // Fail fast with a usage message instead of an IndexOutOfBoundsException
    // when the hostname/port arguments are missing.
    if (args.length < 2) {
      System.err.println("Usage: SqlNetworkWordCount <hostname> <port>")
      System.exit(1)
    }

    val sparkConf = new SparkConf().setAppName("SqlNetworkWordCount")
    // 2-second micro-batch interval.
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    // Read lines from a TCP socket; MEMORY_AND_DISK_SER lets received blocks
    // spill to disk under memory pressure instead of being dropped.
    val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
    val words = lines.flatMap(_.split(" "))

    // Convert each RDD of the words DStream to a DataFrame and append it
    // to the Parquet output directory.
    words.foreachRDD((rdd: RDD[String], time: Time) => {
      // Singleton lookup: SQLContext cannot be created on executors, and we
      // want exactly one per driver JVM across batches.
      val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
      import sqlContext.implicits._
      val wordsDataFrame = rdd.map(w => Record(w)).toDF()
      wordsDataFrame.write.mode(SaveMode.Append).parquet("/tmp/parquet")
    })

    ssc.start()
    ssc.awaitTermination()
  }
}

/** One word per DataFrame row. */
case class Record(word: String)

/** Lazily-instantiated, driver-wide SQLContext (one per SparkContext). */
object SQLContextSingleton {
  // @transient: never serialize the context into closures shipped to executors.
  @transient private var instance: SQLContext = _

  def getInstance(sparkContext: SparkContext): SQLContext = {
    if (instance == null) {
      instance = new SQLContext(sparkContext)
    }
    instance
  }
}
cd -
vim build.sbt
name := "Spark Streaming Example"

version := "1.0"

scalaVersion := "2.10.4"

// spark-sql is required in addition to core/streaming: the sample code uses
// SQLContext, DataFrame, SaveMode and the Parquet writer, none of which are
// available from spark-core alone.
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"      % "1.4.1",
  "org.apache.spark" %% "spark-streaming" % "1.4.1",
  "org.apache.spark" %% "spark-sql"       % "1.4.1"
)
*Now run sbt package from the project home; it will build a jar at target/scala-2.10/spark-streaming-example_2.10-1.0.jar *Run this jar using spark-submit (the main class is SqlNetworkWordCount, matching the object name in the sample code): bin/spark-submit --class SqlNetworkWordCount target/scala-2.10/spark-streaming-example_2.10-1.0.jar <hostname> 6002
To test this program, open a different terminal, run nc -lk `hostname` 6002, hit enter, and type anything on the console; the word counts will be written by the Spark application (check /tmp/parquet for output).