
Union RDD with emptyRDD

I want to union all the RDDs that I receive in a streaming job.

This is my code:

val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(10))
val file = ssc.textFileStream("hdfs://192.168.1.20:8020/user/sparkStreaming/input")
var test = file.map(x => (x.split(";")(0)+";"+x.split(";")(1), 1)).reduceByKey((x,y) => x+y)

var addedRDD = sc.emptyRDD
test.foreachRDD { rdd =>
  addedRDD = addedRDD union rdd
  addedRDD.cache()
}

But I get this error:

type mismatch; found : org.apache.spark.rdd.RDD[(String, Int)] required: org.apache.spark.rdd.RDD[Nothing]

And when I try to create an empty RDD with a given type, I get this error:

type mismatch; found : org.apache.spark.rdd.RDD[(String, Int)] required: org.apache.spark.rdd.EmptyRDD[(String, Int)]

How can I fix this problem?

Thanks in advance!

1 ACCEPTED SOLUTION


I found the solution:

var addedRDD : org.apache.spark.rdd.RDD[(String,Int)] = sc.emptyRDD


3 REPLIES 3

Master Collaborator

Try "sc.emptyRDD[(String,Int)]". As written, the element type of the empty RDD cannot be inferred, so it becomes RDD[Nothing] and does not match the RDDs that are unioned with it later.
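To make the typing point concrete, here is a minimal sketch. The object name TypedEmptyRddSketch and the helper accumulate are hypothetical, chosen only for illustration; the Spark calls used are SparkContext.emptyRDD and RDD.union. The variable is annotated as a plain RDD[(String, Int)] so that the result of each union can be assigned back to it.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

// Hypothetical helper object, not part of Spark.
object TypedEmptyRddSketch {
  // Start from an explicitly typed empty RDD and union each batch onto it.
  def accumulate(sc: SparkContext,
                 batches: Seq[RDD[(String, Int)]]): RDD[(String, Int)] = {
    // Without a type, sc.emptyRDD is inferred as RDD[Nothing], which is
    // the cause of the first type mismatch. The annotation on the variable
    // keeps it a general RDD, so the value returned by union fits.
    var added: RDD[(String, Int)] = sc.emptyRDD[(String, Int)]
    batches.foreach { rdd => added = added.union(rdd) }
    added
  }
}
```

Each call to union adds another layer to the RDD lineage, so this pattern is best kept for a small number of batches.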

However, to union many RDDs you really want to use SparkContext.union. Collect the RDDs in a Seq and then call it once to union them all.
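A rough sketch of that approach, assuming the RDDs have already been collected into a Seq (in the streaming job, that could mean appending each batch's RDD to a buffer inside foreachRDD, which is an assumption and not shown here). The names UnionOnceSketch and unionAll are hypothetical; SparkContext.union(Seq) is the Spark call being illustrated.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

// Hypothetical helper object, not part of Spark.
object UnionOnceSketch {
  // Combine all collected RDDs with a single SparkContext.union call
  // instead of chaining rdd.union(rdd) once per batch.
  def unionAll(sc: SparkContext,
               parts: Seq[RDD[(String, Int)]]): RDD[(String, Int)] =
    // Guard the empty case explicitly so the result is always well defined.
    if (parts.isEmpty) sc.emptyRDD[(String, Int)]
    else sc.union(parts)
}
```

Usage might look like `UnionOnceSketch.unionAll(sc, collected).reduceByKey(_ + _)`, where `collected` is the Seq of per-batch RDDs. A single union keeps the lineage flat, whereas unioning in a loop nests one union inside another for every batch.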

Could you share some code that I can understand, please?
