Support Questions

Find answers, ask questions, and share your expertise
Announcements
Celebrating as our community reaches 100,000 members! Thank you!

S3A staging committer failing in job-commit phase.

avatar
Explorer

I am using S3A committer(Staging directory committer) with my object Store (not AWS) where I am trying to upload a large-sized directory (50TB in space) using on a Hortonworks client(hadoop). It uses Map Reduce to upload the directory to my object store. S3A staging committer initiates various tasks to do MultiPart Upload(MPU) operations to the Object Store and they all are committed during the job_commit phase.


Problem and Question:


1. MapReduce logs as seen on hadoop client

In my case, all the task commit are completed successfully but during S3A committer job_commit phase `it fails` and the error I see is

`INFO mapred.ClientServiceDelegate: Application state is completed. FinalApplicationStatus=FAILED. Redirecting to the job history server

INFO mapreduce.Job: map 0% reduce 100%

INFO mapreduce.Job: Job job_1553199983818_0003 failed with state FAILED due to Job commit from a prior MRAppMaster attempt is potentially in progress. Preventing multiple commit executions` -> `Are these a warning only?` as none of my tasks failed on RM


2. S3A committer error logs during the job_commit phase and it started deleting the files. I couldn't understand why the job_commit failed from below logs

LOGS

```

INFO [main] org.apache.hadoop.mapreduce.v2.app.MRAppMaster: OutputCommitter is org.apache.hadoop.fs.s3a.commit.staging.DirectoryStagingCommitter

INFO [main] org.apache.hadoop.mapreduce.v2.app.MRAppMaster: Attempt num: 2 is last retry: true because a commit was started.

INFO [main] org.apache.hadoop.yarn.event.AsyncDispatcher: Registering class org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType for class org.apache.hadoop.mapreduce.v2.app.MRAppMaster$NoopEventHandler

INFO [main] org.apache.hadoop.yarn.event.AsyncDispatcher: Registering class org.apache.hadoop.mapreduce.jobhistory.EventType for class org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler

INFO [main] org.apache.hadoop.yarn.event.AsyncDispatcher: Registering class org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator$EventType for class org.apache.hadoop.mapreduce.v2.app.MRAppMaster$ContainerAllocatorRouter

[main] org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils: Default file system [hdfs://xxxxxx:8020]

INFO [main] org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils: Default file system [hdfs://xxxxxx:8020]

INFO [main] org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils: Default file system [hdfs://xxxxxx:8020]

INFO [main] org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler: Emitting job history data to the timeline server is not enabled

INFO [main] org.apache.hadoop.mapreduce.v2.app.MRAppMaster: Not attempting to recover. Recovery is not supported by class org.apache.hadoop.fs.s3a.commit.staging.DirectoryStagingCommitter. Use an OutputCommitter that supports recovery.

INFO [main] org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils: Default file system [hdfs://xxxxxx:8020]

INFO [main] org.apache.hadoop.mapreduce.v2.app.MRAppMaster: Previous history file is at hdfs://xxxxxx:8020/user/xxxxxx/.staging/job_1553199983818_0003/job_1553199983818_0003_1.jhist

INFO [main] org.apache.hadoop.mapreduce.v2.app.MRAppMaster: Starting to clean up previous job's temporary files

INFO [main] org.apache.hadoop.fs.s3a.commit.AbstractS3ACommitter: Task committer attempt_1553199983818_0003_m_000000_0: aborting job job_1553199983818_0003 in state FAILED

INFO [main] org.apache.hadoop.fs.s3a.commit.staging.StagingCommitter: Starting: Task committer attempt_1553199983818_0003_m_000000_0: aborting job in state job_1553199983818_0003

INFO [main] org.apache.hadoop.fs.s3a.commit.AbstractS3ACommitter: Task committer attempt_1553199983818_0003_m_000000_0: no pending commits to abort

INFO [main] org.apache.hadoop.fs.s3a.commit.staging.StagingCommitter: Task committer attempt_1553199983818_0003_m_000000_0: aborting job in state job_1553199983818_0003 : duration 0:00.007s

INFO [main] org.apache.hadoop.fs.s3a.commit.AbstractS3ACommitter: Starting: Cleanup job job_1553199983818_0003

INFO [main] org.apache.hadoop.fs.s3a.commit.AbstractS3ACommitter: Starting: Aborting all pending commits under s3a://xxxxxx/user/xxxxxx/30000

```

The Yarn logs don't tell much why the S3 job_committer failed. Any insights what I can look into to figure this.

MRAppMaster code where the logs are coming from

```

try {

0309 String user = UserGroupInformation.getCurrentUser().getShortUserName();

0310 Path stagingDir = MRApps.getStagingAreaDir(conf, user);

0311 FileSystem fs = getFileSystem(conf);

0312

0313 boolean stagingExists = fs.exists(stagingDir);

0314 Path startCommitFile = MRApps.getStartJobCommitFile(conf, user, jobId);

0315 boolean commitStarted = fs.exists(startCommitFile);

0316 Path endCommitSuccessFile = MRApps.getEndJobCommitSuccessFile(conf, user, jobId);

0317 boolean commitSuccess = fs.exists(endCommitSuccessFile);

0318 Path endCommitFailureFile = MRApps.getEndJobCommitFailureFile(conf, user, jobId);

0319 boolean commitFailure = fs.exists(endCommitFailureFile);

0320 if(!stagingExists) {

0321 isLastAMRetry = true;

0322 LOG.info("Attempt num: " + appAttemptID.getAttemptId() +

0323 " is last retry: " + isLastAMRetry +

0324 " because the staging dir doesn't exist.");

0325 errorHappenedShutDown = true;

0326 forcedState = JobStateInternal.ERROR;

0327 shutDownMessage = "Staging dir does not exist " + stagingDir;

0328 LOG.fatal(shutDownMessage);

0329 } else if (commitStarted) {

0330 //A commit was started so this is the last time, we just need to know

0331 // what result we will use to notify, and how we will unregister

0332 errorHappenedShutDown = true;

0333 isLastAMRetry = true;

0334 LOG.info("Attempt num: " + appAttemptID.getAttemptId() +

0335 " is last retry: " + isLastAMRetry +

0336 " because a commit was started.");

0337 copyHistory = true;

0338 if (commitSuccess) {

0339 shutDownMessage =

0340 "Job commit succeeded in a prior MRAppMaster attempt " +

0341 "before it crashed. Recovering.";

0342 forcedState = JobStateInternal.SUCCEEDED;

0343 } else if (commitFailure) {

0344 shutDownMessage =

0345 "Job commit failed in a prior MRAppMaster attempt " +

0346 "before it crashed. Not retrying.";

0347 forcedState = JobStateInternal.FAILED;

0348 } else {

0349 if (isCommitJobRepeatable()) {

0350 // cleanup previous half done commits if committer supports

0351 // repeatable job commit.

0352 errorHappenedShutDown = false;

0353 cleanupInterruptedCommit(conf, fs, startCommitFile);

0354 } else {

0355 //The commit is still pending, commit error

0356 shutDownMessage =

0357 "Job commit from a prior MRAppMaster attempt is " +

0358 "potentially in progress. Preventing multiple commit executions";

0359 forcedState = JobStateInternal.ERROR;

0360 }

0361 }

0362 }

0363 } catch (IOException e) {

0364 throw new YarnRuntimeException("Error while initializing", e);

0365 }

```

1 REPLY 1

avatar
Explorer

Any insights on this?