Created 12-23-2024 07:15 AM
I have a processor group PG with below processors for example
A -> B -> C
D-> E->F
Processor C (Execute Stream Command) runs a Python code using multi threads - generates files
Processor D ( List File) will list the files generated by above processor C and transforms and processes to F (PutElasticSearchJSON)
Have a requirement to run this processor group for one date at a time, start execution for next date only after capturing the status of previous date execution status (success/failure)
I am ok using Python/Shell Script. Guide me how can this be achieved.
Created 12-24-2024 07:02 AM
@BK84
I would avoid if possible designing dataflows that rely on making rest-api calls to control the processing of data. Any issue that may prevent the success of the rest-api call would have negative impacts on yoru data processing.
Based on what you have shared, i'd suggest having your A->B->C dataflow directly connect to your D->E->F dataflow. Since your Python script (C) is responsible for creating the files in the directory from which ListFile (D) checks, why not have your python Script output a FlowFile that contains a list of filenames that it created. A splitContent processor could be used to split that into individual FlowFiles that can be passed directly to a FetchFile so that can be consumed (no need for listFile anymore) for writing to the ElasticSearch.
Then lets consider the error handling and how to trigger next run without rest-api.
SplitContent->FetchFile->PutElasticSearchJson should be placed in a child process group.
That process group should be configured to use FlowFile Concurrency (Single FlowFile per Node) and an Outbound Policy (Batch Output). This means that only 1 FlowFile (FlowFile produced by Python Script that contains list of all files to be fetched) will be allowed to enter this child process group at a time. PutElasticSearchJson has relationships for handling retry, success, failure, and errors. These can be used to handle success as well as report (possibly using PutEmail processor) when processing has issues. On the Success relationship path you could use ModifyBytes to zero out the content and then MergeContent to merge the split FlowFiles back into one FlowFile using "Defragment" strategy. Add a max bin age to force failure of a bin after x amount of time if it does not have all its fragments. SplitContent creates all the required FlowFile attributes needed to support the defragment strategy. Assuming all Files were "success" from PutElasticSearchJson processor, the single defragmented File will be output which you send to an output port in that child process group. Once all FlowFiles in the child process are queued at output ports they will allowed to exit the child process group. Assuming a mergeContent "success". you can use this output FlowFile to trigger the next run (hopefully without using rest-api calls). Since you ddi not share how your data gets started in A and B, I can't really suggest anything here.
Bottom line is that avoiding using rest-api calls to control your dataflows leads to faster more efficient processing of your data. Allow your dataflow to handle the error handling,
But if you do choose to use rest-api calls, the best way to figure them out is to open developer tools in your browser and then manually perform the interactions needed via the NiFi UI. Through the developer tools you can capture (copy as CURL) the call being made in response to your UI action. Now you are likely using username and password to access your NiFi, but this adds more work to do so through NiFi dataflows (for one thing your username and password would be in plain text in the necessary dataflow to get your user token). So you will want to setup a Keystore and Truststore so authentication is handle via a mutualTLS handshake thus avoiding exposed passwords and need to get a user token. The InvokeHTTP processor can be used to make the rest-api calls and can be configured with and SSLContextService.
This is a lot of high level information, but hope it has set you on the path to success.
Please help our community thrive. If you found any of the suggestions/solutions provided helped you with solving your issue or answering your question, please take a moment to login and click "Accept as Solution" on one or more of them that helped.
Thank you,
Matt