Apache NiFi Flow: Merging Performance
Labels: Apache NiFi, HDFS
Created 03-24-2025 04:22 AM
Hello everyone,
I have developed an Apache NiFi flow that ingests streaming data, converts it to JSON, merges the records, and then stores them in HDFS. However, I have observed a couple of issues:
File Structure: After the merge operation, many files retain their original structure rather than being merged as expected.
Performance: Increasing the number of bins along with the minimum and maximum record counts and bin sizes causes the merge process to become significantly slower. Conversely, reducing these parameters results in files that mostly retain their original structure.
Could anyone provide insights or recommendations on how to resolve these issues?
Thank you.
My current configuration, which causes the flow to route everything to the "original" relationship:
The flow:
The configuration that never shows merged records; with this configuration the merge takes forever and I never see any output:
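At a high level, the flow in the screenshots follows this layout (the processor names here are my shorthand; the exact processors are shown in the screenshots):
[streaming ingest processor(s)] -> ConvertRecord (to JSON) -> MergeRecord -> PutHDFS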
Created 03-24-2025 08:26 AM
Hello @MattWho @Shelton @SAMSAL do you have any insights here? Thanks!
Regards,
Diana Torres, Community Moderator
Was your question answered? Make sure to mark the answer as the accepted solution.
If you find a reply useful, say thanks by clicking on the thumbs up button.
Created 03-24-2025 12:20 PM
@MarinaM
Suggested reading:
https://community.cloudera.com/t5/Community-Articles/How-to-address-JVM-OutOfMemory-errors-in-NiFi/t...
I'll start with this observation you shared:
"My current configuration that cause the flow to go the original"
The Merge processors will always send all input FlowFiles to the "Original" relationship when a bin is merged. The "Merged" relationship is where the newly produced FlowFiles, containing the merged records, are output.
Let's break down your two different configurations to understand what each is doing:
- Here you are parsing inbound FlowFiles and allocating their content to bins (each bin will contain only "like" records; a "like" record/FlowFile is one that has exactly the same schema as another). At each scheduled execution a thread is requested that reads from the inbound connection queue and allocates FlowFiles to one or more bins. Judging by your screenshots, it appears you have numerous concurrent tasks configured on this processor. Here are the different scenarios that can happen:
- You have multiple concurrent threads processing inbound FlowFiles and allocating them to bin(s). After a FlowFile's records are allocated to a bin, the processor checks whether that bin is eligible for merge (in the above case, a bin is merged once it holds at least 10,000 FlowFiles AND the bin size is at least 20 MB), so your merged output could have as few as 10,000 records. It is hard to tell from your screenshots what the average size of each record is. If 5,000 FlowFiles were to add up to 35 MB of content, that bin could not accept any more records, yet the minimum entry count would still not be satisfied. That bin then just sits there, since you have no max bin age configured. New records, even if they are "like" records, would start being allocated to another bin. Eventually there are no more available bins to allocate to, so the processor force merges the oldest bin to free a bin for more records. (See the rough numbers sketched after this list.)
- You have many "unlike" records (more than 5 unique schemas), so you quickly exhaust your available bins and the oldest bin is force merged with very few, or even one, record in it. I also don't know the average number of records in each inbound FlowFile.
- Your input FlowFile is already around 20 MB in size, so it is the only FlowFile allocated to its bin, and when that bin is force merged the output FlowFile contains the same number of records as the input.
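As a rough back-of-the-envelope check of that first configuration, assuming the ~275 byte average FlowFile size estimated further down (your real sizes may differ):
10,000 FlowFiles x ~275 bytes ≈ 2.75 MB, which is well short of the 20 MB minimum bin size.
20 MB ÷ ~275 bytes ≈ 75,000 FlowFiles needed in a bin before the size minimum is met.
So with FlowFiles that small, it is the 20 MB size minimum rather than the 10,000 entry minimum that actually gates when a bin becomes eligible to merge.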
Now let's look at your alternate configuration:
- Here you have increased the min number of records, min bin size, max bin size, and number of bins.
Things that can happen here:
- With 50 bins and a minimum of 1,000,000 FlowFiles per bin, there is potential for heap memory issues, since that many binned FlowFiles are all held in your NiFi JVM heap memory. I'd suggest checking your logs for any OutOfMemory exceptions with this configuration.
- Assuming no memory issues, you likely have fewer than 50 unique schemas, and each bin has still not satisfied both minimums (1,000,000 FlowFiles AND 50 MB of content). The bins just sit there unmerged, waiting for more FlowFiles to arrive.
- Try setting a max bin age to force bins to merge after that amount of time and see what you get; a sketch of what such a configuration might look like follows this list.
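As a minimal sketch, assuming the merge processor is MergeRecord, a more forgiving configuration might look something like this (the values are only illustrative starting points, not tuned recommendations):
Merge Strategy            = Bin-Packing Algorithm
Minimum Number of Records = 10000
Maximum Number of Records = 100000
Minimum Bin Size          = 20 MB
Maximum Bin Size          = 256 MB
Maximum Number of Bins    = 10
Max Bin Age               = 5 mins
The key addition is Max Bin Age, which guarantees every bin is flushed after that amount of time even when the minimums are never reached.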
My guess here is that your smaller configuration is working:
- Inbound FlowFiles that fail to merge go to the "failure" relationship. Since you did not mention anything routing to "failure", and we can see FlowFiles on the "original" relationship, bins are being merged.
- If you have a lot of unique schemas, it is likely that some bins are getting force merged to free up a bin, so the output looks no different from the input FlowFile.
- Just looking at your "original" relationship, which contains 350,981 FlowFiles totalling 91.25 MB, your average FlowFile size is only ~275 bytes (91.25 MB ÷ 350,981 FlowFiles).
- Are you saying nothing ever goes to the "merged" relationship? I would expect it to, considering all the FlowFiles in the "original" relationship.
Hopefully this helps you understand how this processor works and helps you take a closer look at your inbound FlowFiles and their schemas to see if they are each unique.
Please help our community grow. If you found any of the suggestions/solutions provided helped you with solving your issue or answering your question, please take a moment to login and click "Accept as Solution" on one or more of them that helped.
Thank you,
Matt
Created 03-26-2025 01:55 AM
Dear Matt,
Thank you for your response; it was incredibly helpful.
Based on your feedback and the link you shared, I have revised my design. Below are the key points regarding my implementation:
All my data follows the same schema. Before merging, I convert the data to a JSON format to ensure consistency.
There are no failures occurring during the merge process.
The merge operation successfully produces a merged file.
However, I am still receiving the original output, which I need to eliminate.
Following the documentation, I have adjusted my design to include multiple consecutive merge steps, as illustrated in the attached diagram. Given the large volume of flow data, I have allocated high system specifications to optimise performance.
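In other words, the merge portion of the flow is now chained roughly like this (my shorthand; the exact processors and number of stages are in the attached diagram):
... -> ConvertRecord (to JSON) -> MergeRecord (stage 1) -> MergeRecord (stage 2) -> PutHDFS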
Additionally, my heap size is currently set to 42 GB.
Would increasing it further be beneficial?
To mitigate swap-related issues, I have raised the queue swap threshold to 300,000 (from the default 20,000), since I previously experienced rapid memory fill-up. Could you advise on any potential side effects of this change?
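For reference, these settings as I currently have them (file locations from a default NiFi install; the exact java.arg numbering in bootstrap.conf may differ):
# conf/bootstrap.conf
java.arg.2=-Xms42g
java.arg.3=-Xmx42g
# conf/nifi.properties
nifi.queue.swap.threshold=300000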
Looking forward to your insights.
Created 03-26-2025 10:12 PM
Appreciate your response.
Created 04-01-2025 08:47 AM
@MarinaM Has the reply helped resolve your issue? If so, please mark the appropriate reply as the solution, as it will make it easier for others to find the answer in the future. Thanks.
Regards,
Diana Torres, Community Moderator
Was your question answered? Make sure to mark the answer as the accepted solution.
If you find a reply useful, say thanks by clicking on the thumbs up button.
