
Using The Kafka-Topics Script In CDP7

Contributor

I am trying to run what was, in my old cluster, a simple command:

kafka-topics --bootstrap-server mybroker:9092 --list

Now, with a CDP7 cluster and Ranger installed, I get the following error. What essential thing am I missing here? Do I need a certain Ranger policy? Does this user have to be Kerberized? Is it something else? I am trying to reuse old management scripts to create my topics, but they all rely on the kafka-topics script working. I have turned TLS off for the brokers.

 

21/01/10 13:47:49 INFO utils.Log4jControllerRegistration$: Registered kafka:type=kafka.Log4jController MBean
21/01/10 13:47:49 INFO admin.AdminClientConfig: AdminClientConfig values: 
	bootstrap.servers = [mybroker:9092]
	client.dns.lookup = default
	client.id = 
	connections.max.idle.ms = 300000
	metadata.max.age.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	receive.buffer.bytes = 65536
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 120000
	retries = 5
	retry.backoff.ms = 100
	sasl.client.callback.handler.class = null
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.login.callback.handler.class = null
	sasl.login.class = null
	sasl.login.refresh.buffer.seconds = 300
	sasl.login.refresh.min.period.seconds = 60
	sasl.login.refresh.window.factor = 0.8
	sasl.login.refresh.window.jitter = 0.05
	sasl.mechanism = GSSAPI
	security.protocol = PLAINTEXT
	security.providers = null
	send.buffer.bytes = 131072
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
	ssl.endpoint.identification.algorithm = https
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLS
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS

21/01/10 13:47:49 INFO utils.AppInfoParser: Kafka version: 2.4.1.7.1.4.0-203
21/01/10 13:47:49 INFO utils.AppInfoParser: Kafka commitId: 79e841231b59b25d
21/01/10 13:47:49 INFO utils.AppInfoParser: Kafka startTimeMs: 1610275669932
21/01/10 13:47:52 INFO internals.AdminMetadataManager: [AdminClient clientId=adminclient-1] Metadata update failed
org.apache.kafka.common.errors.DisconnectException: Cancelled fetchMetadata request with correlation id 11 due to node -1 being disconnected
...
Error while executing topic command : org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment.
21/01/10 13:49:49 ERROR admin.TopicCommand$: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment.
	at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
	at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
	at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
	at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
	at kafka.admin.TopicCommand$AdminClientTopicService.getTopics(TopicCommand.scala:313)
	at kafka.admin.TopicCommand$AdminClientTopicService.listTopics(TopicCommand.scala:249)
	at kafka.admin.TopicCommand$.main(TopicCommand.scala:65)
	at kafka.admin.TopicCommand.main(TopicCommand.scala)
Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment.

21/01/10 13:49:49 INFO internals.AdminMetadataManager: [AdminClient clientId=adminclient-1] Metadata update failed
org.apache.kafka.common.errors.TimeoutException: The AdminClient thread has exited.

2 Replies

Contributor

I have done the following:

  1. Created a new group that is visible to Ranger and added my user to it.
  2. Added that group to an existing Kafka-Ranger policy that has full permissions on the Kafka cluster.
  3. Ran kinit as my user while logged onto the Kafka broker (roughly as in the example below).
  4. Ran the script again.
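
For reference, step 3 was along these lines (the principal here is just a placeholder for my own):

# obtain a Kerberos ticket for the user that will run the script
kinit myuser@EXAMPLE.COM
# confirm a valid ticket is now in the cache
klist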

I still get the same error message.

Contributor

I eventually found the answer in this document.

https://docs.cloudera.com/runtime/7.2.6/kafka-securing/topics/kafka-secure-kerberos-enable.html

The steps you need are:

1: Create a jaas.conf file to describe how you will authenticate with Kerberos.

Either interactively with kinit

KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useTicketCache=true;
};

or non-interactively with a keytab

KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  keyTab="/etc/security/keytabs/mykafkaclient.keytab"
  principal="mykafkaclient/clients.hostname.com@EXAMPLE.COM";
};
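
If you go the keytab route, it is worth sanity-checking the keytab first. The path and principal below simply match the example above:

# list the principals stored in the keytab
klist -kt /etc/security/keytabs/mykafkaclient.keytab

# confirm the keytab can actually obtain a ticket
kinit -kt /etc/security/keytabs/mykafkaclient.keytab mykafkaclient/clients.hostname.com@EXAMPLE.COM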

2: Create a client properties file to describe how you will connect to the brokers, with or without TLS.

Either with TLS

security.protocol=SASL_SSL
sasl.kerberos.service.name=kafka
ssl.truststore.location=<path to jks file>
ssl.truststore.password=<password for truststore>

Or without

security.protocol=SASL_PLAINTEXT
sasl.kerberos.service.name=kafka

3: Create the environment variable KAFKA_OPTS to contain the JVM parameter that points at your jaas.conf.

export KAFKA_OPTS="-Djava.security.auth.login.config=<path to jaas.conf>"

 

Then you can run the tool by referencing the Kafka brokers and the client config.

BOOTSTRAP=<kafka brokers URL>

kafka-topics --bootstrap-server $BOOTSTRAP --command-config client.properties --list
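
The same pattern works for the rest of the old management scripts. For example, creating a topic (the topic name, partition count and replication factor below are just placeholders):

kafka-topics --bootstrap-server $BOOTSTRAP --command-config client.properties \
  --create --topic my.example.topic --partitions 3 --replication-factor 3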

 

You will also need a Ranger policy that covers what you are trying to do.
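
If you want to script the Ranger side as well, policies can be created through the Ranger Admin REST API. The sketch below is only a rough starting point; the host, port, credentials, service name (cm_kafka), group and access types are assumptions about a typical CDP setup, so check them against your own Ranger Kafka service definition:

# sketch only: allow a group to create/delete/describe/configure all topics
curl -u admin:<password> -H 'Content-Type: application/json' \
  -X POST http://<ranger-host>:6080/service/public/v2/api/policy \
  -d '{
        "service": "cm_kafka",
        "name": "topic-management",
        "resources": { "topic": { "values": ["*"] } },
        "policyItems": [{
          "groups": ["my-kafka-admins"],
          "accesses": [
            { "type": "create",    "isAllowed": true },
            { "type": "delete",    "isAllowed": true },
            { "type": "describe",  "isAllowed": true },
            { "type": "configure", "isAllowed": true }
          ]
        }]
      }'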