Created on 06-16-2017 08:59 AM - edited 09-16-2022 04:46 AM
Hello,
I have CDH 5.10.1, SPARK2.1, KAFKA2.1 (0.10) - all of them kerberized.
I was able to connect to Kafka from kafka CLI tools, as described here:
https://www.cloudera.com/documentation/kafka/latest/topics/kafka_security.html#concept_lcn_4mm_s5
But I can not connect as Spark2 Consumer with Direct Stream method, because jaas.conf can't be finden, despite it is properly distributed to HDFS while loading by spark2-submit.
I was trying those 2 documentations:
https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html
https://www.cloudera.com/documentation/spark2/latest/topics/spark2_kafka.html
My code simply counts elements in a batch of a stream.
linking in build.sbt:
isegrim@sparkgw:~/kafka-spark-krb-test$ cat build.sbt
import sbt.Keys.scalaVersion
lazy val root = (project in file(".")).
settings(
name := "kafka-spark-krb-test",
version := "1.0",
scalaVersion := "2.11.8"
)
resolvers += "Cloudera" at "https://repository.cloudera.com/artifactory/cloudera-repos/"
lazy val spark = Seq(
"spark-core",
"spark-streaming",
"spark-streaming-kafka-0-10"
).map("org.apache.spark" %% _ % "2.1.0.cloudera1" % "provided")
libraryDependencies ++= spark
assemblyMergeStrategy in assembly := {
case PathList("META-INF", xs @ _*) => MergeStrategy.discard
case x =>
val oldStrategy = (assemblyMergeStrategy in assembly).value
oldStrategy(x)
}
assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false)
assemblyJarName in assembly := "kafka-spark-krb-test.jar"
Main code
isegrim@sparkgw:~/kafka-spark-krb-test$ cat src/main/scala/af/spark2/tests/Main.scala
package af.spark2.tests import org.apache.spark._ import org.apache.spark.streaming._ import org.apache.spark.streaming.kafka010._ import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe import org.apache.kafka.common.serialization.StringDeserializer object Main { val streamingBatchInterval = Seconds(60) val streamingTimeout = 432000000 def main(args: Array[String]): Unit = { val conf = new SparkConf() val ssc = new StreamingContext(conf, streamingBatchInterval) ssc.checkpoint("spark/checkpoint/kafka-spark-krb-test") val kafkaParams = Map[String,Object]( "bootstrap.servers" -> "broker1:9092,broker2:9092", "security.protocol" -> "SASL_PLAINTEXT", "sasl.kerberos.service.name" -> "kafka", "key.deserializer" -> classOf[StringDeserializer], "value.deserializer" -> classOf[StringDeserializer], "group.id" -> "kafka-spark-krb-test", "auto.offset.reset" -> "latest", "enable.auto.commit" -> (false: java.lang.Boolean) ) val topics = Array("mytopic") val messages = KafkaUtils.createDirectStream[String, String](ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams)) messages.count().print() ssc.start() ssc.awaitTerminationOrTimeout(streamingTimeout) ssc.stop(true, true) } }
I start the app like this:
isegrim@sparkgw:~/kafka-spark-krb-test$ SPARK_KAFKA_VERSION=0.10 spark2-submit \
--master yarn \
--deploy-mode client \
--queue myqueue \
--keytab /home/isegrim/isegrim.keytab \
--principal isegrim@TEST.COM \
--files /home/isegrim/jaas.conf#jaas.conf \
--driver-java-options "-Djava.security.auth.login.config=./jaas.conf" \
--conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./jaas.conf" \
target/scala-2.11/kafka-spark-krb-test.jar
Some log + error I get:
...
17/06/16 17:16:38 INFO yarn.Client: Uploading resource file:/home/isegrim/.keytab/isegrim.keytab -> hdfs:///user/isegrim/.sparkStaging/application_1497619292421_0004/isegrim.keytab
17/06/16 17:16:39 INFO yarn.Client: Uploading resource file:/home/isegrim/jaas.conf#jaas.conf -> hdfs:///user/isegrim/.sparkStaging/application_1497619292421_0004/jaas.conf
17/06/16 17:16:39 INFO yarn.Client: Uploading resource file:/tmp/spark-f1a0e9a4-0c13-495a-a904-2dd1ca1303c1/__spark_conf__1583085251581943316.zip -> hdfs:///user/isegrim/.sparkStaging/application_1497619292421_0004/__spark_conf__.zip
17/06/16 17:16:39 INFO spark.SecurityManager: Changing view acls to: isegrim
17/06/16 17:16:39 INFO spark.SecurityManager: Changing modify acls to: isegrim
...
17/06/16 17:16:49 INFO dstream.ForEachDStream: Slide time = 60000 ms
17/06/16 17:16:49 INFO dstream.ForEachDStream: Storage level = Serialized 1x Replicated
17/06/16 17:16:49 INFO dstream.ForEachDStream: Checkpoint interval = null
17/06/16 17:16:49 INFO dstream.ForEachDStream: Remember interval = 60000 ms
17/06/16 17:16:49 INFO dstream.ForEachDStream: Initialized and validated org.apache.spark.streaming.dstream.ForEachDStream@1b485c8b
17/06/16 17:16:49 INFO consumer.ConsumerConfig: ConsumerConfig values:
metric.reporters = []
metadata.max.age.ms = 300000
partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
reconnect.backoff.ms = 50
sasl.kerberos.ticket.renew.window.factor = 0.8
max.partition.fetch.bytes = 1048576
bootstrap.servers = [broker1:9092, broker2:9092]
ssl.keystore.type = JKS
enable.auto.commit = false
sasl.mechanism = GSSAPI
interceptor.classes = null
exclude.internal.topics = true
ssl.truststore.password = null
client.id =
ssl.endpoint.identification.algorithm = null
max.poll.records = 2147483647
check.crcs = true
request.timeout.ms = 40000
heartbeat.interval.ms = 3000
auto.commit.interval.ms = 5000
receive.buffer.bytes = 65536
ssl.truststore.type = JKS
ssl.truststore.location = null
ssl.keystore.password = null
fetch.min.bytes = 1
send.buffer.bytes = 131072
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
group.id = kafka-spark-krb-test
retry.backoff.ms = 100
ssl.secure.random.implementation = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.service.name = kafka
sasl.kerberos.ticket.renew.jitter = 0.05
ssl.trustmanager.algorithm = PKIX
ssl.key.password = null
fetch.max.wait.ms = 500
sasl.kerberos.min.time.before.relogin = 60000
connections.max.idle.ms = 540000
session.timeout.ms = 30000
metrics.num.samples = 2
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
ssl.protocol = TLS
ssl.provider = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.keystore.location = null
ssl.cipher.suites = null
security.protocol = SASL_PLAINTEXT
ssl.keymanager.algorithm = SunX509
metrics.sample.window.ms = 30000
auto.offset.reset = latest
17/06/16 17:16:49 ERROR streaming.StreamingContext: Error starting the context, marking it as stopped
org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:702)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:557)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:540)
at org.apache.spark.streaming.kafka010.Subscribe.onStart(ConsumerStrategy.scala:83)
at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.consumer(DirectKafkaInputDStream.scala:75)
at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.start(DirectKafkaInputDStream.scala:243)
at org.apache.spark.streaming.DStreamGraph$$anonfun$start$5.apply(DStreamGraph.scala:49)
at org.apache.spark.streaming.DStreamGraph$$anonfun$start$5.apply(DStreamGraph.scala:49)
at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach_quick(ParArray.scala:143)
at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach(ParArray.scala:136)
at scala.collection.parallel.ParIterableLike$Foreach.leaf(ParIterableLike.scala:972)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:49)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51)
at scala.collection.parallel.ParIterableLike$Foreach.tryLeaf(ParIterableLike.scala:969)
at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:152)
at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443)
at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
at ... run in separate thread using org.apache.spark.util.ThreadUtils ... ()
at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:578)
at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:572)
at af.spark2.tests.Main$.main(Main.scala:34)
at af.spark2.tests.Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:738)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: org.apache.kafka.common.KafkaException: java.lang.SecurityException: java.io.IOException: ./jaas.conf (No such file or directory)
at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:86)
at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:70)
at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:83)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:623)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:557)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:540)
at org.apache.spark.streaming.kafka010.Subscribe.onStart(ConsumerStrategy.scala:83)
at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.consumer(DirectKafkaInputDStream.scala:75)
at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.start(DirectKafkaInputDStream.scala:243)
at org.apache.spark.streaming.DStreamGraph$$anonfun$start$5.apply(DStreamGraph.scala:49)
at org.apache.spark.streaming.DStreamGraph$$anonfun$start$5.apply(DStreamGraph.scala:49)
at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach_quick(ParArray.scala:143)
at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach(ParArray.scala:136)
at scala.collection.parallel.ParIterableLike$Foreach.leaf(ParIterableLike.scala:972)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:49)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51)
at scala.collection.parallel.ParIterableLike$Foreach.tryLeaf(ParIterableLike.scala:969)
at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:152)
at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443)
at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.SecurityException: java.io.IOException: ./jaas.conf (No such file or directory)
at sun.security.provider.ConfigFile$Spi.<init>(ConfigFile.java:137)
at sun.security.provider.ConfigFile.<init>(ConfigFile.java:102)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at java.lang.Class.newInstance(Class.java:442)
at javax.security.auth.login.Configuration$2.run(Configuration.java:255)
at javax.security.auth.login.Configuration$2.run(Configuration.java:247)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.login.Configuration.getConfiguration(Configuration.java:246)
at org.apache.kafka.common.security.JaasUtils.jaasConfig(JaasUtils.java:47)
at org.apache.kafka.common.security.kerberos.KerberosLogin.getServiceName(KerberosLogin.java:297)
at org.apache.kafka.common.security.kerberos.KerberosLogin.configure(KerberosLogin.java:103)
at org.apache.kafka.common.security.authenticator.LoginManager.<init>(LoginManager.java:45)
at org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:68)
at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:78)
... 25 more
Caused by: java.io.IOException: ./jaas.conf (No such file or directory)
at sun.security.provider.ConfigFile$Spi.ioException(ConfigFile.java:666)
at sun.security.provider.ConfigFile$Spi.init(ConfigFile.java:262)
at sun.security.provider.ConfigFile$Spi.<init>(ConfigFile.java:135)
... 41 more
Is it possible to connect to kerberized Kafka from SparkStreaming using Cloudera's CDH, KAFKA and SPARK2 or Spark1.6 from CDH?
Best Regards
Created 06-16-2017 03:52 PM
Hi,
Thanks for your hint mbigelow.
As far as I know Unix PATH is just a PATH, and starting it by ./ or without has the same meaning - in this case current working directory, which is application hdfs directory.
But I've tried your sugestion, and it inspired me for furter investigation:
isegrim@sparkgw:~/kafka-spark-krb-test$ SPARK_KAFKA_VERSION=0.10 spark2-submit \ --master yarn \ --deploy-mode client \ --queue myqueue \ --keytab /home/isegrim/isegrim.keytab \ --principal isegrim@TEST.COM \ --files /home/isegrim/jaas.conf#jaas.conf \ --driver-java-options "-Djava.security.auth.login.config=jaas.conf" \ --conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config=jaas.conf" \ --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=jaas.conf" \ target/scala-2.11/kafka-spark-krb-test.jar
But it gave me the same log+error (without './' in error message) 😕
Caused by: java.io.IOException: jaas.conf (No such file or directory)
I've tried hdfs path to jaas.conf but it also failed:
hdfs dfs -put jaas.conf SPARK_KAFKA_VERSION=0.10 spark2-submit \ --master yarn \ --deploy-mode client \ --queue myqueue \ --keytab /home/isegrim/isegrim.keytab \ --principal isegrim@TEST.COM \ --files /home/isegrim/jaas.conf#jaas.conf \ --driver-java-options "-Djava.security.auth.login.config=hdfs:///user/isegrim/jaas.conf" \ --conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config=hdfs:///user/isegrim/jaas.conf" \ --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=hdfs:///user/isegrim/jaas.conf" \ target/scala-2.11/kafka-spark-krb-test.jar
Error:
Caused by: java.io.IOException: hdfs:///user/isegrim/jaas.conf (No such file or directory)
But I've put the file into each worker's temp and now it looks like executors read it from local operating system storage, rather then from hdfs application CWD (--files flag):
for i in {1..4}; do
scp jaas.conf worker${i}:/tmp/jaas.conf
done
SPARK_KAFKA_VERSION=0.10 spark2-submit \
--master yarn \
--deploy-mode client \
--queue myqueue \
--keytab /home/isegrim/isegrim.keytab \
--principal isegrim@TEST.COM \
--files /home/isegrim/jaas.conf#jaas.conf \
--driver-java-options "-Djava.security.auth.login.config=/tmp/jaas.conf" \
--conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config=/tmp/jaas.conf" \
--conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=/tmp/jaas.conf" \
target/scala-2.11/kafka-spark-krb-test.jarThis is the new error:
...
17/06/17 00:06:28 ERROR streaming.StreamingContext: Error starting the context, marking it as stopped
org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:702)
...
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: org.apache.kafka.common.KafkaException: javax.security.auth.login.LoginException: Could not login: the client is being asked for a password, but the Kafka client code does not currently support obtaining a password from the user. not available to garner authentication information from the user
at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:86)
...
So I've distributed the keytab to every worker node's operating system's local filesystem and I've change the path from app HDFS CWD to OS local filesystem and it works!
$ cat jaas.conf
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
keyTab="/home/isegrim/isegrim.keytab"
principal="isegrim@TEST.COM";
};Distribute new jaas.conf and keytab:
$ for i in {1..4}; do
scp isegrim.keytab worker${i}:/home/isegrim/isegrim.keytab
done
$ for i in {1..4}; do
scp jaas.conf worker${i}:/tmp/jaas.conf
doneRun the app and check logs:
SPARK_KAFKA_VERSION=0.10 spark2-submit \ --master yarn \ --deploy-mode client \ --queue myqueue \ --keytab /home/isegrim/isegrim.keytab \ --principal isegrim@TEST.COM \ --files /home/isegrim/jaas.conf#jaas.conf \ --driver-java-options "-Djava.security.auth.login.config=/tmp/jaas.conf" \ --conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config=/tmp/jaas.conf" \ --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=/tmp/jaas.conf" \ target/scala-2.11/kafka-spark-krb-test.jar ... 17/06/17 00:23:17 INFO streaming.StreamingContext: StreamingContext started ... ------------------------------------------- Time: 1497651840000 ms ------------------------------------------- 58
Thank you mbigelow for inspiration!
Created 06-27-2017 02:48 AM
The problem is, with "--deploy-mode client" it looks for jaas.conf in current directory by default. When you use the "--deploy-mode cluster" it works, but you have to pass keytab (and rename it with #) in "--files" too.
Created 06-16-2017 11:06 AM
Created 06-16-2017 03:52 PM
Hi,
Thanks for your hint mbigelow.
As far as I know Unix PATH is just a PATH, and starting it by ./ or without has the same meaning - in this case current working directory, which is application hdfs directory.
But I've tried your sugestion, and it inspired me for furter investigation:
isegrim@sparkgw:~/kafka-spark-krb-test$ SPARK_KAFKA_VERSION=0.10 spark2-submit \ --master yarn \ --deploy-mode client \ --queue myqueue \ --keytab /home/isegrim/isegrim.keytab \ --principal isegrim@TEST.COM \ --files /home/isegrim/jaas.conf#jaas.conf \ --driver-java-options "-Djava.security.auth.login.config=jaas.conf" \ --conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config=jaas.conf" \ --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=jaas.conf" \ target/scala-2.11/kafka-spark-krb-test.jar
But it gave me the same log+error (without './' in error message) 😕
Caused by: java.io.IOException: jaas.conf (No such file or directory)
I've tried hdfs path to jaas.conf but it also failed:
hdfs dfs -put jaas.conf SPARK_KAFKA_VERSION=0.10 spark2-submit \ --master yarn \ --deploy-mode client \ --queue myqueue \ --keytab /home/isegrim/isegrim.keytab \ --principal isegrim@TEST.COM \ --files /home/isegrim/jaas.conf#jaas.conf \ --driver-java-options "-Djava.security.auth.login.config=hdfs:///user/isegrim/jaas.conf" \ --conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config=hdfs:///user/isegrim/jaas.conf" \ --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=hdfs:///user/isegrim/jaas.conf" \ target/scala-2.11/kafka-spark-krb-test.jar
Error:
Caused by: java.io.IOException: hdfs:///user/isegrim/jaas.conf (No such file or directory)
But I've put the file into each worker's temp and now it looks like executors read it from local operating system storage, rather then from hdfs application CWD (--files flag):
for i in {1..4}; do
scp jaas.conf worker${i}:/tmp/jaas.conf
done
SPARK_KAFKA_VERSION=0.10 spark2-submit \
--master yarn \
--deploy-mode client \
--queue myqueue \
--keytab /home/isegrim/isegrim.keytab \
--principal isegrim@TEST.COM \
--files /home/isegrim/jaas.conf#jaas.conf \
--driver-java-options "-Djava.security.auth.login.config=/tmp/jaas.conf" \
--conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config=/tmp/jaas.conf" \
--conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=/tmp/jaas.conf" \
target/scala-2.11/kafka-spark-krb-test.jarThis is the new error:
...
17/06/17 00:06:28 ERROR streaming.StreamingContext: Error starting the context, marking it as stopped
org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:702)
...
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: org.apache.kafka.common.KafkaException: javax.security.auth.login.LoginException: Could not login: the client is being asked for a password, but the Kafka client code does not currently support obtaining a password from the user. not available to garner authentication information from the user
at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:86)
...
So I've distributed the keytab to every worker node's operating system's local filesystem and I've change the path from app HDFS CWD to OS local filesystem and it works!
$ cat jaas.conf
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
keyTab="/home/isegrim/isegrim.keytab"
principal="isegrim@TEST.COM";
};Distribute new jaas.conf and keytab:
$ for i in {1..4}; do
scp isegrim.keytab worker${i}:/home/isegrim/isegrim.keytab
done
$ for i in {1..4}; do
scp jaas.conf worker${i}:/tmp/jaas.conf
doneRun the app and check logs:
SPARK_KAFKA_VERSION=0.10 spark2-submit \ --master yarn \ --deploy-mode client \ --queue myqueue \ --keytab /home/isegrim/isegrim.keytab \ --principal isegrim@TEST.COM \ --files /home/isegrim/jaas.conf#jaas.conf \ --driver-java-options "-Djava.security.auth.login.config=/tmp/jaas.conf" \ --conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config=/tmp/jaas.conf" \ --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=/tmp/jaas.conf" \ target/scala-2.11/kafka-spark-krb-test.jar ... 17/06/17 00:23:17 INFO streaming.StreamingContext: StreamingContext started ... ------------------------------------------- Time: 1497651840000 ms ------------------------------------------- 58
Thank you mbigelow for inspiration!
Created 06-16-2017 04:15 PM
Created on 06-17-2017 02:41 AM - edited 06-17-2017 02:42 AM
No, no, jaas.conf before distributing it to worker nodes was:
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
keyTab="./isegrim.keytab"
principal="isegrim@TEST.COM";
};And after I have figured out, that KafkaConsumer constructor looks for jaas.conf and keytab in executor container's local machine's filesystem instead HDFS application CWD, then I've distributed keytab to linux FS and I've changed path to keytab from ./isegrim.keytab (I thought it will be local to Spark application HDFS CWD) to /home/isegrim/isegrim.keytab.
I agree that this should not work like this. Now I have to take care of distributing and securing keytab and jaas.conf files.
I think it must be something with the Kafka Consumer constructor, which don't know that those files are distributed with the application in it's CWD on HDFS and tries to look for them in local machine's filesystem.
If I will have more time I'll try to check the code of those constructors to figure it out.
Created 06-27-2017 02:48 AM
The problem is, with "--deploy-mode client" it looks for jaas.conf in current directory by default. When you use the "--deploy-mode cluster" it works, but you have to pass keytab (and rename it with #) in "--files" too.
Created 08-11-2017 03:16 AM
So, summarizing:
When starting Spark with --deploy-mode client one must distribute Jaas file and keytab to every machine.
When starting Spark with --deploy-mode cluster one must distribute Jaas file and keytab with --files and/or --keytab.