This article assumes some knowledge of Spark and Iceberg. These examples make use of Cloudera Data Engineering and Cloudera Data Warehouse.
Note: Iceberg branch-based WAP requires Spark 3.5 or above.
Let's imagine we have a high-traffic table used by many users on the platform. Each day, before the start of the work day, a Spark job populates this table with new records.
On most days, this simple architecture works quite well. This morning, however, the Spark job had a data error and wrote out a batch of duplicate records. The data quality issue was reported by one of the users, and the data engineers are now trying to fix it. In the meantime, access to the high-traffic table is obviously disrupted.
Of course, data quality checks can be implemented to catch issues like this earlier. However, this still leaves you little time to roll back changes or fix the data issues before users are affected.
Iceberg's ability to stage changes in isolation and then publish them efficiently can be used to support write-audit-publish (WAP) pipelines, which have three separate stages:
- Write: write the new data to a location that is not visible to end users
- Audit: validate the new data (e.g. run data quality checks) in isolation
- Publish: once the data passes validation, publish it to the user-facing table
Iceberg itself supports this pattern natively. However, before we can discuss that in more detail, we first need to introduce Iceberg branches:
Iceberg branches are a feature that allows you to create multiple labeled copies of an Iceberg table – think of them as the table equivalent of a git branch. Each branch is created from a certain point in time and can be read and written in isolation.
ALTER TABLE db.table CREATE BRANCH `testbranch` RETAIN 7 DAYS; -- create branch
INSERT INTO TABLE db.table.branch_testbranch VALUES ('bob', 'bloggs'); -- write to branch
Unlike full backups, branches are metadata-only copies – the underlying data files are not duplicated. You are still able to modify the data in your branch freely – the original branch will not be affected.
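As a quick illustration of this isolation, here is a minimal Spark (Scala) sketch, assuming the db.table and testbranch names from the example above: reading the branch returns the newly inserted row, while reading the table itself (the main branch) does not.
import org.apache.spark.sql.SparkSession

object BranchIsolationCheck {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("branch-isolation-check").getOrCreate()

    // Reading the branch includes the row inserted above...
    spark.sql("SELECT * FROM db.table.branch_testbranch").show()

    // ...while the main branch of the table is unchanged
    spark.sql("SELECT * FROM db.table").show()
  }
}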
At a later point in time, if you wish to merge the new changes back into the original branch and no other changes have been made since creating the new branch, this can be achieved efficiently by “fast-forwarding” (one of the Iceberg operations supported by Spark and Hive).
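In Spark, for instance, this can be done with the fast_forward procedure. The sketch below is illustrative only and assumes an Iceberg catalog named spark_catalog along with the db.table and testbranch names from above:
import org.apache.spark.sql.SparkSession

object FastForwardExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("fast-forward-example").getOrCreate()

    // Fast-forward the main branch of db.table to the head of testbranch.
    // The catalog name (spark_catalog) depends on your configuration.
    spark.sql("CALL spark_catalog.system.fast_forward('db.table', 'main', 'testbranch')")
  }
}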
Iceberg WAP (branch-based) is a feature designed to natively support the Write-Audit-Publish pattern in Apache Spark. [1]
With Iceberg WAP, we can configure Spark to automatically write to and read from an isolated branch, allowing us to:
- write new data without it being visible to users of the main table
- audit (validate) the new data in isolation
- publish the validated data to the main branch once we are happy with it
To do this, we first need to set write.wap.enabled to true for any table we want to use with Iceberg WAP:
ALTER TABLE db.table SET TBLPROPERTIES (
'write.wap.enabled'='true'
);
Next, we need to choose a unique ID for this pipeline run (e.g. a job run number such as cde-run-13) and create a new branch of our table using this ID:
ALTER TABLE db.table CREATE BRANCH `cde-run-13` RETAIN 7 DAYS;
We should now configure the spark.wap.branch parameter to match this ID in every Spark job used to write or audit the new data:
spark.wap.branch=cde-run-13
Now, with these parameters set, Spark jobs will automatically use our isolated copy of the table for both reads and writes, allowing us to make changes and validate those changes without affecting the main branch of the table.
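To make this concrete, here is an illustrative Scala sketch (not part of the article's pipeline) that contains no branch-specific code at all. Assuming the table property and the cde-run-13 branch set up above, and a db.table with the columns used later in this article, both the write and the read below transparently target the WAP branch because spark.wap.branch is set:
import org.apache.spark.sql.SparkSession

object WapBranchSketch {
  def main(args: Array[String]): Unit = {
    // In CDE this configuration would be supplied as a job override rather than in code
    val spark = SparkSession.builder()
      .appName("wap-branch-sketch")
      .config("spark.wap.branch", "cde-run-13")
      .getOrCreate()
    import spark.implicits._

    // No branch-specific code here: the append lands on the cde-run-13 branch...
    Seq((1, "Bob", "Bloggs", 167))
      .toDF("id", "fname", "lname", "height_cm")
      .writeTo("db.table")
      .append()

    // ...and this read also resolves to the branch, so audits see the new data
    spark.table("db.table").show()
  }
}
The same job runs unchanged against the main branch once the configuration is removed.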
Once you are happy with these changes and wish to publish them to the user community, you can run a fast-forward operation in Spark or Hive to merge the WAP branch into the main branch:
ALTER TABLE db.table EXECUTE FAST-FORWARD 'main' 'cde-run-13';
If the data has data quality issues and you choose not to publish the new version, the original table data will remain unaffected. This gives you time to debug and resolve issues without impacting others.
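When that happens, the unpublished branch can be queried directly for root-cause analysis. Here is a hedged sketch, assuming the db.table and cde-run-13 names from above: the refs metadata table lists a table's branches and tags, and the duplicate-id query mirrors the scenario at the start of this article.
import org.apache.spark.sql.SparkSession

object InspectWapBranch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("inspect-wap-branch").getOrCreate()

    // List the table's branches and tags via the Iceberg refs metadata table
    spark.sql("SELECT * FROM db.table.refs").show(truncate = false)

    // Look for duplicate records on the unpublished WAP branch
    spark.sql(
      """SELECT id, COUNT(*) AS cnt
        |FROM db.table VERSION AS OF 'cde-run-13'
        |GROUP BY id
        |HAVING COUNT(*) > 1""".stripMargin).show()
  }
}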
In this example, we're going to create the following pipeline using Spark and Airflow in CDE:
It consists of five dependent stages:
Name | Description
Create WAP branch | Create the WAP branch using a unique job ID (we will use this branch in the next 2 Spark jobs)
Perform ETL | Write new data to the WAP branch
Perform DQ checks | Perform a variety of data quality checks on the new data in the WAP branch, failing if a critical data issue is found
Fast-forward main branch | Following a successful data quality check, fast-forward the main branch of our table to include the new data in the WAP branch
Delete WAP branch | Delete the WAP branch now that the data has been merged into the main branch
Note: If any stage before the Fast-forward main branch stage fails, the main branch of the table is not affected.
This pipeline can be created using the Airflow pipeline editor, included in CDE. To access this UI, create a new Airflow job in your CDE Virtual Cluster and select Editor.
Note: Some of these steps may fail if the necessary access and permissions have not been configured (e.g. Airflow connection credentials for CDW not configured).
We can create the new branch with a CDW Operator, using the Airflow run_id as a unique ID, like so:
ALTER TABLE
db.table
CREATE BRANCH
`{{ run_id | replace("-","_") }}`
RETAIN 7 DAYS;
Note: You will need to configure a CDW connection in Airflow.
Note: The run ID is only unique within a single CDE Virtual Cluster. If you have multiple CDE Virtual Clusters deployed, append a prefix for each Virtual Cluster e.g. `vc1_{{ run_id | replace('-', '_') }}` to ensure it remains unique.
Set up the Spark job(s) used to populate the table contents and ensure that Override Spark Values is checked. Then override the spark.wap.branch parameter, setting it to the WAP branch name:
spark.wap.branch={{ run_id | replace("-","_") }}
Note: Whilst this example only includes one table and one ETL job, this pattern can be used across multiple tables and ETL jobs, as long as the WAP branch ID is consistent across all of the tables and jobs.
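As an illustrative sketch of that multi-table case (db.people and db.addresses are hypothetical table names, and the branch DDL is issued from Spark here purely for brevity; in this pipeline it runs through the CDW Operator), every table touched by the run gets a branch with the same ID, and a single spark.wap.branch setting then isolates all of the jobs:
import org.apache.spark.sql.SparkSession

object MultiTableWapSetup {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("multi-table-wap-setup").getOrCreate()

    // Must match the spark.wap.branch value used by every ETL and DQ job in this run
    val branch = "cde_run_13"

    // Every table written by the pipeline gets a branch with the same ID
    Seq("db.people", "db.addresses").foreach { table =>
      spark.sql(s"ALTER TABLE $table CREATE BRANCH `$branch` RETAIN 7 DAYS")
    }
  }
}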
In this toy example, the ETL job is the following Scala Spark job:
import org.apache.spark.sql.SparkSession

object PeopleCreateRecords {
  def main(args: Array[String]): Unit = {
    if (args.length == 0) {
      println("Usage: PROG TBL_NAME")
      System.exit(1)
    }
    val tableName = args(0)

    val spark = SparkSession.builder().appName("people-dq-create-records").getOrCreate()
    import spark.implicits._

    // Generate a small batch of records with random IDs
    val rand = new scala.util.Random
    val df = Seq(
      (rand.nextInt(), "Bob", "Bloggs", 167),
      (rand.nextInt(), "Alice", "Bloggs", 133),
      (rand.nextInt(), "Paul", "Smith", 165),
      (rand.nextInt(), "Rose", "Smith", 168)
    ).toDF("id", "fname", "lname", "height_cm")

    // Append to the target table; with spark.wap.branch set, this lands on the WAP branch
    df.write.mode("append").insertInto(tableName)
  }
}
Similar to the ETL jobs, set up the data validation Spark job(s) with the appropriate value for the spark.wap.branch parameter:
spark.wap.branch={{ run_id | replace("-","_") }}
In this toy example, the data-quality check job is the following (Scala) Spark job using the Deequ data-quality library: [2]
import com.amazon.deequ.VerificationSuite
import com.amazon.deequ.checks.{Check, CheckLevel, CheckStatus}
import com.amazon.deequ.constraints.ConstraintStatus
import org.apache.spark.sql.SparkSession

object PeopleDq {
  def main(args: Array[String]): Unit = {
    if (args.length == 0) {
      println("Usage: PROG TBL_NAME")
      System.exit(1)
    }
    val tableName = args(0)

    val spark = SparkSession.builder().appName("people-dq-check").getOrCreate()

    // With spark.wap.branch set, this reads the new data on the WAP branch
    val df = spark.table(tableName)

    // Run basic completeness, range and uniqueness checks
    val verificationResult = VerificationSuite()
      .onData(df)
      .addCheck(Check(CheckLevel.Error, "basic checks")
        .areComplete(Seq("id", "fname", "lname", "height_cm"))
        .isContainedIn("height_cm", 10, 1000)
        .isUnique("id"))
      .run()

    println(s"Check result = ${verificationResult.status}")

    // Print the details of any failed constraints
    val resultsForAllConstraints = verificationResult.checkResults
      .flatMap { case (_, checkResult) => checkResult.constraintResults }
    resultsForAllConstraints
      .filter { _.status != ConstraintStatus.Success }
      .foreach { res => println(s"${res.constraint}: ${res.message.get}") }

    if (verificationResult.status == CheckStatus.Error) {
      // Cause the application (and therefore the CDE job) to fail on a DQ failure
      System.exit(2)
    }
  }
}
Note: Deequ is a library that allows you to perform expressive data quality checks using Spark. Its use is out of scope for this article. Feel free to use another library in its place, but be careful to ensure that a data quality check failure is surfaced in CDE as a CDE job failure.
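Whichever library you use, the simplest way to surface a failure is to make the Spark application itself fail, for example by throwing once a critical check does not pass. The following is a minimal hedged sketch; the duplicate-id check is a stand-in for whatever validation logic your library of choice produces:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

object GenericDqGate {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("generic-dq-gate").getOrCreate()

    // With spark.wap.branch set, this reads the new data on the WAP branch
    val df = spark.table("db.table")

    // Stand-in check: any duplicated id counts as a critical failure
    val duplicateIds = df.groupBy("id").count().filter(col("count") > 1).limit(1).count()

    if (duplicateIds > 0) {
      // An uncaught exception fails the Spark application, which CDE surfaces as a failed job
      throw new RuntimeException("Data quality check failed: duplicate ids found")
    }
  }
}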
We can fast-forward the main branch to match the WAP branch like so:
ALTER TABLE
db.table
EXECUTE FAST-FORWARD
'main'
'{{ run_id | replace("-","_") }}';
Note: As this job is dependent on the Perform DQ checks job, it will only run if the data quality check job succeeds.
Note: You will need to configure a CDW connection in Airflow.
Note: Whilst it is possible to set up the pipeline to automatically delete the WAP branch on failure, we have chosen not to here to allow data engineers to perform RCA on the WAP dataset.
We can drop the WAP branch using a CDW Operator, again using the run_id to identify the branch, like so:
ALTER TABLE
db.table
DROP BRANCH
`{{ run_id | replace("-","_") }}`;
Note: You will need to configure a CDW connection in Airflow.
[1] https://iceberg.apache.org/docs/1.5.0/branching/#audit-branch
[2] https://github.com/awslabs/deequ