- Subscribe to RSS Feed
- Mark as New
- Mark as Read
- Bookmark
- Subscribe
- Printer Friendly Page
- Report Inappropriate Content
Created on 08-11-2016 10:05 PM
Kafka from 0.9 onwards started support SASL_PLAINTEXT ( authentication and non-encrypted) for communication b/w brokers and consumer/produce r with broker. To know more about SASL, please refer to this link.
- Maven Dependency
Add below maven dependency to your pom.xml
<dependency><br> <groupId>org.apache.kafka</groupId><br> <artifactId>kafka-clients</artifactId><br> <version>0.10.0.0</version><br></dependency>
2. Kerberos Setup
Configure JAAS configuration file with contents as below
KafkaClient { com.sun.security.auth.module.Krb5LoginModule required useTicketCache=true principal="user@EXAMPLE.COM" useKeyTab=true serviceName="kafka" keyTab="/etc/security/keytabs/user.headless.keytab"; };
Above configuration is set to use key tab and ticket cache. The Kakfa Client Producer use above info to get TGT and authenticates with Kafka broker.
Note:
a] Make sure the /etc/krb5.conf has realms mapping for "EXAMPLE.COM" and also the default_realm is set to "EXAMPLE.COM" under [libdefaults] section. Please refer this link for more information.
b] Run below command and make sure it is successful
kinit -kt /etc/security/keytabs/user.headless.keytab user@EXAMPLE.COM
3. Initialization Kafka Producer
The Kafka Producer Client needs certain information to initialize itself. This can be provided either as a property file input or as a HashMap as below.
Properties properties = new Properties(); properties.put("bootstrap.servers","comma-seperated-list-of-brokers"); properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");// key serializer properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer"); //value serializer properties.put("acks","1"); //message durability -- 1 mean ack after writing to leader is success. value of "all" means ack after replication. properties.put("security.protocol","SASL_PLAINTEXT"); // Security protocol to use for communication. properties.put("batch.size","16384");// maximum size of message KafkaProducer<String,String> producer = new KafkaProducer<String, String>(properties);<br>
3. Push Message
producer.send(new ProducerRecord<String, String>(", "key", "value"), new Callback() { public void onCompletion(RecordMetadata metadata, Exception e) { if (e != null) { LOG.error("Send failed for record: {}", metadata); } else { LOG.info("Message delivered to topic {} and partition {}. Message offset is {}",metadata.topic(),metadata.partition(),metadata.offset()); } } }); } producer.close();
Above code pushes message to kafka broker and on completion( acked ) the method "onCompletion" is invoked.
4. Run
While running this code add below VM params
-Djava.security.auth.login.config=<PATH_TO_JAAS_FILE_CREATED_IN_STEP2> -Djava.security.krb5.conf=/etc/krb5.conf