Created on 04-05-2016 08:26 AM - edited 09-16-2022 03:12 AM
Hi,
Displaying the result of an RDD computation is simple, for example:
val sc = new SparkContext(conf)
val textFile = sc.textFile("/root/file/test")
val apps = textFile.map(line => line.split(";")(0))
  .map(p => (p, 1))     // convert to countable tuples
  .reduceByKey(_ + _)   // count keys
  .collect()            // collect the result
apps.foreach(println)
This prints the result to my console. If I want to save the output to a file instead, I call saveAsTextFile on the RDD (that is, before collect(), which returns a local Array):
apps.saveAsTextFile("/root/file/file1")
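For reference, here is a minimal sketch of the batch version that both saves and prints the counts. It assumes a Spark 1.x setup run with a local master; the object name `AppCount`, the helper `appName`, the app name, and `local[*]` are placeholders that do not come from the post. The point it illustrates is that `saveAsTextFile` (singular) is an RDD method, so it is called on the RDD rather than on the array returned by `collect()`.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object AppCount {
  // Hypothetical helper: the first ';'-separated field is used as the key
  def appName(line: String): String = line.split(";")(0)

  def main(args: Array[String]): Unit = {
    // App name and local[*] master are assumptions for a standalone run
    val conf = new SparkConf().setAppName("AppCount").setMaster("local[*]")
    val sc = new SparkContext(conf)

    val counts = sc.textFile("/root/file/test")
      .map(line => (appName(line), 1)) // convert to countable tuples
      .reduceByKey(_ + _)              // count keys

    // Save from the RDD: writes part-* files into this directory,
    // which must not exist yet
    counts.saveAsTextFile("/root/file/file1")

    // Bring the (small) result to the driver only for printing
    counts.collect().foreach(println)

    sc.stop()
  }
}
```

Since `counts` is used by two actions, it is computed twice here; calling `counts.cache()` before the first action would avoid that for larger inputs.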
But how can I do the same with a DStream? This is my code:
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(10))
val file = ssc.textFileStream("/root/file/test")
var test = file.map(x => (x.split(" ")(0) + ";" + x.split(" ")(1), 1)).reduceByKey((x, y) => x + y)
test.saveAsTextFiles("/root/file/file1")
sc.stop()
}
}
But it doesn't work.
Any help, please!
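One thing the snippet above does not show is the streaming lifecycle: a DStream pipeline only runs after `ssc.start()`, and the driver normally blocks on `ssc.awaitTermination()` instead of stopping the context right away. A minimal sketch of that shape, assuming Spark Streaming 1.6.x and a local master, could look as follows. The object name `FileCount` is taken from the stack trace later in this thread; the helper `toKey`, the app name, and `local[2]` are assumptions for illustration.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object FileCount {
  // Hypothetical helper: joins the first two space-separated fields with ';'
  // (like the original code, it fails on lines with fewer than two fields)
  def toKey(line: String): String = {
    val fields = line.split(" ")
    fields(0) + ";" + fields(1)
  }

  def main(args: Array[String]): Unit = {
    // App name and local[2] master are assumptions for a standalone run
    val conf = new SparkConf().setAppName("FileCount").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(10))

    val counts = ssc.textFileStream("/root/file/test")
      .map(line => (toKey(line), 1))
      .reduceByKey(_ + _)

    // Output operation: one directory per batch, named "<prefix>-<batch time in ms>"
    counts.saveAsTextFiles("/root/file/file1")

    ssc.start()            // nothing is processed before this call
    ssc.awaitTermination() // keep the driver alive while batches run
  }
}
```

Note that `textFileStream` only picks up files that appear in the monitored directory after the stream has started, so files already present in `/root/file/test` would not be counted.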
Created 04-07-2016 03:34 AM
16/04/06 14:09:52 INFO FileInputDStream: Duration for remembering RDDs set to 60000 ms for org.apache.spark.streaming.dstream.FileInputDStream@4bf57335
Exception in thread "main" java.lang.NoSuchMethodError: org.apache.spark.util.ThreadUtils$.runInNewThread$default$2()Z
    at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:606)
    at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:600)
    at com.org.file.filecount.FileCount$.main(FileCount.scala:52)
    at com.org.file.filecount.FileCount.main(FileCount.scala)
There was a mismatch between the Spark version of my dependencies and the version of the runtime, so I changed the dependencies to:
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.10</artifactId>
  <version>1.6.1</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming_2.10</artifactId>
  <version>1.6.1</version>
</dependency>
Now I am getting the following error:
16/04/07 11:23:56 WARN FileInputDStream: Error finding new files
java.io.IOException: Incomplete HDFS URI, no host: "/root/file/test"
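An `Incomplete HDFS URI, no host` message typically indicates that the path ended up being resolved as an `hdfs` URI that lacks a NameNode host, for example when the path has no scheme and the default filesystem is HDFS. A common remedy is to pass a fully qualified URI. The sketch below uses only `java.net.URI` to show which of a few candidate path strings carry a scheme and a host; the object name `UriParts`, the host `namenode`, and the port `8020` are placeholders, not values from this thread.

```scala
import java.net.URI

object UriParts {
  // Returns (scheme, host) of a path string; missing parts come back as None
  def parts(path: String): (Option[String], Option[String]) = {
    val uri = new URI(path)
    (Option(uri.getScheme), Option(uri.getHost))
  }

  def main(args: Array[String]): Unit = {
    Seq(
      "/root/file/test",                     // no scheme, no host
      "hdfs:///root/file/test",              // hdfs scheme, but no host
      "hdfs://namenode:8020/root/file/test", // hdfs scheme with host (placeholder)
      "file:///root/file/test"               // local filesystem, no host needed
    ).foreach(p => println(s"$p -> ${parts(p)}"))
  }
}
```

Under these assumptions, the streaming source would be created with something like `ssc.textFileStream("hdfs://namenode:8020/root/file/test")` for HDFS, or `ssc.textFileStream("file:///root/file/test")` for a directory on the local filesystem.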