
CDH 5.5 / Spark 1.5 has sporadic failures, including saveAsTextFile failing in YARN cluster mode

Explorer

We have Spark jobs that we need in production that fail after running for a sufficiently long time. The probability of failure seems to increase with the number of stages, even when we greatly reduce the scale (say 100-fold). In this particular case the Spark job failed to save its data after a long computation. The same job with the same data on the same machine succeeds when retried, so it is likely something other than simply program + data.

 

We get the following stacktrace.

 

org.apache.spark.SparkException: Job aborted due to stage failure: Task 230 in stage 3.0 failed 4 times, most recent failure: Lost task 230.3 in stage 3.0 (TID 8681, anode.myCluster.myCompany.com): java.io.IOException: Failed to connect to anode.myCluster.myCompany.com/aaa.bbb.ccc.ddd:eeeee
         at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:193)
         at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156)
         at org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:88)
         at org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
         at org.apache.spark.network.shuffle.RetryingBlockFetcher.access$200(RetryingBlockFetcher.java:43)
         at org.apache.spark.network.shuffle.RetryingBlockFetcher$1.run(RetryingBlockFetcher.java:170)
         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
         at java.lang.Thread.run(Thread.java:745)
 Caused by: java.net.ConnectException: Connection refused: anode.myCluster.myCompany.com/aaa.bbb.ccc.ddd:fffff
         at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
         at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
         at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:224)
         at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:289)
         at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
         at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
         at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
         at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
         at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
         ... 1 more
  
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1294)
[Stuff deleted]
at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1432)
at com.myCompany.myClass(MyCode.scala:120)

 

Consider what is happening at MyCode.scala:120. The main workflow for the Spark job is the body of myMethod in the MyCode class, and it uses the MyCode companion object:

100 def myMethod(sc : SparkContext, curryingParameters : CurryingParameters, filterParameter : Int, numberOfTasksToUse : Int, desiredNumberOfPartitions : Int) : Unit = {
101     val bcCurryingParameters = sc.broadcast(curryingParameters);
102     val lines = sc.textFile(inputPath).repartition(numberOfTasksToUse);
103     val resultsAndMetrics = lines.mapPartitions(MyCode.processLines(bcCurryingParameters.value)).
104       filter(aPair => aPair._2 != MyCode.filteredValue).
105       mapPartitions( MyCode.automatedTypeDependentClosureConstruction(bcCurryingParameters.value)). // this is an extremely expensive map-only function; we don't want to re-evaluate it,
106       persist(StorageLevel.DISK_ONLY); // so we persist the results
107 
108     // TODO: This statement was introduced to force materialization of resultsAndMetrics
109     val numResultsAndMetrics = resultsAndMetrics.count;
110 
111     logger.error(s"""There were ${numResultsAndMetrics} results and metrics.""");
112 
113     val results = resultsAndMetrics.
114       map(resultAndMetric => resultAndMetric._1).
115       map(result => Json.toJson(result)). // Json serialize via the Play framework
116       map(jsValue => Json.stringify(jsValue));
117 
118     results.
119       coalesce(desiredNumberOfPartitions, true). // turned on shuffling due to anecdotal recommendations within my team (see the note after this listing)
120       saveAsTextFile(routedTripsPath, classOf[GzipCodec]);
121 }
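As an aside on the comment at line 119: coalesce(desiredNumberOfPartitions, true) performs a full shuffle and is equivalent to repartition(desiredNumberOfPartitions), so the anecdotal recommendation amounts to evenly redistributing the output across the requested partitions before it is written. A minimal illustration (the RDD, local master, and partition counts are made up for the example, not taken from our job):

import org.apache.spark.{SparkConf, SparkContext}

object CoalesceSketch {
  def main(args: Array[String]): Unit = {
    // Local master only for this illustration; our real job runs in yarn-cluster mode.
    val sc = new SparkContext(new SparkConf().setAppName("coalesce-sketch").setMaster("local[2]"))

    // A made-up RDD with many small partitions, standing in for `results`.
    val rdd = sc.parallelize(1 to 1000, numSlices = 100)

    // coalesce with shuffle = true does a full shuffle, exactly like repartition.
    val viaCoalesce    = rdd.coalesce(10, shuffle = true)
    val viaRepartition = rdd.repartition(10)

    println(viaCoalesce.partitions.length)    // 10
    println(viaRepartition.partitions.length) // 10

    sc.stop()
  }
}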


Please note that the output of line 111 was found in the logs, so the count did run:

2016:06:29:12:35:26.424 [Driver] ERROR com.mycompany.mypackage.MyCode.myMethod:111 - There were xxxxx results and metrics. 

From the stack trace and the barrier nature of the count, I strongly suspect that the death occurs during evaluation of lines 118-120, when the results of this 1:45 of computation are saved. But why did it fail? Interestingly, at 12:12:34 there is a broadcast-cleaning failure, even though the code has no explicit destroys (we removed them due to previous issues we experienced) and the broadcast data is small (assuredly less than 10 KB, more likely less than 1 KB).

2016:06:29:12:12:34.021 [Spark Context Cleaner] ERROR org.apache.spark.ContextCleaner.logError:96 - Error cleaning broadcast 7 
org.apache.spark.rpc.RpcTimeoutException: Timed out. This timeout is controlled by spark.rpc.askTimeout
        at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcEnv.scala:214) ~[main.jar:na]
        at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcEnv.scala:229) ~[main.jar:na]
        at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcEnv.scala:225) ~[main.jar:na]
        at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:33) ~[main.jar:na]
        at scala.util.Failure$$anonfun$recover$1.apply(Try.scala:185) ~[main.jar:na]
        at scala.util.Try$.apply(Try.scala:161) ~[main.jar:na]
        at scala.util.Failure.recover(Try.scala:185) ~[main.jar:na]
        at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:324) ~[main.jar:na]
        at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:324) ~[main.jar:na]
        at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32) ~[main.jar:na]
        at org.spark-project.guava.util.concurrent.MoreExecutors$SameThreadExecutorService.execute(MoreExecutors.java:293) ~[main.jar:na]
        at scala.concurrent.impl.ExecutionContextImpl$$anon$1.execute(ExecutionContextImpl.scala:133) ~[main.jar:na]
        at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40) ~[main.jar:na]
        at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248) ~[main.jar:na]
        at scala.concurrent.Promise$class.complete(Promise.scala:55) ~[main.jar:na]
        at scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:153) ~[main.jar:na]
        at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:235) ~[main.jar:na]
        at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:235) ~[main.jar:na]
        at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32) ~[main.jar:na]
        at scala.concurrent.Future$InternalCallbackExecutor$Batch$$anonfun$run$1.processBatch$1(Future.scala:643) ~[main.jar:na]
        at scala.concurrent.Future$InternalCallbackExecutor$Batch$$anonfun$run$1.apply$mcV$sp(Future.scala:658) ~[main.jar:na]
        at scala.concurrent.Future$InternalCallbackExecutor$Batch$$anonfun$run$1.apply(Future.scala:635) ~[main.jar:na]
        at scala.concurrent.Future$InternalCallbackExecutor$Batch$$anonfun$run$1.apply(Future.scala:635) ~[main.jar:na]
        at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72) ~[main.jar:na]
        at scala.concurrent.Future$InternalCallbackExecutor$Batch.run(Future.scala:634) ~[main.jar:na]
        at scala.concurrent.Future$InternalCallbackExecutor$.scala$concurrent$Future$InternalCallbackExecutor$$unbatchedExecute(Future.scala:694) ~[main.jar:na]
        at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:685) ~[main.jar:na]
        at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40) ~[main.jar:na]
        at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248) ~[main.jar:na]
        at scala.concurrent.Promise$class.complete(Promise.scala:55) ~[main.jar:na]
        at scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:153) ~[main.jar:na]
        at scala.concurrent.Future$$anonfun$flatMap$1.apply(Future.scala:249) ~[main.jar:na]
        at scala.concurrent.Future$$anonfun$flatMap$1.apply(Future.scala:249) ~[main.jar:na]
        at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32) ~[main.jar:na]
        at org.spark-project.guava.util.concurrent.MoreExecutors$SameThreadExecutorService.execute(MoreExecutors.java:293) ~[main.jar:na]
        at scala.concurrent.impl.ExecutionContextImpl$$anon$1.execute(ExecutionContextImpl.scala:133) ~[main.jar:na]
        at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40) ~[main.jar:na]
        at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248) ~[main.jar:na]
        at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:334) ~[main.jar:na]
        at akka.actor.Scheduler$$anon$11.run(Scheduler.scala:118) ~[main.jar:na]
        at scala.concurrent.Future$InternalCallbackExecutor$.scala$concurrent$Future$InternalCallbackExecutor$$unbatchedExecute(Future.scala:694) ~[main.jar:na]
        at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:691) ~[main.jar:na]
        at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:455) ~[main.jar:na]
        at akka.actor.LightArrayRevolverScheduler$$anon$12.executeBucket$1(Scheduler.scala:407) ~[main.jar:na]
        at akka.actor.LightArrayRevolverScheduler$$anon$12.nextTick(Scheduler.scala:411) ~[main.jar:na]
        at akka.actor.LightArrayRevolverScheduler$$anon$12.run(Scheduler.scala:363) ~[main.jar:na]
        at java.lang.Thread.run(Thread.java:745) ~[na:1.8.0_45]
Caused by: akka.pattern.AskTimeoutException: Timed out
        ... 9 common frames omitted
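For reference, the trace above says this timeout is controlled by spark.rpc.askTimeout, and our code only ever broadcasts; it never unpersists or destroys anything itself, so the cleaning at 12:12:34 is the ContextCleaner acting on its own. Below is a minimal sketch of both knobs; the raised timeout value is an assumption we have not verified as a fix, and the local master and dummy data are only for illustration:

import org.apache.spark.{SparkConf, SparkContext}

object BroadcastCleanupSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("broadcast-cleanup-sketch")
      .setMaster("local[2]") // local master just for this illustration
      .set("spark.rpc.askTimeout", "600s") // the timeout named in the ContextCleaner error above; 600s is a guess
    val sc = new SparkContext(conf)

    // Stand-in for our small (< 10 KB) currying parameters.
    val bcParams = sc.broadcast(Map("filterParameter" -> 1))

    val kept = sc.parallelize(1 to 100).filter(_ > bcParams.value("filterParameter")).count()
    println(s"kept = $kept")

    // What explicit cleanup would look like; our real job does NOT do this.
    bcParams.unpersist(blocking = false) // drop executor-side copies, keep the driver copy
    // bcParams.destroy()                // or remove the broadcast everywhere (irreversible)

    sc.stop()
  }
}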

Closer to the time of death we see an executor on the same node as the driver complaining about a failure, with a very long stack trace.

 Container: container_e949_1462478996024_31856_01_000083 on anode.myCluster.myCompany.com_gggg
 ====================================================================================================
 LogType:stderr
 Log Upload Time:Wed Jun 29 13:53:45 -0700 2016
 LogLength:525
 Log Contents:
 SLF4J: Class path contains multiple SLF4J bindings.
 SLF4J: Found binding in [jar:file:/data/3/yarn/nm/usercache/myUserName/filecache/2032/main.jar!/org/slf4j/impl/StaticLoggerBinder.class]
 SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/CDH-5.5.0-1.cdh5.5.0.p1108.867/jars/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
 SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
 SLF4J: Actual binding is of type [ch.qos.logback.classic.util.ContextSelectorStaticBinder]
 
 LogType:stdout
 Log Upload Time:Wed Jun 29 13:53:45 -0700 2016
 LogLength:57199
 Log Contents:
 2016:06:29:12:36:23.843 [Executor task launch worker-3] ERROR o.a.s.n.shuffle.RetryingBlockFetcher.fetchAllOutstanding:142 - Exception while beginning fetch of 1 outstanding blocks
 java.io.IOException: Failed to connect to anode.myCluster.myCompany.com/aaa.bbb.ccc.fff:hhhhh
         at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:193) ~[main.jar:na]
         at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156) ~[main.jar:na]
         at org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:88) ~[main.jar:na]
         at org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140) [main.jar:na]
         at org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120) [main.jar:na]
         at org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(NettyBlockTransferService.scala:97) [main.jar:na]
         at org.apache.spark.network.BlockTransferService.fetchBlockSync(BlockTransferService.scala:89) [main.jar:na]
         at org.apache.spark.storage.BlockManager$$anonfun$doGetRemote$2.apply(BlockManager.scala:605) [main.jar:na]
         at org.apache.spark.storage.BlockManager$$anonfun$doGetRemote$2.apply(BlockManager.scala:603) [main.jar:na]
         at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) [main.jar:na]
         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) [main.jar:na]
         at org.apache.spark.storage.BlockManager.doGetRemote(BlockManager.scala:603) [main.jar:na]
[A bunch of lines from this pretty long stack trace deleted]
         at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354) ~[main.jar:na]
         at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111) ~[main.jar:na]
         ... 1 common frames omitted
 2016:06:29:12:39:34.720 [SIGTERM handler] ERROR o.a.s.e.CoarseGrainedExecutorBackend.handle:57 - RECEIVED SIGNAL 15: SIGTERM


Why do we have these errors and how do we fix this problem so that this job runs in a stable and predictable manner?

1 ACCEPTED SOLUTION

Contributor
1) Increase the executor and executor memory to 5 GB and 3 GB respectively to fix the OutOfMemory issue.
2) Change two properties so that the retry will not be on the same node:
   a. spark.scheduler.executorTaskBlacklistTime=3600000
   b. spark.task.maxFailures=10
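A minimal sketch of applying these settings when the SparkContext is built; mapping the 5 GB / 3 GB onto spark.executor.memory and the YARN executor memory overhead is an assumption, so adjust the property names and values to your cluster:

import org.apache.spark.{SparkConf, SparkContext}

object TunedJobLauncher {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("my-job")
      // Memory bump; treating the second number as YARN memory overhead is an assumption.
      .set("spark.executor.memory", "5g")
      .set("spark.yarn.executor.memoryOverhead", "3072") // in MB
      // Keep a failed task's retries away from the executor that just failed it for an hour,
      // and allow more attempts before the whole stage is aborted.
      .set("spark.scheduler.executorTaskBlacklistTime", "3600000") // in ms
      .set("spark.task.maxFailures", "10")

    val sc = new SparkContext(conf)
    // ... run the job, e.g. myMethod(sc, ...), here ...
    sc.stop()
  }
}

The same properties can also be passed on the command line via spark-submit --conf key=value instead of being hard-coded.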
