Created 02-11-2025 12:40 PM
I am trying to demonstrate that NiFi can listen to multicast traffic.
I have this Python script:
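Roughly, it just opens a UDP socket and sends a datagram to the multicast group (simplified sketch; the group and port are the ones I mention below):

import socket

multicast_group = "224.1.1.1"
port = 10000

sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
# TTL of 1 keeps the datagram on the local network segment
sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 1)
sock.sendto(b"hello nifi", (multicast_group, port))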
I can receive the message if I change the multicast_group to 127.0.0.1.
Created on 02-12-2025 10:24 AM - edited 02-12-2025 10:58 AM
The ListenUDP processor will listen on a specific port and can be configured to listen on a specific network interface on the NiFi host.
The question is: which network interface on the NiFi host is assigned an address associated with your multicast 224.1.1.1 IP address?
You can't bind any NiFi processor to a specific IP address. NiFi supports multi-node clusters where every node runs the exact same dataflows.
Let's assume your multicast group is sending datagrams to port 10000 on the eth3 network interface of your NiFi host; you would then configure ListenUDP to create a listener on port 10000 bound to eth3.
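For example, the relevant ListenUDP properties would look roughly like this (property names may differ slightly between NiFi versions):

    Port: 10000
    Local Network Interface: eth3
    Sending Host: (leave blank)
    Sending Host Port: (leave blank)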
The Site-to-Site properties in nifi.properties have absolutely nothing to do with any processor components you add to the NiFi UI. The Site-to-Site (S2S) protocol is used to transfer NiFi FlowFiles between NiFi nodes, and it is utilized only by Remote Process Groups and the load-balanced connections capability.
You'll want to unset 10000 in the S2S settings since you want your ListenUDP processor to bind to that port number. You'll also want to unset 224.1.1.1 as the S2S input host; S2S can't bind to a host that does not resolve to one of the NiFi network interfaces.
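In nifi.properties that means leaving the S2S input host and port properties unset, roughly (check your file for the exact entry names):

    nifi.remote.input.host=
    nifi.remote.input.socket.port=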
S2S reference documentation:
Please help our community grow and thrive. If you found any of the suggestions/solutions provided helped you with solving your issue or answering your question, please take a moment to login and click "Accept as Solution" on one or more of them that helped.
Thank you,
Matt
Created 02-12-2025 11:51 AM
Hey @MattWho, my NiFi instance and little Python script run on localhost.
My Python script sends a packet to IP address 224.1.1.1 on port 10000.
When I configure my processor, omitting the sending host and sending port, it still doesn't receive the packet from 224.1.1.1.
If I omit the send host and port and just have 10000 for the port, it also doesn't work, since that assumes the packet is being sent from 127.0.0.1 (localhost).
I removed the Site-to-Site properties like you recommended.
Essentially I am just trying to send a multicast message to NiFi, and trying to figure out how to configure the current processor to do that. Otherwise, I will just try to create a custom UDP listener to handle multicast messages.
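For reference, the bare-socket version of what that custom listener would need to do is join the multicast group rather than just bind the port (rough sketch, standard library only):

import socket
import struct

multicast_group = "224.1.1.1"
port = 10000

sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind(("", port))  # bind the port, not the group address

# joining the group is the step a plain port bind does not perform
mreq = struct.pack("4sl", socket.inet_aton(multicast_group), socket.INADDR_ANY)
sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)

while True:
    data, addr = sock.recvfrom(65535)
    print(f"received {len(data)} bytes from {addr}")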
Thanks!
Created 03-06-2025 02:22 PM
I wanted to give an update on my findings: I was finally able to get multicast to work with the modified open() method below. I also had to comment out this line: datagramChannel.connect(new InetSocketAddress(sendingHost, sendingPort));
@Override
public void open(final InetAddress nicAddress, final int port, final int maxBufferSize) throws IOException {
    stopped = false;

    // Use INET protocol family for IPv4 multicast
    if (isMulticast) {
        datagramChannel = DatagramChannel.open(StandardProtocolFamily.INET);
    } else {
        datagramChannel = DatagramChannel.open();
    }

    datagramChannel.configureBlocking(false);
    if (maxBufferSize > 0) {
        datagramChannel.setOption(StandardSocketOptions.SO_RCVBUF, maxBufferSize);
        final int actualReceiveBufSize = datagramChannel.getOption(StandardSocketOptions.SO_RCVBUF);
        if (actualReceiveBufSize < maxBufferSize) {
            logger.warn("Attempted to set Socket Buffer Size to {} bytes but could only set to {} bytes. You may want to consider changing the Operating System's maximum receive buffer",
                    maxBufferSize, actualReceiveBufSize);
        }
    }

    // we don't have to worry about nicAddress being null here because InetSocketAddress already handles it
    datagramChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
    datagramChannel.socket().bind(new InetSocketAddress(port));

    // if a sending host and port were provided then connect to that specific address to only receive
    // datagrams from that host/port, otherwise we can receive datagrams from any host/port
    if (sendingHost != null && sendingPort != null) {
        // This seems to not allow the multicast to work. Therefore, I commented it out.
        // datagramChannel.connect(new InetSocketAddress(sendingHost, sendingPort));
    }

    selector = Selector.open();
    datagramChannel.register(selector, SelectionKey.OP_READ);

    // Join multicast group if specified
    if (isMulticast) {
        InetAddress group = InetAddress.getByName(multicastGroup);

        // Determine which network interface to use for multicast
        NetworkInterface networkInterface;
        if (multicastInterface != null && !multicastInterface.isEmpty()) {
            // Use specified interface
            networkInterface = NetworkInterface.getByName(multicastInterface);
            if (networkInterface == null) {
                // Try as an IP address
                networkInterface = NetworkInterface.getByInetAddress(InetAddress.getByName(multicastInterface));
            }
        } else {
            // Use the NIC address interface if none specified
            networkInterface = NetworkInterface.getByInetAddress(nicAddress);
        }

        if (networkInterface == null) {
            throw new IOException("Could not find network interface for multicast");
        }

        // Join the multicast group on the selected interface
        membershipKey = datagramChannel.join(group, networkInterface);
        logger.info("Joined multicast group {} on interface {}", multicastGroup, networkInterface.getDisplayName());
    }
}
Created 03-06-2025 02:26 PM
I am now running into another problem.
Say I want to have 5 different sources feeding into 5 different ListenUDP processors, each going to a PutUDP.
1 works fine.
As soon as I add 2 or more, the sources start to lose packets.
E.g.
ListenUDP -> PutUDP (works fine)
_________________
ListenUDP -> PutUDP
ListenUDP -> PutUDP
ListenUDP -> PutUDP
ListenUDP -> PutUDP
This scenario does not work; for a video feed, the video becomes very blurry.
If I increase the batch size from 1 to 30, the problem is mostly fixed, but I still see some packets being dropped.
Any way of resolving this?
Created 03-07-2025 06:07 AM
@Darryl
Glad to hear you got your network settings worked out so that your multicast traffic is reaching the network interface where the ListenUDP processor is listening for it.
The UDP protocol has no guaranteed delivery; it is fast, but it is not fault tolerant.
Are you seeing the downstream connection from the ListenUDP processor fill to the point where back pressure is applied to the ListenUDP processor (by default, back pressure is applied once a connection queue reaches 10,000 FlowFiles)?
Effectively, the ListenUDP processor works by adding incoming messages to an internal message queue (the default "Max Size of Message Queue" is 10000). The processor then reads messages from that queue and creates FlowFiles based on the scheduling interval, thread availability, and the "Max Batch Size" setting. If the message queue is full, UDP packets will get dropped/lost.
Things you can try: increase the "Max Size of Message Queue", increase the "Max Batch Size", or add concurrent tasks on the ListenUDP processor.
To set concurrent tasks on processors efficiently, it is important to understand how they work, where the processor gets its threads from, and the load on your system.
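For example, the ListenUDP settings worth experimenting with are roughly these (exact property names and defaults may vary by NiFi version):

    Max Size of Message Queue: 10000    <- raise if the internal queue fills during traffic bursts
    Max Batch Size: 1                   <- raise so each FlowFile carries more datagrams per read
    Receive Buffer Size:                <- socket-level buffer; raise for high-rate UDP streams
    Concurrent Tasks: 1                 <- on the Scheduling tab; add with care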
Please help our community grow and thrive. If you found any of the suggestions/solutions provided helped you with solving your issue or answering your question, please take a moment to login and click "Accept as Solution" on one or more of them that helped.
Thank you,
Matt
Created 03-07-2025 06:58 AM
Thank you for the very detailed reply!
Just some quick notes.
When I bump up the batch size to 50, it causes an error with PutUDP:
"Caused by: java.io.IOException:
A message sent on a datagram socket was larger than the internal message buffer or some other network limit,
or the buffer used to receive a datagram into was smaller than the datagram itself"
When I tried bumping up the concurrent threads from 1 to 2, it caused the video to be extremely blurry.
And this is what I am using to test:
ffmpeg.exe -stream_loop -10000 -re -i %FILE% -map 0:0 -map 0:1 -c copy -f mpegts udp://224.1.1.2:10000
And then PutUDP outputs to this: ffplay -i udp://localhost:8090
You can test most of this by running just:
ListenUDP -> PutUDP
and then blurriness and packet drops occur when running just 2:
ListenUDP -> PutUDP
ListenUDP -> PutUDP
Created 03-11-2025 06:16 AM
@Darryl
So the downstream system that PutUDP is sending to is complaining that the datagram is too large when the batch size is set to 50. When batching is used in the ListenUDP processor, each datagram in the FlowFile is delimited by a newline character. You could add a processor like SplitText between ListenUDP and PutUDP to split these FlowFiles into smaller FlowFiles before sending them to PutUDP. Since a batch size of 30 seemed to work well for you, I would try increasing the batch setting in ListenUDP to 60 and setting the "Line Split Count" in SplitText to 30.
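In other words, the flow would look something like this (sketch):

    ListenUDP (Max Batch Size = 60) -> SplitText (Line Split Count = 30) -> PutUDP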
As far as "When I tried bumping up the concurrent threads from 1 to 2, it caused the video to be extremely blurry", I am guessing the multi-threading is causing packets to be processed out of order, which results in your extremely blurry video. If that is the case, you'll need to keep your flow single threaded. And if the order of processing FlowFiles is important, downstream connections should be configured with the FirstInFirstOutPrioritizer. This does not mean you can then use multiple threads, but it makes sure downstream processors take FlowFiles from upstream connections in that order.
Please help our community grow and thrive. If you found any of the suggestions/solutions provided helped you with solving your issue or answering your question, please take a moment to login and click "Accept as Solution" on one or more of them that helped.
Thank you,
Matt