
How to use saveAsTextFiles in spark streaming


Hi,

It is simple to display the result of an RDD, for example:

val sc = new SparkContext(conf)
val textFile = sc.textFile("/root/file/test")
val apps = textFile.map(line => line.split(";")(0))
  .map(p => (p, 1))     // convert to countable tuples
  .reduceByKey(_ + _)   // count keys
  .collect()            // collect the result
apps.foreach(println)

And I have the result in my console. If I want to save the output to a file, I do:

apps.saveAsTextFiles("/root/file/file1")
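As a side note, on a plain RDD the save method is `saveAsTextFile` (singular), and it has to be called on the RDD itself, before `collect()` turns the result into a local `Array`. A minimal batch sketch (paths and app name are illustrative):

```scala
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf().setAppName("WordCount")
val sc = new SparkContext(conf)

val counts = sc.textFile("/root/file/test")
  .map(line => line.split(";")(0))
  .map(p => (p, 1))
  .reduceByKey(_ + _)

// saveAsTextFile is an RDD method: call it before collect(),
// which would convert the result into a local Array.
counts.saveAsTextFile("/root/file/file1")
```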

But how can I do it now with a DStream? This is my code:

val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(10))
val file = ssc.textFileStream("/root/file/test")
val test = file.map(x => (x.split(" ")(0) + ";" + x.split(" ")(1), 1))
  .reduceByKey((x, y) => x + y)
test.saveAsTextFiles("/root/file/file1")
sc.stop()

But it doesn't work.

Any help please!
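A likely problem in the snippet above is that the StreamingContext is never started: `saveAsTextFiles` only registers an output operation, and nothing executes until `ssc.start()` is called; calling `sc.stop()` immediately tears everything down. A minimal working sketch, assuming the paths and app name are placeholders, might look like:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object FileCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("FileCount")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(10))

    // Watch a directory; only files moved in after start() are picked up.
    val lines = ssc.textFileStream("/root/file/test")
    val counts = lines
      .map(x => (x.split(" ")(0) + ";" + x.split(" ")(1), 1))
      .reduceByKey(_ + _)

    // Writes one output directory per batch, named "<prefix>-<time>.<suffix>".
    counts.saveAsTextFiles("/root/file/file1", "out")

    ssc.start()            // actually start the computation
    ssc.awaitTermination() // block until stopped; don't call sc.stop() up front
  }
}
```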

12 REPLIES

16/04/06 14:09:52 INFO FileInputDStream: Duration for remembering RDDs set to 60000 ms for org.apache.spark.streaming.dstream.FileInputDStream@4bf57335
Exception in thread "main" java.lang.NoSuchMethodError: org.apache.spark.util.ThreadUtils$.runInNewThread$default$2()Z
	at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:606)
	at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:600)
	at com.org.file.filecount.FileCount$.main(FileCount.scala:52)
	at com.org.file.filecount.FileCount.main(FileCount.scala)

There is a mismatch between the versions of the dependencies and the runtime, so I use:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>1.6.1</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.10</artifactId>
    <version>1.6.1</version>
</dependency>

And now I am getting an error like the following:

16/04/07 11:23:56 WARN FileInputDStream: Error finding new files
java.io.IOException: Incomplete HDFS URI, no host: "/root/file/test"
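This error usually means the path is being interpreted as an HDFS URI with no host component. One hedged fix is to pass a fully qualified URI; `namenode:8020` below is a placeholder, substitute the host and port from `fs.defaultFS` in your cluster's core-site.xml:

```scala
// Fully qualified HDFS URI: scheme, host, port, then the path.
// "namenode:8020" is illustrative; use your cluster's fs.defaultFS value.
val lines = ssc.textFileStream("hdfs://namenode:8020/root/file/test")
```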

 
