Spark on YARN: Snappy java.io.IOException: FAILED_TO_UNCOMPRESS

Expert Contributor

Hello everyone, I have a Spark application that runs fine with test tables but fails in production, where the tables have about 200 million records and roughly 100 columns. From the logs the error seems related to the Snappy codec, even though these tables were saved as Parquet without compression, and at write time I also explicitly turned compression off with:

sqlContext.sql("SET hive.exec.compress.output=false")
sqlContext.sql("SET parquet.compression=NONE")
sqlContext.sql("SET spark.sql.parquet.compression.codec=uncompressed")

The error is the following:

2018-08-01 16:19:45,467 [dag-scheduler-event-loop] INFO  org.apache.spark.scheduler.DAGScheduler  - ShuffleMapStage 183 (saveAsTable at Model1Prep.scala:776) failed in 543.126 s due to Job aborted due to stage failure: Task 169 in stage 97.0 failed 4 times, most recent failure: Lost task 169.3 in stage 97.0 (TID 15079, prwor-e414c813.azcloud.local, executor 2): java.io.IOException: FAILED_TO_UNCOMPRESS(5)
	at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:78)
	at org.xerial.snappy.SnappyNative.rawUncompress(Native Method)
	at org.xerial.snappy.Snappy.rawUncompress(Snappy.java:391)
	at org.xerial.snappy.Snappy.uncompress(Snappy.java:427)
	at org.xerial.snappy.SnappyInputStream.readFully(SnappyInputStream.java:127)
	at org.xerial.snappy.SnappyInputStream.readHeader(SnappyInputStream.java:88)
	at org.xerial.snappy.SnappyInputStream.<init>(SnappyInputStream.java:58)
	at org.apache.spark.io.SnappyCompressionCodec.compressedInputStream(CompressionCodec.scala:159)
	at org.apache.spark.storage.BlockManager.wrapForCompression(BlockManager.scala:1280)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillReader.<init>(UnsafeSorterSpillReader.java:54)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillWriter.getReader(UnsafeSorterSpillWriter.java:148)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.getSortedIterator(UnsafeExternalSorter.java:416)
	at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:117)
	at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:170)
	at org.apache.spark.sql.execution.Sort$$anonfun$1.apply(Sort.scala:90)
	at org.apache.spark.sql.execution.Sort$$anonfun$1.apply(Sort.scala:64)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$21.apply(RDD.scala:728)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$21.apply(RDD.scala:728)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
	at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:88)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
	at org.apache.spark.scheduler.Task.run(Task.scala:89)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:242)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Why is this happening if compression is turned off? Could it be that compression is still used during the shuffle phase?
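
From the stack trace it looks like the failure happens while reading back spilled sort data (UnsafeSorterSpillReader wrapping a SnappyInputStream), not while reading Parquet, so I suspect the shuffle/spill settings are the relevant ones. A minimal sketch of the configuration I mean (values are illustrative only, not a recommendation):

// Hedged sketch: these confs control compression of shuffle and spill blocks,
// independently of the Parquet/Hive output codec settings shown above
val conf = new org.apache.spark.SparkConf()
  .set("spark.shuffle.compress", "true")         // compress shuffle map outputs
  .set("spark.shuffle.spill.compress", "true")   // compress data spilled to disk during sorts
  .set("spark.io.compression.codec", "lz4")      // codec used for those blocks (Snappy in my trace)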

The cluster has the following characteristics:

  • 2 master nodes
  • 7 worker nodes

Each node has:

  • CPU: 16 cores
  • RAM: 110 GB
  • HDFS disks: 4 x 1 TB

These are the YARN settings relevant to memory (values in GB; the last entry is vcores):

yarn.nodemanager.resource.memory-mb        84
yarn.scheduler.minimum-allocation-mb       12
yarn.scheduler.maximum-allocation-mb       84
mapreduce.map.memory.mb                    6
mapreduce.reduce.memory.mb                 12
mapreduce.map.java.opts                    4.8
mapreduce.reduce.java.opts                 9.6
yarn.app.mapreduce.am.resource.mb          6
yarn.app.mapreduce.am.command-opts         4.8
yarn.scheduler.maximum-allocation-vcores   5

Spark on YARN settings:

  • spark.shuffle.service.enabled: ENABLED
  • spark.dynamicAllocation.enabled: ENABLED

Spark job submission settings (a rough memory-fit sketch follows the list):

  • --driver-memory 30G
  • --executor-cores 5
  • --executor-memory 30G
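
A rough back-of-the-envelope check of how these requests fit into the 84 GB NodeManager capacity, assuming the usual YARN executor memory overhead of roughly max(384 MB, 10% of executor memory); the numbers are only an estimate:

// Hedged sketch; the overhead formula is assumed, not taken from my actual configuration
val executorMemoryGb = 30.0
val overheadGb       = math.max(0.384, 0.10 * executorMemoryGb)  // ~3 GB
val containerGb      = executorMemoryGb + overheadGb             // ~33 GB per executor container
val nodeCapacityGb   = 84.0                                      // yarn.nodemanager.resource.memory-mb
val executorsPerNode = (nodeCapacityGb / containerGb).toInt      // ~2 executors per worker node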

Does anyone have a hint on why this is happening?

1 REPLY

New Contributor