add columns to hive/parquet table

Contributor

I am trying to add columns to a table that I created with the "saveAsTable" API.

I update the columns using sqlContext.sql('alter table myTable add columns (mycol string)').

The next time I create a df and save it to the same table with the new columns, I get this error:

"ParquetRelation requires that the query in the SELECT clause of the INSERT INTO/OVERWRITE statement generates the same number of columns as its schema."

 

Also, these two commands don't return the same columns:

1. sqlContext.table('myTable').schema.fields    <-- wrong result (new column missing)

2. sqlContext.sql('show columns in mytable')    <-- correct result

 

It seems to be a known bug: https://issues.apache.org/jira/browse/SPARK-9764 (see related bugs)

 

But I am wondering: how else can I update the columns, or make sure that Spark picks up the new columns?

 

I already tried refreshTable and restarting Spark.

 

Thanks

1 ACCEPTED SOLUTION

Re: add columns to hive/parquet table

Contributor

Hi,

I was able to do it with some code. It is probably not the best solution, but here it is in Python:

    from pyspark.sql.functions import lit

    def _merge_schema(self, df, db, table):
        # jsonValue() returns plain dicts, so each field can be indexed as d['name']
        df_schema_dict = df.schema.jsonValue()['fields']

        table_columns = self.sqlContext.table(table).schema.fields

        df_schema_list = sorted([x['name'].lower() for x in df_schema_dict])
        table_schema_list = sorted([x.name.lower() for x in table_columns])

        missing_in_table = set(df_schema_list) - set(table_schema_list)
        missing_in_df = set(table_schema_list) - set(df_schema_list)

        # key by lower-cased name so the lookups match the lower-cased sets above
        df_schema_dict_by_name = dict((d['name'].lower(), dict(d, index=index)) for (index, d) in enumerate(df_schema_dict))
        missing_col_array = []

        # add the columns that exist only in the data frame to the table
        for missing_col in missing_in_table:
            # 'type' is a plain string only for primitive types; nested types are not handled here
            my_type = df_schema_dict_by_name[missing_col]['type']
            missing_col_array.append("`" + missing_col + "` " + my_type)
        if len(missing_col_array) != 0:
            self.sqlContext.sql(
                "ALTER TABLE " + table + " ADD COLUMNS ( " + ' , '.join(missing_col_array) + ')')

        # add the columns that exist only in the table to the data frame, filled with nulls
        table_schema_dict_by_name = dict((d.name.lower(), d) for d in table_columns)
        for missing_col in missing_in_df:
            df = df.withColumn(missing_col, lit(None).cast(table_schema_dict_by_name[missing_col].dataType))
        # re-order all the columns to match the table, otherwise the insert fails
        all_columns = self.sqlContext.table(table).schema.fields
        columns_names = [df[x.name.lower()] for x in all_columns]
        df = df.select(*columns_names)
        return df
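
For anyone who wants to check the column-matching logic without a Spark session, here is a minimal sketch in plain Python. It is only an illustration under stated assumptions: schemas are passed as lists of (name, type) pairs, the function name plan_schema_merge is hypothetical and not part of the Spark API, and the final ordering assumes that ADD COLUMNS appends new columns after the existing ones of an unpartitioned table.

```python
def plan_schema_merge(df_fields, table_fields):
    """Compare two schemas given as lists of (name, type) pairs.

    Returns (add_to_table, add_to_df, final_order), all using
    lower-cased column names. Hypothetical helper, not a Spark API.
    """
    df_by_name = {name.lower(): dtype for name, dtype in df_fields}
    table_by_name = {name.lower(): dtype for name, dtype in table_fields}

    # Columns present in the data frame but not yet in the table.
    add_to_table = [(n, df_by_name[n])
                    for n in sorted(set(df_by_name) - set(table_by_name))]
    # Columns present in the table but absent from the data frame.
    add_to_df = [(n, table_by_name[n])
                 for n in sorted(set(table_by_name) - set(df_by_name))]

    # Assumption: existing table columns keep their order and the
    # newly added columns are appended at the end.
    final_order = ([name.lower() for name, _ in table_fields]
                   + [n for n, _ in add_to_table])
    return add_to_table, add_to_df, final_order


# Example input: the data frame has a new column "mycol" and lacks "created".
example_df = [("id", "int"), ("Name", "string"), ("mycol", "string")]
example_table = [("id", "int"), ("name", "string"), ("created", "timestamp")]
to_table, to_df, order = plan_schema_merge(example_df, example_table)
```

In the Spark version above, to_table would feed the ALTER TABLE statement, to_df would become null columns added with withColumn, and order would drive the final select.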
6 REPLIES

Re: add columns to hive/parquet table

New Contributor

Hi Maurin,

 

Did you ever get a resolution to this issue?  I am experiencing the exact same issue.  I am attempting to programmatically add columns to an existing table when our streaming data contains new attributes.

 

Thanks,

David

 

Re: add columns to hive/parquet table

New Contributor
Thanks for the reply! I was able to get it working on Friday by doing the same thing as you. I manually generate the ALTER TABLE statement by sorting the fields and determining which fields are missing in the table and the incoming data frame. Thanks again for confirming that you had to do the same thing!
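
Since a sample was not posted here, the following is a rough sketch of what generating the ALTER TABLE statement from sorted, missing fields could look like. It is not the poster's actual code: the function name build_add_columns_statement and the (name, type) pair inputs are assumptions made for illustration, and the type strings are assumed to already be valid in Hive DDL.

```python
def build_add_columns_statement(table, df_fields, table_fields):
    """Return an ALTER TABLE statement adding the data frame columns that
    the table lacks, or None when nothing is missing.

    df_fields and table_fields are lists of (name, type) pairs.
    Hypothetical helper, not a Spark API.
    """
    existing = {name.lower() for name, _ in table_fields}
    # Sort so that the generated statement is deterministic.
    missing = sorted((name.lower(), dtype)
                     for name, dtype in df_fields
                     if name.lower() not in existing)
    if not missing:
        return None
    cols = ", ".join("`{}` {}".format(name, dtype) for name, dtype in missing)
    return "ALTER TABLE {} ADD COLUMNS ({})".format(table, cols)


statement = build_add_columns_statement(
    "myTable",
    [("id", "int"), ("Zeta", "string"), ("alpha", "double")],
    [("id", "int")],
)
```

The resulting string could then be passed to sqlContext.sql(...), as in the accepted solution.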

Re: add columns to hive/parquet table

Contributor

@DWinters

Can you elaborate on how you solved it? I just ran into this issue.
My data frame has 11 columns (I added a timestamp column to a 10-column RDD),
and my Hive table has all 11 columns, with the timestamp column as the partition column.

Re: add columns to hive/parquet table

Explorer

@maurin

 

I am getting the following error:

TypeError: 'StructField' object has no attribute '__getitem__'

at this line:

df_schema_dict_by_name = dict((d['name'], dict(d, index=index)) for (index, d) in enumerate(df_schema_dict))

How do I rectify this?

Re: add columns to hive/parquet table

Explorer

@DWinters

 

How were you able to overcome this issue? Could you please post a sample solution?
