Archives of Support Questions (Read Only)

This board is archived and read-only for historical reference. Information and links may no longer be available or relevant. To ask a new question, please post a new topic on the appropriate active board.

Is there an issue with saving ORC data with Spark Structured Streaming in Spark 2.2?

New Member

I am trying to read some data from a source and persist it in ORC format to a file sink using Spark 2.2 Structured Streaming.

I have the SparkSession created with enableHiveSupport().

Let's say you have a DataFrame that you read from the streaming source and you want to write it out as follows:

ds.writeStream().format("orc") ; /* This fails */

The same code with the following formats works:

ds.writeStream().format("json"); /* This works */

ds.writeStream().format("cvs"); /* This works */

ORC fails with the following error:

18/03/23 18:26:07 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
org.apache.spark.SparkException: Task failed while writing rows
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:272)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:191)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:190)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.FileNotFoundException: File does not exist: hdfs://xx-nn-host:8020/user/xxx.../part-00000-e26bd37a-0f0b-4d03-8d07-27b35073859c-c000.snappy.orc
    at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1309)
    at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1301)
    at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
    at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1317)
    at org.apache.spark.sql.execution.streaming.ManifestFileCommitProtocol$$anonfun$4.apply(ManifestFileCommitProtocol.scala:109)
    at org.apache.spark.sql.execution.streaming.ManifestFileCommitProtocol$$anonfun$4.apply(ManifestFileCommitProtocol.scala:109)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at org.apache.spark.sql.execution.streaming.ManifestFileCommitProtocol.commitTask(ManifestFileCommitProtocol.scala:109)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:260)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:256)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1375)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:261)
    ... 8 more

Is this a known issue, or do I need to do something differently to persist ORC? Any feedback or help is highly appreciated.

1 ACCEPTED SOLUTION

Expert Contributor

Although it seems that you are hitting an output format issue, ORC has been tested properly since SPARK-22781.

As one example, a `FileNotFoundException` might occur because of an empty DataFrame (SPARK-15474).

There are more ORC issues before Apache Spark 2.3; please see SPARK-20901 for the full list.


10 REPLIES

New Member

@Sanjay Gurnani

I have met the same problem as you, and resolved it by setting the Spark config `spark.sql.orc.impl=native`.
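For reference, a minimal sketch of setting that config when the session is built (`spark.sql.orc.impl` is available from Spark 2.3 onward; the app name is a placeholder):

import org.apache.spark.sql.SparkSession;

// Sketch: select the native ORC implementation when building the session.
SparkSession spark = SparkSession.builder()
    .appName("orc-native-write")               // placeholder app name
    .config("spark.sql.orc.impl", "native")    // instead of the older "hive" path
    .enableHiveSupport()
    .getOrCreate();

The same setting can also be passed on the command line as `--conf spark.sql.orc.impl=native`.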

But when I read the data by creating an external Hive table, it produced an error like this:

Bad status for request TFetchResultsReq(fetchType=0, operationHandle=TOperationHandle(hasResultSet=True, modifiedRowCount=None, operationType=0, operationId=THandleIdentifier(secret='71\xc7\xc6\x14\xc5NZ\x94W\x881\x15\x9b\xbf\xd4', guid='\xfe*}z\x7f5O\xae\x93\x1a\x80P\xd9\xba\xb8~')), orientation=4, maxRows=100): TFetchResultsResp(status=TStatus(errorCode=0, errorMessage='java.io.IOException: org.apache.hadoop.hive.ql.metadata.HiveException: java.lang.ClassCastException: org.apache.hadoop.io.BytesWritable cannot be cast to org.apache.hadoop.io.Text', sqlState=None, infoMessages=['*org.apache.hive.service.cli.HiveSQLException:java.io.IOException: org.apache.hadoop.hive.ql.metadata.HiveException: java.lang.ClassCastException: org.apache.hadoop.io.BytesWritable cannot be cast to org.apache.hadoop.io.Text:25:24', 'org.apache.hive.service.cli.operation.SQLOperation:getNextRowSet:SQLOperation.java:366', 'org.apache.hive.service.cli.operation.OperationManager:getOperationNextRowSet:OperationManager.java:275', 'org.apache.hive.service.cli.session.HiveSessionImpl:fetchResults:HiveSessionImpl.java:751', 'sun.reflect.GeneratedMethodAccessor20:invoke::-1', 'sun.reflect.DelegatingMethodAccessorImpl:invoke:DelegatingMethodAccessorImpl.java:43', 'java.lang.reflect.Method:invoke:Method.java:498', 'org.apache.hive.service.cli.session.HiveSessionProxy:invoke:HiveSessionProxy.java:78', 'org.apache.hive.service.cli.session.HiveSessionProxy:access$000:HiveSessionProxy.java:36', 'org.apache.hive.service.cli.session.HiveSessionProxy$1:run:HiveSessionProxy.java:63', 'java.security.AccessController:doPrivileged:AccessController.java:-2', 'javax.security.auth.Subject:doAs:Subject.java:422', 'org.apache.hadoop.security.UserGroupInformation:doAs:UserGroupInformation.java:1693', 'org.apache.hive.service.cli.session.HiveSessionProxy:invoke:HiveSessionProxy.java:59', 'com.sun.proxy.$Proxy19:fetchResults::-1', 'org.apache.hive.service.cli.CLIService:fetchResults:CLIService.java:438', 'org.apache.hive.service.cli.thrift.ThriftCLIService:FetchResults:ThriftCLIService.java:689', 'org.apache.hive.service.cli.thrift.TCLIService$Processor$FetchResults:getResult:TCLIService.java:1553', 'org.apache.hive.service.cli.thrift.TCLIService$Processor$FetchResults:getResult:TCLIService.java:1538', 'org.apache.thrift.ProcessFunction:process:ProcessFunction.java:39', 'org.apache.thrift.TBaseProcessor:process:TBaseProcessor.java:39', 'org.apache.hive.service.auth.TSetIpAddressProcessor:process:TSetIpAddressProcessor.java:56', 'org.apache.thrift.server.TThreadPoolServer$WorkerProcess:run:TThreadPoolServer.java:285', 'java.util.concurrent.ThreadPoolExecutor:runWorker:ThreadPoolExecutor.java:1149', 'java.util.concurrent.ThreadPoolExecutor$Worker:run:ThreadPoolExecutor.java:624', 'java.lang.Thread:run:Thread.java:748', '*java.io.IOException:org.apache.hadoop.hive.ql.metadata.HiveException: java.lang.ClassCastException: org.apache.hadoop.io.BytesWritable cannot be cast to org.apache.hadoop.io.Text:27:2', 'org.apache.hadoop.hive.ql.exec.FetchTask:fetch:FetchTask.java:152', 'org.apache.hadoop.hive.ql.Driver:getResults:Driver.java:1786', 'org.apache.hive.service.cli.operation.SQLOperation:getNextRowSet:SQLOperation.java:361', '*org.apache.hadoop.hive.ql.metadata.HiveException:java.lang.ClassCastException: org.apache.hadoop.io.BytesWritable cannot be cast to org.apache.hadoop.io.Text:36:9', 'org.apache.hadoop.hive.ql.exec.ListSinkOperator:processOp:ListSinkOperator.java:90', 'org.apache.hadoop.hive.ql.exec.Operator:forward:Operator.java:815', 
'org.apache.hadoop.hive.ql.exec.LimitOperator:processOp:LimitOperator.java:51', 'org.apache.hadoop.hive.ql.exec.Operator:forward:Operator.java:815', 'org.apache.hadoop.hive.ql.exec.SelectOperator:processOp:SelectOperator.java:84', 'org.apache.hadoop.hive.ql.exec.Operator:forward:Operator.java:815', 'org.apache.hadoop.hive.ql.exec.TableScanOperator:processOp:TableScanOperator.java:95', 'org.apache.hadoop.hive.ql.exec.FetchOperator:pushRow:FetchOperator.java:424', 'org.apache.hadoop.hive.ql.exec.FetchOperator:pushRow:FetchOperator.java:416', 'org.apache.hadoop.hive.ql.exec.FetchTask:fetch:FetchTask.java:138', '*java.lang.ClassCastException:org.apache.hadoop.io.BytesWritable cannot be cast to org.apache.hadoop.io.Text:41:5', 'org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableStringObjectInspector:getPrimitiveJavaObject:WritableStringObjectInspector.java:46', 'org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableStringObjectInspector:getPrimitiveJavaObject:WritableStringObjectInspector.java:26', 'org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils:copyToStandardObject:ObjectInspectorUtils.java:336', 'org.apache.hadoop.hive.serde2.SerDeUtils:toThriftPayload:SerDeUtils.java:167', 'org.apache.hadoop.hive.ql.exec.FetchFormatter$ThriftFormatter:convert:FetchFormatter.java:61', 'org.apache.hadoop.hive.ql.exec.ListSinkOperator:processOp:ListSinkOperator.java:87'], statusCode=3), results=None, hasMoreRows=None)

My Hive version is 1.1.0-cdh5.7.1, and my CREATE TABLE statement looks like this:

create external table orc_test (
  key string,
  value string,
  topic string,
  partition int,
  offset bigint,
  timestamp timestamp,
  timestampType int)
STORED AS ORC
LOCATION '/data/orc_test';
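One plausible cause of the `ClassCastException` above, assuming the stream comes from Spark's Kafka source: that source exposes `key` and `value` as `binary` columns, while the table declares them as `string`, so Hive's string inspector receives a `BytesWritable`. A sketch of casting the columns before the streaming write (the `ds` variable is illustrative and stands in for the streaming Dataset):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import static org.apache.spark.sql.functions.col;

// Cast the Kafka source's binary key/value columns to strings so the ORC
// files match a Hive table that declares those columns as `string`.
Dataset<Row> casted = ds.select(
    col("key").cast("string").alias("key"),
    col("value").cast("string").alias("value"),
    col("topic"), col("partition"), col("offset"),
    col("timestamp"), col("timestampType"));

Alternatively, declaring `key` and `value` as `binary` in the Hive DDL should avoid the cast altogether.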

Could you show me some details on how to read the data with Hive SQL?

Thanks