Prerequisites:
This article assumes some knowledge of Spark and Iceberg. These examples make use of Cloudera Data Engineering (CDE) and Cloudera Data Warehouse (CDW). Note: Iceberg branch-based WAP requires Spark 3.5 or above.
Motivation (simulated scenario)
Let's imagine we have a high-traffic table that many users on the platform rely on. Each day, before the start of the work day, a Spark job populates this table with new records.
On most days, this simple architecture works quite well. This morning, however, the Spark job hit a data error and wrote out a batch of duplicate records. The data quality issue was reported by one of the users, and the data engineers are now trying to fix it. In the meantime, access to the high-traffic table is disrupted.
Of course, data quality checks can be implemented to catch issues like this earlier. However, that still leaves you little time to roll back changes or fix the data issues before users are affected.
The Write-Audit-Publish pattern
The Write-Audit-Publish (WAP) pattern splits a data pipeline into three separate stages:
Write: The ETL jobs modify the existing datasets (hidden from the main user base)
Audit: The new changes are checked for data quality and other data issues
Publish: Once approved, these changes are pushed out to the main user base
Iceberg supports this pattern natively. However, before we can discuss that in more detail, we first need to introduce Iceberg branches.
What are Iceberg branches?
Iceberg branches allow you to create multiple labeled copies of an Iceberg table – think of them as the table equivalent of a git branch. Each branch is created from a certain point in time and can be read and written in isolation.
ALTER TABLE db.table CREATE BRANCH `testbranch` RETAIN 7 DAYS;          -- create branch
INSERT INTO TABLE db.table.branch_testbranch VALUES ('bob', 'bloggs');  -- write to branch
Unlike full backups, branches are metadata-only copies – the underlying data files are not duplicated. You are still able to modify the data in your branch freely – the original branch will not be affected.
At a later point in time, if you wish to merge the new changes back into the original branch and no other changes have been made since creating the new branch, this can be achieved efficiently by “fast-forwarding” (one of the Iceberg operations supported by Spark and Hive).
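For example, in Spark SQL you can read the branch directly and later merge it back with the Iceberg fast_forward procedure. This is a minimal sketch, assuming an Iceberg catalog named spark_catalog and the testbranch created above:

SELECT * FROM db.table.branch_testbranch;                                  -- read the branch in isolation; main is untouched
CALL spark_catalog.system.fast_forward('db.table', 'main', 'testbranch');  -- merge the branch back into main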
What is Iceberg WAP?
Iceberg WAP (branch-based) is a feature designed to natively support the Write-Audit-Publish pattern in Apache Spark. [1]
With Iceberg WAP, you can configure Spark to automatically write to and read from an isolated branch, allowing you to:
Write data to an isolated copy of the table without affecting platform users
Audit the isolated copy of the table before publishing the results to the whole user community
To do this, we first need to set write.wap.enabled to true for any table we want to use with Iceberg WAP:
ALTER TABLE db.table SET TBLPROPERTIES ( 'write.wap.enabled'='true' );
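To confirm the property has been applied, you can run a quick check (a minimal sketch using the standard SHOW TBLPROPERTIES statement):

SHOW TBLPROPERTIES db.table ('write.wap.enabled');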
Next, we will need to choose a unique ID for this pipeline run (e.g. a job run number such as cde-run-13). We'll create a new branch of our table using this ID:
ALTER TABLE db.table CREATE BRANCH `cde-run-13` RETAIN 7 DAYS;
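As a sanity check, you can list the branches that now exist on the table via Iceberg's refs metadata table (assuming your Iceberg version exposes it):

SELECT * FROM db.table.refs;   -- lists branches and tags, including cde-run-13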
We should now configure the spark.wap.branch parameter to match this ID in every Spark job used to write or audit the new data:
spark.wap.branch=cde-run-13
Now, with these parameters set, Spark jobs will automatically use our isolated copy of the table for both reads and writes, allowing us to make changes and validate those changes without affecting the main branch of the table.
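For example, with spark.wap.branch set as above, an ordinary query in that Spark session resolves to the WAP branch rather than main, provided the branch exists (a quick sketch):

SELECT count(*) FROM db.table;   -- reads the cde-run-13 branch while spark.wap.branch=cde-run-13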
Once you are happy with these changes and wish to publish them to the user community, you can run a fast-forward operation in Spark or Hive to merge the WAP branch into the main branch:
ALTER TABLE db.table EXECUTE FAST-FORWARD 'main' 'cde-run-13';
If the data has data quality issues and you choose not to publish the new version, the original table data will remain unaffected. This gives you time to debug and resolve issues without impacting others.
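In that situation, you could inspect the rejected data on the WAP branch and drop the branch once the investigation is complete. A minimal sketch, reusing the branch name from the example above:

SELECT * FROM db.table VERSION AS OF 'cde-run-13' LIMIT 100;   -- audit the rejected data without touching main
ALTER TABLE db.table DROP BRANCH `cde-run-13`;                 -- discard the branch after root-cause analysis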
Example use-case in CDE
In this example, we're going to create the following pipeline using Spark and Airflow in CDE:
It consists of five dependent stages:
| Name | Description |
| --- | --- |
| Create WAP branch | Create the WAP branch using a unique job ID (we will use this branch in the next two Spark jobs) |
| Perform ETL | Write new data to the WAP branch |
| Perform DQ checks | Perform a variety of data quality checks on the new data in the WAP branch, failing if a critical data issue is found |
| Fast-forward main branch | Following a successful data quality check, fast-forward the main branch of our table to include the new data in the WAP branch |
| Delete WAP branch | Delete the WAP branch now that the data has been merged into the main branch |
Note: If any stage before fast-forward main branch fails, the main branch of the table is not affected.
This pipeline can be created using the Airflow pipeline editor, included in CDE. To access this UI, create a new Airflow job in your CDE Virtual Cluster and select Editor.
Note: Some of these steps may fail if the necessary access and permissions have not been configured (e.g. Airflow connection credentials for CDW not configured).
Create WAP branch
We can create the new branch with a CDW Operator, using the Airflow run_id as a unique ID like so:
ALTER TABLE db.table CREATE BRANCH `{{ run_id | replace("-","_") }}` RETAIN 7 DAYS;
Note: You will need to configure a CDW connection in Airflow.
Note: The run ID is only unique within a single CDE Virtual Cluster. If you have multiple CDE Virtual Clusters deployed, prepend a prefix for each Virtual Cluster, e.g. `vc1_{{ run_id | replace('-', '_') }}`, to ensure the branch name remains unique.
Perform ETL
Set up the Spark job(s) used to populate the table contents and ensure that Override Spark Values is checked. Override the spark.wap.branch parameter, setting it to the WAP branch name:
spark.wap.branch={{ run_id | replace("-","_") }}
Note: Whilst this example only includes one table and one ETL job, this pattern can be used across multiple tables and ETL jobs, as long as the WAP branch ID is consistent across all of the tables and jobs.
In this toy example, the ETL job is the following (Scala) Spark job:
import org.apache.spark.sql.SparkSession

object PeopleCreateRecords {
  def main(args: Array[String]) = {
    if (args.length == 0) {
      println("Usage: PROG TBL_NAME")
      System.exit(1)
    }
    val tableName = args(0)

    val spark = SparkSession.builder().appName("people-dq-create-records").getOrCreate()
    import spark.implicits._

    // Build a small batch of sample records with random IDs
    val rand = new scala.util.Random
    val df = Seq(
      (rand.nextInt, "Bob", "Bloggs", 167),
      (rand.nextInt, "Alice", "Bloggs", 133),
      (rand.nextInt, "Paul", "Smith", 165),
      (rand.nextInt, "Rose", "Smith", 168)
    ).toDF("id", "fname", "lname", "height_cm")

    // insertInto appends by column position into the existing table
    df.write.mode("append").insertInto(tableName)
  }
}
Perform DQ checks
Similar to the ETL jobs, set up the data validation Spark job(s) with the appropriate value for the spark.wap.branch parameter:
spark.wap.branch={{ run_id | replace("-","_") }}
In this toy example, the data-quality check job is the following (Scala) Spark job using the Deequ data-quality library: [2]
import com.amazon.deequ.VerificationSuite
import com.amazon.deequ.checks.{Check, CheckLevel, CheckStatus}
import com.amazon.deequ.constraints.ConstraintStatus
import org.apache.spark.sql.SparkSession

object PeopleDq {
  def main(args: Array[String]) = {
    if (args.length == 0) {
      println("Usage: PROG TBL_NAME")
      System.exit(1)
    }
    val tableName = args(0)

    val spark = SparkSession.builder().appName("people-dq-check").getOrCreate()
    val df = spark.table(tableName)

    // Run a set of basic data quality checks against the table
    val verificationResult = VerificationSuite()
      .onData(df)
      .addCheck(Check(CheckLevel.Error, "basic checks")
        .areComplete(Seq("id", "fname", "lname", "height_cm"))
        .isContainedIn("height_cm", 10, 1000)
        .isUnique("id"))
      .run()

    println(s"Check result = ${verificationResult.status}")

    // Print the details of any failed constraints
    val resultsForAllConstraints = verificationResult.checkResults
      .flatMap { case (_, checkResult) => checkResult.constraintResults }
    resultsForAllConstraints
      .filter { _.status != ConstraintStatus.Success }
      .foreach { res => println(s"${res.constraint}: ${res.message.get}") }

    if (verificationResult.status == CheckStatus.Error) {
      // Cause the application to fail on a DQ failure
      System.exit(2)
    }
  }
}
Note: Deequ is a Spark library that allows you to perform expressive data quality checks using Spark. Its use is out-of-scope for this article. Feel free to use another library in its place but be careful to ensure that a data quality check failure is surfaced in CDE as a CDE job failure.
Fast-forward main branch
We can fast-forward the main branch to match the WAP branch like so:
ALTER TABLE db.table EXECUTE FAST-FORWARD 'main' '{{ run_id | replace("-","_") }}';
Note: As this job is dependent on the Perform DQ checks job, it will only run if the data quality check job succeeds.
Note: You will need to configure a CDW connection in Airflow.
Note: Whilst it is possible to set up the pipeline to automatically delete the WAP branch on failure, we have chosen not to here, allowing data engineers to perform root-cause analysis (RCA) on the WAP dataset.
Delete WAP branch
We can delete the WAP branch using a CDW Operator like so:
ALTER TABLE db.table DROP BRANCH `{{ run_id | replace("-","_") }}`;
Note: You will need to configure a CDW connection in Airflow.
References
[1] https://iceberg.apache.org/docs/1.5.0/branching/#audit-branch
[2] https://github.com/awslabs/deequ
Introduction
When users interact with services in CDP Public Cloud, the services generate audits that detail:
Which resource the user is attempting to access
The action the user is attempting to perform (e.g. SELECT)
The result of the policy evaluation (i.e. Allowed or Denied)
The time the request was made
Administrators can inspect these audits using the Ranger Web UI. The audits are also archived to object storage as JSON-per-line files, which can be easily consumed by downstream applications for monitoring purposes.
Audits include a wide range of data points that describe the nature of the request and the result of the policy evaluation (see the Apache wiki for a complete list). In our example, we use Flink to read these files and calculate the number of denied requests made by each user within a single session.
Flink example
The complete project can be found on GitHub [link]. Clone this repository to follow along as we break down the application logic.
Data Source
The Ranger audits are stored in the Data Lake Storage Location in the following layout:

$DATALAKE_STORAGE_LOCATION/ranger/audit
|--atlas (service type)
|----atlas (service name)
|------20221013
|--------atlas_ranger_audit_test-dl-master0.test-env.xxxx-xxxx.cloudera.site.1.log
|--------atlas_ranger_audit_test-dl-master0.test-env.xxxx-xxxx.cloudera.site.log
...

Each line in these files contains a JSON object similar to the following (pretty-printed):

{
  "repo":"cm_kafka",
  "reqUser":"wdyson",
  "evtTime":"2022-09-15 11:00:16.271",
  "action":"publish",
  "result":1,
  "event_count":1,
  ...
}

Set up required
CC: readme.md
To follow along with this example, you will need:
A CDP Environment
A Streaming Analytics Data Hub cluster
You must also ensure that Maven is installed on your developer machine. Once installed, the application can be compiled and packaged using the following command:

$ mvn package

This command will generate a JAR file under ./target/auditsession-*-jar-with-dependencies.jar – we will need this later.
Next, you will need to create the following properties file with parameters that match your environment (replace the values marked with <...>):

audit.path=<base_audit_path_starting_with_one_of_s3a://,abfs://,gs://>
audit.poll=240
session.duration=600
session.output=kafka
kafka.topic=<topic_name>
kafka.bootstrap.servers=<bootstrap_servers>
kafka.security.protocol=SASL_SSL
kafka.sasl.kerberos.service.name=kafka

Save this file with a meaningful filename like flink-audit-sessions.properties.
Finally, you will need to download a copy of your Kerberos keytab from the Cloudera Control Plane – we will configure Flink to use this keytab for authentication: https://docs.cloudera.com/data-hub/cloud/access-clusters/topics/dh-retrieving-keytabs.html
Copy the JAR file, properties file and keytab file to the management node of your Streaming Analytics Data Hub cluster. At this point, we're ready to start the example application. SSH onto the management node of the Streaming Analytics Data Hub cluster and run the following command (replacing the values marked with <...>):

$ flink run \
    -yD security.kerberos.login.keytab=<path_to_keytab> \
    -yD security.kerberos.login.principal=<full_kerberos_principal> \
    -yD security.kerberos.login.contexts=KafkaClient \
    auditsession-*-jar-with-dependencies.jar \
    <properties_file>

If everything was configured correctly, the application should appear in the Flink Web UI – accessible from the Data Hub page in the Cloudera Control Plane – and begin to push results to the configured Kafka topic.
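To sanity-check that results are arriving, you can consume the configured topic with the Kafka console consumer from one of the cluster hosts. This is a sketch: client.properties is a hypothetical file containing the same SASL_SSL and Kerberos settings used in the Flink job's Kafka configuration:

$ kafka-console-consumer --bootstrap-server <bootstrap_servers> \
    --topic <topic_name> \
    --consumer.config client.properties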
To understand what's going on, we'll break down and explain each part of the application in the sections below.
Read Ranger audits from object storage
CC: App.java
Before we can analyse the audits, we need to load them from object storage. Flink allows you to read files line-by-line from object storage using the StreamExecutionEnvironment.readFile method like so:

ParameterTool params = ParameterTool.fromPropertiesFile(args[0]);

String auditPath = params.getRequired(PARAM_AUDIT_FS_PATH);
int auditPollMs = params.getInt(PARAM_AUDIT_FS_POLL_SECONDS) * 1000;

FileInputFormat<String> format = new TextInputFormat(new Path(auditPath));
format.setNestedFileEnumeration(true);

DataStream<String> rawAudits = env
    .readFile(format, auditPath, FileProcessingMode.PROCESS_CONTINUOUSLY, auditPollMs);

This produces a DataStream of String objects (rawAudits) – the lines from the files in the audit directory, ordered by modification time. As the audit files are stored in different sub-directories, we must enable enumeration (recursive listing) with format.setNestedFileEnumeration.
We've used the FileProcessingMode.PROCESS_CONTINUOUSLY mode here. In this mode, Flink will periodically poll the path for new audit files.
Parsing the raw audits
CC: App.java Audit.java
In our project, we have a simple POJO Java class called Audit which contains the fields included in a typical Ranger audit – the names of these fields match the names used in the JSON objects (more details on the fields can be found in the Apache wiki). The Audit class also includes a static method called Audit.fromJson that uses Jackson to parse the JSON-encoded audits into an Audit object, returning null on failure.
We can use this method to parse our DataStream of raw JSON-encoded audits like so:

DataStream<Audit> audits = rawAudits
    .map((rawAudit) -> Audit.fromJson(rawAudit))
    .filter((audit) -> audit != null && audit.reqUser != null);

Note: Some audits may not include a requesting user – we omit these using the filter expression, as we require this field to build a user session.
Assign event times to each audit
CC: App.java
In our application, we group audits into sessions with a 10 minute gap, i.e. audits generated by a single user that have all occurred within 10 minutes of at least one other audit in the session.
Flink automatically records the time that events are read from a source system (object storage in our case) – this time is known as Processing Time. However, suppose two audits from the same user are read by Flink within a 10 minute interval. Are we going to want to include them in the same user session? These audits may have been created months or years apart, so using Processing Time to generate user sessions isn't going to make sense here.
Fortunately, each audit includes the time it was created in the evtTime field – this is known as Event Time in Flink. To register the Event Time for a stream of events, we can use the DataStream.assignTimestampsAndWatermarks method like so:

DataStream<Audit> auditsWithTime = audits
    .assignTimestampsAndWatermarks(WatermarkStrategy
        .<Audit>forBoundedOutOfOrderness(Duration.ofDays(2))
        .withTimestampAssigner((e, t) -> e.evtTime.getTime()));

The timestamp assigner at the bottom of the expression speaks for itself, returning the evtTime field without modification. However, the WatermarkStrategy may take a bit more explaining.
Let's imagine we need to answer the following question: "How many audits have occurred before Wednesday this week?"
If audits don't arrive in order, how can we be sure we've received all of the audits that occurred before Wednesday? Flink answers this question with watermarks. A watermark is a monotonically increasing timestamp that marks the minimum Event Time expected for new events that are yet to be read. For the question above, Flink will wait until the watermark has reached Wednesday before committing the result.
In the expression above, we've set our WatermarkStrategy to a bounded out-of-orderness of 2 days. With this strategy, Flink tracks the watermark for the audits 2 days behind the maximum Event Time observed so far. This strategy is quite effective in our case as:
Services write at least one audit file per day
Flink reads files in object storage in date-modified order
Meaning that audits will be read in roughly date order.
Create session windows using Event Time
CC: App.java DeniedCountAggregate.java WrapUserAndWindowWithCount.java UserSessionCountResult.java
In our application, we are counting the number of denied audits generated by each user within a single user session – keeping things nice and simple. We do this by:
Setting the key of the stream to the reqUser field – the requesting user that triggered the audit
Windowing audits into sessions using the Event Time
Aggregating the windows using the DeniedCountAggregate and WrapUserAndWindowWithCount classes

DataStream<UserSessionCountResult> userSessionDeniedAuditCounts = auditsWithTime
    .keyBy((audit) -> audit.reqUser)
    .window(EventTimeSessionWindows.withGap(Time.seconds(sessionGapSeconds)))
    .aggregate(new DeniedCountAggregate(), new WrapUserAndWindowWithCount())
    .filter((res) -> res.count != 0);

DeniedCountAggregate implements AggregateFunction<> and keeps a count of the number of denied audits found in the user session so far.
WrapUserAndWindowWithCount implements ProcessWindowFunction<>; it takes the final count produced for each user session and wraps it in an object that includes the requesting user and details about the user session window. This class outputs the final objects returned by the DataStream.
Write the results to Kafka
CC: App.java
Once generated, we output the results to Kafka. The steps required to write our final results stream to Kafka will look something like this:

Properties kafkaProps = readKafkaProperties(PARAM_KAFKA_PREFIX);

String bootstrapServers = kafkaProps.getProperty("bootstrap.servers");
String topic = kafkaProps.getProperty("topic");

KafkaSink<String> sink = KafkaSink.<String>builder()
    .setBootstrapServers(bootstrapServers)
    .setRecordSerializer(
        KafkaRecordSerializationSchema.builder()
            .setTopic(topic)
            .setValueSerializationSchema(new SimpleStringSchema())
            .build()
    )
    .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
    .setKafkaProducerConfig(kafkaProps)
    .build();

userSessionDeniedAuditCounts
    .map((res) -> String.format("user='%s' denies=%d start=%d end=%d",
        res.reqUser, res.count, res.window.getStart(), res.window.getEnd()))
    .sinkTo(sink);

Note: This example uses the new KafkaSink class, included in the latest version of the Cloudera Runtime.
Exercises
Modify the aggregator so that it counts the total number of Hive audits instead of denied audits
Filter all results that contain a count of less than 5 before writing the stream to Kafka
Change the test case in TestAuditSession.java to pass with the new modifications
Note: Some of these changes will cause the existing tests to fail.
You can skip the tests like so:

$ mvn package -DskipTests

Resources
[link] Example code used in this article
[link] Cloudera Flink tutorials
[link] Flink documentation on watermarks