Hello all,
Problem statement: We have 3 layers in our project, and for each layer we have a NiFi flow. Currently, the flows for the layers are executed one after another (sequentially).
Execution flow: layer1 -> layer2 -> layer3 -> and so on.
The trigger point, or input parameter, to the flow is simply the number of source files, which are processed further in every layer one by one.
I am looking for a solution where the NiFi flow for the next layer is not triggered until the previous layer has completed for all files.
Only if the current layer has succeeded for all the files should the next one kick off.
Please share your thoughts on this; any response is much appreciated.
Thank you in advance.
Created 01-20-2020 03:25 PM
It seems that you are looking to simulate batch processing in NiFi. In general this is not a good idea.
Consider just processing the files as they come in.
If that is not possible, perhaps you want to do some scheduling based on a trigger. Tools like Oozie are made for this; they can trigger Spark, for instance (and possibly even NiFi).
If that doesn't work and you want a pure NiFi solution, it might be possible to set up a Wait processor that eventually gives up, but this will be more of a hack than a solution.
Created 01-20-2020 08:33 PM
Hi @DennisJaheruddi
Thanks for your reply.
Don't you think that if we use the Wait and Notify processors, the pipeline will proceed to the next execution once the Wait processor's Expiration Duration elapses? A situation may occur where execution has not completed for the previous layer, yet the next NiFi flow gets triggered because that duration expired. That seems like wrong behavior to me.
Created 01-21-2020 06:11 AM
@Manus This is a perfect use case for Wait/Notify. You just need to be aware of the pitfalls, recognize the behavior when your Wait/Notify condition doesn't fire, and of course test that the timeout condition works as needed. The beauty of NiFi is that you should be able to account for every possible scenario. Part of the fun in NiFi for me is being creative: capturing exceptions and either further educating my existing flow to handle them, or creating new parts of my flow specifically for the exceptions.
Created 01-21-2020 09:52 AM
Hi,
Thanks for your reply.
Yes, we can use the Wait and Notify processors. But how would I ensure that the previous layer has completed or failed? What is the criterion or condition for that?
Let me explain the current NiFi flow: we have 3 layers, and the input/output for each layer always requires 4 flowfiles (the number of flowfiles depends on the input parameters).
How can I integrate the Wait/Notify pattern keeping this in mind?
Do you have a NiFi example where this is already implemented? Could you please share a NiFi template XML with me?
Created 01-31-2020 01:08 AM
I have resolved this problem statement with the below approach (see the SQL sketch after these steps):
1. Count the number of input files, input_file_count (fragment.count gives me the result).
2. Prepare a transactional table with the schema (primary_key_id, processed_file_cnt).
3. Add an initial record at the beginning of the NiFi pipeline using an INSERT INTO command,
e.g. INSERT INTO <tableName> VALUES (23, 0)
4. Keep updating processed_file_cnt by one for each batch using an UPDATE query.
5. Select the processed_file_cnt column and compare it with input_file_count.
6. If they match, then kick off the next flow; else route the Unmatched relationship back to the same RouteOnAttribute processor.
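For illustration, here is a minimal SQL sketch of the bookkeeping described above. The table name layer_progress is hypothetical, and 23 is just the example key from step 3:

-- Step 2: transactional counter table (layer_progress is a made-up name)
CREATE TABLE layer_progress (
    primary_key_id     INT,
    processed_file_cnt INT
);

-- Step 3: seed one row per pipeline run at the start of the flow
INSERT INTO layer_progress VALUES (23, 0);

-- Step 4: run once per processed flowfile (e.g. from PutHiveQL)
UPDATE layer_progress
SET processed_file_cnt = processed_file_cnt + 1
WHERE primary_key_id = 23;

-- Steps 5/6: read the counter back; a RouteOnAttribute processor then
-- compares it against the input_file_count attribute (fragment.count)
SELECT processed_file_cnt FROM layer_progress WHERE primary_key_id = 23;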
Created on 02-04-2020 09:18 AM - edited 02-04-2020 09:24 AM
Hi all,
The above solution fails in one scenario.
Scenario: if multiple flowfiles are processed at a time and land in the NiFi queue that follows the update-query processor (i.e. the PutHiveQL that increments processed_file_cnt by one for every flowfile), then the next flow might get triggered multiple times, which is wrong.
This is because we first SELECT processed_file_cnt and then do the comparison of processed_file_cnt with input_file_cnt, so two flowfiles can both read the final count and both pass the comparison.
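One common way to close this kind of read-then-compare race (sketched here as a suggestion, not part of the original flow) is to make the increment and the read a single atomic statement, so that at most one flowfile observes the transition to the final count. Note that HiveQL does not support RETURNING; this PostgreSQL-style sketch assumes a database that does:

-- Hypothetical atomic variant: increment and report the new value in one
-- statement; only the flowfile whose update produces the final count
-- should be routed onward to trigger the next layer
UPDATE layer_progress
SET processed_file_cnt = processed_file_cnt + 1
WHERE primary_key_id = 23
RETURNING processed_file_cnt;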