Support Questions

Find answers, ask questions, and share your expertise
Announcements
Celebrating as our community reaches 100,000 members! Thank you!

add columns to hive/parquet table

avatar
Rising Star

I am trying to add columns to table that I created with the “saveAsTable” api. 

I update the columns using sqlContext.sql(‘alter table myTable add columns (mycol string)’). 

The next time I create a df and save it in the same table, with the new columns I get a :

“ParquetRelation

 requires that the query in the SELECT clause of the INSERT INTO/OVERWRITE statement generates the same number of columns as its schema.”

 

Also thise two commands don t return the same columns :

1. sqlContext.table(‘myTable’).schema.fields    <— wrong result

2. sqlContext.sql(’show columns in mytable’)  <—— good results

 

It seems to be a known bug : https://issues.apache.org/jira/browse/SPARK-9764 (see related bugs)

 

But I am wondering, how else can I update the columns or make sure that spark take the new columns?

 

I already tried to refreshTable and to restart spark.

 

thanks

1 ACCEPTED SOLUTION

avatar
Rising Star

Hi,

I was able to do it with some code. Probably not the best solution but here it is in python :

    def _merge_schema(self, df, db, table):
        df_schema_dict = df.schema.jsonValue()['fields']

        table_columns = self.sqlContext.table(table).schema.fields

        df_schema_list = sorted([x['name'].lower() for x in df_schema_dict])
        table_schema_list = sorted([x.name.lower() for x in table_columns])

        missing_in_table = set(df_schema_list) - set(table_schema_list)
        missing_in_df = set(table_schema_list) - set(df_schema_list)

        df_schema_dict_by_name = dict((d['name'], dict(d, index=index)) for (index, d) in enumerate(df_schema_dict))
        missing_col_array = []

        for missing_col in missing_in_table:
            my_type = df_schema_dict_by_name[missing_col]['type']
            missing_col_array.append("`" + missing_col + "` " + my_type)
        if len(missing_col_array) != 0:
            self.sqlContext.sql(
                "ALTER TABLE " + table + " ADD COLUMNS ( " + ' , '.join(missing_col_array) + ')')

        table_schema_dict_by_name = dict((d.name, d) for d in table_columns)
        for missing_col in missing_in_df:
            df = df.withColumn(missing_col, lit(None).cast(table_schema_dict_by_name[missing_col].dataType.typeName()))
        # re-order all the columns otherwise the insert bugs
        all_columns = self.sqlContext.table(table).schema.fields
        columns_names = [df[x.name.lower()] for x in all_columns]
        df = df.select(*columns_names)
        return df

View solution in original post

6 REPLIES 6

avatar
New Contributor

Hi Maurin,

 

Did you ever get a resolution to this issue?  I am experiencing the exact same issue.  I am attempting to programmatically add columns to an existing table when our streaming data contains new attributes.

 

Thanks,

David

 

avatar
Rising Star

Hi,

I was able to do it with some code. Probably not the best solution but here it is in python :

    def _merge_schema(self, df, db, table):
        df_schema_dict = df.schema.jsonValue()['fields']

        table_columns = self.sqlContext.table(table).schema.fields

        df_schema_list = sorted([x['name'].lower() for x in df_schema_dict])
        table_schema_list = sorted([x.name.lower() for x in table_columns])

        missing_in_table = set(df_schema_list) - set(table_schema_list)
        missing_in_df = set(table_schema_list) - set(df_schema_list)

        df_schema_dict_by_name = dict((d['name'], dict(d, index=index)) for (index, d) in enumerate(df_schema_dict))
        missing_col_array = []

        for missing_col in missing_in_table:
            my_type = df_schema_dict_by_name[missing_col]['type']
            missing_col_array.append("`" + missing_col + "` " + my_type)
        if len(missing_col_array) != 0:
            self.sqlContext.sql(
                "ALTER TABLE " + table + " ADD COLUMNS ( " + ' , '.join(missing_col_array) + ')')

        table_schema_dict_by_name = dict((d.name, d) for d in table_columns)
        for missing_col in missing_in_df:
            df = df.withColumn(missing_col, lit(None).cast(table_schema_dict_by_name[missing_col].dataType.typeName()))
        # re-order all the columns otherwise the insert bugs
        all_columns = self.sqlContext.table(table).schema.fields
        columns_names = [df[x.name.lower()] for x in all_columns]
        df = df.select(*columns_names)
        return df

avatar
New Contributor
Thanks for the reply! I was able to get it working on Friday by doing the same thing as you. I manually generate the ALTER TABLE statement by sorting the fields and determining which fields are missing in the table and the incoming data frame. Thanks again for confirming that you had to do the same thing!

avatar
Rising Star

@DWinters

can you elaborate the way you have solved it, cause i just ran in to this issue.
I have 11 columns in data frame(added a timestamp column to 10 columned RDD).
And i have a hive table with complete 11 columns one is partitioned by timestamp.

avatar
Contributor

@maurin

 

I am getting the below error

TypeError: 'StructField' object has no attribute '__getitem__' 

 

at 

df_schema_dict_by_name = dict((d['name'], dict(d, index=index)) for (index, d) in enumerate(df_schema_dict))

 How do I rectify this 

avatar
Contributor

@DWinters

 

How were you able to overcome this issue? Could you please post a sample solution