
Kafka has supported SASL_PLAINTEXT (authentication without encryption) since version 0.9, both for communication between brokers and for communication between consumers/producers and brokers. To know more about SASL, please refer to this link.

1. Maven Dependency

Add the following Maven dependency to your pom.xml:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.10.0.0</version>
</dependency>

2. Kerberos Setup

Create a JAAS configuration file with the following contents:

KafkaClient {
 com.sun.security.auth.module.Krb5LoginModule required
 useTicketCache=true
 principal="user@EXAMPLE.COM"
 useKeyTab=true
 serviceName="kafka"
 keyTab="/etc/security/keytabs/user.headless.keytab";
};


The above configuration is set to use both the keytab and the ticket cache. The Kafka producer client uses this information to obtain a TGT and authenticate with the Kafka broker.
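Before pointing a producer at the file, it can help to confirm that the JVM is able to parse it. The sketch below is one possible check and is not part of the original setup: it loads a JAAS file through the JDK's "JavaLoginConfig" configuration type and returns the options of the KafkaClient entry. The class name JaasFileCheck and the method readOptions are made up for illustration. The check only parses the file; it does not contact the KDC or verify the keytab.

```java
import java.net.URI;
import java.nio.file.Paths;
import java.security.NoSuchAlgorithmException;
import java.security.URIParameter;
import java.util.Map;
import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.Configuration;

// Hypothetical helper: parses a JAAS file and exposes the options of one entry.
public class JaasFileCheck {

    static Map<String, ?> readOptions(URI jaasFile, String entryName) {
        Configuration config;
        try {
            // "JavaLoginConfig" is the JDK's file-based JAAS configuration type
            config = Configuration.getInstance("JavaLoginConfig", new URIParameter(jaasFile));
        } catch (NoSuchAlgorithmException ex) {
            throw new IllegalStateException("Could not load JAAS file " + jaasFile, ex);
        }
        AppConfigurationEntry[] entries = config.getAppConfigurationEntry(entryName);
        if (entries == null || entries.length == 0) {
            throw new IllegalStateException("No '" + entryName + "' entry in " + jaasFile);
        }
        // Only the first login module of the entry is inspected in this sketch
        return entries[0].getOptions();
    }

    public static void main(String[] args) {
        if (args.length != 1) {
            System.err.println("usage: JaasFileCheck <path-to-jaas-file>");
            return;
        }
        Map<String, ?> options = readOptions(Paths.get(args[0]).toUri(), "KafkaClient");
        System.out.println("principal   = " + options.get("principal"));
        System.out.println("keyTab      = " + options.get("keyTab"));
        System.out.println("serviceName = " + options.get("serviceName"));
    }
}
```

If the entry name is misspelled or the trailing semicolons are missing, this fails immediately with an exception, rather than surfacing later as a login error inside the producer.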

Note:

a] Make sure /etc/krb5.conf has a realm mapping for "EXAMPLE.COM" and that default_realm is set to "EXAMPLE.COM" under the [libdefaults] section. Please refer to this link for more information.

b] Run the command below and make sure it succeeds:

kinit -kt /etc/security/keytabs/user.headless.keytab user@EXAMPLE.COM

3. Initialize Kafka Producer

The Kafka producer client needs certain information to initialize itself. This can be provided either from a properties file or in code as a Properties object (a Map also works), as below.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;

Properties properties = new Properties();
properties.put("bootstrap.servers", "comma-separated-list-of-brokers"); // host:port pairs
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // key serializer
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // value serializer
properties.put("acks", "1"); // message durability: "1" acks once the leader has written the record; "all" waits for the in-sync replicas
properties.put("security.protocol", "SASL_PLAINTEXT"); // security protocol used to talk to the brokers
properties.put("batch.size", "16384"); // maximum bytes of records batched per partition before a send
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
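For the properties-file route mentioned above, a minimal sketch could look like the following. It uses only java.util.Properties. The class name ProducerConfigLoader and its load method are hypothetical, and the list of required keys assumes the serializers are configured through properties rather than passed to the KafkaProducer constructor.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Properties;

// Hypothetical helper: reads producer settings from a .properties source.
public class ProducerConfigLoader {

    static Properties load(Reader source) {
        Properties properties = new Properties();
        try {
            properties.load(source);
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
        // Fail fast on settings this sketch assumes must come from the file
        String[] requiredKeys = {"bootstrap.servers", "key.serializer", "value.serializer"};
        for (String key : requiredKeys) {
            if (properties.getProperty(key) == null) {
                throw new IllegalArgumentException("Missing required setting: " + key);
            }
        }
        return properties;
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 1) {
            System.err.println("usage: ProducerConfigLoader <path-to-properties-file>");
            return;
        }
        try (Reader reader = Files.newBufferedReader(Paths.get(args[0]), StandardCharsets.UTF_8)) {
            Properties properties = load(reader);
            // The result can be handed to new KafkaProducer<String, String>(properties)
            System.out.println("Loaded " + properties.size() + " producer settings");
        }
    }
}
```

Keeping the settings in a file means the broker list and security protocol can change per environment without recompiling the producer.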

4. Push Message

// "topic-name" is a placeholder; replace it with the target topic.
producer.send(new ProducerRecord<String, String>("topic-name", "key", "value"), new Callback() {
        public void onCompletion(RecordMetadata metadata, Exception e) {
            if (e != null) {
                // metadata may be null when the send fails, so log the exception instead
                LOG.error("Send failed", e);
            }
            else {
                LOG.info("Message delivered to topic {} and partition {}. Message offset is {}", metadata.topic(), metadata.partition(), metadata.offset());
            }
        }
    });
producer.close();

The above code pushes a message to the Kafka broker. When the send completes (the message is acknowledged or the send fails), the "onCompletion" method is invoked.

5. Run

When running this code, add the following VM parameters:

-Djava.security.auth.login.config=<PATH_TO_JAAS_FILE_CREATED_IN_STEP2> -Djava.security.krb5.conf=/etc/krb5.conf
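If changing the JVM command line is inconvenient, the same two system properties can usually be set in code instead. The sketch below is an assumption-laden alternative rather than the article's method. The class name KerberosSystemProperties and the JAAS path are placeholders, and the call has to happen before the KafkaProducer is created (and before anything else in the JVM triggers a JAAS login).

```java
// Hypothetical helper: sets the same properties as the -D flags above.
public class KerberosSystemProperties {

    static void configure(String jaasFilePath, String krb5ConfPath) {
        // Location of the JAAS file that contains the KafkaClient entry
        System.setProperty("java.security.auth.login.config", jaasFilePath);
        // Location of the Kerberos configuration (realms, default_realm)
        System.setProperty("java.security.krb5.conf", krb5ConfPath);
    }

    public static void main(String[] args) {
        // "/path/to/kafka_client_jaas.conf" is a placeholder path
        configure("/path/to/kafka_client_jaas.conf", "/etc/krb5.conf");
        System.out.println(System.getProperty("java.security.auth.login.config"));
        System.out.println(System.getProperty("java.security.krb5.conf"));
        // Create the KafkaProducer only after this point
    }
}
```

The -D flags remain the simpler option when you control how the JVM is launched, because they are in place before any application code runs.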