What is CDC

Change Data Capture (CDC) in Data Warehousing typically refers to the process of capturing changes over time in Dimension tables. Dimension tables qualify Fact tables (measures) by containing information to answer questions around Who (e.g. Customer), What (e.g. Product), Where (e.g. Location), and When (e.g. Time).

This article demonstrates how to implement a very basic, rudimentary CDC solution in Hadoop using MySQL, Sqoop, Spark, and Hive. It includes basic PySpark code to get you started with Spark DataFrames. In a real-world example you would include audit tables to store information for each run.

 

How to do CDC in Hadoop

In the past, I worked at a company where CDC on Hadoop was a big challenge for us. Initially, we were using Hive to merge source changes (Sqoop extracts) with the existing Dimension tables and then building new tables. This worked well until our data volumes started getting into the hundreds of millions of records. At those larger data volumes, not only did the CDC take longer, but memory and CPU consumption also increased, causing resource contention on the Hadoop cluster. The solution was to use HBase and the HBase Bulk Loader to load the changes from the source.

The Hive update functionality was not a solution for our use case.

If you have smaller data volumes, in the millions to tens of millions of records, then Spark could suffice as a solution. The test below was performed on a Dimension table containing 5 million existing customer records, merging 1 million changed customer records plus 4 new customer records. It took about 1 minute for the Spark code to complete. The cluster was an 8-node AWS cluster (M4 large instances) running HDP 2.4.

The solution below shows how to perform a Type 1 Slowly Changing Dimension (SCD). Type 1 SCD is a Data Warehouse term used to describe a type of CDC in which we overwrite the current value in the Dimension table with the new value from our source.
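
As a toy illustration (the values below are made up, not taken from the test data set), a Type 1 merge keeps only the latest source values for a given key and discards the old ones:

## hypothetical before/after for a single customer key (illustrative values only)
## existing dimension row : cust_no=1001, last_name='Smith', modified_date='2016-01-10 09:15:00'
## changed source row     : cust_no=1001, last_name='Jones', modified_date='2016-05-24 18:02:00'
## after the Type 1 merge : cust_no=1001, last_name='Jones', modified_date='2016-05-24 18:02:00'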

Again, this is a very basic way to perform CDC and was put together for demo purposes to show how easy it is to use Spark with Parquet files and join with existing Hive tables.

 

Problem:

The CUSTOMER Hive Dimension table needs to capture changes from the source MySQL database. It currently has 5 million records. 1 million customer records in the source have changes, and 4 new customer records were added to the source.

 

Step 1:

Run Sqoop with the incremental option to get new changes from the source MySQL database and import them into HDFS as a Parquet file.

The source MySQL database has the column modified_date. For each run we capture the maximum modified_date so that on the next run we get all records greater (>) than this date. Sqoop will do this automatically with the incremental option. You just specify the column name and give a value (see the sketch after the PySpark code at the end of this article for one way to capture this value between runs).

sqoop import \
  --connect jdbc:mysql://ip-172-31-2-69.us-west-2.compute.internal:3306/mysql \
  --username root \
  --password password \
  --table customer \
  -m 4 \
  --target-dir /landing/staging_customer_update_spark \
  --incremental lastmodified \
  --check-column modified_date \
  --last-value "2016-05-23 00:00:00" \
  --as-parquetfile

 

 

Step 2:

Merge the data from the Sqoop extract with the existing Hive CUSTOMER Dimension table. Read the Parquet file extract into a Spark DataFrame and look it up against the Hive table to create a new table. Go to the end of the article to view the PySpark code, which has enough comments to explain what it is doing. This is basic code that demonstrates how easily Spark integrates with Parquet to infer a schema and then perform SQL operations on the data.

 

Zeppelin screenshots:

The count of records in the CUSTOMER Dimension table before capturing new updates:

(screenshot)

Hive query to compare the counts of the old and new tables:

(screenshot)
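
The query itself is not reproduced here. As a rough PySpark equivalent (a sketch using the same sqlContext / HiveContext set up in the code at the end of this article, not the exact query from the notebook):

## sketch: compare row counts of the old and new tables
sqlContext.sql("SELECT COUNT(*) AS old_count FROM customer").show()
sqlContext.sql("SELECT COUNT(*) AS new_count FROM customer_update_spark").show()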

A sample of 5 records from the new table compared with the old table. Notice the changes to first_name, last_name, and modified_date:

New data:

(screenshot)

What these records looked like before the changes:

(screenshot)

PySpark code:

#!/usr/bin/env python
# -*- coding: utf-8 -*-

from pyspark import SparkConf, SparkContext
from pyspark.sql import HiveContext

## Spark configuration: enable dynamic allocation, the external shuffle service, and RDD compression
conf = (SparkConf()
         .setAppName("spark_cdc")
         .set("spark.dynamicAllocation.enabled", "true")
         .set("spark.shuffle.service.enabled", "true")
         .set("spark.rdd.compress", "true"))
sc = SparkContext(conf = conf)

sqlContext = HiveContext(sc)

## read parquet file generated by the Sqoop incremental extract from MySQL and imported into HDFS
path = "hdfs://ip-172-31-2-63.us-west-2.compute.internal:8020/landing/staging_customer_update_spark/*parquet*"
parquetFile = sqlContext.read.parquet(path)
parquetFile.registerTempTable("customer_extract")


sql = "DROP TABLE IF EXISTS customer_update_spark"
sqlContext.sql(sql)

sql = """
CREATE TABLE customer_update_spark
(
 cust_no       int
 ,birth_date    date
 ,first_name    string
 ,last_name     string
 ,gender        string
 ,join_date     date
 ,created_date  timestamp
 ,modified_date timestamp
)
STORED AS PARQUET

"""
sqlContext.sql(sql)


## get those records that did not change
## these are those records from the existing Dimension table that are not in the Sqoop extract

sql = """
INSERT INTO TABLE customer_update_spark
SELECT
a.cust_no,
a.birth_date,
a.first_name,
a.last_name,
a.gender,
a.join_date,
a.created_date,
a.modified_date
FROM customer a LEFT OUTER JOIN customer_extract b ON a.cust_no = b.cust_no
WHERE b.cust_no IS NULL
"""

sqlContext.sql(sql)


## get the changed records from the Parquet extract generated from Sqoop
## the dates in the Parquet file will be in epoch time with milliseconds
## this will be a 13 digit number
## we don't need milliseconds so only get first 10 digits and not all 13

## for birth_date and join date convert to date in format YYYY-MM-DD
## for created_date and modified date convert to format YYYY-MM-DD HH:MI:SS

sql = """
INSERT INTO customer_update_spark
SELECT
a.cust_no,
TO_DATE(FROM_UNIXTIME(CAST(SUBSTR(a.birth_date, 1,10) AS INT))) AS birth_date,
a.first_name,
a.last_name,
a.gender,
TO_DATE(FROM_UNIXTIME(CAST(SUBSTR(a.join_date, 1,10) AS INT))) AS join_date,
FROM_UNIXTIME(CAST(SUBSTR(a.created_date, 1,10) AS INT)) AS created_date,
FROM_UNIXTIME(CAST(SUBSTR(a.modified_date, 1,10) AS INT)) AS modified_date
FROM customer_extract a
"""

sqlContext.sql(sql)
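
Step 1 mentioned capturing the maximum modified_date from each run so it can be passed as --last-value on the next run. Below is a minimal sketch, assuming the merge above has completed, of one way to capture that high-water mark; persisting it (for example in an audit table) and feeding it back to Sqoop is left out, and a Sqoop saved job (sqoop job) can also track the last value automatically.

## sketch: capture the high-water mark for the next incremental Sqoop import
## (in a real pipeline this value would be written to an audit table)
row = sqlContext.sql("SELECT MAX(modified_date) FROM customer_update_spark").collect()[0]
next_last_value = str(row[0])   ## e.g. '2016-05-24 18:02:00', used as --last-value on the next run
print(next_last_value)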

Comments

In my previous project, I had to solve a very similar problem with data being ingested from the GoldenGate interface. I see you are doing several hops before the data gets persisted into Hive/HDFS. We had to write several merges before we could put the data into its final state. Maybe Apache NiFi or some other technology can solve this in a much easier way now!


How are you capturing --last-value "2016-05-23 00:00:00"? Is it a hard-coded value, or are you capturing it from the database?


Will this code capture records which are deleted from the source?

It's a good approach, but the only point I could find as a disadvantage is the multiple hops needed to achieve the desired result. Instead of performing joins, we could apply a windowing function to achieve the same result in a single hop, assuming you have a unique key column and a last-modified date, as in your scenario.