Created on 06-16-2017 08:59 AM - edited 09-16-2022 04:46 AM
Hello,
I have CDH 5.10.1, SPARK2.1, KAFKA2.1 (0.10) - all of them kerberized.
I was able to connect to Kafka from kafka CLI tools, as described here:
https://www.cloudera.com/documentation/kafka/latest/topics/kafka_security.html#concept_lcn_4mm_s5
But I cannot connect as a Spark2 consumer using the direct stream method, because jaas.conf cannot be found, even though spark2-submit properly distributes it to HDFS at launch.
I tried following these two pieces of documentation:
https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html
https://www.cloudera.com/documentation/spark2/latest/topics/spark2_kafka.html
My code simply counts the elements in each batch of the stream.
Linking in build.sbt:
isegrim@sparkgw:~/kafka-spark-krb-test$ cat build.sbt
import sbt.Keys.scalaVersion

lazy val root = (project in file(".")).
  settings(
    name := "kafka-spark-krb-test",
    version := "1.0",
    scalaVersion := "2.11.8"
  )

resolvers += "Cloudera" at "https://repository.cloudera.com/artifactory/cloudera-repos/"

lazy val spark = Seq(
  "spark-core",
  "spark-streaming",
  "spark-streaming-kafka-0-10"
).map("org.apache.spark" %% _ % "2.1.0.cloudera1" % "provided")

libraryDependencies ++= spark

assemblyMergeStrategy in assembly := {
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard
  case x =>
    val oldStrategy = (assemblyMergeStrategy in assembly).value
    oldStrategy(x)
}

assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false)

assemblyJarName in assembly := "kafka-spark-krb-test.jar"
Main code
isegrim@sparkgw:~/kafka-spark-krb-test$ cat src/main/scala/af/spark2/tests/Main.scala
package af.spark2.tests

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.kafka.common.serialization.StringDeserializer

object Main {
  val streamingBatchInterval = Seconds(60)
  val streamingTimeout = 432000000

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    val ssc = new StreamingContext(conf, streamingBatchInterval)
    ssc.checkpoint("spark/checkpoint/kafka-spark-krb-test")

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "broker1:9092,broker2:9092",
      "security.protocol" -> "SASL_PLAINTEXT",
      "sasl.kerberos.service.name" -> "kafka",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "kafka-spark-krb-test",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    val topics = Array("mytopic")

    val messages = KafkaUtils.createDirectStream[String, String](
      ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams))

    messages.count().print()

    ssc.start()
    ssc.awaitTerminationOrTimeout(streamingTimeout)
    ssc.stop(true, true)
  }
}
I start the app like this:
isegrim@sparkgw:~/kafka-spark-krb-test$ SPARK_KAFKA_VERSION=0.10 spark2-submit \
--master yarn \
--deploy-mode client \
--queue myqueue \
--keytab /home/isegrim/isegrim.keytab \
--principal isegrim@TEST.COM \
--files /home/isegrim/jaas.conf#jaas.conf \
--driver-java-options "-Djava.security.auth.login.config=./jaas.conf" \
--conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./jaas.conf" \
target/scala-2.11/kafka-spark-krb-test.jar
Here is the relevant log output, including the error I get:
...
17/06/16 17:16:38 INFO yarn.Client: Uploading resource file:/home/isegrim/.keytab/isegrim.keytab -> hdfs:///user/isegrim/.sparkStaging/application_1497619292421_0004/isegrim.keytab
17/06/16 17:16:39 INFO yarn.Client: Uploading resource file:/home/isegrim/jaas.conf#jaas.conf -> hdfs:///user/isegrim/.sparkStaging/application_1497619292421_0004/jaas.conf
17/06/16 17:16:39 INFO yarn.Client: Uploading resource file:/tmp/spark-f1a0e9a4-0c13-495a-a904-2dd1ca1303c1/__spark_conf__1583085251581943316.zip -> hdfs:///user/isegrim/.sparkStaging/application_1497619292421_0004/__spark_conf__.zip
17/06/16 17:16:39 INFO spark.SecurityManager: Changing view acls to: isegrim
17/06/16 17:16:39 INFO spark.SecurityManager: Changing modify acls to: isegrim
...
17/06/16 17:16:49 INFO dstream.ForEachDStream: Slide time = 60000 ms
17/06/16 17:16:49 INFO dstream.ForEachDStream: Storage level = Serialized 1x Replicated
17/06/16 17:16:49 INFO dstream.ForEachDStream: Checkpoint interval = null
17/06/16 17:16:49 INFO dstream.ForEachDStream: Remember interval = 60000 ms
17/06/16 17:16:49 INFO dstream.ForEachDStream: Initialized and validated org.apache.spark.streaming.dstream.ForEachDStream@1b485c8b
17/06/16 17:16:49 INFO consumer.ConsumerConfig: ConsumerConfig values:
    metric.reporters = []
    metadata.max.age.ms = 300000
    partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
    reconnect.backoff.ms = 50
    sasl.kerberos.ticket.renew.window.factor = 0.8
    max.partition.fetch.bytes = 1048576
    bootstrap.servers = [broker1:9092, broker2:9092]
    ssl.keystore.type = JKS
    enable.auto.commit = false
    sasl.mechanism = GSSAPI
    interceptor.classes = null
    exclude.internal.topics = true
    ssl.truststore.password = null
    client.id =
    ssl.endpoint.identification.algorithm = null
    max.poll.records = 2147483647
    check.crcs = true
    request.timeout.ms = 40000
    heartbeat.interval.ms = 3000
    auto.commit.interval.ms = 5000
    receive.buffer.bytes = 65536
    ssl.truststore.type = JKS
    ssl.truststore.location = null
    ssl.keystore.password = null
    fetch.min.bytes = 1
    send.buffer.bytes = 131072
    value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
    group.id = kafka-spark-krb-test
    retry.backoff.ms = 100
    ssl.secure.random.implementation = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.service.name = kafka
    sasl.kerberos.ticket.renew.jitter = 0.05
    ssl.trustmanager.algorithm = PKIX
    ssl.key.password = null
    fetch.max.wait.ms = 500
    sasl.kerberos.min.time.before.relogin = 60000
    connections.max.idle.ms = 540000
    session.timeout.ms = 30000
    metrics.num.samples = 2
    key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
    ssl.protocol = TLS
    ssl.provider = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.keystore.location = null
    ssl.cipher.suites = null
    security.protocol = SASL_PLAINTEXT
    ssl.keymanager.algorithm = SunX509
    metrics.sample.window.ms = 30000
    auto.offset.reset = latest
17/06/16 17:16:49 ERROR streaming.StreamingContext: Error starting the context, marking it as stopped
org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:702)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:557)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:540)
    at org.apache.spark.streaming.kafka010.Subscribe.onStart(ConsumerStrategy.scala:83)
    at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.consumer(DirectKafkaInputDStream.scala:75)
    at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.start(DirectKafkaInputDStream.scala:243)
    at org.apache.spark.streaming.DStreamGraph$$anonfun$start$5.apply(DStreamGraph.scala:49)
    at org.apache.spark.streaming.DStreamGraph$$anonfun$start$5.apply(DStreamGraph.scala:49)
    at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach_quick(ParArray.scala:143)
    at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach(ParArray.scala:136)
    at scala.collection.parallel.ParIterableLike$Foreach.leaf(ParIterableLike.scala:972)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:49)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
    at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51)
    at scala.collection.parallel.ParIterableLike$Foreach.tryLeaf(ParIterableLike.scala:969)
    at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:152)
    at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443)
    at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
    at ... run in separate thread using org.apache.spark.util.ThreadUtils ... ()
    at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:578)
    at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:572)
    at af.spark2.tests.Main$.main(Main.scala:34)
    at af.spark2.tests.Main.main(Main.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:738)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: org.apache.kafka.common.KafkaException: java.lang.SecurityException: java.io.IOException: ./jaas.conf (No such file or directory)
    at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:86)
    at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:70)
    at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:83)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:623)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:557)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:540)
    at org.apache.spark.streaming.kafka010.Subscribe.onStart(ConsumerStrategy.scala:83)
    at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.consumer(DirectKafkaInputDStream.scala:75)
    at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.start(DirectKafkaInputDStream.scala:243)
    at org.apache.spark.streaming.DStreamGraph$$anonfun$start$5.apply(DStreamGraph.scala:49)
    at org.apache.spark.streaming.DStreamGraph$$anonfun$start$5.apply(DStreamGraph.scala:49)
    at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach_quick(ParArray.scala:143)
    at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach(ParArray.scala:136)
    at scala.collection.parallel.ParIterableLike$Foreach.leaf(ParIterableLike.scala:972)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:49)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
    at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51)
    at scala.collection.parallel.ParIterableLike$Foreach.tryLeaf(ParIterableLike.scala:969)
    at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:152)
    at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443)
    at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.SecurityException: java.io.IOException: ./jaas.conf (No such file or directory)
    at sun.security.provider.ConfigFile$Spi.<init>(ConfigFile.java:137)
    at sun.security.provider.ConfigFile.<init>(ConfigFile.java:102)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at java.lang.Class.newInstance(Class.java:442)
    at javax.security.auth.login.Configuration$2.run(Configuration.java:255)
    at javax.security.auth.login.Configuration$2.run(Configuration.java:247)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.login.Configuration.getConfiguration(Configuration.java:246)
    at org.apache.kafka.common.security.JaasUtils.jaasConfig(JaasUtils.java:47)
    at org.apache.kafka.common.security.kerberos.KerberosLogin.getServiceName(KerberosLogin.java:297)
    at org.apache.kafka.common.security.kerberos.KerberosLogin.configure(KerberosLogin.java:103)
    at org.apache.kafka.common.security.authenticator.LoginManager.<init>(LoginManager.java:45)
    at org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:68)
    at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:78)
    ... 25 more
Caused by: java.io.IOException: ./jaas.conf (No such file or directory)
    at sun.security.provider.ConfigFile$Spi.ioException(ConfigFile.java:666)
    at sun.security.provider.ConfigFile$Spi.init(ConfigFile.java:262)
    at sun.security.provider.ConfigFile$Spi.<init>(ConfigFile.java:135)
    ... 41 more
Is it possible to connect to kerberized Kafka from Spark Streaming using Cloudera's CDH and KAFKA, with either SPARK2 or the Spark 1.6 that ships with CDH?
Best Regards
Created 06-16-2017 11:06 AM
Created 06-16-2017 03:52 PM
Hi,
Thanks for your hint, mbigelow.
As far as I know, a Unix path is just a path, and writing it with or without a leading ./ means the same thing - in this case relative to the current working directory, which is the application's HDFS directory.
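A quick check on the gateway host illustrates that (assuming a shell sitting in /home/isegrim, where jaas.conf lives):

isegrim@sparkgw:~$ readlink -f jaas.conf
/home/isegrim/jaas.conf
isegrim@sparkgw:~$ readlink -f ./jaas.conf
/home/isegrim/jaas.conf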
But I tried your suggestion, and it inspired me to investigate further:
isegrim@sparkgw:~/kafka-spark-krb-test$ SPARK_KAFKA_VERSION=0.10 spark2-submit \
  --master yarn \
  --deploy-mode client \
  --queue myqueue \
  --keytab /home/isegrim/isegrim.keytab \
  --principal isegrim@TEST.COM \
  --files /home/isegrim/jaas.conf#jaas.conf \
  --driver-java-options "-Djava.security.auth.login.config=jaas.conf" \
  --conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config=jaas.conf" \
  --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=jaas.conf" \
  target/scala-2.11/kafka-spark-krb-test.jar
But it gave me the same log and error (only without the './' in the error message) 😕
Caused by: java.io.IOException: jaas.conf (No such file or directory)
I also tried an HDFS path to jaas.conf, but that failed too:
hdfs dfs -put jaas.conf

SPARK_KAFKA_VERSION=0.10 spark2-submit \
  --master yarn \
  --deploy-mode client \
  --queue myqueue \
  --keytab /home/isegrim/isegrim.keytab \
  --principal isegrim@TEST.COM \
  --files /home/isegrim/jaas.conf#jaas.conf \
  --driver-java-options "-Djava.security.auth.login.config=hdfs:///user/isegrim/jaas.conf" \
  --conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config=hdfs:///user/isegrim/jaas.conf" \
  --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=hdfs:///user/isegrim/jaas.conf" \
  target/scala-2.11/kafka-spark-krb-test.jar
Error:
Caused by: java.io.IOException: hdfs:///user/isegrim/jaas.conf (No such file or directory)
But then I put the file into each worker's /tmp, and now it looks like the executors read it from the local filesystem rather than from the application's working directory on HDFS (the --files flag):
for i in {1..4}; do
  scp jaas.conf worker${i}:/tmp/jaas.conf
done

SPARK_KAFKA_VERSION=0.10 spark2-submit \
  --master yarn \
  --deploy-mode client \
  --queue myqueue \
  --keytab /home/isegrim/isegrim.keytab \
  --principal isegrim@TEST.COM \
  --files /home/isegrim/jaas.conf#jaas.conf \
  --driver-java-options "-Djava.security.auth.login.config=/tmp/jaas.conf" \
  --conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config=/tmp/jaas.conf" \
  --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=/tmp/jaas.conf" \
  target/scala-2.11/kafka-spark-krb-test.jar
This is the new error:
...
17/06/17 00:06:28 ERROR streaming.StreamingContext: Error starting the context, marking it as stopped
org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:702)
    ...
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: org.apache.kafka.common.KafkaException: javax.security.auth.login.LoginException: Could not login: the client is being asked for a password, but the Kafka client code does not currently support obtaining a password from the user. not available to garner authentication information from the user
    at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:86)
    ...
So I distributed the keytab to every worker node's local filesystem, changed the path in jaas.conf from the application's HDFS working directory to the local filesystem, and it works!
$ cat jaas.conf
KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  keyTab="/home/isegrim/isegrim.keytab"
  principal="isegrim@TEST.COM";
};
Distribute the new jaas.conf and the keytab:
$ for i in {1..4}; do
    scp isegrim.keytab worker${i}:/home/isegrim/isegrim.keytab
  done
$ for i in {1..4}; do
    scp jaas.conf worker${i}:/tmp/jaas.conf
  done
Run the app and check logs:
SPARK_KAFKA_VERSION=0.10 spark2-submit \
  --master yarn \
  --deploy-mode client \
  --queue myqueue \
  --keytab /home/isegrim/isegrim.keytab \
  --principal isegrim@TEST.COM \
  --files /home/isegrim/jaas.conf#jaas.conf \
  --driver-java-options "-Djava.security.auth.login.config=/tmp/jaas.conf" \
  --conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config=/tmp/jaas.conf" \
  --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=/tmp/jaas.conf" \
  target/scala-2.11/kafka-spark-krb-test.jar
...
17/06/17 00:23:17 INFO streaming.StreamingContext: StreamingContext started
...
-------------------------------------------
Time: 1497651840000 ms
-------------------------------------------
58
Thank you, mbigelow, for the inspiration!
Created 06-16-2017 04:15 PM
Created on 06-17-2017 02:41 AM - edited 06-17-2017 02:42 AM
No, no - before distributing it to the worker nodes, jaas.conf was:
KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  keyTab="./isegrim.keytab"
  principal="isegrim@TEST.COM";
};
After I figured out that the KafkaConsumer constructor looks for jaas.conf and the keytab on the executor machine's local filesystem rather than in the application's HDFS working directory, I distributed the keytab to the Linux filesystem and changed the keytab path from ./isegrim.keytab (which I had expected to resolve against the Spark application's HDFS working directory) to /home/isegrim/isegrim.keytab.
I agree that it should not work like this. Now I have to take care of distributing and securing the keytab and jaas.conf files myself.
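For example, a minimal way to lock the distributed keytab down (just a sketch, assuming the same four workers and paths as above):

for i in {1..4}; do
  # Only the submitting user should be able to read the keytab.
  ssh worker${i} 'chmod 400 /home/isegrim/isegrim.keytab'
done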
I think it must be something in the KafkaConsumer constructor, which does not know that those files were distributed with the application into its working directory on HDFS, and instead looks for them on the local machine's filesystem.
If I have more time, I'll check the code of those constructors to figure it out.
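For what it's worth, a quick shell check illustrates the client-mode half of this (assuming the submission directory shown in the prompts above): in client mode the driver JVM's working directory is wherever spark2-submit was run, and --files does not localize anything there, so a relative JAAS path has nothing to resolve against:

isegrim@sparkgw:~/kafka-spark-krb-test$ pwd
/home/isegrim/kafka-spark-krb-test
isegrim@sparkgw:~/kafka-spark-krb-test$ ls ./jaas.conf
ls: cannot access ./jaas.conf: No such file or directory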
Created 06-27-2017 02:48 AM
The problem is that with "--deploy-mode client" the driver looks for jaas.conf in its current local directory by default. With "--deploy-mode cluster" it works, but you have to pass the keytab in "--files" too (and rename it with #).
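For illustration, a sketch of what that cluster-mode submission might look like, based on the examples in this thread (unverified; the kafka.keytab alias is an arbitrary name chosen so the --files copy cannot clash with the file that --keytab itself uploads):

# 1) Point the shipped jaas.conf at a container-relative keytab name.
cat > jaas.conf <<'EOF'
KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  keyTab="./kafka.keytab"
  principal="isegrim@TEST.COM";
};
EOF

# 2) Ship both files with --files; in cluster mode they land in the driver's
#    and executors' container working directories, so the relative paths resolve.
SPARK_KAFKA_VERSION=0.10 spark2-submit \
  --master yarn \
  --deploy-mode cluster \
  --queue myqueue \
  --keytab /home/isegrim/isegrim.keytab \
  --principal isegrim@TEST.COM \
  --files "/home/isegrim/jaas.conf#jaas.conf,/home/isegrim/isegrim.keytab#kafka.keytab" \
  --driver-java-options "-Djava.security.auth.login.config=./jaas.conf" \
  --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./jaas.conf" \
  target/scala-2.11/kafka-spark-krb-test.jar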
Created 08-11-2017 03:16 AM
So, summarizing:
When starting Spark with --deploy-mode client, you must distribute the JAAS file and keytab to every machine's local filesystem yourself.
When starting Spark with --deploy-mode cluster, you must pass the JAAS file and keytab with --files and/or --keytab.