
Repetitive treatment on spark streaming


I'm new to Spark, and I want to process files in streaming.

I have CSV files which arrive non-stop.

Example CSV file:

world world
count world
world earth
count world

I want to perform two treatments on them.

The first treatment should produce a result like this:

(world,2,2) // "world" appears twice in the first column, with two distinct second-column values (world, earth), hence (2,2)
(count,2,1) // "count" appears twice in the first column, with only one distinct second-column value (world, world), hence (2,1)
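Before wiring this into a stream, the first treatment can be checked on a static collection; a minimal sketch in plain Scala (no Spark), using the sample rows above:

```scala
// Sample rows from the example file: (firstColumn, secondColumn)
val rows = Seq(
  ("world", "world"),
  ("count", "world"),
  ("world", "earth"),
  ("count", "world")
)
// total occurrences of each first-column word
val totals: Map[String, Int] =
  rows.groupBy(_._1).map { case (w, g) => w -> g.size }
// distinct second-column values per first-column word
val distincts: Map[String, Int] =
  rows.groupBy(_._1).map { case (w, g) => w -> g.map(_._2).distinct.size }
// combine into (word, total, distinct)
val treatment1 = => (w, totals(w), distincts(w)))
treatment1.foreach(println) // contains (world,2,2) and (count,2,1)
```

The same grouping logic carries over to the streaming code, where `reduceByKey` plays the role of `groupBy` plus counting.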

For the second result, I want to get this after each hour, for example:

(world,1) // 1 = 2/2
(count,2) // 2 = 2/1
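The second result is just the ratio of the two counters from the first treatment (total occurrences divided by distinct second-column values); a quick sketch with the values above, in plain Scala:

```scala
// word -> (totalOccurrences, distinctSecondValues), taken from the first treatment
val counts = Map("world" -> (2, 2), "count" -> (2, 1))
// ratio = total / distinct: world gives 2/2 = 1.0, count gives 2/1 = 2.0
val ratios: Map[String, Float] =
  { case (w, (total, distinct)) => w -> total.toFloat / distinct }
ratios.foreach(println)
```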

This is my code:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("File Count")
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(10)) // 10-second batch interval
val file = ssc.textFileStream("hdfs://")

// count occurrences of each "first;second" word pair
val result = => (x.split(" ")(0) + ";" + x.split(" ")(1), 1))
                 .reduceByKey((x, y) => x + y)
// 60-second sliding window recomputed every 20 seconds
// (for a result after each hour, use Seconds(3600) as both window and slide duration)
val window = result.reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(60), Seconds(20))

// total occurrences of each first-column word
val totals = { case (pair, count) => (pair.split(";")(0), count) }
                   .reduceByKey((x, y) => x + y)
// each windowed key is a distinct (first, second) pair, so counting keys
// per first-column word counts its distinct second-column values
val distincts = { case (pair, _) => (pair.split(";")(0), 1) }
                      .reduceByKey((x, y) => x + y)

val joined = totals.join(distincts) // e.g. (world, (2, 2))
// ratio = total / distinct, e.g. (world, 1.0); I want this result after every hour
val finalResult = joined.mapValues { case (total, distinct) => total.toFloat / distinct }
finalResult.print()

ssc.start()
ssc.awaitTermination()

Thanks in Advance!!!
