CDH 5.5, Spark 1.5.0 Executor fails due to operational issues, and causing job restarts

Explorer

I have a Spark job with a long-running stage (24+ hours) running in batch mode using yarn-cluster mode. When a node hosting an executor fails during the job, the executor fails to restart and the stage is restarted, losing valuable partial results; the job then violates its deadlines and the entire workflow can fail after very lengthy retries.


The stage persists its output (tuples), and the remaining stages run quite fast: the first and second stages materialize via countByKey with very small amounts of data to shuffle (fewer than 100 KB), and a final stage saves a projection of the tuples' data to HDFS (it runs longer but has no shuffle); a rough sketch of this pipeline is included after the questions below. During the last few runs, the job could not complete due to various node-level issues (e.g. in one case a disk failed on a node hosting an executor; in another, an executor's NodeManager was restarted). The job runs reliably for shorter runs, and the container's memory limit is generous relative to the memory actually used; so far it appears that, for the longer runs that fail, all errors are precisely correlated with and triggered by cluster node failures that cause one or more executors to fail. I thought Spark was resilient and would recover the executor and retry just the failed tasks in the stage, but in practice the executor fails to restart. I have the following questions:

1) Does Spark migrate a failed executor to another container (on a different cluster node) before a stage fails?  If not, is there something that can be done to encourage executor migration to a container on a different cluster node?

2) Under what conditions does Spark trigger a stage failure? Is it because the same executor dies 4 times?

3) I'm not seeing adequate diagnostics in my YARN application logs to determine why the executor fails (and it is a very laborious process to look at all the logs on the cluster nodes to determine the operational failure).  Is there a smart way to learn why an executor fails and to identify all failed executors from an already failed job?

4) Is there some setting I need to configure to get Spark to be resilient against single-node failures?
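
For reference, here is a minimal sketch of the pipeline described above, assuming an existing SparkContext sc (as in spark-shell). The input/output paths, record layout, and projection are hypothetical placeholders, not the actual job; the sketch is only meant to make the stage structure concrete.

    import org.apache.spark.storage.StorageLevel

    // Long-running stage: produce (key, record) tuples and persist them so the
    // later stages reuse the materialized output instead of recomputing it.
    val tuples = sc.textFile("hdfs:///data/input")   // hypothetical path
      .map { line =>
        val fields = line.split('\t')                // hypothetical record layout
        (fields(0), fields)                          // (key, full record)
      }
      .persist(StorageLevel.MEMORY_AND_DISK)

    // First and second stages: materialize via countByKey (tiny shuffle, under 100 KB).
    val countsPerKey = tuples.countByKey()

    // Final stage: save a projection of the tuple data to HDFS (longer, but no shuffle).
    tuples
      .map { case (key, fields) => key + "\t" + fields.last }  // hypothetical projection
      .saveAsTextFile("hdfs:///data/output")                   // hypothetical path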

1 ACCEPTED SOLUTION

Contributor
1) Currently, Spark does not migrate a failed executor to a different node (it may end up on one, but only by chance). There is work in progress to add node blacklisting, but it hasn't been committed yet (https://issues.apache.org/jira/browse/SPARK-8426).

2) Task failure - an exception is encountered while running a task, either thrown by user code or external to the task (e.g. Spark cannot read from HDFS). Job failure - if a particular task fails 4 times, Spark gives up and cancels the whole job. Stage failure (this is the trickiest) - this happens when a task attempts to read *shuffle* data from another node. If it fails to read that shuffle data, it assumes the remote node is dead (the failure may be due to a bad disk, a network error, a bad node, or a node overloaded with other tasks and not responding fast enough). Spark then thinks it needs to regenerate the input data, so it marks the stage as failed and reruns the previous stage that generated that data. If the stage retry fails 4 times, Spark gives up on the assumption that the cluster has an issue.

3) No great answer to this one. The best answer is really just using "yarn logs -applicationId <applicationId>" to get all the logs in one file, so it's a bit easier to search through for errors (rather than having to click through the logs one by one).

4) No, you don't need any setting for that. Spark should be resilient to single node failures. With that said, there could be bugs in this area; if you find that this is not the case, please provide the applicationId and cluster information so that I can collect the logs and pass them on to our Spark team to analyze.
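
To make the thresholds in 2) and the diagnostics question in 3) a bit more concrete, below is a minimal driver-setup sketch (not from the thread; the application name and values are illustrative only). spark.task.maxFailures (default 4) is the per-task retry budget referenced above, spark.yarn.max.executor.failures bounds how many executor losses the YARN application master tolerates before failing the application, and a SparkListener can record every executor removal with its reported reason in the driver log, so failed executors show up in one place. Note that in yarn-cluster mode the YARN-side settings are normally passed with --conf on spark-submit, since values set in code may not reach the application master.

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorRemoved}

    object ResilientDriverSetup {                          // hypothetical driver object
      def main(args: Array[String]): Unit = {
        // Illustrative values only; spark.task.maxFailures defaults to 4
        // (the "fails 4 times" threshold mentioned above).
        val conf = new SparkConf()
          .setAppName("long-running-batch-job")            // hypothetical name
          .set("spark.task.maxFailures", "8")              // larger per-task retry budget
          .set("spark.yarn.max.executor.failures", "20")   // tolerate more executor losses

        val sc = new SparkContext(conf)

        // Log each executor removal (with the reason Spark reports) on the driver,
        // so you don't have to trawl per-node logs after a failure.
        sc.addSparkListener(new SparkListener {
          override def onExecutorRemoved(e: SparkListenerExecutorRemoved): Unit = {
            println("Executor " + e.executorId + " removed at " + e.time + ": " + e.reason)
          }
        })

        // ... job body as sketched above ...

        sc.stop()
      }
    }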

