Support Questions

Find answers, ask questions, and share your expertise

MergeContent processor is erroring with Defragment strategy

Master Collaborator

Hello Experts,

I have a flow where I use SplitRecord processors to split JSON records, and then downstream I merge them back into a single file using the Defragment strategy.

The flow divides into two routes downstream, so I use MergeContent in two places; the merge works fine in one place but does not work in the other.
Flow diagram of the setup:

hegdemahendra_0-1741857982262.png

 

The MergeContent on the left side merges fine with the Defragment strategy, but the one on the right side (marked in red) gives the error below (since another API call sits between the split and the merge on that route, the FlowFiles may not arrive at the merge processor in order).

hegdemahendra_1-1741858007386.png

(Example error: "Cannot defragment flow files with fragment ID XXXXX because the expected number of fragments is 5 but found only 3.")


I am sending 20 requests to the endpoint (HandleHttpRequest), and each request contains 5 JSON records, so each request gets split into 5 FlowFiles, for a total of 100 FlowFiles.
(The issue occurs whether I send all 20 requests one after another or in parallel.)
Note: if I send only one request (which gets split into 5), there is no error at all; it works fine.

I referred to the answer from @MattWho in the post below and tried the same settings (prioritizers set and max bins increased to 50):
https://community.cloudera.com/t5/Support-Questions/MergeContent-defrag-errors-when-handling-multipl...


Now the behaviour is that it works some of the time and errors out some of the time.

hegdemahendra_2-1741858124859.png

Another thing to note: if I stop the MergeContent processor, keep it stopped until all messages have arrived, and then start it, everything works fine.

So I tried changing the Run Schedule from 0 seconds to 60 seconds and the concurrency from 1 to 5, and then it appeared to work every time.
But my case is somewhat dynamic, so a 60-second Run Schedule may not be meaningful.

hegdemahendra_3-1741858200566.png

 

Is there anything I am missing? Your suggestions would be much appreciated.

 

Thanks in advance,

Mahendra

1 ACCEPTED SOLUTION

Master Mentor

@hegdemahendra 

When using the "Defragment" merge strategy, the order of the FlowFiles does not have any effect. Defragment depends on the following FlowFile attributes being set on the FlowFiles:

  • fragment.identifier - For each unique fragment.identifier, FlowFiles are allocated to a different bin in MergeContent.
  • fragment.index - This one-up number controls the order in which the content is merged.
  • fragment.count - This records the total number of splits for this specific fragment.identifier.

MergeContent will only merge a defragmented FlowFile if all fragments are allocated to a bin before the max bin age is reached. The bin age starts the moment the first fragment is allocated to a bin.
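As a rough illustration, the binning behavior described above can be sketched like this (a minimal Python sketch, not NiFi's actual implementation; only the attribute names match the bullets above):

```python
# Minimal sketch of Defragment binning: FlowFiles are modeled as dicts of
# attributes; a bin collects all fragments sharing one fragment.identifier
# and is merged only once all fragment.count fragments have arrived.
def bin_and_merge(flowfiles):
    bins = {}      # fragment.identifier -> list of fragments seen so far
    merged = []
    for ff in flowfiles:
        frag_id = ff["fragment.identifier"]
        bins.setdefault(frag_id, []).append(ff)
        fragments = bins[frag_id]
        if len(fragments) == int(fragments[0]["fragment.count"]):
            # All fragments present: order content by fragment.index and merge.
            fragments.sort(key=lambda f: int(f["fragment.index"]))
            merged.append("".join(f["content"] for f in fragments))
            del bins[frag_id]
    return merged, bins    # bins still holds any incomplete groups

# Example: two fragments of identifier "a" arrive out of order,
# yet merge in fragment.index order.
ffs = [
    {"fragment.identifier": "a", "fragment.index": "1",
     "fragment.count": "2", "content": "world"},
    {"fragment.identifier": "a", "fragment.index": "0",
     "fragment.count": "2", "content": "hello "},
]
done, pending = bin_and_merge(ffs)
```

Note how arrival order is irrelevant: the merge is driven entirely by the three fragment attributes.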

Let's look at the two scenarios that would result in what you are seeing:

  1. Insufficient bins - Say max bins is set to 18 but you have more than 18 unique fragment.identifiers. Suppose the first 25 FlowFiles that arrive consist of 18 unique fragment.identifiers; some of those 18 bins then hold more than one FlowFile, because 7 of the FlowFiles had a fragment.identifier matching one already used for a bin. Now assume the oldest bin (the first to have a FlowFile allocated to it) holds 3 of those 25, and the next FlowFile queued to MergeContent has a fragment.identifier different from all of the first 25. In this scenario MergeContent is forced to route all FlowFiles allocated to the oldest bin to failure (you would see the ERROR you reported) in order to free up a bin for the FlowFile with the new fragment.identifier. Note that the remaining fragments belonging to that failed bin are still somewhere upstream in your dataflow and will eventually go to failure as well.
  2. Max bin age reached before all fragments are allocated to a bin - In this case you have set the max number of bins high enough to account for all unique fragment.identifiers being binned at the same time, but the max bin age is set to, say, 60 minutes. If 60 minutes pass from the moment the first fragment is allocated to a bin, all fragments in that expired bin are routed to failure (you would see the ERROR you reported).
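Scenario 1 can be illustrated with a small extension of the same idea (a hedged Python sketch, assuming a simple oldest-bin eviction policy like the one described; not NiFi's actual code):

```python
# Sketch of scenario 1: with too few bins, a new fragment.identifier forces
# the oldest (still incomplete) bin to be routed to failure, producing the
# "expected N fragments but found only M" situation described above.
def allocate(flowfiles, max_bins):
    bins = {}      # dict insertion order doubles as bin age (oldest first)
    failed = []
    for ff in flowfiles:
        frag_id = ff["fragment.identifier"]
        if frag_id not in bins and len(bins) >= max_bins:
            oldest_id = next(iter(bins))            # evict the oldest bin
            failed.append((oldest_id, bins.pop(oldest_id)))
        bins.setdefault(frag_id, []).append(ff)
    return bins, failed

# 3 unique identifiers but only 2 bins: identifier "r1" is evicted
# before its remaining fragments could ever arrive.
ffs = [{"fragment.identifier": f"r{i}", "fragment.index": "0",
        "fragment.count": "5"} for i in (1, 2, 3)]
bins, failed = allocate(ffs, max_bins=2)
```

In your flow, 20 parallel requests mean up to 20 fragment.identifiers binned at once, which is why raising max bins above that count matters.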

I verified both of these scenarios on my Apache NiFi 1.26 based NiFi cluster setup.

Another thing to consider... flow design.

  1. Are you running a NiFi cluster or a single NiFi instance?
  2. If clustered, are you redistributing your split FlowFiles across different nodes (maybe using load-balanced connections)? Each node in a NiFi cluster is only aware of the FlowFiles present on that specific node, so if some fragments end up on different nodes, they will never merge and will eventually route to failure because of one of the two scenarios I described.
  3. I assume you are not clustered, since you stated that waiting for all upstream processing to complete before starting MergeContent resulted in success.
  4. Run schedule: Changing the run schedule from 0 sec to 60 sec just controls how often MergeContent gets scheduled to execute (when it executes, it allocates FlowFiles from the inbound connection to bins and merges any bins eligible to be merged). With 60 seconds between scheduled runs, upstream has more time to process the fragments, but it adds up to a minute of latency to your dataflow.
  5. Increasing concurrent tasks allows more than one thread to execute at the same time for binning FlowFiles and merging bins. From your screenshot, your fragments appear small, so merging should be quick, as evident from the tasks/time in your MergeContent screenshot. So I don't see this change having any impact.

Things you can check and try:

  • Add a funnel before MergeContent and connect that funnel to your MergeContent. Route your upstream dataflow into the funnel, and also route the "failure" relationship from MergeContent back into the funnel. This creates a loop in which the FlowFiles continuously cycle until all fragments have arrived and can be binned together for merging.

MattWho_0-1742235049828.png

     

  • You may also consider adding an UpdateAttribute (used to set an attribute you increment with each loop) and a RouteOnAttribute in your failure loop to create a loop counter, allowing RouteOnAttribute to kick FlowFiles out of the otherwise infinite loop after x looped attempts.
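The loop counter can be sketched with NiFi Expression Language along these lines (the property names and the threshold of 10 are illustrative assumptions, not values from this thread):

```
# UpdateAttribute (inside the failure loop) - hypothetical property name:
loop.count = ${loop.count:replaceNull(0):plus(1)}

# RouteOnAttribute - hypothetical routing property; FlowFiles matching it
# leave the loop after more than 10 attempts:
give.up = ${loop.count:gt(10)}
```

The `replaceNull(0)` call seeds the counter on the first pass, since the attribute does not exist until UpdateAttribute first sets it.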

Please help our community grow. If you found any of the suggestions/solutions provided helped you with solving your issue or answering your question, please take a moment to login and click "Accept as Solution" on one or more of them that helped.

Thank you,
Matt


2 REPLIES 2


Master Collaborator

Thank you so much @MattWho for the detailed answer.
The retry logic helped a lot; I added RetryFlowFile processors in between to avoid an infinite retry loop.