Created on 06-27-2024 10:32 PM - edited 06-27-2024 10:34 PM
Purpose
Building custom operators helps data engineers abstract and reuse code across workloads. A typical use case is custom logic wrapped around Airflow tasks (e.g. configuration setup) that must run before the business functionality of a workload. In this example, we will create a custom arithmetic operator that performs arithmetic operations as part of our pipeline.
Instructions
This example has two parts. In Part 1, we will learn how to create custom operators and upload them to a PyPI mirror. In Part 2, we will use these operators in Cloudera Data Engineering (CDE).
Create an API token and use it to authenticate when uploading your package. Refer to the pypi.org documentation on using API tokens here.
Clone or download the source code from the link here.
Part 1: Creating a Python package with a new Custom Airflow Operator
Clone this repository to a local folder.
Change the current directory to the folder cde_custom_airflowoperator:
cd your-path/cde_custom_airflowoperator
Next, we need to create a custom Python library that contains our operator and package it as a Python package. Review the operator code in src/arithmetic_operator.py. This module extends Airflow's BaseOperator class to create a new custom operator called ArithmeticOperator. The operator performs an arithmetic operation on two numbers, based on the type of operation specified in its parameters.
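The contents of src/arithmetic_operator.py are not reproduced here, so the following is only a minimal sketch of what such an operator can look like. The parameter names `num1`, `num2` and `operation`, and the set of operation names, are hypothetical and may differ from the repository. It follows the pattern Airflow documents for custom operators: subclass `BaseOperator` and do the work in `execute()`. The `try`/`except` fallback exists only so the arithmetic logic can be tried without Airflow installed; in CDE, Airflow's real `BaseOperator` is imported.

```python
import logging
import operator
from typing import Any, Callable, Dict

try:
    from airflow.models.baseoperator import BaseOperator
except ImportError:
    # Minimal stand-in so this sketch runs without Airflow installed.
    # It is NOT Airflow's BaseOperator: it only stores task_id and a logger.
    class BaseOperator:  # type: ignore[no-redef]
        def __init__(self, task_id: str, **kwargs: Any) -> None:
            self.task_id = task_id
            self.log = logging.getLogger(task_id)


# Hypothetical operation names; the repository's operator may use other values.
OPERATIONS: Dict[str, Callable[[float, float], float]] = {
    "add": operator.add,
    "subtract": operator.sub,
    "multiply": operator.mul,
    "divide": operator.truediv,
}


class ArithmeticOperator(BaseOperator):
    """Applies one arithmetic operation to two numbers (illustrative sketch)."""

    def __init__(self, num1: float, num2: float, operation: str, **kwargs: Any) -> None:
        # task_id and any other standard operator arguments go to BaseOperator.
        super().__init__(**kwargs)
        if operation not in OPERATIONS:
            raise ValueError(f"Unsupported operation: {operation!r}")
        self.num1 = num1
        self.num2 = num2
        self.operation = operation

    def execute(self, context: Any) -> float:
        # Airflow calls execute() when the task runs; when run by Airflow,
        # the return value is pushed to XCom by default.
        result = OPERATIONS[self.operation](self.num1, self.num2)
        self.log.info("%s %s %s = %s", self.num1, self.operation, self.num2, result)
        return result
```

Once the package is installed, a DAG would import the class from it and instantiate it like any other operator, for example `ArithmeticOperator(task_id="addition", num1=10, num2=5, operation="add")`. Validating `operation` in `__init__` means a typo surfaces when the DAG file is parsed rather than when the task runs.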
Finally, let us build our Python package and upload it to a test PyPI mirror (TestPyPI). The easiest way to do so is to follow the step-by-step packaging tutorial from python.org here. Do not forget to create a TestPyPI account and set up authentication with API tokens, as mentioned in the prerequisites.
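The python.org tutorial builds the package with the PyPA `build` tool and uploads it with `twine`. A minimal Python sketch of those two steps is shown below. It assumes both tools are installed (`pip install build twine`) and that the project already contains the packaging metadata the tutorial describes (e.g. a `pyproject.toml`); the helper names are hypothetical.

```python
import glob
import os
import subprocess
import sys
from typing import List


def build_command() -> List[str]:
    # The PyPA "build" frontend writes an sdist and a wheel into dist/.
    return [sys.executable, "-m", "build"]


def upload_command(dist_files: List[str], repository: str = "testpypi") -> List[str]:
    # twine recognizes "testpypi" as a repository name without extra
    # configuration; the python.org tutorial relies on this.
    return [sys.executable, "-m", "twine", "upload", "--repository", repository, *dist_files]


def main(project_dir: str = ".") -> None:
    # Build inside the project folder (the one containing pyproject.toml).
    subprocess.run(build_command(), cwd=project_dir, check=True)
    # Every file in dist/ is uploaded, including stale files from earlier
    # builds, so clear the folder before rebuilding a new version.
    dist_files = sorted(glob.glob(os.path.join(project_dir, "dist", "*")))
    if not dist_files:
        raise SystemExit("No files found in dist/; did the build succeed?")
    subprocess.run(upload_command(dist_files), check=True)
```

Calling `main("your-path/cde_custom_airflowoperator")` runs the build and then the upload. With an API token, twine expects the username `__token__` and the token itself as the password; you can type them when prompted or supply them through the `TWINE_USERNAME` and `TWINE_PASSWORD` environment variables.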
Once you have followed those instructions and uploaded the package, you should be able to see it, as shown below:
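Besides browsing the project page, you can check the upload programmatically. PyPI and TestPyPI serve project metadata as JSON at `/pypi/<project>/json`. The sketch below uses hypothetical helper names; it reads the version shown on the project page and the list of uploaded versions. Only `fetch_metadata` needs network access.

```python
import json
import urllib.request
from typing import Any, Dict, List

TEST_PYPI = "https://test.pypi.org"


def project_json_url(project: str, index: str = TEST_PYPI) -> str:
    # JSON metadata endpoint for a single project.
    return f"{index}/pypi/{project}/json"


def fetch_metadata(project: str, index: str = TEST_PYPI) -> Dict[str, Any]:
    # Raises urllib.error.HTTPError (404) if the project does not exist.
    with urllib.request.urlopen(project_json_url(project, index), timeout=10) as resp:
        return json.load(resp)


def latest_version(metadata: Dict[str, Any]) -> str:
    # "info.version" is the version displayed on the project page.
    return metadata["info"]["version"]


def uploaded_versions(metadata: Dict[str, Any]) -> List[str]:
    # "releases" maps every uploaded version to its files (order as returned).
    return list(metadata["releases"])
```

For example, `latest_version(fetch_metadata("your-package-name"))` should return the version you just uploaded, where `your-package-name` is a placeholder for the name you published.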
Part 2: Using our Custom Airflow Operator in an Airflow Job
CDE SETUP:
First, set up the Airflow environment in CDE to fetch the custom operator package. Click the cluster details, then select the Airflow configuration tab, as shown below.
Next, use the Airflow configuration tab shown below to set up the CDE Airflow environment with your custom operator as follows:
Change the PyPI repository URL to https://test.pypi.org (or to any other PyPI mirror you used to upload your Python package). If you used TestPyPI for your custom Airflow operator, keep the other fields blank and click Validate Configurations.
Next, add a single requirements.txt file in step 2, the Build stage. Modify the requirements.txt in the CDE_setup folder to use the package name you chose for your custom operator.
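The requirement line must use the distribution name you published (the `name` in your packaging metadata), which can differ from the name of the module in src/arithmetic_operator.py. A small sketch with hypothetical helper names is shown below, assuming the file only needs a single pinned line for your package. pip compares names after PEP 503 normalization, so normalizing here is for consistency rather than correctness.

```python
import re
from pathlib import Path


def normalize_name(name: str) -> str:
    # PEP 503: runs of "-", "_" and "." collapse to a single "-", lowercased.
    return re.sub(r"[-_.]+", "-", name).lower()


def write_requirements(path: Path, package: str, version: str) -> str:
    # Pinning "==version" makes the build install exactly the uploaded version;
    # update it whenever you upload a new version of the package.
    line = f"{normalize_name(package)}=={version}\n"
    path.write_text(line)
    return line
```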
You should see the package being built, as shown below.
Once the build succeeds, you should see the package installed, as shown below. Your version number may differ, depending on the version you set when building the package.
Finally, click the Activate button to activate the package in the Airflow environment.
Using Custom Operators
Now let us create a new CDE job and run it. Use the CDE user interface to create a new Airflow job that uses this operator, and upload the DAG file Dag1.py from the CDE_Setup folder for this job. Click Create and Run to execute the job.
Let us validate the run by checking the Airflow UI within CDE. As shown below, the job executed successfully.
Finally, let us open the log of the job's addition task to check its execution.
Summary
Here is a summary of the learning goals covered in this example:
Built custom Airflow operators
Set up the Airflow environment in Cloudera Data Engineering
Deployed an Airflow DAG that uses our custom operator