Explorer
Posts: 33
Registered: ‎03-08-2016

saveAsTextFile replaces the previous result

Hi,

I have a simple Spark Streaming job that does some processing on CSV files.

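import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

// The enclosing object and main method were cut off in the post; the name FileCount is assumed.
object FileCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("File Count")
      .setMaster("local[2]")

    val sc = new SparkContext(conf)
    // 10-second batch interval
    val ssc = new StreamingContext(sc, Seconds(10))
    val file = ssc.textFileStream("hdfs://192.168.1.31:8020/user/sparkStreaming/input")
    // Key each line by its first two space-separated fields and count occurrences per batch
    val test = file.map(x => (x.split(" ")(0) + ";" + x.split(" ")(1), 1)).reduceByKey((x, y) => x + y)
    // Sum the counts over a 60-second window, sliding every 20 seconds
    val windowed = test.reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(60), Seconds(20))
    // Every window is written to the same fixed path
    windowed.foreachRDD(rdd => {
      rdd.saveAsTextFile("hdfs://192.168.1.31:8020/user/sparkStreaming/output")
    })
    ssc.start()
    ssc.awaitTermination()
  }
}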

In the output directory I only have the result of the last file that was processed.

I want to keep the results of all the files, not only the last one.
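
One possible direction, sketched under the assumption that a separate output directory per batch is acceptable: derive the path from the batch time so each save lands somewhere new. The helper `batchOutputPath` and the object `BatchPaths` are hypothetical names introduced only for illustration; they are not part of the job above.

```scala
// Hypothetical helper (not from the original job): builds a distinct
// output directory name for each batch from the batch time in milliseconds.
object BatchPaths {
  def batchOutputPath(base: String, batchTimeMs: Long): String =
    s"${base.stripSuffix("/")}-$batchTimeMs"

  def main(args: Array[String]): Unit = {
    val base = "hdfs://192.168.1.31:8020/user/sparkStreaming/output"
    // Two windows 20 seconds apart map to two different directories.
    println(batchOutputPath(base, 1000L))
    println(batchOutputPath(base, 21000L))
  }
}
```

In the streaming job this could be wired in through the two-argument form of `foreachRDD`, which passes the batch `Time` along with the RDD: `windowed.foreachRDD((rdd, time) => rdd.saveAsTextFile(BatchPaths.batchOutputPath(base, time.milliseconds)))`. The DStream method `saveAsTextFiles(prefix, suffix)` appears to give a similar result without a helper, since it names each batch's directory `prefix-TIME_IN_MS[.suffix]`.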