<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>question Re: How to apply wait processor for capture complete records before merger them in Support Questions</title>
    <link>https://community.cloudera.com/t5/Support-Questions/How-to-apply-wait-processor-for-capture-complete-records/m-p/359971#M238256</link>
    <description>&lt;P&gt;&lt;a href="https://community.cloudera.com/t5/user/viewprofilepage/user-id/35454"&gt;@MattWho&lt;/a&gt;&amp;nbsp;Thanks for the response. My inline answers and my scenario are below.&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;1. Is this a one-time execution flow?&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;- No.&lt;/P&gt;&lt;P&gt;&lt;BR /&gt;&lt;SPAN&gt;2. If not, how do you differentiate between different complete batches (when does one merge bundle end and another begin)?&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;- I'm calling the same rest-api with a record limit of 1000. Suppose my attribute "&lt;SPAN&gt;X-Total-Count&lt;/SPAN&gt;" is 5600 (records); then I am going to call the rest-api 6 times (1000*6) to capture the complete record set. So until all 5600 records have arrived, I want the "Wait" processor to hold them before merging them all.&lt;/P&gt;&lt;P&gt;&lt;BR /&gt;&lt;SPAN&gt;3. Are all 1000 records from each rest-api call going into a single NiFi FlowFile or 1 FlowFile per record?&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;- One FlowFile per record, because I am using "SplitJson" after "InvokeHTTP".&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;4. Is there some correlation identifier as a result of the rest-api call that identifies all 1000-record batch pulls as part of the same complete bundle?&lt;/SPAN&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;- I'm using the offset for this.&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;What I have:&lt;/P&gt;&lt;P&gt;A rest-api with the limit set to 1000. The total number of records is not always the same; I capture it from "&lt;SPAN&gt;X-Total-Count&lt;/SPAN&gt;".&amp;nbsp;&lt;/P&gt;&lt;P&gt;What I am looking for:&lt;/P&gt;&lt;P&gt;Before merging the whole data set, I want a "Wait" processor to wait for the API calls to complete (the API could be called 2 times or 15 times) until the whole data set has arrived, that is, until the FlowFile's "fragment.count" equals "X-Total-Count", i.e. ${fragment.count:equals(${X-Total-Count})}.
It should then notify and pass all the data to "MergeRecord" so that a single FlowFile is generated.&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;</description>
    <pubDate>Wed, 21 Dec 2022 15:53:47 GMT</pubDate>
    <dc:creator>samrathal</dc:creator>
    <dc:date>2022-12-21T15:53:47Z</dc:date>
    <item>
      <title>How to apply wait processor for capture complete records before merger them</title>
      <link>https://community.cloudera.com/t5/Support-Questions/How-to-apply-wait-processor-for-capture-complete-records/m-p/359965#M238251</link>
      <description>&lt;P&gt;&lt;SPAN&gt;I have a scenario where I am calling an API in batches (1000 records each). From the first API call I get the total number of records in the attribute&amp;nbsp;&lt;/SPAN&gt;X-Total-Count&lt;SPAN&gt;, and I want to merge them all. Please let me know how I can leverage the Wait processor to wait until we have received the whole data set, up to the value of&amp;nbsp;&lt;/SPAN&gt;X-Total-Count&lt;SPAN&gt;.&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&lt;span class="lia-inline-image-display-wrapper lia-image-align-inline" image-alt="samrathal_0-1671631560657.png" style="width: 400px;"&gt;&lt;img src="https://community.cloudera.com/t5/image/serverpage/image-id/36523iA53C1BB2501BC91A/image-size/medium?v=v2&amp;amp;px=400" role="button" title="samrathal_0-1671631560657.png" alt="samrathal_0-1671631560657.png" /&gt;&lt;/span&gt;&lt;/P&gt;&lt;P&gt;These are the Wait processor properties I have configured.&amp;nbsp;&lt;/P&gt;</description>
      <pubDate>Wed, 21 Dec 2022 14:07:07 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Support-Questions/How-to-apply-wait-processor-for-capture-complete-records/m-p/359965#M238251</guid>
      <dc:creator>samrathal</dc:creator>
      <dc:date>2022-12-21T14:07:07Z</dc:date>
    </item>
    <item>
      <title>Re: How to apply wait processor for capture complete records before merger them</title>
      <link>https://community.cloudera.com/t5/Support-Questions/How-to-apply-wait-processor-for-capture-complete-records/m-p/359967#M238253</link>
      <description>&lt;P&gt;&lt;a href="https://community.cloudera.com/t5/user/viewprofilepage/user-id/99493"&gt;@samrathal&lt;/a&gt;&amp;nbsp;&lt;BR /&gt;&lt;BR /&gt;The "Wait" processor works in conjunction with the "Notify" processor in NiFi.&amp;nbsp; See the example use case below:&lt;BR /&gt;&lt;A href="https://pierrevillard.com/2018/06/27/nifi-workflow-monitoring-wait-notify-pattern-with-split-and-merge/" target="_blank"&gt;https://pierrevillard.com/2018/06/27/nifi-workflow-monitoring-wait-notify-pattern-with-split-and-merge/&lt;/A&gt;&lt;BR /&gt;&lt;BR /&gt;&lt;/P&gt;&lt;P&gt;Simply waiting until you have received all of the 1000-record batches will not ensure that a downstream MergeContent or MergeRecord processor merges them all together.&amp;nbsp;&lt;BR /&gt;&lt;BR /&gt;1. Is this a one-time execution flow?&lt;BR /&gt;2. If not, how do you differentiate between different complete batches (when does one merge bundle end and another begin)?&lt;BR /&gt;3. Are all 1000 records from each rest-api call going into a single NiFi FlowFile or 1 FlowFile per record?&lt;BR /&gt;4. 
