Member since: 07-30-2019
Posts: 3427
Kudos Received: 1632
Solutions: 1011
My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
|  | 85 | 01-27-2026 12:46 PM |
|  | 495 | 01-13-2026 11:14 AM |
|  | 1037 | 01-09-2026 06:58 AM |
|  | 923 | 12-17-2025 05:55 AM |
|  | 984 | 12-15-2025 01:29 PM |
01-15-2018
09:34 PM
1 Kudo
@dhieru singh NiFi offers a "Summary" UI which has a Connections tab you can select. Once selected, you can sort on the "Queue / Size threshold" columns by clicking on the word "Queue". This moves the connections with the largest threshold percentage to the top of the list. 100% for the Queue threshold indicates that object-count back pressure is being applied on that connection; 100% for the Size threshold indicates that queue-size back pressure is being applied on that connection. Clicking on the arrow to the far right of a row will take you directly to that connection on your canvas, no matter which process group it exists within. Thank you, Matt If you found this answer to be helpful in addressing your question, please take a moment to click the accept link below.
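As a rough illustration (a sketch, not NiFi source code, and with made-up threshold values), the percentages shown in the Summary tab relate to a connection's configured back pressure thresholds like this:

```python
def threshold_percentages(queued_count, queued_bytes,
                          object_threshold, size_threshold_bytes):
    """Return (object %, size %) the way the Summary UI presents them."""
    pct_objects = 100 * queued_count / object_threshold
    pct_size = 100 * queued_bytes / size_threshold_bytes
    return pct_objects, pct_size

# A connection holding 10,000 FlowFiles against a 10,000-object threshold
# is at 100% -> object-count back pressure is being applied.
obj_pct, size_pct = threshold_percentages(
    queued_count=10_000, queued_bytes=50 * 1024**2,
    object_threshold=10_000, size_threshold_bytes=1024**3)
print(obj_pct, round(size_pct, 1))  # 100.0 4.9
```

Whichever percentage reaches 100% first is the one applying back pressure on that connection.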
11-22-2017
04:10 PM
@Pratik Ghatak The NiFi instance/cluster with the Remote Process Group (RPG) is acting as the client, and the target NiFi instance/cluster is acting as the server in this Site-To-Site (S2S) connection. The target NiFi configuration you shared shows that you have S2S set up to support both the RAW and HTTP transport protocols. It also appears from your screenshots that the initial connection between your two NiFis is working correctly. The error you are seeing indicates that you have not added any remote input ports on the target NiFi's canvas to receive FlowFiles from the source NiFi. On the target NiFi, you will need to add one or more "input ports": input ports added at the root/top-level process group are considered "remote input ports" and can be used to receive data over S2S. After you add these "remote input ports" to your target NiFi's canvas, you can right-click on your source NiFi RPG and select "Refresh" from the context menu that appears (or you can just wait for the next auto-refresh at 30 seconds). You should now be able to see those remote input ports when you drag a connection to the RPG. Thank you, Matt If you find this information has addressed your question/issue, please take a moment to click "Accept" beneath the answer.
11-22-2017
03:58 PM
1 Kudo
@Mohamed Hossam You could use the ReplaceText processor instead of your script to accomplish what you are trying to do: the above ReplaceText processor will create 4 capture groups for the desired columns from your input FlowFiles. It will even work against incoming FlowFiles that have multiple entries (one per line). Thank you, Matt If you find this answer addresses your question/issue, please take a moment to click "Accept" beneath the answer.
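The actual regular expression lives in the screenshot above and is not reproduced here, so the following is a hypothetical stand-in: a ReplaceText-style search value with four capture groups, assuming a six-column comma-separated input where columns 1, 2, 4 and 6 are the ones wanted. Python's `re` syntax is close enough to NiFi's Java regex syntax to demonstrate the idea:

```python
import re

# Hypothetical regex: capture columns 1, 2, 4 and 6 of a 6-column CSV line.
pattern = re.compile(r'^([^,]+),([^,]+),[^,]+,([^,]+),[^,]+,([^,]+)$',
                     re.MULTILINE)
replacement = r'\1,\2,\3,\4'   # keep only the four captured columns

# MULTILINE makes ^/$ match per line, so multi-entry FlowFiles work too.
flowfile_content = "a,b,c,d,e,f\n1,2,3,4,5,6"
print(pattern.sub(replacement, flowfile_content))
# a,b,d,f
# 1,2,4,6
```

In ReplaceText you would put the pattern in the "Search Value" property and `$1,$2,$3,$4` (Java-style group references) in "Replacement Value", with a line-by-line evaluation mode for multi-entry files.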
11-22-2017
03:15 PM
@Lanic All of NiFi executes within a single JVM, and it is not possible to kill an individual thread inside a running JVM. Unfortunately, a restart is required to clear hung threads. Hopefully you have ironed out, in development, any misconfiguration that led to hung threads before deploying to production. I understand that any time a restart is needed it can be very annoying. Thanks, Matt A few Hortonworks community forum tips:
1. Use "@<username>" in responses so that user gets pinged; otherwise, the user may never know you asked a follow-up question.
2. Avoid starting a new "Answer" to respond to an existing answer. Instead, click "Add comment". This makes following a thread/line of thought easier.
3. When an "Answer" addresses/solves your question, please select "Accept" beneath that answer. This encourages user participation in this forum.
11-22-2017
03:06 PM
2 Kudos
@sally sally If you do not know the exact number of files you expect to merge, you must consider FlowFile latency. Consider the above MergeContent processor configuration:
1. "Correlation Attribute Name" <-- This property determines which bin each inbound queued FlowFile is assigned to. (You need to make sure you have enough bins to accommodate the number of unique filenames you expect to deal with.)
2. "Minimum Number of Entries" <-- This is the minimum number of FlowFiles that must be allocated to a bin before it is considered eligible to be merged. (Since you do not know how many files will have the same filename (2,000+), I set this to 10,000.) NiFi will continue to bin FlowFiles with the same filename until 10,000 is reached or the bin has existed for the "Max Bin Age".
3. "Maximum Number of Entries" <-- If the inbound queue holds more than 20,000 FlowFiles with the same filename, this property will trigger the bin to merge at 20,000 and a new bin to be started for that filename. 20,000 is generally considered a good ceiling here to prevent excessive heap usage during the merge.
4. "Max Bin Age" <-- This is your forced-merge property. No matter how many FlowFiles have been assigned to a given bin, that bin will be merged once it has existed for this amount of time. Set this to the maximum latency you are willing to accept for this dataflow. Thank you, Matt
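The binning behavior described above can be sketched as follows. This is a simplified model, not NiFi's actual implementation, and the 5-minute max bin age is an assumed example value:

```python
import time
from collections import defaultdict

# Simplified model of MergeContent binning by "Correlation Attribute Name"
# (filename here), with min/max entry counts and a max bin age.
MIN_ENTRIES = 10_000        # bin becomes eligible to merge
MAX_ENTRIES = 20_000        # bin is forced to merge immediately
MAX_BIN_AGE_SECONDS = 300   # assumed example "Max Bin Age" of 5 minutes

bins = defaultdict(lambda: {"flowfiles": [], "created": time.time()})

def offer(flowfile):
    """Assign a FlowFile to the bin matching its correlation attribute."""
    b = bins[flowfile["filename"]]
    b["flowfiles"].append(flowfile)
    if len(b["flowfiles"]) >= MAX_ENTRIES:   # max entries forces a merge now
        return bins.pop(flowfile["filename"])["flowfiles"]
    return None

def eligible(name, now=None):
    """A bin merges once min entries is reached OR it hits max bin age."""
    b = bins[name]
    age = (now or time.time()) - b["created"]
    return len(b["flowfiles"]) >= MIN_ENTRIES or age >= MAX_BIN_AGE_SECONDS
```

The key design point is that a bin with too few entries still merges eventually, because the age check runs regardless of how many FlowFiles arrived.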
11-22-2017
01:57 PM
@Lanic The reason you cannot empty that queue is that the "PutEmail" processor is still running and owns that FlowFile. When you stop a processor it goes into a "stopping" state. In this state NiFi will no longer schedule the processor to run, but it does not interrupt any already running threads. The small number in the upper right corner indicates the number of active threads tied to this processor component. Here I see "2" on your PutEmail processor. Most likely one thread is the running configured processor task and the other is the stopping thread. Once these threads have completed, you will be able to empty the queue feeding this processor. If the thread(s) never release, you have a hung processor. Sometimes this is the result of a poorly configured processor or associated controller service, and other times it may be code related. You will need to inspect several NiFi thread dumps to identify the hung thread and see what it is waiting on. You can get a thread dump by running the following command: <path to NiFi>/bin/nifi.sh dump <name for dump file> Thanks, Matt
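A NiFi thread dump contains jstack-style JVM stack traces, so long-waiting threads can be picked out by scanning for thread names and states. A small sketch of that (the sample dump text below is fabricated for illustration):

```python
import re

# Fabricated two-thread excerpt in jstack-style format.
SAMPLE_DUMP = '''
"Timer-Driven Process Thread-3" #57 prio=5
   java.lang.Thread.State: WAITING (parking)
"Timer-Driven Process Thread-7" #61 prio=5
   java.lang.Thread.State: RUNNABLE
'''

# Each thread header is a quoted name; the next line carries its state.
THREAD_RE = re.compile(
    r'^"(?P<name>[^"]+)".*\n\s*java\.lang\.Thread\.State: (?P<state>\S+)',
    re.MULTILINE)

def thread_states(dump_text):
    """Map thread name -> state for every thread found in the dump."""
    return {m['name']: m['state'] for m in THREAD_RE.finditer(dump_text)}

print(thread_states(SAMPLE_DUMP))
```

Taking several dumps a minute or two apart and comparing them shows which threads never change state; a processor thread stuck in the same WAITING/BLOCKED frame across dumps is your hung thread.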
10-30-2017
08:32 PM
1 Kudo
@dhieru singh Min Number of Entries must be set and defaults to 1. That is fine as long as you don't set Max Number of Entries. You are correct: it is (min number of entries AND min group size) OR max number of entries OR max group size. So either of the "max" settings will force a merge, just like max bin age will.
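The eligibility rule stated above can be written as a small predicate (a sketch, not NiFi code):

```python
# Merge rule: (min entries AND min size) both met,
# OR any single "max" limit (entries, size, bin age) reached.
def bin_ready(entries, size_bytes, age_seconds,
              min_entries=1, min_size=0,
              max_entries=None, max_size=None, max_age=None):
    mins_met = entries >= min_entries and size_bytes >= min_size
    force = any(limit is not None and value >= limit
                for value, limit in ((entries, max_entries),
                                     (size_bytes, max_size),
                                     (age_seconds, max_age)))
    return mins_met or force

# Both minimums must be satisfied together...
print(bin_ready(10_000, 50 * 1024**2, 10,
                min_entries=10_000, min_size=127 * 1024**2))   # False
# ...but reaching any maximum forces the merge on its own.
print(bin_ready(15_000, 50 * 1024**2, 10,
                min_entries=10_000, min_size=127 * 1024**2,
                max_entries=15_000))                            # True
```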
10-30-2017
08:27 PM
@dhieru singh
A few things to check...
1. Have you monitored the CPU utilization on your NiFi nodes?
2. If your CPUs are not saturated, what do you have your "Max Timer Driven Thread Count" set to? All of your processors must share threads from this resource pool. If the pool is too small, processor threads end up just waiting in line for a thread. So if you have plenty of CPU resources still available, you may want to push this value up. The default is only 10 threads and can be found via "Controller Settings" under the hamburger menu in the upper right corner of the NiFi UI. *** A good rule-of-thumb starting point for this setting is 2-4 times the number of cores you have on a single node. This configuration is per node, so if it is set to 40 and you have 2 nodes, the total thread pool is 80 threads across your 2-node cluster.
3. Adding additional MergeContent processors is not likely to make much difference here, but adding additional concurrent tasks may help. Just keep in mind the number of FlowFiles (not size) being merged in each bin, to avoid heap/garbage-collection issues that will affect performance.
4. Make sure you have sufficient heap memory to run this flow with minimal partial or full stop-the-world garbage collection events. While young/partial garbage collection is normal and healthy, old/full garbage collection can have a real effect on performance. Heap memory allocations are set in the NiFi bootstrap.conf file. Thanks, Matt
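The per-node sizing rule of thumb from point 2 can be sketched as simple arithmetic (the core counts are hypothetical):

```python
# "Max Timer Driven Thread Count" starting point: 2-4x the cores on ONE
# node; the setting applies per node, so the cluster-wide pool is value x nodes.
def timer_driven_pool(cores_per_node, nodes, multiplier=2):
    assert 2 <= multiplier <= 4, "stay within the 2-4x starting range"
    per_node = cores_per_node * multiplier
    return per_node, per_node * nodes

per_node, cluster_total = timer_driven_pool(cores_per_node=20, nodes=2)
print(per_node, cluster_total)  # 40 80 (matches the 2-node example above)
```

Treat the result as a starting point only; watch CPU saturation after raising it and back off if threads start contending for cores.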
10-30-2017
08:14 PM
2 Kudos
@dhieru singh The MergeContent processor does not remove FlowFiles from an inbound connection until the actual merge occurs. When MergeContent runs, it allocates queued FlowFiles to one or more bins. While the FlowFiles themselves remain on the queue for tracking purposes, any allocated FlowFiles are now owned by that MergeContent processor. You will not be able to list or delete FlowFiles from this queue while the MergeContent processor is running. In your case you have two NiFi nodes, each with 10,000 FlowFiles, for a total queue of 20,000 FlowFiles. Keep in mind that each of your nodes merges only those FlowFiles that exist on that same node. In your current configuration you have two minimums set:
min number of entries = 10,000 <-- this has been met on both nodes
min group size = 127 MB <-- this has not been met yet (you have about 2.2 MB on each node)
The higher of the two must be met before a bin is eligible to merge.
Max number of entries = 15,000 <-- you have not reached this on either node.
Max group size = 128 MB <-- you have not met this on either node.
NiFi will force a merge when either of these is reached. In your case, you would hit 15,000 long before you reach 128 MB. There is one more setting you have not set at all:
Max bin age = Setting this will force the merge of a bin, no matter what any of the above settings are, once the age of the bin has exceeded this configured value. I always recommend setting this to avoid FlowFiles lingering for exceptionally long periods of time. You could increase the back pressure object threshold on the inbound connection, but the size you would need to increase it to in order to hit your 127 MB min group size would amount to merging more than 565,000 FlowFiles. Since MergeContent must hold the attributes of all FlowFiles being merged in heap memory, merging this number of FlowFiles in one operation is likely to result in out-of-memory errors.
In cases like this I recommend doing a two-phase merge with multiple MergeContent processors. I would configure the first MergeContent to merge based on min (15,000) and max (20,000) number of entries. You should also set a max bin age. You will need to make sure you push up your connection object threshold from 10,000 to 20,000. The second MergeContent processor would then be set to merge based on min (120 MB) and max (128 MB) group size, again with a max bin age set. The result is far less heap usage with the same desired end result. The flow would look something like this: Thanks, Matt
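A back-of-the-envelope sketch of why the single-phase merge is impractical here, using the approximate sizes quoted in the answer above (10,000 FlowFiles totalling about 2.2 MB per node):

```python
# Average FlowFile size implied by the queue: ~2.2 MB across 10,000 files.
avg_flowfile_bytes = 2.2 * 1024**2 / 10_000        # ~230 bytes each

# FlowFiles needed in ONE bin to reach the 127 MB minimum group size:
files_for_127mb = 127 * 1024**2 / avg_flowfile_bytes
# Well over the ~565,000 figure mentioned above -> heap/OOM risk.

# Phase 1 merges by count (15,000 entries), producing files of roughly:
phase1_output_mb = 15_000 * avg_flowfile_bytes / 1024**2

# Phase 2 then needs only a few dozen of those files per 120-128 MB bin:
files_per_phase2_bin = 127 / phase1_output_mb

print(round(files_for_127mb), round(phase1_output_mb, 1),
      round(files_per_phase2_bin))
```

Each merge phase therefore holds attributes for at most ~20,000 FlowFiles in heap at once, instead of more than half a million.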
10-24-2017
04:04 PM
@Ravi Papisetti My suggestion here would be to make the following changes in your nifi.properties file on all nodes:
nifi.cluster.node.protocol.threads=15
nifi.cluster.node.connection.timeout=30 secs
nifi.cluster.node.read.timeout=30 secs
This will give nodes longer to respond to change requests before they get dropped by the cluster coordinator. You may also want to keep an eye out for any OOM or GC issues on your nodes that may be occurring at the times you are making these changes. Thanks, Matt