Created on 06-27-2024 10:32 PM - edited 06-27-2024 10:34 PM
Building custom operators helps data engineers abstract and reuse code across workloads. A typical use case is custom logic wrapped around Airflow tasks (e.g., configuration setup) that must run before the business functionality in a workload. In this example, we will create a custom arithmetic operator that performs arithmetic operations as part of our pipeline.
This example has two parts. In part 1, we will see how to create a custom operator and upload it to a PyPI mirror; in part 2, we will use that operator in CDE.
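To make part 1 concrete, here is a sketch of what such an arithmetic operator might look like. The class and argument names are illustrative assumptions, not the article's exact code, and the fallback stub simply lets the sketch run where Airflow is not installed:

```python
# Hypothetical sketch of a custom arithmetic operator -- names are assumptions.
try:
    from airflow.models.baseoperator import BaseOperator
except ImportError:
    class BaseOperator:  # minimal stand-in so the sketch runs without Airflow
        def __init__(self, task_id=None, **kwargs):
            self.task_id = task_id


class ArithmeticOperator(BaseOperator):
    """Apply a basic arithmetic operation to two numbers inside a task."""

    OPERATIONS = {
        "add": lambda a, b: a + b,
        "sub": lambda a, b: a - b,
        "mul": lambda a, b: a * b,
        "div": lambda a, b: a / b,
    }

    def __init__(self, operation, left, right, **kwargs):
        super().__init__(**kwargs)
        if operation not in self.OPERATIONS:
            raise ValueError(f"unsupported operation: {operation!r}")
        self.operation = operation
        self.left = left
        self.right = right

    def execute(self, context=None):
        # Airflow pushes the returned value to XCom for downstream tasks.
        return self.OPERATIONS[self.operation](self.left, self.right)
```

An instance such as `ArithmeticOperator(task_id="addition", operation="add", left=2, right=3)` returns 5 from `execute()`, which Airflow stores as the task's XCom value.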
Change into the project directory:
cd your-path/cde_custom_airflowoperator
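Part 1 then amounts to packaging the operator and publishing it to a PyPI mirror. A minimal pyproject.toml might look like the following (the project name `cde-airflow-arithmetic` is a placeholder; substitute your own), after which `python -m build` followed by `twine upload --repository testpypi dist/*` publishes the package to TestPyPI:

```toml
[build-system]
requires = ["setuptools>=61"]
build-backend = "setuptools.build_meta"

[project]
name = "cde-airflow-arithmetic"  # placeholder; use your own package name
version = "0.0.1"
description = "Custom Airflow arithmetic operator"
```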
CDE SETUP:
First, set up the Airflow environment in CDE to fetch the custom operator build file. Open the cluster details, then select the Airflow Configuration tab as shown below.
Next, use the Airflow Configuration tab shown below to set up the CDE Airflow environment with your custom operator as follows:
Change the PyPI repository URL to https://test.pypi.org (or any other PyPI mirror to which you uploaded your Python package). If you used TestPyPI for your custom Airflow operator, leave the other fields blank and click Validate Configurations.
Now add a single requirements.txt in step 2, the Build stage. Modify the requirements.txt in the CDE_setup folder to use the package name you chose for your custom operator.
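For example, if the package was published as `cde-airflow-arithmetic` (a placeholder name, as above), requirements.txt would contain a single pinned line:

```
cde-airflow-arithmetic==0.0.1
```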
You should see the package being built as shown below.
Once the build succeeds, you should see the package installed as shown below. Your version number may differ depending on what you used during the package build.
Finally, click the Activate button to activate the package in the Airflow experience.
Using Custom Operators

Now let us create and run a new CDE job. Use the CDE user interface to create a new Airflow job that uses this operator. Upload the DAG file Dag1.py from the CDE_Setup folder for this job, then click Create and Run to execute it.
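A Dag1.py along these lines would exercise the operator. The package and class names (`cde_airflow_arithmetic`, `ArithmeticOperator`) are placeholders for whatever you published, and the import guard simply lets the sketch be read and run outside a CDE/Airflow environment:

```python
from datetime import datetime

try:
    from airflow import DAG
    # Hypothetical package/class names -- substitute your published ones.
    from cde_airflow_arithmetic import ArithmeticOperator
    HAVE_AIRFLOW = True
except ImportError:
    HAVE_AIRFLOW = False  # sketch still imports cleanly without Airflow

if HAVE_AIRFLOW:
    with DAG(
        dag_id="custom_arithmetic_dag",
        start_date=datetime(2024, 6, 1),
        schedule=None,  # Airflow 2.4+; use schedule_interval on older versions
        catchup=False,
    ) as dag:
        addition = ArithmeticOperator(
            task_id="addition", operation="add", left=2, right=3
        )
        subtraction = ArithmeticOperator(
            task_id="subtraction", operation="sub", left=5, right=1
        )
        addition >> subtraction  # run addition first, then subtraction
```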
Let us validate by checking the Airflow UI within CDE. As shown below, the job has executed successfully.
Finally, let us open one of the logs to check the execution of the job's addition task.
Here is a summary of the learning goals we have reached through this example: