Support Questions

Find answers, ask questions, and share your expertise

How to let Airflow know that a NiFi pipeline involving multiple flows is complete

avatar
Contributor
Hi!

I have created a DAG file to control few scripts and an Apache Ni-Fi data pipeline with Apache Airflow.

The data pipeline starts with a GenerateTableFetch processor which is followed by a ExecuteSQL and few more processors which have the scope of performing certain data transformations. Then, I use a PutDatabaseRecord processor to load the data into a database.

Below is the Ni-FI flow.

nifi_flow.png

Airflow tells Ni-FI to start the pipeline by setting the GenerateTableFetch processor into RUN_ONCE state by using a similar approach to the one nicely described in this post:

https://towardsdatascience.com/interconnecting-airflow-with-a-nifi-etl-pipeline-8abea0667b8a

The same article proposes a method to let Airflow know that the Ni-Fi pipeline is over. That method is based on a UpdateAttribute processor which sets the variable "last_tms" to the current time every time it runs. This processor gets input from the success relationship of the previous processor (PutDatabaseRecord).

The problem with that approach is that my pipeline is meant to transfer a huge amount of data and uses multiple flows. So, waiting that the value of "last_tms" changes is not a solution since that will happen many times during the life of the pipeline. Waiting that this value remains the same for some time would require setting a pause timer and this would require knowledge of the maximum possible interval between the end of two flows in the pipeline. This last approach is not very practical since it has the risk of waiting for too long or of continuing with the DAG tasks before the pipeline is over.

I wonder if it would be possible to automatize the method I have been using manually by looking Ni-Fi GUI: checking that there are no more active threads and no more queued data in the Ni-Fi pipeline. How I can retrieve this information from Ni-Fi in Airflow?

Any other suggestion to solve the issue would be highly appreciated.

I run Apache Airflow version 2.6.2 and Apache Ni-Fi 1.23.2 on Java 11.0.22. Both machines run Linux Ubuntu 22.04.3.

Thanks,

Bernardo

0 REPLIES 0