These are the steps to build and run a Spark Streaming application; it was built and tested on HDP 2.5.

setup:

ENV: HDP 2.5

Scala: 2.10.4

sbt: 0.13.11

mkdir spark-streaming-example 

cd spark-streaming-example/ 

mkdir -p src/main/scala 

cd src/main/scala
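
Once the source file (next step) and the build.sbt created later are in place, the project layout should look roughly like this:

spark-streaming-example/
├── build.sbt
└── src/
    └── main/
        └── scala/
            └── SqlNetworkWordCount.scala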

sample code (save it as SqlNetworkWordCount.scala):

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{SQLContext, SaveMode}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}
import org.apache.spark.{SparkConf, SparkContext}


object SqlNetworkWordCount {
  def main(args: Array[String]): Unit = {
    if (args.length < 2) {
      System.err.println("Usage: SqlNetworkWordCount <hostname> <port>")
      System.exit(1)
    }

    val sparkConf = new SparkConf().setAppName("SqlNetworkWordCount")
    // Create a StreamingContext with a 2-second batch interval
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    // Receive lines of text from the socket at <hostname>:<port>
    val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
    val words = lines.flatMap(_.split(" "))

    // Convert each RDD of the words DStream to a DataFrame and append it as parquet
    words.foreachRDD((rdd: RDD[String], time: Time) => {
      val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
      import sqlContext.implicits._
      val wordsDataFrame = rdd.map(w => Record(w)).toDF()
      wordsDataFrame.write.mode(SaveMode.Append).parquet("/tmp/parquet")
    })

    ssc.start()
    ssc.awaitTermination()
  }
}


// Schema for the words DataFrame: one column named "word"
case class Record(word: String)


object SQLContextSingleton {

  // Lazily instantiated singleton SQLContext, reused across
  // batches instead of creating a new one per RDD
  @transient private var instance: SQLContext = _

  def getInstance(sparkContext: SparkContext): SQLContext = {
    if (instance == null) {
      instance = new SQLContext(sparkContext)
    }
    instance
  }
}
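
The foreachRDD comment above mentions running a SQL query, but this version only persists the words as parquet. Spark's bundled SqlNetworkWordCount example also registers a temp table and queries it per batch; if you want that behavior too, a sketch that could go inside the foreachRDD body after wordsDataFrame is created (names hypothetical):

// Register the batch DataFrame as a temp table and count words with SQL
wordsDataFrame.registerTempTable("words")
val wordCountsDataFrame =
  sqlContext.sql("select word, count(*) as total from words group by word")
println(s"========= $time =========")
wordCountsDataFrame.show()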

cd -

vim build.sbt

name := "Spark Streaming Example"

version := "1.0"

scalaVersion := "2.10.4"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.4.1",
  "org.apache.spark" %% "spark-streaming" % "1.4.1",
  "org.apache.spark" %% "spark-sql" % "1.4.1"
)
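
Since spark-submit already puts the Spark classes on the classpath at runtime, you can optionally mark these dependencies as "provided" so they are used for compilation only and the packaged jar stays small; a sketch of the same dependency list:

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.4.1" % "provided",
  "org.apache.spark" %% "spark-streaming" % "1.4.1" % "provided",
  "org.apache.spark" %% "spark-sql" % "1.4.1" % "provided"
)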

Now run sbt package from the project home; it will build a jar at target/scala-2.10/spark-streaming-example_2.10-1.0.jar.

Run this jar using spark-submit, passing the host and port to listen on (the main class is SqlNetworkWordCount):

bin/spark-submit --class SqlNetworkWordCount target/scala-2.10/spark-streaming-example_2.10-1.0.jar hostname 6002

To test the program, open a different terminal and run nc -lk `hostname` 6002 (ideally start it before submitting the job so the receiver can connect). Type anything on the nc console and hit enter; with every 2-second batch, the words you typed are appended as parquet under /tmp/parquet, and the batch activity shows up on the Spark console.
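
To confirm that output is being written, you can read the parquet directory back from spark-shell on the same cluster; a minimal sketch (sqlContext is created by the shell):

val wordsDF = sqlContext.read.parquet("/tmp/parquet")
// Count how many times each word has arrived so far
wordsDF.groupBy("word").count().show()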
