Objective

Great Expectations is a Python-based open-source library for validating, documenting, and profiling your data. It helps you maintain data quality and improve communication about data between teams. Software developers have long known that automated testing is essential for managing complex codebases. Great Expectations brings the same discipline, confidence, and acceleration to data science and data engineering teams. (Source: Great Expectations)

CDP Data Engineering is the only cloud-native service purpose-built for enterprise data engineering teams. Building on Apache Spark, Data Engineering is an all-inclusive data engineering toolset that enables orchestration automation with Apache Airflow, advanced pipeline monitoring, visual troubleshooting, and comprehensive management tools to streamline ETL processes across enterprise analytics teams.

Data Engineering is fully integrated with the Cloudera Data Platform, enabling end-to-end visibility and security with SDX as well as seamless integrations with CDP services such as data warehouses and machine learning. Data engineering on CDP powers consistent, repeatable, and automated data engineering workflows on a hybrid cloud platform anywhere. (Source: Streamline and operationalize data pipelines securely at any scale.)

Spark Data Engineers use Great Expectations in CDE to enforce data quality standards on enterprise datasets and data engineering pipelines at massive scale. In the rest of this tutorial, we will walk through a basic workflow with Great Expectations and Spark.

All code, dependencies, and CLI commands are available for reuse in this git repository.

Requirements

The following are required to reproduce the demo in your CDE virtual cluster:

  • CDE Service version 1.19 or above
  • A working installation of the CDE CLI. Instructions to install the CLI are provided here.
  • A working installation of git on your local machine. Please clone the git repository and keep in mind that all commands assume they are run in the project's main directory.

Custom Docker Runtime as CDE Resource

By default, CDE workloads rely on CDE Resources to mount dependencies into a job container. CDE Resources can be of three types: Files, Python Environment, and Custom Docker Runtime.

In our case, we will create a custom Docker runtime to ensure Great Expectations and all required packages are correctly installed. Please open the Dockerfile located in the CDE_Runtime folder to explore more details.

The steps below have already been executed, and the image is available for reuse in my public Docker registry. To create your own image instead, personalize the value of the --name parameter to reflect your username and replace the value of the --docker-username parameter with your DockerHub username.

docker build --network=host -t pauldefusco/dex-spark-runtime-3.2.3-7.2.15.8:1.20.0-b15-great-expectations-data-quality -f Dockerfile .

docker run -it pauldefusco/dex-spark-runtime-3.2.3-7.2.15.8:1.20.0-b15-great-expectations-data-quality /bin/bash

docker push pauldefusco/dex-spark-runtime-3.2.3-7.2.15.8:1.20.0-b15-great-expectations-data-quality

Next, create CDE Resource Credentials and the CDE Custom Docker Runtime resource:

cde credential create --name docker-creds-pauldefusco --type docker-basic --docker-server hub.docker.com --docker-username pauldefusco

cde resource create --name dex-spark-runtime-great-expectations-data-quality --image pauldefusco/dex-spark-runtime-3.2.3-7.2.15.8:1.20.0-b15-great-expectations-data-quality --image-engine spark3 --type custom-runtime-image

CDE Files Resource for Job Code

Next, create a CDE Files Resource to store your code and dependencies in your virtual cluster for reuse.

Before running the commands below, open parameters.conf and enter your Cloud Default Storage location and CDP Workload Username. If you need help finding these, please contact your CDP administrator. Then upload the scripts and dependencies.

cde resource create --name job_code_data_quality

cde resource upload --name job_code_data_quality --local-path code/airflow.py --local-path code/batch_load.py --local-path code/great_expectations.py --local-path code/parameters.conf --local-path code/utils.py
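
For orientation, here is a minimal sketch of how a job script could read those two values with Python's standard configparser module. The section name general, the keys data_lake_name and username, and the sample content are all assumptions made for illustration; check the actual configuration file in the repository for the real names.

```python
import configparser

# Hypothetical file content; the real configuration file may use different names.
SAMPLE_CONF = """
[general]
data_lake_name = s3a://my-bucket/data
username = jdoe
"""


def load_job_parameters(conf_text):
    """Parse the configuration text and return the two values the jobs need."""
    config = configparser.ConfigParser()
    config.read_string(conf_text)
    return {
        "data_lake_name": config.get("general", "data_lake_name"),
        "username": config.get("general", "username"),
    }


params = load_job_parameters(SAMPLE_CONF)
print(params["data_lake_name"], params["username"])
```

In a real job, config.read(path) would replace read_string, with the path pointing at the file mounted from the CDE Files Resource.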

Next, we will create two CDE Spark jobs for the data quality pipeline and one CDE Airflow job to orchestrate it. Before doing so, open code/great_expectations.py and familiarize yourself with the code. Here are some of the most important points about the job:

  • We create a synthetic banking dataset with the Faker and dbldatagen libraries. This gives us 100,000 banking transactions with transaction amount, currency, account numbers, routing numbers, and much more banking data. The data columns are of different types, including numerical, timestamp, and categorical. Review the utils.py script to learn more details.
# Imports needed by this excerpt (the excerpt itself runs inside a method, hence self.spark)
from dbldatagen import DataGenerator, FakerTextFactory
from faker.providers import bank

# Text factory backed by Faker's US locale and its bank provider
FakerTextUS = FakerTextFactory(locale=['en_US'], providers=[bank])

# Partition parameters, etc.
self.spark.conf.set("spark.sql.shuffle.partitions", shuffle_partitions_requested)

# Describe the synthetic dataset one column at a time
fakerDataspec = (DataGenerator(self.spark, rows=data_rows, partitions=partitions_requested)
    .withColumn("name", percentNulls=0.1, text=FakerTextUS("name"))
    .withColumn("address", text=FakerTextUS("address"))
    .withColumn("email", text=FakerTextUS("ascii_company_email"))
    .withColumn("aba_routing", text=FakerTextUS("aba"))
    .withColumn("bank_country", text=FakerTextUS("bank_country"))
    .withColumn("account_no", text=FakerTextUS("bban"))
    .withColumn("int_account_no", text=FakerTextUS("iban"))
    .withColumn("swift11", text=FakerTextUS("swift11"))
    .withColumn("credit_card_number", text=FakerTextUS("credit_card_number"))
    .withColumn("credit_card_provider", text=FakerTextUS("credit_card_provider"))
    .withColumn("event_type", "string", values=["purchase", "cash_advance"], random=True)
    .withColumn("event_ts", "timestamp", begin="2020-01-01 01:00:00", end="2020-12-31 23:59:00", interval="1 minute", random=True)
    .withColumn("longitude", "float", minValue=-180, maxValue=180, random=True)
    .withColumn("latitude", "float", minValue=-90, maxValue=90, random=True)
    .withColumn("transaction_currency", values=["USD", "EUR", "KWD", "BHD", "GBP", "CHF", "MEX"])
    .withColumn("transaction_amount", "decimal", minValue=0.01, maxValue=30000, random=True)
)

# Materialize the specification as a Spark DataFrame
df = fakerDataspec.build()
  • In ge_data_quality.py, the "SparkDFDataset" class is imported from the "great_expectations.dataset.sparkdf_dataset" module. This allows you to wrap a Spark DataFrame and make it compatible with the Great Expectations API.
gdf = SparkDFDataset(df)
  • From the GE Documentation, "Expectations are declarative statements and the interface to the Great Expectations library, supporting your validation, profiling, and translation. Expectations are implemented as classes; some are in the core library, with many others contributed by community members." In the script, we implemented a few wrapper methods for a variety of built-in expectations. For example, the "expect_column_to_exist" expectation checks for column existence.
assert gdf.expect_column_to_exist(column).success, f"Column {column} is not found."
  • The GE Gallery offers a constantly growing number of expectations to test for different conditions on different column value types. For example, in the script we use expectations to check for nulls in all columns; the minimum and maximum of longitude and latitude for transactions; the mean and standard deviation of transaction amounts; email string validity with a regular expression; and finally, whether the values in the categorical currency column are contained in a provided test set. Here are some more examples that can be found in the ge_data_quality.py script:
# EXPECTATION ON ALL COLUMNS

# Ensure the existence of all mandatory columns.

def run_existance_expactation(gdf, MANDATORY_COLUMNS):
    for column in MANDATORY_COLUMNS:
        try:
            assert gdf.expect_column_to_exist(column).success, f"Column {column} is not found."
            print(f"Column {column} is found")
        except Exception as e:
            print(e)
# EXPECTATION ON NUMERIC COLUMN

# Ensure minimum longitude is between -180 and 180

def run_longitude_min_expectation(gdf):
    try:
        # Keep the full result object so that .success can be checked below
        test_result = gdf.expect_column_min_to_be_between(column="longitude", min_value=-180, max_value=180)
        assert test_result.success, "Min for column longitude is not within expected range\n"
        print("Min for column longitude is within expected range\n")
    except Exception as e:
        print(e)
# EXPECTATION ON STRING COLUMNS

# Use REGEX to ensure email is in correct format

def run_email_match_regex_expectation(gdf):
    try:
        # Raw string keeps the backslashes intact; the hyphen sits last in the character class
        test_result = gdf.expect_column_values_to_match_regex(column="email", regex=r"^[\w.-]+@([\w-]+\.)+[\w-]{2,4}$")
        assert test_result.success, "Values for column Email are not within REGEX pattern."
        print("Values for column Email are within REGEX Pattern\n")
    except Exception as e:
        print(e)
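
To see what the email pattern accepts before running it on a cluster, it can be tried locally with Python's re module. This is only a sketch: the sample addresses are made up, the pattern is written as a raw string with the hyphen placed last in the character class (an equivalent form that Python's re accepts), and Spark evaluates regular expressions with Java's engine, so edge cases may differ.

```python
import re

# Same intent as the pattern used in the email expectation, in a form Python's re accepts.
EMAIL_PATTERN = re.compile(r"^[\w.-]+@([\w-]+\.)+[\w-]{2,4}$")


def is_valid_email(value):
    """Return True when the whole string matches the email pattern."""
    return EMAIL_PATTERN.match(value) is not None


# Made-up sample addresses for illustration
samples = ["jane.doe@example.com", "jane.doe@example", "jane@example.technology"]
for sample in samples:
    print(sample, is_valid_email(sample))
```

The last sample is rejected because the pattern caps the final label at four characters, which is worth keeping in mind if real data contains longer top-level domains.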

Please review the ge_data_quality.py script to find more examples.
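
As a rough sketch of how the other checks mentioned above could follow the same wrapper pattern, the functions below call expectations from the same dataset API that provides SparkDFDataset (expect_column_values_to_not_be_null, expect_column_mean_to_be_between, and expect_column_values_to_be_in_set). The function names and the thresholds passed in are assumptions for illustration and are not taken from the ge_data_quality.py script; the column names come from the data generator shown earlier.

```python
def find_columns_with_nulls(gdf, columns):
    """Return the columns that fail the not-null expectation."""
    failed = []
    for column in columns:
        result = gdf.expect_column_values_to_not_be_null(column)
        if not result.success:
            failed.append(column)
    return failed


def amount_mean_in_range(gdf, min_value, max_value):
    """Check that the mean transaction amount lies within the given bounds."""
    result = gdf.expect_column_mean_to_be_between(
        column="transaction_amount", min_value=min_value, max_value=max_value
    )
    return result.success


def currency_in_allowed_set(gdf, allowed_currencies):
    """Check that every transaction currency is one of the allowed codes."""
    result = gdf.expect_column_values_to_be_in_set(
        column="transaction_currency", value_set=list(allowed_currencies)
    )
    return result.success
```

With gdf = SparkDFDataset(df) as in the script, find_columns_with_nulls(gdf, df.columns) would be expected to report the name column, since the data generator sets percentNulls=0.1 for it.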

Again, the idea is to organize tests in a clear, reliable, and reusable manner. When combined with the modularity of the CDE Job and Dependency construct, using Great Expectations and Spark allows you to structure your data pipelines under clear data quality rules.
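
One way to make that organization concrete is a small runner that executes each check, records the outcome, and raises at the end if anything failed, so that the Spark job itself fails and the orchestrating Airflow job can react. This is a sketch in plain Python; run_checks, enforce, DataQualityError, and the check names are hypothetical and are not part of the repository or of Great Expectations.

```python
class DataQualityError(Exception):
    """Raised when one or more data quality checks fail."""


def run_checks(checks):
    """Run each (name, callable) pair and return a dict of name -> passed.

    A check passes when its callable returns a truthy value and fails when
    it returns a falsy value or raises an exception.
    """
    results = {}
    for name, check in checks:
        try:
            results[name] = bool(check())
        except Exception as exc:  # treat unexpected errors as failures
            print(f"{name} raised {exc!r}")
            results[name] = False
    return results


def enforce(results):
    """Raise DataQualityError listing every failed check, if any."""
    failed = [name for name, passed in results.items() if not passed]
    if failed:
        raise DataQualityError("Failed checks: " + ", ".join(failed))


# Stand-in checks; in a job these would wrap Great Expectations calls.
results = run_checks([
    ("longitude_min", lambda: True),
    ("email_regex", lambda: False),
])
print(results)
```

Calling enforce(results) as the last step of the job would raise DataQualityError here, because the stand-in email_regex check returns False.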

Finally, the Airflow Job is scheduled to run every five minutes by default. Therefore, there is no need to manually run any of the above jobs. Run the below commands to create the jobs. Then open the CDE Job Runs page to validate the pipeline.

cde job create --name batch_load --type spark --mount-1-prefix jobCode/ --mount-1-resource job_code_data_quality --runtime-image-resource-name dex-spark-runtime-great-expectations-data-quality --application-file jobCode/batch_load.py

cde job create --name data_quality --type spark --mount-1-prefix jobCode/ --mount-1-resource job_code_data_quality --runtime-image-resource-name dex-spark-runtime-great-expectations-data-quality --application-file jobCode/great_expectations.py

cde job create --name data_quality_orchestration --type airflow --mount-1-prefix jobCode/ --mount-1-resource job_code_data_quality --dag-file airflow.py

Summary

With CDE Runtimes, you can use the data quality package of your choice. This article showcased a simple data quality pipeline with Great Expectations, a leading open-source package for testing and validating data at scale. Running GE with Spark in Cloudera Data Engineering complements Great Expectations' reusable, declarative expectations with the Spark and Airflow job observability, monitoring, and development capabilities provided by CDE.
