
Recently I was tasked with building a possible pattern for handling slowly changing dimensions (Type 2, to be specific) within CDP. The obvious answer is to use Hive ACID, but that answer is too generic. I needed to build a pipeline similar to what I used to build as an Informatica developer and verify that the solution/pattern holds up. Here we go.

Why?

The objective was to build one possible pattern (of many) for handling SCD Type 2 dimensions with CDP. Outcome: I was able to reproduce a typical ETL workflow with CDP. How? By taking off my horse blinders... Really.

What are SCDs?

For those who may not be familiar with slowly changing dimensions within an EDW context, here is a great quick read: Types Of Dimension Tables
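As a quick illustration of what Type 2 means in practice, here is a minimal in-memory sketch in plain Python. It is not part of the demo pipeline: the function name `apply_scd2` and the sample rows are hypothetical, and only a subset of the product columns used later in this article is tracked. The idea is that a changed record is never overwritten. The current row is expired and a new current row is appended.

```python
from datetime import date


def apply_scd2(dimension, changes, today):
    """Return a new dimension list with Type 2 changes applied.

    dimension: list of dicts with product_id, product_name,
               start_date, end_date, is_current
    changes:   list of dicts with product_id, product_name
    """
    # Copy rows so the caller's list of dicts is left untouched.
    result = [dict(row) for row in dimension]
    current = {r["product_id"]: r for r in result if r["is_current"] == "Y"}

    for change in changes:
        existing = current.get(change["product_id"])
        if existing is not None and existing["product_name"] == change["product_name"]:
            continue  # nothing changed, keep the current row as-is
        if existing is not None:
            # Expire the old version instead of overwriting it.
            existing["end_date"] = today
            existing["is_current"] = "N"
        new_row = {
            "product_id": change["product_id"],
            "product_name": change["product_name"],
            "start_date": today,
            "end_date": None,
            "is_current": "Y",
        }
        result.append(new_row)
        current[change["product_id"]] = new_row
    return result


if __name__ == "__main__":
    # Hypothetical sample data, for illustration only.
    dim = [{"product_id": 1, "product_name": "Widget",
            "start_date": date(2021, 1, 1), "end_date": None, "is_current": "Y"}]
    for row in apply_scd2(dim, [{"product_id": 1, "product_name": "Widget v2"}],
                          date(2021, 8, 27)):
        print(row)
```

After the call, the dimension holds both versions of product 1: the original row flagged as no longer current and the new row flagged as current.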

How?
  • Data lands in a sourcing area
    • Super typical
  • Pull source data & run it through a cleaning/processing layer for staging
    • Ready for merging against SCD table
  • Run an ACID merge between stage and SCD Table
    • Not rocket science

I don't believe you... I wanna try this in CDP Public Cloud or Private Cloud

Perfect. This article runs through a demo using CDP to do exactly that.  

What is required for this demo?

  • CDP Public Cloud Account
  • Cloudera Data Engineering (CDE), which includes Airflow
  • Cloudera Data Warehouse (Hive LLAP)

Assets required to produce this demo:

Workflow Details

Airflow will orchestrate the workflow. First, a CDE Spark job will pick up the product changes from S3. Spark will perform the required ETL and then write the output to a staging table (Hive external). Lastly, using the new Airflow CDW operator, a Hive ACID merge will be executed between the staging table and the product dimension. Not rocket science. I know.


Data/Tables Setup

For this demo, there is a single dimension table: product. The other table is product_ext, an external table over the raw data that needs to be applied to the product dimension. Very common stuff.

Product DDL

Note: Hive 3 tables are internal (managed) by default, with full ACID support

 

CREATE TABLE IF NOT EXISTS product(
  product_id INT,
  product_name STRING,
  aisle_id INT,
  department_id INT,
  start_date DATE,
  end_date DATE,
  is_current STRING DEFAULT 'Y');

 

Product_ext Schema

Note: Replace <YOUR-S3-BUCKET> with your S3 bucket.

 

CREATE EXTERNAL TABLE IF NOT EXISTS product_ext(
  product_id INT,
  product_name STRING,
  aisle_id int,
  department_id int)
  ROW FORMAT DELIMITED
  FIELDS TERMINATED BY '|'
  STORED AS TEXTFILE
  LOCATION 's3a://<YOUR-S3-BUCKET>/sunman/products/'
  tblproperties ("skip.header.line.count"="1");

 

Load the product dimension table

 

INSERT INTO product
SELECT product_id, product_name, aisle_id, department_id,
       current_date() AS start_date, NULL AS end_date, 'Y' AS is_current
FROM product_ext;

 

Lastly, upload the product_changes.psv file to S3

Note: Replace <YOUR-S3-BUCKET> with your S3 bucket

 

s3a://<YOUR-S3-BUCKET>/sunman/product_changes/

 

Recap: A product dimension table has been created. A file with changes that need to be applied against the product dimension table has been uploaded to S3.

Cloudera Data Warehouse

Via CDW, a Hive ACID merge statement will merge the staging table with the product dimension. This will be triggered by Airflow using the new CDW operator. More on that later.
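To make the merge step more concrete, here is a hedged sketch of how an SCD Type 2 merge statement could be assembled as a Python string, since the DAG passes its HQL to the operator as a string. This is not the `hive_merge` statement from the author's DAG repository. The helper name `build_scd2_merge`, the choice of tracked columns, and the assumption that the staging table has the same four columns as product_ext are all illustrative, and the generated HiveQL has not been run against a warehouse. It follows the commonly used pattern of feeding each changed row through the source twice: once with its real key to expire the current row, and once with a NULL key to force the insert of the new version.

```python
def build_scd2_merge(dim_table, stage_table, key="product_id",
                     tracked=("product_name", "aisle_id", "department_id")):
    """Build a HiveQL MERGE for an SCD Type 2 dimension (illustrative sketch)."""
    # A row counts as changed when any tracked column differs.
    # Note: <> does not detect changes to or from NULL.
    changed_vs_p = " OR ".join(f"s.{c} <> p.{c}" for c in tracked)
    changed_vs_d = " OR ".join(f"sub.{c} <> d.{c}" for c in tracked)
    insert_cols = ", ".join(f"sub.{c}" for c in (key, *tracked))

    return f"""
MERGE INTO {dim_table} AS d
USING (
  -- every staged row, keyed so it can match the dimension
  SELECT s.{key} AS join_key, s.* FROM {stage_table} s
  UNION ALL
  -- changed rows again with a NULL key, which forces the insert path
  SELECT NULL AS join_key, s.*
  FROM {stage_table} s
  JOIN {dim_table} p ON s.{key} = p.{key}
  WHERE p.is_current = 'Y' AND ({changed_vs_p})
) sub
ON sub.join_key = d.{key}
WHEN MATCHED AND d.is_current = 'Y' AND ({changed_vs_d})
  THEN UPDATE SET end_date = current_date(), is_current = 'N'
WHEN NOT MATCHED
  THEN INSERT VALUES ({insert_cols}, current_date(), NULL, 'Y')
""".strip()


if __name__ == "__main__":
    # Table names taken from the run command later in this article.
    print(build_scd2_merge("product", "product_staged_cleansed"))
```

The INSERT column order mirrors the product DDL above (four business columns, then start_date, end_date, is_current), so the sketch only holds if the dimension keeps that layout.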

Grab the JDBC URL from the virtual warehouse

Note: SSO must be disabled

For example:

 

jdbc:hive2://zzz.cloudera.site/default;transportMode=http;httpPath=cliservice;socketTimeout=60;ssl=true;retries=3;
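If it helps to see which parts of that URL are which, the following sketch splits a Hive JDBC URL into its host, database, and session parameters. The helper name `parse_hive_jdbc_url` is hypothetical and handles only the simple URL shape shown above. Which of these values a CDW Airflow connection actually needs is covered in the Cloudera documentation referenced in the next section, so treat this purely as a reading aid.

```python
def parse_hive_jdbc_url(url):
    """Split a jdbc:hive2:// URL into host, database, and parameters."""
    prefix = "jdbc:hive2://"
    if not url.startswith(prefix):
        raise ValueError("not a Hive JDBC URL: " + url)

    # Everything before the first ';' is host[/database]; the rest is key=value pairs.
    location, _, param_text = url[len(prefix):].partition(";")
    host, _, database = location.partition("/")

    params = {}
    for pair in param_text.split(";"):
        if pair:  # skip the empty piece left by a trailing ';'
            name, _, value = pair.partition("=")
            params[name] = value

    return {"host": host, "database": database or "default", "params": params}


if __name__ == "__main__":
    example = ("jdbc:hive2://zzz.cloudera.site/default;transportMode=http;"
               "httpPath=cliservice;socketTimeout=60;ssl=true;retries=3;")
    parts = parse_hive_jdbc_url(example)
    print(parts["host"])
    print(parts["database"])
    print(parts["params"])
```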

 

Create an Airflow CDW Connection

To execute CDW HQL statements, an Airflow connection to CDW is required. The connection is referenced in the Airflow DAG; more on that later.

How to create a CDW Airflow connection: Automating data pipelines using Apache Airflow in Cloudera Data Engineering.

 

Important: Make note of the connection ID. It will be used later in this article.

Create a CDE Spark Job

This job reads the product_changes.psv file (which contains the changes that need to be applied to the product dimension), performs cleansing/ETL, and stages the changes as an external Hive table.
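For readers who want a feel for what the cleansing step does before opening the Spark code, here is a small stand-in written in plain Python rather than Spark. It is not the author's job: the function name `cleanse_product_changes`, the header names, and the specific rules (trim names, skip malformed rows, keep the last record per product_id) are assumptions chosen for illustration.

```python
import csv
import io


def cleanse_product_changes(psv_text):
    """Parse pipe-delimited product changes and return cleansed rows.

    Assumes a header line of product_id|product_name|aisle_id|department_id.
    """
    reader = csv.DictReader(io.StringIO(psv_text), delimiter="|")
    latest = {}
    for raw in reader:
        try:
            row = {
                "product_id": int(raw["product_id"]),
                "product_name": raw["product_name"].strip(),
                "aisle_id": int(raw["aisle_id"]),
                "department_id": int(raw["department_id"]),
            }
        except (KeyError, TypeError, ValueError, AttributeError):
            continue  # skip rows with missing or non-numeric fields
        if row["product_name"]:
            # Last occurrence of a product_id wins.
            latest[row["product_id"]] = row
    return list(latest.values())


if __name__ == "__main__":
    # Hypothetical sample content, for illustration only.
    sample = (
        "product_id|product_name|aisle_id|department_id\n"
        "1| Widget |10|3\n"
        "1|Widget v2|10|3\n"
        "bad|Thing|1|1\n"
        "2|Gadget|11|4\n"
    )
    for row in cleanse_product_changes(sample):
        print(row)
```

In the actual demo the equivalent work happens inside the CDE Spark job, which then writes the result to the staging table.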

The code is available on my GitHub page. If you're not interested in building the code, no worries. I have also provided the Spark jar, which can be downloaded instead.

 

https://github.com/sunileman/spark-CDE-SCD/blob/master/target/scala-2.11/spark-scd_2.11-0.1.jar

 

CDE Spark Job Details

Note: For this demo to work, all job specs/args/configs must match the screenshot below

Note: Update <YOUR-S3-BUCKET> with your S3 bucket

[Screenshot: CDE Spark job configuration]

Create a CDE Airflow Job

The Airflow DAG flow is the following:

[Diagram: Airflow DAG flow]

First, a CDE Spark job will be called to stage the product changes into an external Hive table. Then CDW will be called to perform the Hive ACID merge between the product dimension and the staging table. The code for the DAG is here:

 

https://github.com/sunileman/airflow-scd-dag

 

The DAG file is airflow-scd.py

Open it and update the cli_conn_id with the ID of the Airflow CDW connection created earlier:

 

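# See https://docs.cloudera.com/data-engineering/cloud/manage-jobs/topics/cde-airflow-dag-pipeline.html
# CDWOperator ships with CDE's Airflow; per the Cloudera docs linked above it is imported as:
from cloudera.cdp.airflow.operators.cdw_operator import CDWOperator

# `dag` and `hive_merge` (the HQL merge string) are defined elsewhere in airflow-scd.py
hive_cdw_job = CDWOperator(
    task_id='cdw_hive_job',
    dag=dag,
    cli_conn_id='<YOUR-AIRFLOW-CDW-CONNECTION-ID>',  # the Airflow CDW connection ID noted earlier
    hql=hive_merge,
    schema='default',
    use_proxy_user=False,
    query_isolation=False,
)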

 

 

Run the SCD workflow

Generally, Airflow jobs can be executed through the UI. Because I parameterized the DAG, at this time the only way to execute the Airflow job with runtime configs is through the CLI.

Download CDE CLI: Using the Cloudera Data Engineering command line interface

Note: Update <YOUR-S3-BUCKET> with your S3 bucket name

 

./cde job run \
  --config c_stageTable='product_staged' \
  --config c_sourceLoc='s3a://<YOUR-S3-BUCKET>/sunman/product_changes/' \
  --config c_stageCleansedTable='product_staged_cleansed' \
  --config c_dimTable='product' \
  --name EDW-SCD-WorkFlow
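If you would rather script this call than type it, the sketch below assembles the same argument list from a dictionary of runtime configs. It uses only the `job run`, `--config`, and `--name` arguments shown in the command above. The helper name `build_cde_run_command` is hypothetical, and actually executing the result (for example with `subprocess.run`) assumes the CDE CLI is already downloaded and configured.

```python
import shlex


def build_cde_run_command(job_name, configs, cde_binary="./cde"):
    """Return the argv list for `cde job run` with one --config per entry."""
    argv = [cde_binary, "job", "run"]
    for key, value in configs.items():
        argv += ["--config", f"{key}={value}"]
    argv += ["--name", job_name]
    return argv


if __name__ == "__main__":
    runtime_configs = {
        "c_stageTable": "product_staged",
        "c_sourceLoc": "s3a://<YOUR-S3-BUCKET>/sunman/product_changes/",
        "c_stageCleansedTable": "product_staged_cleansed",
        "c_dimTable": "product",
    }
    argv = build_cde_run_command("EDW-SCD-WorkFlow", runtime_configs)
    # shlex.join adds shell quoting where needed, handy for logging the command.
    print(shlex.join(argv))
```

Passing the list form to a process launcher avoids shell quoting problems with values such as the S3 path.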

 

This will return an Airflow job ID. Return to the CDE UI. After the DAG completes, run the following:

 

SELECT * FROM PRODUCT WHERE PRODUCT_ID = 232

 

Product ID 232 is one of several products that were updated.

Wrap It Up!

This end-to-end SCD pipeline demonstrated the capability to use the right tool for the right job to accomplish a highly important EDW task, all driven by Airflow's super powerful orchestration engine handing off work between Apache Spark and Apache Hive: Spark for data processing, Hive for EDW (ACID merge).
