
Is there an issue with saving ORC data with Spark Structured Streaming in Spark 2.2?

Contributor

I am trying to read some data from a source and persist it in ORC format to a file sink using Spark 2.2 Structured Streaming.

I have the SparkSession created with enableHiveSupport().

Let's say you have a DataFrame that you read from the streaming source and you want to write it as follows:

ds.writeStream().format("orc") ; /* This fails */

The same code with the following formats works:

ds.writeStream().format("json"); /* This works */

ds.writeStream().format("cvs"); /* This works */

ORC fails with the following error:

18/03/23 18:26:07 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
org.apache.spark.SparkException: Task failed while writing rows
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:272)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:191)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:190)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.FileNotFoundException: File does not exist: hdfs://xx-nn-host:8020/user/xxx.../part-00000-e26bd37a-0f0b-4d03-8d07-27b35073859c-c000.snappy.orc
    at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1309)
    at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1301)
    at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
    at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1317)
    at org.apache.spark.sql.execution.streaming.ManifestFileCommitProtocol$$anonfun$4.apply(ManifestFileCommitProtocol.scala:109)
    at org.apache.spark.sql.execution.streaming.ManifestFileCommitProtocol$$anonfun$4.apply(ManifestFileCommitProtocol.scala:109)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at org.apache.spark.sql.execution.streaming.ManifestFileCommitProtocol.commitTask(ManifestFileCommitProtocol.scala:109)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:260)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:256)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1375)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:261)
    ... 8 more

Is this a known issue, or do I need to do something differently to persist ORC? Any feedback or help is highly appreciated.

1 ACCEPTED SOLUTION

Expert Contributor

Although it seems that you are hitting an output format issue, ORC is tested properly after SPARK-22781.

As one example, `FileNotFoundException` might occur because of an empty DataFrame (SPARK-15474).

There are more ORC issues before Apache Spark 2.3. Please see SPARK-20901 for the full list.
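For illustration, the empty-DataFrame case can be reproduced in batch mode along these lines (a minimal sketch, assuming a Spark version before 2.3; the path is a placeholder):

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Minimal sketch of SPARK-15474: on Spark < 2.3, writing an empty DataFrame
// as ORC produces no data files, so reading the directory back fails.
val schema = StructType(Seq(StructField("value", StringType)))
val emptyDf = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], schema)

emptyDf.write.orc("/tmp/orc_empty_test")     // placeholder path
spark.read.orc("/tmp/orc_empty_test").show() // fails before the fix
```

In streaming, an empty micro-batch hits the same gap: the sink's commit protocol expects a file that the old ORC writer never created, which matches the `FileNotFoundException` above.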



New Contributor

@Sanjay Gurnani

I have met the same problem as you, and resolved it by setting the Spark config `spark.sql.orc.impl=native`.
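Setting it at session creation looks roughly like this (a sketch only; note that `spark.sql.orc.impl` exists from Spark 2.3 onward, and the app name is a placeholder):

```scala
import org.apache.spark.sql.SparkSession

// Sketch: enable the native ORC implementation (Spark 2.3+).
val spark = SparkSession.builder()
  .appName("orc-streaming")              // placeholder name
  .config("spark.sql.orc.impl", "native")
  .enableHiveSupport()
  .getOrCreate()
```

The same setting can also be passed on the command line with `--conf spark.sql.orc.impl=native`.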

But when I read the data by creating an external Hive table, an error like this occurred:

Bad status for request TFetchResultsReq(fetchType=0, operationHandle=TOperationHandle(hasResultSet=True, modifiedRowCount=None, operationType=0, operationId=THandleIdentifier(secret='71\xc7\xc6\x14\xc5NZ\x94W\x881\x15\x9b\xbf\xd4', guid='\xfe*}z\x7f5O\xae\x93\x1a\x80P\xd9\xba\xb8~')), orientation=4, maxRows=100): TFetchResultsResp(status=TStatus(errorCode=0, errorMessage='java.io.IOException: org.apache.hadoop.hive.ql.metadata.HiveException: java.lang.ClassCastException: org.apache.hadoop.io.BytesWritable cannot be cast to org.apache.hadoop.io.Text', sqlState=None, infoMessages=['*org.apache.hive.service.cli.HiveSQLException:java.io.IOException: org.apache.hadoop.hive.ql.metadata.HiveException: java.lang.ClassCastException: org.apache.hadoop.io.BytesWritable cannot be cast to org.apache.hadoop.io.Text:25:24', 'org.apache.hive.service.cli.operation.SQLOperation:getNextRowSet:SQLOperation.java:366', 'org.apache.hive.service.cli.operation.OperationManager:getOperationNextRowSet:OperationManager.java:275', 'org.apache.hive.service.cli.session.HiveSessionImpl:fetchResults:HiveSessionImpl.java:751', 'sun.reflect.GeneratedMethodAccessor20:invoke::-1', 'sun.reflect.DelegatingMethodAccessorImpl:invoke:DelegatingMethodAccessorImpl.java:43', 'java.lang.reflect.Method:invoke:Method.java:498', 'org.apache.hive.service.cli.session.HiveSessionProxy:invoke:HiveSessionProxy.java:78', 'org.apache.hive.service.cli.session.HiveSessionProxy:access$000:HiveSessionProxy.java:36', 'org.apache.hive.service.cli.session.HiveSessionProxy$1:run:HiveSessionProxy.java:63', 'java.security.AccessController:doPrivileged:AccessController.java:-2', 'javax.security.auth.Subject:doAs:Subject.java:422', 'org.apache.hadoop.security.UserGroupInformation:doAs:UserGroupInformation.java:1693', 'org.apache.hive.service.cli.session.HiveSessionProxy:invoke:HiveSessionProxy.java:59', 'com.sun.proxy.$Proxy19:fetchResults::-1', 'org.apache.hive.service.cli.CLIService:fetchResults:CLIService.java:438', 'org.apache.hive.service.cli.thrift.ThriftCLIService:FetchResults:ThriftCLIService.java:689', 'org.apache.hive.service.cli.thrift.TCLIService$Processor$FetchResults:getResult:TCLIService.java:1553', 'org.apache.hive.service.cli.thrift.TCLIService$Processor$FetchResults:getResult:TCLIService.java:1538', 'org.apache.thrift.ProcessFunction:process:ProcessFunction.java:39', 'org.apache.thrift.TBaseProcessor:process:TBaseProcessor.java:39', 'org.apache.hive.service.auth.TSetIpAddressProcessor:process:TSetIpAddressProcessor.java:56', 'org.apache.thrift.server.TThreadPoolServer$WorkerProcess:run:TThreadPoolServer.java:285', 'java.util.concurrent.ThreadPoolExecutor:runWorker:ThreadPoolExecutor.java:1149', 'java.util.concurrent.ThreadPoolExecutor$Worker:run:ThreadPoolExecutor.java:624', 'java.lang.Thread:run:Thread.java:748', '*java.io.IOException:org.apache.hadoop.hive.ql.metadata.HiveException: java.lang.ClassCastException: org.apache.hadoop.io.BytesWritable cannot be cast to org.apache.hadoop.io.Text:27:2', 'org.apache.hadoop.hive.ql.exec.FetchTask:fetch:FetchTask.java:152', 'org.apache.hadoop.hive.ql.Driver:getResults:Driver.java:1786', 'org.apache.hive.service.cli.operation.SQLOperation:getNextRowSet:SQLOperation.java:361', '*org.apache.hadoop.hive.ql.metadata.HiveException:java.lang.ClassCastException: org.apache.hadoop.io.BytesWritable cannot be cast to org.apache.hadoop.io.Text:36:9', 'org.apache.hadoop.hive.ql.exec.ListSinkOperator:processOp:ListSinkOperator.java:90', 'org.apache.hadoop.hive.ql.exec.Operator:forward:Operator.java:815', 
'org.apache.hadoop.hive.ql.exec.LimitOperator:processOp:LimitOperator.java:51', 'org.apache.hadoop.hive.ql.exec.Operator:forward:Operator.java:815', 'org.apache.hadoop.hive.ql.exec.SelectOperator:processOp:SelectOperator.java:84', 'org.apache.hadoop.hive.ql.exec.Operator:forward:Operator.java:815', 'org.apache.hadoop.hive.ql.exec.TableScanOperator:processOp:TableScanOperator.java:95', 'org.apache.hadoop.hive.ql.exec.FetchOperator:pushRow:FetchOperator.java:424', 'org.apache.hadoop.hive.ql.exec.FetchOperator:pushRow:FetchOperator.java:416', 'org.apache.hadoop.hive.ql.exec.FetchTask:fetch:FetchTask.java:138', '*java.lang.ClassCastException:org.apache.hadoop.io.BytesWritable cannot be cast to org.apache.hadoop.io.Text:41:5', 'org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableStringObjectInspector:getPrimitiveJavaObject:WritableStringObjectInspector.java:46', 'org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableStringObjectInspector:getPrimitiveJavaObject:WritableStringObjectInspector.java:26', 'org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils:copyToStandardObject:ObjectInspectorUtils.java:336', 'org.apache.hadoop.hive.serde2.SerDeUtils:toThriftPayload:SerDeUtils.java:167', 'org.apache.hadoop.hive.ql.exec.FetchFormatter$ThriftFormatter:convert:FetchFormatter.java:61', 'org.apache.hadoop.hive.ql.exec.ListSinkOperator:processOp:ListSinkOperator.java:87'], statusCode=3), results=None, hasMoreRows=None)

My Hive version is 1.1.0-cdh5.7.1, and my CREATE TABLE statement is like this:

create external table orc_test (
  key string,
  value string,
  topic string,
  partition int,
  offset bigint,
  timestamp timestamp,
  timestampType int
)
STORED AS ORC
LOCATION '/data/orc_test';

Could you show me some details on how to read the data with Hive SQL?

Thanks.