All along, I have been reading that RDDs are immutable, but to my surprise today I found a different result. I would like to know the reason, with supporting documentation if possible.
scala> val m = Array.fill(2, 2)(5)
m: Array[Array[Int]] = Array(Array(5, 5), Array(5, 5))

scala> val rdd = sc.parallelize(m)

scala> rdd.collect()
res6: Array[Array[Int]] = Array(Array(5, 5), Array(5, 5))

// Interesting here.
scala> m(0)(1) = 99

scala> rdd.collect()
res8: Array[Array[Int]] = Array(Array(5, 99), Array(5, 5))
In a distributed environment, an RDD is spread across many nodes. When you call an operation, you tell each node what to do with the piece of the RDD that it holds. If the operation refers to any local variables (like `myMap`), they get serialized and sent to those machines so they can use them. But nothing comes back, so your original copy of `myMap` is unaffected. RDDs are not just immutable but deterministic functions of their inputs, and any part of an RDD can be recreated at any time. This is what makes caching and lineage-based recovery possible. An RDD isn't really a collection of data, but just a recipe for making data from other data.
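The "serialized and sent, but nothing comes back" point can be sketched in plain Scala without Spark. A Java-serialization round trip stands in for shipping a closure to an executor (`myMap` and `workerCopy` are illustrative names, not Spark API):

```scala
import java.io._

// A local variable on the "driver".
val myMap = scala.collection.mutable.Map("a" -> 1)

// Serialize it, as Spark does when it ships a closure to an executor...
val bytes = new ByteArrayOutputStream()
val out = new ObjectOutputStream(bytes)
out.writeObject(myMap)
out.close()

// ...and deserialize it on the "worker".
val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
val workerCopy = in.readObject().asInstanceOf[scala.collection.mutable.Map[String, Int]]

// The worker mutates its own copy; the driver's original is untouched.
workerCopy("a") = 99
println(myMap("a"))      // still 1
println(workerCopy("a")) // 99
```

The copies are fully independent after deserialization, which is why mutating a variable inside a Spark closure never changes the driver's copy.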
@Vinod Bonthu Thanks Vinod, I understand this. However, I tried with another dataset, and there I am getting the result I expected. According to the explanation above, the result should have been 55,55!!
But for the example in the question, the results are not as expected. Could you please explain why?
val nums = Array(99, 99)
val rdd = sc.parallelize(nums)
rdd.collect  // output is: 99,99

// Now making similar changes as in the main question.
nums(0) = 55
nums(1) = 55
rdd.collect  // output is still 99,99
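The difference between the two experiments likely comes down to reference versus value copying. `parallelize` copies the *elements* of the source collection into the RDD's partitions: for `Array[Int]` those elements are primitive values, but for `Array[Array[Int]]` they are references to the inner arrays. A plain-Scala sketch, with `clone()` standing in for the copy that `parallelize` makes (no Spark involved):

```scala
// Nested case: the copy holds references to the same inner arrays.
val nested = Array.fill(2, 2)(5)
val nestedCopy = nested.clone()   // shallow copy, like the question's RDD
nested(0)(1) = 99
println(nestedCopy(0)(1))         // 99 -- inner array is shared

// Flat case: primitive Ints are copied by value.
val flat = Array(99, 99)
val flatCopy = flat.clone()
flat(0) = 55
println(flatCopy(0))              // 99 -- independent of the original
```

So the RDD in the question is not being mutated; it still holds the same references, and the mutation happens *inside* an object both sides point at. The `Array[Int]` experiment has no shared inner objects, so nothing leaks through.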
The UC Berkeley white paper on RDDs spells it out this way:
Although individual RDDs are immutable, it is possible to implement mutable state by having multiple RDDs to represent multiple versions of a dataset. We made RDDs immutable to make it easier to describe lineage graphs, but it would have been equivalent to have our abstraction be versioned datasets and track versions in lineage graphs.
If you notice the output above, your collect() output is initially res6, and the second time it is res8. Presumably this is a new version of the initial RDD and thus gets a different reference name.
It seems like a bug: the results are different on PySpark, and the behavior would probably also differ if the RDD were partitioned across remote machines, or even if you repartitioned your RDD to have multiple partitions.
I am new to Spark, so please correct me if I'm wrong...
In the above example, "nums" is an Array variable and "rdd" is an RDD. Since Spark does lazy evaluation, it only creates the lineage; when an action is performed, it reads from the source.
So we are making changes to the Array variable, not to the RDD.
When we try to update/reassign the RDD itself, it throws an error.
scala> rdd = sc.parallelize(Array(1,2))
<console>:28: error: reassignment to val
       rdd = sc.parallelize(Array(1,2))
           ^
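That error is the Scala-level `val` rule, but it points at the right idiom: you never update an RDD in place, you derive a new one with a transformation such as `map`, which adds a step to the lineage. A plain-Scala sketch of the same pattern (a `List` stands in for the parallelized data):

```scala
val nums = List(99, 99)        // like the result of parallelize
// nums = List(1, 2)           // won't compile: reassignment to val

// "Changing" an immutable value means deriving a new one from it;
// with RDDs this would be rdd.map(...), producing a new RDD.
val updated = nums.map(_ => 55)

println(nums)    // List(99, 99) -- original unchanged
println(updated) // List(55, 55) -- new value derived from it
```

The original stays intact, which is exactly what lets Spark recompute lost partitions from lineage.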