Created 03-23-2018 07:11 PM
I am trying to read some data from a source and persist it in ORC format on file sink using Spark 2.2 Structured Streaming.
I have the SparkSession created with enableHiveSupport().
Let's say you have a data frame that you read from the streaming source and you want to write it as follows:
ds.writeStream().format("orc") ; /* This fails */
Same code with following options works:
ds.writeStream().format("json"); /* This works */
ds.writeStream().format("cvs"); /* This works */
ORC failed with following error:
18/03/23 18:26:07 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
org.apache.spark.SparkException: Task failed while writing rows
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:272)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:191)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:190)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.FileNotFoundException: File does not exist: hdfs://xx-nn-host:8020/user/xxx.../part-00000-e26bd37a-0f0b-4d03-8d07-27b35073859c-c000.snappy.orc
at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1309)
at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1301)
at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1317)
at org.apache.spark.sql.execution.streaming.ManifestFileCommitProtocol$$anonfun$4.apply(ManifestFileCommitProtocol.scala:109)
at org.apache.spark.sql.execution.streaming.ManifestFileCommitProtocol$$anonfun$4.apply(ManifestFileCommitProtocol.scala:109)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.spark.sql.execution.streaming.ManifestFileCommitProtocol.commitTask(ManifestFileCommitProtocol.scala:109)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:260)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:256)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1375)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:261)
... 8 more
Is this a know issue or I need to do something differently to persist ORC. Any feedback or help is highly appreciated.
Created 03-23-2018 09:21 PM
Although it seems that you are hitting output format issue, ORC is tested properly after SPARK-22781.
As one example, `FileNotFoundException` might occur because of empty dataframe. (SPARK-15474)
There are more ORC issue before Apache Spark 2.3. Please see SPARK-20901 for the full list.
Created 10-15-2018 10:16 AM
I have meet the same problem as you, and resolved by setting the spark config `spark.sql.orc.impl=native`
but when i read data by creating external hive table, it occured an erros like this
Bad status for request TFetchResultsReq(fetchType=0, operationHandle=TOperationHandle(hasResultSet=True, modifiedRowCount=None, operationType=0, operationId=THandleIdentifier(secret='71\xc7\xc6\x14\xc5NZ\x94W\x881\x15\x9b\xbf\xd4', guid='\xfe*}z\x7f5O\xae\x93\x1a\x80P\xd9\xba\xb8~')), orientation=4, maxRows=100): TFetchResultsResp(status=TStatus(errorCode=0, errorMessage='java.io.IOException: org.apache.hadoop.hive.ql.metadata.HiveException: java.lang.ClassCastException: org.apache.hadoop.io.BytesWritable cannot be cast to org.apache.hadoop.io.Text', sqlState=None, infoMessages=['*org.apache.hive.service.cli.HiveSQLException:java.io.IOException: org.apache.hadoop.hive.ql.metadata.HiveException: java.lang.ClassCastException: org.apache.hadoop.io.BytesWritable cannot be cast to org.apache.hadoop.io.Text:25:24', 'org.apache.hive.service.cli.operation.SQLOperation:getNextRowSet:SQLOperation.java:366', 'org.apache.hive.service.cli.operation.OperationManager:getOperationNextRowSet:OperationManager.java:275', 'org.apache.hive.service.cli.session.HiveSessionImpl:fetchResults:HiveSessionImpl.java:751', 'sun.reflect.GeneratedMethodAccessor20:invoke::-1', 'sun.reflect.DelegatingMethodAccessorImpl:invoke:DelegatingMethodAccessorImpl.java:43', 'java.lang.reflect.Method:invoke:Method.java:498', 'org.apache.hive.service.cli.session.HiveSessionProxy:invoke:HiveSessionProxy.java:78', 'org.apache.hive.service.cli.session.HiveSessionProxy:access$000:HiveSessionProxy.java:36', 'org.apache.hive.service.cli.session.HiveSessionProxy$1:run:HiveSessionProxy.java:63', 'java.security.AccessController:doPrivileged:AccessController.java:-2', 'javax.security.auth.Subject:doAs:Subject.java:422', 'org.apache.hadoop.security.UserGroupInformation:doAs:UserGroupInformation.java:1693', 'org.apache.hive.service.cli.session.HiveSessionProxy:invoke:HiveSessionProxy.java:59', 'com.sun.proxy.$Proxy19:fetchResults::-1', 'org.apache.hive.service.cli.CLIService:fetchResults:CLIService.java:438', 'org.apache.hive.service.cli.thrift.ThriftCLIService:FetchResults:ThriftCLIService.java:689', 'org.apache.hive.service.cli.thrift.TCLIService$Processor$FetchResults:getResult:TCLIService.java:1553', 'org.apache.hive.service.cli.thrift.TCLIService$Processor$FetchResults:getResult:TCLIService.java:1538', 'org.apache.thrift.ProcessFunction:process:ProcessFunction.java:39', 'org.apache.thrift.TBaseProcessor:process:TBaseProcessor.java:39', 'org.apache.hive.service.auth.TSetIpAddressProcessor:process:TSetIpAddressProcessor.java:56', 'org.apache.thrift.server.TThreadPoolServer$WorkerProcess:run:TThreadPoolServer.java:285', 'java.util.concurrent.ThreadPoolExecutor:runWorker:ThreadPoolExecutor.java:1149', 'java.util.concurrent.ThreadPoolExecutor$Worker:run:ThreadPoolExecutor.java:624', 'java.lang.Thread:run:Thread.java:748', '*java.io.IOException:org.apache.hadoop.hive.ql.metadata.HiveException: java.lang.ClassCastException: org.apache.hadoop.io.BytesWritable cannot be cast to org.apache.hadoop.io.Text:27:2', 'org.apache.hadoop.hive.ql.exec.FetchTask:fetch:FetchTask.java:152', 'org.apache.hadoop.hive.ql.Driver:getResults:Driver.java:1786', 'org.apache.hive.service.cli.operation.SQLOperation:getNextRowSet:SQLOperation.java:361', '*org.apache.hadoop.hive.ql.metadata.HiveException:java.lang.ClassCastException: org.apache.hadoop.io.BytesWritable cannot be cast to org.apache.hadoop.io.Text:36:9', 'org.apache.hadoop.hive.ql.exec.ListSinkOperator:processOp:ListSinkOperator.java:90', 'org.apache.hadoop.hive.ql.exec.Operator:forward:Operator.java:815', 'org.apache.hadoop.hive.ql.exec.LimitOperator:processOp:LimitOperator.java:51', 'org.apache.hadoop.hive.ql.exec.Operator:forward:Operator.java:815', 'org.apache.hadoop.hive.ql.exec.SelectOperator:processOp:SelectOperator.java:84', 'org.apache.hadoop.hive.ql.exec.Operator:forward:Operator.java:815', 'org.apache.hadoop.hive.ql.exec.TableScanOperator:processOp:TableScanOperator.java:95', 'org.apache.hadoop.hive.ql.exec.FetchOperator:pushRow:FetchOperator.java:424', 'org.apache.hadoop.hive.ql.exec.FetchOperator:pushRow:FetchOperator.java:416', 'org.apache.hadoop.hive.ql.exec.FetchTask:fetch:FetchTask.java:138', '*java.lang.ClassCastException:org.apache.hadoop.io.BytesWritable cannot be cast to org.apache.hadoop.io.Text:41:5', 'org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableStringObjectInspector:getPrimitiveJavaObject:WritableStringObjectInspector.java:46', 'org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableStringObjectInspector:getPrimitiveJavaObject:WritableStringObjectInspector.java:26', 'org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils:copyToStandardObject:ObjectInspectorUtils.java:336', 'org.apache.hadoop.hive.serde2.SerDeUtils:toThriftPayload:SerDeUtils.java:167', 'org.apache.hadoop.hive.ql.exec.FetchFormatter$ThriftFormatter:convert:FetchFormatter.java:61', 'org.apache.hadoop.hive.ql.exec.ListSinkOperator:processOp:ListSinkOperator.java:87'], statusCode=3), results=None, hasMoreRows=None)
my hive version is 1.1.0-cdh5.7.1
and my create table sql is like this
create external table orc_test (
key string,
value string,
topic string,
partition int,
offset bigint,
timestamp timestamp,
timestampType int )
STORED AS ORC
LOCATION '/data/orc_test';
could you show me some detail on reading data by hive sql
thanks