Objective
Great Expectations is a Python-based open-source library for validating, documenting, and profiling your data. It helps you maintain data quality and improve communication about data between teams. Software developers have long known that automated testing is essential for managing complex codebases. Great Expectations brings the same discipline, confidence, and acceleration to data science and data engineering teams. (Source: Great Expectations)
CDP Data Engineering is the only cloud-native service purpose-built for enterprise data engineering teams. Building on Apache Spark, Data Engineering is an all-inclusive data engineering toolset that enables orchestration automation with Apache Airflow, advanced pipeline monitoring, visual troubleshooting, and comprehensive management tools to streamline ETL processes across enterprise analytics teams.
Data Engineering is fully integrated with the Cloudera Data Platform, enabling end-to-end visibility and security with SDX as well as seamless integrations with CDP services such as data warehouses and machine learning. Data engineering on CDP powers consistent, repeatable, and automated data engineering workflows on a hybrid cloud platform anywhere. (Source: Streamline and operationalize data pipelines securely at any scale.)
Spark Data Engineers use Great Expectations in CDE to enforce data quality standards on enterprise datasets and data engineering pipelines at massive scale. In the rest of this tutorial, we will walk through a basic workflow with Great Expectations and Spark.
All code, dependencies, and CLI commands are available for reuse in this git repository.
Requirements
The following are required to reproduce the demo in your CDE virtual cluster:
Custom Docker Runtime as CDE Resource
By default, CDE workloads rely on CDE Resources to mount dependencies into a job container. CDE Resources can be of three types: files, Python environment, and custom Docker runtime.
In our case, we will create a custom Docker runtime to ensure Great Expectations and all required packages are correctly installed. Please open the Dockerfile located in the CDE_Runtime folder to explore more details.
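For reference, a minimal sketch of what such a Dockerfile could look like is shown below. The base image and package list are assumptions for illustration; the Dockerfile in the repository is the authoritative version.

# Base image name and tag are assumptions; use the dex-spark-runtime image that matches your CDE virtual cluster
FROM <your-registry>/cloudera/dex/dex-spark-runtime-3.2.3-7.2.15.8:1.20.0-b15
USER root
# Install Great Expectations plus the packages the demo code relies on (assumed list)
RUN pip3 install great_expectations dbldatagen faker
USER ${DEX_UID}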
The below steps have already been executed, and the image is available for reuse in my public Docker registry. If you want, create your own image by personalizing the value in the --name parameter to reflect your username and replacing the value in the --docker-username parameter with your DockerHub username.
docker build --network=host -t pauldefusco/dex-spark-runtime-3.2.3-7.2.15.8:1.20.0-b15-great-expectations-data-quality -f Dockerfile .
docker run -it pauldefusco/dex-spark-runtime-3.2.3-7.2.15.8:1.20.0-b15-great-expectations-data-quality /bin/bash
docker push pauldefusco/dex-spark-runtime-3.2.3-7.2.15.8:1.20.0-b15-great-expectations-data-quality
cde credential create --name docker-creds-pauldefusco --type docker-basic --docker-server hub.docker.com --docker-username pauldefusco
cde resource create --name dex-spark-runtime-great-expectations-data-quality --image pauldefusco/dex-spark-runtime-3.2.3-7.2.15.8:1.20.0-b15-great-expectations-data-quality --image-engine spark3 --type custom-runtime-image
CDE Files Resource for Job Code
Next, create a CDE Files Resource to store your code and dependencies in your virtual cluster for reuse.
Before running the below commands, open parameters.conf and replace the Cloud Default Storage location and CDP Workload Username with your own values. If you need help finding these, please contact your CDP administrator. Then upload the scripts and dependencies.
cde resource create --name job_code_data_quality
cde resource upload --name job_code_data_quality --local-path code/airflow.py --local-path code/batch_load.py --local-path code/great_expectations.py --local-path code/parameters.conf --local-path code/utils.py
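If you have not seen parameters.conf before, it is a small configuration file along these lines; the key names below are illustrative assumptions, and the two values to edit are the storage path and the workload username.

[general]
data_lake_path: s3a://<your-cloud-default-storage-location>/
cdp_workload_username: <your-cdp-workload-username>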
Finally, we will create two CDE Spark jobs for the data quality pipeline and one CDE Airflow job to orchestrate it. Before doing so, open code/great_expectations.py and familiarize yourself with the code. Here are some of the most important points about the job:
from dbldatagen import DataGenerator, FakerTextFactory  # imports assumed for this snippet
from faker.providers import bank
from great_expectations.dataset import SparkDFDataset

FakerTextUS = FakerTextFactory(locale=['en_US'], providers=[bank])
# partition parameters, etc.
self.spark.conf.set("spark.sql.shuffle.partitions", shuffle_partitions_requested)
fakerDataspec = (DataGenerator(self.spark, rows=data_rows, partitions=partitions_requested)
.withColumn("name", percentNulls=0.1, text=FakerTextUS("name") )
.withColumn("address", text=FakerTextUS("address" ))
.withColumn("email", text=FakerTextUS("ascii_company_email") )
.withColumn("aba_routing", text=FakerTextUS("aba" ))
.withColumn("bank_country", text=FakerTextUS("bank_country") )
.withColumn("account_no", text=FakerTextUS("bban" ))
.withColumn("int_account_no", text=FakerTextUS("iban") )
.withColumn("swift11", text=FakerTextUS("swift11" ))
.withColumn("credit_card_number", text=FakerTextUS("credit_card_number") )
.withColumn("credit_card_provider", text=FakerTextUS("credit_card_provider") )
.withColumn("event_type", "string", values=["purchase", "cash_advance"],random=True)
.withColumn("event_ts", "timestamp", begin="2020-01-01 01:00:00, end="2020-12-31 23:59:00",interval="1 minute", random=True )
.withColumn("longitude", "float", minValue=-180, maxValue=180, random=True)
.withColumn("latitude", "float", minValue=-90, maxValue=90, random=True)
.withColumn("transaction_currency", values=["USD", "EUR", "KWD", "BHD", "GBP", "CHF", "MEX"])
.withColumn("transaction_amount", "decimal", minValue=0.01, maxValue=30000, random=True)
)
df = fakerDataspec.build()
gdf = SparkDFDataset(df)  # wrap the Spark DataFrame so Great Expectations can validate it
# EXPECTATION ON ALL COLUMNS
# Ensure the existence of all mandatory columns
def run_existance_expactation(gdf, MANDATORY_COLUMNS):
    for column in MANDATORY_COLUMNS:
        try:
            assert gdf.expect_column_to_exist(column).success, f"Column {column} is not found."
            print(f"Column {column} is found")
        except Exception as e:
            print(e)
# EXPECTATION ON NUMERIC COLUMN
# Ensure minimum longitude is between -180 and 180
def run_longitude_min_expectation(gdf):
    try:
        test_result = gdf.expect_column_min_to_be_between(column="longitude", min_value=-180, max_value=180)
        assert test_result.success, "Min for column longitude is not within expected range"
        print("Min for column longitude is within expected range\n")
    except Exception as e:
        print(e)
# EXPECTATION ON STRING COLUMNS
# Use a regex to ensure the email column is in the correct format
def run_email_match_regex_expectation(gdf):
    try:
        test_result = gdf.expect_column_values_to_match_regex(column="email", regex=r"^[\w.-]+@([\w-]+\.)+[\w-]{2,4}$")
        assert test_result.success, "Values for column email do not match the regex pattern."
        print("Values for column email match the regex pattern\n")
    except Exception as e:
        print(e)
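Putting the helpers together, the job can simply run each expectation against the wrapped DataFrame. The snippet below is an illustrative sketch; the MANDATORY_COLUMNS list is an assumption based on the columns generated earlier.

# Illustrative wiring of the expectations above; the column list is an assumption
MANDATORY_COLUMNS = ["name", "address", "email", "aba_routing", "bank_country",
                     "account_no", "int_account_no", "swift11", "credit_card_number",
                     "credit_card_provider", "event_type", "event_ts", "longitude",
                     "latitude", "transaction_currency", "transaction_amount"]

run_existance_expactation(gdf, MANDATORY_COLUMNS)
run_longitude_min_expectation(gdf)
run_email_match_regex_expectation(gdf)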
Please review the great_expectations.py script to find more examples.
Again, the idea is to organize tests in a clear, reliable, and reusable manner. Combined with the modularity of CDE jobs and resources, Great Expectations and Spark let you structure your data pipelines around clear data quality rules.
Finally, the Airflow Job is scheduled to run every five minutes by default. Therefore, there is no need to manually run any of the above jobs. Run the below commands to create the jobs. Then open the CDE Job Runs page to validate the pipeline.
cde job create --name batch_load --type spark --mount-1-prefix jobCode/ --mount-1-resource job_code_data_quality --runtime-image-resource-name dex-spark-runtime-great-expectations-data-quality --application-file jobCode/batch_load.py
cde job create --name data_quality --type spark --mount-1-prefix jobCode/ --mount-1-resource job_code_data_quality --runtime-image-resource-name dex-spark-runtime-great-expectations-data-quality --application-file jobCode/great_expectations.py
cde job create --name data_quality_orchestration --type airflow --mount-1-prefix jobCode/ --mount-1-resource job_code_data_quality --dag-file airflow.py
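For reference, airflow.py follows the usual CDE pattern of chaining CDEJobRunOperator tasks. The sketch below is an assumption of what the DAG could look like: the DAG id, owner, start date, and schedule are illustrative, while the job names match the CDE Spark jobs created above.

# Illustrative sketch of a CDE Airflow DAG; adjust ids, dates, and schedule to your setup
from datetime import datetime, timedelta
from airflow import DAG
from cloudera.cdp.airflow.operators.cde_operator import CDEJobRunOperator

default_args = {"owner": "your_cdp_workload_username", "retries": 1, "retry_delay": timedelta(minutes=1)}

with DAG(
    dag_id="data_quality_orchestration",
    default_args=default_args,
    start_date=datetime(2023, 10, 1),
    schedule_interval="*/5 * * * *",  # every five minutes, as noted above
    catchup=False,
) as dag:

    batch_load = CDEJobRunOperator(task_id="batch_load", job_name="batch_load")        # CDE Spark job created above
    data_quality = CDEJobRunOperator(task_id="data_quality", job_name="data_quality")  # CDE Spark job created above

    batch_load >> data_quality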
Summary
With CDE custom runtimes, you can use the data quality package of your choice. This article showcased a simple data quality pipeline built with Great Expectations, a leading open-source package for testing and validating data at scale. You can easily run Great Expectations at scale with Spark in Cloudera Data Engineering, complementing its reusable, declarative expectations with the advanced Spark and Airflow job observability, monitoring, and development capabilities provided by CDE.