Is there an issue with saving ORC data with Spark Structured Streaming in Spark 2.2?

Contributor

I am trying to read some data from a source and persist it in ORC format on file sink using Spark 2.2 Structured Streaming.

I have the SparkSession created with enableHiveSupport().

Let's say you have a data frame that you read from the streaming source and you want to write it as follows:

ds.writeStream().format("orc"); /* This fails */

The same code with the following formats works:

ds.writeStream().format("json"); /* This works */

ds.writeStream().format("csv"); /* This works */

The ORC write fails with the following error:

18/03/23 18:26:07 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)

org.apache.spark.SparkException: Task failed while writing rows

at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:272)

at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:191)

at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:190)

at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)

at org.apache.spark.scheduler.Task.run(Task.scala:108)

at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)

at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)

at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)

at java.lang.Thread.run(Thread.java:745)

Caused by: java.io.FileNotFoundException: File does not exist: hdfs://xx-nn-host:8020/user/xxx.../part-00000-e26bd37a-0f0b-4d03-8d07-27b35073859c-c000.snappy.orc

at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1309)

at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1301)

at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)

at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1317)

at org.apache.spark.sql.execution.streaming.ManifestFileCommitProtocol$$anonfun$4.apply(ManifestFileCommitProtocol.scala:109)

at org.apache.spark.sql.execution.streaming.ManifestFileCommitProtocol$$anonfun$4.apply(ManifestFileCommitProtocol.scala:109)

at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)

at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)

at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)

at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)

at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)

at scala.collection.AbstractTraversable.map(Traversable.scala:104)

at org.apache.spark.sql.execution.streaming.ManifestFileCommitProtocol.commitTask(ManifestFileCommitProtocol.scala:109)

at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:260)

at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:256)

at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1375)

at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:261)

... 8 more

Is this a known issue, or do I need to do something differently to persist ORC? Any feedback or help is highly appreciated.

1 ACCEPTED SOLUTION

Expert Contributor

Although it seems that you are hitting an output format issue, ORC has only been tested properly since SPARK-22781.

As one example, `FileNotFoundException` might occur because of an empty DataFrame (SPARK-15474).

There are more ORC issues before Apache Spark 2.3. Please see SPARK-20901 for the full list.


10 REPLIES

@Sanjay Gurnani

In my experience this works fine on Spark 2.2 using the DataFrame API. In Scala I would do it like this:

scala> df.write.format("orc").save("top10Salaries-Repartition1")

Then I can see it created in my HDFS user home directory:

[root@falbani-hdpmgmt bin]# hdfs dfs -ls top10Salaries-Repartition1/

Found 2 items

-rw-r--r-- 3 falbani falbani top10Salaries-Repartition1/_SUCCESS

-rw-r--r-- 3 falbani falbani top10Salaries-Repartition1/part-00000-445ad80a-8d18-4ba5-859e-cebdb8443e62.snappy.orc

Maybe you can use the DataFrame API.

HTH

Contributor

@Felix Albani: Thanks for looking and responding. What you are using to write is a DataFrameWriter, and that works. But the original question is about persisting with the DataStreamWriter.

You are using the write() method, whereas with Structured Streaming you need to use writeStream() to persist, and that is where I am having issues saving in ORC format. Any feedback or known issues surrounding this particular use case?

If we look at the Spark docs here: https://spark.apache.org/docs/2.2.0/structured-streaming-programming-guide.html

and scroll down to Output Sinks, we can see that it says you can write orc, json, csv, etc.

  • File sink - Stores the output to a directory.
writeStream
    .format("parquet")        // can be "orc", "json", "csv", etc.
    .option("path", "path/to/destination/dir")
    .start()

I have been able to write data as CSV, JSON and Parquet, but not as "orc".

Expert Contributor

Hi, @Sanjay Gurnani

Officially, the Apache Spark 2.2.1 Structured Streaming documentation doesn't properly mention ORC. The Apache Spark 2.3 documentation is the first to include ORC.

- http://spark.apache.org/docs/2.2.1/structured-streaming-programming-guide.html

Contributor

@Dongjoon Hyun: Thanks for sharing the JIRA tickets for the issues. I am running into the same exception with Spark 2.3 in certain situations.

My use case is reading from Kafka in a streaming manner and persisting to HDFS. Observation: as long as data keeps arriving on the Kafka topic, it works and writes ORC. As soon as the data stops arriving, it runs into the same exception as above. This behavior differs from the other formats, such as JSON or CSV: with those, the Spark streaming process just waits for data to arrive and does not crash, but with ORC it crashes as soon as the data stops coming.

2018-03-24 00:08:58 INFO KafkaSourceRDD:54 - Beginning offset 58 is the same as ending offset skipping sgtest2 0

2018-03-24 00:08:58 ERROR Utils:91 - Aborting task

java.io.FileNotFoundException: File does not exist: hdfs://nn:8020/user/sanjay_gurnani/test/part-00000-75701137-f5e3-4c88-861e-7de2ed561ef0-c000.snappy.orc

at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1309)

at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1301)

...

Expert Contributor

Oh, is that so? I'll try to reproduce your situation. Could you share more information about your software stack? Apache Spark 2.3 on Hadoop 2.7 and Kafka?

Could you confirm that you are using the new OrcFileFormat by setting `spark.sql.orc.impl=native`? The above bugs are fixed in the new OrcFileFormat only.

Contributor

@Dongjoon Hyun: Thanks for sharing the additional information. No, I was not setting `spark.sql.orc.impl=native`. After setting that configuration value it works: I was able to write ORC, and it also does not crash if the data stops coming on the Kafka topic.

And yes, my software stack for this test was Spark 2.3, Hadoop 2.7 and Kafka 0.10.

So to summarize based on your feedback:

* We cannot persist data in ORC format using Spark Structured Streaming (via DataStreamWriter) in Spark 2.2.

* ORC support for Spark Structured Streaming really starts with Spark 2.3. To persist data in ORC format using Spark Structured Streaming, we need to use Spark 2.3 and also set the Spark configuration `spark.sql.orc.impl=native`.

Based on my understanding, setting `spark.sql.orc.impl=native` makes Spark use the Apache ORC implementation instead of the old Hive ORC implementation that is used otherwise.
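For reference, here is a minimal Java sketch of what this summary amounts to, assuming Spark 2.3 with the spark-sql-kafka-0-10 package on the classpath. The class name, broker address, and output and checkpoint paths are placeholders that do not come from this thread; the topic name is the one that appears in the log above. Treat it as an illustration of where the setting goes, not as the exact code used in the test.

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;

public class OrcStreamingSinkSketch {
    public static void main(String[] args) throws Exception {
        // Select the native (Apache ORC) writer instead of the old Hive-based one.
        SparkSession spark = SparkSession.builder()
                .appName("orc-streaming-sink-sketch")      // placeholder name
                .config("spark.sql.orc.impl", "native")
                .getOrCreate();

        // Read from Kafka as a stream; the broker address is a placeholder.
        Dataset<Row> ds = spark.readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", "broker-host:9092")
                .option("subscribe", "sgtest2")
                .load()
                .selectExpr("CAST(value AS STRING) AS value");

        // File sink in ORC format; the file sink needs a checkpoint location.
        StreamingQuery query = ds.writeStream()
                .format("orc")
                .option("path", "/tmp/orc-sketch/output")               // placeholder path
                .option("checkpointLocation", "/tmp/orc-sketch/checkpoint") // placeholder path
                .start();

        query.awaitTermination();
    }
}
```

The same setting can also be supplied at submit time with `--conf spark.sql.orc.impl=native` instead of in code.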

In my simple test I was able to write some simple test data using the above config and read it back with Hive 1.2.1 (which I believe does not use the Apache ORC implementation). However, it would be nice to know whether there are any known incompatibilities between Apache ORC and the Hive 1.2.1 ORC, for example, whether data written using Apache ORC can always be read back using the Hive ORC in Hive 1.2.1.

Again, thanks for looking into this and providing the relevant information. Much appreciated.

Expert Contributor

Great! Thank you for sharing your experience too. Your summary and understanding are correct.

As for Hive, the ORC writer and reader in Hive 1.2.1 are old, so they do have some bugs. In general, though, Hive 1.2.1 will read the new data correctly. For the best performance and safety, the latest Hive is recommended; Hive 2.3.0 is the first release to use Apache ORC.

As for the Apache ORC library, Apache Spark 2.3 was released with Apache ORC 1.4.1 for various reasons. Please use the latest version, Apache ORC 1.4.3, if possible. There is a known issue, SPARK-23340.

@Sanjay Gurnani

Looking at your issue in detail, it seems you are facing a problem committing the data when there is no data. Otherwise, when you have data, it works fine.

I am not sure whether that is a bug, since you are able to persist the data with other formats, but I will check the source code to see why this peculiar behavior happens only with ORC.

In the meantime, to get your code working, I would recommend checking whether your dataset has any records before proceeding with the commit to the sink. That way you can avoid the empty-file issue, and any similar issue you might be getting with Parquet and the other formats where the writeStream method is working.
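The failure mode discussed in this thread can be illustrated without Spark. The sketch below is a plain-Java simulation, not Spark's code. It assumes, as the replies suggest, that the old ORC writer creates no output file when a batch has no rows, while the commit step still looks up the file it was told about (the `getFileStatus` call under `commitTask` in the stack trace). All class and method names here are hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class EmptyBatchCommitSketch {

    // Hypothetical writer: the output file is created only when the first row arrives.
    static void writeRows(Path file, List<String> rows) throws IOException {
        for (String row : rows) {
            Files.write(file, (row + "\n").getBytes(StandardCharsets.UTF_8),
                    StandardOpenOption.CREATE, StandardOpenOption.APPEND);
        }
    }

    // Hypothetical commit step: looks up the file the task registered.
    static long commitTask(Path file) throws IOException {
        return Files.size(file); // throws NoSuchFileException if the file was never created
    }

    // Simulates one task; returns true if the commit step found the output file.
    static boolean commitSucceeds(List<String> rows) {
        try {
            Path dir = Files.createTempDirectory("commit-sketch");
            Path out = dir.resolve("part-00000");
            writeRows(out, rows);
            try {
                commitTask(out);
                return true;
            } catch (NoSuchFileException e) {
                return false; // the analogue of "File does not exist" in the log
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("rows present: " + commitSucceeds(Arrays.asList("a", "b")));
        System.out.println("empty batch:  " + commitSucceeds(Collections.<String>emptyList()));
    }
}
```

Under these assumptions the non-empty batch commits and the empty one does not, which matches the observation that the job only crashes once data stops arriving on the topic.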