Unable to connect to kerberized Kafka 2.1 (0.10) from Spark 2.1
- Labels: Apache Kafka, Apache Spark, Kerberos
Created on 06-16-2017 08:59 AM - edited 09-16-2022 04:46 AM
Hello,
I have CDH 5.10.1, SPARK2.1, KAFKA2.1 (0.10) - all of them kerberized.
I was able to connect to Kafka from the Kafka CLI tools, as described here:
https://www.cloudera.com/documentation/kafka/latest/topics/kafka_security.html#concept_lcn_4mm_s5
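For reference, the CLI check that did work looked roughly like this (a hedged reconstruction of the steps in that doc; the properties file name and the topic are assumptions, not copied from my session):

# client.properties tells the CLI tools to use SASL/Kerberos:
cat > client.properties <<'EOF'
security.protocol=SASL_PLAINTEXT
sasl.kerberos.service.name=kafka
EOF
# point the JVM at the JAAS config and get a ticket first:
export KAFKA_OPTS="-Djava.security.auth.login.config=/home/isegrim/jaas.conf"
kinit -kt /home/isegrim/isegrim.keytab isegrim@TEST.COM
kafka-console-consumer --new-consumer \
  --bootstrap-server broker1:9092 \
  --topic mytopic \
  --consumer.config client.properties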
But I cannot connect as a Spark 2 consumer using the direct stream method, because jaas.conf can't be found, even though it is properly distributed to HDFS by spark2-submit.
I tried both of these docs:
https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html
https://www.cloudera.com/documentation/spark2/latest/topics/spark2_kafka.html
My code simply counts the elements in each batch of a stream.
Linking in build.sbt:
isegrim@sparkgw:~/kafka-spark-krb-test$ cat build.sbt

import sbt.Keys.scalaVersion

lazy val root = (project in file(".")).
  settings(
    name := "kafka-spark-krb-test",
    version := "1.0",
    scalaVersion := "2.11.8"
  )

resolvers += "Cloudera" at "https://repository.cloudera.com/artifactory/cloudera-repos/"

lazy val spark = Seq(
  "spark-core",
  "spark-streaming",
  "spark-streaming-kafka-0-10"
).map("org.apache.spark" %% _ % "2.1.0.cloudera1" % "provided")

libraryDependencies ++= spark

assemblyMergeStrategy in assembly := {
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard
  case x =>
    val oldStrategy = (assemblyMergeStrategy in assembly).value
    oldStrategy(x)
}

assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false)

assemblyJarName in assembly := "kafka-spark-krb-test.jar"
Main code
isegrim@sparkgw:~/kafka-spark-krb-test$ cat src/main/scala/af/spark2/tests/Main.scala
package af.spark2.tests

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.kafka.common.serialization.StringDeserializer

object Main {
  val streamingBatchInterval = Seconds(60)
  val streamingTimeout = 432000000

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    val ssc = new StreamingContext(conf, streamingBatchInterval)
    ssc.checkpoint("spark/checkpoint/kafka-spark-krb-test")
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "broker1:9092,broker2:9092",
      "security.protocol" -> "SASL_PLAINTEXT",
      "sasl.kerberos.service.name" -> "kafka",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "kafka-spark-krb-test",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    val topics = Array("mytopic")
    val messages = KafkaUtils.createDirectStream[String, String](
      ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams))
    messages.count().print()
    ssc.start()
    ssc.awaitTerminationOrTimeout(streamingTimeout)
    ssc.stop(true, true)
  }
}
I start the app like this:
isegrim@sparkgw:~/kafka-spark-krb-test$ SPARK_KAFKA_VERSION=0.10 spark2-submit \
--master yarn \
--deploy-mode client \
--queue myqueue \
--keytab /home/isegrim/isegrim.keytab \
--principal isegrim@TEST.COM \
--files /home/isegrim/jaas.conf#jaas.conf \
--driver-java-options "-Djava.security.auth.login.config=./jaas.conf" \
--conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./jaas.conf" \
target/scala-2.11/kafka-spark-krb-test.jar
Some log + error I get:
...
17/06/16 17:16:38 INFO yarn.Client: Uploading resource file:/home/isegrim/.keytab/isegrim.keytab -> hdfs:///user/isegrim/.sparkStaging/application_1497619292421_0004/isegrim.keytab
17/06/16 17:16:39 INFO yarn.Client: Uploading resource file:/home/isegrim/jaas.conf#jaas.conf -> hdfs:///user/isegrim/.sparkStaging/application_1497619292421_0004/jaas.conf
17/06/16 17:16:39 INFO yarn.Client: Uploading resource file:/tmp/spark-f1a0e9a4-0c13-495a-a904-2dd1ca1303c1/__spark_conf__1583085251581943316.zip -> hdfs:///user/isegrim/.sparkStaging/application_1497619292421_0004/__spark_conf__.zip
17/06/16 17:16:39 INFO spark.SecurityManager: Changing view acls to: isegrim
17/06/16 17:16:39 INFO spark.SecurityManager: Changing modify acls to: isegrim
...
17/06/16 17:16:49 INFO dstream.ForEachDStream: Initialized and validated org.apache.spark.streaming.dstream.ForEachDStream@1b485c8b
17/06/16 17:16:49 INFO consumer.ConsumerConfig: ConsumerConfig values:
    bootstrap.servers = [broker1:9092, broker2:9092]
    security.protocol = SASL_PLAINTEXT
    sasl.mechanism = GSSAPI
    sasl.kerberos.service.name = kafka
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    group.id = kafka-spark-krb-test
    enable.auto.commit = false
    auto.offset.reset = latest
    key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
    value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
    ... (remaining values are defaults, omitted here)
17/06/16 17:16:49 ERROR streaming.StreamingContext: Error starting the context, marking it as stopped
org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:702)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:557)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:540)
    at org.apache.spark.streaming.kafka010.Subscribe.onStart(ConsumerStrategy.scala:83)
    at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.consumer(DirectKafkaInputDStream.scala:75)
    at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.start(DirectKafkaInputDStream.scala:243)
    ... (parallel-collections / ForkJoin frames omitted)
    at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:578)
    at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:572)
    at af.spark2.tests.Main$.main(Main.scala:34)
    at af.spark2.tests.Main.main(Main.scala)
    ... (SparkSubmit frames omitted)
Caused by: org.apache.kafka.common.KafkaException: java.lang.SecurityException: java.io.IOException: ./jaas.conf (No such file or directory)
    at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:86)
    at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:70)
    at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:83)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:623)
    ...
Caused by: java.lang.SecurityException: java.io.IOException: ./jaas.conf (No such file or directory)
    at sun.security.provider.ConfigFile$Spi.<init>(ConfigFile.java:137)
    at sun.security.provider.ConfigFile.<init>(ConfigFile.java:102)
    ...
    at javax.security.auth.login.Configuration.getConfiguration(Configuration.java:246)
    at org.apache.kafka.common.security.JaasUtils.jaasConfig(JaasUtils.java:47)
    at org.apache.kafka.common.security.kerberos.KerberosLogin.getServiceName(KerberosLogin.java:297)
    at org.apache.kafka.common.security.kerberos.KerberosLogin.configure(KerberosLogin.java:103)
    at org.apache.kafka.common.security.authenticator.LoginManager.<init>(LoginManager.java:45)
    at org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:68)
    at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:78)
    ... 25 more
Caused by: java.io.IOException: ./jaas.conf (No such file or directory)
    at sun.security.provider.ConfigFile$Spi.ioException(ConfigFile.java:666)
    at sun.security.provider.ConfigFile$Spi.init(ConfigFile.java:262)
    at sun.security.provider.ConfigFile$Spi.<init>(ConfigFile.java:135)
    ... 41 more
Is it possible to connect to kerberized Kafka from Spark Streaming using Cloudera's CDH with KAFKA and SPARK2 (or Spark 1.6 from CDH)?
Best Regards
Created 06-16-2017 11:06 AM
The './' notation means to run a file, not to read one.
Created 06-16-2017 03:52 PM
Hi,
Thanks for your hint, mbigelow.
As far as I know, a Unix path is just a path, and starting it with './' or leaving it off means the same thing: in this case the current working directory, which is the application's HDFS directory.
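A quick way to convince yourself of that point (illustrative only; any scratch directory will do):

cd /tmp
echo dummy > jaas.conf
ls jaas.conf ./jaas.conf   # both names resolve to the same file in the CWD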
But I tried your suggestion, and it inspired me to investigate further:
isegrim@sparkgw:~/kafka-spark-krb-test$ SPARK_KAFKA_VERSION=0.10 spark2-submit \
  --master yarn \
  --deploy-mode client \
  --queue myqueue \
  --keytab /home/isegrim/isegrim.keytab \
  --principal isegrim@TEST.COM \
  --files /home/isegrim/jaas.conf#jaas.conf \
  --driver-java-options "-Djava.security.auth.login.config=jaas.conf" \
  --conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config=jaas.conf" \
  --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=jaas.conf" \
  target/scala-2.11/kafka-spark-krb-test.jar
But it gave me the same log and error (just without './' in the error message) 😕
Caused by: java.io.IOException: jaas.conf (No such file or directory)
I've tried an HDFS path to jaas.conf, but that also failed:
hdfs dfs -put jaas.conf

SPARK_KAFKA_VERSION=0.10 spark2-submit \
  --master yarn \
  --deploy-mode client \
  --queue myqueue \
  --keytab /home/isegrim/isegrim.keytab \
  --principal isegrim@TEST.COM \
  --files /home/isegrim/jaas.conf#jaas.conf \
  --driver-java-options "-Djava.security.auth.login.config=hdfs:///user/isegrim/jaas.conf" \
  --conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config=hdfs:///user/isegrim/jaas.conf" \
  --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=hdfs:///user/isegrim/jaas.conf" \
  target/scala-2.11/kafka-spark-krb-test.jar
Error:
Caused by: java.io.IOException: hdfs:///user/isegrim/jaas.conf (No such file or directory)
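Which makes sense in hindsight: java.security.auth.login.config is consumed by the JVM's JAAS machinery (sun.security.provider.ConfigFile, visible in the stack trace above), which opens the path with plain local-file I/O, so an hdfs:// URI can never work there. If the file lives only in HDFS, one workaround is to pull it down to a local path on each node first (a sketch; the paths are assumptions):

hdfs dfs -get /user/isegrim/jaas.conf /tmp/jaas.conf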
But then I put the file into each worker's /tmp, and now it looks like the executors read it from the local filesystem rather than from the application's HDFS CWD (the --files flag):
for i in {1..4}; do
  scp jaas.conf worker${i}:/tmp/jaas.conf
done

SPARK_KAFKA_VERSION=0.10 spark2-submit \
  --master yarn \
  --deploy-mode client \
  --queue myqueue \
  --keytab /home/isegrim/isegrim.keytab \
  --principal isegrim@TEST.COM \
  --files /home/isegrim/jaas.conf#jaas.conf \
  --driver-java-options "-Djava.security.auth.login.config=/tmp/jaas.conf" \
  --conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config=/tmp/jaas.conf" \
  --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=/tmp/jaas.conf" \
  target/scala-2.11/kafka-spark-krb-test.jar
This is the new error:
...
17/06/17 00:06:28 ERROR streaming.StreamingContext: Error starting the context, marking it as stopped
org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:702)
    ...
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: org.apache.kafka.common.KafkaException: javax.security.auth.login.LoginException: Could not login: the client is being asked for a password, but the Kafka client code does not currently support obtaining a password from the user. not available to garner authentication information from the user
    at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:86)
    ...
So I distributed the keytab to every worker node's local filesystem, changed the keytab path in jaas.conf from the app's HDFS CWD to the local filesystem, and it works!
$ cat jaas.conf
KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  keyTab="/home/isegrim/isegrim.keytab"
  principal="isegrim@TEST.COM";
};
Distribute the new jaas.conf and the keytab:
$ for i in {1..4}; do
    scp isegrim.keytab worker${i}:/home/isegrim/isegrim.keytab
  done
$ for i in {1..4}; do
    scp jaas.conf worker${i}:/tmp/jaas.conf
  done
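Before launching, a quick sanity check that every worker can really log in with the distributed keytab can save a failed run (assumes SSH access and the Kerberos client tools on the workers):

for i in {1..4}; do
  ssh worker${i} 'kinit -kt /home/isegrim/isegrim.keytab isegrim@TEST.COM && klist && kdestroy'
done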
Run the app and check logs:
SPARK_KAFKA_VERSION=0.10 spark2-submit \
  --master yarn \
  --deploy-mode client \
  --queue myqueue \
  --keytab /home/isegrim/isegrim.keytab \
  --principal isegrim@TEST.COM \
  --files /home/isegrim/jaas.conf#jaas.conf \
  --driver-java-options "-Djava.security.auth.login.config=/tmp/jaas.conf" \
  --conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config=/tmp/jaas.conf" \
  --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=/tmp/jaas.conf" \
  target/scala-2.11/kafka-spark-krb-test.jar

...
17/06/17 00:23:17 INFO streaming.StreamingContext: StreamingContext started
...
-------------------------------------------
Time: 1497651840000 ms
-------------------------------------------
58
Thank you, mbigelow, for the inspiration!
Created 06-16-2017 04:15 PM
Anyway, I am glad you got it worked out, but something is not behaving as it should.
The --files option should upload any files listed, as well as files in the working directory where spark-submit was run, and anything in --jars; all of these are distributed across the cluster, whether local or on HDFS. You should not have to distribute files manually beforehand, especially not the keytab file, since a compromise of it can be devastating.
The keytab file should have been copied to the AM container's working directory. I think the issue there was that you listed the original full path in jaas.conf.
From the Cloudera Spark docs:
"The full path to the file that contains the keytab for the principal. This keytab is copied to the node running the ApplicationMaster using the Secure Distributed Cache, for periodically renewing the login tickets and the delegation tokens. For information on setting up the principal and keytab, see Configuring a Cluster with Custom Kerberos Principalsand Spark Authentication."
https://www.cloudera.com/documentation/enterprise/5-5-x/topics/sg_spark_auth.html
Created on 06-17-2017 02:41 AM - edited 06-17-2017 02:42 AM
No, no: the jaas.conf, before I distributed it to the worker nodes, was:
KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  keyTab="./isegrim.keytab"
  principal="isegrim@TEST.COM";
};
And after I figured out that the KafkaConsumer constructor looks for jaas.conf and the keytab on the executor container's local filesystem instead of in the application's HDFS CWD, I distributed the keytab to the Linux filesystem and changed the keytab path from ./isegrim.keytab (which I thought would be resolved relative to the Spark application's HDFS CWD) to /home/isegrim/isegrim.keytab.
I agree that it should not work like this. Now I have to take care of distributing and securing the keytab and jaas.conf files myself.
I think it must be something in the Kafka consumer constructor, which doesn't know that those files are distributed with the application in its CWD on HDFS and instead looks for them on the local machine's filesystem.
If I have more time, I'll check the code of those constructors to figure it out.
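For anyone who wants to see what --files actually ships, the container working directories sit under the NodeManager local dirs; the root below is a common CDH default for yarn.nodemanager.local-dirs, so treat the exact path as an assumption:

ls /yarn/nm/usercache/isegrim/appcache/application_*/container_*/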
Created 06-27-2017 02:48 AM
The problem is that with "--deploy-mode client" it looks for jaas.conf in the current directory by default. With "--deploy-mode cluster" it works, but you have to pass the keytab in "--files" too (renaming it with '#'); see the sketch below.
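A minimal cluster-mode sketch built on that observation (untested here; note that the keytab shipped via --files should be a copy under a different name than the file passed to --keytab, so the two uploads do not collide; isegrim-kafka.keytab and the kafka.keytab alias below are assumptions):

SPARK_KAFKA_VERSION=0.10 spark2-submit \
  --master yarn \
  --deploy-mode cluster \
  --queue myqueue \
  --keytab /home/isegrim/isegrim.keytab \
  --principal isegrim@TEST.COM \
  --files /home/isegrim/jaas.conf#jaas.conf,/home/isegrim/isegrim-kafka.keytab#kafka.keytab \
  --conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config=./jaas.conf" \
  --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./jaas.conf" \
  target/scala-2.11/kafka-spark-krb-test.jar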
Created 08-11-2017 03:16 AM
So, summarizing:
When starting Spark with --deploy-mode client, you must distribute the JAAS file and the keytab to every machine yourself.
When starting Spark with --deploy-mode cluster, you distribute the JAAS file and the keytab with --files and/or --keytab; a matching jaas.conf sketch follows below.
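And a matching jaas.conf for the cluster-mode case, with the keytab path relative to the container working directory (a sketch; 'kafka.keytab' must match the alias given after '#' in --files):

cat > jaas.conf <<'EOF'
KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  keyTab="./kafka.keytab"
  principal="isegrim@TEST.COM";
};
EOF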
