Help with spark partition syntax (scala)

I have a Hive table (in the AWS Glue metastore) like this:

CREATE EXTERNAL TABLE `events_keyed`(
  `source_file_name` string,
  `ingest_timestamp` timestamp,
  ...
  `time_of_event` int
  ...)
PARTITIONED BY ( 
  `time_of_event_day` date)
ROW FORMAT SERDE 
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' 
STORED AS INPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
  'my_location'
TBLPROPERTIES (
  'PARQUET.COMPRESSION'='SNAPPY', 
  'transient_lastDdlTime'='1531187782')

I want to append data to it from Spark:

val deviceData = hiveContext.table(deviceDataDBName + "." + deviceDataTableName)
val incrementalKeyed = sqlDFProdDedup.join(broadcast(deviceData),
    $"prod_clean.endpoint_id" === $"$deviceDataTableName.endpoint_id"
      && $"prod_clean.time_of_event" >= $"$deviceDataTableName.start_dt_unix"
      && $"prod_clean.time_of_event" <= coalesce($"$deviceDataTableName.end_dt_unix"),
    "inner")
.select(
    $"prod_clean.source_file_name",
    $"prod_clean.ingest_timestamp",
    ...
    $"prod_clean.time_of_event",
    ...
    $"prod_clean.time_of_event_day"
)
// this shows good data:
incrementalKeyed.show(20, false)
incrementalKeyed.repartition($"time_of_event_day")
  .write
  .partitionBy("time_of_event_day")
  .format("hive")
  .mode("append")
  .saveAsTable(outputDBName + "." + outputTableName + "_keyed")

But this gives me a failure:

Exception encountered reading prod data: org.apache.spark.SparkException: Requested partitioning does not match the events_keyed table:
Requested partitions:
Table partitions: time_of_event_day

What am I doing wrong? How can I accomplish the append operation I'm trying to do?

Accepted solution

I was able to get this to work by using the insertInto() function, rather than the saveAsTable() function.
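
For anyone landing here later, here is a minimal sketch of what the insertInto() approach can look like. It is not the exact code from my job. It assumes Spark 2.x or later with a Hive-enabled SparkSession (on older versions hiveContext.table would play the same role), and the object and method names (AppendToPartitionedTable, appendByPosition) are made up for illustration. Two things to keep in mind: insertInto() matches columns by position rather than by name, so the sketch reorders the DataFrame to the table's column order (partition column last), and partitionBy() must not be combined with insertInto() because the partitioning comes from the existing table definition. The two dynamic-partition settings are an assumption about the environment and may already be set on your cluster.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

object AppendToPartitionedTable {

  // Hypothetical helper: appends df to an existing partitioned Hive table.
  def appendByPosition(spark: SparkSession, df: DataFrame, tableName: String): Unit = {
    // insertInto() resolves columns by position, not by name, so put the
    // DataFrame's columns in the same order as the target table. For a
    // partitioned Hive table, Spark lists the partition columns last.
    val targetColumns = spark.table(tableName).columns
    val aligned = df.select(targetColumns.map(col): _*)

    // Assumption: dynamic partition inserts are not already enabled.
    spark.conf.set("hive.exec.dynamic.partition", "true")
    spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict")

    // No partitionBy() and no format() here: both are taken from the
    // existing table definition.
    aligned.write
      .mode("append")
      .insertInto(tableName)
  }
}
```

With the names from the question, the call would be something like AppendToPartitionedTable.appendByPosition(spark, incrementalKeyed, outputDBName + "." + outputTableName + "_keyed").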

@Zack Riesland Can you provide the schemas of the sqlDFProdDedup and deviceData DataFrames?

Thanks @hmatta

Printing schema for sqlDFProdDedup:
root
 |-- time_of_event_day: date (nullable = true)
 |-- endpoint_id: integer (nullable = true)
 ...
 |-- time_of_event: integer (nullable = true)
 ...
 |-- source_file_name: string (nullable = true)
Printing schema for deviceData:
root
...
 |-- endpoint_id: integer (nullable = true)
 |-- source_file_name: string (nullable = true)
 ...
 |-- start_dt_unix: long (nullable = true)
 |-- end_dt_unix: long (nullable = true)
Printing schema for incrementalKeyed (result of joining 2 sets above):
root
 |-- source_file_name: string (nullable = true)
 |-- ingest_timestamp: timestamp (nullable = false)
 ...
 |-- endpoint_id: integer (nullable = true)
 ...
 |-- time_of_event: integer (nullable = true)
...
 |-- time_of_event_day: date (nullable = true)
