Explorer
Posts: 11
Registered: ‎07-14-2015

Spark 2 Can't write dataframe to parquet table


I'm trying to write a DataFrame to a Parquet Hive table and keep getting an error saying that the existing table's format is HiveFileFormat rather than ParquetFileFormat. The table is definitely a Parquet table.

 

Here's how I'm creating the sparkSession:

import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .config("spark.sql.warehouse.dir", "/user/hive/warehouse")
  .config("spark.sql.sources.maxConcurrentWrites", "1")
  .config("spark.sql.parquet.compression.codec", "snappy")
  .config("hive.exec.dynamic.partition", "true")
  .config("hive.exec.dynamic.partition.mode", "nonstrict")
  .config("parquet.compression", "SNAPPY")
  .config("hive.exec.max.dynamic.partitions", "3000")
  .config("parquet.enable.dictionary", "false")
  .config("hive.support.concurrency", "true")
  .enableHiveSupport()
  .getOrCreate()

Here's the code that fails and the error message:

 

dataFrame.write.format("parquet").mode(saveMode).partitionBy(partitionCol).saveAsTable(tableName)
org.apache.spark.sql.AnalysisException: The format of the existing table tableName is `HiveFileFormat`. It doesn't match the specified format `ParquetFileFormat`.;

Here's the table storage info:

# Storage Information
SerDe Library:          org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe
InputFormat:            org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat
OutputFormat:           org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat
Compressed:             No
Num Buckets:            -1
Bucket Columns:         []
Sort Columns:           []
Storage Desc Params:
        field.delim             \t
        line.delim              \n
        serialization.format    \t

This same code worked before upgrading to Spark 2; for some reason it isn't recognizing the table as Parquet now.
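For reference, one way to sidestep the format check (a sketch, not the documented fix) is to route the write through a SQL INSERT, which goes down the same Hive insert path as insertInto() instead of the saveAsTable() table-creation path. Here col1 and col2 are placeholder column names; tableName and partitionCol are the same variables as above:

// Sketch only: INSERT is positional, so the SELECT must list the table's
// columns in order, with the partition column last (dynamic partitioning
// is already enabled in the session config above).
dataFrame.createOrReplaceTempView("staging")
spark.sql(
  s"""INSERT OVERWRITE TABLE $tableName PARTITION ($partitionCol)
     |SELECT col1, col2, $partitionCol FROM staging""".stripMargin)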

Explorer
Posts: 11
Registered: ‎07-14-2015

Re: Spark 2 Can't write dataframe to parquet table

After doing more testing, I'm finding that this happens even on tables that aren't Parquet tables and when the write command isn't specifying Parquet as the write format. The error does not happen with the insertInto() command, though. That is not a good workaround, because saveAsTable() matches columns by name whereas insertInto() is purely positional, and positional matching is not acceptable for my use case.
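One sketch of a middle ground, assuming the target table already exists: read the table's column order first and reorder the DataFrame to match it, so the positional insertInto() effectively behaves like a by-name write (tableName and dataFrame as in the first post; for a Hive table, spark.table(tableName).columns lists the partition columns last):

// Align the DataFrame's columns to the target table's column order, then
// use the positional insertInto(); a missing column fails fast by name here.
import org.apache.spark.sql.functions.col

val targetCols = spark.table(tableName).columns
dataFrame.select(targetCols.map(col): _*)
         .write
         .insertInto(tableName)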

 

Info for the table I'm trying to write to:

$ hive -e "describe formatted test_parquet_spark"

# col_name              data_type               comment

col1                    string
col2                    string

# Detailed Table Information
Database:               default
CreateTime:             Fri Nov 10 22:54:20 GMT 2017
LastAccessTime:         UNKNOWN
Protect Mode:           None
Retention:              0
Table Type:             MANAGED_TABLE

# Storage Information
SerDe Library:          org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
InputFormat:            org.apache.hadoop.mapred.TextInputFormat
OutputFormat:           org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
Compressed:             No
Num Buckets:            -1
Bucket Columns:         []
Sort Columns:           []
Storage Desc Params:
        serialization.format    1

Testing in spark2-shell:

scala> import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.{Row, SparkSession}

scala> import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

scala> val spark = SparkSession.builder.config("spark.sql.warehouse.dir", "/user/hive/warehouse").enableHiveSupport.getOrCreate
17/11/13 20:17:05 WARN sql.SparkSession$Builder: Using an existing SparkSession; some configuration may not take effect.
spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@2524f9bf

scala> val schema = new StructType().add(StructField("col1", StringType)).add(StructField("col2", StringType))
schema: org.apache.spark.sql.types.StructType = StructType(StructField(col1,StringType,true), StructField(col2,StringType,true))

scala> val dataRDD = spark.sparkContext.parallelize(Seq(Row("foobar", "barfoo")))
dataRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = ParallelCollectionRDD[0] at parallelize at <console>:27

scala> val dataDF = spark.createDataFrame(dataRDD, schema)
dataDF: org.apache.spark.sql.DataFrame = [col1: string, col2: string]

scala> dataDF.write.insertInto("test_parquet_spark")

$ hive -e "select * from test_parquet_spark"
OK
foobar  barfoo

scala> dataDF.write.mode("append").saveAsTable("test_parquet_spark")
org.apache.spark.sql.AnalysisException: The format of the existing table default.test_parquet_spark is `HiveFileFormat`. It doesn't match the specified format `ParquetFileFormat`.;
at org.apache.spark.sql.execution.datasources.PreprocessTableCreation$$anonfun$apply$2.applyOrElse(rules.scala:115)
at org.apache.spark.sql.execution.datasources.PreprocessTableCreation$$anonfun$apply$2.applyOrElse(rules.scala:75)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:266)
at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:256)
at org.apache.spark.sql.execution.datasources.PreprocessTableCreation.apply(rules.scala:75)
at org.apache.spark.sql.execution.datasources.PreprocessTableCreation.apply(rules.scala:71)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:85)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:82)
at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:57)
at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:66)
at scala.collection.mutable.ArrayBuffer.foldLeft(ArrayBuffer.scala:48)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:82)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:74)
at scala.collection.immutable.List.foreach(List.scala:381)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:74)
at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:69)
at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:67)
at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:50)
at org.apache.spark.sql.execution.QueryExecution.withCachedData$lzycompute(QueryExecution.scala:73)
at org.apache.spark.sql.execution.QueryExecution.withCachedData(QueryExecution.scala:72)
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:78)
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:78)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:84)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:80)
at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:89)
at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:89)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:92)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:92)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:610)
at org.apache.spark.sql.DataFrameWriter.createTable(DataFrameWriter.scala:420)
at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:399)
at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:354)
... 48 elided

Why does saveAsTable() not work while insertInto() does in this example?

And why does saveAsTable() not recognize parquet tables as being parquet tables?

 

If anyone knows what is going on, I would really appreciate it. Thanks

Explorer
Posts: 11
Registered: ‎07-14-2015

Re: Spark 2 Can't write dataframe to parquet table

Also, if it helps, this is on Spark 2.2.0 and CDH 5.12.0.

New Contributor
Posts: 1
Registered: ‎11-13-2017

Re: Spark 2 Can't write dataframe to parquet table

I haven't had much luck when chaining the format and mode options.

 

I've been doing it like this instead. I'm using Python, though, not Scala.

 

dataFrame.write.saveAsTable("tableName", format="parquet", mode="overwrite")

The issue I'm having isn't that it won't create the table or write the data using saveAsTable(); it's that Spark doesn't see any data in the table if I go back and try to read it later. I can query it using Hive without an issue. If I use Hive and remove the spark.sql.sources.provider option from TBLPROPERTIES, then I can read the table with Spark properly.
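For reference, that property can be dropped from the Hive side with ALTER TABLE ... UNSET TBLPROPERTIES (assuming your Hive version supports UNSET TBLPROPERTIES; my_table below is a placeholder table name):

$ hive -e "ALTER TABLE my_table UNSET TBLPROPERTIES ('spark.sql.sources.provider')"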

Explorer
Posts: 11
Registered: ‎07-14-2015

Re: Spark 2 Can't write dataframe to parquet table

Thank you for the suggestion, EB9185. This unfortunately doesn't work in Scala, though. The saveAsTable() method in Scala only takes the table name as input, and you need to chain the mode() and format() methods separately.
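For reference, the chained Scala form of that Python call looks like the snippet below, and it is exactly the pattern that triggers the format error against an existing Hive table earlier in this thread:

// Scala equivalent of the Python keyword-argument call above:
// format() and mode() are chained on the DataFrameWriter before saveAsTable().
dataFrame.write
  .format("parquet")
  .mode("overwrite")
  .saveAsTable("tableName")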

Cloudera Employee
Posts: 31
Registered: ‎11-16-2015

Re: Spark 2 Can't write dataframe to parquet table

This is because 'saveAsTable()' doesn't currently work with Hive. It's documented here:

https://www.cloudera.com/documentation/enterprise/release-notes/topics/cdh_rn_spark_ki.html#ki_spark...

 

More context: this was recently reported with Spark 2.2, and we are working internally to test a fix; however, I don't have a timeline on when, or in which future release, this will be fixed. Until then, please see if the workaround in the above doc helps.

Explorer
Posts: 11
Registered: ‎07-14-2015

Re: Spark 2 Can't write dataframe to parquet table

Thank you for your response, AutoIN. It looks like this workaround will work for what I need.
