Created on 05-24-2018 02:12 PM - edited 08-17-2019 10:09 PM
Hi everyone,
This week the amount of data our Spark ETL job needs to process increased.
We were able to successfully process up to 120 GB; due to some changes and a backlog, around 1 TB now needs to be processed.
We have a cluster with 18 Spark2 clients, and I have to use a YARN queue that has 30% assigned.
Every box has 56 CPUs and 256 GB RAM.
HDP Version = 2.6.4
Spark = 2.2.0
I was running the job with:
Since the data grew to this size (1 TB), we are getting the following error:
org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 11
These are the details of the failing job (see attachments):
And the complete error trace:
org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 11
    at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:697)
    at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:693)
    at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
    at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
    at org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:693)
    at org.apache.spark.MapOutputTracker.getMapSizesByExecutorId(MapOutputTracker.scala:147)
    at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:49)
    at org.apache.spark.sql.execution.ShuffledRowRDD.compute(ShuffledRowRDD.scala:165)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
This is a piece of the Spark program:
// Define json processing path
val jsonProcessingPath = "hdfs:///" + env + "/etl/dwd/forecast/processing/" + filesPath + "*"

// Target directories
val dwdForecastDataPath = "hdfs:///" + env + "/data/dwd/forecast"
val dwdForecastStagePath = "hdfs:///" + env + "/etl/dwd/forecast/stage"

val jsonRDD = spark.sparkContext.wholeTextFiles(jsonProcessingPath, 72)
val jsonDF = spark.read.json(jsonRDD.map(f => f._2))

val dwdJsonRecords = jsonDF
  .select($"meta.filename", $"meta.numberOfRecords", $"meta.recordNumber",
    explode($"records").as("records"))
  .select($"filename", $"numberOfRecords", $"recordNumber", $"records.time",
    explode($"records.grids").as("grids"))
  .select($"filename", $"numberOfRecords", $"recordNumber", $"time",
    $"grids.gPt".getItem(0).as("lat1"),
    $"grids.gPt".getItem(1).as("long1"),
    $"grids.gPt".getItem(2).as("lat2"),
    $"grids.gPt".getItem(3).as("long2"),
    $"grids.gPt".getItem(4).as("value"))

// Create Temporary View for data processing
dwdJsonRecords.createOrReplaceTempView("dwd")

// Create table from View
spark.table("dwd")

// Load the table in memory
spark.table("dwd").cache

/* -------- ASWDIR_S -------- */
val aswdirCsvDf = spark.sql("""SELECT regexp_extract(filename, 'cosmo.+_([0-9]{10}_[0-9]{3})_.+(?=.grib2.bz2)', 1) as `msg_id`
  , to_utc_timestamp(CAST(from_unixtime(unix_timestamp(time,"yyyy-MM-dd'T'HH:mm:ssXXX")) as timestamp),'Europe/Berlin') as `msg_ts`
  , lat1, long1, lat2, long2, value
  FROM dwd
  WHERE filename like '%ASWDIR%' and value != 'NaN'""")

aswdirCsvDf.repartition(1)
  .write
  .format("com.databricks.spark.csv")
  .option("timestampFormat", "yyyy-MM-dd HH:mm:ss")
  .mode("append")
  .save(aswdirPathCsv.concat("ASWDIR_S"))
I am using wholeTextFiles to load several JSON files at once, with 72 partitions. I selected this value after some trial and error a few months ago.
Then I want to cache the DataFrame in memory.
Finally, I run some queries to flatten out the JSON content and write it back to HDFS as a single CSV file. Attached you can find a screenshot of the stages.
Any help will be highly appreciated.
Kind regards,
Paul
Created 05-28-2018 01:03 PM
Hi everyone,
I solved it after a deeper analysis of the code.
As you can see in the code I posted above, I am repartitioning the data. As background: the regular process transforms small files, and I want to collect the partial results into a single file, which is then written to HDFS. That is desirable because HDFS works better with bigger files. "Small" and "big" can be fuzzy, so to be concrete: our HDFS uses the standard configuration of 128 MB blocks, so files of 2 or 3 MB make little sense and also hurt performance.
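The block-size reasoning above can be turned into a rough file count. The following is only a sketch: the object and method names are hypothetical, it assumes one 128 MB block per output file as the target, and the size of the output has to be estimated by whoever calls it (the CSV output is not necessarily the same size as the JSON input).

```scala
object OutputFileSizing {
  // Assumed target size per output file: one 128 MB HDFS block.
  val BlockBytes: Long = 128L * 1024 * 1024

  /** Number of output partitions so that each file is roughly `targetBytes`
    * (rounded up, and never less than 1). */
  def partitionsFor(totalBytes: Long, targetBytes: Long = BlockBytes): Int = {
    require(totalBytes >= 0, "totalBytes must be non-negative")
    require(targetBytes > 0, "targetBytes must be positive")
    val n = (totalBytes + targetBytes - 1) / targetBytes // ceiling division
    math.max(1L, math.min(n, Int.MaxValue.toLong)).toInt
  }
}
```

For a small batch of a few MB this gives 1, which matches the single-file approach. For an estimated 1 TB of output it gives 8192, which shows how far a single partition is from a sensible size at that scale.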
That works in the regular situation, but now a backlog of around 1 TB needs to be processed, and the repartition causes a shuffle. As far as I understand, repartition(1) has to move all the data to a single task on one worker in order to build the single partition. Since the data is bigger than the memory available on a worker, this brings everything down and produces the errors I reported above.
aswdirCsvDf.repartition(1).write
I just removed ".repartition(1)" from the code and now everything is working. The program now writes several files, one per partition of the result, and in this context that is quite OK.
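If the number of output files should still be bounded, a middle ground is to repartition to a count larger than 1 instead of dropping the repartition entirely. This is a minimal sketch, not what the job above actually does: `CsvOutput`, `writeCsv` and `numFiles` are hypothetical names, and a suitable `numFiles` depends on the data volume and on the executor memory available in the queue.

```scala
import org.apache.spark.sql.{DataFrame, SaveMode}

object CsvOutput {
  /** Hypothetical helper: append `df` to `path` as CSV, written by `numFiles` tasks
    * (so at most `numFiles` part files per run). */
  def writeCsv(df: DataFrame, path: String, numFiles: Int): Unit = {
    require(numFiles >= 1, "numFiles must be at least 1")
    df.repartition(numFiles) // still a shuffle, but spread over numFiles tasks instead of one
      .write
      .format("com.databricks.spark.csv") // same format string as in the original job
      .option("timestampFormat", "yyyy-MM-dd HH:mm:ss")
      .mode(SaveMode.Append)
      .save(path)
  }
}
```

With `numFiles = 1` this behaves like the original code, so the value has to grow with the data for the sketch to help. A call could look like `CsvOutput.writeCsv(aswdirCsvDf, aswdirPathCsv.concat("ASWDIR_S"), 200)`, where 200 is an arbitrary illustrative value.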
Kind regards,
Paul