
kafka-sentry command is not working


Hi all,

 

we are having a number of different issues on our Cloudera Kerberized cluster (CDH 5.12) with Kafka + Kerberos + Sentry security. The first one we see is with MirrorMaker: enabling Sentry for Kafka causes MirrorMaker to fail to start.

 

From logs:

[mirrormaker-thread-0] Mirror maker thread failure due to 
org.apache.kafka.common.errors.GroupAuthorizationException: Not authorized to access group: cloudera_mirrormaker

I think we need to grant privileges via the kafka-sentry command, but when I try to run the commands with my user (which is a superuser in Kafka) after kinit (KDC configured with Active Directory), I get this:

 

17/12/14 16:19:12 WARN security.UserGroupInformation: PriviledgedActionException as:XXXXXXXXX@XXXXXXXXX (auth:KERBEROS) cause:org.apache.thrift.transport.TTransportException: Peer indicated failure: Problem with callback handler
17/12/14 16:19:12 ERROR tools.SentryShellKafka:
java.lang.reflect.UndeclaredThrowableException
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1933)
at org.apache.sentry.provider.db.generic.service.thrift.SentryGenericServiceClientDefaultImpl$UgiSaslClientTransport.open(SentryGenericServiceClientDefaultImpl.java:99)
at org.apache.sentry.provider.db.generic.service.thrift.SentryGenericServiceClientDefaultImpl.<init>(SentryGenericServiceClientDefaultImpl.java:155)
at org.apache.sentry.provider.db.generic.service.thrift.SentryGenericServiceClientFactory.create(SentryGenericServiceClientFactory.java:31)
at org.apache.sentry.provider.db.generic.tools.SentryShellKafka.run(SentryShellKafka.java:51)
at org.apache.sentry.provider.db.tools.SentryShellCommon.executeShell(SentryShellCommon.java:241)
at org.apache.sentry.provider.db.generic.tools.SentryShellKafka.main(SentryShellKafka.java:96)
Caused by: org.apache.thrift.transport.TTransportException: Peer indicated failure: Problem with callback handler
at org.apache.thrift.transport.TSaslTransport.receiveSaslMessage(TSaslTransport.java:199)
at org.apache.thrift.transport.TSaslTransport.open(TSaslTransport.java:307)
at org.apache.thrift.transport.TSaslClientTransport.open(TSaslClientTransport.java:37)
at org.apache.sentry.provider.db.generic.service.thrift.SentryGenericServiceClientDefaultImpl$UgiSaslClientTransport.baseOpen(SentryGenericServiceClientDefaultImpl.java:115)
at org.apache.sentry.provider.db.generic.service.thrift.SentryGenericServiceClientDefaultImpl$UgiSaslClientTransport.access$000(SentryGenericServiceClientDefaultImpl.java:71)
at org.apache.sentry.provider.db.generic.service.thrift.SentryGenericServiceClientDefaultImpl$UgiSaslClientTransport$1.run(SentryGenericServiceClientDefaultImpl.java:101)
at org.apache.sentry.provider.db.generic.service.thrift.SentryGenericServiceClientDefaultImpl$UgiSaslClientTransport$1.run(SentryGenericServiceClientDefaultImpl.java:99)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1920)
... 6 more
The operation failed. Message: Peer indicated failure: Problem with callback handler


# klist
Ticket cache: FILE:/tmp/krb5cc_0
Default principal: XXXXXXXXXX@XXXXXX.XXXXXX

Valid starting Expires Service principal
12/14/2017 15:47:34 12/15/2017 01:47:34 krbtgt/XXXXXXXX@XXXXXXX
renew until 12/21/2017 15:47:27

 

Aside from that, we also tried launching Kafka producers and consumers from the CLI, to see whether we could get them working.

 

I am able to list topics:

kafka-topics --list --zookeeper [ZK host]:2181/kafka


....

....

17/12/14 16:22:02 INFO zkclient.ZkClient: zookeeper state changed (SyncConnected)
17/12/14 16:22:02 INFO zkclient.ZkClient: zookeeper state changed (SaslAuthenticated)
__consumer_offsets
test
test1

 

I can start a local producer:

 

 /usr/bin/kafka-console-producer --topic test1 --producer.config client.properties --broker-list [broker]:9092

 

17/12/14 16:24:08 INFO producer.ProducerConfig: ProducerConfig values:
acks = 1
batch.size = 16384
block.on.buffer.full = false
bootstrap.servers = [SERVER:9092]
buffer.memory = 33554432
client.id = console-producer
compression.type = none
connections.max.idle.ms = 540000
interceptor.classes = null
key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
linger.ms = 1000
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.fetch.timeout.ms = 60000
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.ms = 50
request.timeout.ms = 1500
retries = 3
retry.backoff.ms = 100
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = kafka
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = SASL_PLAINTEXT
send.buffer.bytes = 102400
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
timeout.ms = 30000
value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer

17/12/14 16:24:08 INFO authenticator.AbstractLogin: Successfully logged in.
17/12/14 16:24:08 INFO kerberos.KerberosLogin: [Principal=null]: TGT refresh thread started.
17/12/14 16:24:08 INFO kerberos.KerberosLogin: [Principal=null]: TGT valid starting at: Thu Dec 14 15:47:34 CET 2017
17/12/14 16:24:08 INFO kerberos.KerberosLogin: [Principal=null]: TGT expires: Fri Dec 15 01:47:34 CET 2017
17/12/14 16:24:08 INFO kerberos.KerberosLogin: [Principal=null]: TGT refresh sleeping until: Fri Dec 15 00:12:50 CET 2017
17/12/14 16:24:08 INFO utils.AppInfoParser: Kafka version : 0.10.2-kafka-2.2.0
17/12/14 16:24:08 INFO utils.AppInfoParser: Kafka commitId : unknown

 

Then when I start the consumer, I get this:

 

/usr/bin/kafka-console-consumer --topic test1 --from-beginning --consumer.config consumer.properties --zookeeper [ZOOKEEPER_HOST]:2181/kafka

 

...

...
17/12/14 16:28:13 INFO zookeeper.ClientCnxn: Socket connection established, initiating session, client: /10.92.80.21:41892, server: dctsvl028.group.local/10.92.80.28:2181
17/12/14 16:28:13 INFO zookeeper.ClientCnxn: Session establishment complete on server dctsvl028.group.local/10.92.80.28:2181, sessionid = 0x16050267dfb0874, negotiated timeout = 6000
17/12/14 16:28:13 INFO zkclient.ZkClient: zookeeper state changed (SyncConnected)
17/12/14 16:28:13 INFO zkclient.ZkClient: zookeeper state changed (SaslAuthenticated)
17/12/14 16:28:13 INFO consumer.ZookeeperConsumerConnector: [console-consumer-22115_dctsvl021.group.local-1513265293155-5814942c], starting auto committer every 60000 ms
17/12/14 16:28:13 INFO consumer.ZookeeperConsumerConnector: [console-consumer-22115_dctsvl021.group.local-1513265293155-5814942c], begin registering consumer console-consumer-22115_dctsvl021.group.local-1513265293155-5814942c in ZK

......

......

17/12/14 16:28:13 INFO consumer.ZookeeperConsumerConnector: [console-consumer-22115_dctsvl021.group.local-1513265293155-5814942c], Creating topic event watcher for topics test1
17/12/14 16:28:13 INFO consumer.ZookeeperConsumerConnector: [console-consumer-22115_dctsvl021.group.local-1513265293155-5814942c], Topics to consume = ArrayBuffer(test1)
17/12/14 16:28:13 WARN consumer.ConsumerFetcherManager$LeaderFinderThread: [console-consumer-22115_dctsvl021.group.local-1513265293155-5814942c-leader-finder-thread], Failed to find leader for Set(test1-6, test1-3, test1-1, test1-7, test1-5, test1-0, test1-4, test1-2)
kafka.common.BrokerEndPointNotAvailableException: End point with security protocol PLAINTEXT not found for broker 293
at kafka.client.ClientUtils$$anonfun$getPlaintextBrokerEndPoints$1$$anonfun$apply$5.apply(ClientUtils.scala:146)
at kafka.client.ClientUtils$$anonfun$getPlaintextBrokerEndPoints$1$$anonfun$apply$5.apply(ClientUtils.scala:146)
at scala.Option.getOrElse(Option.scala:121)
at kafka.client.ClientUtils$$anonfun$getPlaintextBrokerEndPoints$1.apply(ClientUtils.scala:146)
at kafka.client.ClientUtils$$anonfun$getPlaintextBrokerEndPoints$1.apply(ClientUtils.scala:142)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at kafka.client.ClientUtils$.getPlaintextBrokerEndPoints(ClientUtils.scala:142)
at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:67)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)

 

and this last part continues forever.
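From what I can tell, the old ZooKeeper-based consumer only ever looks for a PLAINTEXT endpoint in the broker registrations it reads from ZooKeeper, so on a cluster whose brokers advertise only SASL_PLAINTEXT the leader lookup can never succeed. A minimal sketch of that lookup logic (the registration JSON below is illustrative, not taken from our cluster):

```python
import json

# Illustrative broker registration, shaped like what Kafka stores in
# ZooKeeper under /brokers/ids/<id>; a SASL-only broker advertises
# no PLAINTEXT endpoint.
registration = json.loads(
    '{"host": "broker1", "port": 9092,'
    ' "endpoints": ["SASL_PLAINTEXT://broker1:9092"]}'
)

def plaintext_endpoint(reg):
    """Mimic the old consumer's lookup: return a PLAINTEXT:// endpoint or fail."""
    for ep in reg["endpoints"]:
        if ep.startswith("PLAINTEXT://"):
            return ep
    raise LookupError("End point with security protocol PLAINTEXT not found")

try:
    plaintext_endpoint(registration)
except LookupError as e:
    print(e)  # same message as the BrokerEndPointNotAvailableException above
```

Note that "SASL_PLAINTEXT://..." does not match the "PLAINTEXT://" prefix, which is why the lookup fails even though the broker is reachable.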

 

If I try using --bootstrap-server, I get this:

/usr/bin/kafka-console-consumer --topic test1 --from-beginning --consumer.config consumer.properties --bootstrap-server [broker_server]:9092 --new-consumer

 

17/12/14 16:34:41 WARN clients.NetworkClient: Error while fetching metadata with correlation id 11 : {test1=LEADER_NOT_AVAILABLE}
17/12/14 16:34:41 WARN clients.NetworkClient: Error while fetching metadata with correlation id 12 : {test1=LEADER_NOT_AVAILABLE}
17/12/14 16:34:41 WARN clients.NetworkClient: Error while fetching metadata with correlation id 13 : {test1=LEADER_NOT_AVAILABLE}
17/12/14 16:34:41 WARN clients.NetworkClient: Error while fetching metadata with correlation id 14 : {test1=LEADER_NOT_AVAILABLE}
17/12/14 16:34:41 WARN clients.NetworkClient: Error while fetching metadata with correlation id 15 : {test1=LEADER_NOT_AVAILABLE}
17/12/14 16:34:41 WARN clients.NetworkClient: Error while fetching metadata with correlation id 16 : {test1=LEADER_NOT_AVAILABLE}
17/12/14 16:34:41 WARN clients.NetworkClient: Error while fetching metadata with correlation id 17 : {test1=LEADER_NOT_AVAILABLE}
17/12/14 16:34:41 WARN clients.NetworkClient: Error while fetching metadata with correlation id 18 : {test1=LEADER_NOT_AVAILABLE}
17/12/14 16:34:41 WARN clients.NetworkClient: Error while fetching metadata with correlation id 19 : {test1=LEADER_NOT_AVAILABLE}
17/12/14 16:34:42 WARN clients.NetworkClient: Error while fetching metadata with correlation id 20 : {test1=LEADER_NOT_AVAILABLE}
17/12/14 16:34:42 WARN clients.NetworkClient: Error while fetching metadata with correlation id 21 : {test1=LEADER_NOT_AVAILABLE}

 

Changing the port above from 9092 to another one makes the consumer start, but even so, if I type something in the producer, nothing shows up in the consumer console.

 

The Kafka process is listening on these ports:

# netstat -apn|grep 17897
tcp 0 0 0.0.0.0:9393 0.0.0.0:* LISTEN 17897/java
tcp 0 0 0.0.0.0:35188 0.0.0.0:* LISTEN 17897/java
tcp 0 0 [IP]:9092 0.0.0.0:* LISTEN 17897/java
tcp 0 0 0.0.0.0:37801 0.0.0.0:* LISTEN 17897/java
tcp 0 0 0.0.0.0:24042 0.0.0.0:* LISTEN 17897/java

 

It's probably my fault for messing something up somewhere; below are the files used for testing. In CM the protocol has been set to SASL_PLAINTEXT and Sentry has been enabled in the Kafka configuration, not much else beyond that. SSL is not enabled.

 

/etc/kafka/jaas.conf

KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useTicketCache=true;
};
Client {
com.sun.security.auth.module.Krb5LoginModule required
useTicketCache=true;
};

 

consumer.properties

security.protocol=SASL_PLAINTEXT
sasl.mechanism=GSSAPI
sasl.kerberos.service.name=kafka
bootstrap.servers=[broker]:9092

 

client.properties

security.protocol=SASL_PLAINTEXT
sasl.kerberos.service.name=kafka
bootstrap.servers=[broker]:9092
sasl.mechanism=GSSAPI

 

export KAFKA_OPTS="-Djava.security.auth.login.config=/etc/kafka/jaas.conf"

 

Also note that I redacted the IPs/hosts in the configurations/outputs.

Sorry for the long post

 

Regards,

Matteo

1 REPLY 1


Hi @mlussana1,

 

I have configured Kafka (2.2.0) with Sentry enabled in a Kerberized environment and was able to use it as a channel for Flume in CDH 5.13.

 

First of all, did you add kafka to the Allowed Connecting Users (sentry.service.allow.connect) in the Sentry configuration? Also, in order to grant privileges, your user must be in one of the Sentry admin groups listed in the Admin Groups (sentry.service.admin.group) configuration. Either of those could cause the sentry shell problem.
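Once the shell can connect, the privilege MirrorMaker needs could be granted roughly like this (a sketch: mm_role and mirrormaker_group are placeholder names of my own, and the privilege string follows the Host=...->ConsumerGroup=...->action=... form the Kafka-Sentry shell uses):

```shell
# Create a role and bind it to the group the MirrorMaker principal belongs to
kafka-sentry -cr -r mm_role
kafka-sentry -arg -r mm_role -g mirrormaker_group

# Allow that role to use the cloudera_mirrormaker consumer group
kafka-sentry -gpr -r mm_role -p "Host=*->ConsumerGroup=cloudera_mirrormaker->action=ALL"
```

These commands need a working connection to the Sentry service, so they will only succeed after the callback-handler problem above is resolved.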
 
For the producer problem, I am not sure, but you could try modifying the jaas.conf file as follows:
 
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=false
useTicketCache=true
serviceName="kafka"
principal="username@xxxx.xxx";
};

Client {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=false
useTicketCache=true
serviceName="zookeeper"
principal="username@xxxx.xxx";
};
 
 
And I run the consumer and producer from the command line as follows:
 
kafka-console-consumer  --topic topicname  --from-beginning --bootstrap-server brokerhostname:9092 --consumer.config consumer.properties
kafka-console-producer --broker-list [brokers]:9092 --topic topicname --producer.config client.properties
 
I hope this helps.