Repeated processing of files in Spark Streaming

I'm new to Spark, and I want to process files as they arrive in a stream.

I have CSV files that arrive continuously.

Example CSV file:

world world
count world
world earth
count world

I want to apply two processing steps to them.
The first step should produce a result like this:

(world,2,2) // "world" appears twice in the first column, paired with two distinct second-column words (world, earth), hence (2,2)
(count,2,1) // "count" appears twice in the first column, paired with one distinct second-column word (world, world), hence (2,1)
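
To make the expected first result concrete, here is a minimal sketch on plain Scala collections (no Spark involved). The names FirstTreatment and countsPerWord are hypothetical, and it assumes the two columns are separated by whitespace, as in the example file.

```scala
object FirstTreatment {
  // For each first-column word, return (occurrences, distinct second-column words).
  def countsPerWord(lines: Seq[String]): Map[String, (Int, Int)] =
    lines
      .map(_.trim.split("\\s+"))
      // keep only well-formed two-column lines
      .collect { case Array(first, second) => (first, second) }
      .groupBy(_._1)
      .map { case (word, pairs) =>
        (word, (pairs.size, pairs.map(_._2).distinct.size))
      }

  def main(args: Array[String]): Unit = {
    val sample = Seq("world world", "count world", "world earth", "count world")
    countsPerWord(sample).foreach { case (word, (total, distinct)) =>
      println(s"($word,$total,$distinct)")
    }
  }
}
```

For the example file this yields (2,2) for "world" and (2,1) for "count"; the print order of a Map is not guaranteed.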

The second result is the one I want to get after each hour. In our example:

(world,1) // 1=2/2
(count,2) //2=2/1
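
The arithmetic behind the second result can be sketched the same way on plain collections: divide the number of occurrences by the number of distinct second-column words. SecondTreatment and ratios are hypothetical names, and the input map is the example's first result typed in by hand.

```scala
object SecondTreatment {
  // Ratio = occurrences / distinct second-column words, per first-column word.
  def ratios(counts: Map[String, (Int, Int)]): Map[String, Float] =
    counts.collect {
      // skip entries with no distinct words to avoid dividing by zero
      case (word, (total, distinct)) if distinct > 0 =>
        (word, total.toFloat / distinct)
    }

  def main(args: Array[String]): Unit = {
    val firstResult = Map("world" -> ((2, 2)), "count" -> ((2, 1)))
    ratios(firstResult).foreach { case (word, ratio) =>
      println(s"($word,$ratio)")
    }
  }
}
```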

This is my code:

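import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("File Count")
  .setMaster("local[2]")
val sc = new SparkContext(conf)
// Seconds takes a plain number; a 10-second batch interval is assumed here (originally written Seconds(10m))
val ssc = new StreamingContext(sc, Seconds(10))
val file = ssc.textFileStream("hdfs://192.168.1.31:8020/user/sparkStreaming/input")
// Count each "first;second" pair per batch, then over a 60 s window sliding every 20 s
val result = file.map(x => (x.split(" ")(0) + ";" + x.split(" ")(1), 1)).reduceByKey((x, y) => x + y)
val window = result.reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(60), Seconds(20))
val result1 = window.map(x => x.toString) // "(world;earth,1)"
val result2 = result1.map(line => line.split(";")(0) + "," + line.split(",")(1)) // "(world,1)"
val result3 = result2.map(line => line.substring(1, line.length - 1)) // "world,1"
val result4 = result3.map(line => (line.split(",")(0), line.split(",")(1).toInt))
val result5 = result4.reduceByKey((x, y) => x + y) // (word, occurrences)
val result6 = result3.map(line => (line.split(",")(0), 1))
val result7 = result6.reduceByKey((x, y) => x + y) // (word, distinct second-column words)
// Join occurrences first so the value is (occurrences, distinct), matching (world,2,2)
val result8 = result5.join(result7)
val finalResult = result8.mapValues(x => x._1.toFloat / x._2) // (world,1.0); I want this result every hour
finalResult.print() // Spark Streaming needs at least one output operation before start()
ssc.start()
ssc.awaitTermination()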
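
One possible way to get the ratio once per hour is a tumbling window whose length and slide are both 60 minutes. The following is only a sketch, not a tested solution. It assumes Spark 1.3 or later, a 10-second batch interval, and whitespace-separated columns. The object name HourlyRatio is hypothetical; the input path is the one from the code above.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}

object HourlyRatio {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Hourly Ratio").setMaster("local[2]")
    // Assumed batch interval; window and slide must be multiples of it
    val ssc = new StreamingContext(conf, Seconds(10))
    val lines = ssc.textFileStream("hdfs://192.168.1.31:8020/user/sparkStreaming/input")

    // (first column, second column), skipping malformed lines
    val pairs = lines
      .map(_.trim.split("\\s+"))
      .filter(_.length >= 2)
      .map(cols => (cols(0), cols(1)))

    // Window length == slide length, so each hour is emitted exactly once
    val hourly = pairs.window(Minutes(60), Minutes(60))

    val ratios = hourly.transform { rdd =>
      val total = rdd.mapValues(_ => 1).reduceByKey(_ + _)               // occurrences
      val distinct = rdd.distinct().mapValues(_ => 1).reduceByKey(_ + _) // distinct second words
      total.join(distinct).mapValues { case (t, d) => t.toFloat / d }
    }

    ratios.print() // output operation, runs once per hour
    ssc.start()
    ssc.awaitTermination()
  }
}
```

Keeping the raw pairs for a full hour uses more memory than keeping running counts, so this trades efficiency for simplicity.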

Thanks in Advance!!!