
Reliable method for keeping counters in Spark

Explorer

Is there a reliable way to count things when using Spark? In Hadoop jobs, counters from a failed task are never "counted" if the job fails partway through, and if speculative execution causes two processes to perform the same operation, only the first completed task's counters are aggregated. My understanding is that Spark's accumulators are not the same, and that both stage failures and speculative execution can lead to double-counting, meaning my counters are not actually reliable. Surely there's a way to get around this?

1 ACCEPTED SOLUTION

Expert Contributor

Hi storysj,

 

Accumulator updates performed within an action will only be counted once, while accumulator updates within transformations may be double counted if a retry occurs.

 

For example, this code guarantees the counts are applied only once:

val accum = sc.accumulator(0, "test")
// foreach is an action, so each element's update is applied exactly once
sc.parallelize(Array(1, 2, 3, 4)).foreach { x =>
  accum += x
  if (scala.util.Random.nextInt(2) == 0) throw new RuntimeException("random error")
}
println(accum.value)

While this could be double counted:

val accum = sc.accumulator(0, "test")
// map is a transformation, so a retried task can apply these updates again
sc.parallelize(Array(1, 2, 3, 4)).map { x =>
  accum += x
  if (scala.util.Random.nextInt(2) == 0) throw new RuntimeException("random error")
  x
}.collect()
println(accum.value)
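
If the number itself has to be trustworthy, one option is to derive it from an action such as count() rather than an accumulator, since the result of an action reflects each element exactly once even when tasks are retried. A rough sketch (variable names are just for illustration):

val data = sc.parallelize(Array(1, 2, 3, 4))
// count() is an action, so its result is computed from each element exactly once
// even if individual tasks fail and are re-run
val evenCount = data.filter(_ % 2 == 0).count()
println(evenCount)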

Hope this helps,

Jason Hubbard


11 REPLIES

Explorer

Does Spark Streaming also have the double-counting problem for transformations on DStreams? I assume that under the hood they operate the same way, but I figured I'd better double-check.

Master Collaborator

Yes, it would. Transformations and actions execute the same way; only the source is different.
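
As a rough sketch (assuming a StreamingContext called ssc and an input DStream called stream), the same rule applies: updates made from an action inside an output operation such as foreachRDD are applied once, while updates inside a DStream transformation such as map may be repeated on retry:

val accum = ssc.sparkContext.accumulator(0, "events")

// rdd.foreach is an action, so these updates are applied once per record
stream.foreachRDD { rdd =>
  rdd.foreach(_ => accum += 1)
}

// map is a transformation, so a retried task could apply these updates again
val tagged = stream.map { record =>
  accum += 1
  record
}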