Member since: 04-05-2016
Posts: 130
Kudos Received: 93
Solutions: 29
My Accepted Solutions
Title | Views | Posted
---|---|---
| 4191 | 06-05-2018 01:00 AM
| 5546 | 04-10-2018 08:23 AM
| 6008 | 07-18-2017 02:16 AM
| 3230 | 07-11-2017 01:02 PM
| 3618 | 07-10-2017 02:10 AM
01-11-2017
01:02 AM
2 Kudos
Hi Edgar, Currently, the PutWebSocket processor needs an incoming flow file with a WebSocket session id attribute to tell the NiFi WebSocket server which connected client to send a message to. I have tested a flow, explained in this Gist, that pulls messages from Kafka and sends them back to multiple connected clients. https://gist.github.com/ijokarumawak/60b9ab2038ef906731ebf4c0eee97176 The flow does work, but it keeps flow files floating around in the loop and is rather difficult to set up. The list of connected session ids is kept in the WebSocket server controller service, so theoretically it's possible to loop through those sessions in the controller service without passing a specific session id from the PutWebSocket processor. Maybe we can improve the PutWebSocket processor so that it can send messages to every connected client. I will create a JIRA for this improvement; in the meantime, please use a workaround like the example flow above. Thanks!
12-16-2016
07:45 AM
2 Kudos
Hello @Mohan V PutKafka is designed to work with the Kafka 0.8 series. If you're using Kafka 0.9, please use the PublishKafka processor instead. Thanks, Koji
12-08-2016
02:18 AM
@Saikrishna Tarapareddy In that case, I'd use another command to list the sub folders first, then pass each sub folder to a zip command. The NiFi flow looks like the one below. List Sub Folders (ExecuteProcess): I used the find command here. find source-files -type d -depth 1 The command produces a flow file containing one sub folder per line. So, split that output, then use ExtractText to extract the sub folder path into an attribute 'targetDir'. You need to add a dynamic property by clicking the plus sign, then name the property with the attribute name you want to extract the content to. The value is a regular expression to extract the desired text from the content. Use ExecuteStreamCommand this time, so it can consume incoming flow files. - Command Path: zip - Command Arguments: -r;${targetDir}-${now():format('yyyyMMdd')}.zip;${targetDir} - Ignore STDIN: true Then it will zip each sub folder (a plain shell equivalent is sketched below for reference). Thanks, Koji
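For reference, a shell one-liner that does roughly the same thing as this flow outside NiFi (list each sub folder, then zip it with a date-stamped name). This is only a sketch; it keeps the BSD-style -depth 1 from the post (GNU find would use -mindepth 1 -maxdepth 1) and assumes find, zip and date are available:
$ find source-files -type d -depth 1 | while read d; do zip -r "${d}-$(date +%Y%m%d).zip" "$d"; done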
12-07-2016
12:12 AM
1 Kudo
Hello @Saikrishna Tarapareddy There's no processor that compresses a folder, as far as I'm aware. But you can do that with the ExecuteProcess processor: This is an example ExecuteProcess configuration using the zip command. Command Arguments: -r /tmp/source-files-${now():format('yyyyMMdd')}.zip /tmp/source-files/ NiFi Expression Language can be used in the command arguments; the expression above sets the zip filename using the current date (the equivalent shell command is shown below). In your use case, I'd use this single processor scheduled with the 'Cron Driven' Scheduling Strategy, in addition to the main flow which processes the files and sends them to HDFS. Hope this helps. Thanks, Koji
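In plain shell terms, the command this configuration effectively runs would look like the following (a sketch only, using the example paths above):
$ zip -r /tmp/source-files-$(date +%Y%m%d).zip /tmp/source-files/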
12-06-2016
11:50 PM
1 Kudo
Hello @J.Thomas King You can reduce the waiting time by setting nifi.cluster.flow.election.max.wait.time or nifi.cluster.flow.election.max.candidates in nifi.properties. Please refer to the "Flow Election" section of the Clustering Configuration guide for details (an example nifi.properties excerpt is shown below). Thanks, Koji
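For example, a nifi.properties excerpt might look like this (the property names are the ones above; the values are only illustrative and should be tuned for your cluster size):
nifi.cluster.flow.election.max.wait.time=1 min
nifi.cluster.flow.election.max.candidates=3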
12-05-2016
06:44 AM
@Avijeet Dash I was able to start a process group with the following cURL command: curl -i -H 'Content-Type: application/json' -XPUT -d '{"id":"cdb54c9a-0158-1000-5566-c45ca9692f85","state":"RUNNING"}' localhost:8080/nifi-api/flow/process-groups/cdb54c9a-0158-1000-5566-c45ca9692f85 Is there any difference from your request? I think the GET response doesn't contain a 'state' element because a process group doesn't have its own state; it's just a container for child processors.
12-05-2016
06:34 AM
3 Kudos
Hi @Avijeet Dash Yes, you can do the same with process groups. Please refer to the NiFi REST API PUT /flow/process-groups/{id} endpoint. You can start a process group by sending a PUT request to the endpoint with a JSON body like this: {"id":"cda85e6e-0158-1000-150a-cb88f801e860","state":"RUNNING"} A cURL example is shown below. Hope this helps, Koji
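For illustration, the request could be sent with cURL like this (assuming an unsecured NiFi on localhost:8080; substitute your own process group id):
$ curl -i -H 'Content-Type: application/json' -X PUT -d '{"id":"cda85e6e-0158-1000-150a-cb88f801e860","state":"RUNNING"}' http://localhost:8080/nifi-api/flow/process-groups/cda85e6e-0158-1000-150a-cb88f801e860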
11-26-2016
12:53 AM
4 Kudos
Basics of WebSocket protocol
WebSocket is a full-duplex, bi-directional protocol. Its most interesting characteristic (at least for me) is that there is no distinction of role, such as server or client, between the peers once they have established a WebSocket connection. Each peer can send messages at will. However, a connection is always initiated by a WebSocket client.
For example:
A client sends an HTTP request to a server (e.g. to the URL ws://example.com/web-socket).
The server accepts the request and upgrades the connection from HTTP to the WebSocket protocol.
(at this point, each peer can send or receive messages asynchronously)
The client can send a message to the server
The server receives it
The server can send a message to the client
The client receives it
I wondered: should NiFi be a client, or a server? I decided to support both. NiFi can be not only a WebSocket client, but also a WebSocket server!
How does it work? Modules
Since the protocol itself is more complex than (or simply different from) other protocols, it was tough to fit it into the NiFi data flow processing model. Some might feel it is too complex to use, so let me try to explain how it works.
As shown in the diagram above, it is divided into the three modules described below, for extensibility and testability. Each of these is an individual NAR. Both nifi-websocket-processors-nar and nifi-websocket-services-jetty-nar have a NAR dependency on nifi-websocket-services-api-nar.
nifi-websocket-services-api
WebSocketClientService: an interface that acts as a WebSocket client.
WebSocketServerService: an interface that acts as a WebSocket server.
Features:
WebSocket events: both the client and server services adopt event-driven processing that WebSocket processors can react to.
connected : fired when a remote client connects to the WebSocketServerService, or a WebSocketClientService connects to a remote WebSocket server.
text message : fired when a text WebSocket message is received.
binary message : fired when a binary WebSocket message is received.
Send message: provides methods to send text and binary WebSocket messages to a connected remote peer.
Multiple endpoints: processors are registered to endpoints. WebSocketServerService uses a URI path as an endpoint. For example, the same WebSocket server instance can manage two WebSocket endpoints, such as ws://hostname:listen-port/endpoint-1 and ws://hostname:listen-port/endpoint-2 . Connected sessions are managed separately within each endpoint. Likewise, WebSocketClientService uses a clientId to distinguish endpoints. Multiple WebSocket client instances can share the same WebSocketClientService instance.
nifi-websocket-services-jetty
This module contains the actual implementation of nifi-websocket-services-api, using Jetty.
Features:
Plain WebSocket (ws://) and secure WebSocket (wss://) protocols are supported.
Uses SSLContextService to reference a Java keystore and truststore for secure communication.
nifi-websocket-processors
In order to use these functionalities in a NiFi data flow, we need to put them on the canvas as processors.
ConnectWebSocket and ListenWebSocket : work as WebSocket gateways. These processors register themselves with the WebSocket controller services and receive the WebSocket events described earlier. When those events are fired, they are converted into NiFi FlowFiles and sent to the corresponding relationships. There are three relationships: connected , text message and binary message . ConnectWebSocket uses WebSocketClientService to actively connect to a remote WebSocket endpoint, while ListenWebSocket uses WebSocketServerService to wait passively for remote WebSocket clients to connect.
PutWebSocket : this processor can be used with both ConnectWebSocket and ListenWebSocket, since there is no distinction between the two once a connection is made. It sends a WebSocket message using the incoming FlowFile's content as the message payload.
How can I use it? Use Cases
OK, enough description; let's see how we can use these components in a NiFi data flow!
NiFi as a client to talk with a remote WebSocket server
To use NiFi as a WebSocket client, we need a WebSocketClientService. To add the service:
Click the gear icon on the Operate palette
Click the plus sign
Enter the 'WebSocket' tag to search for the controller service
Click the edit icon of the JettyWebSocketClient controller service
Then, the service needs to be configured as follows:
Set WebSocket URI to ws://echo.websocket.org. This URI is publicly available for testing WebSocket clients; it simply echoes back the messages it receives.
Click the enable icon, and the service is ready!
Next, let’s setup the data flow using processors:
ConnectWebSocket: uses the JettyWebSocketClientService added earlier. connected and text message are routed to ReplaceText; binary message is terminated here because we don't use it in this example.
ReplaceText: adds a prefix to update the text content
PutWebSocket: this processor sends messages to the remote WebSocket server. Don't forget to set Run Schedule to something longer than the default, such as 3 sec, otherwise this ping-pong loop runs so fast it resembles a DoS attack…
UpdateAttribute: this is the end of the data flow; keep it stopped so that FlowFiles accumulate in its incoming relationship and we can check their contents
By right-clicking the success relationship, the queued FlowFiles can be seen. Their file size keeps growing as ReplaceText prepends text each time.
NiFi as a server to talk with a remote WebSocket client
Once you get the idea, setting up NiFi as a WebSocket server is easy, almost the same!
We just need to use JettyWebSocketServer controller service instead, and set Listen Port :
Then, replace the ConnectWebSocket processor with a ListenWebSocket processor, and specify the Server URL Path on which you want to receive WebSocket requests:
Then, open the websocket.org echo page in a web browser, set the location to ws://localhost:9001/server-demo , click Connect, then Send. NiFi will echo the message back!
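If you prefer the command line over a browser, a tool such as wscat (a Node.js utility, not part of NiFi; this assumes it is installed separately) can connect to the same endpoint:
$ wscat -c ws://localhost:9001/server-demo
> hello
< ... (whatever the flow sends back is printed here)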
Secure WebSocket connection
To use a secure WebSocket connection, we need one more controller service, StandardSSLContextService , and reference it from JettyWebSocketClient or JettyWebSocketServer. The URL should use the wss:// protocol for a secure connection.
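As a rough sketch, the StandardSSLContextService would be configured with keystore/truststore properties along these lines (file paths, passwords and types are placeholders for your own environment):
Keystore Filename: /path/to/keystore.jks
Keystore Password: ********
Keystore Type: JKS
Truststore Filename: /path/to/truststore.jks
Truststore Password: ********
Truststore Type: JKS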
Scalability
When NiFi is deployed as a cluster for scalability, we can run these WebSocket components on every node. To distribute load when you use NiFi as a WebSocket server, you will need a load balancer such as HAProxy in front of the NiFi cluster. Please also refer to the previous post, NiFi Cluster and Load Balancer.
Summary
In this post, I covered the basic usage of these WebSocket controller services and processors. Since the WebSocket gateway processors (ConnectWebSocket/ListenWebSocket) and PutWebSocket can be used separately, we can design more complex and interesting data flows by putting more processing in between.
I hope this post helps you get started with NiFi WebSocket support and create some interesting data flows!
Thanks for reading!
11-26-2016
12:53 AM
8 Kudos
Why do we need a Load Balancer for NiFi cluster?
The easiest way to start using NiFi is deploying it as a standalone NiFi instance. However, when you need more throughput, NiFi can form a cluster to distribute load among multiple NiFi nodes.
Since 1.0, thanks to the Zero-Master Clustering architecture, we can access the NiFi Web UI via any node in a cluster. Although this is more reliable, it can be unclear which endpoint to specify for things like a Remote Process Group URL, or for clients of processors such as ListenTCP or ListenSyslog that act as servers.
The NiFi Site-to-Site protocol is cluster-topology aware and automatically distributes load among nodes, but users may wonder which hostname to specify. You can point at any node in the cluster, but what if that node goes down?
If ListenXXX processors run in a cluster, we could configure them with the 'On Primary Node' scheduling strategy so that they simply run on a single node (we used to specify a primary node). But that doesn't scale, and the node becomes a SPOF. If every node can receive incoming requests, NiFi can be more powerful.
To address the above concerns, adding a LB (load balancer) in front of your NiFi cluster is a great solution.
Use Docker Compose to create an environment
There are so many Docker containers nowadays, so I started by searching Docker Hub. I was glad to find the following containers to form a NiFi environment like the following diagram; these containers made my task way easier:
mkobit/nifi: I added a startup script so that it automatically picks up the container's hostname and updates nifi.properties.
zookeeper: NiFi uses ZooKeeper for cluster coordination.
dockercloud/haproxy: this Docker image detects exposed ports on linked service containers; it's really useful with docker-compose.
Exposed ports on NiFi Node
On a NiFi node container, the following ports have to be accessible from other hosts (port numbers are configurable, so they may differ in your environment). I exposed these in nifi-node/Dockerfile. Among these ports, only 8080 and 9001 face the external network, and are thus the candidates to be accessed through a LB.
Port | Protocol | LB? | Memo
---|---|---|---
8080 | HTTP | Yes | NiFi Web API (UI, REST, HTTP S2S)
8081 | TCP | No | RAW S2S
8082 | TCP | No | Cluster Node Protocol
9001 | TCP | Yes | Misc. used by server-type processors
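For reference, the corresponding part of nifi-node/Dockerfile would be a simple EXPOSE instruction along these lines (a sketch assuming the port numbers from the table; the actual file is in the GitHub project linked below):
EXPOSE 8080 8081 8082 9001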
Technically, RAW S2S is accessed from the external network too, but since Site-to-Site clients handle load balancing themselves, there is no need to put it behind the LB.
Specify which ports are accessible via LB
As mentioned in the dockercloud/haproxy documentation, it uses all exposed ports on the application containers as routing destinations. So unless you specify the purpose of those ports, haproxy load-balances incoming HTTP requests across all of them. The default setting caused odd behavior where only 1/4 of NiFi HTTP requests succeeded (there are four ports, 8080, 8081, 8082 and 9001, but only 8080 can accept the requests).
I excluded ports 8081 and 8082 from the LB by setting the EXCLUDE_PORTS environment variable on the nifi-nodes docker-compose service. Then I used the TCP_PORTS environment variable to specify that 9001 is a TCP port, not an HTTP one.
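For illustration, the relevant part of the nifi-nodes service definition in docker-compose might look like this (the image name and surrounding structure are assumptions; the actual compose file is in the GitHub project linked below):
nifi-nodes:
  image: mkobit/nifi
  environment:
    # keep RAW S2S and cluster protocol ports off the load balancer
    - EXCLUDE_PORTS=8081,8082
    # treat 9001 as plain TCP rather than HTTP
    - TCP_PORTS=9001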
These settings allow haproxy to route HTTP requests to the NiFi nodes' port 8080, and TCP connections to port 9001.
Scaling number of NiFi nodes
Once the Docker Compose file is set up correctly, scaling out the number of NiFi nodes is as easy as executing the following single command:
$ docker-compose scale nifi-nodes=2
Now I can see a two-node cluster by accessing the Docker host address from a web browser:
ListenTCP
Let's set up a NiFi data flow to confirm whether incoming requests get distributed as expected. Add a ListenTCP processor, configure it to listen on port 9001, and execute the following netcat command several times:
$ netcat 192.168.99.100 9001
a (type some characters and press enter; they are sent to ListenTCP)
e
g
t
s
Then, look at the stats in the NiFi UI…
Summary
There's a lot to learn here, such as NiFi clustering, ZooKeeper, Docker containers, Docker Compose and HAProxy, and I struggled to set everything up correctly. But once it's done, you can get a nice distributed testing environment up and running really quickly, and I expect these containers can also be deployed on larger container pools for production use. Of course, this stack can also be deployed on cloud VM instances or physical on-premise servers without Docker.
The Docker Compose sample project is available on GitHub. I'm planning to explore deeper into SSL termination and other protocols such as WebSocket using this environment. See you next time!
11-25-2016
11:57 AM
Hi @mayki wogno The first error message was produced by the same underlying error as the second one. The processor reported the error twice because it logged an error message when the ListHDFS processor caught the exception and then re-threw it, and the NiFi framework caught the exception and logged another error message. When the NiFi framework catches an exception thrown by a processor, it yields the processor for the amount of time specified by 'Yield Duration'. Once the processor successfully accesses core-site.xml and hdfs-site.xml, both error messages will be cleared.
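For reference, ListHDFS finds those files through its Hadoop Configuration Resources property, which would be set to something like the following (the paths are placeholders for wherever your Hadoop client configs live):
Hadoop Configuration Resources: /etc/hadoop/conf/core-site.xml,/etc/hadoop/conf/hdfs-site.xml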