CDP Data Engineering (CDE) is the only cloud-native service purpose-built for enterprise data engineering teams. Built on Apache Spark, CDE is an all-inclusive data engineering toolset that enables orchestration automation with Apache Airflow, advanced pipeline monitoring, visual troubleshooting, and comprehensive management tools to streamline ETL processes across enterprise analytics teams.
Apache Iceberg is an open table format for huge analytic datasets. It provides features that, coupled with Spark as the compute engine, allow you to build data processing pipelines with dramatic gains in terms of scalability, performance, and overall developer productivity.
CDE provides native Iceberg support. With the release of CDE 1.20, the Spark runtime has been updated to Apache Iceberg 1.3. This version introduces new features that bring significant benefits to data engineers.
This article will familiarize you with Iceberg Table Branching and Tagging in CDE.
Requirements
To follow along, you need a CDE 1.20 (or above) Virtual Cluster running Spark 3 and the CDE CLI installed and configured on your local machine. The following commands create a Files resource, upload the two sample CSV files, and launch an interactive Spark session:
cde resource create --name myFiles --type files
cde resource upload --name myFiles --local-path resources/cell_towers_1.csv --local-path resources/cell_towers_2.csv
cde session create --name icebergSession --type pyspark --mount-1-resource myFiles
cde session interact --name icebergSession
USERNAME = "pauldefusco"
df = spark.read.csv("/app/mount/cell_towers_1.csv", header=True, inferSchema=True)
df.writeTo("CELL_TOWERS_{}".format(USERNAME)).using("iceberg").tableProperty("write.format.default", "parquet").createOrReplace()
# LOAD NEW TRANSACTION BATCH
batchDf = spark.read.csv("/app/mount/cell_towers_2.csv", header=True, inferSchema=True)
batchDf.printSchema()
batchDf.createOrReplaceTempView("BATCH_TEMP_VIEW_{}".format(USERNAME))
# CREATE TABLE BRANCH
spark.sql("ALTER TABLE CELL_TOWERS_{} CREATE BRANCH ingestion_branch".format(USERNAME))
# WRITE DATA OPERATION ON TABLE BRANCH
batchDf.write.format("iceberg").option("branch", "ingestion_branch").mode("append").save("CELL_TOWERS_{}".format(USERNAME))
spark.sql("SELECT * FROM CELL_TOWERS_{};".format(USERNAME)).show()
spark.sql("SELECT * FROM CELL_TOWERS_{} VERSION AS OF 'ingestion_branch';".format(USERNAME)).show()
# QUERY ICEBERG METADATA HISTORY TABLE
spark.sql("SELECT * FROM CELL_TOWERS_{}.snapshots".format(USERNAME)).show(20, False)
The cherrypick_snapshot procedure creates a new snapshot that incorporates the changes from another snapshot in a metadata-only operation (no new data files are created). To run the cherrypick_snapshot procedure, you provide two parameters: the name of the table you are updating and the ID of the snapshot whose changes you want to apply. The procedure returns the snapshot IDs before and after the cherry-pick operation as source_snapshot_id and current_snapshot_id.
You will use the cherry-pick operation to commit to the table the changes that have so far been staged in the 'ingestion_branch' branch.
# SHOW PAST BRANCH SNAPSHOT IDS
spark.sql("SELECT * FROM SPARK_CATALOG.DEFAULT.CELL_TOWERS_{}.refs;".format(USERNAME)).show()
# SAVE THE SNAPSHOT ID CORRESPONDING TO THE CREATED BRANCH
branchSnapshotId = spark.sql("SELECT snapshot_id FROM SPARK_CATALOG.DEFAULT.CELL_TOWERS_{}.refs WHERE NAME == 'ingestion_branch';".format(USERNAME)).collect()[0][0]
# USE THE PROCEDURE TO CHERRY-PICK THE SNAPSHOT
# THIS IMPLICITLY SETS THE CURRENT TABLE STATE TO THE STATE DEFINED BY THE CHOSEN SNAPSHOT ID
# THE OUTPUT INCLUDES source_snapshot_id AND current_snapshot_id
spark.sql("CALL spark_catalog.system.cherrypick_snapshot('SPARK_CATALOG.DEFAULT.CELL_TOWERS_{0}',{1})".format(USERNAME, branchSnapshotId)).show()
# VALIDATE THE CHANGES
# THE ROW COUNT OF THE CURRENT TABLE STATE NOW REFLECTS THE APPEND OPERATION - PREVIOUSLY IT ONLY DID WHEN SELECTING THE BRANCH
spark.sql("SELECT COUNT(*) FROM CELL_TOWERS_{};".format(USERNAME)).show()
Tags are immutable labels for Iceberg Snapshot IDs and can be used to reference a particular table version via a simple tag rather than having to work with Snapshot IDs directly.
spark.sql("ALTER TABLE SPARK_CATALOG.DEFAULT.CELL_TOWERS_{} CREATE TAG businessOrg RETAIN 365 DAYS".format(USERNAME)).show()
spark.sql("SELECT * FROM SPARK_CATALOG.DEFAULT.CELL_TOWERS_{} VERSION AS OF 'businessOrg';".format(USERNAME)).show()
The refs metadata table helps you understand and manage your table’s snapshot history and retention policy, making it a crucial part of maintaining data versioning and ensuring that your table’s size is under control. Among its many use cases, the table provides a list of all the named references within an Iceberg table such as Branch names and corresponding Snapshot IDs.
spark.sql("SELECT * FROM SPARK_CATALOG.DEFAULT.CELL_TOWERS_{}.refs;".format(USERNAME)).show()
CDE supports Apache Iceberg, which provides a table format for huge analytic datasets in the cloud. Iceberg enables you to work with large tables, especially on object stores, and supports concurrent reads and writes on all storage media. You can use Cloudera Data Engineering virtual clusters running Spark 3 to interact with Apache Iceberg tables.
The Iceberg metadata layer can track independent lineages of snapshots and assign names to particular snapshots. These features are called table branching and tagging, respectively. Thanks to them, Iceberg data engineers can implement data pipelines with advanced isolation, reproducibility, and experimentation capabilities.
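Finally, branches and tags can be removed once they are no longer needed. The following is a minimal sketch assuming the DROP BRANCH and DROP TAG statements from the Iceberg Spark SQL extensions; it reuses the branch and tag created above.

# CLEAN UP THE BRANCH AND TAG ONCE THEY ARE NO LONGER NEEDED
spark.sql("ALTER TABLE SPARK_CATALOG.DEFAULT.CELL_TOWERS_{} DROP BRANCH ingestion_branch".format(USERNAME))
spark.sql("ALTER TABLE SPARK_CATALOG.DEFAULT.CELL_TOWERS_{} DROP TAG businessOrg".format(USERNAME))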