Support Questions

Find answers, ask questions, and share your expertise
Announcements
Celebrating as our community reaches 100,000 members! Thank you!

Help with spark partition syntax (scala)

avatar
Super Collaborator

I have a hive table (in the glue metastore in AWS) like this:

  CREATE EXTERNAL TABLE `events_keyed`(
  `source_file_name` string, 
  `ingest_timestamp` timestamp, 
   ...
  `time_of_event` int
  ...)
PARTITIONED BY ( 
  `time_of_event_day` date)
ROW FORMAT SERDE 
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' 
STORED AS INPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
  'my_location'
TBLPROPERTIES (
  'PARQUET.COMPRESSION'='SNAPPY', 
  'transient_lastDdlTime'='1531187782')

I want to append data to it from spark:

val deviceData = hiveContext.table(deviceDataDBName + "." + deviceDataTableName)
val incrementalKeyed = sqlDFProdDedup.join(broadcast(deviceData),
    $"prod_clean.endpoint_id" === $"$deviceDataTableName.endpoint_id"
    && $"prod_clean.time_of_event" >= $"$deviceDataTableName.start_dt_unix"
      && $"prod_clean.time_of_event" <= coalesce($"$deviceDataTableName.end_dt_unix"),
"inner")
.select(
    $"prod_clean.source_file_name",
    $"prod_clean.ingest_timestamp",
    ...
    $"prod_clean.time_of_event",
    ...
    $"prod_clean.time_of_event_day"
)
// this show good data:
incrementalKeyed.show(20, false)
incrementalKeyed.repartition($"time_of_event_day")
  .write
  .partitionBy("time_of_event_day")
  .format("hive")
  .mode("append")
  .saveAsTable(outputDBName + "." + outputTableName + "_keyed")
But this gives me a failure: Exception encountered reading prod data: org.apache.spark.SparkException: Requested partitioning does not match the events_keyed table: Requested partitions: Table partitions: time_of_event_day What am I doing wrong? How can I accomplish the append operation I'm trying to get?
1 ACCEPTED SOLUTION

avatar
Super Collaborator

I was able to get this to work by using the insertInto() function, rather than the saveAsTable() function.

View solution in original post

3 REPLIES 3

avatar
Contributor

@Zack Riesland Can you provide schema of sqlDFProdDedup and deviceData dataframes ?

avatar
Super Collaborator

Thanks @hmatta

Printing schema for sqlDFProdDedup:
root
 |-- time_of_event_day: date (nullable = true)
 |-- endpoint_id: integer (nullable = true)
 ...
 |-- time_of_event: integer (nullable = true)
 ...
 |-- source_file_name: string (nullable = true)
Printing schema for deviceData:
root
...
 |-- endpoint_id: integer (nullable = true)
 |-- source_file_name: string (nullable = true)
 ...
 |-- start_dt_unix: long (nullable = true)
 |-- end_dt_unix: long (nullable = true)
Printing schema for incrementalKeyed (result of joining 2 sets above):
root
 |-- source_file_name: string (nullable = true)
 |-- ingest_timestamp: timestamp (nullable = false)
 ...
 |-- endpoint_id: integer (nullable = true)
 ...
 |-- time_of_event: integer (nullable = true)
...
 |-- time_of_event_day: date (nullable = true)

avatar
Super Collaborator

I was able to get this to work by using the insertInto() function, rather than the saveAsTable() function.