Created 04-07-2016 03:34 AM
16/04/06 14:09:52 INFO FileInputDStream: Duration for remembering RDDs set to 60000 ms for org.apache.spark.streaming.dstream.FileInputDStream@4bf57335
Exception in thread "main" java.lang.NoSuchMethodError: org.apache.spark.util.ThreadUtils$.runInNewThread$default$2()Z
    at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:606)
    at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:600)
    at com.org.file.filecount.FileCount$.main(FileCount.scala:52)
    at com.org.file.filecount.FileCount.main(FileCount.scala)
There's a mismatch between the versions of my dependencies and the runtime, so I changed the dependencies to:
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>1.6.1</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.10</artifactId>
    <version>1.6.1</version>
</dependency>
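A NoSuchMethodError like the one above typically means the Spark version on the classpath at runtime differs from the one compiled against. One way to keep the two artifacts in lockstep is a shared Maven property; this is just a sketch of the convention, with the versions mirroring the dependencies above:

```xml
<!-- Define the versions once, so both artifacts always agree. -->
<properties>
    <spark.version>1.6.1</spark.version>
    <scala.binary.version>2.10</scala.binary.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_${scala.binary.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_${scala.binary.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
</dependencies>
```

The compiled version still has to match the Spark installed on the cluster you submit to; the property only guarantees the two declared artifacts can't drift apart.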
Now I am getting the following error:
16/04/07 11:23:56 WARN FileInputDStream: Error finding new files java.io.IOException: Incomplete HDFS URI, no host: "/root/file/test"
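The "Incomplete HDFS URI, no host" warning suggests the bare path /root/file/test is being resolved against an hdfs:// default filesystem that has no NameNode host in it. A sketch of two ways to fully qualify the path; note that namenode-host:8020 is a placeholder, not a value from the original post:

```scala
// Placeholder NameNode address; substitute your cluster's actual host:port.
val hdfsPath = "hdfs://namenode-host:8020/root/file/test"

// Or target the local filesystem explicitly instead of HDFS:
val localPath = "file:///root/file/test"

// Either fully qualified URI avoids ambiguous resolution:
val lines = ssc.textFileStream(localPath)
```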
Created 04-07-2016 04:13 AM
I don't know why, but I re-ran it and it works. However, I only get an empty _SUCCESS file in the output directory file1.
Here is the complete code:
def main(args: Array[String]) {
  val conf = new SparkConf()
    .setAppName("File Count")
    .setMaster("local[2]")
  val sc = new SparkContext(conf)
  val ssc = new StreamingContext(sc, Seconds(1))
  val file = ssc.textFileStream("/root/file/test/file")
  file.foreachRDD(t => {
    val test = t.map(x => (x.split(" ")(0) + ";" + x.split(" ")(1), 1))
                .reduceByKey((x, y) => x + y)
    test.saveAsTextFile("/root/file/file1")
  })
  ssc.start()
  ssc.awaitTermination()
}
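As a side note, the map above calls split(" ") on every line twice. A minimal refactor (behavior unchanged, assuming each line has at least two space-separated fields) splits each line once:

```scala
val test = t.map { x =>
  val parts = x.split(" ")           // split each line only once
  (parts(0) + ";" + parts(1), 1)     // key = first two fields joined by ';'
}.reduceByKey(_ + _)
```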
Created 04-07-2016 04:26 AM
That's because no new files are arriving in the directory after the streaming application starts. textFileStream only processes files that appear after the stream is running, so each empty batch still commits an empty output (the _SUCCESS file is just the marker Hadoop's output committer writes when a job completes).
You can try "cp" to drop files into the directory after starting the streaming application.
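One caveat worth adding: textFileStream expects files to appear in the watched directory atomically. A small sketch with placeholder paths: write the file somewhere else first, then mv it in, since a rename within one filesystem is atomic while a slow cp can expose a half-written file to the stream:

```shell
# Stage the file outside the watched directory, then move it in.
WATCH_DIR=$(mktemp -d)   # stands in for the monitored /root/file/test/file
STAGING=$(mktemp -d)
echo "2016-04-07 INFO started" > "$STAGING/events.txt"
mv "$STAGING/events.txt" "$WATCH_DIR/events.txt"   # atomic on one filesystem
ls "$WATCH_DIR"
```

On HDFS the equivalent is writing to a temporary directory and using a rename (e.g. hadoop fs -mv) into the monitored path.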