Created on 03-09-2016 06:04 PM - edited 09-16-2022 03:08 AM
I have a Spark job with a long-running stage (24+ hours) that runs in batch mode on YARN in yarn-cluster mode. When a node hosting an executor fails during the job, the executor fails to restart and the stage is restarted. This loses valuable partial results and either causes the job to miss its deadlines or causes the entire workflow to fail after very lengthy retries.
The stage persists its output (tuples), and the remaining stages run quite fast: the first and second stages materialize via countByKey with very small amounts of data to shuffle (fewer than 100 KB), and a further stage saves a projection of the tuples' data to HDFS (it runs longer but has no shuffle). The last few runs could not complete because of various node-level issues (e.g. in one case a disk failed on a node hosting an executor; in another, an executor's NodeManager was restarted). The job runs reliably for shorter runs, and the container's memory limit is generous relative to the memory actually used. So far, for the longer runs that fail, all errors appear to be precisely correlated with and triggered by cluster node failures that cause one or more executors to fail. I thought Spark was resilient and would recover the executor and retry just the failed tasks in the stage, but in practice the executor fails to restart. I have the following questions:
1) Does Spark migrate a failed executor to another container (on a different cluster node) before a stage fails? If not, is there something that can be done to encourage executor migration to a container on a different cluster node?
2) Under what conditions does Spark trigger a stage failure? (Is it because the same executor dies 4 times?)
3) I'm not seeing adequate diagnostics in my YARN application logs to determine why the executor fails, and it is a very laborious process to look through all the logs on the cluster nodes to find the operational failure. Is there a smart way to learn why an executor failed and to identify all failed executors from an already failed job?
4) Is there some setting I need to configure to make Spark resilient against single-node failures?
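To make question 3 concrete, here is a minimal sketch of the kind of scan I would like to avoid doing by hand. It assumes the aggregated application log (for example the output of `yarn logs -applicationId <application id>`) contains driver lines of the form `Lost executor <id> on <host>: <reason>`. The regex, the function name `find_lost_executors`, and the script name in the usage comment are assumptions for illustration and should be checked against the actual logs.

```python
import re
import sys
from collections import defaultdict

# Assumed message shape: "Lost executor 7 on node12.example.com: <reason>".
# This pattern is an assumption; confirm it against your own driver log.
LOST_EXECUTOR = re.compile(r"Lost executor (\S+) on (\S+?): (.*)")


def find_lost_executors(lines):
    """Return {executor_id: [(host, reason), ...]} for every matching line."""
    lost = defaultdict(list)
    for line in lines:
        match = LOST_EXECUTOR.search(line)
        if match:
            executor_id, host, reason = match.groups()
            lost[executor_id].append((host, reason.strip()))
    return dict(lost)


if __name__ == "__main__":
    # Hypothetical usage:
    #   yarn logs -applicationId <application id> | python find_lost_executors.py
    for executor_id, events in sorted(find_lost_executors(sys.stdin).items()):
        for host, reason in events:
            print(f"executor {executor_id} on {host}: {reason}")
```

This only lists which executors were reported lost and the reason string the driver recorded. It does not explain node-level causes such as a failed disk, which is the part I am still missing.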