Created on 07-21-2022 01:29 PM - edited 06-26-2023 05:34 AM
This handbook focuses on Cloudera’s new adapter, dbt-spark-cde, which lets users run dbt on top of Apache Spark through CDE. Cloudera Data Engineering (CDE) is a serverless service for Cloudera Data Platform that allows you to submit Spark jobs to an auto-scaling cluster.
Links to a dbt example project that can be used to bootstrap a user’s dbt journey can be found below.
The adapter has been tested on the following version:
git clone https://github.com/cloudera/dbt-spark-cde-example/
pip install dbt-core
pip install dbt-spark-cde
For our development and testing, we used the Spark included in the Cloudera Data Engineering service. If you have not enabled the service, you can enable it by following this link: https://docs.cloudera.com/data-engineering/cloud/enable-data-engineering/topics/cde-enable-data-engi...
Once you have an environment with CDE enabled, the next step is to create a virtual cluster that can be used to run the Spark jobs. You may create as many virtual clusters as you want. To create a virtual cluster, refer to this link:
https://docs.cloudera.com/data-engineering/cloud/manage-clusters/topics/cde-create-cluster.html
Once you have the CDE environment enabled and a virtual cluster set up, you should see something similar to this on your CDE overview page:
To work with CDE we need to follow two steps.
For this demo, we will use a Machine User, created inside CDP as the user running queries. Inside CDP > User Management, add a new Machine User and set the workload password.
The steps for this are documented here:
With the user created and the workload password set, take note of the Workload User Name & Workload Password. Notice in the screenshot below that, for a Machine User called ‘cia_test_user’, the workload username is ‘srv_cia_test_user’.
Note: Keep the Workload User Name & Workload Password details handy for later.
dbt needs a profile to connect to your data warehouse. The profile lives in a .dbt directory in your home directory and is called profiles.yml.
On Unix-based systems, this would look like ~/.dbt/profiles.yml. If you haven't used dbt before, create the directory with the command:
mkdir ~/.dbt
and create the profiles.yml file with your favorite text editor. You can learn more about the dbt profile from the dbt docs here.
Use the following template for the contents of the file:
dbt_spark_cde_demo:
  outputs:
    dev:
      type: spark_cde
      method: cde
      schema: <schemaname>
      dbname: <dbname>
      host: <hostname>
      threads: <no of threads>
      user: <username>
      password: <password>
      auth_endpoint: https://service.cde-cb6tw6r7.ciadev.cna2-sx9y.cloudera.site/
  target: dev
NOTE: After you paste the API URL as the value of host, make sure it ends with a forward slash '/'; otherwise it will cause an error.
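The trailing-slash requirement is easy to miss when copying the API URL from the CDE UI. The following is a minimal Python sketch, not part of the adapter, of a helper that appends the slash when it is missing. The function name with_trailing_slash is hypothetical.

```python
def with_trailing_slash(url: str) -> str:
    """Return url unchanged if it already ends with '/', else append one."""
    url = url.strip()  # drop whitespace picked up while copy-pasting
    return url if url.endswith("/") else url + "/"


if __name__ == "__main__":
    # Host value from the sample profile, pasted without the final slash.
    print(with_trailing_slash(
        "https://x9b4ppfb.cde-cb6tw6r7.ciadev.cna2-sx9y.cloudera.site/dex/api/v1"
    ))
```

The returned value can then be pasted into the host field of profiles.yml.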
First, add your Workload User Name and Workload Password to the “user” and “password” fields. (refer to creating machine user)
Next, add the auth_endpoint and host. (refer to endpoints for generating auth token)
The sample completed profile looks like this:
dbt_spark_cde_demo:
  outputs:
    dev:
      type: spark_cde
      method: cde
      schema: spark_cde_demo
      host: https://x9b4ppfb.cde-cb6tw6r7.ciadev.cna2-sx9y.cloudera.site/dex/api/v1/
      user: srv_cia_test_user
      password: Password123!
      auth_endpoint: https://service.cde-cb6tw6r7.ciadev.cna2-sx9y.cloudera.site/
  target: dev
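Before running dbt, it can help to sanity-check the values of the dev target. The sketch below is only an illustration in Python and is not part of the adapter. It assumes the keys shown in the sample profile are all required, and the names REQUIRED_KEYS and profile_problems are hypothetical. It operates on a plain dictionary so that it has no dependency on a YAML library.

```python
from typing import Dict, List

# Keys taken from the sample profile; treating all of them as required is an assumption.
REQUIRED_KEYS = ("type", "method", "schema", "host", "user", "password", "auth_endpoint")


def profile_problems(target: Dict[str, str]) -> List[str]:
    """Return human-readable problems found in one profile target."""
    problems = []
    for key in REQUIRED_KEYS:
        value = str(target.get(key, "")).strip()
        if not value:
            problems.append(f"missing value for '{key}'")
        elif value.startswith("<") and value.endswith(">"):
            # Template placeholders such as <username> were never filled in.
            problems.append(f"placeholder left in '{key}': {value}")
    host = str(target.get("host", "")).strip()
    if host and not host.endswith("/"):
        problems.append("host should end with '/'")
    return problems


if __name__ == "__main__":
    dev = {
        "type": "spark_cde",
        "method": "cde",
        "schema": "spark_cde_demo",
        "host": "https://x9b4ppfb.cde-cb6tw6r7.ciadev.cna2-sx9y.cloudera.site/dex/api/v1/",
        "user": "srv_cia_test_user",
        "password": "Password123!",
        "auth_endpoint": "https://service.cde-cb6tw6r7.ciadev.cna2-sx9y.cloudera.site/",
    }
    print(profile_problems(dev))
```

An empty list means none of these simple checks failed; it does not prove that the credentials or endpoints are valid.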
With our dbt-spark-cde 1.1.3 adapter, users can now pass additional parameters as KEY:VALUE pairs.
A sample profiles.yml with additional parameters will look like this:
dbt_spark_cde_demo:
  outputs:
    dev:
      type: spark_cde
      method: cde
      schema: spark_cde_demo
      host: https://x9b4ppfb.cde-cb6tw6r7.ciadev.cna2-sx9y.cloudera.site/dex/api/v1/
      user: srv_cia_test_user
      password: Password123!
      auth_endpoint: https://service.cde-cb6tw6r7.ciadev.cna2-sx9y.cloudera.site/
      cde_session_parameters:
        fs.s3.canned.acl: BucketOwnerFullControl
        fs.s3a.acl.default: BucketOwnerFullControl
        spark.hadoop.fs.s3a.acl.default: BucketOwnerFullControl
        fs.s3a.server-side-encryption-algorithm: AES256
        fs.s3.server-side-encryption-algorithm: AES256
        spark.kerberos.access.hadoopFileSystems: s3a://dev-dl-services-e1-us-east-1-509747
        [some-other-key]: [value]
  target: dev
The parameters you pass are visible in the CDE jobs UI.
dbt debug
In the output of this command, you should see the message All checks passed, similar to this:
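If you want to script this check, for example in a setup script, one possible approach is sketched below in Python. It assumes dbt is on your PATH and that you run it from the dbt project directory. The helper names all_checks_passed and run_dbt_debug are hypothetical.

```python
import subprocess


def all_checks_passed(output: str) -> bool:
    """True if the text printed by dbt debug contains the success message."""
    return "All checks passed" in output


def run_dbt_debug() -> bool:
    """Run dbt debug in the current directory and report whether it succeeded."""
    result = subprocess.run(["dbt", "debug"], capture_output=True, text=True)
    return result.returncode == 0 and all_checks_passed(result.stdout)


if __name__ == "__main__":
    print("connection OK" if run_dbt_debug() else "dbt debug reported a problem")
```

Checking both the exit code and the message is deliberately conservative: either signal alone would probably be enough.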
In the example repo, we have a sample dbt project called ‘dbt_spark_cde_demo’.
Inside this demo project, we can issue dbt commands to run parts of the project. The demo project contains examples of the following operations and commands:
More commands and a detailed description of each command can be found here.
Move to the util/data_gen folder inside dbt-spark-cde-example and run the following command:
python generate_data.py --days 2 --start-date 2022-01-01
This generates 2 days of fake data for the dates 2022-01-01 and 2022-01-02. You can modify parameters such as days and start-date to generate more fake data.
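To see which dates a given combination of --days and --start-date covers, the following Python sketch may help. It is not the code of generate_data.py; it assumes the script produces one day of data per consecutive calendar day, beginning at the start date, which matches the two dates stated for the command shown. The function name covered_dates is hypothetical.

```python
from datetime import date, timedelta
from typing import List


def covered_dates(start_date: str, days: int) -> List[date]:
    """List the consecutive calendar dates starting at start_date (YYYY-MM-DD)."""
    start = date.fromisoformat(start_date)
    return [start + timedelta(days=offset) for offset in range(days)]


if __name__ == "__main__":
    for day in covered_dates("2022-01-01", 2):
        print(day.isoformat())
```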
The two generated files, raw_covid__cases.csv and raw_covid__vaccines.csv, can be found in the util/data_gen/data/ folder. Copy these two files to the seeds folder inside the dbt_spark_cde_demo dbt project directory.
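The copy step can be done by hand or scripted. A minimal Python sketch follows; the function name copy_seed_files is hypothetical, and the directories are passed in as arguments because the exact layout on your machine may differ.

```python
import shutil
from pathlib import Path
from typing import List

# File names as produced by the data generator described in this handbook.
SEED_FILES = ("raw_covid__cases.csv", "raw_covid__vaccines.csv")


def copy_seed_files(data_dir: Path, seeds_dir: Path) -> List[Path]:
    """Copy the generated CSV files into the dbt seeds folder."""
    seeds_dir.mkdir(parents=True, exist_ok=True)
    copied = []
    for name in SEED_FILES:
        source = data_dir / name
        if not source.is_file():
            raise FileNotFoundError(f"expected generated file not found: {source}")
        copied.append(Path(shutil.copy2(source, seeds_dir / name)))
    return copied
```

For example, from the repository root you might call copy_seed_files(Path("util/data_gen/data"), Path("dbt_spark_cde_demo/seeds")), assuming those relative paths match your checkout.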
dbt seed
Our seeds are configured with a couple of tests. You can read more about them here.
We also have a custom test created in dbt_spark_cde_demo/tests/generic/test_length.sql
Description: This test is used to check the character length of a column. Our reference data has columns that include ISO Alpha2 and Alpha3 country codes, and we know that the values in these columns should always be 2 or 3 characters long, respectively. To ensure that our reference data is high quality, we can use dbt to test these assumptions and report the results.
We expect that Alpha2 and Alpha3 columns are the correct lengths and that no fields should be null.
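The real test is written in SQL in test_length.sql, whose contents are not reproduced here. As an illustration of the expectation only, the Python sketch below returns the values that would violate it: a dbt test passes when its query returns no failing rows, so an empty list corresponds to a passing test. The function name length_failures is hypothetical, and folding the null check into the same function is a simplification for this example.

```python
from typing import Iterable, List, Optional


def length_failures(values: Iterable[Optional[str]], expected_length: int) -> List[Optional[str]]:
    """Return values that are null or whose length differs from expected_length."""
    return [v for v in values if v is None or len(v) != expected_length]


if __name__ == "__main__":
    alpha2 = ["US", "GB", None, "USA"]
    print(length_failures(alpha2, 2))
```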
dbt test
We have 3 sets of models in this demo project.
Firstly, we have raw. Our raw models make use of Sources. This is data that already exists in our database that dbt needs to refer to. This is the fake data we loaded earlier.
Our raw models are defined in models/raw/covid
Next, we have staging models. These are Models. Our staging models use the source() method to refer to the Sources we defined in our raw models. The staging models are intermediate views created over our raw data to handle some basic type conversion. These are materialized as views, and we don't expect our end users to query the staging models.
Our staging models are defined in models/staging/covid
Lastly, we have mart. These are Models. Our mart models use the ref() method to refer to the staging models and reference seeds we created using dbt. We use the staging views to handle most of the logic for type casting, but we do some renaming here to make our models easier for users to understand. These models are materialized as tables, as this gives greater performance for user queries. We can use incremental models to make the building of the model more performant.
Our mart models are defined in models/mart/covid
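The source() and ref() calls are what let dbt work out the order in which to build things: each model depends on whatever it refers to. The Python sketch below illustrates that idea with a topological sort from the standard library (Python 3.9 or later). It is not dbt's implementation, and the node names are hypothetical stand-ins for the raw sources, staging views, reference seeds, and mart tables.

```python
from graphlib import TopologicalSorter
from typing import Dict, List, Set

# Hypothetical node names; each key depends on the nodes in its set.
DEPENDENCIES: Dict[str, Set[str]] = {
    "staging.cases": {"source.raw_covid__cases"},        # via source()
    "staging.vaccines": {"source.raw_covid__vaccines"},  # via source()
    "mart.covid_cases": {"staging.cases", "seed.ref_country_codes"},  # via ref()
}


def build_order(dependencies: Dict[str, Set[str]]) -> List[str]:
    """Return an order in which every node comes after its dependencies."""
    return list(TopologicalSorter(dependencies).static_order())


if __name__ == "__main__":
    for node in build_order(DEPENDENCIES):
        print(node)
```

Any order that places sources and seeds before staging, and staging before mart, is valid; the exact order among independent nodes is not significant.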
dbt run
You should now have the following databases & tables:
In the raw, staging, and mart tables, you should see 2 days' worth of data for the dates 2022-01-01 and 2022-01-02. You can generate more fake data and play around.
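When you regenerate data and want to rebuild everything, the dbt seed, dbt test, and dbt run commands can be chained. The Python sketch below runs them in the order this handbook uses and stops at the first failure. It assumes dbt is on your PATH and that the script is started from the dbt project directory. The names DBT_STEPS and run_steps are hypothetical, and the runner argument exists only so the logic can be exercised without invoking dbt.

```python
import subprocess
from typing import Callable, List, Optional, Sequence

# Commands in the order used in this handbook.
DBT_STEPS = (("dbt", "seed"), ("dbt", "test"), ("dbt", "run"))


def _run_command(cmd: Sequence[str]) -> int:
    """Run one command and return its exit code."""
    return subprocess.run(list(cmd)).returncode


def run_steps(
    steps: Sequence[Sequence[str]] = DBT_STEPS,
    runner: Optional[Callable[[Sequence[str]], int]] = None,
) -> List[str]:
    """Run each step in order; raise RuntimeError on the first non-zero exit code."""
    run = runner or _run_command
    completed = []
    for cmd in steps:
        if run(cmd) != 0:
            raise RuntimeError(f"step failed: {' '.join(cmd)}")
        completed.append(" ".join(cmd))
    return completed


if __name__ == "__main__":
    print(run_steps())
```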
dbt docs generate
dbt docs serve --port 8001
The dbt docs serve command is only intended for local/development hosting of the documentation site. You can view it locally through http://localhost:8001/ or the port number you passed to the dbt docs serve command.
We have made code improvements to offer user-friendly errors and logs.