Spark Metadata Fetch Failed Exception: Missing an output location for shuffle

Hi everyone,

This week the amount of data our Spark ETL job needs to process has increased.

We were able to process up to 120 GB successfully, but due to some changes and a backlog, around 1 TB now needs to be processed.

We have a cluster with 18 Spark2 clients, and I have to use a YARN queue that has 30% of the capacity assigned.

Every box has 56 CPUs and 256 GB RAM.

HDP Version = 2.6.4

Spark = 2.2.0

I was running the job with:

  • Number of executors: 73
  • Executor Memory: 3 GB
  • Executor Cores: 5
  • Driver Memory: 5 GB
  • Spark Master: yarn
  • spark.driver.maxResultSize: 5 GB
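
For reference, the list above maps to Spark configuration keys roughly as in the sketch below. This is only illustrative: the app name is a placeholder, and spark.driver.memory normally has to be set at submit time rather than from inside the program.

    import org.apache.spark.sql.SparkSession

    // A sketch of the configuration, not the exact way the job is launched.
    val spark = SparkSession.builder()
      .appName("dwd-forecast-etl")                  // placeholder name
      .master("yarn")
      .config("spark.executor.instances", "73")     // number of executors
      .config("spark.executor.memory", "3g")        // executor memory
      .config("spark.executor.cores", "5")          // executor cores
      .config("spark.driver.memory", "5g")          // effective only via spark-submit / spark-defaults
      .config("spark.driver.maxResultSize", "5g")
      .getOrCreate()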

With this amount of data (1 TB), we are getting the following error:

org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 11

These are the details of the failing job (see attachments):

And the complete error trace:

org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 11
        at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:697)
        at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:693)
        at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
        at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
        at org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:693)
        at org.apache.spark.MapOutputTracker.getMapSizesByExecutorId(MapOutputTracker.scala:147)
        at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:49)
        at org.apache.spark.sql.execution.ShuffledRowRDD.compute(ShuffledRowRDD.scala:165)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:108)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748) 

This is a piece of the Spark program:

    // Define json processing path
    val jsonProcessingPath = "hdfs:///" + env +"/etl/dwd/forecast/processing/" + filesPath + "*"
    // Target directories
    val dwdForecastDataPath = "hdfs:///" + env +"/data/dwd/forecast"
    val dwdForecastStagePath = "hdfs:///" + env +"/etl/dwd/forecast/stage"

    val jsonRDD = spark.sparkContext.wholeTextFiles(jsonProcessingPath, 72)
    val jsonDF = spark.read.json(jsonRDD.map(f => f._2))
    val dwdJsonRecords = jsonDF.select($"meta.filename"
          , $"meta.numberOfRecords", $"meta.recordNumber", explode($"records").as("records"))
          .select($"filename", $"numberOfRecords", $"recordNumber", $"records.time", explode($"records.grid  s").as("grids"))
          .select($"filename", $"numberOfRecords", $"recordNumber", $"time", $"grids.gPt".getItem(0).as("lat  1"), $"grids.gPt".getItem(1).as("long1"), $"grids.gPt".getItem(2).as("lat2"), $"grids.gPt".g  etItem(3).as("long2"), $"grids.gPt".getItem(4).as("value"))         


      // Create Temporary View for data processing
      dwdJsonRecords.createOrReplaceTempView("dwd")
      // Create table from View
      spark.table("dwd")
      // Load the table in memory
      spark.table("dwd").cache

      /* --------------------------------------
     --------   ASWDIR_S   ----------------
     ------------------------------------ */
      val aswdirCsvDf = spark.sql("""SELECT regexp_extract(filename, 'cosmo.+_([0-9]{10}_[0-9]{3})_.+(?=.grib2.bz2)', 1) as `msg_id`
        , to_utc_timestamp(CAST(from_unixtime(unix_timestamp(time,"yyyy-MM-dd'T'HH:mm:ssXXX")) as timestamp),'Europe/Berlin')  as `msg_ts`
        , lat1, long1, lat2, long2, value  FROM dwd WHERE filename like '%ASWDIR%' and value != 'NaN'""")

aswdirCsvDf.repartition(1).write.format("com.databricks.spark.csv").option("timestampFormat", "yyyy-MM-dd HH:mm:ss").mode("append").save(aswdirPathCsv.concat("ASWDIR_S"))

I am using wholeTextFiles to load several JSON files at once into 72 partitions. I selected these values after some trial and error a few months ago.

Then I want to cache the dataframe in memory.

Finally, I run some queries to flatten out the JSON content and write it back to HDFS as a single CSV file. Attached you can find a screenshot of the stages.

Any help will be highly appreciated.

Kind regards,

Paul

73007-failed-stages.png

73006-stages-for-all-jobs.png

73005-failing-job.png

1 REPLY

Hi everyone,

I already solved it after a deep analysis of the code.

As you can see in the code I posted above, I am repartitioning the data. As background, the regular process transforms small files, and I want to collect the partial results and create a single file, which is then written to HDFS. That is a desired feature, since HDFS works better with bigger files. To be more precise, because "small" and "big" can be fuzzy: our HDFS has a standard configuration of 128 MB blocks, so 2 or 3 MB files make no sense and also hurt performance.
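
As a side note, the configured block size can be read from the Hadoop configuration that Spark carries; this is just a quick sketch, with 134217728 bytes (128 MB) as the assumed fallback:

    // Print the HDFS default block size as seen by the Spark driver
    // (falls back to 134217728 bytes = 128 MB if the property is not set on the client).
    val blockSize = spark.sparkContext.hadoopConfiguration.get("dfs.blocksize", "134217728")
    println(s"HDFS default block size: $blockSize")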

This is the regular situation, but now a backlog of around 1 TB needs to be processed, and the repartition causes a shuffle operation. As far as I understand, repartition(1) requires collecting all the parts on one worker to create a single partition. Since the original RDD is bigger than the memory available on the workers, this collapses everything and throws the error I reported above.

aswdirCsvDf.repartition(1).write

I just removed the .repartition(1) from the code and now everything is working. The program writes several files, that is, one file per partition, and in this context that is quite OK.
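
For completeness, the write now looks roughly like this; the commented coalesce variant is only an idea for keeping the number of output files moderate, not something we needed:

    // Without repartition(1), each partition writes its own part file in parallel.
    aswdirCsvDf.write
      .format("com.databricks.spark.csv")
      .option("timestampFormat", "yyyy-MM-dd HH:mm:ss")
      .mode("append")
      .save(aswdirPathCsv.concat("ASWDIR_S"))

    // If fewer, larger files were still needed, a moderate coalesce would avoid
    // funnelling the whole dataset through a single partition (the count is only
    // an example and would need tuning):
    // aswdirCsvDf.coalesce(16).write.format("com.databricks.spark.csv") ...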

Kind regards,

Paul