Created 01-25-2018 03:09 PM
Sample Program:

import org.apache.spark.sql.{DataFrame, SparkSession}

object DemoSparkStreamingWithKafkaJob {

  def main(args: Array[String]): Unit = {
    val spark = getSparkSession
    import spark.implicits._

    val Array(kafkaBrokerList, srcTopic, destTopic, security) = args

    // Read the source topic as a streaming DataFrame over the secured broker list.
    val df: DataFrame = spark.readStream.format("kafka")
      .option("kafka.bootstrap.servers", kafkaBrokerList)
      .option("kafka.security.protocol", security)
      .option("subscribe", srcTopic)
      .load

    // Kafka delivers key/value as binary; cast both to strings.
    val keyValueDf = df
      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
      .as[(String, String)]

    val query = keyValueDf
      .writeStream
      .format("console")
      .start()

    query.awaitTermination()
  }

  def getSparkSession(): SparkSession = {
    SparkSession.builder()
      .appName("KAFKA_SPARK_TEST_APP")
      .getOrCreate()
  }
}

kafka_jaas.conf:

KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useTicketCache=true
  renewTicket=true
  serviceName="{{kafka_bare_jaas_principal}}";
};

Client {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  keyTab="{{kafka_keytab_path}}"
  storeKey=true
  useTicketCache=false
  serviceName="zookeeper"
  principal="{{kafka_jaas_principal}}";
};

Spark-submit Command:

./spark-submit \
  --name TEST_JOB_SPARK_STREAM_WITH_KAFKA \
  --verbose \
  --master yarn \
  --deploy-mode client \
  --num-executors 1 \
  --executor-memory 1g \
  --executor-cores 1 \
  --repositories http://repo.hortonworks.com/content/repositories/releases/ \
  --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0.2.6.3.0-235 \
  --conf spark.yarn.maxAppAttempts=1 \
  --files /home/user/sparktest/kafka_jaas.conf#kafka_jaas.conf,/home/user/sparktest/user.headless.keytab#user.headless.keytab \
  --conf "spark.driver.extraJavaOptions=-XX:+UseG1GC -Djava.security.auth.login.config=/home/user/sparktest/kafka_jaas.conf" \
  --conf "spark.executor.extraJavaOptions=-XX:+UseG1GC -Djava.security.auth.login.config=/home/user/sparktest/kafka_jaas.conf" \
  --class com.brokerfmc.demo.DemoSparkStreamingWithKafkaJob \
  /home/user/sparktest/demo.jar \
  kafka01.broker.com:6667,kafka02.broker.com:6667 src_topic dest_topic SASL_PLAINTEXT

Spark Console Output:

18/01/25 14:19:59 INFO AbstractLogin: Successfully logged in.
18/01/25 14:19:59 INFO KerberosLogin: TGT refresh thread started.
18/01/25 14:19:59 INFO KerberosLogin: TGT valid starting at: Thu Jan 25 13:07:52 UTC 2018
18/01/25 14:19:59 INFO KerberosLogin: TGT expires: Thu Jan 25 23:08:08 UTC 2018
18/01/25 14:19:59 INFO KerberosLogin: TGT refresh sleeping until: Thu Jan 25 21:26:00 UTC 2018
18/01/25 14:19:59 INFO ConsumerConfig: ConsumerConfig values:
    metric.reporters = []
    metadata.max.age.ms = 300000
    partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
    reconnect.backoff.ms = 50
    sasl.kerberos.ticket.renew.window.factor = 0.8
    max.partition.fetch.bytes = 1048576
    bootstrap.servers = [kafka01.broker.com:6667, kafka02.broker.com:6667]
    ssl.keystore.type = JKS
    enable.auto.commit = false
    sasl.mechanism = GSSAPI
    interceptor.classes = null
    exclude.internal.topics = true
    ssl.truststore.password = null
    client.id = consumer-1
    ssl.endpoint.identification.algorithm = null
    max.poll.records = 1
    check.crcs = true
    request.timeout.ms = 40000
    heartbeat.interval.ms = 3000
    auto.commit.interval.ms = 5000
    receive.buffer.bytes = 65536
    ssl.truststore.type = JKS
    ssl.truststore.location = null
    ssl.keystore.password = null
    fetch.min.bytes = 1
    send.buffer.bytes = 131072
    value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
    group.id = spark-kafka-source-c3ecdc95-df8e-4381-be28-c504abd91dba--805260723-driver-0
    retry.backoff.ms = 100
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    ssl.trustmanager.algorithm = PKIX
    ssl.key.password = null
    fetch.max.wait.ms = 500
    sasl.kerberos.min.time.before.relogin = 60000
    connections.max.idle.ms = 540000
    session.timeout.ms = 30000
    metrics.num.samples = 2
    key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
    ssl.protocol = TLS
    ssl.provider = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.keystore.location = null
    ssl.cipher.suites = null
    security.protocol = SASL_PLAINTEXT
    ssl.keymanager.algorithm = SunX509
    metrics.sample.window.ms = 30000
    auto.offset.reset = earliest
18/01/25 14:19:59 INFO AppInfoParser: Kafka version : 0.10.0.1
18/01/25 14:19:59 INFO AppInfoParser: Kafka commitId : a7a17cdec9eaa6c5
18/01/25 14:19:59 INFO StreamExecution: Starting new streaming query.
Pom file:

<properties>
  <provided.scope>provided</provided.scope>
  <hdp.version>2.6.3.0-235</hdp.version>
  <spark-excel.version>0.9.8</spark-excel.version>
  <spark.version>2.2.0.2.6.3.0-235</spark.version>
  <kafka.version>0.10.1.2.6.3.0-235</kafka.version>
</properties>

<repositories>
  <repository>
    <releases>
      <enabled>true</enabled>
      <updatePolicy>always</updatePolicy>
      <checksumPolicy>warn</checksumPolicy>
    </releases>
    <snapshots>
      <enabled>false</enabled>
      <updatePolicy>never</updatePolicy>
      <checksumPolicy>fail</checksumPolicy>
    </snapshots>
    <id>HDPReleases</id>
    <name>HDP Releases</name>
    <url>http://repo.hortonworks.com/content/repositories/releases/</url>
    <layout>default</layout>
  </repository>
</repositories>

<dependencies>
  <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>${spark.version}</version>
  </dependency>
  <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>${spark.version}</version>
  </dependency>
  <dependency>
    <groupId>com.crealytics</groupId>
    <artifactId>spark-excel_2.11</artifactId>
    <version>${spark-excel.version}</version>
  </dependency>
  <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql-kafka-0-10 -->
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
    <version>${spark.version}</version>
  </dependency>
</dependencies>
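Note: the sample program parses destTopic from the arguments but only ever writes to the console sink. If the goal is to forward records to the destination topic, a minimal sketch of the Kafka sink available in Spark 2.2 would look like the following (the checkpoint path is a hypothetical placeholder; use a durable HDFS location in practice):

import org.apache.spark.sql.streaming.StreamingQuery

// Rename the tuple columns to the "key"/"value" columns the Kafka sink expects,
// then write to the destination topic instead of the console.
val kafkaQuery: StreamingQuery = keyValueDf
  .toDF("key", "value")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", kafkaBrokerList)
  .option("kafka.security.protocol", security)
  .option("topic", destTopic)
  .option("checkpointLocation", "/tmp/demo-checkpoint") // hypothetical path
  .start()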
Created 01-02-2019 10:22 AM
Hi Nilesh,
I know this answer comes late, but let me add a solution for anyone who hits this issue in the future.
Please try:
--conf "spark.executor.extraJavaOptions=-XX:+UseG1GC -Djava.security.auth.login.config=kafka_jaas.conf"
instead of
--conf "spark.executor.extraJavaOptions=-XX:+UseG1GC -Djava.security.auth.login.config=/home/user/sparktest/kafka_jaas.conf"
Also make sure the JAAS file references just the file name 'user.headless.keytab' (not the absolute path): --files ships both files into each YARN container's working directory, so the relative names resolve there, while a path on the submitting host does not exist on the executor nodes.
e.g.:
KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  doNotPrompt=true
  useTicketCache=false
  principal="userprincipalname"
  useKeyTab=true
  serviceName="kafka"
  keyTab="user.headless.keytab"
  client=true;
};
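A quick way to confirm both files were actually localized into the container working directory is to check for them by their relative names from inside the job (a hedged sketch, nothing Kafka-specific):

import java.io.File

// Both files should exist relative to the YARN container's working directory.
for (name <- Seq("kafka_jaas.conf", "user.headless.keytab")) {
  val f = new File(name)
  println(s"$name exists=${f.exists} at ${f.getAbsolutePath}")
}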
Thanks
Vinod