Kafka 0.9 onwards supports SASL_PLAINTEXT (authentication without encryption) both for inter-broker communication and for communication between producers/consumers and brokers. To know more about SASL, please refer to this link.
The above configuration is set to use a keytab and ticket cache. The Kafka producer client uses this information to obtain a TGT and authenticate with the Kafka broker.
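For clients of this vintage, the JAAS configuration is typically passed to the producer JVM through the java.security.auth.login.config system property; a minimal sketch, assuming the file is saved as /etc/kafka/kafka_client_jaas.conf (the path is a placeholder):
// Assumed path to the JAAS file described above; set this before the producer is created,
// or pass it on the command line as -Djava.security.auth.login.config=...
System.setProperty("java.security.auth.login.config", "/etc/kafka/kafka_client_jaas.conf");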
Note:
a] Make sure /etc/krb5.conf has a realm mapping for "EXAMPLE.COM" in the [realms] section and that default_realm is set to "EXAMPLE.COM" under the [libdefaults] section. Please refer to this link for more information.
b] Run the command below and make sure it succeeds.
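For example, to confirm that the keytab and principal from the JAAS configuration can actually obtain a ticket (the keytab path and principal are placeholders):
# obtain a TGT using the keytab, then list the ticket cache to verify it
kinit -kt /etc/security/keytabs/kafka_client.keytab kafkaclient@EXAMPLE.COM
klist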
The Kafka producer client needs certain configuration to initialize itself. This can be supplied either by loading a properties file or by building a Properties object programmatically, as shown below.
import java.util.Properties;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

Properties properties = new Properties();
properties.put("bootstrap.servers", "comma-separated-list-of-brokers");
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // key serializer
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // value serializer
properties.put("acks", "1"); // message durability -- "1" means ack once the write to the leader succeeds; "all" means ack only after replication
properties.put("security.protocol", "SASL_PLAINTEXT"); // security protocol to use for communication
properties.put("batch.size", "16384"); // upper bound, in bytes, on how much the producer batches per partition before sending
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
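The same settings can also be loaded from a properties file rather than set in code; a minimal sketch, with the file name a placeholder:
import java.io.FileInputStream;

Properties properties = new Properties();
try (FileInputStream in = new FileInputStream("kafka-producer.properties")) { // assumed file name
    properties.load(in);
}
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);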
3. Push Message
producer.send(new ProducerRecord<String, String>("topic-name", "key", "value"), new Callback() { // topic name is a placeholder
    public void onCompletion(RecordMetadata metadata, Exception e) {
        if (e != null) {
            LOG.error("Send failed for record: {}", metadata, e);
        } else {
            LOG.info("Message delivered to topic {} and partition {}. Message offset is {}", metadata.topic(), metadata.partition(), metadata.offset());
        }
    }
});
producer.close();
The above code pushes a message to the Kafka broker, and once the send completes (is acknowledged), the "onCompletion" method is invoked.
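If blocking until the broker acknowledges the write is acceptable, the Future returned by send() can be used instead of a callback; a minimal sketch, with the topic name again a placeholder:
import java.util.concurrent.ExecutionException;

try {
    RecordMetadata metadata = producer.send(new ProducerRecord<String, String>("topic-name", "key", "value")).get(); // blocks until acked
    LOG.info("Message delivered to partition {} at offset {}", metadata.partition(), metadata.offset());
} catch (InterruptedException | ExecutionException e) {
    LOG.error("Send failed", e);
}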