Apache NiFi: how to identify that one activity has completed so that the next processor can start

Hi All,

I am loading data into a table. Once the data is loaded, a stored procedure should be executed automatically. Which processor can be used for this scenario?

Do we have any options with the Wait/Notify processors?

@Manimaran,

For the future, it would really help if you could mention your NiFi version and the database you are using, because each version (of NiFi and of the DB) works differently. Besides that, it would also help to know which processors you are using, so that we can understand your flow and provide a tailored answer.


As for your problem, without any other information, you could use an ExecuteStreamCommand processor that runs a Python/Bash/Groovy script (basically anything you want) which calls your stored procedure. You connect the processor that saves the data into your database to ExecuteStreamCommand using its success queue. Once the data is inserted into your database, the flowfile goes to ExecuteStreamCommand, which calls your script, which in turn executes your stored procedure.
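A minimal sketch of such a script in Python, assuming MySQL (as used later in this thread), the third-party PyMySQL driver, and a procedure that takes the current timestamp as its only argument. The script name call_proc.py, the connection settings, and the way the procedure name is passed in are all hypothetical. ExecuteStreamCommand can feed the flowfile content to the script's standard input; here that content is simply drained, because the arrival of the flowfile is only used as the trigger.

```python
import sys
from datetime import datetime


def call_stored_procedure(connection, proc_name, args):
    """Call a stored procedure through a PEP 249 (DB-API) connection.

    Relies on the optional cursor.callproc() extension, which MySQL
    drivers such as PyMySQL provide.
    """
    cursor = connection.cursor()
    try:
        cursor.callproc(proc_name, args)
        connection.commit()
    finally:
        cursor.close()


def main(proc_name):
    import pymysql  # third-party MySQL driver, assumed to be installed

    sys.stdin.read()  # drain the flowfile content; it only acts as a trigger
    # Hypothetical connection settings: replace with your own.
    connection = pymysql.connect(host="localhost", user="nifi",
                                 password="change-me", database="mydb")
    try:
        now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        call_stored_procedure(connection, proc_name, (now,))
    finally:
        connection.close()
    # Whatever is printed to stdout is what ExecuteStreamCommand can emit downstream.
    print(f"{proc_name} called at {now}")


if __name__ == "__main__":
    # Hypothetical wiring: the procedure name is passed as a command argument,
    # e.g. Command Arguments: /path/to/call_proc.py;PROC1
    if len(sys.argv) > 1:
        main(sys.argv[1])
    else:
        print("usage: call_proc.py PROC_NAME", file=sys.stderr)
```

Keeping the database call in its own function means it can be tested without a real database, and the same script can call a different procedure just by changing the command argument.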

In newer versions of NiFi you could also try calling the procedure using PutSQL. Alternatively, you could try ExecuteScript (have a look here: http://funnifi.blogspot.com/2016/04/sql-in-nifi-with-executescript.html).

As for the Wait/Notify processors, as far as I know there is nothing that can be used directly (out of the box), and you will have to combine multiple processors to achieve this. A detailed answer can be found here: https://pierrevillard.com/2018/06/27/nifi-workflow-monitoring-wait-notify-pattern-with-split-and-mer...
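To make the pattern concrete: in NiFi, Notify writes a signal to a distributed cache under a Release Signal Identifier, and Wait holds flowfiles until the count for the same identifier reaches its Target Signal Count. The Python model below is not NiFi code; the class, method, and signal names are invented for illustration. It mimics only that counting behavior, and resetting the counter on release is a simplification.

```python
from collections import defaultdict


class SignalCache:
    """In-memory stand-in for the shared cache that Notify writes and Wait reads."""

    def __init__(self):
        self.counters = defaultdict(int)

    def notify(self, signal_id, delta=1):
        # Notify side: bump the counter stored under this release signal identifier.
        self.counters[signal_id] += delta

    def try_release(self, signal_id, waiting, target_count):
        # Wait side: release the held items only once enough signals have arrived.
        if self.counters[signal_id] >= target_count:
            self.counters[signal_id] = 0  # simplification: consume the signal
            released = list(waiting)
            waiting.clear()
            return released
        return []  # not enough signals yet: keep holding


if __name__ == "__main__":
    cache = SignalCache()
    held = ["stored-procedure trigger"]
    cache.notify("table1-load")  # first of two expected load batches
    print(cache.try_release("table1-load", held, target_count=2))  # []
    cache.notify("table1-load")  # second batch done
    print(cache.try_release("table1-load", held, target_count=2))  # ['stored-procedure trigger']
```

In a real flow, the flowfile that triggers the stored procedure would sit in Wait, and each successful load batch would pass through Notify with the same identifier.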

Thanks for reaching out to me.

This is my flow. I am using NiFi 11.4.2 and loading the data into one of the tables in MySQL. Once this table is loaded (PutDatabaseRecord processor), our GenerateFlowFile should be activated. We are trying to make it a single flow file with whatever we loaded in the daily load. Then we execute a Groovy script to build the date field definition, and then we call the stored procedure using the ExecuteSQLRecord processor.

Our condition is in the stored procedure: the records loaded as of the current time are the ones we pull in the next ExecuteSQLRecord.

Sorry, but I do not understand what you are trying to do.

First of all, GenerateFlowFile does not accept incoming connections, meaning that you cannot use it in the middle of your flow, especially if it depends on other actions. What exactly are you trying to do with GenerateFlowFile?

"We are trying to make it a single flow file with whatever we loaded in the daily load."
So are you executing your flow once per day at a specific hour? How do you know what should be added to your flow file? What exactly are you trying to achieve? Are you extracting something from your database?

Unfortunately, you have described your use case very vaguely. If you need assistance, I strongly recommend that you provide a more detailed description of what you are doing, what you are trying to achieve, what you tried, and why it failed.

Coming back to your original post: once you have saved your data in your database using PutDatabaseRecord, you can route its success queue to your next processor and do whatever you need there. That way, once the data is saved in your database, you can call your stored procedure as expected.

Thanks for your reply. Please find below the details of what I am trying to achieve in the flow.

First of all, I am loading the source data into Table 1 on a daily schedule.

Then, whatever data is loaded into Table 1 (using the PutDatabaseRecord processor), I apply a few manipulations and load it into Table 2 through a stored procedure.

High-level flow:

GenerateFlowFile --> Groovy script (get the current time) --> ExecuteSQLRecord (call the stored procedure) --> ExecuteSQLRecord (fetch the latest records)

Coming back to GenerateFlowFile: there is no incoming connection to it. I am just trying to generate one flow file, so that whatever has been loaded into Table 1 on a daily basis (~1M records) is handled as a single flow file.

Then I use a Groovy script to get the current date. Once this is done, we call the stored procedure with the statement below:

CALL PROC1 (${CURR_TIME})
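The Groovy step only needs to produce a timestamp string. A hedged Python illustration, assuming CURR_TIME is meant to be a MySQL DATETIME value in yyyy-MM-dd HH:mm:ss form (the function names are hypothetical): such a value contains a space and colons, so it has to be quoted inside the CALL statement. If that is all the script does, NiFi Expression Language can probably produce the same attribute without a script, e.g. ${now():format('yyyy-MM-dd HH:mm:ss')}.

```python
from datetime import datetime


def format_curr_time(moment):
    """Render a datetime in MySQL's 'YYYY-MM-DD HH:MM:SS' DATETIME literal format."""
    return moment.strftime("%Y-%m-%d %H:%M:%S")


def build_call(proc_name, curr_time):
    # The timestamp contains a space and colons, so it must be a quoted
    # string literal; left unquoted, MySQL rejects the statement.
    return f"CALL {proc_name}('{curr_time}')"


if __name__ == "__main__":
    print(build_call("PROC1", format_curr_time(datetime.now())))
```

For example, build_call("PROC1", "2024-03-05 07:08:09") returns CALL PROC1('2024-03-05 07:08:09'). Plain string formatting is acceptable here only because the value is a timestamp generated by the flow itself, not user input.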

In this stored procedure, we track the start time and end time of each execution and maintain them in audit tables.

Then we execute the query below to fetch the successfully loaded records from Table 2 and continue further:

SELECT * FROM TABLE2 WHERE CREATED_DTTM >= (SELECT MAX(START_TIME) FROM AUDIT WHERE PROC_NAME = 'PROC1' AND RUN_STATUS = 'SUCCESS')

Why we use the max start time: the load runs daily, so we pick the records created since the latest successful run started.
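The audit-table pattern can be tried out in isolation. The sketch below uses Python's built-in sqlite3 as a stand-in for MySQL. SQLite has no stored procedures, so the procedure's audit bookkeeping is written as plain inserts. Table and column names follow the query above; the sample rows are invented.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE TABLE2 (ID INTEGER, CREATED_DTTM TEXT);
    CREATE TABLE AUDIT (PROC_NAME TEXT, START_TIME TEXT, END_TIME TEXT,
                        RUN_STATUS TEXT);
""")

# What the stored procedure would record: one audit row per run.
conn.executemany("INSERT INTO AUDIT VALUES (?, ?, ?, ?)", [
    ("PROC1", "2024-03-04 01:00:00", "2024-03-04 01:05:00", "SUCCESS"),
    ("PROC1", "2024-03-05 01:00:00", "2024-03-05 01:04:00", "SUCCESS"),
    ("PROC1", "2024-03-06 01:00:00", None, "FAILED"),  # ignored by the filter
])
# Rows the procedure created in Table 2 during those runs.
conn.executemany("INSERT INTO TABLE2 VALUES (?, ?)", [
    (1, "2024-03-04 01:02:00"),  # created by the 03-04 run
    (2, "2024-03-05 01:01:00"),  # created by the 03-05 run
    (3, "2024-03-05 01:03:00"),
])

LATEST_RUN_QUERY = """
    SELECT ID FROM TABLE2
    WHERE CREATED_DTTM >= (SELECT MAX(START_TIME) FROM AUDIT
                           WHERE PROC_NAME = 'PROC1' AND RUN_STATUS = 'SUCCESS')
    ORDER BY ID
"""
# ISO-formatted timestamps compare correctly as strings in SQLite.
latest_ids = [row[0] for row in conn.execute(LATEST_RUN_QUERY)]
print(latest_ids)  # [2, 3]
```

One consequence worth noting: until the current run is recorded as SUCCESS, the query falls back to the previous successful run's start time, so it returns the rows from that run (plus anything created since). That is one more reason to start the fetch only after the procedure has finished.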

Currently there is no connection between the Table 1 load (PutDatabaseRecord processor) and the Table 2 load. Additionally, I want to check how we can add the dependency (wait until the data has been loaded into Table 1, then start the stored procedure for the Table 2 load).

Any alternative approach for this requirement is more than welcome.

@Manimaran Wait/Notify should be able to do what you are describing in the deeper dialogue. Given the complexity of your current flow, I would recommend building a new sample flow to learn how Wait/Notify behaves in NiFi. This is a very difficult setup, so having a learning flow without the context of the existing functionality is ideal. Then you can refactor your SQL flow accordingly, without having to figure out how to get Wait/Notify working during that refactor.

Hi @steven-matison,

I have added a MonitorActivity processor for this scenario and it is working fine. The success queue of the first table load is connected to the MonitorActivity processor, and its inactive relationship is connected to the second table load. Once the first load has completed, MonitorActivity emits the inactive message, and the second table load starts after receiving it. Thanks.
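MonitorActivity sends a flowfile to its inactive relationship when no flowfiles have passed through for its Threshold Duration. The rough Python model below is not NiFi code; the function name and sample times are hypothetical. It shows when the second stage would be triggered for a series of PutDatabaseRecord successes, and why the threshold matters.

```python
def inactive_signal_times(arrivals, threshold):
    """Times at which an inactivity alert would fire.

    arrivals: sorted times (seconds) at which flowfiles passed through.
    threshold: the inactivity threshold in seconds.
    Assumes inactivity is checked continuously; in NiFi the check only
    happens when the processor is scheduled, so real alerts can come later.
    """
    fired = []
    for previous, current in zip(arrivals, arrivals[1:]):
        # A pause longer than the threshold inside the load also counts
        # as inactivity and would trigger the next stage too early.
        if current - previous > threshold:
            fired.append(previous + threshold)
    if arrivals:
        fired.append(arrivals[-1] + threshold)  # quiet after the last batch
    return fired


if __name__ == "__main__":
    # Hypothetical load: batches at t=0..120 s, a 400 s stall, then more batches.
    batches = [0, 30, 60, 120, 520, 550]
    print(inactive_signal_times(batches, threshold=300))  # [420, 850]
```

The key design choice is the threshold. It has to be longer than any normal pause between PutDatabaseRecord batches. Otherwise, as in the stall above, the stored procedure for Table 2 could start while Table 1 is still loading.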