Archives of Support Questions (Read Only)


Union an RDD with an emptyRDD


I want to union every RDD that I receive in a stream into a single RDD.

This is my code:

val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(10))
val file = ssc.textFileStream("hdfs://192.168.1.20:8020/user/sparkStreaming/input")
var test = file.map(x => (x.split(";")(0) + ";" + x.split(";")(1), 1)).reduceByKey((x, y) => x + y)

var addedRDD = sc.emptyRDD
test.foreachRDD { rdd =>
  addedRDD = addedRDD union rdd
  addedRDD.cache()
}

But I get this error:

type mismatch; found : org.apache.spark.rdd.RDD[(String, Int)] required: org.apache.spark.rdd.RDD[Nothing]

And when I try to create an empty RDD with an explicit type, I get this error:

type mismatch; found : org.apache.spark.rdd.RDD[(String, Int)] required: org.apache.spark.rdd.EmptyRDD[(String, Int)]

How can I fix this problem?

Thanks in advance!

1 ACCEPTED SOLUTION


I found the solution:

var addedRDD : org.apache.spark.rdd.RDD[(String,Int)] = sc.emptyRDD
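For context, here is a minimal sketch of why the annotation helps. It assumes Spark running in local mode; the names `EmptyRddUnionSketch` and `accumulate` and the sample data are hypothetical and not from the original post. Without a type, `sc.emptyRDD` is inferred as `RDD[Nothing]`. The second error message suggests that, in the Spark version used here, `sc.emptyRDD[(String, Int)]` was typed as `EmptyRDD[(String, Int)]`, so the plain `RDD` returned by `union` could not be assigned back to the variable. Declaring the variable with the general `RDD[(String, Int)]` type sidesteps both problems.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object EmptyRddUnionSketch {
  // Hypothetical helper: starts from an empty RDD and unions each batch into it.
  // Each inner Seq stands in for one micro-batch delivered by foreachRDD.
  def accumulate(sc: SparkContext, batches: Seq[Seq[(String, Int)]]): RDD[(String, Int)] = {
    // The annotation gives the variable the general RDD type, so the
    // result of union (an RDD[(String, Int)]) can be assigned back to it.
    var addedRDD: RDD[(String, Int)] = sc.emptyRDD
    for (batch <- batches) {
      addedRDD = addedRDD.union(sc.parallelize(batch))
    }
    addedRDD
  }

  def main(args: Array[String]): Unit = {
    // Local master and app name are assumptions for illustration only.
    val conf = new SparkConf().setMaster("local[2]").setAppName("empty-rdd-union")
    val sc = new SparkContext(conf)
    try {
      val all = accumulate(sc, Seq(Seq(("a;b", 1)), Seq(("c;d", 2))))
      println(all.count())
    } finally {
      sc.stop()
    }
  }
}
```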


3 REPLIES

Master Collaborator

Try "sc.emptyRDD[(String,Int)]". As written, the element type of the empty RDD cannot be inferred (it defaults to Nothing), so it does not match the RDDs that are unioned with it later.

 

However, to union many RDDs you really want SparkContext.union: collect the RDDs in a Seq and call union once on the whole Seq.
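As a rough illustration of that suggestion, the sketch below collects RDDs in a buffer and unions them with a single call to `SparkContext.union`. The local master, the names `UnionManySketch` and `unionAll`, and the sample data are assumptions made for the example; in the streaming job, the RDDs would be appended to the buffer inside `foreachRDD` instead.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import scala.collection.mutable.ArrayBuffer

object UnionManySketch {
  // Hypothetical helper: unions all collected RDDs in one call
  // instead of chaining pairwise unions.
  def unionAll(sc: SparkContext, rdds: Seq[RDD[(String, Int)]]): RDD[(String, Int)] =
    sc.union(rdds)

  def main(args: Array[String]): Unit = {
    // Local master and app name are assumptions for illustration only.
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("union-many"))
    try {
      // Stand-ins for the per-batch RDDs produced by the stream.
      val collected = ArrayBuffer.empty[RDD[(String, Int)]]
      collected += sc.parallelize(Seq(("a;b", 1)))
      collected += sc.parallelize(Seq(("a;b", 2), ("c;d", 1)))

      val all = unionAll(sc, collected.toSeq)
      println(all.count())
    } finally {
      sc.stop()
    }
  }
}
```

One n-ary union produces a single flat union over all inputs, whereas repeated pairwise unions nest each result inside the next and make the lineage deeper with every batch.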

avatar
Could you share some code that I can understand, please?
