Archives of Support Questions (Read Only)

This is an archived board for historical reference. Information and links may no longer be available or relevant

Help with spark partition syntax (scala)

Super Collaborator

I have a Hive table (in the AWS Glue metastore) like this:

  CREATE EXTERNAL TABLE `events_keyed`(
  `source_file_name` string, 
  `ingest_timestamp` timestamp, 
   ...
  `time_of_event` int
  ...)
PARTITIONED BY ( 
  `time_of_event_day` date)
ROW FORMAT SERDE 
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' 
STORED AS INPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
  'my_location'
TBLPROPERTIES (
  'PARQUET.COMPRESSION'='SNAPPY', 
  'transient_lastDdlTime'='1531187782')

I want to append data to it from Spark:

val deviceData = hiveContext.table(deviceDataDBName + "." + deviceDataTableName)

// Join each event to the device record whose start/end window contains the event time.
val incrementalKeyed = sqlDFProdDedup.join(broadcast(deviceData),
    $"prod_clean.endpoint_id" === $"$deviceDataTableName.endpoint_id"
      && $"prod_clean.time_of_event" >= $"$deviceDataTableName.start_dt_unix"
      && $"prod_clean.time_of_event" <= coalesce($"$deviceDataTableName.end_dt_unix"),
    "inner")
  .select(
    $"prod_clean.source_file_name",
    $"prod_clean.ingest_timestamp",
    ...
    $"prod_clean.time_of_event",
    ...
    $"prod_clean.time_of_event_day"
  )

// this shows good data:
incrementalKeyed.show(20, false)

incrementalKeyed.repartition($"time_of_event_day")
  .write
  .partitionBy("time_of_event_day")
  .format("hive")
  .mode("append")
  .saveAsTable(outputDBName + "." + outputTableName + "_keyed")

But this gives me a failure:

Exception encountered reading prod data: org.apache.spark.SparkException: Requested partitioning does not match the events_keyed table:
Requested partitions:
Table partitions: time_of_event_day

What am I doing wrong? How can I accomplish the append operation I'm trying to get?

1 ACCEPTED SOLUTION

Super Collaborator

I was able to get this to work by using the insertInto() function, rather than the saveAsTable() function.
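
For anyone landing here later, a minimal sketch of what the insertInto() variant might look like. This is not the exact code from the original job: it assumes Spark 2.x or later with a SparkSession (the question uses hiveContext), and the object name, the orderLikeTable helper, and the database/table names in main are all hypothetical. insertInto() resolves columns by position rather than by name, so the sketch first selects the DataFrame columns in the table's own order (partition column last). partitionBy() is left out because the partitioning comes from the existing table definition.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

object AppendToPartitionedTable {

  /** Hypothetical helper: returns the table's column order after checking
    * that the DataFrame actually has every one of those columns.
    * The comparison here is an exact, case-sensitive name match.
    */
  def orderLikeTable(dfColumns: Seq[String], tableColumns: Seq[String]): Seq[String] = {
    val missing = tableColumns.filterNot(c => dfColumns.contains(c))
    require(missing.isEmpty,
      s"DataFrame is missing table columns: ${missing.mkString(", ")}")
    tableColumns
  }

  /** Appends df to an existing table with insertInto(). */
  def appendByPosition(spark: SparkSession, df: DataFrame, tableName: String): Unit = {
    // The existing table's schema; for a partitioned Hive table the
    // partition columns come last.
    val tableColumns = spark.table(tableName).columns.toSeq
    val ordered = orderLikeTable(df.columns.toSeq, tableColumns)

    df.select(ordered.map(col): _*)
      .write
      .mode("append")
      // No partitionBy() or format() here: both come from the table definition.
      .insertInto(tableName)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("append-to-partitioned-table")
      .enableHiveSupport()
      .getOrCreate()

    // Assumption: dynamic partition inserts into a Hive table usually
    // need these two settings; they were not mentioned in the thread.
    spark.conf.set("hive.exec.dynamic.partition", "true")
    spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict")

    // Hypothetical source and target names, for illustration only.
    val incrementalKeyed = spark.table("staging_db.events_incremental")
    appendByPosition(spark, incrementalKeyed, "output_db.events_keyed")

    spark.stop()
  }
}
```

Because the match is positional, a DataFrame whose columns are in a different order than the table can silently write values into the wrong columns, which is why the sketch reorders explicitly instead of relying on the order of the select() in the job.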


REPLIES

Contributor

@Zack Riesland Can you provide the schemas of the sqlDFProdDedup and deviceData DataFrames?

Super Collaborator

Thanks @hmatta

Printing schema for sqlDFProdDedup:
root
 |-- time_of_event_day: date (nullable = true)
 |-- endpoint_id: integer (nullable = true)
 ...
 |-- time_of_event: integer (nullable = true)
 ...
 |-- source_file_name: string (nullable = true)
Printing schema for deviceData:
root
...
 |-- endpoint_id: integer (nullable = true)
 |-- source_file_name: string (nullable = true)
 ...
 |-- start_dt_unix: long (nullable = true)
 |-- end_dt_unix: long (nullable = true)
Printing schema for incrementalKeyed (result of joining 2 sets above):
root
 |-- source_file_name: string (nullable = true)
 |-- ingest_timestamp: timestamp (nullable = false)
 ...
 |-- endpoint_id: integer (nullable = true)
 ...
 |-- time_of_event: integer (nullable = true)
...
 |-- time_of_event_day: date (nullable = true)
