

Overview:

This handbook covers Cloudera’s new adapter, dbt-spark-cde, which lets users run dbt on top of Apache Spark through Cloudera Data Engineering (CDE). CDE is a serverless service for Cloudera Data Platform that allows you to submit Spark jobs to an auto-scaling cluster.

Links to a dbt example project that can be used to bootstrap a user’s dbt journey can be found below.

Version Notes:

The adapter has been tested on the following versions:

  • Python: 3.9
  • Cloudera Data Engineering release: 1.15-h1
  • dbt-core: 1.3.0

Installing dbt and requirements

Prerequisites: 

Cloning the repo:

  • Run the following command in your terminal to clone the example project repository:
    git clone https://github.com/cloudera/dbt-spark-cde-example/

Installing the packages:

  • Run the following commands in your terminal to install the required packages/dependencies using pip/pip3:
    pip install dbt-core
    pip install dbt-spark-cde
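
After installing, it can help to confirm which versions pip actually resolved, since the adapter was tested against specific releases. The following is a minimal sketch using only the Python standard library; the helper name installed_versions is hypothetical and not part of dbt or the adapter.

```python
from importlib import metadata
from typing import Dict, Optional, Sequence


def installed_versions(packages: Sequence[str]) -> Dict[str, Optional[str]]:
    """Map each distribution name to its installed version, or None if absent."""
    versions: Dict[str, Optional[str]] = {}
    for name in packages:
        try:
            versions[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            versions[name] = None
    return versions


if __name__ == "__main__":
    # Package names taken from the pip commands above.
    for name, version in installed_versions(["dbt-core", "dbt-spark-cde"]).items():
        print(f"{name}: {version or 'not installed'}")
```

A value of "not installed" for either package suggests that pip ran against a different Python environment than the one you are using.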

CDE and Spark

For development and testing, we used the Spark engine included in the Cloudera Data Engineering service. If you have not enabled the service yet, follow the instructions at this link: https://docs.cloudera.com/data-engineering/cloud/enable-data-engineering/topics/cde-enable-data-engi...

Create Cloudera Data Engineering Cluster

Once you have an environment with CDE enabled, the next step is to create a virtual cluster to run the Spark jobs. You may create as many virtual clusters as you want. To create a virtual cluster, refer to this link:

https://docs.cloudera.com/data-engineering/cloud/manage-clusters/topics/cde-create-cluster.html

Once the CDE environment is enabled and the virtual cluster is set up, you should see something similar to this on your CDE overview page:

hajmera_0-1658338315176.png

Connecting to Cloudera Data Engineering (CDE) from the adapter

To work with CDE, we need to complete two steps:

  1. Create a machine user to run the queries.
  2. Note down the endpoints used to generate an auth token. Cloudera Data Engineering uses JSON Web Tokens (JWT) for API authentication, so to interact with a virtual cluster using the API, you must obtain an access token for that cluster.

Step 1. Creating a machine user

For this demo, we will use a Machine User, created inside CDP as the user running queries. Inside CDP > User Management, add a new Machine User and set the workload password.

The steps for this are documented here:

  1. Creating a Machine User
  2. Setting the workload password

With the user created and the workload password set, take note of the Workload User Name and Workload Password. Notice in the screenshot below that for a Machine User called ‘cia_test_user’ the workload username is ‘srv_cia_test_user’.

hajmera_1-1658338315218.png

 

Note: Keep the Workload User Name & Workload Password details handy for later.

Step 2. Noting down endpoints for generating auth token.

  1. Navigate to the Cloudera Data Engineering Overview page by clicking the Data Engineering tile in the Cloudera Data Platform (CDP) management console.
  2. In the Services column, select the environment containing the virtual cluster you want to interact with.
  3. In the Virtual Clusters column on the right, click the Cluster Details icon (highlighted in red) on the virtual cluster you want to interact with.
    hajmera_2-1658338315219.png
  4. Click the link next to GRAFANA CHARTS. The part of the URL before /grafana/d… in your browser is the base URL.
    E.g. https://service.cde-cb6tw6r7.ciadev.cna2-sx9y.cloudera.site/ is the base URL, or auth_endpoint. Note it down.
  5. Next, click the link next to the jobs API URL. This is our host endpoint. Note it down. E.g. https://x9b4ppfb.cde-cb6tw6r7.ciadev.cna2-sx9y.cloudera.site/dex/api/v1
    hajmera_3-1658338315221.png
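
To check the two endpoints by hand before configuring dbt, you can request a token yourself. The sketch below is not the adapter's code; it assumes the token path /gateway/authtkn/knoxtoken/api/v1/token and the access_token response field described in the CDE API access documentation, and the function names are hypothetical.

```python
import base64
import json
import urllib.request
from urllib.parse import urlsplit

# Assumption: token path as described in the CDE API access documentation.
TOKEN_PATH = "/gateway/authtkn/knoxtoken/api/v1/token"


def base_url(grafana_url: str) -> str:
    """Keep only the scheme and hostname of the GRAFANA CHARTS link."""
    parts = urlsplit(grafana_url)
    return f"{parts.scheme}://{parts.netloc}/"


def token_url(auth_endpoint: str) -> str:
    """Append the token path to the auth_endpoint, avoiding a double slash."""
    return auth_endpoint.rstrip("/") + TOKEN_PATH


def fetch_access_token(auth_endpoint: str, user: str, password: str) -> str:
    """Request a JWT with HTTP basic auth (needs network access to the cluster)."""
    credentials = base64.b64encode(f"{user}:{password}".encode()).decode()
    request = urllib.request.Request(
        token_url(auth_endpoint),
        headers={"Authorization": f"Basic {credentials}"},
    )
    with urllib.request.urlopen(request) as response:
        # Assumption: the response is JSON with an "access_token" field.
        return json.load(response)["access_token"]
```

Calling fetch_access_token with the auth_endpoint, Workload User Name, and Workload Password should return a token string if the machine user and endpoint are set up correctly.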

Connecting dbt to Spark-CDE

dbt needs a profile to connect to your data warehouse. The profile lives in a .dbt directory in your home directory and is called profiles.yml.

On Unix-based systems, this would look like ~/.dbt/profiles.yml. If you haven't used dbt before, create the directory with the command:

 mkdir ~/.dbt

and create the profiles.yml file with your favorite text editor. You can learn more about the dbt profile from the dbt docs here.

 

Use the following template for the contents of the file:

dbt_spark_cde_demo:
  outputs:
    dev:
      type: spark_cde
      method: cde
      schema: <schemaname>
      dbname: <dbname>
      host: <hostname>
      threads: <no of threads>
      user: <username>
      password: <password>
      auth_endpoint: https://service.cde-cb6tw6r7.ciadev.cna2-sx9y.cloudera.site/
  target: dev

NOTE: After pasting the jobs API URL as the host value, add a trailing forward slash '/'; otherwise, it will cause an error.

First, add your Workload User Name and Workload Password to the “user” and “password” fields (see Step 1, creating a machine user).

Next, add the auth_endpoint and host (see Step 2, noting down endpoints for generating the auth token).

The sample completed profile looks like this:

dbt_spark_cde_demo:
  outputs:
    dev:
      type: spark_cde
      method: cde
      schema: spark_cde_demo
      host: https://x9b4ppfb.cde-cb6tw6r7.ciadev.cna2-sx9y.cloudera.site/dex/api/v1/
      user: srv_cia_test_user
      password: Password123!
      auth_endpoint: https://service.cde-cb6tw6r7.ciadev.cna2-sx9y.cloudera.site/
  target: dev
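
A forgotten trailing slash on host is easy to miss, so a quick check of the target settings before running dbt can save a round trip. This is a minimal sketch, not part of the adapter: the list of required keys is an assumption taken from the sample profile above, and profile_problems is a hypothetical helper name.

```python
from typing import Dict, List

# Assumption: these are the keys used by the sample profile in this article.
REQUIRED_KEYS = ("type", "method", "schema", "host", "user", "password", "auth_endpoint")


def profile_problems(target: Dict[str, object]) -> List[str]:
    """Return human-readable problems found in one output target of profiles.yml."""
    problems = [f"missing key: {key}" for key in REQUIRED_KEYS if key not in target]
    host = str(target.get("host", ""))
    if host and not host.endswith("/"):
        problems.append("host must end with a forward slash '/'")
    return problems


if __name__ == "__main__":
    dev = {
        "type": "spark_cde",
        "method": "cde",
        "schema": "spark_cde_demo",
        "host": "https://x9b4ppfb.cde-cb6tw6r7.ciadev.cna2-sx9y.cloudera.site/dex/api/v1",
        "user": "srv_cia_test_user",
        "password": "<password>",
        "auth_endpoint": "https://service.cde-cb6tw6r7.ciadev.cna2-sx9y.cloudera.site/",
    }
    # The host above deliberately lacks the trailing slash.
    print(profile_problems(dev))
```

An empty list means the target has every key from the sample and a host that ends with '/'.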

Passing the CDE session parameters with profiles.yml

With the dbt-spark-cde 1.1.3 adapter, users can now pass additional parameters as KEY: VALUE pairs under cde_session_parameters.
A sample profiles.yml with additional parameters looks like this:

dbt_spark_cde_demo:
  outputs:
    dev:
      type: spark_cde
      method: cde
      schema: spark_cde_demo
      host: https://x9b4ppfb.cde-cb6tw6r7.ciadev.cna2-sx9y.cloudera.site/dex/api/v1/
      user: srv_cia_test_user
      password: Password123!
      auth_endpoint: https://service.cde-cb6tw6r7.ciadev.cna2-sx9y.cloudera.site/
      cde_session_parameters:
        fs.s3.canned.acl: BucketOwnerFullControl
        fs.s3a.acl.default: BucketOwnerFullControl
        spark.hadoop.fs.s3a.acl.default: BucketOwnerFullControl
        fs.s3a.server-side-encryption-algorithm: AES256
        fs.s3.server-side-encryption-algorithm: AES256
        spark.kerberos.access.hadoopFileSystems: s3a://dev-dl-services-e1-us-east-1-509747
        [some-other-key]: [value]
  target: dev

Passed parameters are visible in CDE jobs UI. 
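
Because YAML nesting is carried entirely by indentation, it is worth being explicit about where cde_session_parameters sits: inside the dev target, one level below keys such as host and user. The sketch below renders a nested Python dictionary as indented lines to make that nesting visible. It is an illustration only; render_yaml is a hypothetical helper that handles plain scalar values, not a general YAML writer.

```python
from typing import Any, Dict, List


def render_yaml(mapping: Dict[str, Any], indent: int = 0) -> List[str]:
    """Render nested dictionaries of plain scalars as indented YAML lines."""
    lines: List[str] = []
    pad = " " * indent
    for key, value in mapping.items():
        if isinstance(value, dict):
            lines.append(f"{pad}{key}:")
            lines.extend(render_yaml(value, indent + 2))
        else:
            lines.append(f"{pad}{key}: {value}")
    return lines


# A trimmed version of the sample profile; the password is a placeholder.
profile = {
    "dbt_spark_cde_demo": {
        "outputs": {
            "dev": {
                "type": "spark_cde",
                "method": "cde",
                "schema": "spark_cde_demo",
                "user": "srv_cia_test_user",
                "password": "<password>",
                "cde_session_parameters": {
                    "fs.s3a.acl.default": "BucketOwnerFullControl",
                    "fs.s3a.server-side-encryption-algorithm": "AES256",
                },
            }
        },
        "target": "dev",
    }
}

if __name__ == "__main__":
    print("\n".join(render_yaml(profile)))
```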

dbt debug

Run the dbt debug command to verify the connection. In its output, you should see “All checks passed”, similar to this:

hajmera_4-1658338315230.png
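
If you want to script this check, for example as a first step before seeding or running models, the command can be wrapped in a few lines of Python. This is a minimal sketch under the assumption that dbt is on your PATH and that a successful run prints the "All checks passed" line mentioned above; run_dbt and debug_passed are hypothetical helper names.

```python
import subprocess
from typing import List


def run_dbt(args: List[str], project_dir: str = ".") -> subprocess.CompletedProcess:
    """Run a dbt command in the project directory and capture its output as text."""
    return subprocess.run(["dbt", *args], cwd=project_dir, capture_output=True, text=True)


def debug_passed(output: str) -> bool:
    """Report whether the dbt debug output contains the success line."""
    return "All checks passed" in output


if __name__ == "__main__":
    result = run_dbt(["debug"])
    print("connection OK" if debug_passed(result.stdout) else result.stdout)
```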

Running the demo project

In the example repo, we have a sample dbt project called ‘dbt_spark_cde_demo’.

Inside this demo project, we can issue dbt commands to run parts of the project. The demo project contains examples of the following operations and commands:

More commands and a detailed description of each command can be found here.

Generate fake data

Move to the util/data_gen folder inside dbt-spark-cde-example and run the following command:

python generate_data.py --days 2 --start-date 2022-01-01 

This generates 2 days of fake data for the dates 2022-01-01 and 2022-01-02. You can modify parameters such as days and start-date to generate more fake data.
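
To see which dates a given combination of parameters covers, the arithmetic can be sketched as follows. This assumes, based on the example above, that the script produces one batch per consecutive day starting at start-date; generated_dates is a hypothetical helper and not part of generate_data.py.

```python
from datetime import date, timedelta
from typing import List


def generated_dates(start_date: str, days: int) -> List[str]:
    """List the ISO dates covered by `days` consecutive days from `start_date`."""
    start = date.fromisoformat(start_date)
    return [(start + timedelta(days=offset)).isoformat() for offset in range(days)]


if __name__ == "__main__":
    # Same parameters as the command above.
    print(generated_dates("2022-01-01", 2))
```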

The two generated files, raw_covid__cases.csv and raw_covid__vaccines.csv, can be found in the util/data_gen/data/ folder. Copy these two files to the seeds folder inside the dbt_spark_cde_demo dbt project directory.

Loading fake data

  1. To run the seeds, move to the dbt project dbt_spark_cde_demo inside the cloned repository dbt-spark-cde-example and run the following command:
    dbt seed
  2. The terminal should look like this:
    hajmera_5-1658338315217.png
  3. This will load the reference data into the warehouse.

Running tests:

Our Seeds are configured with a couple of tests. Users can read more about it here.

We also have a custom test created in dbt_spark_cde_demo/tests/generic/test_length.sql 

Description: This test checks the character length of a column. Our reference data has columns that include ISO Alpha2 and Alpha3 country codes, and we know that these values should always be 2 or 3 characters long respectively. To ensure that our reference data is high quality, we can use dbt to test these assumptions and report the results.

We expect that Alpha2 and Alpha3 columns are the correct lengths and that no fields should be null.
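
The actual test is written in SQL inside the dbt project, but the rule it enforces can be sketched in Python. A dbt test passes when its query returns no rows, so the sketch returns the rows that violate the rule. The column names alpha_2 and alpha_3 and the helper failing_rows are hypothetical and may not match the seed files.

```python
from typing import Dict, List, Optional

Row = Dict[str, Optional[str]]


def failing_rows(rows: List[Row], column: str, length: int) -> List[Row]:
    """Return rows whose value in `column` is null or not exactly `length` characters."""
    failures = []
    for row in rows:
        value = row.get(column)
        if value is None or len(value) != length:
            failures.append(row)
    return failures


if __name__ == "__main__":
    sample = [
        {"alpha_2": "DE", "alpha_3": "DEU"},
        {"alpha_2": "FRA", "alpha_3": "FRA"},  # alpha_2 is too long
        {"alpha_2": None, "alpha_3": "GBR"},   # alpha_2 is null
    ]
    print(failing_rows(sample, "alpha_2", 2))
```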

  1. Run the following command to run the tests:
    dbt test
  2. The terminal should look something like this. Don’t be concerned about the errors:
    hajmera_1-1658434290870.png

Running models:

We have 3 sets of models in this demo project.

Firstly, we have raw. Our raw models make use of Sources. This is data that already exists in our database that dbt needs to refer to. This is the fake data we loaded earlier.

Our raw models are defined in models/raw/covid

Next, we have staging models. These are Models. Our staging models use the source() method to refer to the Sources we defined in our raw models. The staging models are intermediate views created over our raw data to handle some basic type conversion. These are materialized as views, and we don't expect our end users to query the staging models.

Our staging models are defined in models/staging/covid

Lastly, we have mart. These are Models. Our mart models use the ref() method to refer to the staging models and reference seeds we created using dbt. We use the staging views to handle most of the logic for type casting, but we do some renaming here to make our models easier for users to understand. These models are materialized as tables, as this gives greater performance for user queries. We can use incremental models to make the building of the model more performant.

Our mart models are defined in models/mart/covid
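
The source() and ref() calls are what let dbt work out a build order: anything a model refers to must exist before the model itself is built. The idea can be sketched with a topological sort from the Python standard library. The dependency edges below are assumptions chosen to mirror the raw, staging, and mart layers described above; they are not read from the project.

```python
from graphlib import TopologicalSorter
from typing import Dict, List, Set

# Assumption: each model maps to the set of objects it refers to.
dependencies: Dict[str, Set[str]] = {
    "stg_covid__cases": {"raw_covid__cases"},              # via source()
    "covid_cases": {"stg_covid__cases", "ref__country_codes"},  # via ref()
}


def build_order(deps: Dict[str, Set[str]]) -> List[str]:
    """Return an order in which every object comes after the ones it depends on."""
    return list(TopologicalSorter(deps).static_order())


if __name__ == "__main__":
    print(build_order(dependencies))
```

Objects with no dependency between them, such as the raw table and the reference seed, may appear in either order.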

  1. Run the models with the following command:
    dbt run
  2. The terminal should look like this:
    hajmera_2-1658434489659.png

Reviewing the data and models

Reviewing the database and table in Hue UI

You should now have the following databases and tables:

  • reference (database)
    • ref__population (table)
    • ref__country_codes (table)
  • dbt_spark_cde_demo_raw_covid (database)
    • raw_covid__vaccines (table)
    • raw_covid__cases (table)
  • staging_covid (database)
    • stg_covid__cases (view)
  • mart_covid (database)
    • covid_cases (table)

In the raw, staging, and mart tables, you should see 2 days' worth of data for the dates 2022-01-01 and 2022-01-02. You can generate more fake data and experiment with it.

Generating and viewing dbt documentation:

  1. Run the following command to generate the documentation:
    dbt docs generate
  2. The terminal should look like this:
    hajmera_7-1658338315227.png
  3. Users can read more about dbt docs here
  4. Run the following command to open the documentation on the browser:
    dbt docs serve --port 8001
    The dbt docs serve command is only intended for local/development hosting of the documentation site. You can view it locally at http://localhost:8001/, or at whichever port number you passed to dbt docs serve.
  5. The terminal should look like this:
    hajmera_8-1658338315247.png
    Notes: dbt's documentation website was built in a way that makes it easy to host on the web. The site itself is "static", meaning that you don't need any type of "dynamic" server to serve the docs. A sample dbt docs site can be viewed here. Some common methods for hosting the docs are:
    1. Host on S3
    2. Publish on Netlify
    3. Spin up a web server like Apache/Nginx
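
Since the site is static, any file server pointed at the generated files will do. As a minimal sketch of the third option, the Python standard library can serve the project's target folder, where dbt docs generate writes index.html, manifest.json, and catalog.json. The helper names are hypothetical, and this is meant for quick local sharing rather than production hosting.

```python
import functools
import http.server
from pathlib import Path
from typing import List

# Files written by `dbt docs generate` that the static site needs.
DOCS_FILES = ("index.html", "manifest.json", "catalog.json")


def missing_docs_files(target_dir: str) -> List[str]:
    """List the docs files that are not present in the target directory."""
    root = Path(target_dir)
    return [name for name in DOCS_FILES if not (root / name).is_file()]


def serve_docs(target_dir: str, port: int = 8001) -> None:
    """Serve the target directory as a static site until interrupted."""
    handler = functools.partial(
        http.server.SimpleHTTPRequestHandler, directory=target_dir
    )
    with http.server.ThreadingHTTPServer(("127.0.0.1", port), handler) as server:
        server.serve_forever()


if __name__ == "__main__":
    missing = missing_docs_files("target")
    if missing:
        print(f"Run 'dbt docs generate' first; missing: {missing}")
    else:
        serve_docs("target")
```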

Debugging dbt job:

We have made code improvements to provide user-friendly errors and logs.

  • Any dbt profile errors or connection issues encountered while running dbt commands now show a user-friendly message:
    hajmera_0-1662960168628.png
  • Details for each query are now available in the logs, as in this sample dbt.log file:
    Screen Shot 2022-09-11 at 10.24.54 PM.png
    These detailed logs help users analyze the time each job takes to execute a query with CDE.