Support Questions

Find answers, ask questions, and share your expertise
Announcements
Celebrating as our community reaches 100,000 members! Thank you!

Reliable method for keeping counters in Spark

avatar
Explorer

Is there a reliable way to count things when using Spark? In Hadoop jobs, counters are never "counted" if the job fails partway through, or if speculative execution causes two process to perform the same operation, only the first complete task's counters are aggregated. My undersanding is that Spark's accumulators are not the same, and that both stage failures and speculative execution can lead to double-counting, meaning my counters are not actually reliable. Surely there's a way to get around this?

1 ACCEPTED SOLUTION

avatar
Expert Contributor

Hi storysj,

 

Accumulator updates performed within an action will only be counted once while accumulator updates within transforms may be double counted if a retry occurs.

 

IE this code would gaurantee counts only applied once:

val accum = sc.accumulator(0, "test")
sc.parallelize(Array(1, 2, 3, 4)).foreach(x =>
  accum += x
  if (scala.util.Random.nextInt(2) == 0) throw new RuntimeException("random error")
}
println(accum.value)

While this could be double counted:

val accum = sc.accumulator(0, "test")
sc.parallelize(Array(1, 2, 3, 4)).map(x =>
  accum += x
  if (scala.util.Random.nextInt(2) == 0) throw new RuntimeException("random error")
}.collect()
println(accum.value)

Hope this helps,

Jason Hubbard

View solution in original post

11 REPLIES 11

avatar
Expert Contributor

Hi storysj,

 

Accumulator updates performed within an action will only be counted once while accumulator updates within transforms may be double counted if a retry occurs.

 

IE this code would gaurantee counts only applied once:

val accum = sc.accumulator(0, "test")
sc.parallelize(Array(1, 2, 3, 4)).foreach(x =>
  accum += x
  if (scala.util.Random.nextInt(2) == 0) throw new RuntimeException("random error")
}
println(accum.value)

While this could be double counted:

val accum = sc.accumulator(0, "test")
sc.parallelize(Array(1, 2, 3, 4)).map(x =>
  accum += x
  if (scala.util.Random.nextInt(2) == 0) throw new RuntimeException("random error")
}.collect()
println(accum.value)

Hope this helps,

Jason Hubbard

avatar
Explorer
So at first I was really excited about your answer - until I realized that I really can't perform transformations on my rdd items. So here's a revised way to ask my question: Say I have an RDD of xml documents, and I want to run them through a function where I parse them into some sort of Bean (a transformation), and I want to count how many xml documents can't be parsed, and how may of my beans have property x, etc. Is there no way to count these things reliably, since the `forEach()` action takes a `VoidFunction` and the `map()` transformation doesn't keep reliable counters?

avatar
Expert Contributor

Based on the classes, I think you are using the Java API.  You can still use the foreach to use the accumulators in a reliable way.

 

final Accumulator<Integer> failed = sc.accumulator(0);
final Accumulator<Integer> fieldCount = sc.accumulator(0);
rdd.foreach(new VoidFunction<String>(){ public void call(String line) {
 if (parseSuccess(line)) {
  fieldCount.add(1);
 } else {
  failed.add(1);
}});

fieldCount.value()
failed.value()

Another option is to use the reduce function instead of using accumulators.  You could parse the xml in a transformation so you still have an RDD of beans to do other transformations and actions, but call the reduce to caclulate the counts you want.  In order to calculate records that failed to parse, you can set that record to null or another value to indicate that.  If you are concerned about performance, you can cache the RDD so it is efficient or wait to do this calculation in a accumulator when you finally perform your action, not all actions accept functions so this might not be possible.

 

Hope this helps.

avatar
Explorer
For me, since I need a collection (or RDD) of Bean objects from the result of processing, a void function won't cut it. Using a reduce is an interesting idea, but would require all the information I want to count to be derivable from the Beans, which it won't be. If I want to count the different types of parse errors (not just success-vs-failure) that information will be lost by the time I just have beans.

Seems a shame that spark didn't manage to capture this one critical piece of the Hadoop framework.

avatar
Master Collaborator

I think something got lost there -- you can increment accumulators in a transformation. The point above was just that nothing happens until something later invokes an action. The only caveat here is that accumulators may double-count in some failure contexts.

 

Or, you can run an action to do this counting with accumulators (reliably), and then also separately do other transformations for whatever purpose you need. It's not like one RDD can only result in one action or transformation. That's the simple, standard answer.


There are other answers still. It's most certianly possible, even easy.

avatar
Explorer

So I need the reliable counters. That's a must-have.

 

You're right, I could do the same process twice, once for the counters (action) and once for the transformation (no counters), but that's not efficient.

avatar
Master Collaborator

You can just run the counting on the output of the transformation, which presumably will contain a "null" or something for records that failed to parse. The only thing is that you'll want to persist that data set to avoid recomputing it. That means some extra I/O, but on the upside, does mean it's persisted for all future stages as well.

avatar
Explorer

I appreciate that suggestion, but for my use case, counters need to be incremented during the transformation when there's context to know what to count. That's my fault for not being more clear on earlier, but I need to know if I failed to parse because of invalid xml, or because of an OOME, or because the xml didn't conform to the schema, etc. That information can't be derrived from the output of "null", it needs to happen inside varous `catch` blocks.

 

I think the only suggestion so far that would "work" would be to run duplicate functions, first as an action for the counters, and then as a transformation to actually get my results. But that is likely to be inneficient, and will make going back to Hadoop more attractive.

avatar
Master Collaborator

You could always use a trivial case class to contain either the desired output, or a complete error description, and process accordingly. Downstream processing would filter in only results with output and take the output; error counting action would filter out results with output and count different errors. That's not hard -- maybe it helps as a potential way forward.