Unable to connect to kerberized Kafka 2.1 (0.10) from Spark 2.1

Rising Star

Hello,

I have CDH 5.10.1, Spark 2.1, and Kafka 2.1 (0.10), all of them kerberized.
I was able to connect to Kafka with the Kafka CLI tools, as described here:
https://www.cloudera.com/documentation/kafka/latest/topics/kafka_security.html#concept_lcn_4mm_s5

But I cannot connect as a Spark2 consumer using the direct stream method, because jaas.conf cannot be found, even though spark2-submit uploads it to HDFS correctly at launch.


I followed these two guides:
https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html
https://www.cloudera.com/documentation/spark2/latest/topics/spark2_kafka.html

My code simply counts the elements in each batch of the stream.
Dependencies in build.sbt:

isegrim@sparkgw:~/kafka-spark-krb-test$ cat build.sbt
import sbt.Keys.scalaVersion

lazy val root = (project in file(".")).
        settings(
                name := "kafka-spark-krb-test",
                version := "1.0",
                scalaVersion := "2.11.8"
)

resolvers += "Cloudera" at "https://repository.cloudera.com/artifactory/cloudera-repos/"

lazy val spark = Seq(
        "spark-core",
        "spark-streaming",
        "spark-streaming-kafka-0-10"
).map("org.apache.spark" %% _ % "2.1.0.cloudera1" % "provided")
libraryDependencies ++= spark

assemblyMergeStrategy in assembly := {
        case PathList("META-INF", xs @ _*) => MergeStrategy.discard
        case x =>
                val oldStrategy = (assemblyMergeStrategy in assembly).value
                oldStrategy(x)
}

assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false)
assemblyJarName in assembly := "kafka-spark-krb-test.jar"



Main code

isegrim@sparkgw:~/kafka-spark-krb-test$ cat src/main/scala/af/spark2/tests/Main.scala
package af.spark2.tests

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.kafka.common.serialization.StringDeserializer

object Main {
  val streamingBatchInterval = Seconds(60)
  val streamingTimeout = 432000000

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    val ssc = new StreamingContext(conf, streamingBatchInterval)
    ssc.checkpoint("spark/checkpoint/kafka-spark-krb-test")

    val kafkaParams = Map[String,Object](
      "bootstrap.servers" -> "broker1:9092,broker2:9092",
      "security.protocol" -> "SASL_PLAINTEXT",
      "sasl.kerberos.service.name" -> "kafka",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "kafka-spark-krb-test",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    val topics = Array("mytopic")
    val messages = KafkaUtils.createDirectStream[String, String](ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams))

    messages.count().print()

    ssc.start()
    ssc.awaitTerminationOrTimeout(streamingTimeout)
    ssc.stop(true, true)
  }
}




I start the app like this:

isegrim@sparkgw:~/kafka-spark-krb-test$ SPARK_KAFKA_VERSION=0.10 spark2-submit \
--master yarn \
--deploy-mode client \
--queue myqueue \
--keytab /home/isegrim/isegrim.keytab \
--principal isegrim@TEST.COM \
--files /home/isegrim/jaas.conf#jaas.conf \
--driver-java-options "-Djava.security.auth.login.config=./jaas.conf" \
--conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./jaas.conf" \
target/scala-2.11/kafka-spark-krb-test.jar



Part of the log and the error I get:

...
17/06/16 17:16:38 INFO yarn.Client: Uploading resource file:/home/isegrim/.keytab/isegrim.keytab -> hdfs:///user/isegrim/.sparkStaging/application_1497619292421_0004/isegrim.keytab
17/06/16 17:16:39 INFO yarn.Client: Uploading resource file:/home/isegrim/jaas.conf#jaas.conf -> hdfs:///user/isegrim/.sparkStaging/application_1497619292421_0004/jaas.conf
17/06/16 17:16:39 INFO yarn.Client: Uploading resource file:/tmp/spark-f1a0e9a4-0c13-495a-a904-2dd1ca1303c1/__spark_conf__1583085251581943316.zip -> hdfs:///user/isegrim/.sparkStaging/application_1497619292421_0004/__spark_conf__.zip
17/06/16 17:16:39 INFO spark.SecurityManager: Changing view acls to: isegrim
17/06/16 17:16:39 INFO spark.SecurityManager: Changing modify acls to: isegrim
...
17/06/16 17:16:49 INFO dstream.ForEachDStream: Slide time = 60000 ms
17/06/16 17:16:49 INFO dstream.ForEachDStream: Storage level = Serialized 1x Replicated
17/06/16 17:16:49 INFO dstream.ForEachDStream: Checkpoint interval = null
17/06/16 17:16:49 INFO dstream.ForEachDStream: Remember interval = 60000 ms
17/06/16 17:16:49 INFO dstream.ForEachDStream: Initialized and validated org.apache.spark.streaming.dstream.ForEachDStream@1b485c8b
17/06/16 17:16:49 INFO consumer.ConsumerConfig: ConsumerConfig values:
        metric.reporters = []
        metadata.max.age.ms = 300000
        partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
        reconnect.backoff.ms = 50
        sasl.kerberos.ticket.renew.window.factor = 0.8
        max.partition.fetch.bytes = 1048576
        bootstrap.servers = [broker1:9092, broker2:9092]
        ssl.keystore.type = JKS
        enable.auto.commit = false
        sasl.mechanism = GSSAPI
        interceptor.classes = null
        exclude.internal.topics = true
        ssl.truststore.password = null
        client.id =
        ssl.endpoint.identification.algorithm = null
        max.poll.records = 2147483647
        check.crcs = true
        request.timeout.ms = 40000
        heartbeat.interval.ms = 3000
        auto.commit.interval.ms = 5000
        receive.buffer.bytes = 65536
        ssl.truststore.type = JKS
        ssl.truststore.location = null
        ssl.keystore.password = null
        fetch.min.bytes = 1
        send.buffer.bytes = 131072
        value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
        group.id = kafka-spark-krb-test
        retry.backoff.ms = 100
        ssl.secure.random.implementation = null
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.service.name = kafka
        sasl.kerberos.ticket.renew.jitter = 0.05
        ssl.trustmanager.algorithm = PKIX
        ssl.key.password = null
        fetch.max.wait.ms = 500
        sasl.kerberos.min.time.before.relogin = 60000
        connections.max.idle.ms = 540000
        session.timeout.ms = 30000
        metrics.num.samples = 2
        key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
        ssl.protocol = TLS
        ssl.provider = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        ssl.keystore.location = null
        ssl.cipher.suites = null
        security.protocol = SASL_PLAINTEXT
        ssl.keymanager.algorithm = SunX509
        metrics.sample.window.ms = 30000
        auto.offset.reset = latest

17/06/16 17:16:49 ERROR streaming.StreamingContext: Error starting the context, marking it as stopped
org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:702)
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:557)
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:540)
        at org.apache.spark.streaming.kafka010.Subscribe.onStart(ConsumerStrategy.scala:83)
        at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.consumer(DirectKafkaInputDStream.scala:75)
        at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.start(DirectKafkaInputDStream.scala:243)
        at org.apache.spark.streaming.DStreamGraph$$anonfun$start$5.apply(DStreamGraph.scala:49)
        at org.apache.spark.streaming.DStreamGraph$$anonfun$start$5.apply(DStreamGraph.scala:49)
        at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach_quick(ParArray.scala:143)
        at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach(ParArray.scala:136)
        at scala.collection.parallel.ParIterableLike$Foreach.leaf(ParIterableLike.scala:972)
        at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:49)
        at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
        at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
        at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51)
        at scala.collection.parallel.ParIterableLike$Foreach.tryLeaf(ParIterableLike.scala:969)
        at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:152)
        at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443)
        at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
        at ... run in separate thread using org.apache.spark.util.ThreadUtils ... ()
        at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:578)
        at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:572)
        at af.spark2.tests.Main$.main(Main.scala:34)
        at af.spark2.tests.Main.main(Main.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:738)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: org.apache.kafka.common.KafkaException: java.lang.SecurityException: java.io.IOException: ./jaas.conf (No such file or directory)
        at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:86)
        at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:70)
        at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:83)
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:623)
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:557)
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:540)
        at org.apache.spark.streaming.kafka010.Subscribe.onStart(ConsumerStrategy.scala:83)
        at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.consumer(DirectKafkaInputDStream.scala:75)
        at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.start(DirectKafkaInputDStream.scala:243)
        at org.apache.spark.streaming.DStreamGraph$$anonfun$start$5.apply(DStreamGraph.scala:49)
        at org.apache.spark.streaming.DStreamGraph$$anonfun$start$5.apply(DStreamGraph.scala:49)
        at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach_quick(ParArray.scala:143)
        at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach(ParArray.scala:136)
        at scala.collection.parallel.ParIterableLike$Foreach.leaf(ParIterableLike.scala:972)
        at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:49)
        at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
        at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
        at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51)
        at scala.collection.parallel.ParIterableLike$Foreach.tryLeaf(ParIterableLike.scala:969)
        at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:152)
        at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443)
        at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.SecurityException: java.io.IOException: ./jaas.conf (No such file or directory)
        at sun.security.provider.ConfigFile$Spi.<init>(ConfigFile.java:137)
        at sun.security.provider.ConfigFile.<init>(ConfigFile.java:102)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
        at java.lang.Class.newInstance(Class.java:442)
        at javax.security.auth.login.Configuration$2.run(Configuration.java:255)
        at javax.security.auth.login.Configuration$2.run(Configuration.java:247)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.login.Configuration.getConfiguration(Configuration.java:246)
        at org.apache.kafka.common.security.JaasUtils.jaasConfig(JaasUtils.java:47)
        at org.apache.kafka.common.security.kerberos.KerberosLogin.getServiceName(KerberosLogin.java:297)
        at org.apache.kafka.common.security.kerberos.KerberosLogin.configure(KerberosLogin.java:103)
        at org.apache.kafka.common.security.authenticator.LoginManager.<init>(LoginManager.java:45)
        at org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:68)
        at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:78)
        ... 25 more
Caused by: java.io.IOException: ./jaas.conf (No such file or directory)
        at sun.security.provider.ConfigFile$Spi.ioException(ConfigFile.java:666)
        at sun.security.provider.ConfigFile$Spi.init(ConfigFile.java:262)
        at sun.security.provider.ConfigFile$Spi.<init>(ConfigFile.java:135)
        ... 41 more



Is it possible to connect to kerberized Kafka from Spark Streaming using Cloudera's CDH with Kafka and Spark 2 (or the Spark 1.6 that ships with CDH)?

Best Regards

2 ACCEPTED SOLUTIONS

Rising Star

Hi,

Thanks for your hint, mbigelow.

As far as I know, a Unix path is just a path, and writing it with or without a leading ./ means the same thing: relative to the current working directory, which in this case is the application's HDFS directory.
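
The point about './' can be checked with a few lines of shell. This is only an illustrative sketch, and the helper name resolve_login_config is made up for it: it prints the absolute path that a relative java.security.auth.login.config value would resolve to from the current directory. Note that with --deploy-mode client the driver JVM is started on the host where spark2-submit runs, so for the driver "the current directory" is the directory of that shell, and the stack trace in the question (which passes through SparkSubmit.main) comes from the driver.

```shell
#!/bin/sh
# Hypothetical helper: print the absolute path that a login-config value
# would resolve to for a process started in the current directory.
resolve_login_config() {
    case "$1" in
        /*) printf '%s\n' "$1" ;;                 # already absolute
        *)  printf '%s/%s\n' "$PWD" "${1#./}" ;;  # relative: drop a leading ./
    esac
}

# Both relative spellings name the same file under $PWD.
resolve_login_config ./jaas.conf
resolve_login_config jaas.conf
resolve_login_config /tmp/jaas.conf
```

Run from ~/kafka-spark-krb-test, the first two calls print the same path inside that directory, not /home/isegrim/jaas.conf.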

But I tried your suggestion, and it inspired me to investigate further:

isegrim@sparkgw:~/kafka-spark-krb-test$ SPARK_KAFKA_VERSION=0.10 spark2-submit \
--master yarn \
--deploy-mode client \
--queue myqueue \
--keytab /home/isegrim/isegrim.keytab \
--principal isegrim@TEST.COM \
--files /home/isegrim/jaas.conf#jaas.conf \
--driver-java-options "-Djava.security.auth.login.config=jaas.conf" \
--conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config=jaas.conf" \
--conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=jaas.conf" \
target/scala-2.11/kafka-spark-krb-test.jar

But it gave me the same log+error (without './' in error message) 😕

Caused by: java.io.IOException: jaas.conf (No such file or directory)

I also tried an HDFS path to jaas.conf, but that failed too:

hdfs dfs -put jaas.conf

SPARK_KAFKA_VERSION=0.10 spark2-submit \
--master yarn \
--deploy-mode client \
--queue myqueue \
--keytab /home/isegrim/isegrim.keytab \
--principal isegrim@TEST.COM \
--files /home/isegrim/jaas.conf#jaas.conf \
--driver-java-options "-Djava.security.auth.login.config=hdfs:///user/isegrim/jaas.conf" \
--conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config=hdfs:///user/isegrim/jaas.conf" \
--conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=hdfs:///user/isegrim/jaas.conf" \
target/scala-2.11/kafka-spark-krb-test.jar

Error:

Caused by: java.io.IOException: hdfs:///user/isegrim/jaas.conf (No such file or directory)

But then I put the file into /tmp on each worker, and now it looks like the executors read it from the local operating system's filesystem rather than from the application's HDFS CWD (the --files flag):

for i in {1..4}; do
    scp jaas.conf worker${i}:/tmp/jaas.conf
done

SPARK_KAFKA_VERSION=0.10 spark2-submit \
--master yarn \
--deploy-mode client \
--queue myqueue \
--keytab /home/isegrim/isegrim.keytab \
--principal isegrim@TEST.COM \
--files /home/isegrim/jaas.conf#jaas.conf \
--driver-java-options "-Djava.security.auth.login.config=/tmp/jaas.conf" \
--conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config=/tmp/jaas.conf" \
--conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=/tmp/jaas.conf" \
target/scala-2.11/kafka-spark-krb-test.jar

This is the new error:

...
17/06/17 00:06:28 ERROR streaming.StreamingContext: Error starting the context, marking it as stopped
org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:702)
...
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: org.apache.kafka.common.KafkaException: javax.security.auth.login.LoginException: Could not login: the client is being asked for a password, but the Kafka client code does not currently support obtaining a password from the user. not available to garner  authentication information from the user
        at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:86)
...

So I distributed the keytab to the local filesystem of every worker node and changed the keytab path from the application's HDFS CWD to the local OS filesystem, and it works!

$ cat jaas.conf
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
keyTab="/home/isegrim/isegrim.keytab"
principal="isegrim@TEST.COM";
};
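
Since the only things that change between attempts are the keytab path and the principal, the file can be generated rather than edited by hand. A minimal sketch, assuming a made-up helper name write_jaas_conf; the section layout is copied from the jaas.conf above:

```shell
#!/bin/sh
# Hypothetical helper: write a KafkaClient JAAS section to stdout.
#   $1 = keytab path as seen by the JVM that will read the file
#   $2 = Kerberos principal
write_jaas_conf() {
    cat <<EOF
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
keyTab="$1"
principal="$2";
};
EOF
}

# Reproduce the file shown above.
write_jaas_conf /home/isegrim/isegrim.keytab isegrim@TEST.COM
```

Redirect the output to jaas.conf before copying it to the workers.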

Distribute new jaas.conf and keytab:

$ for i in {1..4}; do
    scp isegrim.keytab worker${i}:/home/isegrim/isegrim.keytab
done
$ for i in {1..4}; do
    scp jaas.conf worker${i}:/tmp/jaas.conf
done

Run the app and check logs:

SPARK_KAFKA_VERSION=0.10 spark2-submit \
--master yarn \
--deploy-mode client \
--queue myqueue \
--keytab /home/isegrim/isegrim.keytab \
--principal isegrim@TEST.COM \
--files /home/isegrim/jaas.conf#jaas.conf \
--driver-java-options "-Djava.security.auth.login.config=/tmp/jaas.conf" \
--conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config=/tmp/jaas.conf" \
--conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=/tmp/jaas.conf" \
target/scala-2.11/kafka-spark-krb-test.jar

...
17/06/17 00:23:17 INFO streaming.StreamingContext: StreamingContext started
...
-------------------------------------------
Time: 1497651840000 ms
-------------------------------------------
58

Thank you mbigelow for inspiration!


New Contributor

The problem is that with "--deploy-mode client" it looks for jaas.conf in the current directory by default. With "--deploy-mode cluster" it works, but you have to pass the keytab in "--files" too (and rename it with #).
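
A cluster-mode variant of the submit command from the question might look like the sketch below. It only prints the command (a dry run) so it can be inspected without a cluster. The link name kafka-client.keytab is an assumption made for this example, and the sketch further assumes that jaas.conf would then use keyTab="./kafka-client.keytab"; neither detail is confirmed in this thread.

```shell
#!/bin/sh
# Dry run: print a cluster-mode submit command instead of executing it.
# KEYTAB_LINK is a hypothetical link name chosen for this sketch.
JAAS=/home/isegrim/jaas.conf
KEYTAB=/home/isegrim/isegrim.keytab
KEYTAB_LINK=kafka-client.keytab

print_cluster_submit() {
    # Every line except the last ends with a backslash continuation.
    printf '%s \\\n' \
        "SPARK_KAFKA_VERSION=0.10 spark2-submit" \
        "--master yarn" \
        "--deploy-mode cluster" \
        "--queue myqueue" \
        "--keytab $KEYTAB" \
        "--principal isegrim@TEST.COM" \
        "--files $JAAS#jaas.conf,$KEYTAB#$KEYTAB_LINK" \
        "--conf \"spark.driver.extraJavaOptions=-Djava.security.auth.login.config=./jaas.conf\"" \
        "--conf \"spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./jaas.conf\""
    printf '%s\n' "target/scala-2.11/kafka-spark-krb-test.jar"
}

print_cluster_submit
```

In cluster mode both the driver and the executors run inside YARN containers, so the relative ./jaas.conf refers to the copy that --files places in each container's working directory.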


6 REPLIES

Champion
Try taking off the './'. Distributing the file to the executors places it in the working directory of each one, where it can be referenced by name.

The './' notation means to run a file, not to read one.

Champion
I'll be... it doesn't care about './'. I have only ever seen that used to launch scripts or executables without specifying the interpreter.

Anyway, I am glad you got it worked out, but something is not working as it should.

The --files option should upload any files listed, as well as files in the working directory where spark-submit was run, and any --jars; all of them should be distributed across the cluster, whether they are local or on HDFS. You should not have to distribute files manually beforehand, especially the keytab file, as a compromise of it can be devastating.

The keytab file should have been copied to the AM container's working directory. I think the issue there was that you listed the original full path in the jaas.conf.

From the Cloudera Spark docs:

"The full path to the file that contains the keytab for the principal. This keytab is copied to the node running the ApplicationMaster using the Secure Distributed Cache, for periodically renewing the login tickets and the delegation tokens. For information on setting up the principal and keytab, see Configuring a Cluster with Custom Kerberos Principals and Spark Authentication."

https://www.cloudera.com/documentation/enterprise/5-5-x/topics/sg_spark_auth.html

Rising Star

No, no, before I distributed it to the worker nodes, jaas.conf was:

KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
keyTab="./isegrim.keytab"
principal="isegrim@TEST.COM";
};

Once I figured out that the KafkaConsumer constructor looks for jaas.conf and the keytab on the local filesystem of the machine running the executor container, instead of in the application's HDFS CWD, I distributed the keytab to the Linux filesystem and changed the keytab path from ./isegrim.keytab (which I thought would be relative to the Spark application's HDFS CWD) to /home/isegrim/isegrim.keytab.

I agree that it should not work like this. Now I have to take care of distributing and securing the keytab and jaas.conf files.
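
If the keytab does have to be copied around by hand, one small safeguard is to check its permissions before and after copying. The sketch below is an assumption-laden illustration, not part of the fix described in this thread: the helper name keytab_is_private is made up, and it judges the mode from the `ls -l` mode string.

```shell
#!/bin/sh
# Hypothetical helper: succeed only if the argument is a regular file whose
# mode is exactly rw------- (0600), judged from the `ls -l` mode string.
keytab_is_private() {
    [ -f "$1" ] || return 1
    mode=$(ls -ld "$1" | cut -c1-10)
    [ "$mode" = "-rw-------" ]
}

# Self-contained demo on a throwaway file instead of a real keytab.
demo=$(mktemp)
chmod 600 "$demo"
keytab_is_private "$demo" && echo "private"
chmod 644 "$demo"
keytab_is_private "$demo" || echo "too open"
rm -f "$demo"
```

The same check could be run over ssh on each worker after the scp loop.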

I think it must be something in the Kafka consumer constructor, which doesn't know that those files are distributed with the application in its CWD on HDFS and tries to look for them on the local machine's filesystem.

If I have more time, I'll try to check the code of those constructors to figure it out.

Rising Star

So, summarizing:

When starting Spark with --deploy-mode client, one must distribute the JAAS file and keytab to every machine.

When starting Spark with --deploy-mode cluster, one must distribute the JAAS file and keytab with --files and/or --keytab.
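
The two cases can be captured in a small wrapper so the right login-config path is chosen per deploy mode. A minimal sketch, assuming the paths used earlier in this thread (/tmp/jaas.conf copied to every machine for client mode, ./jaas.conf shipped with --files for cluster mode); the function name login_config_for_mode is hypothetical.

```shell
#!/bin/sh
# Hypothetical helper: choose the java.security.auth.login.config value
# according to the summary above.
#   client  -> a path that was copied to every machine beforehand
#   cluster -> a name relative to the container, shipped with --files
login_config_for_mode() {
    case "$1" in
        client)  printf '%s\n' "/tmp/jaas.conf" ;;
        cluster) printf '%s\n' "./jaas.conf" ;;
        *)       echo "unknown deploy mode: $1" >&2; return 1 ;;
    esac
}

for mode in client cluster; do
    echo "$mode: -Djava.security.auth.login.config=$(login_config_for_mode "$mode")"
done
```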