
With its comprehensive security implementation, the most recent release of Kafka, 0.9, has reached an important milestone. In his blog post Kafka Security 101, Ismael from Confluent describes the security features of the release very well.

As part II of the previously published post about Kafka Security with Kerberos, this post discusses a sample implementation of a Java Kafka producer with authentication. It is part of a mini series of posts discussing secure HDP clients, connecting services to a secured cluster, and kerberizing the HDP Sandbox (Download HDP Sandbox). At the end of this post we will also create a Kafka Servlet to publish messages to a secured broker.

Kafka provides SSL and Kerberos authentication. Only Kerberos is discussed here.

As of this release, Kafka supports four different communication protocols between consumers, producers, and brokers. Each protocol covers different security aspects; PLAINTEXT is the old, insecure communication protocol.

  • PLAINTEXT (non-authenticated, non-encrypted)
  • SSL (SSL authentication, encrypted)
  • PLAINTEXT+SASL (authentication, non-encrypted)
  • SSL+SASL (encrypted authentication, encrypted transport)
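The four protocols above differ along two axes: whether the client is authenticated and whether the transport is encrypted. A minimal sketch of that matrix follows. The class and enum names are hypothetical and only mirror the list; they are not the configuration values Kafka itself expects.

```java
import java.util.EnumSet;

// Hypothetical helper that mirrors the protocol list above.
// The enum constants are illustrative names, not Kafka config values.
class ProtocolMatrix {

    enum Protocol {
        PLAINTEXT(false, false),
        SSL(true, true),
        PLAINTEXT_SASL(true, false),
        SSL_SASL(true, true);

        final boolean authenticated;
        final boolean encrypted;

        Protocol(boolean authenticated, boolean encrypted) {
            this.authenticated = authenticated;
            this.encrypted = encrypted;
        }
    }

    // Returns every protocol that authenticates the client.
    static EnumSet<Protocol> authenticated() {
        EnumSet<Protocol> result = EnumSet.noneOf(Protocol.class);
        for (Protocol p : Protocol.values()) {
            if (p.authenticated) {
                result.add(p);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        for (Protocol p : Protocol.values()) {
            System.out.println(p + " authenticated=" + p.authenticated
                    + " encrypted=" + p.encrypted);
        }
    }
}
```

Only PLAINTEXT offers neither property, which is why it is described as the insecure protocol.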

A Kafka client needs to be configured to use the protocol of the corresponding broker. This tells the client to use authentication for communication with the broker:

Properties props = new Properties();
props.put("security.protocol", "PLAINTEXTSASL");

Kerberos authentication in Java is provided by the Java Authentication and Authorization Service (JAAS), a pluggable authentication framework similar to PAM that supports multiple authentication methods. In this case the authentication method being used is GSS-API for Kerberos.

Demo Setup

JAAS needs a proper GSS configuration in addition to valid credentials. Credentials can be created with MIT Kerberos like this:

(as root)
$ kadmin.local -q "addprinc -pw hadoop kafka-user" 
$ kadmin.local -q "xst -k /home/kafka-user/kafka-user.keytab kafka-user@MYCORP.NET"
(Creating a keytab will make the existing password invalid. To change your password back to hadoop use as root:)
$ kadmin.local -q "cpw -pw hadoop kafka-user"

The xst command is not strictly needed. It creates a so-called keytab, a file holding the principal's long-term keys, which stand in for the password. A keytab can be used for password-less authentication, for example by automated services. We will make use of that here as well.

First we need to prepare a test topic and grant our kafka-user the privileges to publish messages to it:

# Become Kafka admin
$ kinit -kt /etc/security/keytabs/kafka.service.keytab kafka/one.hdp@MYCORP.NET
# Set privileges for kafka-user
$ /usr/hdp/current/kafka-broker/bin/kafka-acls.sh --add --allow-principals user:kafka-user --operation ALL --topic test --authorizer-properties zookeeper.connect=one.hdp:2181
Adding following acls for resource: Topic:test 
  user:kafka-user has Allow permission for operations: All from hosts: * 

Following is list of acls for resource: Topic:test 
  user:kafka-user has Allow permission for operations: All from hosts: *

As a sample producer we will use this:

package hdp.sample;

import java.util.Date;
import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class KafkaProducer {

    public static void main(String... args) {
        String topic = args[1];

        Properties props = new Properties();
        props.put("metadata.broker.list", args[0]);
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("request.required.acks", "1");
        props.put("security.protocol", "PLAINTEXTSASL");

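        ProducerConfig config = new ProducerConfig(props);
        // Use the parameterized type instead of a raw Producer
        Producer<String, String> producer = new Producer<String, String>(config);

        try {
            for (int i = 0; i < 10; i++) {
                producer.send(new KeyedMessage<String, String>(topic, "Test Date: " + new Date()));
            }
        } finally {
            // Release the broker connection when done
            producer.close();
        }
    }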
}

With this setup we can go ahead and demonstrate two ways to use a JAAS context to authenticate with the Kafka broker. First we will configure a context that uses the existing credentials of the executing user. Next we use a so-called keytab to demonstrate a password-less login for automated producer processes. Finally we will look at a Servlet implementation.

Authentication with User Login

We configure the JAAS context with useKeyTab set to false and useTicketCache set to true, so that the credentials of the current user are used:

KafkaClient {
 com.sun.security.auth.module.Krb5LoginModule required
 useKeyTab=false
 useTicketCache=true
 serviceName="kafka";
};

We store this in a file under /home/kafka-user/kafka-jaas.conf and execute the producer like this:

# list current user context
$ klist
Ticket cache: FILE:/tmp/krb5cc_0
Default principal: kafka-user@MYCORP.NET

Valid starting       Expires              Service principal
21.02.2016 16:13:13  22.02.2016 16:13:13  krbtgt/MYCORP.NET@MYCORP.NET

# execute java producer
$ java -Djava.security.auth.login.config=/home/kafka-user/kafka-jaas.conf -Djava.security.krb5.conf=/etc/krb5.conf -Djavax.security.auth.useSubjectCredsOnly=false -cp hdp-kafka-sample-1.0-SNAPSHOT.jar:/usr/hdp/current/kafka-broker/libs/* hdp.sample.KafkaProducer one.hdp:6667 test

# consume sample messages for test
$ /usr/hdp/current/kafka-broker/bin/kafka-simple-consumer-shell.sh --broker-list one.hdp:6667 --topic test --security-protocol PLAINTEXTSASL --partition 0
{metadata.broker.list=one.hdp:6667, request.timeout.ms=1000, client.id=SimpleConsumerShell, security.protocol=PLAINTEXTSASL}
Test Date: Sun Feb 21 16:12:05 UTC 2016
Test Date: Sun Feb 21 16:12:06 UTC 2016
Test Date: Sun Feb 21 16:12:06 UTC 2016
Test Date: Sun Feb 21 16:12:06 UTC 2016
Test Date: Sun Feb 21 16:12:06 UTC 2016
Test Date: Sun Feb 21 16:12:06 UTC 2016
Test Date: Sun Feb 21 16:12:06 UTC 2016
Test Date: Sun Feb 21 16:12:06 UTC 2016
Test Date: Sun Feb 21 16:12:06 UTC 2016
Test Date: Sun Feb 21 16:12:06 UTC 2016
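The three -D flags passed to the java command above are ordinary JVM system properties, so they can also be set in code. The following is a minimal sketch; the class KerberosClientSettings is a hypothetical helper, not part of the sample project. It assumes the properties are set before the first JAAS or Kerberos call in the JVM, since values set later may be ignored.

```java
// Hypothetical helper: sets the same JVM properties as the -D flags above.
class KerberosClientSettings {

    // Must run before the producer (and thus JAAS) is initialized.
    static void apply(String jaasConf, String krb5Conf, boolean useSubjectCredsOnly) {
        System.setProperty("java.security.auth.login.config", jaasConf);
        System.setProperty("java.security.krb5.conf", krb5Conf);
        System.setProperty("javax.security.auth.useSubjectCredsOnly",
                String.valueOf(useSubjectCredsOnly));
    }

    public static void main(String[] args) {
        // Paths as used in this article
        apply("/home/kafka-user/kafka-jaas.conf", "/etc/krb5.conf", false);
        System.out.println(System.getProperty("java.security.auth.login.config"));
    }
}
```

Passing the flags on the command line keeps environment-specific paths out of the code, which is likely why the article uses that approach.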

Using Keytab to Login

Next we will configure the JAAS context to use a generated keytab file instead of the security context of the executing user. Before we can do this we need to create the keytab, again storing it under /home/kafka-user/kafka-user.keytab.

$ kadmin.local -q "xst -k /home/kafka-user/kafka-user.keytab kafka-user@MYCORP.NET"
Authenticating as principal kafka-user/admin@MYCORP.NET with password.
Entry for principal kafka-user@MYCORP.NET with kvno 2, encryption type aes256-cts-hmac-sha1-96 added to keytab WRFILE:/home/kafka-user/kafka-user.keytab.
Entry for principal kafka-user@MYCORP.NET with kvno 2, encryption type aes128-cts-hmac-sha1-96 added to keytab WRFILE:/home/kafka-user/kafka-user.keytab.
Entry for principal kafka-user@MYCORP.NET with kvno 2, encryption type des3-cbc-sha1 added to keytab WRFILE:/home/kafka-user/kafka-user.keytab.
Entry for principal kafka-user@MYCORP.NET with kvno 2, encryption type arcfour-hmac added to keytab WRFILE:/home/kafka-user/kafka-user.keytab.
Entry for principal kafka-user@MYCORP.NET with kvno 2, encryption type des-hmac-sha1 added to keytab WRFILE:/home/kafka-user/kafka-user.keytab.
Entry for principal kafka-user@MYCORP.NET with kvno 2, encryption type des-cbc-md5 added to keytab WRFILE:/home/kafka-user/kafka-user.keytab.

$ chown kafka-user. /home/kafka-user/kafka-user.keytab

The JAAS configuration can now be changed to look like this:

KafkaClient {
 com.sun.security.auth.module.Krb5LoginModule required
 doNotPrompt=true
 useTicketCache=true
 principal="kafka-user@MYCORP.NET"
 useKeyTab=true
 serviceName="kafka"
 keyTab="/home/kafka-user/kafka-user.keytab"
 client=true;
};

This will use the keytab stored under /home/kafka-user/kafka-user.keytab, so the user executing the producer does not need to be logged in to Kerberos, as the empty credentials cache shows:

$ klist
klist: Credentials cache file '/tmp/krb5cc_0' not found

$ java -Djava.security.auth.login.config=/home/kafka-user/kafka-jaas.conf -Djava.security.krb5.conf=/etc/krb5.conf -Djavax.security.auth.useSubjectCredsOnly=true -cp hdp-kafka-sample-1.0-SNAPSHOT.jar:/usr/hdp/current/kafka-broker/libs/* hdp.sample.KafkaProducer one.hdp:6667 test
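The keytab-based KafkaClient entry shown earlier can also be expressed through the standard JAAS Configuration API instead of a file. The sketch below is an illustration only. KeytabJaasConfig is a hypothetical class, it keeps just the options needed for a keytab login, and whether the producer used in this article picks up a configuration installed with Configuration.setConfiguration(...) is an assumption that has not been verified here.

```java
import java.util.HashMap;
import java.util.Map;
import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag;
import javax.security.auth.login.Configuration;

// Hypothetical programmatic equivalent of the keytab JAAS file entry.
class KeytabJaasConfig extends Configuration {
    private final String principal;
    private final String keyTab;

    KeytabJaasConfig(String principal, String keyTab) {
        this.principal = principal;
        this.keyTab = keyTab;
    }

    @Override
    public AppConfigurationEntry[] getAppConfigurationEntry(String name) {
        if (!"KafkaClient".equals(name)) {
            return null; // no entry for other login context names
        }
        Map<String, String> options = new HashMap<>();
        options.put("useKeyTab", "true");
        options.put("keyTab", keyTab);
        options.put("principal", principal);
        options.put("doNotPrompt", "true");
        options.put("serviceName", "kafka");
        return new AppConfigurationEntry[] {
            new AppConfigurationEntry(
                "com.sun.security.auth.module.Krb5LoginModule",
                LoginModuleControlFlag.REQUIRED,
                options)
        };
    }

    public static void main(String[] args) {
        Configuration config = new KeytabJaasConfig(
                "kafka-user@MYCORP.NET", "/home/kafka-user/kafka-user.keytab");
        AppConfigurationEntry entry = config.getAppConfigurationEntry("KafkaClient")[0];
        System.out.println(entry.getLoginModuleName() + " " + entry.getOptions());
    }
}
```

The sketch only builds and inspects the entry; it does not contact a KDC or perform a login.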

Kafka Producer Servlet

In a last example we will add a Kafka Servlet to the hdp-web-sample project described in a previous post. Our Servlet receives the topic and message as GET parameters. The Servlet looks as follows:

package hdp.webapp;

import java.io.IOException;
import java.io.PrintWriter;
import java.util.Properties;
import javax.servlet.Servlet;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
public class KafkaServlet extends HttpServlet implements Servlet {

    protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
        String topic = request.getParameter("topic");
        String msg = request.getParameter("msg");

        Properties props = new Properties();
        props.put("metadata.broker.list", "one.hdp:6667");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("request.required.acks", "1");
        props.put("security.protocol", "PLAINTEXTSASL");

        ProducerConfig config = new ProducerConfig(props);
        // Use the parameterized type instead of a raw Producer
        Producer<String, String> producer = new Producer<String, String>(config);

        try {
            producer.send(new KeyedMessage<String, String>(topic, msg));
        } finally {
            // Release the broker connection after each request
            producer.close();
        }

        PrintWriter out = response.getWriter();
        out.println("<html>");
        out.println("<head><title>Write to topic: "+ topic +"</title></head>");
        out.println("<body><h1>/"+ msg +"</h1></body>");
        out.println("</html>");
        out.close();

    }

}
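The Servlet above writes the topic and msg request parameters straight into the HTML response. For anything beyond a demo it may be worth escaping them first. The following is a minimal sketch of such a helper; HtmlEscaper is a hypothetical class and not part of the hdp-web-sample project.

```java
// Hypothetical helper: escapes the characters that are significant in HTML.
class HtmlEscaper {

    static String escape(String input) {
        if (input == null) {
            return ""; // a missing request parameter renders as empty text
        }
        StringBuilder out = new StringBuilder(input.length());
        for (int i = 0; i < input.length(); i++) {
            char c = input.charAt(i);
            switch (c) {
                case '&': out.append("&amp;"); break;
                case '<': out.append("&lt;"); break;
                case '>': out.append("&gt;"); break;
                case '"': out.append("&quot;"); break;
                case '\'': out.append("&#39;"); break;
                default: out.append(c);
            }
        }
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.println(escape("<b>Test1</b>"));
    }
}
```

In the Servlet, the two println calls could then pass HtmlEscaper.escape(topic) and HtmlEscaper.escape(msg) instead of the raw values.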

Again we change the JAAS config of the Tomcat service so that it can make use of the previously generated keytab. The jaas.conf of Tomcat now contains this:

KafkaClient {
 com.sun.security.auth.module.Krb5LoginModule required
 doNotPrompt=true
 useTicketCache=true
 principal="kafka-user@MYCORP.NET"
 useKeyTab=true
 serviceName="kafka"
 keyTab="/home/kafka-user/kafka-user.keytab"
 client=true;
};
com.sun.security.jgss.krb5.initiate {
    com.sun.security.auth.module.Krb5LoginModule required
    doNotPrompt=true
    principal="tomcat/one.hdp@MYCORP.NET"
    useKeyTab=true
    keyTab="/etc/tomcat/tomcat.keytab"
    storeKey=true;
};

After deploying the web app and restarting Tomcat with this newly adapted JAAS config, you should be able to publish messages to a secured broker by requesting the following GET address from a browser: http://one.hdp:8099/hdp-web/kafka?topic=test&msg=Test1 . The response should be a 200 OK (screenshot: tomcat_kafka_1).

You might run into issues, in particular this exception:

SEVERE: Servlet.service() for servlet [KafkaServlet] in context with path [/hdp-web] threw exception [Servlet execution threw an exception] with root cause
javax.security.auth.login.LoginException: Unable to obtain password from user

 at com.sun.security.auth.module.Krb5LoginModule.promptForPass(Krb5LoginModule.java:897)
 at com.sun.security.auth.module.Krb5LoginModule.attemptAuthentication(Krb5LoginModule.java:760)
 at com.sun.security.auth.module.Krb5LoginModule.login(Krb5LoginModule.java:617)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:497)
 at javax.security.auth.login.LoginContext.invoke(LoginContext.java:755)
 at javax.security.auth.login.LoginContext.access$000(LoginContext.java:195)
 at javax.security.auth.login.LoginContext$4.run(LoginContext.java:682)
 at javax.security.auth.login.LoginContext$4.run(LoginContext.java:680)
 at java.security.AccessController.doPrivileged(Native Method)
 at javax.security.auth.login.LoginContext.invokePriv(LoginContext.java:680)
 at javax.security.auth.login.LoginContext.login(LoginContext.java:587)
 at org.apache.kafka.common.security.kerberos.Login.login(Login.java:298)
 at org.apache.kafka.common.security.kerberos.Login.<init>(Login.java:104)
 at kafka.common.security.LoginManager$.init(LoginManager.scala:36)
 at kafka.producer.Producer.<init>(Producer.scala:50)
 at kafka.producer.Producer.<init>(Producer.scala:73)
 at kafka.javaapi.producer.Producer.<init>(Producer.scala:26)
 at hdp.webapp.KafkaServlet.doGet(KafkaServlet.java:33)
 at javax.servlet.http.HttpServlet.service(HttpServlet.java:620)
 at javax.servlet.http.HttpServlet.service(HttpServlet.java:727)
 at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:303)
 at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:208)
 at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:52)
 at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:241)
 at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:208)
 at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:220)
 at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:122)
 at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:501)
 at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:171)
 at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:102)
 at org.apache.catalina.valves.AccessLogValve.invoke(AccessLogValve.java:950)
 at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:116)
 at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:408)
 at org.apache.coyote.http11.AbstractHttp11Processor.process(AbstractHttp11Processor.java:1040)
 at org.apache.coyote.AbstractProtocol$AbstractConnectionHandler.process(AbstractProtocol.java:607)
 at org.apache.tomcat.util.net.JIoEndpoint$SocketProcessor.run(JIoEndpoint.java:314)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61)
 at java.lang.Thread.run(Thread.java:745)

If you are seeing the message javax.security.auth.login.LoginException: Unable to obtain password from user, it likely refers to your keytab file, which acts as the user's password. So make sure that the tomcat user is able to read that file, stored for example under /home/kafka-user/kafka-user.keytab.
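A quick way to rule out the file permission problem is to check the keytab from Java, running as the same OS user as Tomcat. The following is a minimal sketch; KeytabCheck is a hypothetical helper. It only inspects the file system and does not validate the keytab contents.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helper: reports whether the current OS user can read a keytab.
class KeytabCheck {

    static String describe(String keytabPath) {
        Path path = Paths.get(keytabPath);
        if (!Files.exists(path)) {
            // Also reported when a parent directory is not accessible
            return "missing";
        }
        if (!Files.isRegularFile(path)) {
            return "not a file";
        }
        if (!Files.isReadable(path)) {
            return "not readable";
        }
        return "ok";
    }

    public static void main(String[] args) {
        // Default path as used in this article
        String path = args.length > 0 ? args[0] : "/home/kafka-user/kafka-user.keytab";
        System.out.println(path + ": " + describe(path));
    }
}
```

Any result other than "ok" for the path configured in jaas.conf would be consistent with the exception above.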

Further Readings

  • Kafka Security 101
  • Kafka Security
  • Kafka Sasl/Kerberos and SSL Implementation
  • Oracle Doc: JAAS Authentication
  • Krb5LoginModule
  • Flume with kerberized Kafka
  • JAAS Login Configuration File
  • This article was first published under: http://henning.kropponline.de/2016/02/21/secure-kafka-java-producer-with-kerberos/

