Support Questions

Find answers, ask questions, and share your expertise
Announcements
Celebrating as our community reaches 100,000 members! Thank you!

spark2 upgrade to 2.3.0 from 2.2.0 wont read or write snappy compressed parquet

avatar
New Contributor

This seems like a really simple case of wrong classpaths, and the wrong version of snappy being brought in, but I can't work out how/why!

I have an upgraded CDH5.14.2 (upgraded from 5.13.2), spark2.3.0(upgraded from 2.2.0 cloudera2, including the csd)

I do the following in a spark2-shell;

 

val df = spark.read.parquet("/inputdata/parquet/parquet*snappy*")

scala> df.show

 

And then just get the following error.... Any ideas how to debug/fix?

I did all this on an offline platform, but have now replicated on a fresh online install.

Help!


WARN scheduler.TaskSetManager: Lost task 0.0 in stage 3.0 (TID 4, fozzy, executor 1): java.lang.UnsatisfiedLinkError: org.xerial.snappy.SnappyNative.uncompressedLength(Ljava/nio/ByteBuffer;II)I
at org.xerial.snappy.SnappyNative.uncompressedLength(Native Method)
at org.xerial.snappy.Snappy.uncompressedLength(Snappy.java:561)
at parquet.hadoop.codec.SnappyDecompressor.decompress(SnappyDecompressor.java:62)
at parquet.hadoop.codec.NonBlockedDecompressorStream.read(NonBlockedDecompressorStream.java:51)
at java.io.DataInputStream.readFully(DataInputStream.java:195)
at java.io.DataInputStream.readFully(DataInputStream.java:169)
at parquet.bytes.BytesInput$StreamBytesInput.toByteArray(BytesInput.java:204)
at parquet.column.values.dictionary.PlainValuesDictionary$PlainIntegerDictionary.<init>(PlainValuesDictionary.java:237)
at parquet.column.Encoding$1.initDictionary(Encoding.java:100)
at parquet.column.Encoding$4.initDictionary(Encoding.java:149)
at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.<init>(VectorizedColumnReader.java:114)
at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.checkEndOfRowGroup(VectorizedParquetRecordReader.java:312)
at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextBatch(VectorizedParquetRecordReader.java:258)
at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextKeyValue(VectorizedParquetRecordReader.java:161)
at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:106)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:182)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:106)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.scan_nextBatch$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:253)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

