Created on 07-06-2016 12:08 PM - edited 09-16-2022 03:28 AM
We have Spark jobs that we need in production that fail after running for a sufficiently long time. The probability of failure seems to increase with the number of stages, even when we greatly reduce the scale (say 100-fold). In this particular case the Spark job failed to save the data after a long computation. The same job with the same data on the same machine succeeds when retried, so the cause is likely something other than simply program + data.
We get the following stack trace.
org.apache.spark.SparkException: Job aborted due to stage failure: Task 230 in stage 3.0 failed 4 times, most recent failure: Lost task 230.3 in stage 3.0 (TID 8681, anode.myCluster.myCompany.com): java.io.IOException: Failed to connect to anode.myCluster.myCompany.com/aaa.bbb.ccc.ddd:eeeee
    at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:193)
    at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156)
    at org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:88)
    at org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
    at org.apache.spark.network.shuffle.RetryingBlockFetcher.access$200(RetryingBlockFetcher.java:43)
    at org.apache.spark.network.shuffle.RetryingBlockFetcher$1.run(RetryingBlockFetcher.java:170)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.net.ConnectException: Connection refused: anode.myCluster.myCompany.com/aaa.bbb.ccc.ddd:fffff
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
    at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:224)
    at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:289)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
    ... 1 more
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1294)
[Stuff deleted]
at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1432)
at com.myCompany.myClass(MyCode.scala:120)
Consider what is happening at MyCode.scala:120. The main workflow for the Spark job is the body of the MyCode class's myMethod, which uses the MyCode companion object:
100 def myMethod(sc : SparkContext, curryingParameters : CurryingParameters, filterParameter : Int, numberOfTasksToUse : Int, desiredNumberOfPartitions : Int) : Unit = {
101   val bcCurryingParameters = sc.broadcast(curryingParameters);
102   val lines = sc.textFile(inputPath).repartition(numberOfTasksToUse);
103   val resultsAndMetrics = lines.mapPartitions(MyCode.processLines(bcCurryingParameters.value)).
104     filter(aPair => aPair._2 != MyCode.filteredValue).
105     mapPartitions(MyCode.automatedTypeDependentClosureConstruction(bcCurryingParameters.value)). // this is an extremely expensive mapping-only function, we don't want to reevaluate it
106     persist(StorageLevel.DISK_ONLY); // so we persist the results
107
108   // TODO: This statement was introduced to force materialization of resultsAndMetrics
109   val numResultsAndMetrics = resultsAndMetrics.count;
110
111   logger.error(s"""There were ${numResultsAndMetrics} results and metrics.""");
112
113   val results = resultsAndMetrics.
114     map(resultAndMetric => resultAndMetric._1).
115     map(result => Json.toJson(result)). // Json serialize via the Play framework
116     map(jsValue => Json.stringify(jsValue));
117
118   results.
119     coalesce(desiredNumberOfPartitions, true). // turned on shuffling due to anecdotal recommendations within my team
120     saveAsTextFile(routedTripsPath, classOf[GzipCodec]);
121 }
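The members of the MyCode companion object are not shown here; for readers unfamiliar with the pattern, they are curried functions that take the parameters once and return the closure that mapPartitions applies to each partition's iterator. A minimal, self-contained sketch of the shape (all names and types below are illustrative stand-ins, not our real code):

object MyCodeSketch {
  // Stand-in for the real parameter object that we broadcast.
  case class CurryingParameters(threshold: Int)

  // Sentinel used by the filter step on line 104.
  val filteredValue: Int = -1

  // processLines(params) returns an Iterator => Iterator function, so the
  // parameter object is captured once per task rather than once per record,
  // which is why it pairs naturally with mapPartitions and a broadcast.
  def processLines(params: CurryingParameters): Iterator[String] => Iterator[(String, Int)] =
    lines => lines.map { line =>
      if (line.length > params.threshold) (line, line.length) else (line, filteredValue)
    }
}

It is used as lines.mapPartitions(MyCodeSketch.processLines(someParams)); automatedTypeDependentClosureConstruction follows the same shape, just with a much more expensive body.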
Please note that the output of Line 111 was found in the logs, so the count was run.
2016:06:29:12:35:26.424 [Driver] ERROR com.mycompany.mypackage.MyCode.myMethod:111 - There were xxxxx results and metrics.
From the stack trace and the barrier nature of the count, I strongly suspect that the failure occurs during evaluation of lines 118-120, when the results of roughly 1:45 of computation are saved. But why does that save fail? Interestingly, at 12:12:34 there is a broadcast-cleanup failure, even though the code contains no explicit destroy calls (removed because of previous issues we experienced) and the broadcast data is small (assuredly less than 10 KB, more likely less than 1 KB).
2016:06:29:12:12:34.021 [Spark Context Cleaner] ERROR org.apache.spark.ContextCleaner.logError:96 - Error cleaning broadcast 7
org.apache.spark.rpc.RpcTimeoutException: Timed out. This timeout is controlled by spark.rpc.askTimeout
    at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcEnv.scala:214) ~[main.jar:na]
    at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcEnv.scala:229) ~[main.jar:na]
    at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcEnv.scala:225) ~[main.jar:na]
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:33) ~[main.jar:na]
    at scala.util.Failure$$anonfun$recover$1.apply(Try.scala:185) ~[main.jar:na]
    at scala.util.Try$.apply(Try.scala:161) ~[main.jar:na]
    at scala.util.Failure.recover(Try.scala:185) ~[main.jar:na]
    at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:324) ~[main.jar:na]
    at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:324) ~[main.jar:na]
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32) ~[main.jar:na]
    at org.spark-project.guava.util.concurrent.MoreExecutors$SameThreadExecutorService.execute(MoreExecutors.java:293) ~[main.jar:na]
    at scala.concurrent.impl.ExecutionContextImpl$$anon$1.execute(ExecutionContextImpl.scala:133) ~[main.jar:na]
    at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40) ~[main.jar:na]
    at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248) ~[main.jar:na]
    at scala.concurrent.Promise$class.complete(Promise.scala:55) ~[main.jar:na]
    at scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:153) ~[main.jar:na]
    at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:235) ~[main.jar:na]
    at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:235) ~[main.jar:na]
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32) ~[main.jar:na]
    at scala.concurrent.Future$InternalCallbackExecutor$Batch$$anonfun$run$1.processBatch$1(Future.scala:643) ~[main.jar:na]
    at scala.concurrent.Future$InternalCallbackExecutor$Batch$$anonfun$run$1.apply$mcV$sp(Future.scala:658) ~[main.jar:na]
    at scala.concurrent.Future$InternalCallbackExecutor$Batch$$anonfun$run$1.apply(Future.scala:635) ~[main.jar:na]
    at scala.concurrent.Future$InternalCallbackExecutor$Batch$$anonfun$run$1.apply(Future.scala:635) ~[main.jar:na]
    at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72) ~[main.jar:na]
    at scala.concurrent.Future$InternalCallbackExecutor$Batch.run(Future.scala:634) ~[main.jar:na]
    at scala.concurrent.Future$InternalCallbackExecutor$.scala$concurrent$Future$InternalCallbackExecutor$$unbatchedExecute(Future.scala:694) ~[main.jar:na]
    at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:685) ~[main.jar:na]
    at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40) ~[main.jar:na]
    at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248) ~[main.jar:na]
    at scala.concurrent.Promise$class.complete(Promise.scala:55) ~[main.jar:na]
    at scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:153) ~[main.jar:na]
    at scala.concurrent.Future$$anonfun$flatMap$1.apply(Future.scala:249) ~[main.jar:na]
    at scala.concurrent.Future$$anonfun$flatMap$1.apply(Future.scala:249) ~[main.jar:na]
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32) ~[main.jar:na]
    at org.spark-project.guava.util.concurrent.MoreExecutors$SameThreadExecutorService.execute(MoreExecutors.java:293) ~[main.jar:na]
    at scala.concurrent.impl.ExecutionContextImpl$$anon$1.execute(ExecutionContextImpl.scala:133) ~[main.jar:na]
    at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40) ~[main.jar:na]
    at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248) ~[main.jar:na]
    at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:334) ~[main.jar:na]
    at akka.actor.Scheduler$$anon$11.run(Scheduler.scala:118) ~[main.jar:na]
    at scala.concurrent.Future$InternalCallbackExecutor$.scala$concurrent$Future$InternalCallbackExecutor$$unbatchedExecute(Future.scala:694) ~[main.jar:na]
    at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:691) ~[main.jar:na]
    at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:455) ~[main.jar:na]
    at akka.actor.LightArrayRevolverScheduler$$anon$12.executeBucket$1(Scheduler.scala:407) ~[main.jar:na]
    at akka.actor.LightArrayRevolverScheduler$$anon$12.nextTick(Scheduler.scala:411) ~[main.jar:na]
    at akka.actor.LightArrayRevolverScheduler$$anon$12.run(Scheduler.scala:363) ~[main.jar:na]
    at java.lang.Thread.run(Thread.java:745) ~[na:1.8.0_45]
Caused by: akka.pattern.AskTimeoutException: Timed out
    ... 9 common frames omitted
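For reference, the timeout named in this trace is spark.rpc.askTimeout, which (if I understand the documentation correctly) falls back to spark.network.timeout when not set explicitly. If the context cleaner is simply timing out while the executors are busy with the expensive stage, one experiment would be to raise both when building the context. A minimal sketch, assuming we construct our own SparkConf; the 600s values are placeholders chosen only for illustration, not a tested recommendation:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("MyCode")
  .set("spark.network.timeout", "600s")  // umbrella network timeout
  .set("spark.rpc.askTimeout", "600s")   // the timeout named in the ContextCleaner error above
val sc = new SparkContext(conf)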
Closer to the time of failure, we see an executor on the same node as the driver complaining that a block fetch failed, with a very long stack trace, before the executor receives a SIGTERM.
Container: container_e949_1462478996024_31856_01_000083 on anode.myCluster.myCompany.com_gggg
====================================================================================================
LogType:stderr
Log Upload Time:Wed Jun 29 13:53:45 -0700 2016
LogLength:525
Log Contents:
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/data/3/yarn/nm/usercache/myUserName/filecache/2032/main.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/CDH-5.5.0-1.cdh5.5.0.p1108.867/jars/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [ch.qos.logback.classic.util.ContextSelectorStaticBinder]

LogType:stdout
Log Upload Time:Wed Jun 29 13:53:45 -0700 2016
LogLength:57199
Log Contents:
2016:06:29:12:36:23.843 [Executor task launch worker-3] ERROR o.a.s.n.shuffle.RetryingBlockFetcher.fetchAllOutstanding:142 - Exception while beginning fetch of 1 outstanding blocks
java.io.IOException: Failed to connect to anode.myCluster.myCompany.com/aaa.bbb.ccc.fff:hhhhh
    at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:193) ~[main.jar:na]
    at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156) ~[main.jar:na]
    at org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:88) ~[main.jar:na]
    at org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140) [main.jar:na]
    at org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120) [main.jar:na]
    at org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(NettyBlockTransferService.scala:97) [main.jar:na]
    at org.apache.spark.network.BlockTransferService.fetchBlockSync(BlockTransferService.scala:89) [main.jar:na]
    at org.apache.spark.storage.BlockManager$$anonfun$doGetRemote$2.apply(BlockManager.scala:605) [main.jar:na]
    at org.apache.spark.storage.BlockManager$$anonfun$doGetRemote$2.apply(BlockManager.scala:603) [main.jar:na]
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) [main.jar:na]
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) [main.jar:na]
    at org.apache.spark.storage.BlockManager.doGetRemote(BlockManager.scala:603) [main.jar:na]
[A bunch of lines from this pretty long stack trace deleted]
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354) ~[main.jar:na]
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111) ~[main.jar:na]
    ... 1 common frames omitted
2016:06:29:12:39:34.720 [SIGTERM handler] ERROR o.a.s.e.CoarseGrainedExecutorBackend.handle:57 - RECEIVED SIGNAL 15: SIGTERM
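The failed fetch goes through BlockManager.doGetRemote, i.e. an executor is trying to read a block (an RDD block or a broadcast piece) from a peer that is no longer reachable, which is consistent with the SIGTERM above. If it turns out that the fragile piece is the DISK_ONLY-persisted intermediate RDD, one possible mitigation would be to checkpoint it to HDFS so that losing an executor costs only a re-read rather than losing the sole copy of its blocks. A sketch only, using sc and resultsAndMetrics from the listing above and a hypothetical checkpoint path introduced for illustration:

sc.setCheckpointDir("hdfs:///user/myUserName/checkpoints")  // hypothetical HDFS directory
resultsAndMetrics.checkpoint()      // must be called before the first action on this RDD
val numResultsAndMetrics = resultsAndMetrics.count  // this action also triggers writing the checkpoint

Whether this is relevant obviously depends on what the failed fetch was actually for; it could equally be a broadcast piece.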
Why do we have these errors and how do we fix this problem so that this job runs in a stable and predictable manner?