
NiFi MergeRecord processor behaving in a strange manner

Explorer

Hi,

I am trying to merge a large number of record-based flow files using the MergeRecord processor. The machine has 512 GB of RAM and 128 CPU cores.

I have two MergeRecord processors chained back to back. However, I observed the following:

 

1. Despite having the same value for the correlation attribute, not all records are being merged. I'm not sure why, but some records always escape the merge. I initially thought it could be the max bin age, or the fact that the correlation attribute takes slightly varying values (over 120 distinct values).

2. To compensate for this, I ended up chaining four merge processors with increasing max bin age values: 20 minutes for the first, then 45, 95, and 100 minutes for the final one. Here is the configuration of each:

  1. First MergeRecord processor: merge strategy: bin packing, min records: 100,000, max records: 10,000,000, min bin size: 128 MB, max bin size: 1 GB, max bin age: 20 minutes, max number of bins: 65.
  2. Second MergeRecord processor: merge strategy: bin packing, min records: 1,000,000, max records: 100,000,000, min bin size: 256 MB, max bin size: 5 GB, max bin age: 45 minutes, max number of bins: 65.
  3. Third MergeRecord processor: merge strategy: bin packing, min records: 10,000,000, max records: 100,000,000, min bin size: 512 MB, max bin size: 10 GB, max bin age: 95 minutes, max number of bins: 65.
  4. Fourth MergeRecord processor: merge strategy: bin packing, min records: 1,000,000, max records: 10,000,000, min bin size: 1 GB, max bin size: 10 GB, max bin age: 100 minutes, max number of bins: 31.


3. Despite all of the above, I still end up with several flow files that are just a few KB in size.

4. The incoming records are organized per day, e.g. 2019-10-28. We have data for 60 days and several million records per day. I was hoping to merge each day's records into a single Parquet file to be saved to HDFS.

5. Another odd behavior I observed: the first MergeRecord processor seems to pause for a while even though there are records available for processing (see the MergeRecord processor at the top left of the screenshot below, stalled with 4 GB of input files). This adds unnecessary delay. Is there a precondition for the MergeRecord processor to trigger? My guess, which turned out to be incorrect, was that it would only trigger once the records available in the queue exceeded the minimum bin size.

 

Here's a screenshot of the processors:

[Screenshot: the NiFi flow, showing the chained MergeRecord processors and their connection queues]

The flow files above are from the last queue, just before the PutParquet processor. I double- and triple-checked: all of them have the same value for the correlation attribute.

 

Questions:

1. How do I ensure all my files are merged? Is that even possible with NiFi?

2. Why does the MergeRecord processor halt randomly?

3. Is there a way to make this faster? I'm thinking of dropping NiFi completely in favor of Sqoop for bulk loading.


Super Mentor

@Elephanta 

 

Based on the information provided, here are some things to be aware of/consider:

 

How a merge-based processor decides to merge a bin:

1. At the end of a thread execution, the bin has reached both minimums (min size and min records).
2. The bin's max bin age has been reached.
3. At the start of a thread execution there are no free bins, which forces the oldest bin to merge in order to free one.
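
In rough Python pseudocode, that decision looks like the following (a minimal sketch; the Bin structure and names here are illustrative, not NiFi's actual internals):

    import time
    from dataclasses import dataclass, field

    @dataclass
    class Bin:
        min_records: int
        min_size: int            # bytes
        max_bin_age: float       # seconds
        created: float = field(default_factory=time.monotonic)
        record_count: int = 0
        size: int = 0            # bytes

        def reached_minimums(self) -> bool:
            # Rule 1: BOTH minimums must be satisfied.
            return self.record_count >= self.min_records and self.size >= self.min_size

        def expired(self) -> bool:
            # Rule 2: max bin age forces a merge regardless of the minimums.
            return time.monotonic() - self.created >= self.max_bin_age

    def bins_to_merge(bins, max_bins):
        ready = [b for b in bins if b.reached_minimums() or b.expired()]
        # Rule 3: no free bins at the start of an execution forces the
        # oldest bin to merge, even if rules 1 and 2 do not apply to it.
        if not ready and len(bins) >= max_bins:
            ready.append(min(bins, key=lambda b: b.created))
        return ready

Note in particular that rule 1 requires both minimums; this matters for the configuration discussion below.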

 

JVM heap memory:

1. While your system has 512 GB of memory, how much of that has been allocated to NiFi's JVM? Setting a very large heap for the JVM can result in significant stop-the-world application pauses even when only minor garbage collection (GC) occurs. Setting the JVM heap too low while heap-intensive processors are in use can result in Out Of Memory (OOM) exceptions. (Heap is set in NiFi's bootstrap.conf; see the excerpt after this list.)

2. Merge-based processors have the potential for high heap usage. While a merge processor does not hold the content of the FlowFiles being merged in heap memory, it does hold the FlowFile attributes of every binned FlowFile in heap. With a significant number of bins and large min record settings, this can drive heap usage high, which in turn can lead to excessive GC.
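
For reference, the JVM heap is set via the Java arguments in NiFi's conf/bootstrap.conf. A minimal excerpt (the 8g value below is only an illustrative starting point, not a sizing recommendation for your workload):

    # conf/bootstrap.conf
    # Initial and maximum heap; keeping them equal avoids resize pauses.
    java.arg.2=-Xms8g
    java.arg.3=-Xmx8g

Whichever values you pick, watch GC behavior over time before growing the heap further; a bigger heap is not automatically better.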

Processor configuration:
1. What is being used as your correlation attribute? Are there more than 64 possible unique correlation attribute values? That could lead to forced merging of bins in MergeRecord processors 1-3.
2. With a per-bin record range of 100,000 - 10,000,000, you run the risk of high heap usage, excessive GC, or OOM. Do you expect each unique correlation attribute value to accumulate that many records? Perhaps a bin never meets your minimums, and merges only happen because of max bin age. This would explain long pauses and small output FlowFiles.

3. Knowing the profile of the data coming into a merge processor is critical when setting min and max values. Since both minimums must be satisfied, you can run into a scenario where max records is reached but min bin size is not. That bin is then forced to sit until max bin age merges it: the minimums were never both met, and because one of the maximums was reached, nothing more could be allocated to the bin. Again, this can explain your long pauses and small file sizes; see the check after this list for a worked example.
4. You did not mention whether your NiFi is a cluster or a standalone (single-instance) installation. If it is a cluster, keep in mind that each node can only merge FlowFiles that exist on that same node; nodes are not aware of FlowFiles on other nodes. However, since you are merging on a correlation attribute, you can configure a connection to load-balance data across your nodes partitioned by that same attribute. This lets you merge your large bundles in parallel across multiple NiFi nodes.
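
To make the stuck-bin scenario in point 3 concrete, here is a quick check you can run against each processor's settings (the 2 KB average record size below is an assumption for illustration; substitute your measured value):

    def bin_can_stall(record_size, min_records, min_size, max_records, max_size):
        # A bin stalls (waits for max bin age) when a maximum is reached
        # while at least one minimum is still unmet.
        records_at_max_size = max_size // record_size
        size_at_max_records = max_records * record_size
        return records_at_max_size < min_records or size_at_max_records < min_size

    # Your third MergeRecord processor, assuming ~2 KB average records:
    print(bin_can_stall(record_size=2048,
                        min_records=10_000_000, min_size=512 * 1024**2,
                        max_records=100_000_000, max_size=10 * 1024**3))
    # -> True: the 10 GB max bin size is reached after ~5.2M records,
    #    short of the 10M minimum, so the bin waits out the full
    #    95 minute max bin age.

Run the same check for all four processors with your real record size; any True identifies a configuration whose bins can be forced to sit until max bin age.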

Threading:
1. When a processor executes, it must request a thread from the NiFi core. The core has a configurable Max Timer Driven Thread Count (found in Controller Settings under the global menu in the upper right corner). By default this thread pool is only 10, and it is shared by every component you add to the canvas. With 128 cores, a recommended pool size would be roughly 2 to 4 threads per core, i.e. 256 - 512. Of course, you must also take into consideration whatever else runs on this server, so monitor your CPU usage over time and adjust accordingly.

 

Disk I/O:

1. NiFi writes all of its data into content claims on disk. We strongly recommend locating NiFi's content, FlowFile, and provenance repositories on separate disks, both to improve I/O and to reduce the likelihood of FlowFile repository corruption should the content repository fill its disk to 100%.

2. To help reduce the heap usage of actively queued FlowFiles, NiFi begins writing swap files to disk when a connection queue exceeds the swap threshold configured in nifi.properties; see the excerpt below. (Note: the connection queue feeding your merge processor may or may not contain swapped FlowFiles. FlowFiles already allocated to bins still show in the connection but are not eligible to be swapped to disk.)
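
The relevant nifi.properties entries look roughly like this (the paths below are placeholders; the swap threshold shown is NiFi's default):

    # nifi.properties -- ideally each repository on its own physical disk
    nifi.flowfile.repository.directory=/disk1/flowfile_repository
    nifi.content.repository.directory.default=/disk2/content_repository
    nifi.provenance.repository.directory.default=/disk3/provenance_repository

    # FlowFiles beyond this per-connection queue depth are swapped to disk
    nifi.queue.swap.threshold=20000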

 

Data ingestion:
1. Your source records seem very small. How is your data being ingested into NiFi? Perhaps a different ingestion method or processor configuration could yield fewer but larger records. That would make merging more efficient and reduce disk swapping.

 

Here are some articles you may want to read:
https://community.cloudera.com/t5/Community-Articles/HDF-NIFI-Best-practices-for-setting-up-a-high-p...

https://community.cloudera.com/t5/Community-Articles/Dissecting-the-NiFi-quot-connection-quot-Heap-u...

https://community.cloudera.com/t5/Community-Articles/Understanding-NiFi-max-thread-pools-and-process...

https://community.cloudera.com/t5/Community-Articles/Understanding-NiFi-processor-s-quot-Run-Duratio...

 

What you are trying to do is definitely doable with NiFi, but it may require some dataflow redesign and/or system tuning to achieve.


Hope this helps,

Matt