Created on 05-26-2016 06:43 AM - edited on 02-25-2020 02:46 AM by VidyaSargur
Change Data Capture (CDC) in Data Warehousing typically refers to the process of capturing changes over time in Dimension tables. Dimension tables qualify Fact tables (measures) by containing information to answer questions around Who (e.g. Customer), What (e.g. Product), Where (e.g. Location), and When (e.g. Time).
This article demonstrates how to implement a very basic and rudimentary solution to CDC in Hadoop using MySQL, Sqoop, Spark, and Hive. It includes basic PySpark code to get you started with Spark DataFrames. In a real-world implementation you would also include audit tables to store information about each run.
In the past, I worked at a company where CDC on Hadoop was a big challenge for us. Initially, we used Hive to merge source changes (Sqoop extracts) with the existing Dimension tables and then built new tables. This worked well until our data volumes reached the hundreds of millions of records. At those volumes, not only did the CDC take longer, but memory and CPU consumption also increased, causing resource contention on the Hadoop cluster. The solution was to use HBase and the HBase Bulk Loader to load the changes from the source.
The Hive update functionality was not a solution for our use case.
If your data volumes are smaller, from millions up to tens of millions of records, then Spark could suffice as a solution. The test below was performed on a Dimension table containing 5 million existing customer records, merging in 1 million changed customer records and 4 new customer records. It took about 1 minute for the Spark code to complete. The cluster was an 8 node AWS cluster (M4 large instances) running HDP 2.4.
The solution below shows how to perform a Type 1 Slowly Changing Dimension (SCD). Type 1 SCD is a Data Warehouse term used to describe a type of CDC in which we overwrite the current value in the Dimension table with the new value from our source.
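To make the Type 1 behaviour concrete, here is a minimal plain-Python sketch with no Spark involved. The `apply_scd_type1` helper and the sample rows are hypothetical and exist only to illustrate the overwrite semantics: a changed row replaces the current row with the same key, a new key is added, and no history is kept.

```python
def apply_scd_type1(dimension, changes, key="cust_no"):
    """Return a new dimension in which each changed row overwrites
    the existing row with the same key (Type 1: no history kept)."""
    merged = {row[key]: row for row in dimension}
    for row in changes:
        merged[row[key]] = row  # overwrite an existing key or add a new one
    return sorted(merged.values(), key=lambda r: r[key])


# Hypothetical sample data: customer 2 changed, customer 3 is new.
dimension = [
    {"cust_no": 1, "last_name": "Smith"},
    {"cust_no": 2, "last_name": "Jones"},
]
changes = [
    {"cust_no": 2, "last_name": "Brown"},
    {"cust_no": 3, "last_name": "Lee"},
]

result = apply_scd_type1(dimension, changes)
for row in result:
    print(row)
```

After the merge, the old value for customer 2 is gone. That loss of history is what distinguishes Type 1 from the SCD types that version their rows.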
Again, this is a very basic way to perform CDC and was put together for demo purposes to show how easy it is to use Spark with Parquet files and join with existing Hive tables.
The CUSTOMER Hive Dimension table needs to capture changes from the source MySQL database. It currently holds 5 million records. In the source, 1 million customer records have changed and 4 new customer records have been added.
Run Sqoop with the incremental option to get new changes from the source MySQL database and import them into HDFS as a Parquet file.
The source MySQL database has the column modified_date. For each run we capture the maximum modified_date so that on the next run we get all records modified after that date. Sqoop handles the filtering when run with the incremental option: you just specify the check column and the last value.
sqoop import --connect jdbc:mysql://ip-172-31-2-69.us-west-2.compute.internal:3306/mysql --username root --password password --table customer -m 4 --target-dir /landing/staging_customer_update_spark --incremental lastmodified --check-column modified_date --last-value "2016-05-23 00:00:00" --as-parquetfile
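The `--last-value` in the command above is typed in by hand. One way to carry the high-water mark from run to run is sketched below in plain Python. The helper names `next_last_value` and `build_sqoop_args` are hypothetical, the connection options are left out for brevity, and where the value is stored between runs (an audit table, a file) is left open. A Sqoop saved job (`sqoop job --create ...`) can also keep the last value for you, which may be simpler.

```python
from datetime import datetime

TS_FORMAT = "%Y-%m-%d %H:%M:%S"


def next_last_value(modified_dates, previous_last_value):
    """Return the high-water mark to pass as --last-value on the next run.

    Falls back to the previous value when the extract was empty.
    """
    candidates = [datetime.strptime(previous_last_value, TS_FORMAT)]
    candidates += [datetime.strptime(d, TS_FORMAT) for d in modified_dates]
    return max(candidates).strftime(TS_FORMAT)


def build_sqoop_args(last_value):
    """Build the incremental part of the import command as an argument list.

    --connect, --username and --password are omitted here on purpose.
    """
    return [
        "sqoop", "import",
        "--table", "customer",
        "--target-dir", "/landing/staging_customer_update_spark",
        "--incremental", "lastmodified",
        "--check-column", "modified_date",
        "--last-value", last_value,
        "--as-parquetfile",
    ]


# Hypothetical modified_date values seen in the latest extract.
seen = ["2016-05-24 10:15:00", "2016-05-23 08:00:00"]
new_mark = next_last_value(seen, "2016-05-23 00:00:00")
print(" ".join(build_sqoop_args(new_mark)))
```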
Merge the data from the Sqoop extract with the existing Hive CUSTOMER Dimension table. Read the Parquet file extract into a Spark DataFrame and look it up against the Hive table to create a new table. The commented PySpark code is at the end of the article. It is basic code that demonstrates how easily Spark infers a schema from Parquet and then performs SQL operations on the data.
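The two-step merge can be tried on a laptop before running it on a cluster. The sketch below is not the article's Spark job: it uses Python's built-in `sqlite3` module as a stand-in for Hive and Spark SQL, with a cut-down, hypothetical three-column schema and made-up rows. It keeps the same table names and the same logic: first copy the rows that did not change (an anti-join against the extract), then append every row from the extract.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE customer (cust_no INTEGER, last_name TEXT, modified_date TEXT);
    CREATE TABLE customer_extract (cust_no INTEGER, last_name TEXT, modified_date TEXT);
    CREATE TABLE customer_update_spark (cust_no INTEGER, last_name TEXT, modified_date TEXT);

    INSERT INTO customer VALUES (1, 'Smith', '2016-05-01 00:00:00');
    INSERT INTO customer VALUES (2, 'Jones', '2016-05-01 00:00:00');

    -- customer 2 changed, customer 3 is new
    INSERT INTO customer_extract VALUES (2, 'Brown', '2016-05-24 00:00:00');
    INSERT INTO customer_extract VALUES (3, 'Lee', '2016-05-24 00:00:00');
""")

# Step 1: rows in the dimension that are NOT in the extract are unchanged.
conn.execute("""
    INSERT INTO customer_update_spark
    SELECT a.cust_no, a.last_name, a.modified_date
    FROM customer a
    LEFT OUTER JOIN customer_extract b ON a.cust_no = b.cust_no
    WHERE b.cust_no IS NULL
""")

# Step 2: every row in the extract is either a change or a new record.
conn.execute("""
    INSERT INTO customer_update_spark
    SELECT cust_no, last_name, modified_date FROM customer_extract
""")

merged = conn.execute(
    "SELECT cust_no, last_name FROM customer_update_spark ORDER BY cust_no"
).fetchall()
old_count = conn.execute("SELECT COUNT(*) FROM customer").fetchone()[0]
new_count = conn.execute("SELECT COUNT(*) FROM customer_update_spark").fetchone()[0]
print(merged, old_count, new_count)
```

Comparing `old_count` with `new_count` is the same sanity check as the count comparison between the old and new Hive tables: the difference should equal the number of new records.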
The count of records in the CUSTOMER Dimension table before capturing new updates:
Hive query to compare counts of old table with new table:
Sample 5 records from the new table and compare with old table. Notice the changes for first_name, last_name, and modified_date:
New data:
What these records looked like before the changes:
#!/usr/bin/env python
# -*- coding: utf-8 -*-

from pyspark import SparkConf, SparkContext
from pyspark.sql import HiveContext

## Spark configuration: dynamic allocation (which needs the external shuffle service)
## and compression of serialized RDD partitions
conf = (SparkConf()
        .setAppName("spark_cdc")
        .set("spark.dynamicAllocation.enabled", "true")
        .set("spark.shuffle.service.enabled", "true")
        .set("spark.rdd.compress", "true"))
sc = SparkContext(conf=conf)
sqlContext = HiveContext(sc)

## read parquet file generated by the Sqoop incremental extract from MySQL and imported into HDFS
path = "hdfs://ip-172-31-2-63.us-west-2.compute.internal:8020/landing/staging_customer_update_spark/*parquet*"
parquetFile = sqlContext.read.parquet(path)
parquetFile.registerTempTable("customer_extract")

## rebuild the target table on every run
sql = "DROP TABLE IF EXISTS customer_update_spark"
sqlContext.sql(sql)

sql = """
CREATE TABLE customer_update_spark
(
    cust_no int
    ,birth_date date
    ,first_name string
    ,last_name string
    ,gender string
    ,join_date date
    ,created_date timestamp
    ,modified_date timestamp
)
STORED AS PARQUET
"""
sqlContext.sql(sql)

## get those records that did not change
## these are those records from the existing Dimension table that are not in the Sqoop extract
sql = """
INSERT INTO TABLE customer_update_spark
SELECT
    a.cust_no,
    a.birth_date,
    a.first_name,
    a.last_name,
    a.gender,
    a.join_date,
    a.created_date,
    a.modified_date
FROM customer a
LEFT OUTER JOIN customer_extract b ON a.cust_no = b.cust_no
WHERE b.cust_no IS NULL
"""
sqlContext.sql(sql)

## get the changed records from the Parquet extract generated from Sqoop
## the dates in the Parquet file will be in epoch time with milliseconds
## this will be a 13 digit number
## we don't need milliseconds so only get first 10 digits and not all 13
## for birth_date and join_date convert to date in format YYYY-MM-DD
## for created_date and modified_date convert to format YYYY-MM-DD HH:MI:SS
sql = """
INSERT INTO TABLE customer_update_spark
SELECT
    a.cust_no,
    TO_DATE(FROM_UNIXTIME(CAST(SUBSTR(a.birth_date, 1,10) AS INT))) AS birth_date,
    a.first_name,
    a.last_name,
    a.gender,
    TO_DATE(FROM_UNIXTIME(CAST(SUBSTR(a.join_date, 1,10) AS INT))) AS join_date,
    FROM_UNIXTIME(CAST(SUBSTR(a.created_date, 1,10) AS INT)) AS created_date,
    FROM_UNIXTIME(CAST(SUBSTR(a.modified_date, 1,10) AS INT)) AS modified_date
FROM customer_extract a
"""
sqlContext.sql(sql)
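The epoch handling in the code comments can be checked in isolation. The plain-Python sketch below uses hypothetical helper names and assumes UTC, whereas Hive's FROM_UNIXTIME uses the session time zone. It converts epoch milliseconds by integer division and shows why that may be safer than keeping the first 10 digits: a value before September 2001 has fewer than 13 digits, so the substring no longer lines up with the seconds.

```python
from datetime import datetime, timezone


def epoch_ms_to_timestamp(epoch_ms):
    """Format epoch milliseconds as YYYY-MM-DD HH:MM:SS (UTC assumed)."""
    seconds = epoch_ms // 1000  # drop the milliseconds
    return datetime.fromtimestamp(seconds, tz=timezone.utc).strftime("%Y-%m-%d %H:%M:%S")


def epoch_ms_to_date(epoch_ms):
    """Format epoch milliseconds as YYYY-MM-DD (UTC assumed)."""
    return epoch_ms_to_timestamp(epoch_ms)[:10]


modified_ms = 1464000000000   # 13 digits: a date in 2016
birth_ms = 631152000000       # 12 digits: a date in 1990

modified_ts = epoch_ms_to_timestamp(modified_ms)
birth_date = epoch_ms_to_date(birth_ms)

# Keeping the first 10 digits only works for 13-digit values.
first_ten_ok = int(str(modified_ms)[:10])   # equals modified_ms // 1000
first_ten_bad = int(str(birth_ms)[:10])     # ten times too large

print(modified_ts, birth_date, first_ten_ok, first_ten_bad)
```

In the SQL this would correspond to dividing the column by 1000 before calling FROM_UNIXTIME rather than using SUBSTR. Whether that matters depends on whether the table holds dates before 2001, which birth dates usually do.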
Created on 05-26-2016 07:55 PM
In my previous project, I had to solve a very similar problem with data being ingested from the GoldenGate interface. I see you are doing several hops before the data gets persisted into Hive/HDFS. We had to write several merges before we could put the data into its final state. Maybe Apache NiFi or some other technology can solve this now in a much easier way!
Created on 01-20-2017 11:16 AM
How are you capturing --last-value "2016-05-23 00:00:00"? Is it a hard-coded value, or are you capturing it from the database?
Created on 07-11-2018 09:57 AM
Will this code capture records that are deleted from the source?
Created on 12-12-2018 04:29 PM
It's a good approach, but the one disadvantage I can find is the multiple hops needed to achieve the desired result. Instead of performing joins, we can apply a window function to achieve the same in a single hop, assuming you have a unique key column and a last modified date in your scenario.
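One reading of this suggestion is to union the existing table with the extract and keep only the most recent row per key, which in SQL would be a filter on `ROW_NUMBER() OVER (PARTITION BY cust_no ORDER BY modified_date DESC) = 1`. The plain-Python sketch below imitates that selection on made-up rows. The helper name `latest_per_key` is hypothetical, and the sketch says nothing about how the approach performs at scale.

```python
from itertools import chain


def latest_per_key(existing, extract, key="cust_no", order_by="modified_date"):
    """Keep, for each key, the row with the greatest order_by value.

    On a tie the row seen first (from `existing`) is kept.
    """
    latest = {}
    for row in chain(existing, extract):
        current = latest.get(row[key])
        if current is None or row[order_by] > current[order_by]:
            latest[row[key]] = row
    return sorted(latest.values(), key=lambda r: r[key])


# Hypothetical rows; timestamps in this format compare correctly as strings.
existing = [
    {"cust_no": 1, "last_name": "Smith", "modified_date": "2016-05-01 00:00:00"},
    {"cust_no": 2, "last_name": "Jones", "modified_date": "2016-05-01 00:00:00"},
]
extract = [
    {"cust_no": 2, "last_name": "Brown", "modified_date": "2016-05-24 00:00:00"},
    {"cust_no": 3, "last_name": "Lee", "modified_date": "2016-05-24 00:00:00"},
]

deduped = latest_per_key(existing, extract)
for row in deduped:
    print(row)
```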