Created on 06-10-2016 10:05 AM - edited 09-16-2022 03:24 AM
Is there a reliable way to count things when using Spark? In Hadoop jobs, counters are never counted if the job fails partway through, and if speculative execution causes two processes to perform the same operation, only the first completed task's counters are aggregated. My understanding is that Spark's accumulators do not work the same way, and that both stage failures and speculative execution can lead to double-counting, meaning my counts are not actually reliable. Surely there's a way to get around this?
Created 06-10-2016 11:18 AM
Hi storysj,
Accumulator updates performed within an action are applied only once, while accumulator updates performed within a transformation may be double-counted if a task is retried.
I.e., this code guarantees the counts are applied only once:
val accum = sc.accumulator(0, "test")
sc.parallelize(Array(1, 2, 3, 4)).foreach { x =>
  accum += x
  if (scala.util.Random.nextInt(2) == 0) throw new RuntimeException("random error")
}
println(accum.value)
While this could be double-counted:
val accum = sc.accumulator(0, "test")
sc.parallelize(Array(1, 2, 3, 4)).map { x =>
  accum += x
  if (scala.util.Random.nextInt(2) == 0) throw new RuntimeException("random error")
  x
}.collect()
println(accum.value)
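As a side note, the sc.accumulator API used above was deprecated in Spark 2.0 in favour of AccumulatorV2. A minimal sketch of the action-side pattern with the newer API (assuming Spark 2.x or later) would be:
val accum = sc.longAccumulator("test")
sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
println(accum.value)
The same guarantee applies: updates made inside an action are applied only once per task, even if the task is retried.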
Hope this helps,
Jason Hubbard
Created 06-13-2016 03:01 PM
Does Spark Streaming also have the double-counting problem for transformations on DStreams? I assume that under the hood they operate the same way, but I figured I'd better double-check.
Created 06-13-2016 03:46 PM
Yes, it would. DStream transformations and actions are executed the same way as on ordinary RDDs; only the source of the data is different.
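To make the distinction concrete, here is a minimal sketch in Spark Streaming (the socket source on localhost:9999 and the 5-second batch interval are just placeholders): updating the accumulator inside a DStream transformation such as map can be double-counted if a batch task is retried, while updating it inside an RDD action driven by foreachRDD is applied only once per completed task.
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("accumulator-streaming").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(5))
val accum = ssc.sparkContext.accumulator(0L, "records")

// Placeholder text source; any receiver- or file-based DStream behaves the same.
val lines = ssc.socketTextStream("localhost", 9999)

// Transformation: a retried task may apply these updates more than once.
lines.map { line => accum += 1L; line }.print()

// Action on each batch RDD: each task's updates are applied only once.
lines.foreachRDD { rdd => rdd.foreach(_ => accum += 1L) }

ssc.start()
ssc.awaitTermination()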