Support Questions
Find answers, ask questions, and share your expertise
Announcements
Alert: Welcome to the Unified Cloudera Community. Former HCC members be sure to read and learn how to activate your account here.

Spark 1.3.0 on Cloudera 5.4 fails on loading data and distinct.

SOLVED Go to solution
Highlighted

Spark 1.3.0 on Cloudera 5.4 fails on loading data and distinct.

Contributor

I have a job where the executors crash at the end of ingestion and distnct of a modest sized data set, with what seems to be a generous resource allocation, due to a timeout on the mapTracker in spite of tuning efforts.  What causes this and how can I fix it?

 

I have a reasonable size data set (1.4 TB on a cluster with about 250 Nodes or so) which is

  1. loaded from HDFS,
  2. parsed to scala objects
  3. map to project a few fields/columns/data members, to some nested tuples field1 is a modest sized String (less than say 15 characters) the rest, field2, field3, and field4,  are Int,
  4. distinct the columns (this is small, say 2 MB or so and is confirmed by the shuffle output memory in the spark UI.

The code that fails looks something like:

    val desiredNumberOfPartitions=2000;
    val cookedMyData = rawMyData.map(line => new MyData(line));
    val uniqueGroups = cookedMyData.map(md => ((md.field1, md.field2), (md.field3, md.field4))).distinct().repartition(desiredNumberOfPartitions);
  

 

The Job runs (as seen in the spark ui) with a driver wotih 40 GB and processor cores and 1000 executors with 1 processor core and 4 GB each (i.e. pretty well resourced) until it hits the distinct and then it hangs on a straggler (the very last executor, pair) and then the job

dies in a fiery cataclysm of doom (see the stack trace at the end of this post for details).

 

I have tried some tuning and set the following sparkConf and akka parameters:

 

    // Here begins my custom settings
    sparkConf.set("spark.akka.frameSize", "1000"); // Frame size in MB, workers should be able to send bigger messages, based on some recommended numbers
    sparkConf.set("spark.akka.threads", "16"); // Try quadrupling the default number of threads for improved response time
    sparkConf.set("spark.akka.askTimeout", "120"); // Increase from default 30 seconds in case it is too short
    sparkConf.set("spark.kryoserializer.buffer.max.mb", "256"); // In case it helps

and the logs for the executors look like:

 

Container: container_e36_1445047866026_2759_01_000100 on someNode.myCluster.myCompany.com_8041
==================================================================================================
LogType:stderr
Log Upload Time:Tue Oct 27 17:42:12 -0700 2015
LogLength:519
Log Contents:
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/data/10/yarn/nm/usercache/william_maniatty/filecache/83/main.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/CDH-5.4.4-1.cdh5.4.4.p0.4/jars/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [ch.qos.logback.classic.util.ContextSelectorStaticBinder]

LogType:stdout
Log Upload Time:Tue Oct 27 17:42:12 -0700 2015
LogLength:8012
Log Contents:
17:16:35,434 |-INFO in ch.qos.logback.classic.LoggerContext[default] - Could NOT find resource [logback.groovy]
17:16:35,434 |-INFO in ch.qos.logback.classic.LoggerContext[default] - Could NOT find resource [logback-test.xml]
17:16:35,435 |-INFO in ch.qos.logback.classic.LoggerContext[default] - Found resource [logback.xml] at [jar:file:/data/10/yarn/nm/usercache/william_maniatty/filecache/83/main.jar!/logback.xml]
17:16:35,448 |-INFO in ch.qos.logback.core.joran.spi.ConfigurationWatchList@5606c0b - URL [jar:file:/data/10/yarn/nm/usercache/william_maniatty/filecache/83/main.jar!/logback.xml] is not of type file
17:16:35,602 |-INFO in ch.qos.logback.core.joran.action.AppenderAction - About to instantiate appender of type [ch.qos.logback.core.ConsoleAppender]
17:16:35,605 |-INFO in ch.qos.logback.core.joran.action.AppenderAction - Naming appender as [STDOUT]
17:16:35,675 |-INFO in ch.qos.logback.classic.joran.action.LoggerAction - Setting level of logger [com.myCompany.myTeam] to DEBUG
17:16:35,675 |-INFO in ch.qos.logback.classic.joran.action.RootLoggerAction - Setting level of ROOT logger to ERROR
17:16:35,675 |-INFO in ch.qos.logback.core.joran.action.AppenderRefAction - Attaching appender named [STDOUT] to Logger[ROOT]
17:16:35,676 |-INFO in ch.qos.logback.classic.joran.action.ConfigurationAction - End of configuration.
17:16:35,678 |-INFO in ch.qos.logback.classic.joran.JoranConfigurator@6646153 - Registering current configuration as safe fallback point
2015:10:27:17:41:12.763 [Executor task launch worker-1] ERROR o.a.spark.MapOutputTrackerWorker.logError:96 - Error communicating with MapOutputTracker
org.apache.spark.SparkException: Error sending message [message = GetMapOutputStatuses(3)]
        at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:209) ~[main.jar:na]
        at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:113) [main.jar:na]
        at org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:164) [main.jar:na]
        at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.fetch(BlockStoreShuffleFetcher.scala:42) [main.jar:na]
        at org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:40) [main.jar:na]
        at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92) [main.jar:na]
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) [main.jar:na]
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) [main.jar:na]
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) [main.jar:na]
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) [main.jar:na]
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) [main.jar:na]
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) [main.jar:na]
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) [main.jar:na]
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) [main.jar:na]
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68) [main.jar:na]
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) [main.jar:na]
        at org.apache.spark.scheduler.Task.run(Task.scala:64) [main.jar:na]
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) [main.jar:na]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_45]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_45]
        at java.lang.Thread.run(Thread.java:745) [na:1.8.0_45]
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [120 seconds]
        at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) ~[main.jar:na]
        at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) ~[main.jar:na]
        at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107) ~[main.jar:na]
        at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53) ~[main.jar:na]
        at scala.concurrent.Await$.result(package.scala:107) ~[main.jar:na]
        at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:195) ~[main.jar:na]
        ... 20 common frames omitted
