Created on 10-28-2015 12:33 PM - edited 09-16-2022 02:46 AM
I have a job where the executors crash at the end of ingestion and distinct of a modestly sized data set, with what seems to be a generous resource allocation, due to a timeout on the MapOutputTracker in spite of tuning efforts. What causes this and how can I fix it?
I have a reasonably sized data set (1.4 TB on a cluster of roughly 250 nodes) which is ingested and then deduplicated.
The code that fails looks something like:
val desiredNumberOfPartitions = 2000
val cookedMyData = rawMyData.map(line => new MyData(line))
val uniqueGroups = cookedMyData
  .map(md => ((md.field1, md.field2), (md.field3, md.field4)))
  .distinct()
  .repartition(desiredNumberOfPartitions)
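For reference, RDD.distinct also accepts a target partition count directly, so one variant I could try (sketched below against the same rawMyData and MyData as above) folds the repartition into the distinct itself; I am not sure whether that changes the failure mode.

// Sketch only (same rawMyData and MyData as above): distinct(numPartitions)
// sets the partition count of its shuffle output directly, so the separate
// repartition() shuffle is not needed.
val desiredNumberOfPartitions = 2000
val uniqueGroups = rawMyData
  .map(line => new MyData(line))
  .map(md => ((md.field1, md.field2), (md.field3, md.field4)))
  .distinct(desiredNumberOfPartitions)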
The job runs (as seen in the Spark UI) with a driver with 40 GB of memory and 1000 executors, each with 1 processor core and 4 GB of memory (i.e., pretty well resourced), until it hits the distinct. At that point it hangs on a straggler (the very last executor/task pair) and then the job dies in a fiery cataclysm of doom (see the stack trace at the end of this post for details).
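For completeness, here is a sketch of that allocation expressed as configuration keys; the values just restate the numbers above, and in practice driver memory has to be set before the driver JVM starts (e.g. at submit time), so this is illustrative only.

// Sketch only: the allocation above expressed as configuration keys
// (equivalent to the --driver-memory/--num-executors/--executor-cores/--executor-memory
// spark-submit flags; driver memory normally must be set before the driver JVM starts).
sparkConf.set("spark.driver.memory", "40g")
sparkConf.set("spark.executor.instances", "1000")
sparkConf.set("spark.executor.cores", "1")
sparkConf.set("spark.executor.memory", "4g")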
I have tried some tuning and set the following SparkConf and Akka parameters:
// Here begin my custom settings
sparkConf.set("spark.akka.frameSize", "1000")              // Frame size in MB; workers should be able to send bigger messages, based on some recommended numbers
sparkConf.set("spark.akka.threads", "16")                  // Try quadrupling the default number of threads for improved response time
sparkConf.set("spark.akka.askTimeout", "120")              // Increase from the default 30 seconds in case it is too short
sparkConf.set("spark.kryoserializer.buffer.max.mb", "256") // In case it helps
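For context, a minimal sketch of the SparkConf/SparkContext setup those calls sit in; the application name is a placeholder, and the Kryo buffer setting above only takes effect if Kryo is actually enabled as the serializer.

// Sketch of the surrounding setup these calls assume; the app name is hypothetical,
// and spark.kryoserializer.buffer.max.mb only matters once Kryo is the active serializer.
import org.apache.spark.{SparkConf, SparkContext}

val sparkConf = new SparkConf()
  .setAppName("MyIngestJob") // hypothetical name
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// ... the spark.akka.* and Kryo buffer settings above are applied here ...
val sc = new SparkContext(sparkConf)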
The logs for the executors look like:
Container: container_e36_1445047866026_2759_01_000100 on someNode.myCluster.myCompany.com_8041
==================================================================================================
LogType:stderr
Log Upload Time:Tue Oct 27 17:42:12 -0700 2015
LogLength:519
Log Contents:
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/data/10/yarn/nm/usercache/william_maniatty/filecache/83/main.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/CDH-5.4.4-1.cdh5.4.4.p0.4/jars/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [ch.qos.logback.classic.util.ContextSelectorStaticBinder]

LogType:stdout
Log Upload Time:Tue Oct 27 17:42:12 -0700 2015
LogLength:8012
Log Contents:
17:16:35,434 |-INFO in ch.qos.logback.classic.LoggerContext[default] - Could NOT find resource [logback.groovy]
17:16:35,434 |-INFO in ch.qos.logback.classic.LoggerContext[default] - Could NOT find resource [logback-test.xml]
17:16:35,435 |-INFO in ch.qos.logback.classic.LoggerContext[default] - Found resource [logback.xml] at [jar:file:/data/10/yarn/nm/usercache/william_maniatty/filecache/83/main.jar!/logback.xml]
17:16:35,448 |-INFO in ch.qos.logback.core.joran.spi.ConfigurationWatchList@5606c0b - URL [jar:file:/data/10/yarn/nm/usercache/william_maniatty/filecache/83/main.jar!/logback.xml] is not of type file
17:16:35,602 |-INFO in ch.qos.logback.core.joran.action.AppenderAction - About to instantiate appender of type [ch.qos.logback.core.ConsoleAppender]
17:16:35,605 |-INFO in ch.qos.logback.core.joran.action.AppenderAction - Naming appender as [STDOUT]
17:16:35,675 |-INFO in ch.qos.logback.classic.joran.action.LoggerAction - Setting level of logger [com.myCompany.myTeam] to DEBUG
17:16:35,675 |-INFO in ch.qos.logback.classic.joran.action.RootLoggerAction - Setting level of ROOT logger to ERROR
17:16:35,675 |-INFO in ch.qos.logback.core.joran.action.AppenderRefAction - Attaching appender named [STDOUT] to Logger[ROOT]
17:16:35,676 |-INFO in ch.qos.logback.classic.joran.action.ConfigurationAction - End of configuration.
17:16:35,678 |-INFO in ch.qos.logback.classic.joran.JoranConfigurator@6646153 - Registering current configuration as safe fallback point
2015:10:27:17:41:12.763 [Executor task launch worker-1] ERROR o.a.spark.MapOutputTrackerWorker.logError:96 - Error communicating with MapOutputTracker
org.apache.spark.SparkException: Error sending message [message = GetMapOutputStatuses(3)]
    at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:209) ~[main.jar:na]
    at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:113) [main.jar:na]
    at org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:164) [main.jar:na]
    at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.fetch(BlockStoreShuffleFetcher.scala:42) [main.jar:na]
    at org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:40) [main.jar:na]
    at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92) [main.jar:na]
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) [main.jar:na]
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) [main.jar:na]
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) [main.jar:na]
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) [main.jar:na]
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) [main.jar:na]
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) [main.jar:na]
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) [main.jar:na]
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) [main.jar:na]
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68) [main.jar:na]
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) [main.jar:na]
    at org.apache.spark.scheduler.Task.run(Task.scala:64) [main.jar:na]
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) [main.jar:na]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_45]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_45]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_45]
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [120 seconds]
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) ~[main.jar:na]
    at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) ~[main.jar:na]
    at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107) ~[main.jar:na]
    at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53) ~[main.jar:na]
    at scala.concurrent.Await$.result(package.scala:107) ~[main.jar:na]
    at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:195) ~[main.jar:na]
    ... 20 common frames omitted
2015:10:27:17:41:12.768 [Executor task launch worker-1] ERROR org.apache.spark.executor.Executor.logError:96 - Exception in task 162.0 in stage 4.0 (TID 29281)
org.apache.spark.SparkException: Error communicating with MapOutputTracker
    at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:117) ~[main.jar:na]
    at org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:164) ~[main.jar:na]
    at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.fetch(BlockStoreShuffleFetcher.scala:42) ~[main.jar:na]
    at org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:40) ~[main.jar:na]
    at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92) ~[main.jar:na]
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) ~[main.jar:na]
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) ~[main.jar:na]
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) ~[main.jar:na]
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) ~[main.jar:na]
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) ~[main.jar:na]
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) ~[main.jar:na]
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) ~[main.jar:na]
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) ~[main.jar:na]
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68) ~[main.jar:na]
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) ~[main.jar:na]
    at org.apache.spark.scheduler.Task.run(Task.scala:64) ~[main.jar:na]
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) ~[main.jar:na]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_45]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_45]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_45]
Caused by: org.apache.spark.SparkException: Error sending message [message = GetMapOutputStatuses(3)]
    at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:209) ~[main.jar:na]
    at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:113) ~[main.jar:na]
    ... 19 common frames omitted
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [120 seconds]
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) ~[main.jar:na]
    at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) ~[main.jar:na]
    at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107) ~[main.jar:na]
    at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53) ~[main.jar:na]
    at scala.concurrent.Await$.result(package.scala:107) ~[main.jar:na]
    at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:195) ~[main.jar:na]
    ... 20 common frames omitted
2015:10:27:17:42:05.646 [sparkExecutor-akka.actor.default-dispatcher-3] ERROR akka.remote.EndpointWriter.apply$mcV$sp:65 - AssociationError [akka.tcp://sparkExecutor@someNode.myCluster.myCompany.com:58865] -> [akka.tcp://sparkDriver@anotherNode.myCluster.myCompany.com:46814]: Error [Shut down address: akka.tcp://sparkDriver@anotherNode.myCluster.myCompany.com:46814] [
akka.remote.ShutDownAssociation: Shut down address: akka.tcp://sparkDriver@anotherNode.myCluster.myCompany.com:46814
Caused by: akka.remote.transport.Transport$InvalidAssociationException: The remote system terminated the association because it is shutting down.
]
2015:10:27:17:42:05.693 [sparkExecutor-akka.actor.default-dispatcher-5] ERROR o.a.s.e.CoarseGrainedExecutorBackend.logError:75 - Driver Disassociated [akka.tcp://sparkExecutor@someNode.myCluster.myCompany.com:58865] -> [akka.tcp://sparkDriver@anotherNode.myCluster.myCompany.com:46814] disassociated! Shutting down.