Is there some correlation identifier as a result of the rest-api call that identifies all 1000-record batch pulls as part of the same complete bundle?&lt;BR /&gt;&lt;BR /&gt;The details of your use case would make it easier for the community to provide suggestions.&lt;BR /&gt;&lt;BR /&gt;Assuming you have some correlation attribute and you know that the max number of records will never exceed some upper limit, you may be able to simply use a well-configured MergeRecord processor with min records set higher than you would ever expect, a correlation attribute, and a max bin age (which forces a bin to merge after x amount of time even if the min has not been satisfied) to accomplish the merging of all your records.&amp;nbsp; But keep in mind that the answers to the questions above play a role in whether this is possible or whether some additional consideration needs to be put in place.&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&lt;FONT face="batang,apple gothic"&gt;If you found that the provided solution(s) assisted you with your query, please take a moment to login and click&lt;/FONT&gt;&amp;nbsp;&lt;FONT face="arial black,avant garde" color="#FF0000"&gt;Accept as Solution&amp;nbsp;&lt;/FONT&gt;&lt;FONT face="batang,apple gothic" color="#000000"&gt;below each response that helped.&lt;BR /&gt;&lt;BR /&gt;Thank you,&lt;/FONT&gt;&lt;/P&gt;&lt;P&gt;&lt;FONT face="batang,apple gothic" color="#000000"&gt;Matt&lt;/FONT&gt;&lt;/P&gt;</description>
      <pubDate>Wed, 21 Dec 2022 14:33:54 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Support-Questions/How-to-apply-wait-processor-for-capture-complete-records/m-p/359967#M238253</guid>
      <dc:creator>MattWho</dc:creator>
      <dc:date>2022-12-21T14:33:54Z</dc:date>
    </item>
    <item>
      <title>Re: How to apply wait processor for capture complete records before merger them</title>
      <link>https://community.cloudera.com/t5/Support-Questions/How-to-apply-wait-processor-for-capture-complete-records/m-p/359971#M238256</link>
      <description>&lt;P&gt;&lt;a href="https://community.cloudera.com/t5/user/viewprofilepage/user-id/35454"&gt;@MattWho&lt;/a&gt;&amp;nbsp;Thanks for the response. My inline answers and my scenario are below.&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;1. Is this a one-time execution flow?&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;- No.&lt;/P&gt;&lt;P&gt;&lt;BR /&gt;&lt;SPAN&gt;2. If not, how do you differentiate between different complete batches (when does one merge bundle end and another begin)?&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;- I'm calling the same rest-api with a record limit of 1000. Suppose my attribute "&lt;SPAN&gt;X-Total-Count&lt;/SPAN&gt;" is 5600 (records); then I am going to call the rest-api 6 times (1000*6) to capture the complete record set. So until all 5600 records have arrived, I want the "Wait" processor to hold them before merging them all.&lt;/P&gt;&lt;P&gt;&lt;BR /&gt;&lt;SPAN&gt;3. Are all 1000 records from each rest-api call going into a single NiFi FlowFile or 1 FlowFile per record?&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;- One FlowFile per record, because I am using "SplitJson" after "InvokeHTTP".&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;4. Is there some correlation identifier as a result of the rest-api call that identifies all 1000-record batch pulls as part of the same complete bundle?&lt;/SPAN&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;- I'm using the offset for this.&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;What I have:&lt;/P&gt;&lt;P&gt;A rest-api with the limit set to 1000. The total number of records is not always the same; I capture it from "&lt;SPAN&gt;X-Total-Count&lt;/SPAN&gt;".&amp;nbsp;&lt;/P&gt;&lt;P&gt;What I am looking for:&lt;/P&gt;&lt;P&gt;Before merging the whole data set, I want a "Wait" processor to wait for the API calls to complete (the API could be called 2 times or 15 times) until the whole data set has arrived, that is, until the FlowFile's "fragment.count" equals "X-Total-Count", i.e. ${fragment.count:equals(${X-Total-Count})}.
