Created on 07-06-2016 12:08 PM - edited 09-16-2022 03:28 AM
We have Spark jobs that we need in production that fail after running for a sufficiently long time. The probability of failure seems to increase with the number of stages, even when we greatly reduce the scale (say, 100-fold). In this particular case the Spark job failed while saving the data after a long computation. The same job with the same data on the same machine succeeds when retried, so the cause is likely something other than simply program + data.
We get the following stack trace:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 230 in stage 3.0 failed 4 times, most recent failure: Lost task 230.3 in stage 3.0 (TID 8681, anode.myCluster.myCompany.com): java.io.IOException: Failed to connect to anode.myCluster.myCompany.com/aaa.bbb.ccc.ddd:eeeee
at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:193)
at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156)
at org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:88)
at org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
at org.apache.spark.network.shuffle.RetryingBlockFetcher.access$200(RetryingBlockFetcher.java:43)
at org.apache.spark.network.shuffle.RetryingBlockFetcher$1.run(RetryingBlockFetcher.java:170)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.net.ConnectException: Connection refused: anode.myCluster.myCompany.com/aaa.bbb.ccc.ddd:fffff
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:224)
at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:289)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
... 1 more
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1294)
[Stuff deleted]
at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1432)
at com.myCompany.myClass(MyCode.scala:120)
Consider what is happening at MyCode.scala:120. The main workflow for the Spark job is the body of the MyCode class's myMethod, which uses the MyCode companion object:
100 def myMethod(sc : SparkContext, curryingParameters : CurryingParameters, filterParameter : Int, numberOfTasksToUse : Int, desiredNumberOfPartitions : Int) : Unit = {
101   val bcCurryingParameters = sc.broadcast(curryingParameters);
102   val lines = sc.textFile(inputPath).repartition(numberOfTasksToUse);
103   val resultsAndMetrics = lines.mapPartitions(MyCode.processLines(bcCurryingParameters.value)).
104     filter(aPair => aPair._2 != MyCode.filteredValue).
105     mapPartitions(MyCode.automatedTypeDependentClosureConstruction(bcCurryingParameters.value)). // this is an extremely expensive map-only function; we don't want to reevaluate it,
106     persist(StorageLevel.DISK_ONLY); // so we persist the results
107
108   // TODO: This statement was introduced to force materialization of resultsAndMetrics
109   val numResultsAndMetrics = resultsAndMetrics.count;
110
111   logger.error(s"""There were ${numResultsAndMetrics} results and metrics.""");
112
113   val results = resultsAndMetrics.
114     map(resultAndMetric => resultAndMetric._1).
115     map(result => Json.toJson(result)).   // JSON-serialize via the Play framework
116     map(jsValue => Json.stringify(jsValue));
117
118   results.
119     coalesce(desiredNumberOfPartitions, true). // shuffling turned on due to anecdotal recommendations within my team
120     saveAsTextFile(routedTripsPath, classOf[GzipCodec]);
121 }
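As an aside on line 106: persist(StorageLevel.DISK_ONLY) keeps a single copy of each persisted partition, so if the executor holding a partition is lost, downstream stages must fetch it from a node that is no longer answering. A variant we have not actually tried (purely a sketch; only the storage level changes) would keep a second replica of each partition at the cost of double the disk footprint:
import org.apache.spark.storage.StorageLevel
// Sketch only: same pipeline as lines 103-106, but with a replicated
// storage level so a single lost executor does not make the only copy
// of a persisted partition unreachable during the later save.
val resultsAndMetrics = lines.
  mapPartitions(MyCode.processLines(bcCurryingParameters.value)).
  filter(aPair => aPair._2 != MyCode.filteredValue).
  mapPartitions(MyCode.automatedTypeDependentClosureConstruction(bcCurryingParameters.value)).
  persist(StorageLevel.DISK_ONLY_2)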
Please note that the output of Line 111 was found in the logs, so the count was run.
2016:06:29:12:35:26.424 [Driver] ERROR com.mycompany.mypackage.MyCode.myMethod:111 - There were xxxxx results and metrics.
From the stack trace and the barrier nature of the count, I strongly suspect that the failure occurs during the evaluation of lines 118-120, when the results of this 1:45 of computation are saved. But why did it fail? Interestingly, at 12:12:34 there is a broadcast-cleaning failure, even though the code has no explicit destroys (due to previous issues we experienced) and the broadcast data is small (assuredly less than 10 KB, more likely less than 1 KB).
2016:06:29:12:12:34.021 [Spark Context Cleaner] ERROR org.apache.spark.ContextCleaner.logError:96 - Error cleaning broadcast 7
org.apache.spark.rpc.RpcTimeoutException: Timed out. This timeout is controlled by spark.rpc.askTimeout
at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcEnv.scala:214) ~[main.jar:na]
at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcEnv.scala:229) ~[main.jar:na]
at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcEnv.scala:225) ~[main.jar:na]
at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:33) ~[main.jar:na]
at scala.util.Failure$$anonfun$recover$1.apply(Try.scala:185) ~[main.jar:na]
at scala.util.Try$.apply(Try.scala:161) ~[main.jar:na]
at scala.util.Failure.recover(Try.scala:185) ~[main.jar:na]
at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:324) ~[main.jar:na]
at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:324) ~[main.jar:na]
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32) ~[main.jar:na]
at org.spark-project.guava.util.concurrent.MoreExecutors$SameThreadExecutorService.execute(MoreExecutors.java:293) ~[main.jar:na]
at scala.concurrent.impl.ExecutionContextImpl$$anon$1.execute(ExecutionContextImpl.scala:133) ~[main.jar:na]
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40) ~[main.jar:na]
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248) ~[main.jar:na]
at scala.concurrent.Promise$class.complete(Promise.scala:55) ~[main.jar:na]
at scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:153) ~[main.jar:na]
at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:235) ~[main.jar:na]
at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:235) ~[main.jar:na]
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32) ~[main.jar:na]
at scala.concurrent.Future$InternalCallbackExecutor$Batch$$anonfun$run$1.processBatch$1(Future.scala:643) ~[main.jar:na]
at scala.concurrent.Future$InternalCallbackExecutor$Batch$$anonfun$run$1.apply$mcV$sp(Future.scala:658) ~[main.jar:na]
at scala.concurrent.Future$InternalCallbackExecutor$Batch$$anonfun$run$1.apply(Future.scala:635) ~[main.jar:na]
at scala.concurrent.Future$InternalCallbackExecutor$Batch$$anonfun$run$1.apply(Future.scala:635) ~[main.jar:na]
at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72) ~[main.jar:na]
at scala.concurrent.Future$InternalCallbackExecutor$Batch.run(Future.scala:634) ~[main.jar:na]
at scala.concurrent.Future$InternalCallbackExecutor$.scala$concurrent$Future$InternalCallbackExecutor$$unbatchedExecute(Future.scala:694) ~[main.jar:na]
at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:685) ~[main.jar:na]
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40) ~[main.jar:na]
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248) ~[main.jar:na]
at scala.concurrent.Promise$class.complete(Promise.scala:55) ~[main.jar:na]
at scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:153) ~[main.jar:na]
at scala.concurrent.Future$$anonfun$flatMap$1.apply(Future.scala:249) ~[main.jar:na]
at scala.concurrent.Future$$anonfun$flatMap$1.apply(Future.scala:249) ~[main.jar:na]
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32) ~[main.jar:na]
at org.spark-project.guava.util.concurrent.MoreExecutors$SameThreadExecutorService.execute(MoreExecutors.java:293) ~[main.jar:na]
at scala.concurrent.impl.ExecutionContextImpl$$anon$1.execute(ExecutionContextImpl.scala:133) ~[main.jar:na]
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40) ~[main.jar:na]
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248) ~[main.jar:na]
at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:334) ~[main.jar:na]
at akka.actor.Scheduler$$anon$11.run(Scheduler.scala:118) ~[main.jar:na]
at scala.concurrent.Future$InternalCallbackExecutor$.scala$concurrent$Future$InternalCallbackExecutor$$unbatchedExecute(Future.scala:694) ~[main.jar:na]
at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:691) ~[main.jar:na]
at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:455) ~[main.jar:na]
at akka.actor.LightArrayRevolverScheduler$$anon$12.executeBucket$1(Scheduler.scala:407) ~[main.jar:na]
at akka.actor.LightArrayRevolverScheduler$$anon$12.nextTick(Scheduler.scala:411) ~[main.jar:na]
at akka.actor.LightArrayRevolverScheduler$$anon$12.run(Scheduler.scala:363) ~[main.jar:na]
at java.lang.Thread.run(Thread.java:745) ~[na:1.8.0_45]
Caused by: akka.pattern.AskTimeoutException: Timed out
... 9 common frames omitted
Closer to the time of death, we see an executor on the same node as the driver complaining about a failure, with a very long stack trace.
Container: container_e949_1462478996024_31856_01_000083 on anode.myCluster.myCompany.com_gggg
====================================================================================================
LogType:stderr
Log Upload Time:Wed Jun 29 13:53:45 -0700 2016
LogLength:525
Log Contents:
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/data/3/yarn/nm/usercache/myUserName/filecache/2032/main.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/CDH-5.5.0-1.cdh5.5.0.p1108.867/jars/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [ch.qos.logback.classic.util.ContextSelectorStaticBinder]
LogType:stdout
Log Upload Time:Wed Jun 29 13:53:45 -0700 2016
LogLength:57199
Log Contents:
2016:06:29:12:36:23.843 [Executor task launch worker-3] ERROR o.a.s.n.shuffle.RetryingBlockFetcher.fetchAllOutstanding:142 - Exception while beginning fetch of 1 outstanding blocks
java.io.IOException: Failed to connect to anode.myCluster.myCompany.com/aaa.bbb.ccc.fff:hhhhh
at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:193) ~[main.jar:na]
at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156) ~[main.jar:na]
at org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:88) ~[main.jar:na]
at org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140) [main.jar:na]
at org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120) [main.jar:na]
at org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(NettyBlockTransferService.scala:97) [main.jar:na]
at org.apache.spark.network.BlockTransferService.fetchBlockSync(BlockTransferService.scala:89) [main.jar:na]
at org.apache.spark.storage.BlockManager$$anonfun$doGetRemote$2.apply(BlockManager.scala:605) [main.jar:na]
at org.apache.spark.storage.BlockManager$$anonfun$doGetRemote$2.apply(BlockManager.scala:603) [main.jar:na]
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) [main.jar:na]
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) [main.jar:na]
at org.apache.spark.storage.BlockManager.doGetRemote(BlockManager.scala:603) [main.jar:na]
[A bunch of lines from this pretty long stack trace deleted]
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354) ~[main.jar:na]
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111) ~[main.jar:na]
... 1 common frames omitted
2016:06:29:12:39:34.720 [SIGTERM handler] ERROR o.a.s.e.CoarseGrainedExecutorBackend.handle:57 - RECEIVED SIGNAL 15: SIGTERM
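For completeness, this is the kind of explicit, blocking cleanup we could add after the save, instead of relying on the asynchronous ContextCleaner RPC that timed out at 12:12:34 (a sketch only; we have not verified that it changes the behavior):
// Hypothetical addition after line 120: release the persisted RDD and the
// broadcast ourselves, blocking until executors confirm removal, rather
// than leaving both to the ContextCleaner's asynchronous ask.
results.
  coalesce(desiredNumberOfPartitions, true).
  saveAsTextFile(routedTripsPath, classOf[GzipCodec]);
resultsAndMetrics.unpersist(blocking = true);
bcCurryingParameters.destroy();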
Why do we have these errors and how do we fix this problem so that this job runs in a stable and predictable manner?
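For reference, these are the settings we understand to govern the timeouts and retries that appear in the traces above (a sketch with purely illustrative values, set on the SparkConf before the context is created; we have not validated them):
import org.apache.spark.SparkConf
// Illustrative values only. spark.rpc.askTimeout is the timeout named in the
// ContextCleaner error; spark.network.timeout is the general fallback for
// network interactions; the shuffle.io settings control how long a block
// fetch keeps retrying before a task such as 230.3 is declared lost.
val conf = new SparkConf().
  set("spark.network.timeout", "600s").
  set("spark.rpc.askTimeout", "600s").
  set("spark.shuffle.io.maxRetries", "10").
  set("spark.shuffle.io.retryWait", "30s")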