Created on 11-26-2016 12:53 AM
WebSocket is a fully duplex and bi-directional protocol. The most interesting characteristics (at least for me) is that there is no difference or roll such as server or client for each peer after those established a WebSocket connection. Each peer can send messages at will. However, a connection is always initiated by a WebSocket client.
For example:
I wondered, should NiFi be a client? or a server? Then decided to support both. NiFi can be not only a WebSocket client, but also a WebSocket server!
Since the protocol itself is more complex than (or simply different from) other protocols, it was tough to put it into NiFi data flow processing model. Some might feel it is too complex to use, so let me try to explain how it works.
As shown in the diagram above, it is divided into three modules described below, for extensibility and testability. Each of these are individual NAR. Both
nifi-websocket-processors-nar
and nifi-websocket-services-jetty-nar
have NAR dependency to nifi-websocket-services-api-nar
.
connected
: Fired when a remote client connects with WebSocketServerService, or a WebSocketClientService connects with a remote WebSocket server.text message
: Fired when it receives text WebSocket message.binary message
: Fired when it receives binary WebSocket message.ws://hostname:listen-port/endpoint-1
and ws://hostname:listen-port/endpoint-2
. Connected sessions are manages separately within each endpoints. Likewise, WebSocketClientService uses a clientId
to distinguish endpoints. Multiple WebSocket client instances can share the same WebSocketClientService instance.This module contains actual implementation of nifi-websocket-services-api using Jetty.
In order to use these functionalities in a NiFi data flow, we need to put it on a canvas as Processors.
ConnectWebSocket
and ListenWebSocket
: Work as WebSocket gateways. These processors are registered to WebSocketServer and receives WebSocket events described earlier. When those events are fired, it will be converted to NiFi FlowFile, then sent to relationships accordingly. There are three relationships, connected
, text message
and binary message
. ConnectWebSocket uses WebSocketClientService to actively connect to a remote WebSocket endpoint, while ListenWebSocket uses WebSocketServerService to wait passively for remote WebSocket clients to connect.PutWebSocket
: This processor can use with both ConnectWebSocket and ListenWebSocket, since there is no distinction after connection is made. It sends a WebSocket message using an incoming FlowFile content as message payload.Ok, enough descriptions, let’s see how can we use these component in NiFi data flow!
To use NiFi as a WebSocket client, we need a WebSocketClientService. To add the service:
Then, the service needs to be configured as follows:
Next, let’s setup the data flow using processors:
connected
and text message
are routed to ReplaceText. binary message
is terminated here because we don’t use it in this example.Run Schedule
longer than default like 3 sec, otherwise this ping-pong loop goes too fast like DoS attack…
By right click the success
relationship, the queued FlowFiles can be seen. Its file size is growing as ReplaceText prepend text each time.
Once you get the idea, setting up NiFi as a WebSocket server is easy, almost the same!
We just need to use JettyWebSocketServer
controller service instead, and set Listen Port
:
Then, replace the ConnectWebSocket processor with ListenWebSocket
processor, and specify the Server URL Path
that you want to receive WebSocket requests:
Then, open websocket.org echo from a web browser, set location as ws://localhost:9001/server-demo
, and click Connect, then Send. NiFi will echo back the message!
To use secure WebSocket connection, we need another controller service, StandardSSLContextService
. Then use it from JettyWebSocketClient or JettyWebSocketServer. The URL should use wss://
protocol for secure connection.
When NiFi is deployed as a cluster for scalability, we can run these WebSocket component on every node. To distribute loads when you use NiFi as WebSocket server, you will need a Load Balancer such as HAProxy in front of NiFi cluster. Please also refer the previous post, NiFi Cluster and Load Balancer.
In this post, I covered the basic usage of these WebSocket controller services and processors. Since the WebSocket gateway processors (ConnectWebSocket/ListenWebSocket) and PutWebSocket can be used separately, we can design more complex and interesting data flow by putting more flow in between.
I hope this post will help you to get started with NiFi WebSocket support, and create some interesting data flows!
Thanks for reading!
Created on 11-28-2016 07:19 PM
This is exactly what I am looking for. Any idea on when Nifi 1.1.0 is available for download or Is there a workaround till the release ?
Created on 12-27-2016 05:15 PM
Created on 04-24-2017 05:38 PM
Thanks a lot for the great tutorial.
How could this be extended to not only listen to a web socket, but rather periodically send control commands like: https://blockchain.info/api/api_websocket for example `{"op":"unconfirmed_sub"}`?