Community Articles

Find and share helpful community-sourced technical articles.
Labels (1)
avatar
Master Mentor

Short Description:

A NiFi connection is where FlowFiles are temporarily held between two connected NiFi Processor components. Each connection that contains queued NiFi FlowFiles will have a footprint in the JVM heap. This article will breakdown a connection to show how NiFi manages the FlowFiles that are queued in that connection and that affects heap and performance.

Article:

First let me share the 10,000 foot view and then I will discuss each aspect of the following image:

69451-screen-shot-2018-04-12-at-11335-pm.png

*** NiFi FlowFiles consist of FlowFile Content and FlowFile Attributes/metadata. FlowFile content is never held in a connections heap space. Only the FlowFile Attributes/metadata is placed in heap by a connection.

The "Connection Queue":

The connection queue is where all FlowFiles queued in the connection are held. To understand how these queued FlowFiles affect performance and heap usage, lets start by focusing on the "Connection Queue" dissection at the bottom of the above image.

The overall size of a connection is controlled by the configured "back Pressure Object Threshold" and "Back Pressure Data Size threshold" settings the user defines per connection.

Back Pressure Object Threshold and Back Pressure Data Size Threshold:

The "Back Pressure Object Threshold" default setting here is 10000.

The "Back Pressure Data Size Threshold" defaults to 1 GB.

Both of these settings are soft limits. This means that they can be exceeded. As an example, lets assume default settings above and a connection that already contains 9,500 FlowFiles. Since the connection has not reached or exceeded object threshold yet, the processor feeding that connection will be allowed to run. If that feeding processor should produce 2,000 FlowFiles when it executes, the connection would grow to 11,500 queued FlowFiles. The preceding processor would then not be allowed to execute until the queue dropped below the configured threshold once again.

The same hold true for the Data Size threshold. Data Size is based on the cumulative reported size of the content associated to each queued FlowFile.

Now that we know how the overall size of "connection queue" is controlled, lets break it down it to its parts:

1. ACTIVE queue: FlowFiles enter a connection will initially begin to placed in the active queue. FlowFiles will continue to placed in to this queue until that queue has reached the global configured nifi swap threshold. All FlowFiles in the active queue are held in heap memory. The processor consuming Flowfiles from this connection will always pull FlowFiles from the active queue. The size of the active queue per connection is controlled by the following property in the nifi.properties file:

 nifi.queue.swap.threshold=20000

Increasing the swap threshold increase the potential heap footprint of every single connection in your dataflow(s).

2. SWAP queue: Based on above default setting, once a connection reaches 20,000 FlowFiles, new FlowFiles entering the connection are placed in the swap queue. The swap queue is also held in heap and is hard coded to 10,000 FlowFiles max. If space is freed in the active queue and no swap files exist, FlowFiles in the swap queue will be moved directly to the active queue.

3. SWAP FILES: Each time the swap queue reaches 10,000 FlowFiles, a swap file is written to disk that contains those FlowFiles. At that point new FlowFiles are again written to the swap queue. Many Swap files can be created. Using image above where connection contains 80,000 FlowFiles, there would be 30,000 FlowFiles in heap and 5 swap files. As the active queue has freed 10,000 FlowFiles, the oldest swap file are moved to the active queue until all swap files are gone. The fact that swap files must be written to and read from disk, having a lot of swap files being produced across your dataflow will affect throughput performance of your dataflow(s).

4. IN-FLIGHT queue: Unlike the above 3, the in-flight queue only exists when the processor consuming from this connection is running. The consuming processor will only pull FlowFiles from the active queue and place them in the in-flight queue until processing has successfully completed and those FlowFiles have been committed to an outbound connection from the consuming processor. This in-flight queue is also held in heap. Some processors work on 1 FlowFile at a time, others work on batches of FlowFile, and some have the potential of working on every single FlowFile on an incoming connection queue. In the last case, this could mean high heap usage while those FlowFiles are being processed.

The example above is one of those potential case using the MergeContent processor. The MergeContent processor places FlowFiles from the active queue in virtual bins. How many bins an what makes a bin eligible for merge is governed buy the processor configuration. What is important to understand that is is possible that every FlowFile in the connection could make its way into the "in-flight queue". In image example, if the MergeContent were running, all 80,000 queued Flowfiles would likely be pulled in to heap via the in-flight queue.

----

Take away from this article:

1. Control heap usage by limiting size of connection queue when possible. (Of course if your intention is to merge 40,000 FlowFiles, there must be 40,000 Flowfiles in the incoming connection. However, you could have two mergeContent processors in series each merging smaller bundles with same end result with less overall heap usage.)

2. With default back pressure object threshold settings, there will be no swap files produced on most connections (remember soft limits) which will result in better throughput performance.

3. The default configured swap threshold of 20,000 is a good balance in most cases of active queue size and performance. For smaller flows you may be able to push this higher and for extremely large flows you may want to set this lower. Just understand it is a trade-off of heap usage for performance. But if your run out of heap, there will be zero performance.

Thank you,

Matt

11,898 Views
Comments

Excellent explanation Matt!

Thanks Matt for this very clear article

I'm using a priorityAttributePrioritizer for my queues and it seems that is not taken in account when the flowfile is not in the active queue.

(The queue size is 50000, and when a flowfile with a better priorityAttribute than all flowfiles in the queue arrives, it is only processed when it arrives in the active queue)

Is this the expected behaviour ? I was expecting that the flowfile with a better priority will be processed first no matter the number of flowfiles in the queue

avatar
Master Mentor

@Félicien Catherin

Your observations are correct. The prioritizer only works against the FlowFiles currently in the "Active" queue. Because this active queue resides in JVM heap, the reordering based on priority comes at very little expense to performance. Re-evaluating all swapped FlowFiles each time a new FlowFile enters a connection would have a hit to throughput performance.
-
The first question I would ask is why the queue is so large?

-

Increasing the configured swap threshold in nifi.properties file will allow more FlowFiles to be held in the active queue, but that comes at an expense of high heap usage by your NiFi.

-

Keep in mind that the best throughput will always be obtained when no prioritizers are defined on a connection.

Thanks for your answer,
I wanted to have only "one" queue were all flowfiles would be waiting.I know now that it was i bad idea => I reduced the size of the queue and now use backpresure. It corrected the priority problem.

Thanks again !