Created 12-19-2018 01:34 PM
I am receiving the following error:
18/12/19 16:43:32 ERROR scheduler.TaskSetManager: Task 0 in stage 0.0 failed 4 times; aborting job
18/12/19 16:43:32 ERROR streaming.StreamExecution: Query [id = b318a686-395d-4b47-b6f8-3f7190bf4f36, runId = 9b6245f4-6063-4ccb-819e-af464cf5723a] terminated with error
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, vminbgdevdtnd02.axisb.com, executor 1): org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:702)
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:557)
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:540)
        at org.apache.spark.sql.kafka010.CachedKafkaConsumer.createConsumer(CachedKafkaConsumer.scala:56)
        at org.apache.spark.sql.kafka010.CachedKafkaConsumer.<init>(CachedKafkaConsumer.scala:45)
        at org.apache.spark.sql.kafka010.CachedKafkaConsumer$.getOrCreate(CachedKafkaConsumer.scala:376)
        at org.apache.spark.sql.kafka010.KafkaSourceRDD.compute(KafkaSourceRDD.scala:137)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:99)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.KafkaException: javax.security.auth.login.LoginException: Unable to obtain password from user
The code I am running is as follows:
export SPARK_KAFKA_VERSION=0.10
export KAFKA_OPTS="-Djava.security.auth.login.config=/home/143637/spark_jaas.conf"
export SPARK_SUBMIT_OPTS='-Djava.security.auth.login.config=/home/143637/spark_jaas.conf'
spark2-shell \
  --files spark_jaas.conf,143637.keytab,truststore.jks,host.keystore \
  --driver-java-options "-Djava.security.auth.login.config=spark_jaas.conf" \
  --conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config=spark_jaas.conf -Dsun.security.krb5.debug=false -Djavax.net.ssl.trustStore=truststore.jks -Djavax.net.ssl.trustStorePassword=password" \
  --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=spark_jaas.conf -Dsun.security.krb5.debug=false -Djavax.net.ssl.trustStore=truststore.jks -Djavax.net.ssl.trustStorePassword=password" \
  --queue fraud \
  --jars /home/143637/spark-sql-kafka-0-10_2.11-2.1.1.jar \
  --keytab /home/143637/143637.keytab
import org.apache.log4j.{Level, Logger}

// Silence Spark's INFO/WARN logging in the shell
Logger.getLogger("org").setLevel(Level.ERROR)
Logger.getLogger("akka").setLevel(Level.ERROR)

// Structured Streaming only needs the SQL functions and the session's implicits
import org.apache.spark.sql.functions.{col, explode, lit, split}
import spark.implicits._
val kafka = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "vminbgdevnd01.axisb.com:9093")
  .option("kafka.security.protocol", "SASL_SSL")
  .option("kafka.sasl.kerberos.service.name", "kafka")
  .option("kafka.ssl.truststore.location", "/opt/cloudera/security/truststore/truststore.jks")
  .option("kafka.ssl.truststore.password", "password")
  .option("kafka.ssl.keystore.location", "/opt/cloudera/security/jks/host.keystore")
  .option("kafka.ssl.keystore.password", "password")
  .option("subscribe", "test_topic3")
  .load()
val df = kafka.select(explode(split($"value".cast("string"), ",")).as("word"))
val query = df.writeStream.format("console").start()
Created 12-23-2018 07:02 PM
Hi @Aakriti Batra,
The problem seems to be in the JAAS file passed to the executors; it would help to see its contents. Rather than debugging piecemeal, though, I'd suggest reading this whole article first:
https://community.hortonworks.com/articles/56704/secure-kafka-java-producer-with-kerberos.html
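For reference, the JAAS file for a kerberized Kafka client usually looks like the minimal sketch below. This is an example, not your actual file: the realm in the principal is a placeholder, and the keytab name is taken from your spark2-shell command. The detail that most often produces "Unable to obtain password from user" is the keyTab path, which must be valid on the executor hosts. Since you ship the keytab with --files, it lands in each executor's working directory, so a relative path like ./143637.keytab resolves everywhere, whereas an absolute path such as /home/143637/143637.keytab exists only on your edge node.

KafkaClient {
  // Placeholder values: substitute your real principal and realm.
  // keyTab must resolve on the executors; --files places the file
  // in each container's working directory, hence the relative path.
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  storeKey=true
  keyTab="./143637.keytab"
  principal="143637@YOUR.REALM.COM";
};

With the JAAS file written that way, the -Djava.security.auth.login.config=spark_jaas.conf you already pass in spark.executor.extraJavaOptions should pick it up, since --files ships the JAAS file to the same working directory as the keytab.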