
Error reading Kafka topic from Spark: Unable to obtain password from user



I am receiving the following error:

18/12/19 16:43:32 ERROR scheduler.TaskSetManager: Task 0 in stage 0.0 failed 4 times; aborting job
18/12/19 16:43:32 ERROR streaming.StreamExecution: Query [id = b318a686-395d-4b47-b6f8-3f7190bf4f36, runId = 9b6245f4-6063-4ccb-819e-af464cf5723a] terminated with error
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, vminbgdevdtnd02.axisb.com, executor 1): org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:702)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:557)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:540)
    at org.apache.spark.sql.kafka010.CachedKafkaConsumer.createConsumer(CachedKafkaConsumer.scala:56)
    at org.apache.spark.sql.kafka010.CachedKafkaConsumer.<init>(CachedKafkaConsumer.scala:45)
    at org.apache.spark.sql.kafka010.CachedKafkaConsumer$.getOrCreate(CachedKafkaConsumer.scala:376)
    at org.apache.spark.sql.kafka010.KafkaSourceRDD.compute(KafkaSourceRDD.scala:137)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.KafkaException: javax.security.auth.login.LoginException: Unable to obtain password from user


The code I am running is as follows:

export SPARK_KAFKA_VERSION=0.10

export KAFKA_OPTS="-Djava.security.auth.login.config=/home/143637/spark_jaas.conf"

export SPARK_SUBMIT_OPTS='-Djava.security.auth.login.config=/home/143637/spark_jaas.conf'

spark2-shell \
  --files spark_jaas.conf,143637.keytab,truststore.jks,host.keystore \
  --driver-java-options "-Djava.security.auth.login.config=spark_jaas.conf" \
  --conf "spark.extraJavaOptions=-Djava.security.auth.login.config=spark_jaas.conf -Dsun.security.krb5.debug=false -Djavax.net.ssl.trustStore=truststore.jks -Djavax.net.ssl.trustStorePassword=password" \
  --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=spark_jaas.conf -Dsun.security.krb5.debug=false -Djavax.net.ssl.trustStore=truststore.jks -Djavax.net.ssl.trustStorePassword=password" \
  --queue fraud \
  --jars /home/143637/spark-sql-kafka-0-10_2.11-2.1.1.jar \
  --keytab /home/143637/143637.keytab

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.log4j.Logger
import org.apache.log4j.Level

Logger.getLogger("org").setLevel(Level.ERROR)
Logger.getLogger("akka").setLevel(Level.ERROR)

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka._
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.storage.StorageLevel
import StorageLevel._
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.flume._
import org.apache.flume
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.Duration
import sqlContext.implicits._
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.functions.lit
import java.util.Calendar
import java.text.SimpleDateFormat

val kafka = spark.readStream
  .format("kafka")
  .option("kafka.security.protocol", "SASL_SSL")
  .option("kafka.sasl.kerberos.service.name", "kafka")
  .option("kafka.ssl.truststore.location", "/opt/cloudera/security/truststore/truststore.jks")
  .option("kafka.ssl.truststore.password", "password")
  .option("kafka.ssl.keystore.location", "/opt/cloudera/security/jks/host.keystore")
  .option("kafka.ssl.keystore.password", "password")
  .option("kafka.metadata.broker.list", "vminbgdevnd01.axisb.com:9093")
  .option("kafka.bootstrap.servers", "vminbgdevnd01.axisb.com:9093")
  .option("subscribe", "test_topic3")
  .load()

import org.apache.spark.sql.functions.{explode, split}

val df = kafka.select(explode(split($"value".cast("string"), ",")).as("word"))

val query = df.writeStream.format("console").start()


Re: Error reading Kafka topic from Spark: Unable to obtain password from user


Hi @Aakriti Batra,

The problem seems to be in the JAAS file passed to the executors. It would help to see its contents, but I'd suggest you read this whole article first:

https://community.hortonworks.com/articles/56704/secure-kafka-java-producer-with-kerberos.html
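For reference, a JAAS file for a Kerberos-authenticated Kafka client usually looks like the sketch below. The principal and realm here are placeholders, not taken from your setup. One detail worth checking: files shipped with `--files` land in each executor's working directory, so if the JAAS file points at an absolute keytab path like `/home/143637/143637.keytab` that only exists on the edge node, executors cannot read it and the login fails with exactly this "Unable to obtain password from user" error; a relative path (just the file name) is typically what works on executors.

```
KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  storeKey=true
  useTicketCache=false
  serviceName="kafka"
  keyTab="./143637.keytab"
  principal="143637@EXAMPLE.COM";
};
```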
