
How to implement the given problem statement in NiFi

Super Collaborator

Hello all,

Problem statement: our project has 3 layers, with one NiFi flow per layer. Currently, the flows for the layers are executed one after another (sequentially).

Execution flow: layer1 -> layer2 -> layer3 -> and so on.

The trigger point, or input parameter, to the flow is the number of source files, which are processed further in every layer one by one.

I am looking for a solution where the NiFi flow for the next layer is not triggered until the previous layer has completed for all files.

Only if the current layer succeeds for all the files should the next one kick off.

Please share your thoughts on this. I would appreciate any response.

Thank you in advance.

2 ACCEPTED SOLUTIONS


It seems that you are looking to simulate batch processing in NiFi. In general this is not a good idea.

Consider simply processing the files as they come in.

If that is not possible, perhaps you want to do some scheduling based on a trigger. Tools like Oozie are made for this; they can trigger Spark, for instance (possibly even NiFi).

If that doesn't work and you want a pure NiFi solution, it might be possible to set up a waiting processor that eventually gives up, but this will be more of a hack than a solution.

- Dennis Jaheruddin

If this answer helped, please mark it as 'solved' and/or, if it is valuable for future readers, please apply 'kudos'.


Super Collaborator

I have resolved this problem statement with the approach below:
1. Count the number of input files (fragment.count gives me the result as input_file_count).

2. Prepare a transactional table with the schema (primary_key_id, processed_file_cnt).

3. Add an initial record at the beginning of the NiFi pipeline using an INSERT INTO command,

     e.g. INSERT INTO <tableName> VALUES (23, 0)

4. Keep updating processed_file_cnt by one for each batch using an UPDATE query.

5. Select the processed_file_cnt column and compare it with input_file_count.

6. If they match, kick off the next flow; otherwise route the Unmatched relationship back to the same RouteOnAttribute processor.

6 REPLIES


Super Collaborator

Hi @DennisJaheruddi 

Thanks for your reply.
Don't you think that if we use the Wait and Notify processors, the pipeline will proceed to the next execution once the Wait processor's Expiration Duration elapses? A situation may occur where execution of the previous layer is not complete, yet the next NiFi flow is triggered because that attribute expired. That seems like wrong behavior to me.

Super Guru

@Manus This is a perfect use case for Wait/Notify. You just need to be aware of the pitfalls, recognize the behavior when your wait/notify condition doesn't act, and of course test that the timeout condition works as needed. The beauty of NiFi is that you should be able to account for every possible scenario; part of the fun in NiFi for me is being creative, capturing exceptions, and either further educating my existing flow to handle them or creating new parts of my flow specifically for the exceptions.

Super Collaborator

Hi,

Thanks for your reply.

Yes, we can use the Wait and Notify processors. But how would I ensure that the previous layer has completed or failed? What is that criterion or condition?

Let me explain the current NiFi flow: we have 3 layers, and the input/output for each layer always requires 4 FlowFiles (the number of FlowFiles depends on the input parameters).

How can I integrate the Wait/Notify pattern keeping this in mind?

Do you have any NiFi example where this is already implemented? Could you please share the NiFi template XML with me?


Super Collaborator

Hi all,

The above solution fails in one scenario.

Scenario: if multiple FlowFiles are processed at a time and land in the NiFi queue that follows the update query (i.e. the PutHiveQL processor that increments processed_file_cnt by one for every FlowFile), then there is a chance of triggering the next flow multiple times, which is wrong.

This is because we first select processed_file_cnt and only then compare it with input_file_count.
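One common way to close this kind of race is to fold the comparison into a single guarded UPDATE that can only ever match once. The sketch below uses SQLite as a stand-in for the real store; the triggered flag column and all names are assumptions, not part of the original flow:

```python
import sqlite3

# Sketch: after the usual increment, "claim" the trigger with a guarded
# UPDATE whose WHERE clause can match at most once, so exactly one
# FlowFile kicks off the next layer even if several reach the
# comparison together.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE batch_progress (primary_key_id INTEGER PRIMARY KEY,"
    " processed_file_cnt INTEGER, triggered INTEGER DEFAULT 0)"
)
conn.execute(
    "INSERT INTO batch_progress (primary_key_id, processed_file_cnt)"
    " VALUES (23, 0)"
)

input_file_count = 4

def on_flowfile_done():
    """Called once per completed FlowFile; returns True only for the
    single caller that should start the next layer."""
    conn.execute(
        "UPDATE batch_progress"
        " SET processed_file_cnt = processed_file_cnt + 1"
        " WHERE primary_key_id = 23"
    )
    # Compare-and-set: this can match at most once, because it also
    # flips `triggered` from 0 to 1 in the same atomic statement.
    cur = conn.execute(
        "UPDATE batch_progress SET triggered = 1"
        " WHERE primary_key_id = 23"
        " AND processed_file_cnt = ? AND triggered = 0",
        (input_file_count,),
    )
    return cur.rowcount == 1

fired = [on_flowfile_done() for _ in range(input_file_count)]
print(fired.count(True))  # exactly one FlowFile fires the next layer
```

Because the second UPDATE both tests and flips the flag in one statement, only one FlowFile's update can affect a row, however many arrive at the same time; in NiFi, one would then route on whether that guarded update reported an affected row.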