Created on 11-10-2017 11:59 AM - edited 09-16-2022 05:30 AM
I'm trying to write a dataframe to a parquet hive table and keep getting an error saying that the table is HiveFileFormat and not ParquetFileFormat. The table is definitely a parquet table.
Here's how I'm creating the sparkSession:
val spark = SparkSession
  .builder()
  .config("spark.sql.warehouse.dir", "/user/hive/warehouse")
  .config("spark.sql.sources.maxConcurrentWrites", "1")
  .config("spark.sql.parquet.compression.codec", "snappy")
  .config("hive.exec.dynamic.partition", "true")
  .config("hive.exec.dynamic.partition.mode", "nonstrict")
  .config("parquet.compression", "SNAPPY")
  .config("hive.exec.max.dynamic.partitions", "3000")
  .config("parquet.enable.dictionary", "false")
  .config("hive.support.concurrency", "true")
  .enableHiveSupport()
  .getOrCreate()
Here's the code that fails and the error message:
dataFrame.write.format("parquet").mode(saveMode).partitionBy(partitionCol).saveAsTable(tableName)
org.apache.spark.sql.AnalysisException: The format of the existing table tableName is `HiveFileFormat`. It doesn't match the specified format `ParquetFileFormat`.;
Here's the table storage info:
# Storage Information
SerDe Library:        org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe
InputFormat:          org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat
OutputFormat:         org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat
Compressed:           No
Num Buckets:          -1
Bucket Columns:       []
Sort Columns:         []
Storage Desc Params:
    field.delim           \t
    line.delim            \n
    serialization.format  \t
This same code worked before upgrading to Spark 2; for some reason it isn't recognizing the table as parquet now.
Created 11-13-2017 12:36 PM
After doing more testing I'm finding that this happens on tables that aren't even parquet tables, and on write commands that don't even specify parquet as the format. The error does not happen with the insertInto() command, though. That isn't a good workaround for me, because saveAsTable() matches columns by name whereas insertInto() matches them by position, which is not acceptable for my use case (though see the sketch below for one way to make the ordering safer).
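For anyone who does end up stuck with insertInto(), here's a minimal sketch of how the positional matching can be made less fragile by reordering the DataFrame to the table's column order first. The insertByName helper is just something I'm illustrating, not a Spark API, and it assumes the table already exists and the DataFrame contains all of its columns:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

// Hypothetical helper, not part of Spark's API: reorder the DataFrame's
// columns to the target table's column order before the positional insert.
def insertByName(df: DataFrame, tableName: String): Unit = {
  // Column order as recorded in the metastore
  val tableCols = df.sparkSession.table(tableName).columns
  // Select the DataFrame's columns in that order, then insert positionally
  df.select(tableCols.map(col): _*).write.insertInto(tableName)
}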
Info for the table I'm trying to write to:
$ hive -e "describe formatted test_parquet_spark" # col_name data_type comment col1 string col2 string # Detailed Table Information Database: default CreateTime: Fri Nov 10 22:54:20 GMT 2017 LastAccessTime: UNKNOWN Protect Mode: None Retention: 0 Table Type: MANAGED_TABLE # Storage Information SerDe Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe InputFormat: org.apache.hadoop.mapred.TextInputFormat OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat Compressed: No Num Buckets: -1 Bucket Columns: [] Sort Columns: [] Storage Desc Params: serialization.format 1
Testing in spark2-shell:
scala> val spark = SparkSession.builder.config("spark.sql.warehouse.dir", "/user/hive/warehouse").enableHiveSupport.getOrCreate
17/11/13 20:17:05 WARN sql.SparkSession$Builder: Using an existing SparkSession; some configuration may not take effect.
spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@2524f9bf

scala> val schema = new StructType().add(StructField("col1", StringType)).add(StructField("col2", StringType))
schema: org.apache.spark.sql.types.StructType = StructType(StructField(col1,StringType,true), StructField(col2,StringType,true))

scala> val dataRDD = spark.sparkContext.parallelize(Seq(Row("foobar", "barfoo")))
dataRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = ParallelCollectionRDD[0] at parallelize at <console>:27

scala> val dataDF = spark.createDataFrame(dataRDD, schema)
dataDF: org.apache.spark.sql.DataFrame = [col1: string, col2: string]

scala> dataDF.write.insertInto("test_parquet_spark")

$ hive -e "select * from test_parquet_spark"
OK
foobar barfoo

scala> dataDF.write.mode("append").saveAsTable("test_parquet_spark")
org.apache.spark.sql.AnalysisException: The format of the existing table default.test_parquet_spark is `HiveFileFormat`. It doesn't match the specified format `ParquetFileFormat`.;
at org.apache.spark.sql.execution.datasources.PreprocessTableCreation$$anonfun$apply$2.applyOrElse(rules.scala:115)
at org.apache.spark.sql.execution.datasources.PreprocessTableCreation$$anonfun$apply$2.applyOrElse(rules.scala:75)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:266)
at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:256)
at org.apache.spark.sql.execution.datasources.PreprocessTableCreation.apply(rules.scala:75)
at org.apache.spark.sql.execution.datasources.PreprocessTableCreation.apply(rules.scala:71)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:85)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:82)
at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:57)
at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:66)
at scala.collection.mutable.ArrayBuffer.foldLeft(ArrayBuffer.scala:48)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:82)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:74)
at scala.collection.immutable.List.foreach(List.scala:381)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:74)
at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:69)
at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:67)
at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:50)
at org.apache.spark.sql.execution.QueryExecution.withCachedData$lzycompute(QueryExecution.scala:73)
at org.apache.spark.sql.execution.QueryExecution.withCachedData(QueryExecution.scala:72)
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:78)
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:78)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:84)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:80)
at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:89)
at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:89)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:92)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:92)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:610)
at org.apache.spark.sql.DataFrameWriter.createTable(DataFrameWriter.scala:420)
at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:399)
at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:354)
... 48 elided
Why does saveAsTable() not work while insertInto() does in this example?
And why does saveAsTable() not recognize parquet tables as being parquet tables?
If anyone knows what is going on I would really appreciate it. Thanks
Created 11-13-2017 12:47 PM
Also, if it helps, this is on Spark 2.2.0 and CDH 5.12.0.
Created 11-14-2017 06:45 AM
This is because saveAsTable() doesn't currently work with Hive tables. It's documented here:
More context: this was recently reported with Spark 2.2 and we are working internally to test a fix; however, I don't have a timeline for when, or in which future release, it will be fixed. Until then, please see if the workaround in the doc above helps.
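To make the idea concrete, here is a minimal sketch of the two directions a workaround can take. This is my illustration using the names from the original post, not the exact content of the doc:

// Illustrative sketch only -- not the exact text of the linked doc.
// Option 1: keep the existing Hive-defined table and write into it positionally:
dataFrame.write.mode("append").insertInto(tableName)

// Option 2: drop or rename the Hive-created table and let Spark create and own it
// as a datasource parquet table; saveAsTable() then does not hit the format check
// on later runs:
dataFrame.write.format("parquet").mode(saveMode).partitionBy(partitionCol).saveAsTable(tableName)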
Created 11-14-2017 09:09 AM
Thank you for your response, AutoIN. It looks like this workaround will work for what I need.
Created 02-12-2018 07:05 AM
I'm running spark2-submit successfully from the command line in both local and YARN cluster mode on CDH 5.12.
I experience the same problem with saveAsTable when I run it from a Hue Oozie workflow, even though I have loaded all the Spark2 libraries into share/lib and pointed my workflow at that new dir. Since I have hundreds of tables, some of which change structure over time, I am unable to declare the Hive tables by hand.
From the command line, Spark auto-generates the Hive table as parquet if it does not exist. Append mode also works well, though I have not tried the insert feature.
It is very tricky to run Spark2 cluster-mode jobs. I made sure I entered the spark-submit parameters before my job arguments. See how I run the job below:
$ spark-submit --version
version 2.2.0.cloudera2
Using Scala version 2.11.8
...
$ pwd
/home/hg/app/spark/demo

$ ls -1
ojdbc8.jar
oracle_schema.properties
demo_2.11-0.1.0-SNAPSHOT.jar

$ spark-submit --verbose --class com.hernandezgraham.demo --master yarn --deploy-mode cluster --jars ojdbc8.jar --driver-class-path ojdbc8.jar --files oracle_schema.properties demo_2.11-0.1.0-SNAPSHOT.jar "argument1" "argument2"
$ hdfs dfs -ls /user/hive/warehouse/demo
drwxr-xr-x - hdfs hive 0 2018-02-12 09:39 /user/hive/warehouse/demo
$ hive
hive> show databases;
OK
default
hive> show tables;
OK
demo
hive> select * from demo limit 1;
OK
1 hguser
Time taken: 0.557 seconds, Fetched: 1 row(s)
Even though Spark 2 executes my code successfully in Oozie workflows, it still does not write the file or the Hive table. Perhaps that is something that was only fixed for the command line in 5.12. The documentation is not quite clear for Hue.
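For what it's worth, the only Oozie-side settings I'm aware of for making a workflow pick up the Spark 2 sharelib look roughly like this in job.properties. I'm taking this from my reading of the CDH docs, so treat it as a starting point rather than a guaranteed fix:

# job.properties (sketch; values are examples)
oozie.use.system.libpath=true
# point the Spark action at the spark2 sharelib instead of the default spark one
oozie.action.sharelib.for.spark=spark2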
Created 12-10-2019 01:30 PM
If Cloudera could update things maybe it could work better!
Created 11-13-2017 01:55 PM
I haven't had much luck when chaining the format and mode options.
I've been doing it like this instead. I'm using Python, though, not Scala.
dataFrame.write.saveAsTable("tableName", format="parquet", mode="overwrite")
The issue I'm having isn't that it won't create the table or write the data using saveAsTable; it's that Spark doesn't see any data in the table if I go back and try to read it later. I can query it from Hive without an issue. If I use Hive to remove the spark.sql.sources.provider option from tblproperties, then I can read the table with Spark properly.
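In case it helps anyone, this is roughly what that cleanup looks like from the Hive CLI. I'm using the table name from earlier in the thread as an example, and I'm not certain it's the right fix for every case:

hive> show tblproperties test_parquet_spark;
hive> alter table test_parquet_spark unset tblproperties ('spark.sql.sources.provider');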
Created 11-13-2017 02:06 PM
Thank you for the suggestion EB9185. That unfortunately doesn't work in Scala, though. The saveAsTable() method in Scala only takes the table name as input, and you have to chain the mode() and format() methods separately.
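For reference, the chained Scala equivalent of the Python call above is the same pattern as in my original snippet, which is exactly where I hit the format mismatch error on an existing Hive table:

// Scala: mode() and format() are chained on the DataFrameWriter
dataFrame.write.format("parquet").mode("overwrite").saveAsTable("tableName")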