Is there a reliable way to count things when using Spark? In Hadoop jobs, counters from a task are never "counted" if the job fails partway through, and if speculative execution causes two processes to perform the same operation, only the first completed task's counters are aggregated. My understanding is that Spark's accumulators are not the same: both stage failures and speculative execution can lead to double-counting, meaning my counters are not actually reliable. Surely there's a way to get around this?
Accumulator updates performed within an action are counted exactly once, while accumulator updates within transformations may be double-counted if a task is retried.
I.e., this code guarantees the counts are applied only once:
    val accum = sc.accumulator(0, "test")
    sc.parallelize(Array(1, 2, 3, 4)).foreach { x =>
      accum += x
      if (scala.util.Random.nextInt(2) == 0) throw new RuntimeException("random error")
    }
    println(accum.value)
While this could be double counted:
    val accum = sc.accumulator(0, "test")
    sc.parallelize(Array(1, 2, 3, 4)).map { x =>
      accum += x
      if (scala.util.Random.nextInt(2) == 0) throw new RuntimeException("random error")
      x
    }.collect()
    println(accum.value)
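If you need counts from the middle of a pipeline to be exactly-once, one option is to make the count part of the job's data flow rather than a side effect: an action's return value reflects each record exactly once regardless of task retries or speculative execution. A rough sketch of that pattern (assuming a `SparkContext` named `sc` is already in scope; the predicate and metric names here are just illustrative):

```scala
// Assumes an existing SparkContext `sc`.
val data = sc.parallelize(Array(1, 2, 3, 4))

// Count matching records via an action instead of an accumulator; the
// result is computed from the final, successful task attempts only.
val evenCount = data.filter(x => x % 2 == 0).count()

// Several metrics at once: carry (sum, count) pairs through a reduce.
val (sum, n) = data.map(x => (x.toLong, 1L))
  .reduce { case ((s1, c1), (s2, c2)) => (s1 + s2, c1 + c2) }
```

The trade-off is an extra pass (or extra bookkeeping in your records) compared with a free-riding accumulator, but the numbers you get back are reliable.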
Hope this helps,
Does Spark Streaming also have the double-counting problem for transformations on DStreams? I assume that under the hood they operate the same way, but I figured I'd better double-check.