
Spark2: save/insert data to Hive with Snappy compression


New Contributor

I'm trying to create a Hive table with Snappy compression via Spark2.

 

CDH 5.14

SLE12

 

The commands are simple:

spark.sqlContext.setConf("spark.sql.parquet.compression.codec", "snappy")
sql("CREATE TABLE parquet_table_name (x INT, y STRING) STORED AS PARQUET")
sql("INSERT INTO parquet_table_name VALUES (1, 'test')")

Then the error is:

[Stage 0:>                                                          (0 + 1) / 1]18/04/26 21:03:44 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, master3-1.hadoop2.moph.go.th, executor 1): org.apache.spark.SparkException: Task failed while writing rows.
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:285)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:197)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:196)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.UnsatisfiedLinkError: org.xerial.snappy.SnappyNative.maxCompressedLength(I)I
	at org.xerial.snappy.SnappyNative.maxCompressedLength(Native Method)
	at org.xerial.snappy.Snappy.maxCompressedLength(Snappy.java:316)
	at parquet.hadoop.codec.SnappyCompressor.compress(SnappyCompressor.java:67)
	at org.apache.hadoop.io.compress.CompressorStream.compress(CompressorStream.java:81)
	at org.apache.hadoop.io.compress.CompressorStream.finish(CompressorStream.java:92)
	at parquet.hadoop.CodecFactory$BytesCompressor.compress(CodecFactory.java:112)
	at parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.writePage(ColumnChunkPageWriteStore.java:89)
	at parquet.column.impl.ColumnWriterV1.writePage(ColumnWriterV1.java:153)
	at parquet.column.impl.ColumnWriterV1.flush(ColumnWriterV1.java:241)
	at parquet.column.impl.ColumnWriteStoreV1.flush(ColumnWriteStoreV1.java:126)
	at parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:159)
	at parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:111)
	at parquet.hadoop.ParquetRecordWriter.close(ParquetRecordWriter.java:112)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.close(ParquetOutputWriter.scala:42)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.releaseResources(FileFormatWriter.scala:405)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.execute(FileFormatWriter.scala:396)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:269)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:267)
	at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1411)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:272)
	... 8 more

18/04/26 21:03:44 ERROR scheduler.TaskSetManager: Task 0 in stage 0.0 failed 4 times; aborting job
18/04/26 21:03:44 ERROR datasources.FileFormatWriter: Aborting job null.
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, master3-1.hadoop2.moph.go.th, executor 1): org.apache.spark.SparkException: Task failed while writing rows.
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:285)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:197)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:196)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.UnsatisfiedLinkError: org.xerial.snappy.SnappyNative.maxCompressedLength(I)I
	at org.xerial.snappy.SnappyNative.maxCompressedLength(Native Method)
	at org.xerial.snappy.Snappy.maxCompressedLength(Snappy.java:316)
	at parquet.hadoop.codec.SnappyCompressor.compress(SnappyCompressor.java:67)
	at org.apache.hadoop.io.compress.CompressorStream.compress(CompressorStream.java:81)
	at org.apache.hadoop.io.compress.CompressorStream.finish(CompressorStream.java:92)
	at parquet.hadoop.CodecFactory$BytesCompressor.compress(CodecFactory.java:112)
	at parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.writePage(ColumnChunkPageWriteStore.java:89)
	at parquet.column.impl.ColumnWriterV1.writePage(ColumnWriterV1.java:153)
	at parquet.column.impl.ColumnWriterV1.flush(ColumnWriterV1.java:241)
	at parquet.column.impl.ColumnWriteStoreV1.flush(ColumnWriteStoreV1.java:126)
	at parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:159)
	at parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:111)
	at parquet.hadoop.ParquetRecordWriter.close(ParquetRecordWriter.java:112)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.close(ParquetOutputWriter.scala:42)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.releaseResources(FileFormatWriter.scala:405)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.execute(FileFormatWriter.scala:396)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:269)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:267)
	at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1411)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:272)
	... 8 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1599)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1587)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1586)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1586)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1820)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1769)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1758)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2027)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:194)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:154)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:115)
	at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:190)
	at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:190)
	at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3253)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3252)
	at org.apache.spark.sql.Dataset.<init>(Dataset.scala:190)
	at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:75)
	at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:638)
	at $line23.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:24)
	at $line23.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:29)
	at $line23.$read$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:31)
	at $line23.$read$$iw$$iw$$iw$$iw$$iw.<init>(<console>:33)
	at $line23.$read$$iw$$iw$$iw$$iw.<init>(<console>:35)
	at $line23.$read$$iw$$iw$$iw.<init>(<console>:37)
	at $line23.$read$$iw$$iw.<init>(<console>:39)
	at $line23.$read$$iw.<init>(<console>:41)
	at $line23.$read.<init>(<console>:43)
	at $line23.$read$.<init>(<console>:47)
	at $line23.$read$.<clinit>(<console>)
	at $line23.$eval$.$print$lzycompute(<console>:7)
	at $line23.$eval$.$print(<console>:6)
	at $line23.$eval.$print(<console>)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:786)
	at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:1047)
	at scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunReq$1.apply(IMain.scala:638)
	at scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunReq$1.apply(IMain.scala:637)
	at scala.reflect.internal.util.ScalaClassLoader$class.asContext(ScalaClassLoader.scala:31)
	at scala.reflect.internal.util.AbstractFileClassLoader.asContext(AbstractFileClassLoader.scala:19)
	at scala.tools.nsc.interpreter.IMain$WrappedRequest.loadAndRunReq(IMain.scala:637)
	at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:569)
	at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:565)
	at scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:807)
	at scala.tools.nsc.interpreter.ILoop.command(ILoop.scala:681)
	at scala.tools.nsc.interpreter.ILoop.processLine(ILoop.scala:395)
	at scala.tools.nsc.interpreter.ILoop.loop(ILoop.scala:415)
	at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply$mcZ$sp(ILoop.scala:923)
	at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:909)
	at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:909)
	at scala.reflect.internal.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:97)
	at scala.tools.nsc.interpreter.ILoop.process(ILoop.scala:909)
	at org.apache.spark.repl.Main$.doMain(Main.scala:76)
	at org.apache.spark.repl.Main$.main(Main.scala:56)
	at org.apache.spark.repl.Main.main(Main.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
	at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:892)
	at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:197)
	at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:227)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:136)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: org.apache.spark.SparkException: Task failed while writing rows.
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:285)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:197)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:196)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.UnsatisfiedLinkError: org.xerial.snappy.SnappyNative.maxCompressedLength(I)I
	at org.xerial.snappy.SnappyNative.maxCompressedLength(Native Method)
	at org.xerial.snappy.Snappy.maxCompressedLength(Snappy.java:316)
	at parquet.hadoop.codec.SnappyCompressor.compress(SnappyCompressor.java:67)
	at org.apache.hadoop.io.compress.CompressorStream.compress(CompressorStream.java:81)
	at org.apache.hadoop.io.compress.CompressorStream.finish(CompressorStream.java:92)
	at parquet.hadoop.CodecFactory$BytesCompressor.compress(CodecFactory.java:112)
	at parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.writePage(ColumnChunkPageWriteStore.java:89)
	at parquet.column.impl.ColumnWriterV1.writePage(ColumnWriterV1.java:153)
	at parquet.column.impl.ColumnWriterV1.flush(ColumnWriterV1.java:241)
	at parquet.column.impl.ColumnWriteStoreV1.flush(ColumnWriteStoreV1.java:126)
	at parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:159)
	at parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:111)
	at parquet.hadoop.ParquetRecordWriter.close(ParquetRecordWriter.java:112)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.close(ParquetOutputWriter.scala:42)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.releaseResources(FileFormatWriter.scala:405)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.execute(FileFormatWriter.scala:396)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:269)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:267)
	at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1411)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:272)
	... 8 more

But the same thing works if I set the compression to gzip.

1 ACCEPTED SOLUTION


Re: Spark2: save/insert data to Hive with Snappy compression

Expert Contributor

@jirapong, this is a known issue that we have recently seen in CDS 2.3.

In Spark 2.3, the native loader's (SnappyNativeLoader's) parentClassLoader is an ExecutorClassLoader, whereas prior to Spark 2.3 it was a Launcher$ExtClassLoader. This change is incompatible with the Snappy version (snappy-java 1.0.4.1) packaged with CDH.
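If you want to see this on your own cluster, a rough check from spark2-shell (just standard JVM reflection, not a Cloudera diagnostic; the exact output will vary with your deployment) is to print the executor's class loader chain:

// Prints each class loader in an executor task's context class loader chain.
// On CDS 2.3 run from the REPL, you would expect to see ExecutorClassLoader near the top.
sc.parallelize(Seq(1), 1).map { _ =>
  val names = scala.collection.mutable.ListBuffer[String]()
  var cl: ClassLoader = Thread.currentThread().getContextClassLoader
  while (cl != null) { names += cl.getClass.getName; cl = cl.getParent }
  names.mkString(" -> ")
}.collect().foreach(println)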

 

We are working on a fix for a future release, but in the meantime there are two workarounds:

1) Use a later version of the snappy-java library that works with the class loader change described above, for example snappy-java-1.1.4.

Place the new snappy-java library on the local file system (for example, /var/snappy), then run your Spark application with the user classpath options as shown below:

spark2-shell --jars /var/snappy/snappy-java-1.1.4.jar --conf spark.executor.userClassPathFirst=true --conf spark.executor.extraClassPath="./snappy-java-1.1.4.jar"
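To confirm that the executors actually picked up the newer jar before re-running the insert, one option (a quick sketch, not Cloudera-provided tooling) is to ask an executor JVM where it loaded the Snappy class from:

// Reports the jar that org.xerial.snappy.Snappy was loaded from on an executor.
// After the workaround, this should point at snappy-java-1.1.4.jar rather than the CDH-bundled 1.0.4.1.
sc.parallelize(Seq(1), 1).map { _ =>
  classOf[org.xerial.snappy.Snappy].getProtectionDomain.getCodeSource.getLocation.toString
}.collect().foreach(println)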

2) Instead of using Snappy, change the compression codec to gzip (which you've already confirmed works), LZ4, or UNCOMPRESSED. See the sketch below.
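For reference, workaround 2 is just a one-line change to your original example (the table name below is only illustrative):

// Same steps as the original post, but with a codec that does not rely on the bundled snappy-java.
spark.sqlContext.setConf("spark.sql.parquet.compression.codec", "gzip") // or "uncompressed"
sql("CREATE TABLE parquet_table_gzip (x INT, y STRING) STORED AS PARQUET")
sql("INSERT INTO parquet_table_gzip VALUES (1, 'test')")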

 
