
How to use the Wait processor to capture all records before merging them

Contributor

I have a scenario where I am calling an API in batches (1000 records each). From the first API call I get the total number of records in the attribute X-Total-Count, and I want to merge them all. Please let me know how I can use the Wait processor to wait until we have received all the data, up to the value of X-Total-Count.

 

Screenshot (samrathal_0-1671631560657.png): the Wait processor properties I have configured.


Super Mentor

@samrathal 

The "Wait" processor works in conjunction with the "Notify" processor in NiFi.  See below example use case:
https://pierrevillard.com/2018/06/27/nifi-workflow-monitoring-wait-notify-pattern-with-split-and-mer...

Also, simply waiting until you have received all of the 1000-record batches will not ensure that a downstream MergeContent or MergeRecord processor merges them all together.

1. Is this a one time execution flow?
2. If not, how do you differentiate between complete batches (when does one merge bundle end and the next begin)?
3. Are all 1000 records from each rest-api call going into a single NiFi FlowFile or 1 FlowFile per record?
4. Is there some correlation identifier returned by the rest-api call that identifies all of the 1000-record batch pulls as part of the same complete bundle?

The details of your use case would make it easier for the community to provide suggestions.

Assuming you have some correlation attribute and you know that the maximum number of records will never exceed some upper limit, you may be able to simply use a well-configured MergeRecord processor to merge all your records: set the minimum number of records higher than you would ever expect, set a correlation attribute, and set a max bin age (which forces the bin to merge after x amount of time even if the minimum has not been satisfied). But keep in mind that the answers to the questions above play a role in whether this is possible or whether some additional consideration needs to be put in place.
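To illustrate the binning idea above, here is a minimal Python sketch. It is only a toy model of the behavior described, not NiFi code: the function name `merge_bins`, the dictionary keys, and the values "run-A" and "run-B" are all made up for the example, and time is a plain number of seconds.

```python
from collections import defaultdict


def merge_bins(flowfiles, min_records, max_bin_age, now):
    """Toy model: group FlowFiles by correlation value and merge a bin once it
    holds min_records records or its oldest FlowFile is at least max_bin_age old."""
    bins = defaultdict(list)
    for ff in flowfiles:
        bins[ff["correlation"]].append(ff)

    merged = {}
    for key, members in bins.items():
        records = sum(ff["records"] for ff in members)
        age = now - min(ff["arrived"] for ff in members)
        if records >= min_records or age >= max_bin_age:
            merged[key] = records  # total records in the merged output
    return merged


# Hypothetical data: six batches for "run-A" (5 x 1000 + 600), one for "run-B".
flowfiles = [{"correlation": "run-A", "records": 1000, "arrived": t} for t in range(5)]
flowfiles.append({"correlation": "run-A", "records": 600, "arrived": 5})
flowfiles.append({"correlation": "run-B", "records": 1000, "arrived": 58})

# The minimum is deliberately unreachable, so only the bin age triggers a merge.
print(merge_bins(flowfiles, min_records=1_000_000, max_bin_age=60, now=30))
print(merge_bins(flowfiles, min_records=1_000_000, max_bin_age=60, now=61))
```

In this model nothing merges at 30 seconds, and at 61 seconds only the "run-A" bin has aged out, so it merges with 5600 records while "run-B" keeps waiting. The trade-off is that every bundle is delayed by the full bin age.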

 

If you found that the provided solution(s) assisted you with your query, please take a moment to login and click Accept as Solution below each response that helped.

Thank you,

Matt

Contributor

@MattWho Thanks for the response. Below are my inline comments and my scenario.

 

1. Is this a one time execution flow?

- No.


2. If not, how do you differentiate between complete batches (when does one merge bundle end and the next begin)?

- I'm calling the same rest-api with a record limit of 1000. Suppose my attribute "X-Total-Count" is 5600 (records); then I am going to call the rest-api 6 times (1000 * 6) to capture all the records. So I want the "Wait" processor to hold everything until all 5600 records have arrived, before merging it all.


3. Are all 1000 records from each rest-api call going into a single NiFi FlowFile or 1 FlowFile per record?

- One FlowFile per record, since I am using "SplitJson" after "InvokeHTTP".

 

4. Is there some correlation identifier returned by the rest-api call that identifies all of the 1000-record batch pulls as part of the same complete bundle?

- I'm using the offset for this.

 

What I have:

A rest-api with the limit set to 1000. The total number of records is not always the same; I capture it from "X-Total-Count".

What I am looking for:

Before merging the whole data set, I want the "Wait" processor to wait for the API calls to complete (the API could run 2 times or 15 times). Once all the data has arrived, that is, once the FlowFile attribute fragment.count equals "X-Total-Count", i.e. ${fragment.count:equals(${X-Total-Count})}, it should notify and pass all the data to "MergeRecord", which would generate a single FlowFile.
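For reference, the arithmetic behind the number of calls can be sketched in a few lines of Python. This is only an illustration of the paging described above; the function name `page_offsets` is made up, and it assumes the offset starts at 0 and increases by the limit on each call.

```python
import math


def page_offsets(total_count: int, limit: int = 1000) -> list[int]:
    """Return the offset of each API call needed to fetch total_count records."""
    calls = math.ceil(total_count / limit)
    return [i * limit for i in range(calls)]


total = 5600  # example value of X-Total-Count
offsets = page_offsets(total)
last_batch_size = total - offsets[-1]  # the final call returns fewer records

print(len(offsets), offsets, last_batch_size)
```

With X-Total-Count = 5600 this gives 6 calls, and the last call returns only 600 records rather than 1000.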

 

Super Mentor

@samrathal 
1. What is the purpose of the SplitJson in your dataflow?
2. If you have 1 FlowFile with 1000 records in it, why use SplitJson to split that into 1000 FlowFiles having 1 record each? Why not just merge the larger FlowFiles with multiple records in them? Or am I missing part of the use case here?

--- Can you share a template or flow definition of your dataflow?
1. It is not clear to me how you get "X-Total-Count" and how you are adding this FlowFile attribute to every FlowFile.
2. You have configured the "Release Signal Identifier" with a boolean NiFi Expression Language (NEL) statement that, using your example, will return "false" until the "fragment.count" FlowFile attribute value equals the "X-Total-Count" FlowFile attribute value.

        2a. I assume you are writing "X-Total-Count" to every FlowFile coming out of SplitJson? How are you incrementing "fragment.count" across all FlowFiles in the complete 5600-record batch? SplitJson sets fragment.count per parent FlowFile: every FlowFile split from a 1000-record batch gets fragment.count = 1000, and fragment.index gives its position within that batch. So fragment.count would never reach 5600 unless you are handling this count somewhere else in your dataflow.
        2b. If a FlowFile arrives whose "fragment.count" value actually equals its "X-Total-Count" value, your "Release Signal Identifier" will resolve to "true". The "Release Signal Identifier" value (true or false) in your configuration is then looked up in the configured distributed map cache server. So where in your dataflow do you write the release signal to the distributed map cache? (This is usually handled by a Notify processor.)
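To make points 2a and 2b concrete, here is a rough Python sketch. It is a toy model only and not NiFi's API: the `SignalCache` class stands in for the distributed map cache, `notify` and `is_released` loosely mirror what the Notify and Wait processors do with a signal counter, and the identifier "run-42" is hypothetical. It assumes one notification per batch, with a delta equal to that batch's record count.

```python
class SignalCache:
    """Toy stand-in for the distributed map cache shared by Notify and Wait."""

    def __init__(self):
        self.counts = {}

    def notify(self, signal_id: str, delta: int = 1) -> None:
        # Loosely what Notify does: add delta to the counter for this signal.
        self.counts[signal_id] = self.counts.get(signal_id, 0) + delta

    def is_released(self, signal_id: str, target: int) -> bool:
        # Loosely what Wait checks: has the counter reached the target count?
        return self.counts.get(signal_id, 0) >= target


def fragment_counts(total: int, limit: int = 1000) -> list[int]:
    """Number of splits per batch, i.e. the per-batch fragment.count value."""
    return [min(limit, total - offset) for offset in range(0, total, limit)]


total = 5600       # example X-Total-Count
run_id = "run-42"  # hypothetical identifier shared by all batches of one run
counts = fragment_counts(total)

# Point 2a: no single batch ever carries fragment.count == X-Total-Count.
print(total in counts)

# Point 2b: something has to write the signal; here each batch notifies once.
cache = SignalCache()
for n in counts:
    cache.notify(run_id, delta=n)
    print(n, cache.is_released(run_id, target=total))
```

In this model the release condition only becomes true after the sixth batch has notified, which is the behavior you are after. The key design point is that the identifier must be the same value for every FlowFile in the run, rather than an expression that evaluates to "true" or "false".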

I am in no way implying that what you are trying to accomplish can't be done.  However, coming up with an end-to-end workable solution requires knowing all the steps in the use case along the way.  

I would recommend going through the example Wait/Notify article linked in my original response to get a better understanding of how the Wait and Notify processors work together. Then maybe you can make some changes to your existing dataflow implementation. With more use case details (detailed process steps) I could suggest further changes if needed.

I really hope this helps you get some traction on your use case here.

If you have a contract with Cloudera, you can reach out to your account owner, who could help arrange for professional services that can work with you to turn your use cases into workable NiFi dataflows.


Thank you,

Matt