Created on 06-10-2016 10:05 AM - edited 09-16-2022 03:24 AM
Is there a reliable way to count things when using Spark? In Hadoop jobs, counters are never "counted" if the job fails partway through, and if speculative execution causes two processes to perform the same operation, only the first completed task's counters are aggregated. My understanding is that Spark's accumulators are not the same, and that both stage failures and speculative execution can lead to double-counting, meaning my counters are not actually reliable. Surely there's a way to get around this?
Created 06-10-2016 11:18 AM
Hi storysj,
Accumulator updates performed within an action will only be counted once, while accumulator updates within transformations may be double counted if a retry occurs.
That is, this code guarantees the counts are only applied once:
val accum = sc.accumulator(0, "test")
sc.parallelize(Array(1, 2, 3, 4)).foreach { x =>
  accum += x
  if (scala.util.Random.nextInt(2) == 0) throw new RuntimeException("random error")
}
println(accum.value)
While this could be double counted:
val accum = sc.accumulator(0, "test")
sc.parallelize(Array(1, 2, 3, 4)).map { x =>
  accum += x
  if (scala.util.Random.nextInt(2) == 0) throw new RuntimeException("random error")
  x
}.collect()
println(accum.value)
Hope this helps,
Jason Hubbard
Created 06-10-2016 01:43 PM
Created 06-11-2016 07:54 AM
Based on the classes, I think you are using the Java API. You can still use foreach to update the accumulators reliably.
final Accumulator<Integer> failed = sc.accumulator(0);
final Accumulator<Integer> fieldCount = sc.accumulator(0);
rdd.foreach(new VoidFunction<String>() {
  public void call(String line) {
    if (parseSuccess(line)) {
      fieldCount.add(1);
    } else {
      failed.add(1);
    }
  }
});
fieldCount.value();
failed.value();
Another option is to use the reduce action instead of accumulators. You could parse the XML in a transformation so you still have an RDD of beans for other transformations and actions, then call reduce to calculate the counts you want. To count records that failed to parse, you can set those records to null or another value that indicates failure. If you are concerned about performance, you can cache the RDD so the parsing is only done once, or wait and update an accumulator when you finally perform your action; however, not all actions accept functions, so that might not be possible.
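For illustration, here is a minimal Scala sketch of the reduce approach; `rdd` and `parse` are placeholders for your input RDD and a parser that returns null on failure, not anything from your actual code:

// Count parse successes/failures with reduce instead of accumulators.
val parsed = rdd.map(line => parse(line))
parsed.cache()                                      // keep the parsed data for later transformations

val (ok, bad) = parsed
  .map(b => if (b == null) (0L, 1L) else (1L, 0L))  // (successes, failures) per record
  .reduce((a, b) => (a._1 + b._1, a._2 + b._2))     // reduce is an action, so the result is reliable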
Hope this helps.
Created 06-13-2016 07:39 AM
Created 06-13-2016 07:45 AM
I think something got lost there -- you can increment accumulators in a transformation. The point above was just that nothing happens until something later invokes an action. The only caveat here is that accumulators may double-count in some failure contexts.
Or, you can run an action to do this counting with accumulators (reliably), and then also separately do other transformations for whatever purpose you need. It's not like one RDD can only result in one action or transformation. That's the simple, standard answer.
There are other answers still. It's most certainly possible, even easy.
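As a rough sketch of that standard answer (with `parse` standing in for your XML parser and "input.txt" a placeholder path):

// One cached RDD feeding a counting action and a separate transformation.
val records = sc.textFile("input.txt")
records.cache()                          // avoid re-reading the input for the second pass

val failed = sc.accumulator(0, "parse failures")
records.foreach { line =>                // action: accumulator updates here are applied exactly once
  if (parse(line) == null) failed += 1
}
println(failed.value)

val beans = records.map(parse).filter(_ != null)   // the "real" transformation pipeline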
Created 06-13-2016 07:54 AM
So I need the reliable counters. That's a must-have.
You're right, I could do the same process twice, once for the counters (action) and once for the transformation (no counters), but that's not efficient.
Created 06-13-2016 07:58 AM
You can just run the counting on the output of the transformation, which presumably will contain a "null" or something for records that failed to parse. The only thing is that you'll want to persist that data set to avoid recomputing it. That means some extra I/O, but on the upside, does mean it's persisted for all future stages as well.
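Something like this sketch, again with `rdd` and `parse` as placeholders:

import org.apache.spark.storage.StorageLevel

// Count on the transformed output itself and keep it persisted.
val parsed = rdd.map(parse).persist(StorageLevel.MEMORY_AND_DISK)  // extra I/O, but reused downstream
val failures = parsed.filter(_ == null).count()    // count() is an action, so the number is reliable
val good = parsed.filter(_ != null)                // continue the pipeline with the good records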
Created 06-13-2016 08:07 AM
I appreciate that suggestion, but for my use case, counters need to be incremented during the transformation, where there's context to know what to count. That's my fault for not being clearer earlier, but I need to know whether parsing failed because of invalid xml, an OOME, xml that didn't conform to the schema, etc. That information can't be derived from a "null" output; it needs to happen inside the various `catch` blocks.
I think the only suggestion so far that would "work" would be to run duplicate functions, first as an action for the counters, and then as a transformation to actually get my results. But that is likely to be inefficient, and will make going back to Hadoop more attractive.
Created 06-13-2016 08:13 AM
You could always use a trivial case class to contain either the desired output, or a complete error description, and process accordingly. Downstream processing would filter in only results with output and take the output; error counting action would filter out results with output and count different errors. That's not hard -- maybe it helps as a potential way forward.
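Roughly like this sketch; `MyBean`, `parseRecord`, and `rdd` are hypothetical stand-ins for your bean, parser, and input, and the exception types are just examples:

case class MyBean(payload: String)                         // placeholder for the real parsed bean
case class ParseResult(output: Option[MyBean], error: Option[String])

// Hypothetical parser that throws different exceptions for different failure modes.
def parseRecord(line: String): MyBean =
  if (line.trim.startsWith("<")) MyBean(line)
  else throw new IllegalArgumentException("invalid xml")

val results = rdd.map { line =>
  try {
    ParseResult(Some(parseRecord(line)), None)
  } catch {
    // each catch block records whatever context it has about the failure
    case e: IllegalArgumentException => ParseResult(None, Some("invalid xml"))
    case e: Exception                => ParseResult(None, Some(e.getClass.getSimpleName))
  }
}
results.cache()                                            // parse once, reuse for both branches

val beans = results.flatMap(_.output)                      // downstream: only successful results
val errorCounts = results.flatMap(_.error).countByValue()  // action: reliable per-error counts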