Member since
04-05-2016
130
Posts
93
Kudos Received
29
Solutions
03-10-2017
12:47 AM
1 Kudo
How to connect GetKafka to Kafka through Stunnel
Stunnel is a proxy that can secure otherwise insecure network transmissions by wrapping them in SSL. This article contains examples and illustrations describing how it works and how to configure it. Most of it is derived from this informative Git comment; I wouldn't have been able to set it up without that comment. Thank you for sharing such a detailed example.
How does it work? Let's see how it can be applied to NiFi GetKafka.
I used two servers for this experiment: 0.server.aws.mine and 1.server.aws.mine. A single Zookeeper and a single Kafka broker are running on 0.server.
A GetKafka NiFi processor on 1.server consumes messages through Stunnel. The Kafka broker joins the Kafka cluster and declares its address as 127.0.0.1:9092 . If Zookeeper runs on a different server (recommended) and you need to secure that connection via Stunnel as well, apply the same method as the one used between GetKafka and Zookeeper. GetKafka's Zookeeper Connection String is set to 127.0.0.1:2181 , which the local Stunnel is listening on. The local Stunnel on 1.server then proxies the request to 0.server:2181 over SSL. At 0.server, the request is proxied again by the Stunnel running there, and finally arrives at Zookeeper. Since the Kafka broker running on 0.server declares its address as 127.0.0.1:9092 , GetKafka (the Kafka client) sends requests to 127.0.0.1:9092 , and each request is eventually transferred to the broker through the Stunnel pair. Here are the relevant configurations in 1.server's stunnel.conf (the entire file is available here):
client = yes
[zookeeper]
accept = 127.0.0.1:2181
connect = 0.server.aws.mine:2181
[kafka]
accept = 127.0.0.1:9092
connect = 0.server.aws.mine:9092
And this is for 0.server (the entire file is available here):
client = no
[zookeeper]
accept = 0.server.aws.mine:2181
connect = 127.0.0.1:2181
[kafka]
accept = 0.server.aws.mine:9092
connect = 127.0.0.1:9092
Kafka server.properties:
host.name=127.0.0.1
zookeeper.connect=127.0.0.1:2181
Zookeeper zookeeper.properties:
clientPort=2181
clientPortAddress=127.0.0.1
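Conceptually, each stunnel `[service]` entry is just "listen here, forward there", with stunnel adding or removing SSL on the forwarded leg. The sketch below illustrates only the plain TCP relay part in Python; the SSL wrapping that real stunnel performs is deliberately left out, and addresses are illustrative:

```python
import socket
import threading

def forward_once(listen_addr, connect_addr):
    """Accept a single connection on listen_addr and relay bytes in both
    directions to connect_addr, like one stunnel [service] entry
    (accept = listen_addr, connect = connect_addr) minus the SSL layer."""
    srv = socket.socket()
    srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    srv.bind(listen_addr)
    srv.listen(1)
    client, _ = srv.accept()
    upstream = socket.create_connection(connect_addr)

    def pipe(src, dst):
        # Copy bytes until the source side closes or the socket is torn down.
        try:
            while True:
                data = src.recv(4096)
                if not data:
                    dst.close()
                    return
                dst.sendall(data)
        except OSError:
            return

    # One thread per direction: client -> upstream and upstream -> client.
    threading.Thread(target=pipe, args=(client, upstream), daemon=True).start()
    pipe(upstream, client)
```

On 1.server, stunnel would additionally wrap the `upstream` socket in SSL (`client = yes`); on 0.server it would unwrap it before connecting to the local Kafka/Zookeeper port (`client = no`).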
How to authorize client access? Each Stunnel server has to have its own pem file containing a private key and a certificate. A CA certificate file (or directory) is also needed to authorize client access. I used tls-toolkit.sh, available in the NiFi toolkit, to generate the required files. The toolkit generates three files for each server: keystore.jks , truststore.jks and nifi.properties . The server's key and cert can be extracted from keystore.jks. To do so, convert keystore.jks into a keystore.p12 file with the following commands (credit goes to this Stackoverflow answer):
# It's not important which server to run the toolkit on.
$ ./bin/tls-toolkit.sh standalone -n [0-1].server.aws.mine -C 'CN=server,OU=mine'
# Password for keystore.jks can be found in generated nifi.properties 'nifi.security.keystorePasswd'.
$ keytool -importkeystore -srckeystore keystore.jks -destkeystore keystore.p12 -srcstoretype jks -deststoretype pkcs12
Then extract key and cert from the p12 file:
$ openssl pkcs12 -in keystore.p12 -nokeys -out cert.pem
$ openssl pkcs12 -in keystore.p12 -nodes -nocerts -out key.pem
Concatenate key and cert to create stunnel.pem, and deploy stunnel.pem to servers:
$ cat key.pem cert.pem >> stunnel.pem
I used cert.pem as the CAFile for Stunnel on 0.server. In stunnel.conf on 0.server, the following settings are needed to enable client cert verification:
verify = 3
CAFile = /etc/stunnel/certs
Refer to the Stunnel manual for further description of these configurations. I confirmed that GetKafka running on 1.server can consume messages through Stunnel. If I used a cert that is not configured in the certs file on 0.server, GetKafka got a timeout exception as follows:
2017-03-09 06:50:48,690 WARN [Timer-Driven Process Thread-5] o.apache.nifi.processors.kafka.GetKafka GetKafka[id=b0a21b5d-015a-1000-fbba-2648095ae625] Executor did not stop in 30 sec. Terminated.
2017-03-09 06:50:48,691 WARN [Timer-Driven Process Thread-5] o.apache.nifi.processors.kafka.GetKafka GetKafka[id=b0a21b5d-015a-1000-fbba-2648095ae625] Timed out after 60000 milliseconds while waiting to get connection
java.util.concurrent.TimeoutException: null
at java.util.concurrent.FutureTask.get(FutureTask.java:205) [na:1.8.0_121]
at org.apache.nifi.processors.kafka.GetKafka.onTrigger(GetKafka.java:348) ~[nifi-kafka-0-8-processors-1.1.2.jar:1.1.2]
at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27) [nifi-api-1.1.2.jar:1.1.2]
at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1099) [nifi-framework-core-1.1.2.jar:1.1.2]
Stunnel commands
# Install
sudo yum -y install stunnel
# Edit config
sudo vi /etc/stunnel/stunnel.conf
# Start
sudo stunnel
# Stop
sudo kill `cat /var/run/stunnel.pid`
Conclusion
Although Stunnel works with GetKafka and Kafka 0.8.x, I recommend using a newer version of Kafka and the ConsumeKafka NiFi processor with SSL if possible. As noted in the Git comment, this workaround is complicated and does not scale in terms of required administration tasks.
02-09-2017
05:50 AM
I forgot to mention that, instead of using ListenX processors, if you can deploy NiFi or use the NiFi Site-to-Site client library in your client program, you don't have to deploy anything extra to distribute load among remote NiFi cluster nodes. The Site-to-Site client knows which nodes exist in a remote cluster and automatically distributes data transmission.
02-09-2017
05:45 AM
Hi @Raj B, ListenX processors can run on every node in a NiFi cluster, but a client that sends a request has to specify a host to send it to. Zookeeper doesn't provide any help with that. If you write a custom client that retrieves the NiFi cluster topology and round-robins requests among nodes on the client side, it's doable. Otherwise, a DNS server or a reverse proxy such as HAProxy is needed between the client and the NiFi nodes. Thanks!
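To illustrate the custom-client approach, here is a minimal client-side round-robin sketch. The hostnames are hypothetical, and a real client would retrieve the node list from the NiFi REST API rather than hard-code it:

```python
from itertools import cycle

# Hypothetical NiFi node addresses; in practice, fetch these from the
# cluster topology instead of hard-coding them.
nodes = ["nifi-node1.example.com:9001",
         "nifi-node2.example.com:9001",
         "nifi-node3.example.com:9001"]
ring = cycle(nodes)

def next_target():
    """Return the next node to send a request to, round-robin."""
    return next(ring)
```

A more robust version would also skip nodes that fail to connect and refresh the node list periodically.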
11-26-2016
12:53 AM
4 Kudos
Basics of WebSocket protocol
WebSocket is a full-duplex, bi-directional protocol. The most interesting characteristic (at least for me) is that there is no distinction, such as server or client roles, between the peers once they have established a WebSocket connection. Each peer can send messages at will. However, a connection is always initiated by a WebSocket client.
For example:
A client sends an HTTP request to a server (URL e.g. ws://example.com/web-socket).
The server accepts the request and upgrades the HTTP protocol to the WebSocket protocol.
(at this point, each peer can send or receive messages asynchronously)
The client can send a message to the server
The server receives it
The server can send a message to the client
The client receives it
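Step 2, the protocol upgrade, is driven by a small handshake defined in RFC 6455: the server proves it understood the request by hashing the client's Sec-WebSocket-Key together with a fixed GUID and echoing the result back. A minimal sketch of that derivation (not NiFi code, just the spec's algorithm):

```python
import base64
import hashlib

# Fixed GUID defined by RFC 6455 for the opening handshake.
WS_MAGIC = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"

def websocket_accept(sec_websocket_key: str) -> str:
    """Derive the Sec-WebSocket-Accept header value the server must return."""
    digest = hashlib.sha1((sec_websocket_key + WS_MAGIC).encode("ascii")).digest()
    return base64.b64encode(digest).decode("ascii")

# Example key from RFC 6455 section 1.3
print(websocket_accept("dGhlIHNhbXBsZSBub25jZQ=="))  # s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
```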
I wondered: should NiFi be a client, or a server? Then I decided to support both. NiFi can be not only a WebSocket client, but also a WebSocket server!
How does it work? Modules
Since the protocol itself is more complex than (or simply different from) other protocols, it was tough to fit it into the NiFi data flow processing model. Some might feel it is too complex to use, so let me try to explain how it works.
As shown in the diagram above, it is divided into the three modules described below, for extensibility and testability. Each of these is an individual NAR. Both nifi-websocket-processors-nar and nifi-websocket-services-jetty-nar have a NAR dependency on nifi-websocket-services-api-nar .
nifi-websocket-services-api
WebSocketClientService: An interface that acts as a WebSocket client.
WebSocketServerService: An interface that acts as a WebSocket server.
Features:
WebSocket events: Both the client and server services adopt event-driven processing so that WebSocket processors can react to events.
connected : Fired when a remote client connects to WebSocketServerService, or a WebSocketClientService connects to a remote WebSocket server.
text message : Fired when it receives a text WebSocket message.
binary message : Fired when it receives a binary WebSocket message.
Send message: Provides methods to send text and binary WebSocket messages to a connected remote peer.
Multiple endpoints: It registers processors to endpoints. WebSocketServerService uses a URI path as an endpoint. For example, the same WebSocket server instance can manage two WebSocket endpoints, such as ws://hostname:listen-port/endpoint-1 and ws://hostname:listen-port/endpoint-2 . Connected sessions are managed separately within each endpoint. Likewise, WebSocketClientService uses a clientId to distinguish endpoints. Multiple WebSocket client instances can share the same WebSocketClientService instance.
nifi-websocket-services-jetty
This module contains the actual implementation of nifi-websocket-services-api using Jetty.
Features:
Both the plain WebSocket (ws://) and secure WebSocket (wss://) protocols are supported.
Uses SSLContextService to reference the Java keystore and truststore for secure communication.
nifi-websocket-processors
In order to use these functionalities in a NiFi data flow, we need to put them on a canvas as processors.
ConnectWebSocket and ListenWebSocket : Work as WebSocket gateways. These processors are registered to the WebSocket services and receive the WebSocket events described earlier. When those events are fired, they are converted to NiFi FlowFiles and sent to relationships accordingly. There are three relationships: connected , text message and binary message . ConnectWebSocket uses WebSocketClientService to actively connect to a remote WebSocket endpoint, while ListenWebSocket uses WebSocketServerService to wait passively for remote WebSocket clients to connect.
PutWebSocket : This processor can be used with both ConnectWebSocket and ListenWebSocket, since there is no distinction between the two once a connection is made. It sends a WebSocket message using an incoming FlowFile's content as the message payload.
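For the curious, this is roughly what goes over the wire when a client-side peer sends a text message: per RFC 6455, client-to-server frames must be masked with a 4-byte key. This sketch is not NiFi or Jetty code, just the frame layout:

```python
import os
import struct

def encode_text_frame(payload: str, mask: bytes = None) -> bytes:
    """Encode a masked WebSocket text frame (FIN=1, opcode=0x1)."""
    data = payload.encode("utf-8")
    mask = mask or os.urandom(4)  # clients must mask with a 4-byte key
    header = bytes([0x81])        # FIN bit set, text opcode
    n = len(data)
    if n < 126:
        header += bytes([0x80 | n])                       # MASK bit + 7-bit length
    elif n < 1 << 16:
        header += bytes([0x80 | 126]) + struct.pack(">H", n)  # 16-bit length
    else:
        header += bytes([0x80 | 127]) + struct.pack(">Q", n)  # 64-bit length
    masked = bytes(b ^ mask[i % 4] for i, b in enumerate(data))
    return header + mask + masked
```

Server-to-client frames use the same layout without the MASK bit and masking key.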
How can I use it? Use Cases
OK, enough description; let's see how we can use these components in a NiFi data flow!
NiFi as a client to talk with a remote WebSocket server
To use NiFi as a WebSocket client, we need a WebSocketClientService. To add the service:
Click the gear icon on the Operate palette
Click the plus sign
Enter the 'WebSocket' tag to search for the controller service
Click the edit icon of the JettyWebSocketClient controller service
Then, the service needs to be configured as follows:
Set WebSocket URI to ws://echo.websocket.org . This URI is publicly available for testing WebSocket clients; it simply echoes back the messages it receives.
Click the enable icon, and the service is ready!
Next, let’s setup the data flow using processors:
ConnectWebSocket: Uses the JettyWebSocketClient service added earlier. connected and text message are routed to ReplaceText. binary message is terminated here because we don't use it in this example.
ReplaceText: Adds a prefix to update the text content.
PutWebSocket: This processor sends messages to the remote WebSocket server. Don't forget to set Run Schedule to something longer than the default, like 3 sec; otherwise this ping-pong loop goes too fast, like a DoS attack…
UpdateAttribute: This is the end of the data flow; keep it stopped so that we can accumulate the FlowFiles in the relationship and check their contents.
By right-clicking the success relationship, the queued FlowFiles can be seen. The file size grows as ReplaceText prepends text each time.
NiFi as a server to talk with a remote WebSocket client
Once you get the idea, setting up NiFi as a WebSocket server is easy, almost the same!
We just need to use the JettyWebSocketServer controller service instead, and set Listen Port :
Then, replace the ConnectWebSocket processor with a ListenWebSocket processor, and specify the Server URL Path on which you want to receive WebSocket requests:
Then, open the websocket.org echo page in a web browser, set the location to ws://localhost:9001/server-demo , and click Connect, then Send. NiFi will echo back the message!
Secure WebSocket connection
To use a secure WebSocket connection, we need another controller service, StandardSSLContextService , used from JettyWebSocketClient or JettyWebSocketServer. The URL should use the wss:// protocol for a secure connection.
Scalability
When NiFi is deployed as a cluster for scalability, we can run these WebSocket components on every node. To distribute load when you use NiFi as a WebSocket server, you will need a load balancer such as HAProxy in front of the NiFi cluster. Please also refer to the previous post, NiFi Cluster and Load Balancer.
Summary
In this post, I covered the basic usage of these WebSocket controller services and processors. Since the WebSocket gateway processors (ConnectWebSocket/ListenWebSocket) and PutWebSocket can be used separately, we can design more complex and interesting data flows by putting more flow in between.
I hope this post helps you get started with NiFi WebSocket support and create some interesting data flows!
Thanks for reading!
11-26-2016
12:53 AM
8 Kudos
Why do we need a Load Balancer for NiFi cluster?
The easiest way to start using NiFi is deploying it as a standalone NiFi instance. However, when you need more throughput, NiFi can form a cluster to distribute load among multiple NiFi nodes.
From 1.0, thanks to the Zero Master Clustering architecture, we can access the NiFi Web UI via any node in a cluster. Although this is more reliable, it can be unclear which endpoint to specify for things like a Remote Process Group URL, or for clients of processors like ListenTCP or ListenSyslog that act as servers.
The NiFi Site-to-Site protocol is cluster-topology aware and automatically distributes load among nodes, but users may wonder which hostname to specify. You can point to any node in the cluster, but what if that node goes down?
If ListenXXX processors run in a cluster, we could configure them with the On Primary Node scheduler so that they simply run on a single node (we used to specify a primary node). But that doesn't scale, and it becomes a SPOF. If every node can receive incoming requests, NiFi can be more powerful.
To address the above concerns, adding an LB in front of your NiFi cluster is a great solution.
Use Docker Compose to create an environment
There’re so many docker containers nowadays, so I started with searching docker hub. I was so glad to find following containers to form a NiFi environment looks like following diagram, these containers made my task way easier:
mkobit/nifi: I added a startup script so that it automatically picks up the container's hostname and updates nifi.properties.
zookeeper: NiFi uses Zookeeper for cluster coordination.
dockercloud/haproxy: this Docker image detects exposed ports on linked service containers; it's really useful with docker-compose.
Exposed ports on NiFi Node
On a NiFi node container, the following ports have to be accessible from other hosts (port numbers are configurable, so they may be different in your environment). I exposed these in nifi-node/Dockerfile. Among these ports, only 8080 and 9001 face the external network, and thus are candidates to be accessed through an LB.
Port | Protocol | LB? | Memo
8080 | HTTP | Yes | NiFi Web API (UI, REST, HTTP S2S)
8081 | TCP | No | RAW S2S
8082 | TCP | No | Cluster Node Protocol
9001 | TCP | Yes | Misc. used by server-type processors
Technically, RAW S2S is accessed from the external network, but since Site-to-Site clients handle load balancing themselves, it's unnecessary to put it behind the LB.
Specify which ports are accessible via LB
As mentioned in the dockercloud/haproxy documentation, it uses all exposed ports on application containers as routing destinations. So unless you specify the purpose of those ports, haproxy load-balances incoming HTTP requests toward all of them. The default setting caused an odd behavior in which only 1/4 of NiFi HTTP requests succeeded (there are four ports, 8080, 8081, 8082 and 9001, but only 8080 can accept the requests).
I excluded ports 8081 and 8082 from being used by the LB by setting the EXCLUDE_PORTS environment value for the nifi-nodes docker-compose service. Then I used the TCP_PORTS environment value to specify that 9001 is a TCP port, not an HTTP one.
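The relevant part of the docker-compose service definition might look like this (a sketch following the dockercloud/haproxy conventions described above; the exact layout may differ from the sample project):

```yaml
nifi-nodes:
  image: mkobit/nifi
  environment:
    # Don't let haproxy route traffic to RAW S2S / cluster protocol ports
    - EXCLUDE_PORTS=8081,8082
    # Treat 9001 as plain TCP instead of HTTP
    - TCP_PORTS=9001
```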
These settings allow haproxy to route HTTP requests to the NiFi nodes' port 8080, and TCP connections to port 9001.
Scaling the number of NiFi nodes
Once the Docker compose file is set up correctly, scaling out the number of NiFi nodes is as easy as executing the following single command:
$ docker-compose scale nifi-nodes=2
Now I can see a two-node cluster by accessing the Docker host address from a web browser:
ListenTCP
Let’s set up a NiFi data flow to confirm whether incoming requests get distributed as expected. Added a Listen TCP, configured it to listen on port 9001, and execute following netcat command several times:
$ netcat 192.168.99.100 9001
a (input some characters and press enter; these are sent to ListenTCP)
e
g
t
s
Then, look at the stats from NiFi UI…
Summary
There’s many stuff to learn, such as NiFi clustering, Zookeeper, Docker container, Docker compose and HAProxy. I struggled with setting it up correctly. But once it’s done, you can get a nice distributed testing environment up and running really quickly, and also, I expect these containers can be deployed on larger container pools for production use. Or of course, these stack can be deployed on cloud VM instances or physical on-premise servers without docker.
The docker-compose sample project is available on Github. I'm planning to explore deeper into SSL termination and other protocols such as WebSocket using this environment. See you next time!
07-05-2016
07:26 PM
6 Kudos
There are already several articles describing how to set up NiFi to connect to Kafka or HDFS. However, following scattered pieces of documentation to Kerberize HDP and set up the ACLs of Zookeeper, Kafka, HDFS, and Kerberos was not an easy task for me.
So, the motivation for this article is to share the rough but thorough set of operations needed to set it up right. You can elaborate each step by digging into the related documentation further. Example configurations and a NiFi template are available, too.
Some of these steps may not be required, and some may have been forgotten, but these were the steps I took to Kerberize my HDP sandbox VM:
Install by downloading the latest HDF
Start Kafka, stop maintenance mode.
Restart all affected services
Install a new MIT KDC [1]
Enable Kerberos from Ambari
Proceed with Ambari Kerberos wizard
The Pig check failed to pass the test, but I continued with the Complete button anyway
Start services manually that didn't start automatically
Configure Kafka for Kerberos via Ambari [2]
Modify Kafka listeners from PLAINTEXT://localhost:6667 to PLAINTEXTSASL://localhost:6667 from Ambari
Enable the `Kafka ranger plugin` in the Ranger config, and check `Enable Ranger for KAFKA` in the Kafka config from Ambari [6]
Setup the Ranger Kafka service [3]. I don't know what password should go here; kafka/kafka passed the connection test.
If a consumer has already connected to the same topic using the same consumer group id, then another consumer using a different SASL user can't connect using that group id, because a Znode has already been created with an ACL.
Setup NiFi to access Kerberized Kafka [4]; watch out for the '"' characters when you copy and paste the examples
Setup NiFi to access Kerberized HDFS by setting `/etc/krb5.conf` as `nifi.kerberos.krb5.file` in nifi.properties.
Using only Ranger to manage access control is recommended [5]
Setup a NiFi data flow using PutKafka, GetKafka and PutHDFS
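As a reference point, the JAAS configuration that NiFi's Kafka processors point at typically has a shape like the following (the principal and keytab path are placeholders for your environment; see [4] for the exact steps):

```
KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    keyTab="/etc/security/keytabs/nifi.keytab"
    principal="nifi@EXAMPLE.COM";
};
```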
[1] http://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.4.2/bk_Security_Guide/content/_optional_install_a_new_mit_kdc.html
[2] http://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.4.2/bk_secure-kafka-ambari/content/ch_secure-kafka-overview.html
[3] https://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.3.0/bk_Ranger_User_Guide/content/kafka_service.html
[4] https://community.hortonworks.com/articles/28180/how-to-configure-hdf-12-to-send-to-and-get-data-fr.html
[5] http://hortonworks.com/blog/best-practices-in-hdfs-authorization-with-apache-ranger/
[6] http://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.4.2/bk_Security_Guide/content/kafka_plugin.html