Created on 04-10-2016 11:44 AM - edited 09-16-2022 03:13 AM
I am trying to add columns to a table that I created with the "saveAsTable" API.
I update the columns using sqlContext.sql('alter table myTable add columns (mycol string)').
The next time I create a df and save it into the same table with the new columns, I get a:
"ParquetRelation requires that the query in the SELECT clause of the INSERT INTO/OVERWRITE statement generates the same number of columns as its schema."
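Roughly, the sequence that triggers the error looks like this (a simplified sketch; the data and column names other than myTable/mycol are made up):

    df1 = sqlContext.createDataFrame([(1, 'a')], ['id', 'val'])
    df1.write.saveAsTable('myTable')   # Parquet-backed table

    sqlContext.sql('alter table myTable add columns (mycol string)')

    # a new df matching the altered, 3-column schema
    df2 = sqlContext.createDataFrame([(2, 'b', 'c')], ['id', 'val', 'mycol'])
    df2.write.mode('append').insertInto('myTable')   # fails with the error above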
Also, these two commands don't return the same columns:
1. sqlContext.table('myTable').schema.fields <-- wrong result
2. sqlContext.sql('show columns in myTable') <-- correct result
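For what it's worth, this is how I compare the two (a sketch):

    metastore_cols = [f.name for f in sqlContext.table('myTable').schema.fields]
    hive_cols = [row[0] for row in sqlContext.sql('show columns in myTable').collect()]
    print(set(hive_cols) - set(metastore_cols))   # columns Spark does not see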
It seems to be a known bug: https://issues.apache.org/jira/browse/SPARK-9764 (see related bugs).
But I am wondering: how else can I update the columns, or make sure that Spark picks up the new columns?
I have already tried refreshTable and restarting Spark.
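(For reference, the refresh call I used, assuming a HiveContext:)

    sqlContext.refreshTable('myTable')   # drops Spark's cached metadata for the table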
thanks
Created 07-28-2016 10:52 PM
Hi Maurin,
Did you ever get a resolution to this? I am experiencing exactly the same issue: I am attempting to programmatically add columns to an existing table when our streaming data contains new attributes.
Thanks,
David
Created 08-01-2016 11:30 AM
Hi,
I was able to do it with some code. It is probably not the best solution, but here it is, in Python:
from pyspark.sql.functions import lit

def _merge_schema(self, df, db, table):
    # Note: the db argument is accepted but unused; pass a fully qualified table name.
    df_schema_dict = df.schema.jsonValue()['fields']
    table_columns = self.sqlContext.table(table).schema.fields
    df_schema_list = sorted([x['name'].lower() for x in df_schema_dict])
    table_schema_list = sorted([x.name.lower() for x in table_columns])
    missing_in_table = set(df_schema_list) - set(table_schema_list)
    missing_in_df = set(table_schema_list) - set(df_schema_list)
    # Keyed by lowercased name so lookups match the lowercased sets above
    df_schema_dict_by_name = dict((d['name'].lower(), dict(d, index=index))
                                  for (index, d) in enumerate(df_schema_dict))
    # Add to the table any columns that only exist in the DataFrame
    missing_col_array = []
    for missing_col in missing_in_table:
        my_type = df_schema_dict_by_name[missing_col]['type']
        missing_col_array.append("`" + missing_col + "` " + my_type)
    if len(missing_col_array) != 0:
        self.sqlContext.sql(
            "ALTER TABLE " + table + " ADD COLUMNS ( "
            + ' , '.join(missing_col_array) + ')')
    # Add NULL columns to the DataFrame for columns that only exist in the table
    table_schema_dict_by_name = dict((d.name.lower(), d) for d in table_columns)
    for missing_col in missing_in_df:
        df = df.withColumn(
            missing_col,
            lit(None).cast(table_schema_dict_by_name[missing_col].dataType.typeName()))
    # Re-order the columns to match the table; otherwise the insert fails
    all_columns = self.sqlContext.table(table).schema.fields
    columns_names = [df[x.name.lower()] for x in all_columns]
    df = df.select(*columns_names)
    return df
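Called roughly like this from inside the same class (a sketch; the table and DataFrame names are placeholders):

    aligned = self._merge_schema(new_df, 'mydb', 'mydb.mytable')
    aligned.write.mode('append').insertInto('mydb.mytable')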
Created on 01-11-2017 02:58 AM - edited 01-11-2017 02:59 AM
Can you elaborate on how you solved it? I just ran into this issue.
I have 11 columns in my DataFrame (I added a timestamp column to an RDD with 10 columns),
and I have a Hive table with all 11 columns, partitioned by the timestamp column.
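Roughly what I am attempting (a sketch; the table name is a placeholder, and I am assuming Hive dynamic partitioning with the partition column placed last for insertInto):

    sqlContext.setConf('hive.exec.dynamic.partition', 'true')
    sqlContext.setConf('hive.exec.dynamic.partition.mode', 'nonstrict')
    # insertInto expects the partition column (timestamp) to come last
    cols = [c for c in df.columns if c != 'timestamp'] + ['timestamp']
    df.select(*cols).write.mode('append').insertInto('my_partitioned_table')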
Created 03-22-2017 11:00 AM
I am getting the following error:
TypeError: 'StructField' object has no attribute '__getitem__'
at
df_schema_dict_by_name = dict((d['name'], dict(d, index=index)) for (index, d) in enumerate(df_schema_dict))
How do I rectify this?
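A minimal way to see the difference, assuming the TypeError comes from df_schema_dict holding StructField objects instead of the plain dicts the snippet expects:

    df_schema_dict = df.schema.jsonValue()['fields']   # plain dicts: d['name'] works
    df_schema_dict = df.schema.fields                  # StructFields: d['name'] raises this TypeError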
Created 03-22-2017 02:44 PM