
Error reading Kafka topic from Spark: Unable to obtain password from user

New Contributor

I am receiving the following error:

18/12/19 16:43:32 ERROR scheduler.TaskSetManager: Task 0 in stage 0.0 failed 4 times; aborting job
18/12/19 16:43:32 ERROR streaming.StreamExecution: Query [id = b318a686-395d-4b47-b6f8-3f7190bf4f36, runId = 9b6245f4-6063-4ccb-819e-af464cf5723a] terminated with error
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, vminbgdevdtnd02.axisb.com, executor 1): org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:702)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:557)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:540)
    at org.apache.spark.sql.kafka010.CachedKafkaConsumer.createConsumer(CachedKafkaConsumer.scala:56)
    at org.apache.spark.sql.kafka010.CachedKafkaConsumer.<init>(CachedKafkaConsumer.scala:45)
    at org.apache.spark.sql.kafka010.CachedKafkaConsumer$.getOrCreate(CachedKafkaConsumer.scala:376)
    at org.apache.spark.sql.kafka010.KafkaSourceRDD.compute(KafkaSourceRDD.scala:137)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.KafkaException: javax.security.auth.login.LoginException: Unable to obtain password from user


The code I am running is as follows:

export SPARK_KAFKA_VERSION=0.10

export KAFKA_OPTS="-Djava.security.auth.login.config=/home/143637/spark_jaas.conf"

export SPARK_SUBMIT_OPTS='-Djava.security.auth.login.config=/home/143637/spark_jaas.conf'

spark2-shell \
  --files spark_jaas.conf,143637.keytab,truststore.jks,host.keystore \
  --driver-java-options "-Djava.security.auth.login.config=spark_jaas.conf" \
  --conf "spark.extraJavaOptions=-Djava.security.auth.login.config=spark_jaas.conf -Dsun.security.krb5.debug=false -Djavax.net.ssl.trustStore=truststore.jks -Djavax.net.ssl.trustStorePassword=password" \
  --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=spark_jaas.conf -Dsun.security.krb5.debug=false -Djavax.net.ssl.trustStore=truststore.jks -Djavax.net.ssl.trustStorePassword=password" \
  --queue fraud \
  --jars /home/143637/spark-sql-kafka-0-10_2.11-2.1.1.jar \
  --keytab /home/143637/143637.keytab
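For reference (the file itself is not shown in the post): a Kerberos JAAS file for the Kafka client typically looks like the sketch below. The realm in the principal is a placeholder; the keytab path is deliberately relative, because files shipped with --files land in each executor's container working directory, while an absolute /home/... path usually exists only on the driver host. A mismatch here is a common cause of the "Unable to obtain password from user" LoginException.

```
KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  storeKey=true
  useTicketCache=false
  serviceName="kafka"
  keyTab="./143637.keytab"
  principal="143637@EXAMPLE.COM";
};
```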

import org.apache.kafka.clients.consumer.ConsumerRecord

import org.apache.kafka.common.serialization.StringDeserializer

import org.apache.spark.streaming.kafka010._

import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

import org.apache.log4j.Logger

import org.apache.log4j.Level

Logger.getLogger("org").setLevel(Level.ERROR)

Logger.getLogger("akka").setLevel(Level.ERROR)

import kafka.serializer.StringDecoder

import org.apache.spark.streaming.kafka._

import org.apache.spark.SparkConf

import org.apache.spark.streaming.{Seconds, StreamingContext}

import org.apache.spark.storage.StorageLevel

import StorageLevel._

import org.apache.spark._

import org.apache.spark.streaming._

import org.apache.spark.streaming.StreamingContext._

import org.apache.spark.streaming.flume._

import org.apache.flume

import org.apache.spark.rdd.RDD

import org.apache.spark.sql.SQLContext

import org.apache.spark.streaming.Duration

import sqlContext.implicits._

import org.apache.spark.sql.functions.col

import org.apache.spark.sql.functions.lit

import org.apache.spark.sql.functions.{explode, split}

import java.util.Calendar

import java.text.SimpleDateFormat

val kafka = spark.readStream
  .format("kafka")
  .option("kafka.security.protocol", "SASL_SSL")
  .option("kafka.sasl.kerberos.service.name", "kafka")
  .option("kafka.ssl.truststore.location", "/opt/cloudera/security/truststore/truststore.jks")
  .option("kafka.ssl.truststore.password", "password")
  .option("kafka.ssl.keystore.location", "/opt/cloudera/security/jks/host.keystore")
  .option("kafka.ssl.keystore.password", "password")
  .option("kafka.metadata.broker.list", "vminbgdevnd01.axisb.com:9093")
  .option("kafka.bootstrap.servers", "vminbgdevnd01.axisb.com:9093")
  .option("subscribe", "test_topic3")
  .load()

val df = kafka.select(explode(split($"value".cast("string"), ",")).as("word"))

val query = df.writeStream.format("console").start()
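As an aside not in the original post: Kafka clients 0.10.2 and later accept the JAAS section directly through the consumer property sasl.jaas.config, which avoids depending on -Djava.security.auth.login.config reaching every executor JVM. A sketch under that assumption (principal and realm are placeholders; sasl.jaas.config must be a single line, hence the newline replacement):

```scala
// Sketch only: requires the Kafka 0.10.2+ client; principal/realm are placeholders.
val jaasConfig =
  """com.sun.security.auth.module.Krb5LoginModule required
    |useKeyTab=true
    |storeKey=true
    |serviceName="kafka"
    |keyTab="./143637.keytab"
    |principal="143637@EXAMPLE.COM";""".stripMargin.replace("\n", " ")

val kafkaAlt = spark.readStream
  .format("kafka")
  .option("kafka.security.protocol", "SASL_SSL")
  .option("kafka.sasl.jaas.config", jaasConfig)
  .option("kafka.bootstrap.servers", "vminbgdevnd01.axisb.com:9093")
  .option("subscribe", "test_topic3")
  .load()
```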

1 Reply

Rising Star

Hi @Aakriti Batra,

The problem appears to be in the JAAS file passed to the executors. It would help to see its contents, but I'd suggest reading this whole article first:

https://community.hortonworks.com/articles/56704/secure-kafka-java-producer-with-kerberos.html
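For readers hitting the same error, one frequently reported fix (a sketch, not part of the accepted reply) is to make the executor-side JAAS path relative to the container working directory, since that is where --files places the shipped copies, and to use spark.driver.extraJavaOptions on the driver side (plain spark.extraJavaOptions is not a recognized Spark configuration key). This assumes the shell is launched from /home/143637 so the relative keytab path in the JAAS file also resolves on the driver:

```
spark2-shell \
  --files spark_jaas.conf,143637.keytab,truststore.jks \
  --conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config=/home/143637/spark_jaas.conf" \
  --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./spark_jaas.conf"
```

Inside spark_jaas.conf, the keytab would correspondingly be referenced as keyTab="./143637.keytab".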