I am new to NiFi. My requirement is to trigger a NiFi process group from an external scheduling tool called Control-M. I tried using a shell script that starts and stops the process group with curl commands. The process group fetches data from a text file and writes it into a database, but I am unable to determine when the process group has completed: I can see statuses such as Started, Running and Stopped, but there is no Completed state. I am stuck on this issue and need your valuable inputs on how to determine that all the records have been inserted into the database by the process group.
In general, NiFi is not very well suited for event-based processing (e.g. an external scheduling tool pinging NiFi to start a process group run). I do not know how Control-M works, but what you're describing sounds like it could be achieved with NiFi's REST API (you can directly start/stop a specific process group by its ID).
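As a rough illustration of starting/stopping a process group by ID, here is a minimal Python sketch. It assumes an unsecured NiFi instance (no token or client certificate), and the base URL and group ID are placeholders you would replace with your own. The endpoint used is PUT /nifi-api/flow/process-groups/{id} with a body carrying the group ID and the desired state; a secured cluster would additionally need authentication, which is not shown.

```python
import json
import urllib.request

# Assumption: unsecured NiFi reachable at this address; adjust for your environment.
NIFI_API = "http://localhost:8080/nifi-api"


def build_schedule_request(base_url, group_id, state):
    """Build the PUT request that schedules every component in a process group."""
    if state not in ("RUNNING", "STOPPED"):
        raise ValueError("state must be RUNNING or STOPPED")
    url = f"{base_url}/flow/process-groups/{group_id}"
    body = json.dumps({"id": group_id, "state": state}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        method="PUT",
        headers={"Content-Type": "application/json"},
    )


def set_process_group_state(base_url, group_id, state, timeout=30):
    """Send the request and return NiFi's JSON reply as a dict."""
    request = build_schedule_request(base_url, group_id, state)
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return json.loads(response.read().decode("utf-8"))
```

A scheduler-invoked script could then call set_process_group_state(NIFI_API, "<your-group-id>", "RUNNING") to start the group and the same call with "STOPPED" to stop it.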
The requirement for checking if everything got inserted to your database is also quite hard to accomplish accurately. You could use the REST API once more to check your process group has no queued files (which would mean all your flowfiles successfully passed through the flow), though you'll also have to think about what should happen if writing to the DB fails. I don't believe there is any great way to check if your scheduled run 'completed', but you could definitely use some other processor to 'notify' yourself if something failed.
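The queue check could look roughly like the sketch below. It assumes an unsecured NiFi instance and reads the flowFilesQueued and activeThreadCount fields from the aggregate snapshot returned by GET /nifi-api/flow/process-groups/{id}/status. Treat "idle" here only as a hint: an empty queue right after starting may just mean the source has not been picked up yet, and flowfiles that failed and were auto-terminated also leave the queue empty.

```python
import json
import urllib.request


def fetch_group_status(base_url, group_id, timeout=30):
    """Fetch the status entity for a process group (assumes no authentication)."""
    url = f"{base_url}/flow/process-groups/{group_id}/status"
    with urllib.request.urlopen(url, timeout=timeout) as response:
        return json.loads(response.read().decode("utf-8"))


def is_group_idle(status_entity):
    """Return True when nothing is queued and no processor thread is active."""
    snapshot = status_entity["processGroupStatus"]["aggregateSnapshot"]
    return snapshot["flowFilesQueued"] == 0 and snapshot["activeThreadCount"] == 0
```

Checking the active thread count as well as the queue is a design choice in this sketch: a flowfile that is currently being written to the database is no longer queued, but the run has not finished yet.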
If this answer helped, please mark it as 'solved' and/or apply 'kudos' 🙂.
We are orchestrating NiFi flow status (completed/failed) through Control-M. These are the steps to follow:
1. Control-M puts the files in the source folder (SFTP server) and then waits for the status through a status API (which you have to build as a separate REST API flow, using the HTTP request processor to expose the status message).
2. NiFi's ListSFTP processor keeps listening for the file (.txt). Once a new file has been placed by Control-M, NiFi processes the file and a database processor loads the content into the database. The database processor has success and failure relationships. For flowfiles routed to success, you can capture the status as success with a value of 1 using an UpdateAttribute processor; this value should then be stored in a distributed cache or other storage area using the relevant processor. Do the same for the failure flow, with a value of -1.
3. Now that the status message is stored in the distributed cache or other storage area, you can query it with a fetch processor and pass it through the HTTP response processor to the waiting Control-M job. Control-M should keep polling the status until it receives a response of 1 or -1.
4. When Control-M finds the value 1, the flow has succeeded; if it finds -1, processing has failed.
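The polling side of the steps above might be sketched as follows in Python. The status URL, the plain-text "1" / "-1" response format, and the function names are all assumptions for illustration; the actual response shape depends on how you build the status flow in NiFi.

```python
import time
import urllib.request

# Terminal values as described in the steps: 1 = success, -1 = failure.
TERMINAL = {"1": 1, "-1": -1}


def poll_status(fetch_status, interval_s=10.0, max_attempts=60, sleep=time.sleep):
    """Call fetch_status() until it returns "1" or "-1"; return that value as an int."""
    for attempt in range(max_attempts):
        value = (fetch_status() or "").strip()
        if value in TERMINAL:
            return TERMINAL[value]
        if attempt < max_attempts - 1:
            sleep(interval_s)  # wait before asking again
    raise TimeoutError(f"no terminal status after {max_attempts} attempts")


def http_fetcher(url, timeout=30):
    """Return a fetch function that reads the status body from a (hypothetical) URL."""
    def fetch():
        with urllib.request.urlopen(url, timeout=timeout) as response:
            return response.read().decode("utf-8")
    return fetch
```

A job script could end with sys.exit(0 if poll_status(http_fetcher(url)) == 1 else 1), assuming the Control-M job is configured so that a non-zero exit code marks it as failed. The attempt limit keeps the job from waiting forever if no status is ever written.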
It's a bit hard to imagine your flow just from the description, but I think I understood it. What other questions do you have about it?
In my opinion it doesn't sound too great adding an attribute to every flowfile after it is written to the DB, only to then write it to a cache which control m will query (if I understood correctly).
If your only requirement is to know whether all the files were successfully written to your DB, you should simply ignore files which were successfully inserted and only apply some logic when an insert fails. Perhaps if a file fails you can write it to someplace else so you will be able to investigate why it failed (some place more persistent than a cache). If you just want to be alerted when an insert fails / want to return a response to control m, just add an invokehttp processor after the failure relationship from your putDB processor (if I correctly understood that control m expects http calls).
Because nifi is stream oriented, it's hard to tell exactly when a batch of files has finished writing to your DB unless you know exactly how many records should be written (and then counting the flowfiles passed to success is actually reasonable).
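If the expected record count is known, for example from the number of non-empty lines in the source text file, the comparison can be as simple as the sketch below. It uses sqlite3 purely as a stand-in for your real database, and the table name is hypothetical. It also assumes the table only holds rows from the current run; otherwise you would need to filter by some batch identifier.

```python
import sqlite3


def count_records(lines):
    """Count non-empty lines, assuming one record per line in the source file."""
    return sum(1 for line in lines if line.strip())


def batch_complete(conn, table, expected):
    """Return True when the table holds exactly the expected number of rows."""
    # The table name is interpolated into SQL, so it must come from trusted config.
    (inserted,) = conn.execute(f'SELECT COUNT(*) FROM "{table}"').fetchone()
    return inserted == expected
```

For instance, opening the source file and passing the file object to count_records gives the expected count, which batch_complete can then compare against the target table once the flow's queues are empty.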
@Chakkara As far as I remember, the distributed cache does not guarantee consistency. You could use HBase or HDFS to store the success or failure status of the processors for the downstream application. Once you have saved the success and failure statuses in HBase, you can retrieve them with the HBase processor using the row ID.
Build a REST API NiFi flow to pull the status from HBase:
HandleHttpRequest --> FetchHBaseRow --> HandleHttpResponse
You can call the HTTP API (Request and Response) via shell script/curl and call the script from Control-M.
@Chakkara As this is an older post, you would have a better chance of receiving a resolution by starting a new thread. This will also be an opportunity to provide details specific to your environment that could aid others in assisting you with a more accurate answer to your question. You can link this thread as a reference in your new post. Thanks!