18/04/25 21:11:12 ERROR scheduler.TaskSetManager: Task 0 in stage 3.0 failed 4 times; aborting job
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 3.0 failed 4 times, most recent failure: Lost task 0.3 in stage 3.0 (TID 7, fozzy, executor 1): java.lang.UnsatisfiedLinkError: org.xerial.snappy.SnappyNative.uncompressedLength(Ljava/nio/ByteBuffer;II)I
at org.xerial.snappy.SnappyNative.uncompressedLength(Native Method)
at org.xerial.snappy.Snappy.uncompressedLength(Snappy.java:561)
at parquet.hadoop.codec.SnappyDecompressor.decompress(SnappyDecompressor.java:62)
at parquet.hadoop.codec.NonBlockedDecompressorStream.read(NonBlockedDecompressorStream.java:51)
at java.io.DataInputStream.readFully(DataInputStream.java:195)
at java.io.DataInputStream.readFully(DataInputStream.java:169)
at parquet.bytes.BytesInput$StreamBytesInput.toByteArray(BytesInput.java:204)
at parquet.column.values.dictionary.PlainValuesDictionary$PlainIntegerDictionary.<init>(PlainValuesDictionary.java:237)
at parquet.column.Encoding$1.initDictionary(Encoding.java:100)
at parquet.column.Encoding$4.initDictionary(Encoding.java:149)
at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.<init>(VectorizedColumnReader.java:114)
at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.checkEndOfRowGroup(VectorizedParquetRecordReader.java:312)
at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextBatch(VectorizedParquetRecordReader.java:258)
at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextKeyValue(VectorizedParquetRecordReader.java:161)
at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:106)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:182)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:106)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.scan_nextBatch$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:253)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1599)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1587)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1586)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1586)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1820)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1769)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1758)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2027)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2048)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2067)
at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:363)
at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3272)
at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2484)
at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2484)
at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3253)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3252)
at org.apache.spark.sql.Dataset.head(Dataset.scala:2484)
at org.apache.spark.sql.Dataset.take(Dataset.scala:2698)
at org.apache.spark.sql.Dataset.showString(Dataset.scala:254)
at org.apache.spark.sql.Dataset.show(Dataset.scala:723)
at org.apache.spark.sql.Dataset.show(Dataset.scala:682)
at org.apache.spark.sql.Dataset.show(Dataset.scala:691)
... 49 elided
Caused by: java.lang.UnsatisfiedLinkError: org.xerial.snappy.SnappyNative.uncompressedLength(Ljava/nio/ByteBuffer;II)I
at org.xerial.snappy.SnappyNative.uncompressedLength(Native Method)
at org.xerial.snappy.Snappy.uncompressedLength(Snappy.java:561)
at parquet.hadoop.codec.SnappyDecompressor.decompress(SnappyDecompressor.java:62)
at parquet.hadoop.codec.NonBlockedDecompressorStream.read(NonBlockedDecompressorStream.java:51)
at java.io.DataInputStream.readFully(DataInputStream.java:195)
at java.io.DataInputStream.readFully(DataInputStream.java:169)
at parquet.bytes.BytesInput$StreamBytesInput.toByteArray(BytesInput.java:204)
at parquet.column.values.dictionary.PlainValuesDictionary$PlainIntegerDictionary.<init>(PlainValuesDictionary.java:237)
at parquet.column.Encoding$1.initDictionary(Encoding.java:100)
at parquet.column.Encoding$4.initDictionary(Encoding.java:149)
at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.<init>(VectorizedColumnReader.java:114)
at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.checkEndOfRowGroup(VectorizedParquetRecordReader.java:312)
at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextBatch(VectorizedParquetRecordReader.java:258)
at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextKeyValue(VectorizedParquetRecordReader.java:161)
at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:106)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:182)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:106)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.scan_nextBatch$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:253)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

1 ACCEPTED SOLUTION

avatar
Expert Contributor

Hello,

 

Support fixed it for me :

 

The workaround for this is the following:  
 
copy snappy-java-1.1.4.jar to /opt/cloudera/parcels/SPARK2/lib/spark2/jars/ on each node where such executors are running. That can be downloaded from http://repo1.maven.org/maven2/org/xerial/snappy/snappy-java/1.1.4/snappy-java-1.1.4.jar.
 
Tested - confirmed working

View solution in original post

7 REPLIES 7

avatar
Expert Contributor

Hello,

 

Support fixed it for me :

 

The workaround for this is the following:  
 
copy snappy-java-1.1.4.jar to /opt/cloudera/parcels/SPARK2/lib/spark2/jars/ on each node where such executors are running. That can be downloaded from http://repo1.maven.org/maven2/org/xerial/snappy/snappy-java/1.1.4/snappy-java-1.1.4.jar.
 
Tested - confirmed working

avatar
Cloudera Employee

Can you please confirm that you are using spark-shell as opposed to spark-submit

avatar
New Contributor

Hiya, yes it was spark2-shell.

The answer ChrisV has given is the fix, I'm so surprised the spark2.3.0 parcel didn't have this included!

avatar
New Contributor
Perfect! Thank you Chris, definitely solved. I thought I had caused it, but obviously based on this fix the spark2.3.0 parcel is just incomplete.

avatar
Expert Contributor

Glad I could help.

avatar
New Contributor

 

This works also without copying the jar to all nodes:

 

wget http://repo1.maven.org/maven2/org/xerial/snappy/snappy-java/1.1.4/snappy-java-1.1.4.jar
spark2-shell --jars snappy-java-1.1.4.jar --conf spark.executor.userClassPathFirst=true --conf spark.executor.extraClassPath=snappy-java-1.1.4.jar

 

avatar
New Contributor

According to https://www.cloudera.com/documentation/spark2/latest/topics/spark2_known_issues.html#KI_spark2_CDH-6..., the resolution is to upgrade to CDS 2.3 Release 3, which contains the fix.