Created on 04-10-2016 11:44 AM - edited 09-16-2022 03:13 AM
I am trying to add columns to a table that I created with the "saveAsTable" API.
I update the columns using sqlContext.sql('alter table myTable add columns (mycol string)').
The next time I create a DataFrame with the new columns and save it to the same table, I get:
"ParquetRelation requires that the query in the SELECT clause of the INSERT INTO/OVERWRITE statement generates the same number of columns as its schema."
Also, these two commands don't return the same columns:
1. sqlContext.table('myTable').schema.fields <-- wrong result
2. sqlContext.sql('show columns in mytable') <-- correct result
It seems to be a known bug: https://issues.apache.org/jira/browse/SPARK-9764 (see the related bugs).
But I am wondering: how else can I update the columns, or make sure that Spark picks up the new columns?
I have already tried refreshTable and restarting Spark.
Thanks
Created 07-28-2016 10:52 PM
Hi Maurin,
Did you ever get a resolution to this issue? I am experiencing the exact same issue. I am attempting to programmatically add columns to an existing table when our streaming data contains new attributes.
Thanks,
David
Created 08-01-2016 11:30 AM
Hi,
I was able to do it with some code. It's probably not the best solution, but here it is in Python:
    from pyspark.sql.functions import lit

    def _merge_schema(self, df, db, table):
        # jsonValue()['fields'] is a list of plain dicts: {'name': ..., 'type': ..., ...}
        df_schema_dict = df.schema.jsonValue()['fields']
        table_columns = self.sqlContext.table(table).schema.fields
        df_schema_list = sorted([x['name'].lower() for x in df_schema_dict])
        table_schema_list = sorted([x.name.lower() for x in table_columns])
        missing_in_table = set(df_schema_list) - set(table_schema_list)
        missing_in_df = set(table_schema_list) - set(df_schema_list)
        # key by lower-cased name so the lookups below match the sets above
        df_schema_dict_by_name = dict((d['name'].lower(), dict(d, index=index)) for (index, d) in enumerate(df_schema_dict))
        missing_col_array = []
        for missing_col in sorted(missing_in_table):
            # simpleString() gives Hive DDL type names (int, bigint, struct<...>);
            # the JSON 'type' would be 'integer'/'long', or a dict for complex types
            field = df.schema.fields[df_schema_dict_by_name[missing_col]['index']]
            missing_col_array.append("`" + missing_col + "` " + field.dataType.simpleString())
        if len(missing_col_array) != 0:
            self.sqlContext.sql(
                "ALTER TABLE " + table + " ADD COLUMNS (" + ", ".join(missing_col_array) + ")")
        table_schema_dict_by_name = dict((d.name.lower(), d) for d in table_columns)
        for missing_col in missing_in_df:
            # fill columns the DataFrame lacks with nulls of the table's column type
            field = table_schema_dict_by_name[missing_col]
            df = df.withColumn(field.name, lit(None).cast(field.dataType))
        # re-order all the columns to match the table, otherwise the insert fails
        all_columns = self.sqlContext.table(table).schema.fields
        columns_names = [df[x.name.lower()] for x in all_columns]
        df = df.select(*columns_names)
        return df
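The diff-and-reorder logic of _merge_schema can be sketched without Spark, which makes it easy to check what SQL it would issue. This is a minimal illustration under stated assumptions: plan_schema_merge is a hypothetical helper, both schemas are given as ordered (name, hive_type) lists, matching is case-insensitive as in the code above, and the table is assumed to be non-partitioned so that ADD COLUMNS appends the new columns at the end.

```python
def plan_schema_merge(table, df_cols, table_cols):
    """Plan (but do not run) the steps of a schema merge.

    df_cols / table_cols: ordered lists of (name, hive_type) pairs.
    Returns (alter_sql or None, columns_to_null_fill, final_column_order).
    Hypothetical helper for illustration only.
    """
    df_types = {name.lower(): typ for name, typ in df_cols}
    table_names = [name.lower() for name, _ in table_cols]
    table_types = dict(zip(table_names, (typ for _, typ in table_cols)))

    # Columns the DataFrame has but the table lacks -> ALTER TABLE ... ADD COLUMNS
    new_cols = [name for name in df_types if name not in table_types]
    alter_sql = None
    if new_cols:
        defs = ", ".join("`%s` %s" % (name, df_types[name]) for name in new_cols)
        alter_sql = "ALTER TABLE %s ADD COLUMNS (%s)" % (table, defs)

    # Columns the table has but the DataFrame lacks -> fill with typed nulls
    null_fill = [(name, table_types[name]) for name in table_names if name not in df_types]

    # Final SELECT order: existing table columns, then the appended new ones
    # (assumes a non-partitioned table)
    order = table_names + new_cols
    return alter_sql, null_fill, order


if __name__ == "__main__":
    sql, fill, order = plan_schema_merge(
        "myTable",
        df_cols=[("id", "int"), ("MyCol", "string")],
        table_cols=[("id", "int"), ("ts", "bigint")],
    )
    print(sql)
    print(fill)
    print(order)
```

Keeping the planning step separate from the Spark calls is only a design choice for testability; the real method still has to run the ALTER TABLE and re-read the table schema.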
Created 08-01-2016 12:13 PM
Created on 01-11-2017 02:58 AM - edited 01-11-2017 02:59 AM
Can you elaborate on how you solved it? I just ran into this issue.
I have 11 columns in my DataFrame (I added a timestamp column to a 10-column RDD).
And I have a Hive table with all 11 columns, partitioned by the timestamp column.
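With a partitioned Hive table, a position-based insert generally expects the partition column(s) to come last in the SELECT list (Hive's dynamic-partition convention), so a DataFrame with the timestamp column first will not line up. A minimal pure-Python sketch of that reordering, assuming you already know which columns are data columns and which are partition columns; order_for_partitioned_insert is a hypothetical helper, and in PySpark its result would be passed to df.select(*...).

```python
def order_for_partitioned_insert(df_columns, data_columns, partition_columns):
    """Return DataFrame column names ordered as: data columns, then partition columns.

    Matching is case-insensitive; raises KeyError naming any column the DataFrame lacks.
    Hypothetical helper for illustration only.
    """
    by_lower = {c.lower(): c for c in df_columns}
    ordered = []
    for col in list(data_columns) + list(partition_columns):
        key = col.lower()
        if key not in by_lower:
            raise KeyError("DataFrame is missing column: %s" % col)
        ordered.append(by_lower[key])
    return ordered


if __name__ == "__main__":
    # timestamp column first in the DataFrame, but it is the table's partition column
    print(order_for_partitioned_insert(["ts", "a", "B"], ["a", "b"], ["ts"]))
```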
Created 03-22-2017 11:00 AM
I am getting the below error
TypeError: 'StructField' object has no attribute '__getitem__'
at
df_schema_dict_by_name = dict((d['name'], dict(d, index=index)) for (index, d) in enumerate(df_schema_dict))
How do I rectify this
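That message is what Python 2 reports when an object without __getitem__ is subscripted: the line expects df_schema_dict to be the list of plain dicts returned by df.schema.jsonValue()['fields'], whereas df.schema.fields yields StructField objects, whose name is an attribute (d.name), not a key. A plausible cause, assumed here since the calling code isn't shown, is that df_schema_dict was built from df.schema.fields. The sketch below indexes either representation by lower-cased name; FakeField is a hypothetical stand-in for StructField so the example runs without Spark.

```python
from collections import namedtuple

# Hypothetical stand-in for pyspark.sql.types.StructField: only .name is used here
FakeField = namedtuple("FakeField", ["name", "dataType"])


def field_name(f):
    """Name of a schema field given as a jsonValue() dict or a StructField-like object."""
    return f["name"] if isinstance(f, dict) else f.name


def index_by_name(fields):
    """Map lower-cased column name -> position, accepting either field representation."""
    return dict((field_name(f).lower(), i) for i, f in enumerate(fields))


if __name__ == "__main__":
    as_dicts = [{"name": "id", "type": "integer"}, {"name": "MyCol", "type": "string"}]
    as_fields = [FakeField("id", None), FakeField("MyCol", None)]
    print(index_by_name(as_dicts) == index_by_name(as_fields))
```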
Created 03-22-2017 02:44 PM