
Apache NiFi: listen to multicast messages without separator characters

Explorer

Trying to demonstrate that NiFi can listen to multicast traffic.

I have this Python script:

import socket
import struct
import sys

# Multicast group details
multicast_group = '224.1.1.1'
server_address = ('', 10000)

# Create a UDP socket
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

# Set the time-to-live for multicast packets
ttl = struct.pack('b', 1)
sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, ttl)

try:
    # Send data to the multicast group
    message = b'Hello, multicast!'
    print(f"sending {message} to {multicast_group}:{server_address[1]}")
    sent = sock.sendto(message, (multicast_group, server_address[1]))

except Exception as e:
    print(f"Error occurred: {e}")

finally:
    sock.close()
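
(Side note: to confirm the datagrams actually reach the host independently of NiFi, a plain Python receiver along these lines can be used; this is only a minimal sketch that reuses the group and port from the script above and joins the group on all interfaces.)

import socket
import struct

MCAST_GRP = '224.1.1.1'   # same group as the sender above
MCAST_PORT = 10000

sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind(('', MCAST_PORT))

# join the multicast group on all interfaces
mreq = struct.pack('4s4s', socket.inet_aton(MCAST_GRP), socket.inet_aton('0.0.0.0'))
sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)

while True:
    data, addr = sock.recvfrom(65535)
    print(f"received {len(data)} bytes from {addr}: {data!r}")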

And then my ListenUDP processor is:
[screenshot of the ListenUDP processor configuration: Darryl_0-1739306340674.png]

I can receive the message if I change the multicast_group to 127.0.0.1.
How do I get it to receive the message when the group is 224.1.1.1?
 
I also added this to my nifi.properties:
# Site to Site properties
nifi.remote.input.host=224.1.1.1
nifi.remote.input.secure=true
nifi.remote.input.socket.port=10000
 
7 REPLIES

Master Mentor

@Darryl 

The ListenUDP processor will listen on a specific port and can be configured to listen on a specific network interface on the NiFi host.

The question is: which network interface on the NiFi host has an address associated with your multicast 224.1.1.1 IP address?

You can't bind any NiFi processor to a specific IP address. NiFi supports multi-node clusters where every node runs the exact same dataflows. 

Let's assume your multicast group is sending datagrams to port 10000 on the eth3 network interface of your NiFi host; you would then configure ListenUDP to create a listener on port 10000 for eth3.

The Site-to-Site properties in nifi.properties have absolutely nothing to do with any processor components you add to the NiFi UI.  The Site-to-Site (S2S) NiFi protocol is used for the transfer of NiFi FlowFiles between NiFi nodes.  This S2S protocol is utilized only by Remote Process Groups and the load-balanced connections capability.

You'll want to unset 10000 in the S2S settings since you want your ListenUDP processor to bind to that port number.  You'll also want to unset 224.1.1.1 as the S2S input host; S2S can't bind to a host that does not resolve to one of the NiFi node's network interfaces.

S2S reference documentation:

Please help our community grow and thrive. If you found any of the suggestions/solutions provided helped you with solving your issue or answering your question, please take a moment to login and click "Accept as Solution" on one or more of them that helped.

Thank you,
Matt

Explorer

Hey @MattWho, my NiFi instance and little Python script run on localhost.
My Python script sends a packet to IP address 224.1.1.1 on port 10000.
When I configure my processor, omitting the sending host and sending port, it still doesn't receive the packet from 224.1.1.1.

If I omit the sending host and port and just set 10000 for the port, it also doesn't work, since that assumes the packet is being sent from 127.0.0.1 (localhost).

I removed the Site-to-Site properties like you recommended.
Essentially I am just trying to send a multicast message to NiFi and figure out how to configure the current processor to do that. Otherwise I will try to create a custom UDP listener to handle multicast messages.


Thanks!

Explorer

I wanted to give an update on my findings: I was finally able to get multicast to work. I also had to comment out this line: datagramChannel.connect(new InetSocketAddress(sendingHost, sendingPort));

@Override
public void open(final InetAddress nicAddress, final int port, final int maxBufferSize) throws IOException {
    stopped = false;

    // Use INET protocol family for IPv4 multicast
    if (isMulticast) {
        datagramChannel = DatagramChannel.open(StandardProtocolFamily.INET);
    } else {
        datagramChannel = DatagramChannel.open();
    }

    datagramChannel.configureBlocking(false);
    if (maxBufferSize > 0) {
        datagramChannel.setOption(StandardSocketOptions.SO_RCVBUF, maxBufferSize);
        final int actualReceiveBufSize = datagramChannel.getOption(StandardSocketOptions.SO_RCVBUF);
        if (actualReceiveBufSize < maxBufferSize) {
            logger.warn("Attempted to set Socket Buffer Size to {} bytes but could only set to {} bytes. You may want to consider changing the Operating System's maximum receive buffer",
                    maxBufferSize, actualReceiveBufSize);
        }
    }

    // we don't have to worry about nicAddress being null here because InetSocketAddress already handles it
    datagramChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
    datagramChannel.socket().bind(new InetSocketAddress(port));

    // if a sending host and port were provided then connect to that specific address to only receive
    // datagrams from that host/port, otherwise we can receive datagrams from any host/port
    if (sendingHost != null && sendingPort != null) {
        // This seems to not allow the multicast to work. Therefore, I commented it out.
        // datagramChannel.connect(new InetSocketAddress(sendingHost, sendingPort));
    }

    selector = Selector.open();
    datagramChannel.register(selector, SelectionKey.OP_READ);

    // Join multicast group if specified
    if (isMulticast) {
        InetAddress group = InetAddress.getByName(multicastGroup);

        // Determine which network interface to use for multicast
        NetworkInterface networkInterface;
        if (multicastInterface != null && !multicastInterface.isEmpty()) {
            // Use specified interface
            networkInterface = NetworkInterface.getByName(multicastInterface);
            if (networkInterface == null) {
                // Try as an IP address
                networkInterface = NetworkInterface.getByInetAddress(InetAddress.getByName(multicastInterface));
            }
        } else {
            // Use the NIC address interface if none specified
            networkInterface = NetworkInterface.getByInetAddress(nicAddress);
        }

        if (networkInterface == null) {
            throw new IOException("Could not find network interface for multicast");
        }

        // Join the multicast group on the selected interface
        membershipKey = datagramChannel.join(group, networkInterface);
        logger.info("Joined multicast group {} on interface {}", multicastGroup, networkInterface.getDisplayName());
    }
}

Explorer

I am now running into another problem.
Say I want to have 5 different sources feeding into 5 different ListenUDP processors, each going to a PutUDP.

1 works fine.
As soon as I add 2 or more, the sources start to lose packets.

E.g.
ListenUDP -> PutUDP (works fine)
_________________

ListenUDP -> PutUDP
ListenUDP -> PutUDP
ListenUDP -> PutUDP
ListenUDP -> PutUDP

This scenario does not work. For a video feed, for example, the video becomes very blurry.
If I increase the batch size from 1 to 30, the problem is mostly fixed, but I still see some packets being dropped.

Any way of resolving this?

@MattWho 

Master Mentor

@Darryl 

Glad to hear you got your network settings worked out so that your multicast traffic reaches the network interface where the ListenUDP processor is listening.

The UDP protocol has no guaranteed delivery.  It is fast, but it is not fault tolerant.
Are you seeing the downstream connection from the ListenUDP processor filling to the point that back pressure is being applied to it (by default, back pressure is applied once a connection queue reaches 10,000 FlowFiles)?

Effectively, the ListenUDP processor works by adding incoming messages to an internal message queue (the default "Max Size of Message Queue" is 10000).  The processor then reads messages from that queue and creates FlowFiles based on the scheduling interval, thread availability, and the "Max Batch Size" setting.  If the message queue is full, UDP packets will get dropped/lost.
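
To picture why packets get lost, here is a rough sketch in Python (purely illustrative, not NiFi's actual implementation) of the bounded-queue behavior described above; the quoted names in the comments are the ListenUDP settings mentioned:

import queue

message_queue = queue.Queue(maxsize=10000)      # "Max Size of Message Queue"

def on_datagram(datagram):
    # listener side: enqueue each received UDP packet
    try:
        message_queue.put_nowait(datagram)
    except queue.Full:
        pass                                    # queue full -> the packet is dropped/lost

def on_scheduled(max_batch_size=30):
    # processor side: each scheduled run drains up to "Max Batch Size" messages
    batch = []
    try:
        while len(batch) < max_batch_size:
            batch.append(message_queue.get_nowait())
    except queue.Empty:
        pass
    return batch                                # contents become FlowFile(s) downstream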

Things you can try:

  1. Make adjustments to avoid downstream back pressure being applied to ListenUDP.  Increasing the back pressure threshold is not very helpful unless you have set your batch size very high.  You'll want to look at ways to increase the rate at which the downstream NiFi processors can process incoming FlowFiles.  If the CPU load average on your NiFi server is healthy, you can increase concurrent tasks on the downstream processor so that multiple threads process FlowFiles concurrently.
  2. You already adjusted the batch size to 30, which results in multiple datagrams being added to a single NiFi FlowFile, and it sounds like this change did not negatively impact downstream processing.  By adding more than one message to a single FlowFile, NiFi needs to create fewer FlowFiles with each thread execution that reads from the message queue.  You could try increasing this to an even larger value.
  3. You can play around with Run Duration.  This is something you would try on downstream processors that are not keeping up with their inbound connection, resulting in upstream back pressure being triggered.  It adds some latency but allows a single thread to execute longer and work on multiple inbound FlowFiles in a single scheduled thread execution.
  4. You can play around with the Concurrent Tasks setting on the ListenUDP processor.  Increasing concurrent tasks on ListenUDP allows more than one thread to execute concurrently, consuming messages from the message queue and generating FlowFiles to the downstream connection.

To set concurrent tasks on processors effectively, it is important to understand how they work, where a processor gets its threads from, and the load on your system.

  • NiFi sets a "Maximum Timer Driven Thread Count" value that creates a pool of threads which all NiFi processors use for execution.  The default value is 10.  That means if you have 1000 processors on the canvas, they will all be requesting one of these threads when they get scheduled.  The thread pool helps prevent overloading the CPU resources of the NiFi host.
  • NiFi processors have configurable "Concurrent Tasks" and "Run Schedule" settings.  The Run Schedule controls how often the processor asks the NiFi controller for a thread to execute.  So assume 1 concurrent task and a Run Schedule of 0 secs: the processor will be scheduled as fast as possible, meaning as soon as it is started it will be scheduled and request a thread from the thread pool (there is a back-off period if an execution finds no work to do, to prevent excess CPU usage).  With 1 concurrent task, the processor can't be scheduled again until the current task completes.  When you change concurrent tasks to 2, the processor will be scheduled a first time and immediately scheduled again, hoping to get 2 threads from the thread pool to execute concurrently.
  • So, assuming the CPU load average is not high, you can increase the size of the Maximum Timer Driven Thread Count pool and increase concurrent tasks on your processors.  Always make small, incremental changes and monitor the impact on your CPU load average.

Please help our community grow and thrive. If you found any of the suggestions/solutions provided helped you with solving your issue or answering your question, please take a moment to login and click "Accept as Solution" on one or more of them that helped.

Thank you,
Matt

Explorer

Thank you for the very detailed reply!
Just some quick notes. 
When I bump up the batch size to 50, it causes an error with PutUDP:
"Caused by: java.io.IOException:
A message sent on a datagram socket was larger than the internal message buffer or some other network limit,
or the buffer used to receive a datagram into was smaller than the datagram itself"


When I tried bumping up the concurrent threads from 1 to 2, it caused the video to be extremely blurry.

And this is what I am using to test:

ffmpeg.exe -stream_loop -10000 -re -i %FILE% -map 0:0 -map 0:1 -c copy -f mpegts udp://224.1.1.2:10000

And then PutUDP outputs to this: ffplay -i udp://localhost:8090

You can reproduce most of this by testing:
ListenUDP -> PutUDP

and then the blurriness and packet drops occur when running just 2:
ListenUDP -> PutUDP
ListenUDP -> PutUDP



Master Mentor

@Darryl 

So the downstream system that PutUDP is sending to is complaining that the datagram is too large when the batch size is set to 50.  When using batching in the ListenUDP processor, each datagram is delimited by a newline character.  You could add a processor like SplitText between ListenUDP and PutUDP to split these FlowFiles into smaller FlowFiles before sending them to PutUDP.  Since a batch size of 30 seemed to work well for you, I would try increasing the Batch Size setting in ListenUDP to 60 and setting the "Line Split Count" in SplitText to 30.

As far as "When I tried bumping up the concurrent threads from 1 to 2, it caused the video to be extremely blurry": I am guessing the multi-threading is delivering packets out of order, resulting in your extremely blurry video.  If that is the case, you'll need to keep your flow single threaded.  And if the order in which FlowFiles are processed is important, downstream connections should be configured with the FirstInFirstOutPrioritizer.  This does not mean you can then use multiple threads, but it makes sure downstream processors take FlowFiles from upstream connections in that order.

Please help our community grow and thrive. If you found any of the suggestions/solutions provided helped you with solving your issue or answering your question, please take a moment to login and click "Accept as Solution" on one or more of them that helped.

Thank you,
Matt