Recently I was tasked with building a possible pattern for handling slowly changing dimensions (Type 2, to be specific) within CDP. The obvious answer is to use Hive ACID, but clearly that answer is too generic. I needed to build a pipeline similar to what I used to build as an Informatica developer and verify the legitimacy of the solution/pattern. Here we go.
The objective was to build a possible (one of many) pattern for handling SCD Type 2 dimensions with CDP. Outcome: I was able to replicate a typical ETL workflow with CDP. How? By taking off my horse blinders... Really.
What are SCDs?
For those who may not be familiar with slowly changing dimensions within an EDW context, here is a great quick read: Types Of Dimension Tables. In short, a Type 2 dimension preserves history: when an attribute of a dimension record changes, the current row is end-dated and flagged is_current = 'N', and a new row is inserted with today's start_date and is_current = 'Y'.
Perfect. This article runs through a demo using CDP to do exactly that.
Assets required to produce this demo: a CDE virtual cluster (which provides Spark and Airflow), a CDW Hive virtual warehouse, the CDE CLI, and an s3 bucket.
Airflow will orchestrate the workflow. First, a CDE Spark job will pick up the product changes from s3. Spark will perform the required ETL and then write the output to a staging table (Hive external). Lastly, using the new Airflow CDW operator, a Hive ACID merge will be executed between the staging table and the product dimension. Not rocket science. I know.
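For orientation, here is a skeletal sketch of what such a two-task DAG looks like. The DAG name, task IDs, and CDE job name below are placeholders, and the operator imports follow Cloudera's CDE Airflow documentation; the actual DAG used in this demo is linked later in the article.
# Skeletal two-task DAG (illustrative names; the full DAG used in this demo is linked later)
from datetime import datetime

from airflow import DAG
from cloudera.cdp.airflow.operators.cde_operator import CDEJobRunOperator
from cloudera.cdp.airflow.operators.cdw_operator import CDWOperator

dag = DAG('scd2-product-pipeline', start_date=datetime(2021, 1, 1), schedule_interval=None)

# Step 1: the CDE Spark job stages the product changes from s3 into a Hive staging table
stage_changes = CDEJobRunOperator(
    task_id='stage_product_changes',
    dag=dag,
    job_name='<YOUR-CDE-SPARK-JOB-NAME>'
)

# Step 2: CDW runs the Hive ACID merge between the staging table and the product dimension
hive_cdw_job = CDWOperator(
    task_id='cdw_hive_job',
    dag=dag,
    cli_conn_id='<YOUR-AIRFLOW-CDW-CONNECTION-ID>',
    hql='-- SCD Type 2 merge goes here (sketched later in this article)'
)

# Spark staging must finish before the merge runs
stage_changes >> hive_cdw_job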
For this demo, there is a single dimension table: product. The other table is product_ext, an external table holding the raw data that needs to be applied to the product dimension. Very common stuff.
Product DDL
Note: In Hive 3, tables are created as internal (managed) tables with full ACID support by default.
CREATE TABLE IF NOT EXISTS product(
  product_id INT,
  product_name STRING,
  aisle_id INT,
  department_id INT,
  start_date DATE,
  end_date DATE,
  is_current STRING DEFAULT 'Y');
Product_ext Schema
Note: Replace <YOUR-S3-BUCKET> with your s3 bucket.
CREATE EXTERNAL TABLE IF NOT EXISTS product_ext(
  product_id INT,
  product_name STRING,
  aisle_id INT,
  department_id INT)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '|'
STORED AS TEXTFILE
LOCATION 's3a://<YOUR-S3-BUCKET>/sunman/products/'
TBLPROPERTIES ("skip.header.line.count"="1");
Load the product dimension table
INSERT INTO product
SELECT product_id, product_name, aisle_id, department_id,
       current_date() AS start_date, NULL AS end_date, 'Y' AS is_current
FROM product_ext;
Lastly, upload the product_changes.psv file to s3
Note: Replace <YOUR-S3-BUCKET> with your s3 bucket
s3a://<YOUR-S3-BUCKET>/sunman/product_changes/
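Any upload method works (the AWS console, the AWS CLI, and so on); for example, a quick way from Python with boto3, assuming your credentials are already configured:
# Upload the change file to the location the Spark job will read from.
# <YOUR-S3-BUCKET> is your bucket name; the key matches the path above.
import boto3

s3 = boto3.client("s3")
s3.upload_file(
    Filename="product_changes.psv",
    Bucket="<YOUR-S3-BUCKET>",
    Key="sunman/product_changes/product_changes.psv"
)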
Recap: A product dimension table has been created. A file with changes that need to be applied against the product dimension table has been uploaded to s3.
Via CDW, a Hive ACID merge statement will merge the staging table into the product dimension. This will be triggered by Airflow using the new CDW operator. More on that later.
Grab the JDBC URL from the virtual warehouse
Note: SSO must be disabled
For example:
jdbc:hive2://zzz.cloudera.site/default;transportMode=http;httpPath=cliservice;socketTimeout=60;ssl=true;retries=3;
To execute CDW HQL statements, an Airflow connection to CDW is required. The connection is referenced in the Airflow DAG; more on that later.
How to create a CDW Airflow connection: Automating data pipelines using Apache Airflow in Cloudera Data Engineering.
Important: Make note of the connection ID (conn_id); it will be used later in this article as the CDWOperator's cli_conn_id.
The CDE Spark job reads the product_changes.psv file (which contains the changes that need to be applied to the product dimension), performs cleansing/ETL, and stages the changes as an external Hive table.
The code is available on my GitHub page. If you're not interested in building the code, no worries; I have also provided the Spark jar, which can be downloaded instead.
https://github.com/sunileman/spark-CDE-SCD/blob/master/target/scala-2.11/spark-scd_2.11-0.1.jar
Note: For this demo to work, all job specs/args/configs must match the screenshot below.
Note: Update <YOUR-S3-BUCKET> with your s3 bucket
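The job itself is the Scala jar above; purely for orientation, here is a rough PySpark sketch of what the staging step does. The cleansing rules shown are illustrative rather than the jar's exact logic, and the source path and staging table name mirror the c_sourceLoc and c_stageCleansedTable values passed on the CLI later in this article.
# Illustrative PySpark sketch of the staging step (the demo actually runs the Scala jar above).
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("stage-product-changes").enableHiveSupport().getOrCreate()

# Read the pipe-delimited change file from s3 (the c_sourceLoc argument)
changes = (spark.read
           .option("header", "true")
           .option("sep", "|")
           .csv("s3a://<YOUR-S3-BUCKET>/sunman/product_changes/"))

# Light cleansing: cast ids to int, trim the product name, drop rows without a product_id
cleansed = (changes
            .withColumn("product_id", F.col("product_id").cast("int"))
            .withColumn("aisle_id", F.col("aisle_id").cast("int"))
            .withColumn("department_id", F.col("department_id").cast("int"))
            .withColumn("product_name", F.trim(F.col("product_name")))
            .filter(F.col("product_id").isNotNull()))

# Stage the cleansed changes for the downstream CDW merge (the c_stageCleansedTable argument).
# The real job writes an external Hive table; saveAsTable is a simplification here.
cleansed.write.mode("overwrite").saveAsTable("product_staged_cleansed")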
The Airflow DAG flow is as follows:
First, a Spark CDE job will be called to stage the product changes into an external Hive table. Then, CDW will be called to perform the Hive ACID merge between the product dimension and the staging table. The code for the DAG is here:
https://github.com/sunileman/airflow-scd-dag
The DAG file is airflow-scd.py. Open it and update cli_conn_id; this is the Airflow CDW connection created earlier:
##https://docs.cloudera.com/data-engineering/cloud/manage-jobs/topics/cde-airflow-dag-pipeline.html
hive_cdw_job = CDWOperator(
    task_id='cdw_hive_job',
    dag=dag,
    cli_conn_id='<YOUR-AIRFLOW-CDW-CONNECTION-ID>',
    hql=hive_merge,
    schema='default',
    use_proxy_user=False,
    query_isolation=False
)
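The hql parameter points at a hive_merge string defined earlier in the DAG; the exact statement lives in airflow-scd.py. As a rough idea, an SCD Type 2 merge between the cleansed staging table and the product dimension can be written along these lines (column names come from the product DDL above; the staging table name matches the c_stageCleansedTable value passed on the CLI below):
# Illustrative sketch only -- see airflow-scd.py for the statement actually used in this demo.
hive_merge = """
MERGE INTO product AS dim
USING (
  -- rows with a real join_key expire the matching current dimension row;
  -- the NULL join_key copies never match, so they insert the new version
  SELECT s.product_id AS join_key, s.* FROM product_staged_cleansed s
  UNION ALL
  SELECT CAST(NULL AS INT) AS join_key, s.* FROM product_staged_cleansed s
  JOIN product d ON s.product_id = d.product_id AND d.is_current = 'Y'
) AS stg
ON dim.product_id = stg.join_key AND dim.is_current = 'Y'
-- a change-detection predicate (WHEN MATCHED AND ...) can be added to skip unchanged rows
WHEN MATCHED THEN
  UPDATE SET end_date = current_date(), is_current = 'N'
WHEN NOT MATCHED THEN
  INSERT VALUES (stg.product_id, stg.product_name, stg.aisle_id, stg.department_id,
                 current_date(), NULL, 'Y')
"""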
Generally, Airflow jobs can be executed through the UI. Since I parameterized the DAG, at this time the only way to execute an Airflow job with runtime configs is through the CLI.
Download CDE CLI: Using the Cloudera Data Engineering command line interface
Note: Update <YOUR-S3-BUCKET> with your s3 bucket name
./cde job run --config c_stageTable='product_staged' --config c_sourceLoc='s3a://<YOUR-S3-BUCKET>/sunman/product_changes/' --config c_stageCleansedTable=product_staged_cleansed --config c_dimTable=product --name EDW-SCD-WorkFlow
This will return a job run ID. Return to the CDE UI. After the DAG completes, run the following query:
SELECT * FROM PRODUCT WHERE PRODUCT_ID = 232
Product ID 232 is one of several products that were updated. You should see two rows for it: the original version, now end-dated with is_current = 'N', and a new current version with is_current = 'Y'.
This end-to-end SCD pipeline demonstrated using the right tool for the right job to accomplish a highly important EDW task, all driven by Airflow's super-powerful orchestration engine handing off work between Apache Spark and Apache Hive: Spark for data processing, Hive for EDW (ACID merge).