Created on 08-01-2018 01:05 PM - edited 09-16-2022 06:32 AM
Hello everyone, I have a Spark application that runs fine with test tables but fails in production, where the tables have about 200 million records and roughly 100 columns. From the logs the error seems related to the Snappy codec, even though these tables were saved in Parquet without compression, and at write time I also explicitly turned compression off with:
sqlContext.sql("SET hive.exec.compress.output=false") sqlContext.sql("SET parquet.compression=NONE") sqlContext.sql("SET spark.sql.parquet.compression.codec=uncompressed")
The error is the following:
```
2018-08-01 16:19:45,467 [dag-scheduler-event-loop] INFO org.apache.spark.scheduler.DAGScheduler - ShuffleMapStage 183 (saveAsTable at Model1Prep.scala:776) failed in 543.126 s due to Job aborted due to stage failure: Task 169 in stage 97.0 failed 4 times, most recent failure: Lost task 169.3 in stage 97.0 (TID 15079, prwor-e414c813.azcloud.local, executor 2): java.io.IOException: FAILED_TO_UNCOMPRESS(5)
	at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:78)
	at org.xerial.snappy.SnappyNative.rawUncompress(Native Method)
	at org.xerial.snappy.Snappy.rawUncompress(Snappy.java:391)
	at org.xerial.snappy.Snappy.uncompress(Snappy.java:427)
	at org.xerial.snappy.SnappyInputStream.readFully(SnappyInputStream.java:127)
	at org.xerial.snappy.SnappyInputStream.readHeader(SnappyInputStream.java:88)
	at org.xerial.snappy.SnappyInputStream.<init>(SnappyInputStream.java:58)
	at org.apache.spark.io.SnappyCompressionCodec.compressedInputStream(CompressionCodec.scala:159)
	at org.apache.spark.storage.BlockManager.wrapForCompression(BlockManager.scala:1280)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillReader.<init>(UnsafeSorterSpillReader.java:54)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillWriter.getReader(UnsafeSorterSpillWriter.java:148)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.getSortedIterator(UnsafeExternalSorter.java:416)
	at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:117)
	at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:170)
	at org.apache.spark.sql.execution.Sort$$anonfun$1.apply(Sort.scala:90)
	at org.apache.spark.sql.execution.Sort$$anonfun$1.apply(Sort.scala:64)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$21.apply(RDD.scala:728)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$21.apply(RDD.scala:728)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
	at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:88)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
	at org.apache.spark.scheduler.Task.run(Task.scala:89)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:242)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
```
Why is this happening if compression is turned off? Could it be that compression is used anyway during the shuffle phases?
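Looking more closely at the trace, the Snappy call happens while a spilled sort file is read back (UnsafeSorterSpillReader going through SnappyCompressionCodec), not while reading or writing Parquet, so I suspect it is Spark's internal block compression rather than the table format. If I understand correctly, that part is governed by separate settings; here is a sketch of what I think I could try (these are standard Spark configuration keys, the values are only illustrative, not what I currently have set):

```scala
import org.apache.spark.SparkConf

// Sketch only: Spark compresses shuffle output and spilled sort data with
// spark.io.compression.codec (snappy by default on this Spark version),
// independently of the Parquet/Hive settings above. These keys must be set
// before the SparkContext is created.
val conf = new SparkConf()
  // Switch the internal codec away from Snappy (lz4 or lzf are accepted),
  // to take the Snappy native library out of the picture entirely.
  .set("spark.io.compression.codec", "lz4")
  // Compression of shuffle map outputs and of data spilled during shuffles;
  // both default to true.
  .set("spark.shuffle.compress", "true")
  .set("spark.shuffle.spill.compress", "true")
```

The same keys could of course also be passed with --conf at submission time.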
The cluster has the following characteristics:
Each node has:
These are the relevant YARN settings (memory values in GB; a quick sanity check of the numbers follows the table):

| Setting | Value |
| --- | --- |
| yarn.nodemanager.resource.memory-mb | 84 |
| yarn.scheduler.minimum-allocation-mb | 12 |
| yarn.scheduler.maximum-allocation-mb | 84 |
| mapreduce.map.memory.mb | 6 |
| mapreduce.reduce.memory.mb | 12 |
| mapreduce.map.java.opts | 4.8 |
| mapreduce.reduce.java.opts | 9.6 |
| yarn.app.mapreduce.am.resource.mb | 6 |
| yarn.app.mapreduce.am.command-opts | 4.8 |
| yarn.scheduler.maximum-allocation-vcores | 5 (vcores) |
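As a quick sanity check of those numbers (my own reading of the table, so treat it as an assumption rather than the actual job configuration): the java.opts heaps are just ~80% of their containers, and any executor I ask for has to fit under the 84 GB / 5 vcore caps:

```scala
// Illustrative arithmetic derived from the table above, not from the job itself.
val mapHeapGb      = 0.8 * 6    // 4.8 GB -> mapreduce.map.java.opts
val reduceHeapGb   = 0.8 * 12   // 9.6 GB -> mapreduce.reduce.java.opts
val maxContainerGb = 84         // yarn.scheduler.maximum-allocation-mb
val maxVcores      = 5          // yarn.scheduler.maximum-allocation-vcores
// An executor request (spark.executor.memory + spark.yarn.executor.memoryOverhead)
// must stay <= 84 GB, and --executor-cores <= 5, or YARN will not grant the container.
```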
SPARK on YARN settings:
SPARK job submission settings:
Does anyone have a hint as to why this is happening?