I'm new to Spark, I want to make a treatment on files in streaming.
I have files csv which arrive non-stop:
Example csv file:
world world count world world earth count world
and I want to do two treatment on them :
the first treatment is for a result like this :
(world,2,2) // word is twice repeated for the first column and distinct (world,earth) for second therefore (2,2) (count,2,1) // word is twice repeated for the first column and not distinct (world,world) for second therefore (2,1)
the second result
I want to get that result after each hour.in our example:
(world,1) // 1=2/2 (count,2) //2=2/1
this is my code :
val conf = new SparkConf() .setAppName("File Count") .setMaster("local") val sc = new SparkContext(conf) val ssc = new StreamingContext(sc, Seconds(10m)) val file = ssc.textFileStream("hdfs://192.168.1.31:8020/user/sparkStreaming/input") var result = file.map(x => (x.split(" ")(0)+";"+x.split(" ")(1), 1)).reduceByKey((x,y) => x+y) val window = result.reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Seconds(60), Seconds(20)) val result1 = window.map(x => x.toString ) val result2 = result1.map(line => line.split(";")(0)+","+line.split(",")(1)) val result3 = result2.map(line => line.substring(1, line.length-1)) val result4 = result3.map(line => (line.split(",")(0),line.split(",")(1).toInt ) ) val result5 = result4.reduceByKey((x,y) => x+y ) val result6 = result3.map(line => (line.split(",")(0), 1 )) val result7 = result6.reduceByKey((x,y) => x+y ) val result8 = result7.join(result5) // (world,2,2) val finalResult = result8.mapValues(x => x._1.toFloat / x._2 ) // (world,1), I want this result after every one hour ssc.start() ssc.awaitTermination()
Thanks in Advance!!!