Explorer
Posts: 33
Registered: 03-08-2016

Re: How to use saveAsTextFiles in spark streaming

16/04/06 14:09:52 INFO FileInputDStream: Duration for remembering RDDs set to 60000 ms for org.apache.spark.streaming.dstream.FileInputDStream@4bf57335
Exception in thread "main" java.lang.NoSuchMethodError: org.apache.spark.util.ThreadUtils$.runInNewThread$default$2()Z
	at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:606)
	at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:600)
	at com.org.file.filecount.FileCount$.main(FileCount.scala:52)
	at com.org.file.filecount.FileCount.main(FileCount.scala)

This points to a version mismatch between the dependencies and the runtime, so I set both Spark artifacts to the same version:

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>1.6.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.10</artifactId>
        <version>1.6.1</version>
    </dependency>

Now I get the following error:

16/04/07 11:23:56 WARN FileInputDStream: Error finding new files
java.io.IOException: Incomplete HDFS URI, no host: "/root/file/test"
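Hadoop raises `Incomplete HDFS URI, no host` when a path ends up being resolved as an `hdfs` URI that carries no NameNode host. Passing a fully qualified URI to `textFileStream` avoids depending on the `fs.defaultFS` setting: for example `file:///root/file/test` for the local disk, or `hdfs://<namenode-host>:<port>/root/file/test`, where host and port are placeholders whose real values are in your cluster's `fs.defaultFS`. The JDK-only sketch below (the helper `isFullyQualified` is hypothetical, not a Spark or Hadoop API) shows which forms of the path actually carry a scheme and a host:

```scala
import java.net.URI

object PathCheck {
  // Hypothetical helper: true when the path names a scheme and, for
  // non-local schemes such as hdfs, a host as well.
  def isFullyQualified(path: String): Boolean = {
    val uri = new URI(path)
    uri.getScheme match {
      case null   => false               // e.g. "/root/file/test": resolved against fs.defaultFS
      case "file" => true                // the local filesystem needs no host
      case _      => uri.getHost != null // e.g. hdfs://host:port/...
    }
  }

  def main(args: Array[String]): Unit = {
    val candidates = Seq(
      "/root/file/test",
      "hdfs:///root/file/test",
      "file:///root/file/test",
      "hdfs://namenode-host:8020/root/file/test" // placeholder host and port
    )
    candidates.foreach(p => println(s"$p -> ${isFullyQualified(p)}"))
  }
}
```

Only the last two forms pass the check; `hdfs:///...` names the scheme but still has no host.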


Explorer
Posts: 33
Registered: 03-08-2016

Re: How to use saveAsTextFiles in spark streaming

I don't know why, but I re-ran it and it works. However, the only thing in the directory file1 is an empty _SUCCESS file.

Here is the complete code:

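    package com.org.file.filecount

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object FileCount {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setAppName("File Count")
          .setMaster("local[2]")

        val sc = new SparkContext(conf)
        // One batch per second over files that appear in the directory
        val ssc = new StreamingContext(sc, Seconds(1))
        val file = ssc.textFileStream("/root/file/test/file")

        file.foreachRDD { t =>
          // Key each line by its first two space-separated fields and count
          val test = t.map { x =>
            val fields = x.split(" ")
            (fields(0) + ";" + fields(1), 1)
          }.reduceByKey(_ + _)
          // Note: every batch writes to this same directory
          test.saveAsTextFile("/root/file/file1")
        }

        ssc.start()
        ssc.awaitTermination()
      }
    }
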
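Each batch runs the map/reduceByKey step over whatever lines arrived during that interval. A plain-Scala sketch of the same per-batch computation on an in-memory `Seq` can help check the parsing before running it on Spark. The object and method names are hypothetical, and skipping lines with fewer than two space-separated fields is an assumption of this sketch (the code above would throw `ArrayIndexOutOfBoundsException` on such lines).

```scala
object BatchCountSketch {
  // Key each line by "field0;field1" and count occurrences, mirroring what
  // the streaming job does with map + reduceByKey on every batch.
  // Assumption: lines with fewer than two space-separated fields are skipped.
  def countByFirstTwoFields(lines: Seq[String]): Map[String, Int] =
    lines
      .map(_.split(" "))
      .collect { case fields if fields.length >= 2 => fields(0) + ";" + fields(1) }
      .groupBy(identity)
      .map { case (key, occurrences) => key -> occurrences.size }

  def main(args: Array[String]): Unit = {
    val batch = Seq("a b x", "a b y", "c d", "malformed")
    println(countByFirstTwoFields(batch))
    // An empty batch (no new files arrived) yields an empty result,
    // so there are no records to save for that interval.
    println(countByFirstTwoFields(Seq.empty))
  }
}
```

In the streaming job itself, the same pipeline can be applied directly to the DStream and written with `saveAsTextFiles`, e.g. `file.map(x => (x.split(" ")(0) + ";" + x.split(" ")(1), 1)).reduceByKey(_ + _).saveAsTextFiles("/root/file/file1")`. According to the Spark API docs, that method writes each batch to its own directory named `prefix-TIME_IN_MS` (plus `.suffix` if one is given), instead of rewriting `/root/file/file1` on every one-second batch.
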
Cloudera Employee
Posts: 35
Registered: 04-05-2016

Re: How to use saveAsTextFiles in spark streaming

That's because no new files arrive in the monitored directory after the streaming application starts; textFileStream only processes files that appear there after start().


You can try copying files into the directory with cp after starting the streaming application.
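The Spark Streaming programming guide says files must be placed in the monitored directory atomically, by moving or renaming them into it; a file that is still being copied in place can be picked up while incomplete. One way to do this from the JVM is to write the file into a staging directory on the same filesystem and then `Files.move` it with `ATOMIC_MOVE` (from a shell, writing elsewhere on the same filesystem and then running `mv` has the same effect). The helper below is a hypothetical sketch; its names and the staging-directory layout are assumptions, and it uses temporary directories rather than `/root/file/test/file`.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path, StandardCopyOption}

object AtomicDrop {
  // Hypothetical helper: write `content` to a staging directory, then move the
  // finished file into `watchedDir` with a single atomic rename, so a directory
  // watcher never sees a half-written file. Both directories must be on the
  // same filesystem, otherwise ATOMIC_MOVE fails with an exception.
  def dropFile(stagingDir: Path, watchedDir: Path, name: String, content: String): Path = {
    val staged = stagingDir.resolve(name)
    Files.write(staged, content.getBytes(StandardCharsets.UTF_8))
    Files.move(staged, watchedDir.resolve(name), StandardCopyOption.ATOMIC_MOVE)
  }

  def main(args: Array[String]): Unit = {
    val root = Files.createTempDirectory("stream-demo")
    val staging = Files.createDirectory(root.resolve("staging"))
    val watched = Files.createDirectory(root.resolve("watched"))
    val moved = dropFile(staging, watched, "batch-1.txt", "a b x\na b y\n")
    println(s"moved to $moved, exists=${Files.exists(moved)}")
  }
}
```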