Member since
07-01-2019
8
Posts
1
Kudos Received
0
Solutions
10-30-2019
05:49 AM
2 Kudos
@Elephanta Based on the information provided, here are some things to be aware of/consider: How a Merge based processor decides to Merge a bin: 1. At end of thread execution a bin has reached both minimums (Min size and min records) 2. The max bin age of a bin has been reached 3. at start of a thread execution there are no free bins, forces oldest bin to merge to free a bin. JVM heap memory: 1. While your system has 512GB of memory, how much of that has been allocated to NiFi's JVM. Setting a very large heap for the JVM can result in significant stop-the-world application pauses even when minor Garbage Collection (GC) occurs. Setting JVM heap too low when you have high heap usage processor in use can result in Out Of Memory (OOM) exceptions. 2. Merge based processors have potential for high heap usage. While Merge content does not hold content of all FlowFiles being merged in heap memory, it does hold the FlowFile AttrIbutes of all binned FlowFiles in heap memory. So with a significant number of bins and large min record settings, this can cause high heap usage. This intern can lead to excessive GC occurring. Processor configuration: 1. What is being used as your correlation attribute? Are there more than 64 possible unique correlation attribute values? This could lead to force merging of bins in mergeRecord processor 1-3. 2. With per bin record range set 100,000 - 10,000,000, you run the risk of high heap usage, excessive GC at times, or OOM. Do expect that each unique correlation attribute will have this many records? Perhaps a bin never meets your minimums and merge is only happening because of max bin age. This would explain large pauses and small output FlowFiles. 3. Knowing your incoming data to a merge processor is critical when setting min and max values. Since both mins must be satisfied, you can run in to s scenario where max records is reached, but you did not reach min bin size. That result in bin being forced to sit until max bin age forces it to merge since both min values were not met and because one of the max values was met nothing additional could be allocated to that bin. Again, this can explain your long pauses and small files sizes. 4. you did not mention if your NiFi is a cluster or standalone (single) NiFi instance installation. If a cluster, keep in mind that each node can only merge FlowFIles which exist on that same node. Nodes are not aware of FlowFiles on other nodes. However, since you are merging based on a correlation attribute, you can configure a connection to load-balance data across all your nodes based on that same correlation attribute. This would allow you to use parallel processing to merge your large bundles across multiple NiFi nodes. Threading: 1. When a processor executes, it must requests a thread from the NiFi core. The core has a configurable Max Timer Driven Thread Pool (found in controller setting under the global menu in upper right corner). By default this thread pool is only set to 10. This thread pool is shared by all components you add to your canvas. With 128 cores, the recommended setting for the pool would be 256 - 512 (of course you must also take in to consideration what else may be running in this server, so monitor your cpu usage over time and adjust accordingly.) Disk I/O: 1. NiFi writes all its data in to content claims on disk. We strongly recommend that NiFi's content, flowfile, and provenance repositories are located on separate disks to improve IO and reduce likely hood of corruption of flowfile repo should content repo fill disk to 100%. 2. To help reduce heap usage of actively queued FlowFiles. NiFi will begin writing swap files to disk when a connection queue exceeds the configured swap threshold set in the nifi.properties file. (Note: the connection queue feeding your merge processor may or may not contained swapped FlowFiles. FlowFiles allocated to bins will still show in the connection but will not be eligible to be swapped to disk.) Data ingestion: 1. Your source records seem very small. How is your data being ingested in to NiFi. Perhaps a different method, or processor configuration can yield fewer yet large records. This would result in more efficient merging and less disk swapping. Here are some articles you may want to read: https://community.cloudera.com/t5/Community-Articles/HDF-NIFI-Best-practices-for-setting-up-a-high-performance/ta-p/244999 https://community.cloudera.com/t5/Community-Articles/Dissecting-the-NiFi-quot-connection-quot-Heap-usage-and/ta-p/248166 https://community.cloudera.com/t5/Community-Articles/Understanding-NiFi-max-thread-pools-and-processor-concurrent/ta-p/248920 https://community.cloudera.com/t5/Community-Articles/Understanding-NiFi-processor-s-quot-Run-Duration-quot/ta-p/248921 What you are trying to do is definitely doable with NiFi, but may require some dataflow design and/or system tuning to achieve. Hope this helps, Matt
... View more
10-29-2019
10:19 PM
@Elephanta Okay, now with that information I get a better understanding and picture. By default, HDP 2.6 has a replication factor of 3 so it's looking to place the other 2 copies on different data nodes that the existing unless you create new files with a replication factor of 1 you will continue to get the unreplicated block errors 🙂 but now that you know it's manageable. Maybe next time you delete files in hdfs use the -skipTrash option hdfs dfs -rm -skipTrash /path/to/hdfs/file/to/remove/permanently or emtpyting the existing .Trash Options: Change the replication factor of a file at creatio: hdfs dfs –setrep –w 1 /user/hdfs/file.txt Or change the replication factor of a directory hdfs dfs -setrep -R 1 /user/hdfs/your_dir Changing the replication factor for a directory will only affect the existing files and the new files under the directory will get created with the default replication factor in dfs.replication from hdfs-site.xml maybe in your case this is what you should change to 1 as this takes effect cluster-wide for your Dev environment Happy hadooping
... View more