NiFi 1.13.2 Cluster with Randomly Restarting Nodes

Contributor

Our NiFi 1.13.2 cluster has nodes randomly restarting; the remaining nodes then log connection-refused errors when trying to reach them.

Cluster information

12 nodes, 8 cores and 32 GB RAM each

bootstrap.conf

java=java
run.as=
lib.dir=./lib
conf.dir=./conf
graceful.shutdown.seconds=20
java.arg.1=-Dorg.apache.jasper.compiler.disablejsr199=true
java.arg.2=-Xms28g
java.arg.3=-Xmx28g
java.arg.4=-Djava.net.preferIPv4Stack=true
java.arg.5=-Dsun.net.http.allowRestrictedHeaders=true
java.arg.6=-Djava.protocol.handler.pkgs=sun.net.www.protocol
java.arg.7=-XX:ReservedCodeCacheSize=256m
java.arg.8=-XX:CodeCacheMinimumFreeSpace=10m
java.arg.9=-XX:+UseCodeCacheFlushing
java.arg.14=-Djava.awt.headless=true
nifi.bootstrap.sensitive.key=
java.arg.15=-Djava.security.egd=file:/dev/urandom
java.arg.16=-Djavax.security.auth.useSubjectCredsOnly=true
java.arg.17=-Dzookeeper.admin.enableServer=false
java.arg.snappy=-Dorg.xerial.snappy.tempdir=/opt/nifi/tmp
notification.services.file=./conf/bootstrap-notification-services.xml
notification.max.attempts=5
nifi.properties (cluster section)

# cluster node properties (only configure for cluster nodes) #
nifi.cluster.is.node=true
#nifi.cluster.node.address=ip-xx-xxx-xxx-xxx.us-gov-west-1.compute.internal
nifi.cluster.node.address=192.170.108.140
nifi.cluster.node.protocol.port=11443
nifi.cluster.node.protocol.threads=100
nifi.cluster.node.protocol.max.threads=800
nifi.cluster.node.event.history.size=25
nifi.cluster.node.connection.timeout=60 sec
nifi.cluster.node.read.timeout=60 sec
nifi.cluster.node.max.concurrent.requests=800
nifi.cluster.firewall.file=
nifi.cluster.flow.election.max.wait.time=5 mins
nifi.cluster.flow.election.max.candidates=7

# cluster load balancing properties #
nifi.cluster.load.balance.host=192.170.108.140
nifi.cluster.load.balance.port=6342
nifi.cluster.load.balance.connections.per.node=50
nifi.cluster.load.balance.max.thread.count=600
nifi.cluster.load.balance.comms.timeout=45 sec

# zookeeper properties, used for cluster management #
nifi.zookeeper.connect.string=192.170.108.37:2181,192.170.108.67:2181,192.170.108.120:2181,192.170.108.104:2181,192.170.108.106:2181
nifi.zookeeper.connect.timeout=30 secs
nifi.zookeeper.session.timeout=30 secs
nifi.zookeeper.root.node=/nifi_tf

Sample Error

2021-05-14 13:00:11,927 ERROR [Load-Balanced Client Thread-356] o.a.n.c.q.c.c.a.n.NioAsyncLoadBalanceClient Unable to connect to nifi-tf-11.bogus-dns.pvt:9443 for load balancing
java.net.ConnectException: Connection refused
at sun.nio.ch.Net.connect0(Native Method)
at sun.nio.ch.Net.connect(Net.java:482)
at sun.nio.ch.Net.connect(Net.java:474)
at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:647)
at sun.nio.ch.SocketAdaptor.connect(SocketAdaptor.java:107)
at sun.nio.ch.SocketAdaptor.connect(SocketAdaptor.java:92)
at org.apache.nifi.controller.queue.clustered.client.async.nio.NioAsyncLoadBalanceClient.createChannel(NioAsyncLoadBalanceClient.java:456)
at org.apache.nifi.controller.queue.clustered.client.async.nio.NioAsyncLoadBalanceClient.establishConnection(NioAsyncLoadBalanceClient.java:399)
at org.apache.nifi.controller.queue.clustered.client.async.nio.NioAsyncLoadBalanceClient.communicate(NioAsyncLoadBalanceClient.java:211)
at org.apache.nifi.controller.queue.clustered.client.async.nio.NioAsyncLoadBalanceClientTask.run(NioAsyncLoadBalanceClientTask.java:81)
at org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
2021-05-14 13:00:15,592 ERROR [Load-Balanced Client Thread-160] o.a.n.c.q.c.c.a.n.NioAsyncLoadBalanceClient Unable to connect to nifi-tf-11.bogus-dns.pvt:9443 for load balancing

 

System Diagnostics

(three screenshots of the System Diagnostics page were attached here)

 


3 REPLIES

Super Mentor

@Kilynn 

Can you provide more detail around "nodes randomly restarting"?
What do you see in the nifi-app.log and nifi-bootstrap.log when the node(s) restart?
Do you see anything in /var/log/messages?  Maybe OOM Killer?

The System Diagnostics show ~330 GB of max heap configured cluster-wide; with 12 nodes that is roughly 27.5 GB per node, which matches your -Xmx28g.
Is there a particular reason you set the heap that high on your NiFi nodes?
Larger heaps mean longer stop-the-world GC pauses.
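On the GC point: if memory serves, the stock bootstrap.conf ships with a commented-out G1GC option, which is worth testing on heaps this size (the arg number below is the stock one; adjust it to fit your own numbering):

# bootstrap.conf - G1GC tends to keep stop-the-world pauses shorter on large heaps
java.arg.13=-XX:+UseG1GC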

What version of Java are you using?

Thanks,

Matt

Contributor

@MattWho , thanks for your response.  Before I heard from you I had checked /var/log/messages and searched for OOM messages without success.  However, there were no indications of NiFi dying in nifi-app.log either, so I dug a little further.  As it turned out, I did find the OS OOM killer terminating the NiFi child process.  I reduced the JVM heap to 24 GB and the OS no longer feels the need to kill it, so that problem is solved.  As for the JVM sizing, we were attempting to keep as much of our flow queues in memory as possible to reduce disk I/O.  Is there a better strategy?
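For reference, the heap arguments in bootstrap.conf now look like this (Xms lowered to match):

# bootstrap.conf - heap cut to 24 GB to leave headroom for the OS and off-heap allocations
java.arg.2=-Xms24g
java.arg.3=-Xmx24g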

1 ACCEPTED SOLUTION

Super Mentor

@Kilynn 

The following property in nifi.properties controls when a swap file is created, per connection:

nifi.queue.swap.threshold=20000

The threshold applies per connection, not to FlowFiles across all connections, and a FlowFile swap file always consists of 10000 FlowFiles.
So if a connection reaches 20000 queued FlowFiles, a swap file is created for 10000 of those; if the queue reaches 40000, you would have 3 swap files for that connection.

You can control individual connection queues by setting the "Back pressure Object Threshold" on a connection:

(screenshot of the connection configuration dialog was attached here)

Note: threshold settings are soft limits, and the default object threshold is 10000.
So with these settings there should be little to no swapping of FlowFiles to disk at all. A swap file would only be created if the source processor feeding a connection output enough FlowFiles in a single execution to push the queue past the swap threshold.

For example:
- The connection has 9000 queued FlowFiles, so back pressure is not being applied.
- The source processor is therefore allowed to execute.
- Upon execution, the source processor produces 12000 FlowFiles.
- The downstream connection now holds 21000 queued FlowFiles, so one swap file is produced and back pressure is applied until the queue drops back below 10000 queued FlowFiles.
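The 10000-object default applied to newly created connections can also be changed globally in nifi.properties (existing connections keep the thresholds they were created with); the values below are the shipped defaults, if I recall correctly:

# nifi.properties - back pressure defaults for new connections
nifi.queue.backpressure.count=10000
nifi.queue.backpressure.size=1 GB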

FlowFiles consist of two parts: FlowFile attributes/metadata and FlowFile content. The only portion of a FlowFile held in heap memory is the attributes/metadata; content is never held in heap (though some processors may load content into memory in order to do their work).

FlowFile attributes/metadata are persisted to the FlowFile repository and FlowFile content is written to the content repository. This is important to avoid data loss if NiFi dies or is restarted while data still exists in connection queues.
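Both repositories are plain directories configured in nifi.properties; the shipped defaults are shown below for reference (putting the content repository on its own volume is a common way to cut I/O contention):

# nifi.properties - repository locations (defaults shown; adjust paths to your layout)
nifi.flowfile.repository.directory=./flowfile_repository
nifi.content.repository.directory.default=./content_repository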

If you found this helpful, please take a moment to log in and accept the solutions that helped.

Thank you,

Matt