Support Questions

Find answers, ask questions, and share your expertise

Spark 1.3.0 on Cloudera 5.4 fails on loading data and distinct.

avatar
Explorer

I have a job where the executors crash at the end of ingestion and distnct of a modest sized data set, with what seems to be a generous resource allocation, due to a timeout on the mapTracker in spite of tuning efforts.  What causes this and how can I fix it?

 

I have a reasonable size data set (1.4 TB on a cluster with about 250 Nodes or so) which is

  1. loaded from HDFS,
  2. parsed to scala objects
  3. map to project a few fields/columns/data members, to some nested tuples field1 is a modest sized String (less than say 15 characters) the rest, field2, field3, and field4,  are Int,
  4. distinct the columns (this is small, say 2 MB or so and is confirmed by the shuffle output memory in the spark UI.

The code that fails looks something like:

    val desiredNumberOfPartitions=2000;
    val cookedMyData = rawMyData.map(line => new MyData(line));
    val uniqueGroups = cookedMyData.map(md => ((md.field1, md.field2), (md.field3, md.field4))).distinct().repartition(desiredNumberOfPartitions);
  

 

The Job runs (as seen in the spark ui) with a driver wotih 40 GB and processor cores and 1000 executors with 1 processor core and 4 GB each (i.e. pretty well resourced) until it hits the distinct and then it hangs on a straggler (the very last executor, pair) and then the job

dies in a fiery cataclysm of doom (see the stack trace at the end of this post for details).

 

I have tried some tuning and set the following sparkConf and akka parameters:

 

    // Here begins my custom settings
    sparkConf.set("spark.akka.frameSize", "1000"); // Frame size in MB, workers should be able to send bigger messages, based on some recommended numbers
    sparkConf.set("spark.akka.threads", "16"); // Try quadrupling the default number of threads for improved response time
    sparkConf.set("spark.akka.askTimeout", "120"); // Increase from default 30 seconds in case it is too short
    sparkConf.set("spark.kryoserializer.buffer.max.mb", "256"); // In case it helps

and the logs for the executors look like:

 

Container: container_e36_1445047866026_2759_01_000100 on someNode.myCluster.myCompany.com_8041
==================================================================================================
LogType:stderr
Log Upload Time:Tue Oct 27 17:42:12 -0700 2015
LogLength:519
Log Contents:
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/data/10/yarn/nm/usercache/william_maniatty/filecache/83/main.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/CDH-5.4.4-1.cdh5.4.4.p0.4/jars/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [ch.qos.logback.classic.util.ContextSelectorStaticBinder]

LogType:stdout
Log Upload Time:Tue Oct 27 17:42:12 -0700 2015
LogLength:8012
Log Contents:
17:16:35,434 |-INFO in ch.qos.logback.classic.LoggerContext[default] - Could NOT find resource [logback.groovy]
17:16:35,434 |-INFO in ch.qos.logback.classic.LoggerContext[default] - Could NOT find resource [logback-test.xml]
17:16:35,435 |-INFO in ch.qos.logback.classic.LoggerContext[default] - Found resource [logback.xml] at [jar:file:/data/10/yarn/nm/usercache/william_maniatty/filecache/83/main.jar!/logback.xml]
17:16:35,448 |-INFO in ch.qos.logback.core.joran.spi.ConfigurationWatchList@5606c0b - URL [jar:file:/data/10/yarn/nm/usercache/william_maniatty/filecache/83/main.jar!/logback.xml] is not of type file
17:16:35,602 |-INFO in ch.qos.logback.core.joran.action.AppenderAction - About to instantiate appender of type [ch.qos.logback.core.ConsoleAppender]
17:16:35,605 |-INFO in ch.qos.logback.core.joran.action.AppenderAction - Naming appender as [STDOUT]
17:16:35,675 |-INFO in ch.qos.logback.classic.joran.action.LoggerAction - Setting level of logger [com.myCompany.myTeam] to DEBUG
17:16:35,675 |-INFO in ch.qos.logback.classic.joran.action.RootLoggerAction - Setting level of ROOT logger to ERROR
17:16:35,675 |-INFO in ch.qos.logback.core.joran.action.AppenderRefAction - Attaching appender named [STDOUT] to Logger[ROOT]
17:16:35,676 |-INFO in ch.qos.logback.classic.joran.action.ConfigurationAction - End of configuration.
17:16:35,678 |-INFO in ch.qos.logback.classic.joran.JoranConfigurator@6646153 - Registering current configuration as safe fallback point
2015:10:27:17:41:12.763 [Executor task launch worker-1] ERROR o.a.spark.MapOutputTrackerWorker.logError:96 - Error communicating with MapOutputTracker
org.apache.spark.SparkException: Error sending message [message = GetMapOutputStatuses(3)]
        at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:209) ~[main.jar:na]
        at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:113) [main.jar:na]
        at org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:164) [main.jar:na]
        at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.fetch(BlockStoreShuffleFetcher.scala:42) [main.jar:na]
        at org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:40) [main.jar:na]
        at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92) [main.jar:na]
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) [main.jar:na]
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) [main.jar:na]
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) [main.jar:na]
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) [main.jar:na]
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) [main.jar:na]
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) [main.jar:na]
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) [main.jar:na]
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) [main.jar:na]
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68) [main.jar:na]
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) [main.jar:na]
        at org.apache.spark.scheduler.Task.run(Task.scala:64) [main.jar:na]
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) [main.jar:na]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_45]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_45]
        at java.lang.Thread.run(Thread.java:745) [na:1.8.0_45]
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [120 seconds]
        at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) ~[main.jar:na]
        at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) ~[main.jar:na]
        at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107) ~[main.jar:na]
        at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53) ~[main.jar:na]
        at scala.concurrent.Await$.result(package.scala:107) ~[main.jar:na]
        at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:195) ~[main.jar:na]
        ... 20 common frames omitted
