Created on 05-15-2017 06:18 AM - edited 09-16-2022 04:36 AM
Hi,
I have an old table whose data was written by Impala (2.x). The table is accessible from Impala and the data it returns is valid and correct. However, when I read the same table (partition) with SparkSQL or Hive, 3 out of 30 columns come back as NULL. The data is there: those columns are varchars like the others, nullability is the same, and they contain non-null values.
When I created a new table via CTAS in Impala:
create table tmp.sample stored as parquet as select * from orig_table where partcol = xxx;
then Hive and SparkSQL read the data correctly, all 30 columns.
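For reference, a quick way to see the symptom (a sketch; some_col stands for one of the three affected varchar columns and xxx for the partition value) is to run the same count in impala-shell and in beeline:

-- returns the full count in Impala, but 0 in Hive/beeline and SparkSQL for the original table
select count(*) from orig_table where partcol = xxx and some_col is not null;
-- the CTAS copy returns the same, correct count from all engines
select count(*) from tmp.sample where some_col is not null;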
The old table has these properties:
ROW FORMAT SERDE 'parquet.hive.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT 'parquet.hive.DeprecatedParquetInputFormat'
OUTPUTFORMAT 'parquet.hive.DeprecatedParquetOutputFormat'
The new table has a different row format serde and input/output formats.
How can I tweak Hive/Beeline/Spark to read the data correctly from the original table?
Thanks
PS: Using CDH 5.7.1.
Created 05-15-2017 11:53 PM
I have figured out the problem and the solution.
The problem is that Hive resolves the Parquet columns in each partition by name against the table's current schema definition, while Impala (I assume) resolves them by position.
The table had some old partitions created under a different schema: a column name was different, but the total number of columns and their positions remained the same.
The old table:
CREATE TABLE test ( event varchar(10), event_id int, event_time timestamp );
... some partitions were inserted, then the column was renamed to event_name. The current table definition:
CREATE TABLE test ( event_name varchar(10), event_id int, event_time timestamp );
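Presumably the rename was done with something like this (a sketch of the likely statement). It changes only the table metadata; the column name stored inside the already-written Parquet files stays event:

-- Hive/Impala column rename; existing Parquet files are not rewritten
ALTER TABLE test CHANGE event event_name VARCHAR(10);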
Now, if I select all partitions in Impala, the query returns all the data correctly. So I assume that Impala ignores the column names in the Parquet files and simply reads the first column as event_name with type varchar(10).
SparkSQL and Beeline, however, return NULL for the partitions created under the old definition, because no column named event_name exists in those files.
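One way to fix the old partitions (a sketch, assuming rewriting the data is acceptable; it reuses tmp.sample, the CTAS copy from the first post, in which partcol ends up as the last column) is to write them back from Impala, so that the new Parquet files carry the current column names and Hive/SparkSQL can resolve them by name again:

-- run in impala-shell; rewrites the affected partition with files that use the new column names
insert overwrite orig_table partition (partcol)
select * from tmp.sample;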
To confirm, I downloaded the Parquet files and inspected their schema with parquet-tools: the columns indeed carry the old column names.
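For example (a sketch; the file name is a placeholder for one of the downloaded files from an old partition):

# prints the schema stored in the Parquet file itself; the first field appears as 'event', not 'event_name'
parquet-tools schema part-00000.parquet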
So, to test whether a column was renamed in a table, I created a simple script. It is important to run Spark with the mergeSchema parameter set to true, so that the schemas of ALL Parquet files under the table location are read and merged.
spark-shell --conf spark.sql.parquet.mergeSchema=true

import scala.util.matching.Regex

def test(tbl: String) = {
  // fetch the DDL and extract the table's HDFS location from it
  val tb_md = sqlContext.sql("show create table " + tbl).collect()
  val ddl = tb_md.map(x => x.get(0).toString).mkString(" ")
  val pattern = new Regex("""LOCATION\s*\'(.+)\'\s+TBLPROPERTIES""")
  var loc: String = ""
  try {
    loc = (pattern findAllIn ddl).matchData.next.group(1)
  } catch {
    case e: Exception => // no LOCATION found in the DDL
  }
  // read the Parquet files directly; with mergeSchema=true the schemas of all
  // files are merged, so a renamed column shows up under both its old and new name
  val d = sqlContext.read.parquet(loc)
  val columns_parq = d.columns
  println(columns_parq.toList)
  println("Table " + tbl + " has " + columns_parq.length + " columns.")
}
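Usage, from the same spark-shell session (the table name is only an example):

test("default.test")   // prints the merged column list and the total column count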
Created 12-05-2018 04:14 AM
Actually, I am getting NULL values for some columns in a newly created table, while Impala returns the correct results for it. If you have solved your problem, could you give me some suggestions for this case? Thanks.