These are the steps to build and run a Spark Streaming application; it was built and tested on HDP 2.5.

Setup:

ENV: HDP 2.5

Scala: 2.10.4

sbt: 0.13.11

mkdir spark-streaming-example 

cd spark-streaming-example/ 

mkdir -p src/main/scala 

cd src/main/scala
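
The final project layout will look like this (build.sbt is created in a later step; the source file name is just a convention):

spark-streaming-example/
├── build.sbt
└── src/main/scala/SqlNetworkWordCount.scala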

sample code (save as, e.g., SqlNetworkWordCount.scala):

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{SQLContext, SaveMode}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}



object SqlNetworkWordCount {
  def main(args: Array[String]) {

    val sparkConf = new SparkConf().setAppName("SqlNetworkWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    // Read lines from the socket given as <hostname> <port> on the command line
    val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
    val words = lines.flatMap(_.split(" "))

    // Convert each RDD of the words DStream to a DataFrame and append the batch to parquet
    words.foreachRDD((rdd: RDD[String], time: Time) => {
      val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
      import sqlContext.implicits._
      val wordsDataFrame = rdd.map(w => Record(w)).toDF()
      wordsDataFrame.write.mode(SaveMode.Append).parquet("/tmp/parquet")
    })

    ssc.start()
    ssc.awaitTermination()
  }
}


case class Record(word: String)


object SQLContextSingleton {

  // Lazily instantiated singleton SQLContext, so the same instance is reused
  // across batches instead of being created on every foreachRDD call
  @transient private var instance: SQLContext = _

  def getInstance(sparkContext: SparkContext): SQLContext = {
    if (instance == null) {
      instance = new SQLContext(sparkContext)
    }
    instance
  }
}
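
The job appends every 2-second batch of words to /tmp/parquet. If you also want to see per-batch word counts on the driver console, a minimal addition inside the same foreachRDD (mirroring the SqlNetworkWordCount example that ships with Spark) is:

wordsDataFrame.registerTempTable("words")
val wordCounts = sqlContext.sql("select word, count(*) as total from words group by word")
println(s"========= $time =========")
wordCounts.show()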

cd -

vim build.sbt

name := "Spark Streaming Example"

version := "1.0"

scalaVersion := "2.10.4"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.4.1",
  "org.apache.spark" %% "spark-streaming" % "1.4.1",
  "org.apache.spark" %% "spark-sql" % "1.4.1")

(spark-sql is required because the code uses SQLContext, DataFrames, and SaveMode)

Now run sbt package from the project home; it builds the jar at target/scala-2.10/spark-streaming-example_2.10-1.0.jar. Run this jar using spark-submit:

bin/spark-submit --class SqlNetworkWordCount target/scala-2.10/spark-streaming-example_2.10-1.0.jar <hostname> 6002

(the class name must match the object in the source, SqlNetworkWordCount, and the two arguments are the host and port that nc will listen on)

To test this program, open a different terminal and start the listener first, so the socket is up before the job connects:

nc -lk `hostname` 6002

Then submit the job as above, type anything on the nc console, and hit enter; each batch of words is appended to /tmp/parquet (and, with the optional snippet above, the per-batch counts are printed on the Spark console).
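
To verify the output, read the parquet files back, e.g. from spark-shell (Spark 1.x, where sqlContext is predefined):

val df = sqlContext.read.parquet("/tmp/parquet")
df.groupBy("word").count().show()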
