Created on 04-25-2016 04:53 AM - edited 09-16-2022 03:15 AM
I want to build a union of all the RDDs produced by my streaming job.
This is my code:
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(10))
val file = ssc.textFileStream("hdfs://192.168.1.20:8020/user/sparkStreaming/input")
var test = file.map(x => (x.split(";")(0)+";"+x.split(";")(1), 1)).reduceByKey((x,y) => x+y)
var addedRDD = sc.emptyRDD
test.foreachRDD{ rdd =>
  addedRDD = addedRDD union rdd
  addedRDD.cache()
}
But I get this error:
type mismatch;
 found   : org.apache.spark.rdd.RDD[(String, Int)]
 required: org.apache.spark.rdd.RDD[Nothing]
And when I try to create an empty RDD with an explicit type, I get this error:
type mismatch;
 found   : org.apache.spark.rdd.RDD[(String, Int)]
 required: org.apache.spark.rdd.EmptyRDD[(String, Int)]
How can I fix this problem?
Thanks in advance!
Created 04-25-2016 05:45 AM
I found the solution:
var addedRDD : org.apache.spark.rdd.RDD[(String,Int)] = sc.emptyRDD
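For context, here is a minimal sketch of the job from the question with that fix applied. It assumes the master is supplied by spark-submit; the object name `StreamingUnion`, the helper `keyOf`, and the app name are hypothetical, while the HDFS path is the one from the question. The explicit `RDD[(String, Int)]` annotation lets the compiler infer the element type for `sc.emptyRDD` and keeps the variable typed as the general `RDD`, so each union result can be assigned back to it.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StreamingUnion {
  // Hypothetical helper: key a line by its first two ";"-separated fields,
  // as the question's map does. Assumes every line has at least two fields.
  def keyOf(line: String): String = {
    val fields = line.split(";")
    fields(0) + ";" + fields(1)
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical app name; the master is expected to come from spark-submit.
    val conf = new SparkConf().setAppName("StreamingUnion")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(10))

    val file = ssc.textFileStream("hdfs://192.168.1.20:8020/user/sparkStreaming/input")

    // Per-batch count of each key.
    val counts = file.map(line => (keyOf(line), 1)).reduceByKey(_ + _)

    // The declared type fixes the element type (instead of Nothing) and keeps
    // the variable typed as RDD, so union results can be assigned back to it.
    var addedRDD: RDD[(String, Int)] = sc.emptyRDD

    // The foreachRDD function runs on the driver, so reassigning the var works.
    counts.foreachRDD { rdd =>
      addedRDD = addedRDD.union(rdd)
      addedRDD.cache()
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
```

Note that every batch adds another parent to the lineage of `addedRDD`, and `cache()` does not shorten that lineage, so it keeps growing for as long as the job runs.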
Created 04-25-2016 05:02 AM
Try "sc.emptyRDD[(String,Int)]". Right now the element type of the empty RDD can't be inferred, so the compiler picks Nothing, and the result doesn't match the RDD[(String, Int)] values it is unioned with later.
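The asker's second error suggests that in their Spark version `sc.emptyRDD` returns the concrete `EmptyRDD` type, so a type argument alone leaves the variable typed as `EmptyRDD[(String, Int)]`. A minimal local-mode sketch that gives both the type argument and the variable type follows; the object name and the `batches` data are hypothetical stand-ins for the per-interval RDDs of the streaming job.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object TypedEmptyRDD {
  // Hypothetical sample data standing in for the RDD of each streaming interval.
  val batches: Seq[Seq[(String, Int)]] = Seq(
    Seq(("a;b", 2), ("c;d", 1)),
    Seq(("a;b", 1))
  )

  def accumulate(sc: SparkContext): RDD[(String, Int)] = {
    // Type argument: element type is (String, Int), not Nothing.
    // Variable type: RDD, not EmptyRDD, so union results can be assigned to it.
    var acc: RDD[(String, Int)] = sc.emptyRDD[(String, Int)]
    for (batch <- batches) {
      acc = acc.union(sc.parallelize(batch))
    }
    acc
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("TypedEmptyRDD"))
    try {
      accumulate(sc).collect().foreach(println)
    } finally {
      sc.stop()
    }
  }
}
```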
That said, you really want to use SparkContext.union here to union many RDDs: collect them in a Seq and then call it once, rather than chaining one union per RDD.
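A minimal sketch of that suggestion in local mode. The `batches` argument is a hypothetical stand-in for the RDDs you would collect (for example, by appending each one to a buffer inside `foreachRDD`); the object and method names are also hypothetical. `SparkContext.union(rdds: Seq[RDD[T]])` builds one union over the whole sequence instead of nesting one union per batch.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object UnionOnce {
  def unionAll(sc: SparkContext, batches: Seq[Seq[(String, Int)]]): RDD[(String, Int)] = {
    // One RDD per batch, kept in a Seq instead of being unioned one at a time.
    val rdds: Seq[RDD[(String, Int)]] = batches.map(batch => sc.parallelize(batch))
    // A single SparkContext.union call over the whole Seq.
    sc.union(rdds)
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("UnionOnce"))
    try {
      // Hypothetical sample data for two intervals.
      val batches = Seq(Seq(("a;b", 2)), Seq(("c;d", 1), ("a;b", 1)))
      unionAll(sc, batches).collect().foreach(println)
    } finally {
      sc.stop()
    }
  }
}
```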