These are the steps to build and run a Spark Streaming application; it was built and tested on HDP 2.5.

setup:

ENV: HDP 2.5

Scala: 2.10.4

sbt: 0.13.11

mkdir spark-streaming-example 

cd spark-streaming-example/ 

mkdir -p src/main/scala 

cd src/main/scala
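
After these commands the project skeleton should look like this:

spark-streaming-example/
└── src/
    └── main/
        └── scala/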

sample code (save it under src/main/scala, e.g. as SqlNetworkWordCount.scala):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{SQLContext, SaveMode}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}



object SqlNetworkWordCount {
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println("Usage: SqlNetworkWordCount <hostname> <port>")
      System.exit(1)
    }

    val sparkConf = new SparkConf().setAppName("SqlNetworkWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    // Read lines from the socket source and split them into words
    val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
    val words = lines.flatMap(_.split(" "))

    // Convert each RDD of the words DStream to a DataFrame and append it to Parquet
    words.foreachRDD((rdd: RDD[String], time: Time) => {
      val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
      import sqlContext.implicits._
      val wordsDataFrame = rdd.map(w => Record(w)).toDF()
      wordsDataFrame.write.mode(SaveMode.Append).parquet("/tmp/parquet")
    })

    ssc.start()
    ssc.awaitTermination()
  }
}


/** Case class used to convert an RDD of words to a DataFrame */
case class Record(word: String)


/** Lazily instantiated singleton instance of SQLContext */
object SQLContextSingleton {

  @transient private var instance: SQLContext = _

  def getInstance(sparkContext: SparkContext): SQLContext = {
    if (instance == null) {
      instance = new SQLContext(sparkContext)
    }
    instance
  }
}
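
The foreachRDD block above persists each micro-batch to Parquet. If you would rather run a SQL query per batch, as in the upstream Spark SqlNetworkWordCount example, a minimal sketch of that variant is below; it reuses words, Record, and SQLContextSingleton from the code above, and the "words" table name is arbitrary:

// Alternative foreachRDD body: count words per batch with SQL instead of writing Parquet
words.foreachRDD((rdd: RDD[String], time: Time) => {
  val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
  import sqlContext.implicits._

  val wordsDataFrame = rdd.map(w => Record(w)).toDF()
  wordsDataFrame.registerTempTable("words")  // register the batch as a temp table

  val wordCounts = sqlContext.sql("select word, count(*) as total from words group by word")
  println(s"========= $time =========")
  wordCounts.show()
})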

cd -    # back to the project home

vim build.sbt

name := "Spark Streaming Example"

version := "1.0"

scalaVersion := "2.10.4"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.4.1",
  "org.apache.spark" %% "spark-streaming" % "1.4.1",
  "org.apache.spark" %% "spark-sql" % "1.4.1"
)

Note that spark-sql is required because the code uses SQLContext, DataFrames, and the Parquet writer.
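
Because spark-submit supplies the Spark classes at runtime, a common optional refinement (not part of the original steps) is to mark the Spark dependencies as provided so they are not bundled into the jar:

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.4.1" % "provided",
  "org.apache.spark" %% "spark-streaming" % "1.4.1" % "provided",
  "org.apache.spark" %% "spark-sql" % "1.4.1" % "provided"
)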

Now run sbt package from the project home; it will build a jar at target/scala-2.10/spark-streaming-example_2.10-1.0.jar.

Run this jar using spark-submit (note the class name matches the object defined above):

bin/spark-submit --class SqlNetworkWordCount target/scala-2.10/spark-streaming-example_2.10-1.0.jar `hostname` 6002

To test this program, open a different terminal, run nc -lk `hostname` 6002, hit enter, and type anything into the console; the streaming job reads each line, splits it into words, and appends them as Parquet under /tmp/parquet.
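
To verify the output, you can read the Parquet directory back from spark-shell; this sketch simply uses the /tmp/parquet path hard-coded in the code above:

// In spark-shell: read back the words written by the streaming job
val df = sqlContext.read.parquet("/tmp/parquet")
df.groupBy("word").count().show()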
