Created 07-11-2018 06:38 PM
I have a Hive table (in the Glue metastore in AWS) like this:
CREATE EXTERNAL TABLE `events_keyed`(
  `source_file_name` string,
  `ingest_timestamp` timestamp,
  ...
  `time_of_event` int
  ...)
PARTITIONED BY (`time_of_event_day` date)
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION 'my_location'
TBLPROPERTIES ('PARQUET.COMPRESSION'='SNAPPY', 'transient_lastDdlTime'='1531187782')
I want to append data to it from spark:
val deviceData = hiveContext.table(deviceDataDBName + "." + deviceDataTableName)
val incrementalKeyed = sqlDFProdDedup.join(broadcast(deviceData),
$"prod_clean.endpoint_id" === $"$deviceDataTableName.endpoint_id"
&& $"prod_clean.time_of_event" >= $"$deviceDataTableName.start_dt_unix"
&& $"prod_clean.time_of_event" <= coalesce($"$deviceDataTableName.end_dt_unix"),
"inner")
.select(
$"prod_clean.source_file_name",
$"prod_clean.ingest_timestamp",
...
$"prod_clean.time_of_event",
...
$"prod_clean.time_of_event_day"
)
// this shows good data:
incrementalKeyed.show(20, false)
incrementalKeyed.repartition($"time_of_event_day")
.write
.partitionBy("time_of_event_day")
.format("hive")
.mode("append")
.saveAsTable(outputDBName + "." + outputTableName + "_keyed")
But this gives me a failure:
Exception encountered reading prod data:
org.apache.spark.SparkException: Requested partitioning does not match the events_keyed table:
Requested partitions:
Table partitions: time_of_event_day
What am I doing wrong? How can I accomplish the append operation I'm attempting?
Created 07-12-2018 09:23 AM
@Zack Riesland Can you provide the schemas of the sqlDFProdDedup and deviceData DataFrames?
Created 07-12-2018 10:29 AM
Thanks @hmatta
Printing schema for sqlDFProdDedup:
root
 |-- time_of_event_day: date (nullable = true)
 |-- endpoint_id: integer (nullable = true)
 ...
 |-- time_of_event: integer (nullable = true)
 ...
 |-- source_file_name: string (nullable = true)

Printing schema for deviceData:
root
 ...
 |-- endpoint_id: integer (nullable = true)
 |-- source_file_name: string (nullable = true)
 ...
 |-- start_dt_unix: long (nullable = true)
 |-- end_dt_unix: long (nullable = true)

Printing schema for incrementalKeyed (result of joining the 2 sets above):
root
 |-- source_file_name: string (nullable = true)
 |-- ingest_timestamp: timestamp (nullable = false)
 ...
 |-- endpoint_id: integer (nullable = true)
 ...
 |-- time_of_event: integer (nullable = true)
 ...
 |-- time_of_event_day: date (nullable = true)
Created 07-12-2018 01:58 PM
I was able to get this to work by using the insertInto() function, rather than the saveAsTable() function.
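For reference, a minimal sketch of what the working call can look like, assuming the same incrementalKeyed DataFrame and table-name variables as above. Unlike saveAsTable(), insertInto() resolves columns by position against the existing table's schema, so the DataFrame's columns must be ordered to match the table, with the partition column (time_of_event_day) last, and it cannot be combined with partitionBy(). The two dynamic-partition settings are an assumption on my part; Hive commonly requires them when appending into a partitioned table.

// Sketch only, not the exact code from the thread.
// Assumption: dynamic partitioning must be enabled for this insert.
hiveContext.sql("SET hive.exec.dynamic.partition=true")
hiveContext.sql("SET hive.exec.dynamic.partition.mode=nonstrict")

incrementalKeyed
  .repartition($"time_of_event_day")  // group rows by day before writing, as before
  .write
  .mode("append")                     // append rows rather than overwrite partitions
  .insertInto(outputDBName + "." + outputTableName + "_keyed")  // no partitionBy() with insertInto()

This sidesteps the failure because insertInto() writes into the table's existing partitioning by column position, whereas saveAsTable() in append mode compares the partitioning requested by the writer against the table's metadata, which is the check that produced the "Requested partitioning does not match" error.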