It should then notify and pass all the data to "MergeRecord" so that a single FlowFile is generated.&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;</description>
      <pubDate>Wed, 21 Dec 2022 15:53:47 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Support-Questions/How-to-apply-wait-processor-for-capture-complete-records/m-p/359971#M238256</guid>
      <dc:creator>samrathal</dc:creator>
      <dc:date>2022-12-21T15:53:47Z</dc:date>
    </item>
    <item>
      <title>Re: How to apply wait processor for capture complete records before merger them</title>
      <link>https://community.cloudera.com/t5/Support-Questions/How-to-apply-wait-processor-for-capture-complete-records/m-p/359985#M238257</link>
      <description>&lt;P&gt;&lt;a href="https://community.cloudera.com/t5/user/viewprofilepage/user-id/99493"&gt;@samrathal&lt;/a&gt;&amp;nbsp;&lt;BR /&gt;1. What is the purpose of the SplitJson in your dataflow?&lt;BR /&gt;2. If you have 1 FlowFile with 1000 records in it, why use SplitJson to split that into 1000 FlowFiles with 1 record each?&amp;nbsp; Why not just merge the larger FlowFiles that each contain multiple records?&amp;nbsp; Or am I missing part of the use case here?&lt;BR /&gt;&lt;BR /&gt;--- Can you share a template or flow definition of your dataflow?&lt;BR /&gt;1. It is not clear to me how you get "&lt;SPAN&gt;X-Total-Count" and how you are adding this FlowFile attribute to every FlowFile.&lt;BR /&gt;&lt;/SPAN&gt;2. You have configured the "Release Signal Identifier" with a boolean NiFi Expression Language (NEL) statement that, using your example, will return "false" until the "fragment.count" FlowFile attribute value equals the FlowFile attribute&amp;nbsp;"&lt;SPAN&gt;X-Total-Count" value.&amp;nbsp;&lt;BR /&gt;&lt;BR /&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; 2a. I assume you are writing "X-Total-Count" to every FlowFile coming out of the SplitJson?&amp;nbsp; How are you incrementing the "fragment.count" across all FlowFiles in the complete 5600-record batch?&amp;nbsp; Each FlowFile that is split into up to 1000 FlowFiles via SplitJson will have fragment.count set to the number of splits produced from that FlowFile (1 to 1000).&amp;nbsp; So fragment.count would never reach 5600 unless you are handling this count somewhere else in your dataflow.&lt;BR /&gt;&lt;/SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; 2b. 
If there is a FlowFile where the value of "fragment.count" actually equals the value of the "&lt;SPAN&gt;X-Total-Count" attribute, your&amp;nbsp;"Release Signal Identifier" will resolve to "true".&amp;nbsp; The "Release Signal Identifier" value (true or false) in your configuration is looked up in the configured "distributed map cache" server.&amp;nbsp; So where in your dataflow do you write the release signal to the distributed map cache? (This is usually&amp;nbsp;handled by a Notify processor.)&lt;BR /&gt;&lt;BR /&gt;&lt;/SPAN&gt;I am in no way implying that what you are trying to accomplish can't be done.&amp;nbsp; However, coming up with an end-to-end workable solution requires knowing all the steps in the use case along the way.&amp;nbsp;&amp;nbsp;&lt;BR /&gt;&lt;BR /&gt;I would recommend going through the example Wait/Notify flow linked in my original response to get a better understanding of how the Wait and Notify processors work together.&amp;nbsp; Then maybe you can make some changes to your existing dataflow implementation.&amp;nbsp; With more use case details (detailed process steps) I could suggest further changes if needed.&lt;BR /&gt;&lt;BR /&gt;I really hope this helps you get some traction on your use case here.&lt;BR /&gt;&lt;BR /&gt;If you have a contract with Cloudera, you can reach out to your account owner, who could help arrange for professional services that can work with you to turn your use cases into workable NiFi dataflows.&lt;BR /&gt;&lt;BR /&gt;&lt;/P&gt;&lt;P&gt;&lt;FONT face="batang,apple gothic"&gt;If you found that the provided solution(s) assisted you with your query, please take a moment to login and click&lt;/FONT&gt;&amp;nbsp;&lt;FONT face="arial black,avant garde" color="#FF0000"&gt;Accept as Solution&amp;nbsp;&lt;/FONT&gt;&lt;FONT face="batang,apple gothic" color="#000000"&gt;below each response that helped.&lt;BR /&gt;&lt;BR /&gt;Thank you,&lt;/FONT&gt;&lt;/P&gt;&lt;P&gt;&lt;FONT face="batang,apple gothic" color="#000000"&gt;Matt&lt;/FONT&gt;&lt;/P&gt;</description>
      <pubDate>Wed, 21 Dec 2022 20:25:00 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Support-Questions/How-to-apply-wait-processor-for-capture-complete-records/m-p/359985#M238257</guid>
      <dc:creator>MattWho</dc:creator>
      <dc:date>2022-12-21T20:25:00Z</dc:date>
    </item>
  </channel>
</rss>

