
Serial Execution of flows

Expert Contributor

Hello,

I have a QueryDatabaseTable processor where I've written a custom SQL query and set the maximum-value column property to pick only the latest data from the source. Suppose I'm creating the Dim1 flow. I want to ensure that once Dim1 is finished, Dim2 starts. However, QueryDatabaseTable does not accept an incoming connection. How can I achieve this?

[screenshot attachment: saquibsk_1-1711368052227.png]

4 Replies

Super Mentor

@saquibsk 

A couple of thoughts come to mind here...

  1. Have you looked at maybe using a GenerateTableFetch processor in "Dim 2", which can be triggered by an incoming FlowFile? This processor accepts an optional inbound connection as a trigger.
  2. Another option might be to use an InvokeHTTP processor after your PutDatabaseRecord processor to start the "Dim 2" QueryDatabaseTable processor via the NiFi REST API. You could then do the same after the Dim 2 QueryDatabaseTable processor to stop it again.
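If it helps, the REST API call behind option 2 can be sketched as below. This is a minimal, unauthenticated sketch using only Python's standard library; the host, port, and processor id are placeholders you would replace with your own, and a secured NiFi would additionally need certificates and authorization configured.

```python
import json
import urllib.request

def run_status_request(base_url, processor_id, state, revision_version):
    """Build the PUT request NiFi expects on /nifi-api/processors/{id}/run-status.

    state is "RUNNING" or "STOPPED"; revision_version must match the
    component's current revision (fetch it first with a GET if unsure).
    """
    url = f"{base_url}/nifi-api/processors/{processor_id}/run-status"
    body = json.dumps({"revision": {"version": revision_version},
                       "state": state}).encode("utf-8")
    req = urllib.request.Request(url, data=body, method="PUT")
    req.add_header("Content-Type", "application/json")
    return req

# Example with a placeholder id: start the "Dim 2" QueryDatabaseTable processor
req = run_status_request("http://localhost:8080", "0123-abcd-processor-id",
                         "RUNNING", 0)
# urllib.request.urlopen(req)  # uncomment to actually send the request
```

InvokeHTTP would issue the equivalent PUT with the same JSON body.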

If you found any of the suggestions/solutions provided helped you with your issue, please take a moment to log in and click "Accept as Solution" on one or more of them that helped.

Thank you,
Matt

Expert Contributor

Hi @MattWho,

GenerateTableFetch is a good option, but the issue is that it doesn't support custom queries.

I believe additional settings are required for calling the next processor via the REST API, correct? Is there an alternative way to achieve this?

For example, in a finance data mart there will be 15-20 groups of flows executing one after another. Maintaining or invoking them via the API would entail additional work.

Super Mentor

@saquibsk 

Additional settings?

With a secured NiFi (which you should always be using) there is authentication and authorization involved in any REST API request.  The simplest approach is to generate a clientAuth certificate that is trusted via the truststore your secured NiFi is configured to use in the nifi.properties file.  That certificate is then added to a keystore.  The InvokeHTTP processor can be configured to use a StandardRestrictedSSLContextService, which you configure with the keystore you created and the truststore NiFi already uses (which trusts that certificate).

On the NiFi side, you would need to add that client as a user identity so you can assign authorization policies to it.  You can then authorize that client/user identity for the policies needed to start and stop specific processor components.  That would be the "operate the component" policy, which you can set on just the QueryDatabaseTable processor or any other specific processor you want to automate.
See: component-level access policies

Yes, there are some initial steps to setup the keystore and truststores needed, but then those can be used over and over for all automation within NiFi you want to achieve.
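On the calling side, the keystore and truststore Matt describes map onto an SSL context that presents the client certificate. A hedged sketch (the file names are hypothetical, and this assumes the key material has been exported to PEM format, since Python's ssl module does not read JKS keystores directly):

```python
import json
import ssl

def make_mtls_context(client_pem, truststore_ca_pem):
    # client_pem: the clientAuth certificate + private key, exported to PEM
    # truststore_ca_pem: the CA chain that NiFi's truststore also trusts
    ctx = ssl.create_default_context(cafile=truststore_ca_pem)
    ctx.load_cert_chain(certfile=client_pem)
    return ctx

def run_status_body(state, revision_version):
    # JSON body for PUT /nifi-api/processors/{id}/run-status
    return json.dumps({"revision": {"version": revision_version},
                       "state": state})

# ctx = make_mtls_context("client.pem", "nifi-ca.pem")  # hypothetical file names
# urllib.request.urlopen(request, context=ctx) would then present the client cert
body = run_status_body("STOPPED", 0)
```

The same context can be reused for every automation call once the certificate setup is done.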

NiFi processors execute based on each processor's configured scheduling. There is no way to stop or start individual processors except manually through the UI by an authorized user, or via the REST API.

NiFi was designed with an always-running architecture in mind.  Stepping out of that architecture requires extra steps, or redesigning your dataflows to operate within that architecture style.

If your executions always happen at set times, you could use CRON-driven scheduling, but that is not going to be an optimal design for performance.

If you found any of the suggestions/solutions provided helped you with your issue, please take a moment to log in and click "Accept as Solution" on one or more of them that helped.

Thank you,
Matt

Expert Contributor

I have tried something like the below, which worked.

Once Dim1 is completed:

Dim2:

1. UpdateAttribute - capture a StartDate attribute.

2. Create a log table in the database that maintains the LastRunTime.

3. ExecuteSQL - custom SQL with WHERE updateddate >= ${LastRunTime}.

4. Insert the records into the database.

5. Update the log table with the StartDate attribute from step 1.
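The five steps above amount to a standard high-water-mark incremental load. A small runnable sketch of the same logic in Python with an in-memory SQLite database (table and column names here are illustrative, not the poster's actual schema):

```python
import sqlite3
from datetime import datetime, timezone

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE source_table (id INTEGER, updateddate TEXT);
    CREATE TABLE dim2 (id INTEGER);
    CREATE TABLE run_log (flow TEXT PRIMARY KEY, last_run_time TEXT);
""")
conn.execute("INSERT INTO run_log VALUES ('Dim2', '1970-01-01T00:00:00')")
conn.executemany("INSERT INTO source_table VALUES (?, ?)",
                 [(1, "2024-01-01T00:00:00"), (2, "2024-06-01T00:00:00")])

# Step 1: capture StartDate BEFORE reading (like UpdateAttribute), so rows
# updated mid-run are picked up again next time rather than skipped.
start_time = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S")

# Steps 2-3: read LastRunTime from the log table, then fetch only newer rows
# (the ExecuteSQL custom query).
(last_run,) = conn.execute(
    "SELECT last_run_time FROM run_log WHERE flow = 'Dim2'").fetchone()
rows = conn.execute(
    "SELECT id FROM source_table WHERE updateddate >= ?", (last_run,)).fetchall()

# Step 4: insert the fetched records into the target (PutDatabaseRecord).
conn.executemany("INSERT INTO dim2 VALUES (?)", rows)

# Step 5: advance the high-water mark to the StartDate captured in step 1.
conn.execute("UPDATE run_log SET last_run_time = ? WHERE flow = 'Dim2'",
             (start_time,))
conn.commit()
```

Capturing the timestamp before the read (rather than after) trades a few duplicate re-reads for a guarantee that no update is ever missed.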