Created 07-11-2018 06:38 PM
I have a Hive table (in the Glue metastore in AWS) like this:
CREATE EXTERNAL TABLE `events_keyed`(
  `source_file_name` string,
  `ingest_timestamp` timestamp,
  ...
  `time_of_event` int
  ...)
PARTITIONED BY (`time_of_event_day` date)
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION 'my_location'
TBLPROPERTIES (
  'PARQUET.COMPRESSION'='SNAPPY',
  'transient_lastDdlTime'='1531187782')
I want to append data to it from Spark:
val deviceData = hiveContext.table(deviceDataDBName + "." + deviceDataTableName)

val incrementalKeyed = sqlDFProdDedup.join(broadcast(deviceData),
    $"prod_clean.endpoint_id" === $"$deviceDataTableName.endpoint_id" &&
    $"prod_clean.time_of_event" >= $"$deviceDataTableName.start_dt_unix" &&
    $"prod_clean.time_of_event" <= coalesce($"$deviceDataTableName.end_dt_unix"),
    "inner")
  .select(
    $"prod_clean.source_file_name",
    $"prod_clean.ingest_timestamp",
    ...
    $"prod_clean.time_of_event",
    ...
    $"prod_clean.time_of_event_day"
  )

// this shows good data:
incrementalKeyed.show(20, false)

incrementalKeyed.repartition($"time_of_event_day")
  .write
  .partitionBy("time_of_event_day")
  .format("hive")
  .mode("append")
  .saveAsTable(outputDBName + "." + outputTableName + "_keyed")

But this gives me a failure:

Exception encountered reading prod data: org.apache.spark.SparkException: Requested partitioning does not match the events_keyed table:
Requested partitions:
Table partitions: time_of_event_day

What am I doing wrong? How can I accomplish the append operation I'm trying to get?
Created 07-12-2018 09:23 AM
@Zack Riesland Can you provide the schemas of the sqlDFProdDedup and deviceData DataFrames?
Created 07-12-2018 10:29 AM
Thanks @hmatta
Printing schema for sqlDFProdDedup:

root
 |-- time_of_event_day: date (nullable = true)
 |-- endpoint_id: integer (nullable = true)
 ...
 |-- time_of_event: integer (nullable = true)
 ...
 |-- source_file_name: string (nullable = true)

Printing schema for deviceData:

root
 ...
 |-- endpoint_id: integer (nullable = true)
 |-- source_file_name: string (nullable = true)
 ...
 |-- start_dt_unix: long (nullable = true)
 |-- end_dt_unix: long (nullable = true)

Printing schema for incrementalKeyed (result of joining the 2 sets above):

root
 |-- source_file_name: string (nullable = true)
 |-- ingest_timestamp: timestamp (nullable = false)
 ...
 |-- endpoint_id: integer (nullable = true)
 ...
 |-- time_of_event: integer (nullable = true)
 ...
 |-- time_of_event_day: date (nullable = true)
Created 07-12-2018 01:58 PM
I was able to get this to work by using the insertInto() function, rather than the saveAsTable() function.
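For anyone hitting the same error, here is a minimal sketch of that change, assuming the same incrementalKeyed DataFrame and table names from the question above. Note that insertInto() matches columns by position rather than by name:

// insertInto() writes into the table's existing partition layout, so
// do NOT call partitionBy() here; Spark 2.x raises an AnalysisException
// if the two are combined. Because columns are matched by POSITION,
// the DataFrame's column order must match the table schema, with the
// partition column (time_of_event_day) last.
incrementalKeyed
  .repartition($"time_of_event_day")
  .write
  .mode("append")
  .insertInto(outputDBName + "." + outputTableName + "_keyed")

Depending on the cluster defaults, a dynamic-partition insert like this may also require hive.exec.dynamic.partition=true and hive.exec.dynamic.partition.mode=nonstrict in the Hive configuration.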