Member since: 07-12-2016
Posts: 11
Kudos Received: 3
Solutions: 0
03-26-2018
12:44 AM
@Dongjoon Hyun : Thanks for sharing the additional information. No, I was not setting `spark.sql.orc.impl=native`. After setting that configuration value it works: I was able to write ORC, and the query also no longer crashes when data stops arriving on the Kafka topic. And yes, my software stack for this test was Spark 2.3, Hadoop 2.7, and Kafka 0.10.

So to summarize based on your feedback:

* We cannot persist data in ORC format using Spark Structured Streaming (via DataStreamWriter) in Spark 2.2.
* ORC support for Spark Structured Streaming really starts with Spark 2.3. To persist data in ORC format using Spark Structured Streaming, we need Spark 2.3 and also need to set the Spark configuration `spark.sql.orc.impl=native`.

Based on my understanding, setting `spark.sql.orc.impl=native` makes Spark use the Apache ORC implementation instead of the old Hive ORC implementation used otherwise. In my simple test I was able to write some test data with the above config and read it back with Hive 1.2.1 (which I believe does not use the Apache ORC implementation). However, it would be nice to know whether there are any known incompatibility issues between Apache ORC and the Hive 1.2.1 ORC, i.e. whether data written using Apache ORC can always be read back using the Hive ORC reader in Hive 1.2.1. Again, thanks for looking into this and providing the relevant information. Much appreciated.
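For anyone else hitting this, here is a minimal sketch of the setup that worked for me on Spark 2.3. The built-in rate source stands in for my real Kafka source, and the output and checkpoint paths are placeholders:

import org.apache.spark.sql.SparkSession

// Spark 2.3+: switch to the native Apache ORC implementation
val spark = SparkSession.builder()
  .appName("orc-streaming-sink")
  .config("spark.sql.orc.impl", "native")
  .getOrCreate()

// The "rate" source stands in here for the real Kafka source
val df = spark.readStream.format("rate").option("rowsPerSecond", "5").load()

val query = df.writeStream
  .format("orc")
  .option("path", "/tmp/orc-out")               // placeholder output directory
  .option("checkpointLocation", "/tmp/orc-ckpt") // required for file sinks
  .start()

query.awaitTermination()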
03-23-2018
11:23 PM
@Dongjoon Hyun : Thanks for sharing the JIRA tickets for the issues. I am running into the same exception with Spark 2.3 in certain situations. My use case is reading from Kafka in a streaming manner and persisting to HDFS.

Observation: As long as data is arriving on the Kafka topic, the query works and writes ORC. As soon as the data stops coming, it runs into the same exception as above. This behavior is different from other persistence formats such as JSON or CSV: with those, the streaming query simply waits for data to arrive and does not crash, but with ORC it crashes once the data stops coming.

2018-03-24 00:08:58 INFO KafkaSourceRDD:54 - Beginning offset 58 is the same as ending offset skipping sgtest2 0
2018-03-24 00:08:58 ERROR Utils:91 - Aborting task
java.io.FileNotFoundException: File does not exist: hdfs://nn:8020/user/sanjay_gurnani/test/part-00000-75701137-f5e3-4c88-861e-7de2ed561ef0-c000.snappy.orc
    at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1309)
    at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1301)
    ...
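For context, a minimal sketch of the Kafka-to-HDFS pipeline described above. The broker address and paths are placeholders, the topic name matches the log line, and the spark-sql-kafka-0-10 connector is assumed to be on the classpath:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("kafka-to-orc").getOrCreate()

// Read the Kafka topic as a streaming DataFrame; broker address is a placeholder
val kafkaDf = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092")
  .option("subscribe", "sgtest2")
  .load()

// Keep key/value as strings and persist to HDFS in ORC format
val query = kafkaDf
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("orc")
  .option("path", "hdfs://nn:8020/user/example/test")       // placeholder output dir
  .option("checkpointLocation", "hdfs://nn:8020/user/example/ckpt")
  .start()

query.awaitTermination()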
03-23-2018
09:13 PM
@Felix Albani : Thanks for looking and responding. What you are using to write is a DataFrameWriter, and that works. But the original question is about persisting with the DataStreamWriter. You are using the write() method, whereas with Structured Streaming you need to use writeStream() to persist, and that is where I am having issues saving in ORC format. Any feedback or known issues surrounding this particular use case? If we look at the Spark doc here: https://spark.apache.org/docs/2.2.0/structured-streaming-programming-guide.html and scroll down to Output Sinks, it says you can write orc, json, csv, etc.:

File sink - Stores the output to a directory.

writeStream
    .format("parquet") // can be "orc", "json", "csv", etc.
    .option("path", "path/to/destination/dir")
    .start()

I have been able to write data as csv, json, and parquet, but not as "orc".
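To make the API distinction concrete, here is a minimal sketch contrasting the two paths (paths are hypothetical, and the built-in rate source stands in for the real input): the batch write() path works with ORC, while the streaming writeStream() path is the one that fails for me on Spark 2.2.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("write-vs-writeStream").getOrCreate()

// Batch path: DataFrameWriter (df.write) -- this is what the reply above uses; ORC works
val batchDf = spark.range(0, 100).toDF("id")
batchDf.write.format("orc").save("/tmp/batch-orc")   // placeholder path

// Streaming path: DataStreamWriter (df.writeStream) -- this is the case from my question
val streamDf = spark.readStream.format("rate").load()
val query = streamDf.writeStream
  .format("orc")                                      // fails for me on Spark 2.2; json/csv work
  .option("path", "/tmp/stream-orc")
  .option("checkpointLocation", "/tmp/stream-ckpt")
  .start()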
03-23-2018
07:11 PM
I am trying to read some data from a source and persist it in ORC format to a file sink using Spark 2.2 Structured Streaming. The SparkSession is created with enableHiveSupport(). Let's say you have a DataFrame `ds` read from the streaming source and you want to write it as follows:

ds.writeStream().format("orc");  /* This fails */

The same code with the following options works:

ds.writeStream().format("json"); /* This works */
ds.writeStream().format("csv");  /* This works */

ORC fails with the following error:

18/03/23 18:26:07 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
org.apache.spark.SparkException: Task failed while writing rows
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:272)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:191)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:190)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.FileNotFoundException: File does not exist: hdfs://xx-nn-host:8020/user/xxx.../part-00000-e26bd37a-0f0b-4d03-8d07-27b35073859c-c000.snappy.orc
    at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1309)
    at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1301)
    at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
    at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1317)
    at org.apache.spark.sql.execution.streaming.ManifestFileCommitProtocol$$anonfun$4.apply(ManifestFileCommitProtocol.scala:109)
    at org.apache.spark.sql.execution.streaming.ManifestFileCommitProtocol$$anonfun$4.apply(ManifestFileCommitProtocol.scala:109)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at org.apache.spark.sql.execution.streaming.ManifestFileCommitProtocol.commitTask(ManifestFileCommitProtocol.scala:109)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:260)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:256)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1375)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:261)
    ... 8 more

Is this a known issue, or do I need to do something differently to persist ORC? Any feedback or help is highly appreciated.
Labels: Apache Spark
07-12-2016
04:29 PM
Thank you @lmccay for the response and detailed explanation. Highly appreciated.
07-12-2016
08:37 AM
1 Kudo
You can create a password alias using the following example hadoop command:

hadoop credential create pwalias -provider jceks://hdfs/tmp/test.jceks

It prompts you to enter a password and stores it in the provider location.

Question: What is the default encryption or hashing algorithm it uses to store the password? I looked but could not find any documentation for this, or for the default credential provider implementation used by Hadoop. If you know, can you please share any information or point me in the right direction? This is needed for a security audit.
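As a side note on how such an alias is typically consumed, here is a minimal sketch (in Scala, assuming the Hadoop client libraries are on the classpath) of reading the alias back through the Credential Provider API; the alias name and provider path match the example command above:

import org.apache.hadoop.conf.Configuration

// Point the configuration at the same JCEKS provider used in the command above
val conf = new Configuration()
conf.set("hadoop.security.credential.provider.path", "jceks://hdfs/tmp/test.jceks")

// Resolve the "pwalias" alias; getPassword returns null if the alias is not found
val password: Array[Char] = conf.getPassword("pwalias")
if (password != null) {
  // use the password, then clear it from memory
  java.util.Arrays.fill(password, '\u0000')
}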
Labels: