Spark structured streaming with secured Kafka freezes at a log message [INFO StreamExecution: Starting new streaming query.]

Sample Program:

import org.apache.spark.sql.{DataFrame, SparkSession}

object DemoSparkStreamingWithKafkaJob {
  def main(args: Array[String]): Unit = {
    val Array(kafkaBrokerList, srcTopic, destTopic, security) = args

    val spark = getSparkSession
    import spark.implicits._

    // Read the source topic as a streaming DataFrame
    val df: DataFrame = spark.readStream.format("kafka")
      .option("kafka.bootstrap.servers", kafkaBrokerList)
      .option("kafka.security.protocol", security)
      .option("subscribe", srcTopic)
      .load()

    val keyValueDf = df
      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
      .as[(String, String)]

    // Echo the stream to the console; awaitTermination blocks until the query stops
    val query = keyValueDf
      .writeStream
      .format("console")
      .start()
    query.awaitTermination()
  }

  def getSparkSession: SparkSession =
    SparkSession.builder()
      .appName("KAFKA_SPARK_TEST_APP")
      .getOrCreate()
}

kafka_jaas.conf:

KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useTicketCache=true
  renewTicket=true
  serviceName="{{kafka_bare_jaas_principal}}";
};

Client {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  keyTab="{{kafka_keytab_path}}"
  storeKey=true
  useTicketCache=false
  serviceName="zookeeper"
  principal="{{kafka_jaas_principal}}";
};


Spark-submit Command:

./spark-submit \
--name TEST_JOB_SPARK_STREAM_WITH_KAFKA \
--verbose \
--master yarn \
--deploy-mode client \
--num-executors 1  \
--executor-memory 1g \
--executor-cores 1 \
--repositories http://repo.hortonworks.com/content/repositories/releases/ \
--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0.2.6.3.0-235 \
--conf spark.yarn.maxAppAttempts=1 \
--files /home/user/sparktest/kafka_jaas.conf#kafka_jaas.conf,/home/user/sparktest/user.headless.keytab#user.headless.keytab \
--conf "spark.driver.extraJavaOptions=-XX:+UseG1GC -Djava.security.auth.login.config=/home/user/sparktest/kafka_jaas.conf" \
--conf "spark.executor.extraJavaOptions=-XX:+UseG1GC -Djava.security.auth.login.config=/home/user/sparktest/kafka_jaas.conf" \
--class com.brokerfmc.demo.DemoSparkStreamingWithKafkaJob /home/user/sparktest/demo.jar kafka01.broker.com:6667,kafka02.broker.com:6667 src_topic dest_topic SASL_PLAINTEXT


Spark Console Output:

18/01/25 14:19:59 INFO AbstractLogin: Successfully logged in.
18/01/25 14:19:59 INFO KerberosLogin: TGT refresh thread started.
18/01/25 14:19:59 INFO KerberosLogin: TGT valid starting at: Thu Jan 25 13:07:52 UTC 2018
18/01/25 14:19:59 INFO KerberosLogin: TGT expires: Thu Jan 25 23:08:08 UTC 2018
18/01/25 14:19:59 INFO KerberosLogin: TGT refresh sleeping until: Thu Jan 25 21:26:00 UTC 2018
18/01/25 14:19:59 INFO ConsumerConfig: ConsumerConfig values:
        metric.reporters = []
        metadata.max.age.ms = 300000
        partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
        reconnect.backoff.ms = 50
        sasl.kerberos.ticket.renew.window.factor = 0.8
        max.partition.fetch.bytes = 1048576
        bootstrap.servers = [kafka01.broker.com:6667, kafka02.broker.com:6667]
        ssl.keystore.type = JKS
        enable.auto.commit = false
        sasl.mechanism = GSSAPI
        interceptor.classes = null
        exclude.internal.topics = true
        ssl.truststore.password = null
        client.id = consumer-1
        ssl.endpoint.identification.algorithm = null
        max.poll.records = 1
        check.crcs = true
        request.timeout.ms = 40000
        heartbeat.interval.ms = 3000
        auto.commit.interval.ms = 5000
        receive.buffer.bytes = 65536
        ssl.truststore.type = JKS
        ssl.truststore.location = null
        ssl.keystore.password = null
        fetch.min.bytes = 1
        send.buffer.bytes = 131072
        value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
        group.id = spark-kafka-source-c3ecdc95-df8e-4381-be28-c504abd91dba--805260723-driver-0
        retry.backoff.ms = 100
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        ssl.trustmanager.algorithm = PKIX
        ssl.key.password = null
        fetch.max.wait.ms = 500
        sasl.kerberos.min.time.before.relogin = 60000
        connections.max.idle.ms = 540000
        session.timeout.ms = 30000
        metrics.num.samples = 2
        key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
        ssl.protocol = TLS
        ssl.provider = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        ssl.keystore.location = null
        ssl.cipher.suites = null
        security.protocol = SASL_PLAINTEXT
        ssl.keymanager.algorithm = SunX509
        metrics.sample.window.ms = 30000
        auto.offset.reset = earliest
18/01/25 14:19:59 INFO AppInfoParser: Kafka version : 0.10.0.1
18/01/25 14:19:59 INFO AppInfoParser: Kafka commitId : a7a17cdec9eaa6c5
18/01/25 14:19:59 INFO StreamExecution: Starting new streaming query.


pom.xml:

    <properties>
        <provided.scope>provided</provided.scope>
        <hdp.version>2.6.3.0-235</hdp.version>
        <spark-excel.version>0.9.8</spark-excel.version>
        <spark.version>2.2.0.2.6.3.0-235</spark.version>
        <kafka.version>0.10.1.2.6.3.0-235</kafka.version>
    </properties>

    <repositories>
        <repository>
            <releases>
                <enabled>true</enabled>
                <updatePolicy>always</updatePolicy>
                <checksumPolicy>warn</checksumPolicy>
            </releases>
            <snapshots>
                <enabled>false</enabled>
                <updatePolicy>never</updatePolicy>
                <checksumPolicy>fail</checksumPolicy>
            </snapshots>
            <id>HDPReleases</id>
            <name>HDP Releases</name>
            <url>http://repo.hortonworks.com/content/repositories/releases/</url>
            <layout>default</layout>
        </repository>
    </repositories>
    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>com.crealytics</groupId>
            <artifactId>spark-excel_2.11</artifactId>
            <version>${spark-excel.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql-kafka-0-10 -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
    </dependencies>

1 Reply

Hi Nilesh,

I know this answer comes late, but let's add a solution for future reference.

Please try

--conf "spark.executor.extraJavaOptions=-XX:+UseG1GC -Djava.security.auth.login.config=kafka_jaas.conf"

instead of

--conf "spark.executor.extraJavaOptions=-XX:+UseG1GC -Djava.security.auth.login.config=/home/user/sparktest/kafka_jaas.conf"

Also, make sure the JAAS file references the keytab by just its file name, 'user.headless.keytab', not an absolute path. Since --files ships both files into each YARN container's working directory, only the bare file names resolve on the executors.

eg:

KafkaClient {
 com.sun.security.auth.module.Krb5LoginModule required
 doNotPrompt=true
 useTicketCache=false
 principal="userprincipalname"
 useKeyTab=true
 serviceName="kafka"
 keyTab="user.headless.keytab"
 client=true;
};
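
Putting both changes together, the full submit command would look roughly like the sketch below (same paths, class, and topics as the original post). The driver keeps the absolute path because it runs on the submitting host in client mode, while the executors reference the copy of kafka_jaas.conf that --files places in their working directories:

./spark-submit \
--name TEST_JOB_SPARK_STREAM_WITH_KAFKA \
--master yarn \
--deploy-mode client \
--num-executors 1 \
--executor-memory 1g \
--executor-cores 1 \
--repositories http://repo.hortonworks.com/content/repositories/releases/ \
--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0.2.6.3.0-235 \
--conf spark.yarn.maxAppAttempts=1 \
--files /home/user/sparktest/kafka_jaas.conf,/home/user/sparktest/user.headless.keytab \
--conf "spark.driver.extraJavaOptions=-XX:+UseG1GC -Djava.security.auth.login.config=/home/user/sparktest/kafka_jaas.conf" \
--conf "spark.executor.extraJavaOptions=-XX:+UseG1GC -Djava.security.auth.login.config=kafka_jaas.conf" \
--class com.brokerfmc.demo.DemoSparkStreamingWithKafkaJob /home/user/sparktest/demo.jar kafka01.broker.com:6667,kafka02.broker.com:6667 src_topic dest_topic SASL_PLAINTEXT

(The #alias suffixes in the original --files list were dropped here, since each alias matched the file name anyway.)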

Thanks

Vinod
