Trying to read offsets from the Java API (Consumer) and getting the error below:

Contributor

Exception in thread "main" org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:781)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:635)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:617)
    at KafkaConsumerNew.main(KafkaConsumerNew.java:22)
Caused by: java.lang.IllegalArgumentException: No enum constant org.apache.kafka.common.security.auth.SecurityProtocol.PLAINTEXTSASL
    at java.lang.Enum.valueOf(Enum.java:238)
    at org.apache.kafka.common.security.auth.SecurityProtocol.valueOf(SecurityProtocol.java:26)
    at org.apache.kafka.common.security.auth.SecurityProtocol.forName(SecurityProtocol.java:72)
    at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:84)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:702)
    ... 3 more

3 REPLIES

Contributor

@Veerendra Nath

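Try SASL_PLAINTEXT instead. The stack trace shows that your client library has no SecurityProtocol constant named PLAINTEXTSASL.

If you are using the open-source Kafka client rather than the HDP Kafka client, you need to use one of the values listed below.

Valid values are: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL.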
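To illustrate the point, here is a minimal sketch of a fail-fast check that could run before the consumer is constructed. SecurityProtocolCheck, isValid and requireValid are hypothetical names made up for this example and are not part of the Kafka client API; the set of accepted values is simply the four listed above.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical helper for illustration only; not part of the Kafka client API.
class SecurityProtocolCheck {
    // The four values listed in the reply above.
    static final Set<String> VALID = new LinkedHashSet<>(
            Arrays.asList("PLAINTEXT", "SSL", "SASL_PLAINTEXT", "SASL_SSL"));

    // Returns true only for an exact, case-sensitive match.
    static boolean isValid(String value) {
        return value != null && VALID.contains(value);
    }

    // Returns the value unchanged, or throws with a message naming the accepted values.
    static String requireValid(String value) {
        if (!isValid(value)) {
            throw new IllegalArgumentException(
                    "Unsupported security.protocol '" + value + "'; expected one of " + VALID);
        }
        return value;
    }

    public static void main(String[] args) {
        System.out.println(isValid("SASL_PLAINTEXT")); // true
        System.out.println(isValid("PLAINTEXTSASL"));  // false
    }
}
```

Calling requireValid on the configured value before building the consumer would surface the problem with a message that lists the accepted values, instead of the nested "No enum constant" error.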

Contributor
I am using HDP 2.6 and Kafka 0.9, and my Java code looks like this:

consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:port number");
consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
consumerConfig.put("security.protocol", "PLAINTEXTSASL");
KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerConfig);
TestConsumerRebalanceListener rebalanceListener = new TestConsumerRebalanceListener();
consumer.subscribe(Collections.singletonList("TOPICNAME"), rebalanceListener);
while (true) {
    ConsumerRecords<byte[], byte[]> records = consumer.poll(1000);
    for (ConsumerRecord<byte[], byte[]> record : records) {
        System.out.printf("Received Message topic =%s, partition =%s, offset = %d, key = %s, value = %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value());
    }
    consumer.commitSync();
}

The command I am using is:

java -Djava.security.auth.login.config=path/kafka_client_jaas.conf -Djava.security.krb5.conf=/etc/krb5.conf -cp path/Consumer_test.jar className topicName

Contributor

Change this line from

consumerConfig.put("security.protocol", "PLAINTEXTSASL");

to

consumerConfig.put("security.protocol", "SASL_PLAINTEXT");

Reference: https://kafka.apache.org/090/documentation.html (search for security.protocol)
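For completeness, a rough sketch of the full set of consumer properties with the corrected value might look like the following. It uses the plain string keys (for example "bootstrap.servers") rather than the ConsumerConfig constants so that it compiles without the Kafka client on the classpath. ConsumerProps and build are hypothetical names, and "broker-host:6667" is a placeholder address, not a value from this thread.

```java
import java.util.Properties;

// Hypothetical helper for illustration; only the property keys are real Kafka config names.
class ConsumerProps {
    static Properties build(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Corrected value: SASL_PLAINTEXT, not PLAINTEXTSASL.
        props.put("security.protocol", "SASL_PLAINTEXT");
        return props;
    }

    public static void main(String[] args) {
        // Placeholder broker address, assumed for the example.
        Properties props = build("broker-host:6667", "my-group");
        System.out.println(props.getProperty("security.protocol")); // SASL_PLAINTEXT
    }
}
```

The resulting Properties object can be passed to the KafkaConsumer constructor in place of consumerConfig. Since both deserializers are StringDeserializer, declaring the consumer as KafkaConsumer<String, String> would match the configured types more closely than byte[].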