Created on 04-29-2016 03:08 PM - edited 08-17-2019 12:36 PM
Apache NiFi provides several processors for receiving data over a network connection, such as ListenTCP, ListenSyslog, and ListenUDP. These processors are implemented using a common approach, and a shared framework for receiving and processing messages.
When one of these processors is scheduled to run, a separate thread is started to listen for incoming network connections. When a connection is accepted, another thread is created to read from that connection; we will call this the channel reader. (The original thread continues listening for additional connections.)
The channel reader starts reading data from the connection as fast as possible, placing each received message into a blocking queue. The blocking queue is shared with the processor. Each time the processor executes, it polls the queue for one or more messages and writes them to a flow file. The overall setup is: one listener thread accepting connections, one channel reader per connection feeding a shared message queue, and the processor draining that queue into flow files.
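To make the moving parts concrete, here is a minimal sketch of the pattern described above (a listener thread, per-connection channel readers, and a shared bounded queue). This is an illustration of the general design in Python, not NiFi's actual Java implementation; the queue size is a made-up value standing in for the Max Size of Message Queue property.

```python
import queue
import socket
import threading

# Hypothetical bound, standing in for the "Max Size of Message Queue" property.
MAX_QUEUE_SIZE = 10000

# Blocking queue shared between the channel readers and the processor.
message_queue = queue.Queue(maxsize=MAX_QUEUE_SIZE)

def channel_reader(conn):
    """Read newline-delimited messages from one connection into the queue."""
    with conn, conn.makefile("rb") as stream:
        for line in stream:
            # put() blocks when the queue is full, applying back-pressure
            # on the reader (data then backs up into the socket buffer).
            message_queue.put(line.rstrip(b"\n"))

def listener(server_sock):
    """Accept connections; hand each one to a new channel reader thread."""
    while True:
        conn, _addr = server_sock.accept()
        threading.Thread(target=channel_reader, args=(conn,), daemon=True).start()
```

The processor side would then drain `message_queue` on each execution, independently of the readers filling it.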
There are several competing activities taking place: the channel reader pulling data off the connection and filling the message queue, the processor draining the message queue and writing flow files, and the operating system buffering incoming data in the socket buffer until the channel reader can read it.
In order to achieve optimal performance, these processors expose several properties to tune these competing activities, including Max Batch Size, Max Size of Message Queue, Max Size of Socket Buffer, and Concurrent Tasks.
Adjusting the above values appropriately should make it possible to tune for high-throughput scenarios. A good approach is to start by increasing Max Batch Size from 1 to 1000 and observing performance. From there, a slight increase to Max Size of Socket Buffer, and increasing Concurrent Tasks from 1 to 2, should provide additional improvement.
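The reason Max Batch Size matters so much is that it controls how many queued messages one processor execution drains into a single flow file. A rough sketch of that batch-poll loop (illustrative only, not NiFi's actual code):

```python
import queue

def drain_batch(message_queue, max_batch_size):
    """Poll up to max_batch_size messages from the shared queue.

    With max_batch_size=1 each execution produces one flow file per
    message; with 1000, one execution can produce one flow file
    containing up to 1000 messages, amortizing per-flow-file overhead.
    """
    batch = []
    while len(batch) < max_batch_size:
        try:
            batch.append(message_queue.get_nowait())
        except queue.Empty:
            break  # queue drained before the batch filled up
    return batch
```

Concurrent Tasks then controls how many of these drain-and-write executions can run at once.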
Created on 10-24-2016 11:38 AM
Thank you for these useful explanations!
Do you know how these listeners behave in the case of a NiFi cluster? Is the number of listeners determined by the value of Concurrent Tasks, or do we have as many listeners as there are cluster nodes?
Created on 10-24-2016 01:48 PM
As many listeners as cluster nodes. You would need to route the traffic to each node appropriately, one option being a load balancer in front that supports TCP or UDP. Concurrent Tasks only affects processing of the messages that have already been read by the listener.
Created on 10-25-2016 09:06 AM
Thank you for these explanations, it is very clear.
Do you know what happens to the contents of the Message Queue if a NiFi node crashes?
How should the dataflow be set up to avoid losing data?
Created on 10-25-2016 01:04 PM
The Message Queue is in memory, so anything in it would be lost if the node crashed. You could keep the Max Size of Message Queue really small, possibly even set it to 1, to avoid losing anything, but this may not work well for performance.
You really need an application-level protocol that can send acknowledgements back to the sender when data is successfully written to a flow file; if the sender never receives an ack, it can re-send. There is a ListenRELP processor that does this: it is just like ListenTCP, but the RELP protocol allows for acknowledgements.
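The send-ack-resend pattern described here can be sketched generically. This is an illustration of application-level acknowledgement in general, not the actual RELP wire format; the `OK` ack token and the retry count are made up for the example.

```python
import socket

ACK = b"OK\n"  # hypothetical ack token; RELP defines its own framing

def send_with_ack(addr, message, retries=3, timeout=2.0):
    """Send one message and wait for an acknowledgement; resend on failure.

    Returns True once the receiver acks, False if all attempts fail.
    """
    for _attempt in range(retries):
        try:
            with socket.create_connection(addr, timeout=timeout) as sock:
                sock.sendall(message + b"\n")
                # Read until the full ack arrives (or the peer closes).
                buf = b""
                while len(buf) < len(ACK):
                    chunk = sock.recv(len(ACK) - len(buf))
                    if not chunk:
                        break
                    buf += chunk
                if buf == ACK:
                    return True
        except OSError:
            pass  # connect/send/recv failed or timed out; retry
    return False
```

The key point is that the receiver only acks after the data is durably handled (in NiFi's case, written to a flow file), so a crash before that point just causes a re-send rather than silent loss.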
Created on 10-25-2016 01:48 PM
So the number of listeners is equal to the number of cluster nodes, even if we specify only one machine in the syslog agent that sends data to NiFi?
Created on 10-25-2016 01:54 PM
Well, there would be a listener on each cluster node, but it is up to you to route the data to each of those listeners if you want to use them all. If you have a cluster of 3 NiFi nodes and you set up syslog to push data to node 1, then you are only using the listener on node 1 and the other two listeners aren't doing anything. You would need to have the syslog agent distribute the data to all 3 listeners, or put a load balancer in front of NiFi and have the syslog agent send to the load balancer, which would distribute across the 3 nodes.
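The "distribute to all listeners" option amounts to the sender rotating through the node addresses itself. A minimal round-robin sketch (the node addresses are placeholders, and a real load balancer would also handle health checks and failover):

```python
import itertools

def round_robin_picker(nodes):
    """Return a function that yields the next listener address per message."""
    cycle = itertools.cycle(nodes)
    def next_node():
        # The sender would open a connection to this node for the next message.
        return next(cycle)
    return next_node
```

This spreads load evenly across the cluster, which is what a TCP/UDP load balancer in front of NiFi would do on the sender's behalf.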
Created on 08-31-2017 12:02 PM
Thanks for the explanation.
I have a use case where the syslog listener has to receive 10M messages/sec.
I am worried about whether this can be achieved, because processing those messages takes quite a good amount of time.
Since I have to extract data out of each message and store it in HDFS as CSV, I use ExtractText -> ReplaceText -> RouteOnContent -> PutHDFS.
Can you suggest whether 10 million msg/sec can be achieved?
Created on 10-30-2017 03:41 AM
Thank you for the useful article @Bryan Bende,
After reading your post, I am confused about HOW and WHERE to configure the properties Max Batch Size, Max Size of Message Queue, and Max Size of Socket Buffer.
My flowfile: Data Producer --> JettyWebsocketServer --> ListenWebsocket --> Flowfile
So I don't know how and where to configure the above properties. Where is the Message Queue between the Channel Reader and the Listen processor?
I only configure JettyWebsocketServer (Input Buffer Size, Max Text Message Size), and ListenWebsocket has no properties relating to queue size.
Thanks,
Created on 10-30-2017 01:48 PM
Hello, this post is for ListenUDP, ListenTCP, ListenSyslog, and ListenRELP.
The ListenWebSocket processor is implemented differently and does not necessarily follow what is described here.
I'm not familiar with the websocket processor, but maybe others have experience with tuning it.