Help with spark partition syntax (scala)

Super Collaborator

I have a Hive table (in the AWS Glue metastore) like this:

CREATE EXTERNAL TABLE `events_keyed`(
  `source_file_name` string,
  `ingest_timestamp` timestamp,
  ...
  `time_of_event` int
  ...)
PARTITIONED BY (
  `time_of_event_day` date)
ROW FORMAT SERDE
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
  'my_location'
TBLPROPERTIES (
  'PARQUET.COMPRESSION'='SNAPPY',
  'transient_lastDdlTime'='1531187782')

I want to append data to it from Spark:

// look up the device data table and broadcast it for the join
val deviceData = hiveContext.table(deviceDataDBName + "." + deviceDataTableName)

val incrementalKeyed = sqlDFProdDedup.join(broadcast(deviceData),
    $"prod_clean.endpoint_id" === $"$deviceDataTableName.endpoint_id"
      && $"prod_clean.time_of_event" >= $"$deviceDataTableName.start_dt_unix"
      // note: coalesce() with a single column is a no-op; a fallback value is
      // presumably intended as a second argument
      && $"prod_clean.time_of_event" <= coalesce($"$deviceDataTableName.end_dt_unix"),
    "inner")
  .select(
    $"prod_clean.source_file_name",
    $"prod_clean.ingest_timestamp",
    ...
    $"prod_clean.time_of_event",
    ...
    $"prod_clean.time_of_event_day"
  )

// this shows good data:
incrementalKeyed.show(20, false)

incrementalKeyed.repartition($"time_of_event_day")
  .write
  .partitionBy("time_of_event_day")
  .format("hive")
  .mode("append")
  .saveAsTable(outputDBName + "." + outputTableName + "_keyed")
But this gives me a failure:

Exception encountered reading prod data: org.apache.spark.SparkException: Requested partitioning does not match the events_keyed table:
Requested partitions:
Table partitions: time_of_event_day

What am I doing wrong? How can I accomplish the append operation I'm trying to get?
1 ACCEPTED SOLUTION

Re: Help with spark partition syntax (scala)

Super Collaborator

I was able to get this to work by using the insertInto() function, rather than the saveAsTable() function.
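
For reference, a minimal sketch of that change, reusing incrementalKeyed, hiveContext, and the output names from the question (the two SET statements are an assumption; dynamic partitioning may already be enabled in your environment):

// Allow the append to create new time_of_event_day partitions on the fly.
// (Assumption: these may already be set in your environment.)
hiveContext.sql("SET hive.exec.dynamic.partition=true")
hiveContext.sql("SET hive.exec.dynamic.partition.mode=nonstrict")

incrementalKeyed
  .repartition($"time_of_event_day") // cluster rows by partition value before writing
  .write
  .mode("append")                    // append to, rather than overwrite, existing data
  .insertInto(outputDBName + "." + outputTableName + "_keyed")

Unlike saveAsTable(), insertInto() takes the partitioning from the table definition in the metastore and matches the DataFrame's columns to the table schema by position, so the partitionBy("time_of_event_day") call has to be dropped (Spark raises an AnalysisException if it is combined with insertInto()); the partition column just needs to come last in the select, as it already does above.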

REPLIES

Re: Help with spark partition syntax (scala)

Cloudera Employee

@Zack Riesland Can you provide the schemas of the sqlDFProdDedup and deviceData DataFrames?

Re: Help with spark partition syntax (scala)

Super Collaborator

Thanks @hmatta

Printing schema for sqlDFProdDedup:

root
 |-- time_of_event_day: date (nullable = true)
 |-- endpoint_id: integer (nullable = true)
 ...
 |-- time_of_event: integer (nullable = true)
 ...
 |-- source_file_name: string (nullable = true)

Printing schema for deviceData:

root
 ...
 |-- endpoint_id: integer (nullable = true)
 |-- source_file_name: string (nullable = true)
 ...
 |-- start_dt_unix: long (nullable = true)
 |-- end_dt_unix: long (nullable = true)

Printing schema for incrementalKeyed (result of joining the two sets above):

root
 |-- source_file_name: string (nullable = true)
 |-- ingest_timestamp: timestamp (nullable = false)
 ...
 |-- endpoint_id: integer (nullable = true)
 ...
 |-- time_of_event: integer (nullable = true)
 ...
 |-- time_of_event_day: date (nullable = true)
