
From version 0.9 onwards, Kafka supports SASL_PLAINTEXT (authentication without encryption) for communication between brokers and between consumers/producers and brokers. To know more about SASL, please refer to this link.

  1. Maven Dependency

Add the Maven dependency below to your pom.xml:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version></version>
</dependency>

2. Kerberos Setup

Create a JAAS configuration file with the contents below:

KafkaClient { required

The configuration above is set to use a keytab and the ticket cache. The Kafka producer client uses this information to obtain a TGT and authenticate with the Kafka broker.
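For reference, a JAAS entry that uses both a keytab and the ticket cache typically has the shape rendered by the sketch below. This is a minimal illustration, not necessarily the exact file used in this article: the class name JaasEntrySketch and the service name "kafka" are assumptions, while the principal and keytab path are the ones used with kinit in this step. The option names (useKeyTab, keyTab, useTicketCache, principal) are standard Krb5LoginModule options, and serviceName is read by the Kafka client.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: renders a "KafkaClient" JAAS entry as text.
class JaasEntrySketch {

    /** Builds the entry for the JDK's Krb5LoginModule; all argument values are illustrative. */
    static String render(String principal, String keyTabPath, String serviceName) {
        Map<String, String> options = new LinkedHashMap<>();
        options.put("useKeyTab", "true");               // allow login from a keytab
        options.put("keyTab", quote(keyTabPath));
        options.put("useTicketCache", "true");          // allow reuse of a cached ticket (e.g. after kinit)
        options.put("principal", quote(principal));
        options.put("serviceName", quote(serviceName)); // assumed Kerberos service name of the brokers

        StringBuilder sb = new StringBuilder("KafkaClient {\n");
        sb.append("    com.sun.security.auth.module.Krb5LoginModule required");
        for (Map.Entry<String, String> option : options.entrySet()) {
            sb.append("\n    ").append(option.getKey()).append('=').append(option.getValue());
        }
        sb.append(";\n};\n"); // the option list ends with ';' and the entry with '};'
        return sb.toString();
    }

    private static String quote(String value) {
        return "\"" + value + "\"";
    }

    public static void main(String[] args) {
        System.out.print(render("user@EXAMPLE.COM",
                "/etc/security/keytabs/user.headless.keytab", "kafka"));
    }
}
```

Running main prints the entry, which can be saved to a file such as kafka_client_jaas.conf (a hypothetical name). If your brokers run under a different Kerberos service principal, change the serviceName value accordingly.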


a] Make sure that /etc/krb5.conf has a realm mapping for "EXAMPLE.COM" and that default_realm is set to "EXAMPLE.COM" under the [libdefaults] section. Please refer to this link for more information.

b] Run the command below and make sure it succeeds:

kinit -kt /etc/security/keytabs/user.headless.keytab user@EXAMPLE.COM

3. Initialize the Kafka Producer

The Kafka producer client needs certain information to initialize itself. This can be supplied either from a properties file or in code as a Properties object (or a Map), as below.

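import java.util.Properties;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
Properties properties = new Properties();
properties.put("bootstrap.servers", "<BROKER_HOST>:<PORT>"); // required: comma-separated broker list (placeholder, replace with your brokers)
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // key serializer
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // value serializer
properties.put("acks", "1"); // message durability: "1" acks once the leader has written the record; "all" acks once the in-sync replicas have it
properties.put("security.protocol", "SASL_PLAINTEXT"); // security protocol used to communicate with the brokers
properties.put("batch.size", "16384"); // maximum size in bytes of a batch of records sent to one partition
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);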
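The properties-file route mentioned above could look like the following sketch, which uses only the JDK. The class name ProducerConfigSketch, the broker address, and the file name in the usage note are hypothetical. Defaults are copied into the same Properties object rather than passed as a Properties "defaults" parent, on the assumption that the producer reads the entries of the object itself.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.util.Properties;

// Hypothetical helper: builds producer settings from a .properties source.
class ProducerConfigSketch {

    /** Starts from the article's settings, then lets the file override or extend them. */
    static Properties load(Reader source) throws IOException {
        Properties props = new Properties();
        props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("acks", "1");
        props.setProperty("security.protocol", "SASL_PLAINTEXT");
        props.load(source); // keys present in the file replace the values set above
        return props;
    }

    public static void main(String[] args) throws IOException {
        // Stand-in for a real file; the broker address is a placeholder.
        String fileContents = "bootstrap.servers=broker1.example.com:9092\nacks=all\n";
        Properties props = load(new StringReader(fileContents));
        System.out.println("acks=" + props.getProperty("acks"));
        System.out.println("security.protocol=" + props.getProperty("security.protocol"));
    }
}
```

In an application, the Reader would typically come from Files.newBufferedReader(Paths.get("producer.properties")), and the returned object can be passed to the KafkaProducer constructor in place of the hand-built Properties.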

4. Push Message

producer.send(new ProducerRecord<String, String>("<TOPIC_NAME>", "key", "value"), new Callback() { // replace <TOPIC_NAME> with your topic
    @Override
    public void onCompletion(RecordMetadata metadata, Exception e) {
        if (e != null) {
            // Pass the exception as the last argument so the logger records the cause.
            LOG.error("Send failed for record: {}", metadata, e);
        } else {
            LOG.info("Message delivered to topic {} and partition {}. Message offset is {}",
                    metadata.topic(), metadata.partition(), metadata.offset());
        }
    }
});

The code above sends the message to the Kafka broker asynchronously. Once the send completes, whether acknowledged or failed, the "onCompletion" method is invoked.

5. Run

While running this code, add the VM parameter below, pointing it at the JAAS file created in step 2:

-Djava.security.auth.login.config=<PATH_TO_JAAS_FILE_CREATED_IN_STEP2>
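The JAAS file is normally handed to the JVM through the standard java.security.auth.login.config system property. Where changing the launch command is inconvenient, the same property can be set in code, as in this minimal sketch. The class and method names are hypothetical, and the approach assumes the property is set before the first producer is created, since the JAAS configuration is usually read once.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical helper: sets the JAAS system property from code.
class JaasPropertySketch {

    static final String JAAS_PROPERTY = "java.security.auth.login.config";

    /** Points the JVM at the JAAS file; call this before constructing the producer. */
    static void useJaasFile(Path jaasFile) {
        if (!Files.isReadable(jaasFile)) {
            // Fail early with a clear message instead of an opaque login error later.
            throw new IllegalArgumentException("JAAS file is not readable: " + jaasFile);
        }
        System.setProperty(JAAS_PROPERTY, jaasFile.toAbsolutePath().toString());
    }

    public static void main(String[] args) throws IOException {
        // An empty temporary file stands in for the real JAAS file from step 2.
        Path demo = Files.createTempFile("kafka_client_jaas", ".conf");
        useJaasFile(demo);
        System.out.println(JAAS_PROPERTY + "=" + System.getProperty(JAAS_PROPERTY));
        Files.delete(demo);
    }
}
```

The readability check is a design choice: a wrong path otherwise tends to surface only as a login failure when the producer starts.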