2015:10:27:17:41:12.768 [Executor task launch worker-1] ERROR org.apache.spark.executor.Executor.logError:96 - Exception in task 162.0 in stage 4.0 (TID 29281)
org.apache.spark.SparkException: Error communicating with MapOutputTracker
        at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:117) ~[main.jar:na]
        at org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:164) ~[main.jar:na]
        at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.fetch(BlockStoreShuffleFetcher.scala:42) ~[main.jar:na]
        at org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:40) ~[main.jar:na]
        at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92) ~[main.jar:na]
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) ~[main.jar:na]
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) ~[main.jar:na]
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) ~[main.jar:na]
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) ~[main.jar:na]
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) ~[main.jar:na]
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) ~[main.jar:na]
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) ~[main.jar:na]
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) ~[main.jar:na]
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68) ~[main.jar:na]
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) ~[main.jar:na]
        at org.apache.spark.scheduler.Task.run(Task.scala:64) ~[main.jar:na]
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) ~[main.jar:na]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_45]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_45]
        at java.lang.Thread.run(Thread.java:745) [na:1.8.0_45]
Caused by: org.apache.spark.SparkException: Error sending message [message = GetMapOutputStatuses(3)]
        at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:209) ~[main.jar:na]
        at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:113) ~[main.jar:na]
        ... 19 common frames omitted
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [120 seconds]
        at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) ~[main.jar:na]
        at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) ~[main.jar:na]
        at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107) ~[main.jar:na]
        at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53) ~[main.jar:na]
        at scala.concurrent.Await$.result(package.scala:107) ~[main.jar:na]
        at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:195) ~[main.jar:na]
        ... 20 common frames omitted
2015:10:27:17:42:05.646 [sparkExecutor-akka.actor.default-dispatcher-3] ERROR akka.remote.EndpointWriter.apply$mcV$sp:65 - AssociationError [akka.tcp://sparkExecutor@someNode.myCluster.myCompany.com:58865] -> [akka.tcp://sparkDriver@anotherNode.myCluster.myCompany.com:46814]: Error [Shut down address: akka.tcp://sparkDriver@anotherNode.myCluster.myCompany.com:46814] [
akka.remote.ShutDownAssociation: Shut down address: akka.tcp://sparkDriver@anotherNode.myCluster.myCompany.com:46814
Caused by: akka.remote.transport.Transport$InvalidAssociationException: The remote system terminated the association because it is shutting down.
]
2015:10:27:17:42:05.693 [sparkExecutor-akka.actor.default-dispatcher-5] ERROR o.a.s.e.CoarseGrainedExecutorBackend.logError:75 - Driver Disassociated [akka.tcp://sparkExecutor@someNode.myCluster.myCompany.com:58865] -> [akka.tcp://sparkDriver@anotherNode.myCluster.myCompany.com:46814] disassociated! Shutting down.
1 ACCEPTED SOLUTION

avatar
Contributor
NA since customer opted to use Scalding to implement the solution instead of Spark.

View solution in original post

1 REPLY 1

avatar
Contributor
NA since customer opted to use Scalding to implement the solution instead of Spark.