Spark 2 Can't write dataframe to parquet table

Explorer

I'm trying to write a DataFrame to a Parquet Hive table and I keep getting an error saying that the table's format is HiveFileFormat rather than ParquetFileFormat. The table is definitely a Parquet table.

 

Here's how I'm creating the SparkSession:

val spark = SparkSession
          .builder()
          .config("spark.sql.warehouse.dir", "/user/hive/warehouse")
          .config("spark.sql.sources.maxConcurrentWrites","1")
          .config("spark.sql.parquet.compression.codec", "snappy")
          .config("hive.exec.dynamic.partition", "true")
          .config("hive.exec.dynamic.partition.mode", "nonstrict")
          .config("parquet.compression", "SNAPPY")
          .config("hive.exec.max.dynamic.partitions", "3000")
          .config("parquet.enable.dictionary", "false")
          .config("hive.support.concurrency", "true")
          .enableHiveSupport()
          .getOrCreate()

Here's the code that fails and the error message:

 

dataFrame.write.format("parquet").mode(saveMode).partitionBy(partitionCol).saveAsTable(tableName)
org.apache.spark.sql.AnalysisException: The format of the existing table tableName is `HiveFileFormat`. It doesn't match the specified format `ParquetFileFormat`.;

Here's the table storage info:

# Storage Information
SerDe Library:          org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe
InputFormat:            org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat
OutputFormat:           org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat
Compressed:             No
Num Buckets:            -1
Bucket Columns:         []
Sort Columns:           []
Storage Desc Params:
        field.delim             \t
        line.delim              \n
        serialization.format    \t

This same code worked before upgrading to Spark 2; for some reason it isn't recognizing the table as Parquet now.


8 REPLIES

Explorer

After doing more testing, I'm finding that this happens even on tables that aren't Parquet tables, and when the write command isn't specifying Parquet as the format at all. The error does not happen with the insertInto() command, though. That isn't a good workaround for me, because saveAsTable() matches columns by name whereas insertInto() writes by column position, and position-based writes aren't acceptable for my use case.
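
To make the difference concrete, here's a rough sketch of what I mean (my own toy example, with the column names deliberately swapped, against the test_parquet_spark table described below):

// Toy example only -- the DataFrame's columns are swapped on purpose.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().enableHiveSupport().getOrCreate()
import spark.implicits._

val df = Seq(("barfoo", "foobar")).toDF("col2", "col1")

// insertInto() ignores column names and writes by position,
// so "barfoo" lands in col1 even though the DataFrame labels it col2.
df.write.insertInto("test_parquet_spark")

// saveAsTable() in append mode matches columns by name, which is what I need,
// but on Spark 2.2 it fails first with the HiveFileFormat/ParquetFileFormat error.
df.write.mode("append").saveAsTable("test_parquet_spark")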

 

Info for the table I'm trying to write to:

$ hive -e "describe formatted test_parquet_spark"

# col_name              data_type               comment

col1                    string
col2                    string

# Detailed Table Information
Database:               default
CreateTime:             Fri Nov 10 22:54:20 GMT 2017
LastAccessTime:         UNKNOWN
Protect Mode:           None
Retention:              0
Table Type:             MANAGED_TABLE

# Storage Information
SerDe Library:          org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
InputFormat:            org.apache.hadoop.mapred.TextInputFormat
OutputFormat:           org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
Compressed:             No
Num Buckets:            -1
Bucket Columns:         []
Sort Columns:           []
Storage Desc Params:
        serialization.format    1

Testing in spark2-shell:

scala> val spark = SparkSession.builder.config("spark.sql.warehouse.dir", "/user/hive/warehouse").enableHiveSupport.getOrCreate
17/11/13 20:17:05 WARN sql.SparkSession$Builder: Using an existing SparkSession; some configuration may not take effect.
spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@2524f9bf

scala> import org.apache.spark.sql.Row
import org.apache.spark.sql.Row

scala> import org.apache.spark.sql.types._
import org.apache.spark.sql.types._

scala> val schema = new StructType().add(StructField("col1", StringType)).add(StructField("col2", StringType))
schema: org.apache.spark.sql.types.StructType = StructType(StructField(col1,StringType,true), StructField(col2,StringType,true))

scala> val dataRDD = spark.sparkContext.parallelize(Seq(Row("foobar", "barfoo")))
dataRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = ParallelCollectionRDD[0] at parallelize at <console>:27

scala> val dataDF = spark.createDataFrame(dataRDD, schema)
dataDF: org.apache.spark.sql.DataFrame = [col1: string, col2: string]

scala> dataDF.write.insertInto("test_parquet_spark")

$ hive -e "select * from test_parquet_spark"
OK
foobar  barfoo

scala> dataDF.write.mode("append").saveAsTable("test_parquet_spark")
org.apache.spark.sql.AnalysisException: The format of the existing table default.test_parquet_spark is `HiveFileFormat`. It doesn't match the specified format `ParquetFileFormat`.;
at org.apache.spark.sql.execution.datasources.PreprocessTableCreation$$anonfun$apply$2.applyOrElse(rules.scala:115)
at org.apache.spark.sql.execution.datasources.PreprocessTableCreation$$anonfun$apply$2.applyOrElse(rules.scala:75)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:266)
at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:256)
at org.apache.spark.sql.execution.datasources.PreprocessTableCreation.apply(rules.scala:75)
at org.apache.spark.sql.execution.datasources.PreprocessTableCreation.apply(rules.scala:71)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:85)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:82)
at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:57)
at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:66)
at scala.collection.mutable.ArrayBuffer.foldLeft(ArrayBuffer.scala:48)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:82)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:74)
at scala.collection.immutable.List.foreach(List.scala:381)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:74)
at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:69)
at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:67)
at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:50)
at org.apache.spark.sql.execution.QueryExecution.withCachedData$lzycompute(QueryExecution.scala:73)
at org.apache.spark.sql.execution.QueryExecution.withCachedData(QueryExecution.scala:72)
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:78)
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:78)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:84)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:80)
at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:89)
at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:89)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:92)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:92)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:610)
at org.apache.spark.sql.DataFrameWriter.createTable(DataFrameWriter.scala:420)
at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:399)
at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:354)
... 48 elided

Why does saveAsTable() not work while insertInto() does in this example?

And why does saveAsTable() not recognize parquet tables as being parquet tables?

 

If anyone knows what is going on, I would really appreciate it. Thanks.

Explorer

Also, if it helps, this is on Spark 2.2.0 and CDH 5.12.0.

Master Collaborator

This is because saveAsTable() doesn't currently work with Hive. It's documented here:

https://www.cloudera.com/documentation/enterprise/release-notes/topics/cdh_rn_spark_ki.html#ki_spark...

 

More context: this was recently reported against Spark 2.2 and we are working internally to test a fix; however, I don't have a timeline for when, or in which release, it will be fixed. Until then, please see if the workaround in the doc above helps.
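
The gist of that style of workaround (a rough sketch, not the doc's exact text, using the table and column names from this thread) is to create the Hive table yourself and then insert into it, instead of letting saveAsTable() create or validate it:

// Rough sketch only -- adjust names, types, and partitioning to your table.
spark.sql("CREATE TABLE IF NOT EXISTS tableName (col1 STRING, col2 STRING) STORED AS PARQUET")

// insertInto() writes by column position and does not hit the
// HiveFileFormat vs. ParquetFileFormat check that saveAsTable() performs.
dataFrame.write.insertInto("tableName")

// For a partitioned table, add PARTITIONED BY (...) to the DDL and keep the
// dynamic-partition settings shown in the SparkSession config earlier in the thread.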

Explorer

Thank you for your response, AutoIN. It looks like this workaround will work for what I need.

Explorer

I'm running spark2-submit from the command line successfully in both local and YARN cluster mode on CDH 5.12.

 

I experience the same problem with saveAsTable() when I run it in a Hue Oozie workflow, having loaded all the Spark2 libraries into share/lib and pointed my workflow at that new directory. Since I have hundreds of tables, and some of them change structure over time, I can't declare the Hive tables by hand.

 

From the command line, Spark auto-generates the Hive table as Parquet if it does not exist. Append mode also works well, though I have not tried the insert feature.

It is very tricky to run Spark2 cluster-mode jobs. I made sure I entered the spark-submit parameters first, before my job arguments. See how I run the job below:

$ spark-submit --version
version 2.2.0.cloudera2
Using Scala version 2.11.8
...

$ pwd
/home/hg/app/spark/demo

$ ls -1
ojdbc8.jar
oracle_schema.properties
demo_2.11-0.1.0-SNAPSHOT.jar

$ spark-submit --verbose --class com.hernandezgraham.demo --master yarn --deploy-mode cluster --jars ojdbc8.jar --driver-class-path ojdbc8.jar --files oracle_schema.properties demo_2.11-0.1.0-SNAPSHOT.jar "argument1" "argument2"

$ hdfs dfs -ls /user/hive/warehouse/demo
drwxr-xr-x   - hdfs  hive          0 2018-02-12 09:39 /user/hive/warehouse/demo

$ hive
hive> show databases;
OK
default
hive> show tables;
OK
demo
hive> select * from demo limit 1;
OK
1 hguser
Time taken: 0.557 seconds, Fetched: 1 row(s)

Even though Spark 2 executes my code successfully in Oozie workflows, it still does not write the files or the Hive table. Perhaps that was fixed for the command line in 5.12. The documentation is not quite clear for Hue.

New Contributor

If Cloudera could update things, maybe it could work better!

New Contributor

I haven't had much luck when chaining the format and mode options.

 

I've been doing it like this instead. I'm using Python, though, not Scala.

 

dataFrame.write.saveAsTable("tableName", format="parquet", mode="overwrite")

The issue I'm having isn't that it won't create the table or write the data using saveAsTable; it's that Spark doesn't see any data in the table if I go back and try to read it later. I can query it using Hive without an issue. If I use Hive to remove the spark.sql.sources.provider option from the TBLPROPERTIES, then I can read the table with Spark properly.
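
For reference, the property change I mean looks roughly like this (the table name is just a placeholder; I did it from the Hive shell, but the same statement should also work through spark.sql):

// Hypothetical example: drop the Spark provider hint so Spark falls back to
// reading the table through the Hive serde. Replace tableName with your table.
spark.sql("ALTER TABLE tableName UNSET TBLPROPERTIES ('spark.sql.sources.provider')")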

Explorer

Thank you for the suggestion, EB9185. Unfortunately, that doesn't work in Scala. The saveAsTable() method in Scala only takes the table name as an argument, and you have to chain the mode() and format() calls separately.
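
In other words, the Scala equivalent has to be written like this, which just brings back the original error from this thread:

// Scala equivalent of the Python call above: format and mode are set by
// chaining on the DataFrameWriter rather than passed to saveAsTable().
dataFrame.write.format("parquet").mode("overwrite").saveAsTable("tableName")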