
Spark2 save insert data to Hive with snappy compression

Explorer

I'm trying to create a Hive table with Snappy compression via Spark 2.

 

CDH 5.14

SLE12

 

A simple reproduction is:

spark.sqlContext.setConf("spark.sql.parquet.compression.codec","snappy") 
sql("CREATE TABLE parquet_table_name (x INT, y STRING) STORED AS PARQUET")
sql("INSERT INTO parquet_table_name VALUES(1, 'test')")

It then fails with the following error:

[Stage 0:>                                                          (0 + 1) / 1]18/04/26 21:03:44 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, master3-1.hadoop2.moph.go.th, executor 1): org.apache.spark.SparkException: Task failed while writing rows.
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:285)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:197)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:196)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.UnsatisfiedLinkError: org.xerial.snappy.SnappyNative.maxCompressedLength(I)I
	at org.xerial.snappy.SnappyNative.maxCompressedLength(Native Method)
	at org.xerial.snappy.Snappy.maxCompressedLength(Snappy.java:316)
	at parquet.hadoop.codec.SnappyCompressor.compress(SnappyCompressor.java:67)
	at org.apache.hadoop.io.compress.CompressorStream.compress(CompressorStream.java:81)
	at org.apache.hadoop.io.compress.CompressorStream.finish(CompressorStream.java:92)
	at parquet.hadoop.CodecFactory$BytesCompressor.compress(CodecFactory.java:112)
	at parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.writePage(ColumnChunkPageWriteStore.java:89)
	at parquet.column.impl.ColumnWriterV1.writePage(ColumnWriterV1.java:153)
	at parquet.column.impl.ColumnWriterV1.flush(ColumnWriterV1.java:241)
	at parquet.column.impl.ColumnWriteStoreV1.flush(ColumnWriteStoreV1.java:126)
	at parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:159)
	at parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:111)
	at parquet.hadoop.ParquetRecordWriter.close(ParquetRecordWriter.java:112)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.close(ParquetOutputWriter.scala:42)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.releaseResources(FileFormatWriter.scala:405)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.execute(FileFormatWriter.scala:396)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:269)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:267)
	at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1411)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:272)
	... 8 more

18/04/26 21:03:44 ERROR scheduler.TaskSetManager: Task 0 in stage 0.0 failed 4 times; aborting job
18/04/26 21:03:44 ERROR datasources.FileFormatWriter: Aborting job null.
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, master3-1.hadoop2.moph.go.th, executor 1): org.apache.spark.SparkException: Task failed while writing rows.
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:285)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:197)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:196)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.UnsatisfiedLinkError: org.xerial.snappy.SnappyNative.maxCompressedLength(I)I
	at org.xerial.snappy.SnappyNative.maxCompressedLength(Native Method)
	at org.xerial.snappy.Snappy.maxCompressedLength(Snappy.java:316)
	at parquet.hadoop.codec.SnappyCompressor.compress(SnappyCompressor.java:67)
	at org.apache.hadoop.io.compress.CompressorStream.compress(CompressorStream.java:81)
	at org.apache.hadoop.io.compress.CompressorStream.finish(CompressorStream.java:92)
	at parquet.hadoop.CodecFactory$BytesCompressor.compress(CodecFactory.java:112)
	at parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.writePage(ColumnChunkPageWriteStore.java:89)
	at parquet.column.impl.ColumnWriterV1.writePage(ColumnWriterV1.java:153)
	at parquet.column.impl.ColumnWriterV1.flush(ColumnWriterV1.java:241)
	at parquet.column.impl.ColumnWriteStoreV1.flush(ColumnWriteStoreV1.java:126)
	at parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:159)
	at parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:111)
	at parquet.hadoop.ParquetRecordWriter.close(ParquetRecordWriter.java:112)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.close(ParquetOutputWriter.scala:42)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.releaseResources(FileFormatWriter.scala:405)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.execute(FileFormatWriter.scala:396)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:269)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:267)
	at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1411)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:272)
	... 8 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1599)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1587)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1586)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1586)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1820)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1769)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1758)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2027)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:194)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:154)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:115)
	at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:190)
	at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:190)
	at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3253)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3252)
	at org.apache.spark.sql.Dataset.<init>(Dataset.scala:190)
	at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:75)
	at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:638)
	at $line23.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:24)
	at $line23.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:29)
	at $line23.$read$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:31)
	at $line23.$read$$iw$$iw$$iw$$iw$$iw.<init>(<console>:33)
	at $line23.$read$$iw$$iw$$iw$$iw.<init>(<console>:35)
	at $line23.$read$$iw$$iw$$iw.<init>(<console>:37)
	at $line23.$read$$iw$$iw.<init>(<console>:39)
	at $line23.$read$$iw.<init>(<console>:41)
	at $line23.$read.<init>(<console>:43)
	at $line23.$read$.<init>(<console>:47)
	at $line23.$read$.<clinit>(<console>)
	at $line23.$eval$.$print$lzycompute(<console>:7)
	at $line23.$eval$.$print(<console>:6)
	at $line23.$eval.$print(<console>)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:786)
	at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:1047)
	at scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunReq$1.apply(IMain.scala:638)
	at scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunReq$1.apply(IMain.scala:637)
	at scala.reflect.internal.util.ScalaClassLoader$class.asContext(ScalaClassLoader.scala:31)
	at scala.reflect.internal.util.AbstractFileClassLoader.asContext(AbstractFileClassLoader.scala:19)
	at scala.tools.nsc.interpreter.IMain$WrappedRequest.loadAndRunReq(IMain.scala:637)
	at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:569)
	at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:565)
	at scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:807)
	at scala.tools.nsc.interpreter.ILoop.command(ILoop.scala:681)
	at scala.tools.nsc.interpreter.ILoop.processLine(ILoop.scala:395)
	at scala.tools.nsc.interpreter.ILoop.loop(ILoop.scala:415)
	at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply$mcZ$sp(ILoop.scala:923)
	at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:909)
	at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:909)
	at scala.reflect.internal.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:97)
	at scala.tools.nsc.interpreter.ILoop.process(ILoop.scala:909)
	at org.apache.spark.repl.Main$.doMain(Main.scala:76)
	at org.apache.spark.repl.Main$.main(Main.scala:56)
	at org.apache.spark.repl.Main.main(Main.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
	at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:892)
	at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:197)
	at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:227)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:136)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: org.apache.spark.SparkException: Task failed while writing rows.
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:285)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:197)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:196)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.UnsatisfiedLinkError: org.xerial.snappy.SnappyNative.maxCompressedLength(I)I
	at org.xerial.snappy.SnappyNative.maxCompressedLength(Native Method)
	at org.xerial.snappy.Snappy.maxCompressedLength(Snappy.java:316)
	at parquet.hadoop.codec.SnappyCompressor.compress(SnappyCompressor.java:67)
	at org.apache.hadoop.io.compress.CompressorStream.compress(CompressorStream.java:81)
	at org.apache.hadoop.io.compress.CompressorStream.finish(CompressorStream.java:92)
	at parquet.hadoop.CodecFactory$BytesCompressor.compress(CodecFactory.java:112)
	at parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.writePage(ColumnChunkPageWriteStore.java:89)
	at parquet.column.impl.ColumnWriterV1.writePage(ColumnWriterV1.java:153)
	at parquet.column.impl.ColumnWriterV1.flush(ColumnWriterV1.java:241)
	at parquet.column.impl.ColumnWriteStoreV1.flush(ColumnWriteStoreV1.java:126)
	at parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:159)
	at parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:111)
	at parquet.hadoop.ParquetRecordWriter.close(ParquetRecordWriter.java:112)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.close(ParquetOutputWriter.scala:42)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.releaseResources(FileFormatWriter.scala:405)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.execute(FileFormatWriter.scala:396)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:269)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:267)
	at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1411)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:272)
	... 8 more

But the same thing works if I set the compression codec to gzip.
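
For reference, the equivalent run with gzip succeeds on the same setup (the table name below is just illustrative):

spark.sqlContext.setConf("spark.sql.parquet.compression.codec","gzip")
sql("CREATE TABLE parquet_table_gzip (x INT, y STRING) STORED AS PARQUET")
sql("INSERT INTO parquet_table_gzip VALUES(1, 'test')")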

1 ACCEPTED SOLUTION

Master Collaborator

@jirapong, this is a known issue that we have recently seen in CDS 2.3.

In Spark 2.3, the native loader's (SnappyNativeLoader's) parent class loader is an ExecutorClassLoader, whereas prior to Spark 2.3 it was a Launcher$ExtClassLoader. This change is incompatible with the snappy-java version (1.0.4.1) packaged with CDH.
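
If you want to confirm which snappy-java jar and class loader your shell is actually resolving, a quick check with plain Java reflection from spark2-shell looks like this (this only shows the driver side; executors can resolve the class differently):

import org.xerial.snappy.Snappy
// jar the Snappy class was loaded from, and the class loader that loaded it
println(classOf[Snappy].getProtectionDomain.getCodeSource.getLocation)
println(classOf[Snappy].getClassLoader)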

 

A fix is being worked on for a future release; in the meantime, there are two workarounds:

1) Use a later version of the snappy-java library that works with the class loader change described above, for example snappy-java-1.1.4.

Place the new snappy-java jar on a local file system (for example, /var/snappy), then launch your Spark application with the user classpath options shown below:

spark2-shell --jars /var/snappy/snappy-java-1.1.4.jar --conf spark.executor.userClassPathFirst=true --conf spark.executor.extraClassPath="./snappy-java-1.1.4.jar"

2) Instead of using Snappy, change the compression codec to something else, for example gzip (which you've already confirmed works), LZ4, or UNCOMPRESSED; see the sketch below.
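
As a rough sketch of workaround 2 (which codec names are accepted depends on your Spark/Parquet version, so treat the values below as examples):

// avoid the snappy native code path entirely by selecting a different Parquet codec
spark.sqlContext.setConf("spark.sql.parquet.compression.codec","uncompressed")  // or "gzip"
sql("INSERT INTO parquet_table_name VALUES(2, 'no snappy')")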

 
