Member since
04-05-2016
130
Posts
93
Kudos Received
29
Solutions
03-10-2017
12:47 AM
1 Kudo
How to connect GetKafka to Kafka through Stunnel
Stunnel is a proxy that can secure otherwise insecure network traffic by wrapping it in SSL. This article contains an example and illustrations describing how it works and how to configure it. Most of it is derived from this informative Git comment; I wouldn't have been able to set it up without it. Thank you for sharing such a detailed example.
How does it work?
Let's see how it can be applied to NiFi GetKafka.
I used two servers for this experiment: 0.server.aws.mine and 1.server.aws.mine. A single Zookeeper and a single Kafka broker are running on 0.server.
A GetKafka NiFi processor on 1.server consumes messages through Stunnel:
1. The Kafka broker joins the Kafka cluster and declares its address as 127.0.0.1:9092. If Zookeeper is on a different server than the broker (recommended) and you need to secure that connection via Stunnel as well, apply the same method as the one used between GetKafka and Zookeeper.
2. GetKafka's Zookeeper Connection String is set to 127.0.0.1:2181, which is the address the local Stunnel is listening on.
3. The local Stunnel on 1.server proxies the request to 0.server:2181 over SSL.
4. On 0.server, the request is proxied again by the Stunnel running there, and finally arrives at Zookeeper.
5. Since the Kafka broker running on 0.server declares its address as 127.0.0.1:9092, GetKafka (the Kafka client) sends its requests to 127.0.0.1:9092, and they are eventually transferred to the broker through the Stunnel pair.
Here are the relevant settings in 1.server's stunnel.conf (the entire file is available here):
client = yes
[zookeeper]
accept = 127.0.0.1:2181
connect = 0.server.aws.mine:2181
[kafka]
accept = 127.0.0.1:9092
connect = 0.server.aws.mine:9092
And this is for 0.server (the entire file is available here):
client = no
[zookeeper]
accept = 0.server.aws.mine:2181
connect = 127.0.0.1:2181
[kafka]
accept = 0.server.aws.mine:9092
connect = 127.0.0.1:9092
Kafka server.properties:
host.name=127.0.0.1
zookeeper.connect=127.0.0.1:2181
Zookeeper zookeeper.properties:
clientPort=2181
clientPortAddress=127.0.0.1
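Once both Stunnel instances are running, it can help to confirm that the local listeners on 1.server accept connections before pointing GetKafka at them. The following is a minimal sketch, not part of the original setup; the helper name `port_is_open` is hypothetical, and the ports are the ones from the stunnel.conf for 1.server. It only checks that something accepts a TCP connection on the port. It does not prove that the SSL tunnel to 0.server or the certificate verification succeeds.

```python
import socket


def port_is_open(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to (host, port) can be established."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers connection refused, timeouts and unreachable hosts.
        return False


if __name__ == "__main__":
    # Local Stunnel listeners on 1.server (assumed to match stunnel.conf).
    for name, port in (("zookeeper", 2181), ("kafka", 9092)):
        state = "open" if port_is_open("127.0.0.1", port) else "closed"
        print(f"{name} 127.0.0.1:{port} is {state}")
```

If a port is reported as closed, Stunnel is probably not running on that server or its accept address differs from the one configured in GetKafka.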
How to authorize client access?
Each Stunnel server has to have its own pem file containing a private key and a certificate. A CA certificate file (or directory) is also needed to authorize client access. I used tls-toolkit.sh, which is available in the NiFi toolkit, to generate the required files. The toolkit can generate three files for each server: keystore.jks, truststore.jks and nifi.properties. The server's key and cert can be extracted from keystore.jks. To do so, convert keystore.jks into a keystore.p12 file with the following commands (credit goes to this Stack Overflow post):
# It doesn't matter which server you run the toolkit on.
$ ./bin/tls-toolkit.sh standalone -n [0-1].server.aws.mine -C 'CN=server,OU=mine'
# Password for keystore.jks can be found in generated nifi.properties 'nifi.security.keystorePasswd'.
$ keytool -importkeystore -srckeystore keystore.jks -destkeystore keystore.p12 -srcstoretype jks -deststoretype pkcs12
Then extract key and cert from the p12 file:
$ openssl pkcs12 -in keystore.p12 -nokeys -out cert.pem
$ openssl pkcs12 -in keystore.p12 -nodes -nocerts -out key.pem
Concatenate key and cert to create stunnel.pem, and deploy stunnel.pem to servers:
$ cat key.pem cert.pem > stunnel.pem
I used cert.pem as the CAFile for Stunnel on 0.server. In stunnel.conf on 0.server, the following settings are needed to enable client cert verification:
verify = 3
CAFile = /etc/stunnel/certs
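The step that concatenates the key and the cert into stunnel.pem can also be scripted, which avoids accidentally appending to an existing stunnel.pem when the step is repeated. This is a rough sketch under stated assumptions: the function name `build_stunnel_pem` is hypothetical, and the input files are assumed to be the key.pem and cert.pem produced by the openssl commands earlier in this section.

```python
from pathlib import Path


def build_stunnel_pem(key_path: str, cert_path: str, out_path: str) -> str:
    """Write the key followed by the certificate into out_path.

    Any existing file at out_path is replaced. Returns the combined text.
    """
    parts = []
    for path in (key_path, cert_path):
        text = Path(path).read_text()
        if "-----BEGIN" not in text:
            raise ValueError(f"{path} does not look like a PEM file")
        # Make sure each part ends with a newline so the blocks do not fuse.
        parts.append(text if text.endswith("\n") else text + "\n")
    combined = "".join(parts)
    out = Path(out_path)
    out.write_text(combined)
    out.chmod(0o600)  # the file contains a private key
    return combined


if __name__ == "__main__":
    key, cert = Path("key.pem"), Path("cert.pem")
    if key.exists() and cert.exists():
        build_stunnel_pem(str(key), str(cert), "stunnel.pem")
```

Restricting the file mode to the owner is a precaution added in this sketch, not something the original steps require.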
Refer to the Stunnel manual for further description of the verify and CAFile settings. I confirmed that GetKafka running on 1.server can consume messages through Stunnel. If I used a cert that is not in the certs file on 0.server, GetKafka got a timeout exception as follows:
2017-03-09 06:50:48,690 WARN [Timer-Driven Process Thread-5] o.apache.nifi.processors.kafka.GetKafka GetKafka[id=b0a21b5d-015a-1000-fbba-2648095ae625] Executor did not stop in 30 sec. Terminated.
2017-03-09 06:50:48,691 WARN [Timer-Driven Process Thread-5] o.apache.nifi.processors.kafka.GetKafka GetKafka[id=b0a21b5d-015a-1000-fbba-2648095ae625] Timed out after 60000 milliseconds while waiting to get connection
java.util.concurrent.TimeoutException: null
at java.util.concurrent.FutureTask.get(FutureTask.java:205) [na:1.8.0_121]
at org.apache.nifi.processors.kafka.GetKafka.onTrigger(GetKafka.java:348) ~[nifi-kafka-0-8-processors-1.1.2.jar:1.1.2]
at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27) [nifi-api-1.1.2.jar:1.1.2]
at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1099) [nifi-framework-core-1.1.2.jar:1.1.2]
Stunnel commands
# Install
sudo yum -y install stunnel
# Edit config
sudo vi /etc/stunnel/stunnel.conf
# Start
sudo stunnel
# Stop
sudo kill `cat /var/run/stunnel.pid`
Conclusion
Although Stunnel works with GetKafka and Kafka 0.8.x, I recommend using a newer version of Kafka and the ConsumeKafka NiFi processor with SSL if possible. As written in the Git comment, this workaround is complicated and not scalable (in terms of required administration tasks).
04-24-2017
05:38 PM
Thanks a lot for the great tutorial. How could this be extended to not only listen to a web socket but also periodically send control commands, for example `{"op":"unconfirmed_sub"}` as described at https://blockchain.info/api/api_websocket?
09-11-2018
11:01 PM
Hello. In this solution, is the NiFi cluster also deployed on Docker? Thanks.
07-05-2016
07:26 PM
6 Kudos
There are already several articles describing how to set up NiFi to connect to Kafka or HDFS. However, following scattered pieces of documentation to complete Kerberizing HDP and setting ACLs for Zookeeper, Kafka, HDFS, and Kerberos was not an easy task for me.
So, the motivation of this article is to share the rough but thorough set of operations needed to set it up right. You can elaborate on each step by digging further into the related documentation. An example configuration and NiFi template are available, too.
Some of these steps may not be required, and some may have been left out, but these were the steps I took to Kerberize my HDP sandbox VM:
Install by downloading the latest HDF
Start Kafka, stop maintenance mode.
Restart all affected services
Install a new MIT KDC [1]
Enable Kerberos from Ambari
Proceed with Ambari Kerberos wizard
The "Check Pig" test failed, but I continued with the Complete button anyway
Start services manually that didn't start automatically
Configure Kafka for Kerberos through Ambari [2]
Modify the Kafka listeners from PLAINTEXT://localhost:6667 to PLAINTEXTSASL://localhost:6667 from Ambari
Enable `Kafka ranger plugin` in the Ranger config, and check `Enable Ranger for KAFKA` in the Kafka config from Ambari [6]
Set up the Ranger Kafka service [3]. I don't know what the password should be here, but kafka/kafka passed the connection test.
If a consumer has already connected to a topic using a given consumer group id, another consumer using a different SASL user can't connect with the same group id, because a Znode has already been created with an ACL.
Setup NiFi to access Kerberized Kafka [4], watch out for the '"' when you copy and paste exampl
Set up NiFi to access Kerberized HDFS by setting `/etc/krb5.conf` as `nifi.kerberos.krb5.file` in nifi.properties.
Using only Ranger to manage access control is recommended [5]
Set up the NiFi dataflow using PutKafka, GetKafka and PutHDFS
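For the nifi.properties step in the list above, the edit can be scripted instead of made by hand. This is only a sketch: the helper `set_property` is hypothetical, it works on the file content as a string, and it assumes simple `key=value` lines without line continuations. The property name and value are the ones mentioned in the step.

```python
def set_property(properties_text: str, key: str, value: str) -> str:
    """Return properties_text with key set to value, appending it if absent."""
    lines = properties_text.splitlines()
    replaced = False
    for i, line in enumerate(lines):
        stripped = line.strip()
        # Skip comments and lines that are not key=value pairs.
        if stripped.startswith("#") or "=" not in stripped:
            continue
        name = stripped.split("=", 1)[0].strip()
        if name == key:
            lines[i] = f"{key}={value}"
            replaced = True
    if not replaced:
        lines.append(f"{key}={value}")
    return "\n".join(lines) + "\n"


if __name__ == "__main__":
    # Example input; a real nifi.properties contains many more entries.
    sample = "nifi.kerberos.krb5.file=\nnifi.web.http.port=8080\n"
    print(set_property(sample, "nifi.kerberos.krb5.file", "/etc/krb5.conf"))
```

NiFi reads nifi.properties at startup, so the change takes effect after a restart.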
[1] http://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.4.2/bk_Security_Guide/content/_optional_install_a_new_mit_kdc.html
[2] http://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.4.2/bk_secure-kafka-ambari/content/ch_secure-kafka-overview.html
[3] https://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.3.0/bk_Ranger_User_Guide/content/kafka_service.html
[4] https://community.hortonworks.com/articles/28180/how-to-configure-hdf-12-to-send-to-and-get-data-fr.html?platform=hootsuite
[5] http://hortonworks.com/blog/best-practices-in-hdfs-authorization-with-apache-ranger/
[6] http://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.4.2/bk_Security_Guide/content/kafka_plugin.html