Created 11-21-2017 09:43 PM
Exception in thread "main" org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:781)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:635)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:617)
    at KafkaConsumerNew.main(KafkaConsumerNew.java:22)
Caused by: java.lang.IllegalArgumentException: No enum constant org.apache.kafka.common.security.auth.SecurityProtocol.PLAINTEXTSASL
    at java.lang.Enum.valueOf(Enum.java:238)
    at org.apache.kafka.common.security.auth.SecurityProtocol.valueOf(SecurityProtocol.java:26)
    at org.apache.kafka.common.security.auth.SecurityProtocol.forName(SecurityProtocol.java:72)
    at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:84)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:702)
    ... 3 more
Created 11-21-2017 09:55 PM
@Veerendra Nath
Try SASL_PLAINTEXT instead.
If you are using the open source Apache Kafka version rather than HDP Kafka, you need to use one of the values listed below.
Valid values are: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL.
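To illustrate why the stack trace ends in Enum.valueOf, here is a minimal sketch. It uses a local enum, ProtocolName, that only mirrors the four names listed above; it is a hypothetical stand-in, not Kafka's own SecurityProtocol class, and lookupError is a helper made up for this example.

```java
import java.util.Arrays;

// Local stand-in holding the four protocol names listed above.
// This is NOT Kafka's SecurityProtocol class, only an enum with the same constants.
enum ProtocolName { PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL }

final class ProtocolLookupDemo {

    // Returns null when the name matches a constant exactly,
    // otherwise the message of the IllegalArgumentException that valueOf throws.
    static String lookupError(String name) {
        try {
            ProtocolName.valueOf(name);
            return null;
        } catch (IllegalArgumentException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println("Valid names: " + Arrays.toString(ProtocolName.values()));
        System.out.println("SASL_PLAINTEXT -> " + lookupError("SASL_PLAINTEXT"));
        System.out.println("PLAINTEXTSASL  -> " + lookupError("PLAINTEXTSASL"));
    }
}
```

Enum.valueOf only accepts an exact constant name, so SASL_PLAINTEXT resolves while PLAINTEXTSASL produces a "No enum constant" message like the one in the trace.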
Created 11-21-2017 10:21 PM
I am using HDP 2.6 and Kafka 0.9, and my Java code looks like this:
consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:port number");
consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
consumerConfig.put("security.protocol", "PLAINTEXTSASL");

// Key and value types match the StringDeserializer configured above.
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerConfig);
TestConsumerRebalanceListener rebalanceListener = new TestConsumerRebalanceListener();
consumer.subscribe(Collections.singletonList("TOPICNMAE"), rebalanceListener);

// Poll, print each record, then commit the offsets for the batch.
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(1000);
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("Received Message topic =%s, partition =%s, offset = %d, key = %s, value = %s\n",
                record.topic(), record.partition(), record.offset(), record.key(), record.value());
    }
    consumer.commitSync();
}
and the command I am using is:
java -Djava.security.auth.login.config=path/kafka_client_jaas.conf -Djava.security.krb5.conf=/etc/krb5.conf -cp path/Consumer_test.jar className topicName
Created 11-21-2017 10:45 PM
Change this line from
consumerConfig.put("security.protocol", "PLAINTEXTSASL");
to
consumerConfig.put("security.protocol", "SASL_PLAINTEXT");
Reference: https://kafka.apache.org/090/documentation.html (search for security.protocol)
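For anyone who wants to see the corrected setting in context, here is a rough sketch of the same consumer settings written with plain string keys, so it runs without the Kafka jar on the classpath. The class name ConsumerSecurityConfigSketch, the buildConfig helper, and the localhost:9092 address are placeholders made up for this example; use your own broker host and port.

```java
import java.util.Properties;

final class ConsumerSecurityConfigSketch {

    // Builds consumer settings using the string keys behind the ConsumerConfig
    // constants from the question, with the corrected security.protocol value.
    static Properties buildConfig(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // SASL_PLAINTEXT is one of the four names accepted by the Apache Kafka client.
        props.put("security.protocol", "SASL_PLAINTEXT");
        return props;
    }

    public static void main(String[] args) {
        // Placeholder broker address; replace with the real host and port.
        Properties props = buildConfig("localhost:9092", "my-group");
        props.forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

The resulting Properties object can be passed to the KafkaConsumer constructor in place of consumerConfig; the JAAS and krb5 system properties on the command line stay the same.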