Support Questions


Spark 2 Can't write dataframe to parquet table


I'm trying to write a DataFrame to a Parquet Hive table and keep getting an error saying that the table is `HiveFileFormat` and not `ParquetFileFormat`. The table is definitely a Parquet table.


Here's how I'm creating the SparkSession:

val spark = SparkSession.builder
          .config("spark.sql.warehouse.dir", "/user/hive/warehouse")
          .config("spark.sql.parquet.compression.codec", "snappy")
          .config("hive.exec.dynamic.partition", "true")
          .config("hive.exec.dynamic.partition.mode", "nonstrict")
          .config("parquet.compression", "SNAPPY")
          .config("hive.exec.max.dynamic.partitions", "3000")
          .config("parquet.enable.dictionary", "false")
          .config("", "true")
          .enableHiveSupport()
          .getOrCreate()

Here's the error message from the failing write:


org.apache.spark.sql.AnalysisException: The format of the existing table tableName is `HiveFileFormat`. It doesn't match the specified format `ParquetFileFormat`.;

Here's the table storage info:

# Storage Information
SerDe Library:
Compressed:             No
Num Buckets:            -1
Bucket Columns:         []
Sort Columns:           []
Storage Desc Params:
        field.delim             \t
        line.delim              \n
        serialization.format    \t

This same code worked before we upgraded to Spark 2; for some reason it no longer recognizes the table as Parquet.
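One option worth noting (my own sketch, not code from this thread): since Spark 2.2, saveAsTable() accepts "hive" as the write format, which routes the write through Hive's own serde instead of Spark's built-in ParquetFileFormat, so the format check against the existing table's HiveFileFormat should pass. Here `dataDF` and `tableName` stand in for the poster's DataFrame and table:

```scala
// Sketch: ask saveAsTable to use the Hive serde path rather than Spark's
// native ParquetFileFormat. `dataDF` and `tableName` are placeholders.
dataDF.write
  .format("hive")      // supported for saveAsTable since Spark 2.2
  .mode("append")
  .saveAsTable("tableName")
```

This keeps the by-name column resolution of saveAsTable() while writing in whatever format the existing Hive table declares.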


Master Collaborator

This problem has been solved!




After doing more testing, I'm finding that this happens on tables that aren't even Parquet tables, and the write command isn't even specifying Parquet as the write format. The error does not happen with the insertInto() command, though. That is not a good workaround, because saveAsTable() matches columns by name whereas insertInto() matches them by position, and positional matching is not acceptable for my use case.
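To make the by-name vs. by-position difference concrete, here is a small sketch of my own (column names borrowed from the test table below; the swapped names are deliberate):

```scala
// saveAsTable resolves columns by NAME; insertInto resolves them by POSITION.
// With the DataFrame's column names deliberately reversed, the two calls
// would land the same data in different columns of test_parquet_spark.
import spark.implicits._  // assumes `spark` is an active SparkSession

val df = Seq(("b-value", "a-value")).toDF("col2", "col1")

// insertInto ignores names: df's first column -> col1, second -> col2.
df.write.insertInto("test_parquet_spark")

// saveAsTable matches by name: df("col1") -> col1, df("col2") -> col2.
df.write.mode("append").saveAsTable("test_parquet_spark")
```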


Table I'm trying to write to info:

$ hive -e "describe formatted test_parquet_spark"

# col_name              data_type               comment

col1                    string
col2                    string

# Detailed Table Information
Database:               default
CreateTime:             Fri Nov 10 22:54:20 GMT 2017
LastAccessTime:         UNKNOWN
Protect Mode:           None
Retention:              0
Table Type:             MANAGED_TABLE

# Storage Information
SerDe Library:          org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
InputFormat:            org.apache.hadoop.mapred.TextInputFormat
Compressed:             No
Num Buckets:            -1
Bucket Columns:         []
Sort Columns:           []
Storage Desc Params:
        serialization.format    1
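For comparison (my own sketch, not output from this thread): the storage section above shows LazySimpleSerDe with TextInputFormat, which is exactly what Spark's catalog reports as HiveFileFormat. A table that Hive itself stores as Parquet would be declared like this (`test_parquet_spark2` is a hypothetical name):

```scala
// Sketch: STORED AS PARQUET makes Hive record the Parquet serde and
// input/output formats in the table's storage info, so describe formatted
// would show ParquetHiveSerDe instead of LazySimpleSerDe.
spark.sql("""
  CREATE TABLE test_parquet_spark2 (col1 STRING, col2 STRING)
  STORED AS PARQUET
""")
```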

Testing in spark2-shell:

scala> val spark = SparkSession.builder.config("spark.sql.warehouse.dir", "/user/hive/warehouse").enableHiveSupport.getOrCreate
17/11/13 20:17:05 WARN sql.SparkSession$Builder: Using an existing SparkSession; some configuration may not take effect.
spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@2524f9bf

scala> import org.apache.spark.sql.Row
import org.apache.spark.sql.Row

scala> import org.apache.spark.sql.types._
import org.apache.spark.sql.types._

scala> val schema = new StructType().add(StructField("col1", StringType)).add(StructField("col2", StringType))
schema: org.apache.spark.sql.types.StructType = StructType(StructField(col1,StringType,true), StructField(col2,StringType,true))

scala> val dataRDD = spark.sparkContext.parallelize(Seq(Row("foobar", "barfoo")))
dataRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = ParallelCollectionRDD[0] at parallelize at <console>:27

scala> val dataDF = spark.createDataFrame(dataRDD, schema)
dataDF: org.apache.spark.sql.DataFrame = [col1: string, col2: string]

scala> dataDF.write.insertInto("test_parquet_spark")

$ hive -e "select * from test_parquet_spark"
foobar  barfoo

scala> dataDF.write.mode("append").saveAsTable("test_parquet_spark")
org.apache.spark.sql.AnalysisException: The format of the existing table default.test_parquet_spark is `HiveFileFormat`. It doesn't match the specified format `ParquetFileFormat`.;
at org.apache.spark.sql.execution.datasources.PreprocessTableCreation$$anonfun$apply$2.applyOrElse(rules.scala:115)
at org.apache.spark.sql.execution.datasources.PreprocessTableCreation$$anonfun$apply$2.applyOrElse(rules.scala:75)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:266)
at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:256)
at org.apache.spark.sql.execution.datasources.PreprocessTableCreation.apply(rules.scala:75)
at org.apache.spark.sql.execution.datasources.PreprocessTableCreation.apply(rules.scala:71)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:85)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:82)
at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:57)
at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:66)
at scala.collection.mutable.ArrayBuffer.foldLeft(ArrayBuffer.scala:48)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:82)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:74)
at scala.collection.immutable.List.foreach(List.scala:381)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:74)
at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:69)
at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:67)
at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:50)
at org.apache.spark.sql.execution.QueryExecution.withCachedData$lzycompute(QueryExecution.scala:73)
at org.apache.spark.sql.execution.QueryExecution.withCachedData(QueryExecution.scala:72)
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:78)
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:78)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:84)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:80)
at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:89)
at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:89)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:92)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:92)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:610)
at org.apache.spark.sql.DataFrameWriter.createTable(DataFrameWriter.scala:420)
at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:399)
at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:354)
... 48 elided

Why does saveAsTable() not work while insertInto() does in this example?

And why does saveAsTable() not recognize parquet tables as being parquet tables?


If anyone knows what is going on, I would really appreciate the help. Thanks!


Also, if it helps, this is on Spark 2.2.0 and CDH 5.12.0.

Master Collaborator




Thank you for your response, AutoIN. It looks like this workaround will work for my needs.


I'm running the Spark 2 submit command line successfully in both local and yarn cluster mode on CDH 5.12.


I experience the same problem with saveAsTable when I run it in a Hue Oozie workflow, even though I loaded all the Spark 2 libraries into share/lib and pointed my workflow at that new directory. Since I have hundreds of tables, some of which change structure over time, I can't declare the Hive tables by hand.


From the command line, Spark auto-generates the Hive table as Parquet if it does not exist. Append mode also works well, though I have not tried the insert feature.



It is very tricky to run Spark 2 cluster-mode jobs. I made sure I entered the spark-submit parameters before my job arguments. See how I run the job below:




$ spark-submit --version
version 2.2.0.cloudera2
Using Scala version 2.11.8

$ pwd
/home/hg/app/spark/demo

$ ls -1
ojdbc8.jar
demo_2.11-0.1.0-SNAPSHOT.jar

$ spark-submit --verbose --class com.hernandezgraham.demo --master yarn --deploy-mode cluster --jars ojdbc8.jar --driver-class-path ojdbc8.jar --files demo_2.11-0.1.0-SNAPSHOT.jar "argument1" "argument2"

$ hdfs dfs -ls /user/hive/warehouse/demo
drwxr-xr-x   - hdfs  hive          0 2018-02-12 09:39 /user/hive/warehouse/demo

$ hive
hive> show databases;
hive> show tables;
hive> select * from demo limit 1;
1 hguser
Time taken: 0.557 seconds, Fetched: 1 row(s)




Even though Spark 2 executes my code successfully in Oozie workflows, it still does not write the file or the Hive table. Perhaps that was fixed in 5.12 for the command line; the documentation is not quite clear for Hue.

New Contributor

If Cloudera could update things maybe it could work better!

New Contributor

I haven't had much luck when chaining the format and mode options.


I've been doing it like this instead. I'm using Python, though, not Scala:


dataFrame.write.saveAsTable("tableName", format="parquet", mode="overwrite")

The issue I'm having isn't that it won't create the table or write the data using saveAsTable; it's that Spark doesn't see any data in the table if I go back and try to read it later. I can query it from Hive without an issue. If I use Hive to remove the spark.sql.sources.provider option from TBLPROPERTIES, then I can read the table with Spark properly.


Thank you for the suggestion, EB9185. Unfortunately that doesn't work in Scala, though. The Scala saveAsTable() method only takes the table name as input, and you have to chain the mode() and format() calls separately.
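For reference, the Scala shape of the same call chains the options before saveAsTable (a sketch using the same placeholder names as the Python line above):

```scala
// Scala equivalent of the Python one-liner: format and mode are separate
// builder calls, chained ahead of saveAsTable. `dataFrame` and `tableName`
// are placeholders.
dataFrame.write
  .format("parquet")
  .mode("overwrite")
  .saveAsTable("tableName")
```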