Created on 03-28-2019 01:43 PM - edited 08-17-2019 04:20 PM
We are receiving JSON messages from an upstream system via a Kafka topic. The requirement is to store these messages in HDFS at a certain interval. Since we are storing them in HDFS, we want to merge a certain number of these records into a single file. As per the NiFi documentation, we are using the "MergeRecord" processor for that.
Below is the snapshot of the Processor Configuration. NiFi version: 1.8
For the above configuration, we expected MergeRecord to wait for one of the thresholds, i.e. maximum records (100000) or maximum bin size (100 KB).
But we observe that the bin is merged well before either threshold is reached. A merge is triggered for a bin holding only 2 records of 5 KB.
Could you help with analysis and/or pointers as to why the MergeRecord processor is not behaving as per the configuration?
Created 03-28-2019 05:05 PM
-
The "Max" configuration properties do not govern how long a bin waits to be merged.
The Merge-based processors work as follows:
-
1. The processor executes based upon the configured run Schedule.
2. At the exact time of execution, the Merge processor looks at which FlowFiles exist on the inbound connection that have not already been allocated to a Merge processor bin.
3. Those FlowFiles are then allocated to one or more bins. The max bin size and max number of records create a ceiling for how many FlowFiles can be allocated to a bin. If a bin has reached one of these max values, additional FlowFiles in the current execution start getting allocated to a new bin.
4. Once all FlowFiles from the current execution have been allocated to one or more bins, those bins are evaluated to see if they are eligible to be merged. (The thread does not keep checking for new FlowFiles arriving on the inbound connection; those new FlowFiles are handled by the next execution.) To be eligible, a bin must either meet both minimum settings for size and number of records, or have reached the max bin age. In your case, a bin could be merged with only 20 records and 20 KB of data, or once it has existed for at least 1 minute.
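
The four steps above can be sketched as a toy model. This is only an illustration of the logic as described in this answer, not NiFi's actual implementation; the names `FlowFile`, `Bin`, and `run_once` are hypothetical, and the settings mirror the values mentioned in this thread (min 20 records / 20 KB, max 100000 records / 100 KB, max bin age 1 minute).

```python
from dataclasses import dataclass, field


@dataclass
class FlowFile:
    records: int
    size_bytes: int


@dataclass
class Bin:
    max_records: int
    max_size_bytes: int
    created_at: float = 0.0
    flowfiles: list = field(default_factory=list)

    @property
    def records(self):
        return sum(f.records for f in self.flowfiles)

    @property
    def size_bytes(self):
        return sum(f.size_bytes for f in self.flowfiles)

    def accepts(self, ff):
        # The "max" settings only act as a ceiling on what fits in a bin.
        return (self.records + ff.records <= self.max_records
                and self.size_bytes + ff.size_bytes <= self.max_size_bytes)


def run_once(queued, bins, now, *, min_records, min_size_bytes,
             max_records, max_size_bytes, max_bin_age_s):
    """Model one scheduled execution: bin the queued snapshot, then evaluate bins."""
    # Steps 2-3: allocate only the FlowFiles queued at this moment.
    for ff in queued:
        target = next((b for b in bins if b.accepts(ff)), None)
        if target is None:
            target = Bin(max_records, max_size_bytes, created_at=now)
            bins.append(target)
        target.flowfiles.append(ff)

    # Step 4: a bin merges if it meets BOTH minimums, or if it is old enough.
    merged, waiting = [], []
    for b in bins:
        meets_minimums = (b.records >= min_records
                          and b.size_bytes >= min_size_bytes)
        too_old = now - b.created_at >= max_bin_age_s
        (merged if meets_minimums or too_old else waiting).append(b)
    return merged, waiting


settings = dict(min_records=20, min_size_bytes=20 * 1024,
                max_records=100_000, max_size_bytes=100 * 1024,
                max_bin_age_s=60)

# 25 small FlowFiles already satisfy both minimums, so the bin merges
# immediately, far below either maximum.
merged, waiting = run_once([FlowFile(1, 1024) for _ in range(25)], [],
                           now=0, **settings)
print(len(merged), merged[0].records)

# 2 FlowFiles of 5 KB do not meet the minimums, but merge once the bin is 60 s old.
merged, waiting = run_once([FlowFile(1, 5 * 1024)] * 2, [], now=0, **settings)
merged, waiting = run_once([], waiting, now=60, **settings)
print(len(merged), merged[0].records)
```

In this model the maximums never delay a merge; only the minimums and the bin age decide when a bin is released.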
-
If you find you're consistently merging small bins, increasing the run schedule on your merge processor should help. This allows more time between executions for FlowFiles to queue on the inbound connection.
-
IMPORTANT: Keep in mind that all FlowFiles allocated to bins are being held in heap memory (swapping does not occur with bins). Specifically, the FlowFile attributes/metadata are the portion of the FlowFile held in heap memory. Your max records of 100,000 could result in considerable heap pressure. Using two Merge processors in series could achieve the same result with lower heap usage.
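
A rough back-of-the-envelope sketch of why two merges in series hold fewer FlowFiles per bin. It assumes one record per FlowFile (as with one Kafka message per FlowFile), and the 1000 x 100 split is a hypothetical choice for illustration; `bin_occupancy` is not a NiFi function, and the numbers count FlowFiles in the largest single bin, not actual heap bytes.

```python
import math


def bin_occupancy(total_flowfiles, per_stage_max):
    """Return (largest bin size at each stage, FlowFiles left after the last stage)."""
    remaining = total_flowfiles
    occupancy = []
    for stage_max in per_stage_max:
        # A bin at this stage holds at most stage_max FlowFiles at once.
        occupancy.append(min(remaining, stage_max))
        # Each full bin becomes one merged FlowFile for the next stage.
        remaining = math.ceil(remaining / stage_max)
    return occupancy, remaining


# One merge processor with a ceiling of 100,000 FlowFiles per bin.
print(bin_occupancy(100_000, [100_000]))

# Two merge processors in series: 1,000 per bin, then 100 merged files per bin.
print(bin_occupancy(100_000, [1_000, 100]))
```

Both layouts end with a single merged FlowFile, but in the two-stage layout no bin ever holds more than 1,000 FlowFiles.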
-
I use MergeContent as an example in the following article about connection queues:
https://community.hortonworks.com/articles/184990/dissecting-the-nifi-connection-heap-usage-and-perf...
-
Thank you,
Matt
If you found this answer addressed your question, please take a moment to log in and click the "ACCEPT" link.
Created 03-29-2019 09:55 AM
@Matt Clarke I have increased the Run Schedule to 2 mins and made sure that the queue holds all the FlowFiles before the MergeRecord processor executes. My output is still 3 files of 4 KB each, but the desired outcome is one merged file. I am not sure which configuration is affecting the result.
Created 03-29-2019 06:13 PM
Did you change your Max Bin Age setting to a value higher than 1 minute? Try setting it to 10 minutes.
Is your NiFi a standalone instance or a NiFi cluster? Keep in mind that each node in a NiFi cluster runs its own copy of the flow.xml.gz and works on its own set of FlowFiles. So the merge processor can only bin and merge the FlowFiles local to each node.
Thanks,
Matt
Created 03-29-2019 06:58 PM
@Matt Clarke It's a NiFi cluster with 3 nodes. That's why I am getting 3 files. Is there any way to handle this? I have already invested 4 days in this. Lol.
Created 04-01-2019 03:56 PM
In order to merge FlowFiles that exist on multiple nodes in your cluster you are going to need to move all FlowFiles to one node. Apache NiFi 1.9.x versions introduced a new "Load Balanced" configuration option on dataflow connections. One of the options for the configurable "Load Balance Strategy" is "Single node". Setting this strategy will route all queued FlowFiles to one node in the cluster. You could set this on the connection feeding your Merge processor.
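
The effect of moving everything to one node can be shown with a toy model. This is only an illustration: `merged_outputs` and the strategy labels `"spread"` and `"single_node"` are hypothetical names for this sketch (not NiFi property values), and the round-robin spread is an assumed stand-in for however the Kafka messages actually land on the nodes.

```python
def merged_outputs(flowfile_ids, node_count, strategy):
    """Return one merged batch per node that holds at least one FlowFile."""
    nodes = [[] for _ in range(node_count)]
    for i, ff in enumerate(flowfile_ids):
        # "single_node" sends everything to node 0; otherwise spread round-robin.
        target = 0 if strategy == "single_node" else i % node_count
        nodes[target].append(ff)
    # Each node can only merge the FlowFiles it holds locally.
    return [batch for batch in nodes if batch]


flowfiles = list(range(6))
print(len(merged_outputs(flowfiles, 3, "spread")))       # one output per node
print(len(merged_outputs(flowfiles, 3, "single_node")))  # a single output
```

With FlowFiles spread over 3 nodes the model yields 3 merged files, matching what was observed above; routing them to a single node yields 1.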
-
In Apache NiFi 1.8 and older you would need to use the PostHTTP processor (configured to send as FlowFile) to send all FlowFiles to a ListenHTTP processor running at one of your nodes' URLs (the processor will run on all nodes, but your PostHTTP will only be configured with the URL for one node). The problem with this solution is that if the server at the target URL goes down, your dataflow will stop working.
-
Thank you,
Matt
Created 04-04-2019 11:15 AM
@Matt Clarke Thanks a lot. You are a genius!