Created on 12-14-2017 07:50 AM - edited 09-16-2022 05:38 AM
Hi all,
we are having a lot of different issues on our Cloudera Kerberized cluster (CDH 5.12) with Kafka + Kerberos + Sentry security: the first one we see is with MirrorMaker, enabling Sentry for Kafka cause MirrorMaker to not start
From logs:
[mirrormaker-thread-0] Mirror maker thread failure due to org.apache.kafka.common.errors.GroupAuthorizationException: Not authorized to access group: cloudera_mirrormaker
I think we need to give privilegs from kafka-sentry command, but when i try to launch the commands with my user (which is a superuser in kafka) after the kinit (KDC configured with Active Directory), i got this
17/12/14 16:19:12 WARN security.UserGroupInformation: PriviledgedActionException as:XXXXXXXXX@XXXXXXXXX (auth:KERBEROS) cause:org.apache.thrift.transport.TTransportException: Peer indicated failure: Problem with callback handler
17/12/14 16:19:12 ERROR tools.SentryShellKafka:
java.lang.reflect.UndeclaredThrowableException
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1933)
at org.apache.sentry.provider.db.generic.service.thrift.SentryGenericServiceClientDefaultImpl$UgiSaslClientTransport.open(SentryGenericServiceClientDefaultImpl.java:99)
at org.apache.sentry.provider.db.generic.service.thrift.SentryGenericServiceClientDefaultImpl.<init>(SentryGenericServiceClientDefaultImpl.java:155)
at org.apache.sentry.provider.db.generic.service.thrift.SentryGenericServiceClientFactory.create(SentryGenericServiceClientFactory.java:31)
at org.apache.sentry.provider.db.generic.tools.SentryShellKafka.run(SentryShellKafka.java:51)
at org.apache.sentry.provider.db.tools.SentryShellCommon.executeShell(SentryShellCommon.java:241)
at org.apache.sentry.provider.db.generic.tools.SentryShellKafka.main(SentryShellKafka.java:96)
Caused by: org.apache.thrift.transport.TTransportException: Peer indicated failure: Problem with callback handler
at org.apache.thrift.transport.TSaslTransport.receiveSaslMessage(TSaslTransport.java:199)
at org.apache.thrift.transport.TSaslTransport.open(TSaslTransport.java:307)
at org.apache.thrift.transport.TSaslClientTransport.open(TSaslClientTransport.java:37)
at org.apache.sentry.provider.db.generic.service.thrift.SentryGenericServiceClientDefaultImpl$UgiSaslClientTransport.baseOpen(SentryGenericServiceClientDefaultImpl.java:115)
at org.apache.sentry.provider.db.generic.service.thrift.SentryGenericServiceClientDefaultImpl$UgiSaslClientTransport.access$000(SentryGenericServiceClientDefaultImpl.java:71)
at org.apache.sentry.provider.db.generic.service.thrift.SentryGenericServiceClientDefaultImpl$UgiSaslClientTransport$1.run(SentryGenericServiceClientDefaultImpl.java:101)
at org.apache.sentry.provider.db.generic.service.thrift.SentryGenericServiceClientDefaultImpl$UgiSaslClientTransport$1.run(SentryGenericServiceClientDefaultImpl.java:99)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1920)
... 6 more
The operation failed. Message: Peer indicated failure: Problem with callback handler
# klist
Ticket cache: FILE:/tmp/krb5cc_0
Default principal: XXXXXXXXXX@XXXXXX.XXXXXX
Valid starting Expires Service principal
12/14/2017 15:47:34 12/15/2017 01:47:34 krbtgt/XXXXXXXX@XXXXXXX
renew until 12/21/2017 15:47:27
Aside from that, we tried also to launch kafka producers and consumers from CLI, to see if we can manage to test it working
I am able to list topics
kafka-topics --list --zookeeper [ZK host]:2181/kafka
....
....
17/12/14 16:22:02 INFO zkclient.ZkClient: zookeeper state changed (SyncConnected)
17/12/14 16:22:02 INFO zkclient.ZkClient: zookeeper state changed (SaslAuthenticated)
__consumer_offsets
test
test1
i can start a local producer
/usr/bin/kafka-console-producer --topic test1 --producer.config client.properties --broker-list [broker]:9092
17/12/14 16:24:08 INFO producer.ProducerConfig: ProducerConfig values:
acks = 1
batch.size = 16384
block.on.buffer.full = false
bootstrap.servers = [SERVER:9092]
buffer.memory = 33554432
client.id = console-producer
compression.type = none
connections.max.idle.ms = 540000
interceptor.classes = null
key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
linger.ms = 1000
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.fetch.timeout.ms = 60000
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.ms = 50
request.timeout.ms = 1500
retries = 3
retry.backoff.ms = 100
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = kafka
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = SASL_PLAINTEXT
send.buffer.bytes = 102400
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
timeout.ms = 30000
value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
17/12/14 16:24:08 INFO authenticator.AbstractLogin: Successfully logged in.
17/12/14 16:24:08 INFO kerberos.KerberosLogin: [Principal=null]: TGT refresh thread started.
17/12/14 16:24:08 INFO kerberos.KerberosLogin: [Principal=null]: TGT valid starting at: Thu Dec 14 15:47:34 CET 2017
17/12/14 16:24:08 INFO kerberos.KerberosLogin: [Principal=null]: TGT expires: Fri Dec 15 01:47:34 CET 2017
17/12/14 16:24:08 INFO kerberos.KerberosLogin: [Principal=null]: TGT refresh sleeping until: Fri Dec 15 00:12:50 CET 2017
17/12/14 16:24:08 INFO utils.AppInfoParser: Kafka version : 0.10.2-kafka-2.2.0
17/12/14 16:24:08 INFO utils.AppInfoParser: Kafka commitId : unknown
Then when i start the consumer, i got this
/usr/bin/kafka-console-consumer --topic test1 --from-beginning --consumer.config consumer.properties --zookeeper [ZOOKEEPER_HOST]:2181/kafka
...
...
17/12/14 16:28:13 INFO zookeeper.ClientCnxn: Socket connection established, initiating session, client: /10.92.80.21:41892, server: dctsvl028.group.local/10.92.80.28:2181
17/12/14 16:28:13 INFO zookeeper.ClientCnxn: Session establishment complete on server dctsvl028.group.local/10.92.80.28:2181, sessionid = 0x16050267dfb0874, negotiated timeout = 6000
17/12/14 16:28:13 INFO zkclient.ZkClient: zookeeper state changed (SyncConnected)
17/12/14 16:28:13 INFO zkclient.ZkClient: zookeeper state changed (SaslAuthenticated)
17/12/14 16:28:13 INFO consumer.ZookeeperConsumerConnector: [console-consumer-22115_dctsvl021.group.local-1513265293155-5814942c], starting auto committer every 60000 ms
17/12/14 16:28:13 INFO consumer.ZookeeperConsumerConnector: [console-consumer-22115_dctsvl021.group.local-1513265293155-5814942c], begin registering consumer console-consumer-22115_dctsvl021.group.local-1513265293155-5814942c in ZK
......
......
17/12/14 16:28:13 INFO consumer.ZookeeperConsumerConnector: [console-consumer-22115_dctsvl021.group.local-1513265293155-5814942c], Creating topic event watcher for topics test1
17/12/14 16:28:13 INFO consumer.ZookeeperConsumerConnector: [console-consumer-22115_dctsvl021.group.local-1513265293155-5814942c], Topics to consume = ArrayBuffer(test1)
17/12/14 16:28:13 WARN consumer.ConsumerFetcherManager$LeaderFinderThread: [console-consumer-22115_dctsvl021.group.local-1513265293155-5814942c-leader-finder-thread], Failed to find leader for Set(test1-6, test1-3, test1-1, test1-7, test1-5, test1-0, test1-4, test1-2)
kafka.common.BrokerEndPointNotAvailableException: End point with security protocol PLAINTEXT not found for broker 293
at kafka.client.ClientUtils$$anonfun$getPlaintextBrokerEndPoints$1$$anonfun$apply$5.apply(ClientUtils.scala:146)
at kafka.client.ClientUtils$$anonfun$getPlaintextBrokerEndPoints$1$$anonfun$apply$5.apply(ClientUtils.scala:146)
at scala.Option.getOrElse(Option.scala:121)
at kafka.client.ClientUtils$$anonfun$getPlaintextBrokerEndPoints$1.apply(ClientUtils.scala:146)
at kafka.client.ClientUtils$$anonfun$getPlaintextBrokerEndPoints$1.apply(ClientUtils.scala:142)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at kafka.client.ClientUtils$.getPlaintextBrokerEndPoints(ClientUtils.scala:142)
at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:67)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
and this last part continue forever
if i try using --bootstrap server, i get this
/usr/bin/kafka-console-consumer --topic test1 --from-beginning --consumer.config consumer.properties --bootstrap-server [broker_server]:9092 --new-consumer
17/12/14 16:34:41 WARN clients.NetworkClient: Error while fetching metadata with correlation id 11 : {test1=LEADER_NOT_AVAILABLE}
17/12/14 16:34:41 WARN clients.NetworkClient: Error while fetching metadata with correlation id 12 : {test1=LEADER_NOT_AVAILABLE}
17/12/14 16:34:41 WARN clients.NetworkClient: Error while fetching metadata with correlation id 13 : {test1=LEADER_NOT_AVAILABLE}
17/12/14 16:34:41 WARN clients.NetworkClient: Error while fetching metadata with correlation id 14 : {test1=LEADER_NOT_AVAILABLE}
17/12/14 16:34:41 WARN clients.NetworkClient: Error while fetching metadata with correlation id 15 : {test1=LEADER_NOT_AVAILABLE}
17/12/14 16:34:41 WARN clients.NetworkClient: Error while fetching metadata with correlation id 16 : {test1=LEADER_NOT_AVAILABLE}
17/12/14 16:34:41 WARN clients.NetworkClient: Error while fetching metadata with correlation id 17 : {test1=LEADER_NOT_AVAILABLE}
17/12/14 16:34:41 WARN clients.NetworkClient: Error while fetching metadata with correlation id 18 : {test1=LEADER_NOT_AVAILABLE}
17/12/14 16:34:41 WARN clients.NetworkClient: Error while fetching metadata with correlation id 19 : {test1=LEADER_NOT_AVAILABLE}
17/12/14 16:34:42 WARN clients.NetworkClient: Error while fetching metadata with correlation id 20 : {test1=LEADER_NOT_AVAILABLE}
17/12/14 16:34:42 WARN clients.NetworkClient: Error while fetching metadata with correlation id 21 : {test1=LEADER_NOT_AVAILABLE}
changing the port above from 9092 to another port makes the consumer starting, but still if i type something in the producer, i don't get anything in the consumer console
kafka process are running under this ports
# netstat -apn|grep 17897
tcp 0 0 0.0.0.0:9393 0.0.0.0:* LISTEN 17897/java
tcp 0 0 0.0.0.0:35188 0.0.0.0:* LISTEN 17897/java
tcp 0 0 [IP]:9092 0.0.0.0:* LISTEN 17897/java
tcp 0 0 0.0.0.0:37801 0.0.0.0:* LISTEN 17897/java
tcp 0 0 0.0.0.0:24042 0.0.0.0:* LISTEN 17897/java
Probably it's my fault in messing up somewhere, below the files used for testing; in CM the protocol has been set at SASL_PLAINTEXT, sentry has been enable in kafka configuration, not much else than that. SSL not enabled
/etc/kafka/jaas.conf
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useTicketCache=true;
};
Client {
com.sun.security.auth.module.Krb5LoginModule required
useTicketCache=true;
};
consumer.properties
security.protocol=SASL_PLAINTEXT
sasl.mechanism=GSSAPI
sasl.kerberos.service.name=kafka
bootstrap.servers=[broker]:9092
client.properties
security.protocol=SASL_PLAINTEXT
sasl.kerberos.service.name=kafka
bootstrap.servers=[broker]:9092
sasl.mechanism=GSSAPI
export KAFKA_OPTS="-Djava.security.auth.login.config=/etc/kafka/jaas.conf"
also note that i redacted the IP/hosts in the configuration/outputs
Sorry for the long post
Regards,
Matteo
Created 02-09-2018 05:39 AM
Hi @mlussana1,
I have configured Kafka(2.2.0) with Sentry enabled in Kerberized environment and be able to use it as channel for Flume in CDH 5.13.
First of all did you add the kafka to the Allowed Connecting Users(sentry.service.allow.connect) in sentry configuration?And in order to give privilege your user must be in one of the sentry admin groups which are listed in
 
					
				
				
			
		
