
How to use saveAsTextFiles in Spark Streaming


Hi,

It is simple to display the result of an RDD computation, for example:

val sc = new SparkContext(conf)
val textFile = sc.textFile("/root/file/test")
val apps = textFile.map(line => line.split(";")(0))
  .map(p => (p, 1))    // convert to countable tuples
  .reduceByKey(_ + _)  // count keys
apps.collect().foreach(println)  // collect the result and print it

And I get the result in my console. If I want to save the output to a file, I do:

apps.saveAsTextFile("/root/file/file1")

But how can I do it with a DStream? This is my code:

val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(10))
val file = ssc.textFileStream("/root/file/test")
val test = file.map(x => (x.split(" ")(0) + ";" + x.split(" ")(1), 1))
  .reduceByKey((x, y) => x + y)
test.saveAsTextFiles("/root/file/file1")
sc.stop()

But it doesn't work.

Any help, please!

2 ACCEPTED SOLUTIONS


I don't know why, but I re-ran it and now it works. However, I get an empty _SUCCESS file in the directory file1.

Here is the complete code:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    def main(args: Array[String]) {
      val conf = new SparkConf()
        .setAppName("File Count")
        .setMaster("local[2]")

      val sc = new SparkContext(conf)
      val ssc = new StreamingContext(sc, Seconds(1))
      val file = ssc.textFileStream("/root/file/test/file")
      file.foreachRDD(t => {
        // count occurrences of the first two space-separated fields per batch
        val test = t.map(x => (x.split(" ")(0) + ";" + x.split(" ")(1), 1))
          .reduceByKey((x, y) => x + y)
        test.saveAsTextFile("/root/file/file1")
      })
      ssc.start()            // start the streaming computation
      ssc.awaitTermination() // keep the application alive
    }
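
The empty _SUCCESS file is normal, by the way: Hadoop's output committer writes it to mark a successfully completed write.

Also, since the title asks about saveAsTextFiles: a DStream has saveAsTextFiles(prefix, suffix) directly, which writes each batch to its own directory named prefix-<batch time in ms>.suffix instead of reusing one output path. A minimal sketch with the same word count as above (the paths are illustrative):

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object FileCountStream {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("File Count").setMaster("local[2]")
        val ssc = new StreamingContext(conf, Seconds(1))
        val lines = ssc.textFileStream("/root/file/test/file")
        val counts = lines
          .map(x => (x.split(" ")(0) + ";" + x.split(" ")(1), 1))
          .reduceByKey(_ + _)
        // one output directory per batch: /root/file/file1-<batch time in ms>.txt
        counts.saveAsTextFiles("/root/file/file1", "txt")
        ssc.start()
        ssc.awaitTermination()
      }
    }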


Rising Star

That's because no new files arrived in the directory after the streaming application started.

You can try "cp" to drop files into the directory after starting the streaming application.
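
For example, after the job is running you can copy a file into the watched directory with cp /tmp/newdata.txt /root/file/test/file/ (the path and file name are just illustrative); textFileStream only picks up files that appear in the directory after the streaming context starts.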


12 REPLIES

I am getting the following error:
16/04/06 14:09:52 INFO FileInputDStream: Duration for remembering RDDs set to 60000 ms for org.apache.spark.streaming.dstream.FileInputDStream@4bf57335
Exception in thread "main" java.lang.NoSuchMethodError: org.apache.spark.util.ThreadUtils$.runInNewThread$default$2()Z
	at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:606)
	at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:600)
	at com.org.file.filecount.FileCount$.main(FileCount.scala:52)
	at com.org.file.filecount.FileCount.main(FileCount.scala)

There's a mismatch between the versions of my dependencies and my runtime, so I set:

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>1.6.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.10</artifactId>
        <version>1.6.1</version>
    </dependency>

And now I am getting an error like the following:

16/04/07 11:23:56 WARN FileInputDStream: Error finding new files
java.io.IOException: Incomplete HDFS URI, no host: "/root/file/test"
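
One common fix for this, if the cluster's default filesystem is HDFS, is to pass a fully-qualified URI to textFileStream. A minimal sketch, assuming the NameNode listens at namenode:8020 (both host and port are placeholders):

    val file = ssc.textFileStream("hdfs://namenode:8020/root/file/test")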

 
