Repetitive treatment on Spark Streaming

I'm new to Spark, and I want to apply a treatment to files in streaming.

I have CSV files which arrive continuously.

Example CSV file:

world world
count world
world earth
count world

and I want to apply two treatments to them.

The first treatment should produce a result like this:

(world,2,2) // "world" appears twice in the first column, and its second-column values (world, earth) are distinct, therefore (2,2)
(count,2,1) // "count" appears twice in the first column, but its second-column values (world, world) are not distinct, therefore (2,1)
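
To make the expected logic concrete, here is a minimal non-streaming Scala sketch of the first treatment on the sample lines above (names like lines and firstTreatment are just illustrative):

val lines = Seq("world world", "count world", "world earth", "count world")
val pairs = lines.map { l => val cols = l.split(" "); (cols(0), cols(1)) }
// For each first-column word: total occurrences and number of distinct second-column values
val firstTreatment = pairs.groupBy(_._1).map { case (word, ps) =>
  (word, ps.size, ps.map(_._2).distinct.size)
}
firstTreatment.foreach(println) // prints (world,2,2) and (count,2,1), in no particular order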

The second treatment divides the total count by the distinct count, and I want to get that result after each hour. In our example:

(world,1) // 1 = 2/2
(count,2) // 2 = 2/1
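
The second treatment is just total divided by distinct once the (word, total, distinct) triples exist; a tiny sketch on static data:

val triples = Seq(("world", 2, 2), ("count", 2, 1)) // output of the first treatment
val ratios = triples.map { case (word, total, distinct) => (word, total.toFloat / distinct) }
ratios.foreach(println) // prints (world,1.0) and (count,2.0)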

This is my code:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
    .setAppName("File Count")
    .setMaster("local[2]")
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(10)) // 10-second batch interval

val file = ssc.textFileStream("hdfs://192.168.1.31:8020/user/sparkStreaming/input")

// Count each (first column;second column) pair within a batch
val result = file.map(x => (x.split(" ")(0) + ";" + x.split(" ")(1), 1))
                 .reduceByKey((x, y) => x + y)

// Re-aggregate the pair counts over a sliding window (60-second window, 20-second slide)
val window = result.reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(60), Seconds(20))

// Drop the second column and keep (word, count of that pair)
val perWord = window.map { case (pair, count) => (pair.split(";")(0), count) }

val totals = perWord.reduceByKey((x, y) => x + y)       // total occurrences of each word
val distincts = perWord.map { case (word, _) => (word, 1) }
                       .reduceByKey((x, y) => x + y)    // number of distinct second-column values

val joined = totals.join(distincts)                     // e.g. (world,(2,2)), (count,(2,1))
val finalResult = joined.mapValues { case (total, distinct) => total.toFloat / distinct }
finalResult.print() // e.g. (world,1.0), (count,2.0); I want this result after every one hour

ssc.start()
ssc.awaitTermination()
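
If I understand the windowing right, getting one result per hour would mean making both the window length and the slide interval one hour (a tumbling window), instead of Seconds(60), Seconds(20). A sketch of just that change, where result is the pair-count DStream from the code above:

import org.apache.spark.streaming.Minutes

// One-hour tumbling window: window length = slide interval, so each
// aggregate is computed and emitted once per hour over the past hour's data
val window = result.reduceByKeyAndWindow((a: Int, b: Int) => a + b,
                                         Minutes(60), Minutes(60))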

Thanks in Advance!!!