2015:10:27:17:41:12.768 [Executor task launch worker-1] ERROR org.apache.spark.executor.Executor.logError:96 - Exception in task 162.0 in stage 4.0 (TID 29281)
org.apache.spark.SparkException: Error communicating with MapOutputTracker
        at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:117) ~[main.jar:na]
        at org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:164) ~[main.jar:na]
        at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.fetch(BlockStoreShuffleFetcher.scala:42) ~[main.jar:na]
        at org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:40) ~[main.jar:na]
        at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92) ~[main.jar:na]
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) ~[main.jar:na]
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) ~[main.jar:na]
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) ~[main.jar:na]
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) ~[main.jar:na]
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) ~[main.jar:na]
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) ~[main.jar:na]
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) ~[main.jar:na]
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) ~[main.jar:na]
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68) ~[main.jar:na]
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) ~[main.jar:na]
        at org.apache.spark.scheduler.Task.run(Task.scala:64) ~[main.jar:na]
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) ~[main.jar:na]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_45]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_45]
        at java.lang.Thread.run(Thread.java:745) [na:1.8.0_45]
Caused by: org.apache.spark.SparkException: Error sending message [message = GetMapOutputStatuses(3)]
        at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:209) ~[main.jar:na]
        at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:113) ~[main.jar:na]
        ... 19 common frames omitted
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [120 seconds]
        at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) ~[main.jar:na]
        at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) ~[main.jar:na]
        at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107) ~[main.jar:na]
        at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53) ~[main.jar:na]
        at scala.concurrent.Await$.result(package.scala:107) ~[main.jar:na]
        at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:195) ~[main.jar:na]
        ... 20 common frames omitted
2015:10:27:17:42:05.646 [sparkExecutor-akka.actor.default-dispatcher-3] ERROR akka.remote.EndpointWriter.apply$mcV$sp:65 - AssociationError [akka.tcp://sparkExecutor@someNode.myCluster.myCompany.com:58865] -> [akka.tcp://sparkDriver@anotherNode.myCluster.myCompany.com:46814]: Error [Shut down address: akka.tcp://sparkDriver@anotherNode.myCluster.myCompany.com:46814] [
akka.remote.ShutDownAssociation: Shut down address: akka.tcp://sparkDriver@anotherNode.myCluster.myCompany.com:46814
Caused by: akka.remote.transport.Transport$InvalidAssociationException: The remote system terminated the association because it is shutting down.
]
2015:10:27:17:42:05.693 [sparkExecutor-akka.actor.default-dispatcher-5] ERROR o.a.s.e.CoarseGrainedExecutorBackend.logError:75 - Driver Disassociated [akka.tcp://sparkExecutor@someNode.myCluster.myCompany.com:58865] -> [akka.tcp://sparkDriver@anotherNode.myCluster.myCompany.com:46814] disassociated! Shutting down.
1 ACCEPTED SOLUTION

Accepted Solutions

Re: Spark 1.3.0 on Cloudera 5.4 fails on loading data and distinct.

Cloudera Employee
NA since customer opted to use Scalding to implement the solution instead of Spark.
1 REPLY 1

Re: Spark 1.3.0 on Cloudera 5.4 fails on loading data and distinct.

Cloudera Employee
NA since customer opted to use Scalding to implement the solution instead of Spark.