Created on 07-11-2022 04:13 PM - edited 06-26-2023 05:36 AM
This article focuses on Cloudera’s new adapter dbt-spark-livy that enables users to use dbt on top of Apache Spark by connecting to it using Apache Livy. Livy is a REST web service for managing long-running Spark Contexts and submitting Spark jobs. Instead of running the Spark Contexts in the Server itself, Livy manages Contexts running on the cluster managed by a Resource Manager like YARN.
Cloudera Data Platform provides Spark with Livy in the Datahub form factor. Links to a dbt example project that can be used to bootstrap a user’s dbt journey can be found below.
The adapter has been tested on the following versions:
python: 3.9.13
Cloudera Manager CM VERSION 7.6.2 RUNTIME VERSION 7.2.15
CDP Datahub with the template - 7.2.14 - Data Engineering: Apache Spark2/Spark3
dbt-core: 1.3.0
git clone https://github.com/cloudera/dbt-spark-livy-example/
pip install dbt-core
pip install dbt-spark-livy
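After installing, it can help to confirm which versions ended up in your environment and compare them with the tested versions listed above. The following is a minimal sketch that uses only the Python standard library. The helper name `installed_version` is our own illustration and is not part of dbt.

```python
from importlib import metadata
from typing import Optional


def installed_version(package: str) -> Optional[str]:
    """Return the installed version of a pip package, or None if it is missing."""
    try:
        return metadata.version(package)
    except metadata.PackageNotFoundError:
        return None


# Report the two packages installed in the step above.
for name in ("dbt-core", "dbt-spark-livy"):
    print(f"{name}: {installed_version(name) or 'not installed'}")
```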
For our development and testing, we used Spark included in Cloudera Runtime version 7.x.x with a data hub cluster. The cluster includes Spark, Livy, Hive, Zeppelin, and Oozie, along with supporting services (HDFS, YARN, and Zookeeper). More information on data hub clusters and templates is available at Cloudera data hub. You need a data hub cluster similar to this to be able to test the adapter. Note also that a data hub cluster runs a Hive Metastore Service (HMS). Spark interacts with this Hive metastore, which is already configured when the data hub is created. All data hub clusters are attached to a data lake that runs within an environment (in our case AWS) and provides security and governance for the environment's clusters. Data lake storage is created when a data lake is first instantiated for an environment. We use AWS S3 as the storage that the Hive metastore interacts with.
To work with Datahub Spark in CDP we need a machine user to run the queries.
For this demo, we will use a Machine User, created inside CDP as the user running queries. Inside CDP > User Management, add a new Machine User and set the workload password.
The steps for this are documented here:
With the machine user created and the workload password set, take note of the Workload User Name & Workload Password. Notice in the screenshot below that, for a Machine User called ‘cia_test_user’, the workload username is ‘srv_cia_test_user’.
Note: Keep the Workload User Name & Workload Password details handy for later.
Note: The steps are similar if you create a data hub providing Apache Spark (Spark v2) as the service instead of Spark3 (Spark v3).
With the data hub cluster in running state, you can view the details of the Data Hub by clicking on it. The livy server UI and other relevant addresses are accessible from the same page.
Go to Endpoints and note down the Livy Server 3 or Livy Server endpoint, depending on the version of Spark you used to create the data hub cluster. This is the endpoint dbt will connect to.
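Before configuring dbt, you may want to check that the endpoint responds. Livy's REST API exposes a session listing at `/sessions`, so one way to check is to request it with the workload credentials. The sketch below assumes that the CDP endpoint accepts HTTP Basic authentication with the Workload User Name and Workload Password. The function names are illustrative and are not part of the adapter.

```python
import base64
import json
import urllib.request


def sessions_url(livy_endpoint: str) -> str:
    """Build the URL of Livy's session listing from the endpoint noted in CDP."""
    return livy_endpoint.rstrip("/") + "/sessions"


def basic_auth_header(user: str, password: str) -> str:
    """Encode credentials as an HTTP Basic Authorization header value."""
    token = base64.b64encode(f"{user}:{password}".encode("utf-8")).decode("ascii")
    return f"Basic {token}"


def list_sessions(livy_endpoint: str, user: str, password: str) -> dict:
    """GET <endpoint>/sessions and return the parsed JSON response.

    Assumption: the endpoint accepts Basic authentication with workload
    credentials. Adjust if your environment is configured differently.
    """
    request = urllib.request.Request(
        sessions_url(livy_endpoint),
        headers={"Authorization": basic_auth_header(user, password)},
    )
    with urllib.request.urlopen(request, timeout=30) as response:
        return json.load(response)


# Example call (placeholders, not real values):
# list_sessions("<hostname>", "<username>", "<password>")
```

A successful response is a JSON document describing the current Livy sessions. An authentication or connection error at this stage points to a problem with the endpoint or credentials rather than with dbt.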
dbt needs a profile to connect to your data warehouse. The profile lives in a .dbt directory in your home directory and is called profiles.yml.
On Linux, this would look like ~/.dbt/profiles.yml. If you haven't used dbt before, create the directory with the command:
mkdir ~/.dbt
and create the profiles.yml file with your favorite text editor.
You can learn more about the dbt profile from the dbt docs here
Use the following template for the contents of the file:
dbt_spark_livy_demo:
  outputs:
    dev:
      type: spark_livy
      method: livy
      schema: <schemaname>
      dbname: <dbname>
      host: <hostname>
      threads: 2
      user: <username>
      password: <password>
  target: dev
First, add your Workload User Name and Workload Password to the “user” and “password” fields.
Next, set the “host” field in the template to the Livy endpoint we noted earlier from the CDP UI (step 5 of Create Cloudera Data HUB Spark account).
The sample completed profile looks like this:
dbt_spark_livy_demo:
  outputs:
    dev:
      type: spark_livy
      method: livy
      schema: dbt_spark_livy_demo
      dbname: dbt_spark_livy_demo
      host: https://dbt-spark-2-gateway.ciadev.cna2-sx9y.cloudera.site/dbt-spark-2/cdp-proxy-api/livy/
      threads: 2
      user: srv_cia_test_user
      password: Password123!
  target: dev
Note: Because Spark uses the Hive metastore in the background, remember to use proper syntax for the dbname and schema name. Valid names should only include letters, numbers, and underscores [a-zA-Z_0-9]. No dashes or hyphens are allowed.
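The naming rule in the note above can be checked before you write the profile. The following is a small sketch of that rule. The helper name `is_valid_name` is hypothetical, and the check covers only the character set described in the note, not any other limits the metastore may impose.

```python
import re

# Letters, digits, and underscores only, as described in the note above.
VALID_NAME = re.compile(r"[A-Za-z0-9_]+")


def is_valid_name(name: str) -> bool:
    """Return True if the name uses only letters, numbers, and underscores."""
    return VALID_NAME.fullmatch(name) is not None


print(is_valid_name("dbt_spark_livy_demo"))  # underscores are allowed
print(is_valid_name("dbt-spark-livy-demo"))  # hyphens are rejected
```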
To ensure we’ve configured our profile correctly, run a connection test. In the project directory (dbt_spark_livy_demo in our case), run the following command:
dbt debug
At the end of the output of this command, you should see a message similar to: All checks passed.
In the example repo, we have a sample dbt project called ‘dbt_spark_demo’.
Inside this demo project, we can issue dbt commands to run parts of the project. The demo project contains examples of the following operations and commands:
More commands and detailed descriptions of each command can be found here
Move to the util/data_gen folder inside dbt-spark-example and run the following command:
python generate_data.py --days 2 --start-date 2022-01-01
This generates 2 days of fake data for the dates 01/01/2022 and 02/01/2022. You can modify the parameters like days and start-date to generate more fake data.
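To make the date range explicit, here is a short sketch of which dates the parameters above cover. It assumes that the generator produces one consecutive day per unit of `--days`, starting at `--start-date`, which matches the two dates mentioned above (1 and 2 January 2022). The function `covered_dates` is our own illustration and is not taken from generate_data.py.

```python
from datetime import date, timedelta
from typing import List


def covered_dates(start: date, days: int) -> List[date]:
    """List the consecutive dates beginning at `start` (assumed behaviour)."""
    return [start + timedelta(days=offset) for offset in range(days)]


# Same parameters as the command above: --days 2 --start-date 2022-01-01
for day in covered_dates(date(2022, 1, 1), 2):
    print(day.isoformat())
```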
The two generated files, raw_covid__cases.csv and raw_covid__vaccines.csv, can be found in the util/data_gen/data/ folder. Copy these two files to the seeds folder inside the dbt_spark_livy_demo dbt project directory.
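If you regenerate the data often, the copy step can be scripted. The sketch below copies the two files named above into a seeds folder. The function name and the example paths in the closing comment are assumptions about how your checkout is laid out, so adjust them to your directories.

```python
import shutil
from pathlib import Path
from typing import List

# The two files produced by the data generator.
SEED_FILES = ("raw_covid__cases.csv", "raw_covid__vaccines.csv")


def copy_seed_files(data_dir: Path, seeds_dir: Path) -> List[Path]:
    """Copy the generated CSV files into the dbt seeds folder."""
    seeds_dir.mkdir(parents=True, exist_ok=True)
    copied = []
    for name in SEED_FILES:
        target = seeds_dir / name
        shutil.copy2(data_dir / name, target)  # overwrites an older copy
        copied.append(target)
    return copied


# Example call (paths are assumptions, relative to the repository root):
# copy_seed_files(Path("util/data_gen/data"), Path("dbt_spark_livy_demo/seeds"))
```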
dbt seed
Our Seeds are configured with a couple of tests. Users can read more about it here
We also have a custom test created in dbt_spark_livy_demo/tests/generic/test_length.sql
Description: This test is used to check the character length of a column. Our reference data has columns that include ISO Alpha2 and Alpha3 country codes - we know that these columns should always be 2 or 3 characters long, respectively. To ensure that our reference data is high quality, we can use dbt to test these assumptions and report the results.
We expect that Alpha2 and Alpha3 columns are the correct lengths and that no fields should be null.
dbt test
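The expectations described above can be illustrated outside of dbt. A dbt test passes when its query returns no rows, so the sketch below returns the rows that would fail. It is a Python illustration of the rule, not the contents of test_length.sql. The column names `alpha_2` and `alpha_3` and the sample rows are hypothetical, and the length and null checks are combined here for brevity.

```python
from typing import Dict, List, Optional

Row = Dict[str, Optional[str]]


def failing_rows(rows: List[Row], column: str, expected_length: int) -> List[Row]:
    """Return rows whose value in `column` is null or has the wrong length."""
    return [
        row
        for row in rows
        if row.get(column) is None or len(row[column]) != expected_length
    ]


sample = [
    {"alpha_2": "GB", "alpha_3": "GBR"},
    {"alpha_2": "USA", "alpha_3": "USA"},  # alpha_2 is too long
    {"alpha_2": None, "alpha_3": "FRA"},   # alpha_2 is missing
]

print(failing_rows(sample, "alpha_2", 2))  # two rows fail the check
print(failing_rows(sample, "alpha_3", 3))  # no rows fail, so the check passes
```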
We have 3 sets of models in this demo project.
Firstly, we have raw. Our raw models make use of Sources. This is data that already exists in our database that dbt needs to refer to. This is the fake data we loaded earlier.
Our raw models are defined in models/raw/covid
Next, we have staging. These are Models. Our staging models use the source() method to refer to the Sources we defined in our raw models. The staging models are intermediate views created over our raw data to handle some basic type conversion. These are materialized as views, and we don't expect our end users to query the staging models.
Our staging models are defined in models/staging/covid
Lastly, we have mart. These are Models. Our mart models use the ref() method to refer to the staging models and reference seeds we created using dbt. We use the staging views to handle most of the logic for type casting, but we do some renaming here to make our models easier for users to understand. These models are materialized as tables, giving better performance for user queries. We can also use incremental models to make building the model more performant.
Our mart models are defined in models/mart/covid
dbt run
Users should now have the following databases & tables:
In the raw, staging, and mart tables, you should see 2 days' worth of data for the dates 01/01/2022 and 02/01/2022. You can generate more fake data and play around.
dbt docs generate
dbt docs serve --port 8001
The dbt docs serve command is only intended for local/development hosting of the documentation site. You can view it locally at http://localhost:8001/, or at whichever port number you passed to dbt docs serve.
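The dbt commands used in this walkthrough can also be run in sequence from a script. The following is a minimal sketch that assumes the `dbt` executable is on your PATH and that you pass the path to the project directory. The names `DBT_STEPS` and `run_steps` are illustrative. `dbt docs serve` is left out because it keeps running until stopped.

```python
import subprocess
from typing import Sequence, Tuple

# The commands from this article, in the order they were introduced.
DBT_STEPS: Tuple[Tuple[str, ...], ...] = (
    ("dbt", "debug"),
    ("dbt", "seed"),
    ("dbt", "test"),
    ("dbt", "run"),
    ("dbt", "docs", "generate"),
)


def run_steps(project_dir: str, steps: Sequence[Sequence[str]] = DBT_STEPS) -> None:
    """Run each dbt command inside the project directory, stopping on failure."""
    for step in steps:
        print("Running:", " ".join(step))
        # check=True raises CalledProcessError if a command exits non-zero.
        subprocess.run(list(step), cwd=project_dir, check=True)


# Example call (the path is an assumption about your checkout):
# run_steps("dbt_spark_livy_demo")
```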