Created on 04-05-2016 08:26 AM - edited 09-16-2022 03:12 AM
Hi,
It is simple to display the result of an RDD, for example:
val sc = new SparkContext(conf)
val textFile = sc.textFile("/root/file/test")
val apps = textFile.map(line => line.split(";")(0))
  .map(p => (p, 1))     // convert to countable tuples
  .reduceByKey(_ + _)   // count keys
  .collect()            // collect the result
apps.foreach(println)
And I get the result in my console. And if I want to save the output to a file I do:
apps.saveAsTextFiles("/root/file/file1")
But how can I do it now with a DStream? This is my code:
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(10))
val file = ssc.textFileStream("/root/file/test")
var test = file.map(x => (x.split(" ")(0) + ";" + x.split(" ")(1), 1))
  .reduceByKey((x, y) => x + y)
test.saveAsTextFiles("/root/file/file1")
sc.stop()
  }
}
But it doesn't work.
Any help please!
Created 04-07-2016 04:13 AM
I don't know why, but I re-ran it and it works; however, I only get an empty _SUCCESS file in the file1 directory.
Here is the complete code:
def main(args: Array[String]) {
  val conf = new SparkConf()
    .setAppName("File Count")
    .setMaster("local[2]")
  val sc = new SparkContext(conf)
  val ssc = new StreamingContext(sc, Seconds(1))
  val file = ssc.textFileStream("/root/file/test/file")
  file.foreachRDD(t => {
    val test = t.map(x => (x.split(" ")(0) + ";" + x.split(" ")(1), 1))
      .reduceByKey((x, y) => x + y)
    test.saveAsTextFile("/root/file/file1")
  })
  ssc.start()
  ssc.awaitTermination()
}
Created 04-07-2016 04:26 AM
That's because no new files are arriving in the directory after the streaming application starts.
You can try "cp" to drop files into the directory after starting the streaming application.
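For reference, here is a minimal, self-contained sketch of the whole job (assuming Spark 1.6.x; the explicit file:/// prefixes are my addition, and it uses DStream.saveAsTextFiles, which writes a separate time-suffixed directory per batch instead of rewriting a single path):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object FileCount {
  def main(args: Array[String]): Unit = {
    // Two local threads so the file monitor and the batch jobs can run in parallel.
    val conf = new SparkConf().setAppName("File Count").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(10))

    // Only files copied or moved into this directory AFTER ssc.start() are picked up.
    val lines = ssc.textFileStream("file:///root/file/test")

    // Key each line by its first two space-separated fields and count occurrences.
    val counts = lines
      .map(line => (line.split(" ")(0) + ";" + line.split(" ")(1), 1))
      .reduceByKey(_ + _)

    // saveAsTextFiles writes a new directory per batch, named "<prefix>-<batch time in ms>".
    counts.saveAsTextFiles("file:///root/file/file1")

    ssc.start()
    ssc.awaitTermination()
  }
}

Start it first, then cp a file into /root/file/test from another shell; the next batch should then produce a non-empty output directory.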
Created 04-07-2016 03:34 AM
16/04/06 14:09:52 INFO FileInputDStream: Duration for remembering RDDs set to 60000 ms for org.apache.spark.streaming.dstream.FileInputDStream@4bf57335
Exception in thread "main" java.lang.NoSuchMethodError: org.apache.spark.util.ThreadUtils$.runInNewThread$default$2()Z
	at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:606)
	at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:600)
	at com.org.file.filecount.FileCount$.main(FileCount.scala:52)
	at com.org.file.filecount.FileCount.main(FileCount.scala)
There's a mismatch in the versions of my dependencies and the runtime, so I do:
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>1.6.1</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.10</artifactId>
    <version>1.6.1</version>
</dependency>
And I am getting an error like the following:
16/04/07 11:23:56 WARN FileInputDStream: Error finding new files
java.io.IOException: Incomplete HDFS URI, no host: "/root/file/test"
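In case it helps anyone else hitting this: that IOException usually means the bare path is being resolved against an HDFS default filesystem without a host, so the usual fix is to pass a fully qualified URI. A quick sketch (ssc is the StreamingContext from the code above; "namenode-host:8020" is only a placeholder for your own NameNode address):

// Explicitly local path on the driver/worker filesystem:
val localStream = ssc.textFileStream("file:///root/file/test")

// Or a fully qualified HDFS URI with host and port
// ("namenode-host:8020" is a placeholder -- use your cluster's NameNode address):
val hdfsStream = ssc.textFileStream("hdfs://namenode-host:8020/root/file/test")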