
How to reissue a delegated token after max lifetime passes for a spark streaming application on a Kerberized cluster

Expert Contributor

We are using a headless keytab to run our long-running Spark streaming application. The token is renewed automatically every day until it hits the max lifetime limit. The problem is that the token expires after the max lifetime (7 days) and we then need to restart the job. Is there any way we can re-issue the token and pass it to a job that is already running? It doesn't feel right at all to restart the job every 7 days just because of the token.

7 REPLIES

Rising Star

Hi @Ali,

You might want to add "--keytab /path/to/the/headless-keytab", "--principal principalNameAsPerTheKeytab" and "--conf spark.hadoop.fs.hdfs.impl.disable.cache=true" to the spark-submit command.
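For reference, a minimal sketch of such a submission could look like the following (the principal, keytab path, class name, and jar are placeholders; adjust them to your environment):

spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --principal appuser@EXAMPLE.COM \
  --keytab /etc/security/keytabs/appuser.headless.keytab \
  --conf spark.hadoop.fs.hdfs.impl.disable.cache=true \
  --class com.example.StreamingApp \
  /path/to/app.jar

With --principal and --keytab, the Spark YARN client can log in again from the keytab and obtain fresh delegation tokens, rather than only renewing the original token until its max lifetime.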

Expert Contributor

This is exactly the way we use spark-submit. A token is valid only for 24 hours. YARN renews the token every 24 hours automatically until it reaches the max lifetime (7 days); after that the token cannot be renewed anymore and needs to be reissued. Hence, we are restarting the job every 7 days. Restarting the job every 7 days doesn't seem like the right approach for a production environment!

Rising Star

Can you share the (masked) spark-submit command and the full "delegation token has expired" stack trace? Also, what is the use case of your app?

Expert Contributor

Stack trace:

WARN Client: Exception encountered while connecting to the server : org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken): token (HDFS_DELEGATION_TOKEN token 155456 for spark) can't be found in cache
Exception in thread "main" org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken): token (HDFS_DELEGATION_TOKEN token 155456 for spark) can't be found in cache
    at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1554)
    at org.apache.hadoop.ipc.Client.call(Client.java:1498)
    at org.apache.hadoop.ipc.Client.call(Client.java:1398)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:233)
    at com.sun.proxy.$Proxy10.getFileInfo(Unknown Source)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getFileInfo(ClientNamenodeProtocolTranslatorPB.java:818)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:291)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:203)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:185)
    at com.sun.proxy.$Proxy11.getFileInfo(Unknown Source)
    at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:2165)
    at org.apache.hadoop.hdfs.DistributedFileSystem$26.doCall(DistributedFileSystem.java:1442)
    at org.apache.hadoop.hdfs.DistributedFileSystem$26.doCall(DistributedFileSystem.java:1438)
    at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
    at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1438)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$6.apply(ApplicationMaster.scala:160)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$6.apply(ApplicationMaster.scala:157)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.deploy.yarn.ApplicationMaster.<init>(ApplicationMaster.scala:157)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$main$1.apply$mcV$sp(ApplicationMaster.scala:765)
    at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:67)
    at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:66)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1866)
    at org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:66)
    at org.apache.spark.deploy.yarn.ApplicationMaster$.main(ApplicationMaster.scala:764)
    at org.apache.spark.deploy.yarn.ApplicationMaster.main(ApplicationMaster.scala)

Rising Star

The spark-submit looks fine. This issue will take more than a forum thread to resolve; it would require code and log analysis, I'd say.

Meanwhile, I can only suggest passing "-Dsun.security.krb5.debug=true" in the extraJavaOptions. It would also help to set "log4j.logger.org.apache.spark.deploy.yarn.Client=DEBUG" in the log4j.properties file, then restart the application, hoping it will print more pointers. Also, if your KDC is an MIT KDC, double-check that your principal does not have a 'Maximum Renewal Time' of 00:00:00, as explained here.
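For illustration, the debug settings above could be wired in roughly like this (the principal name is a placeholder, and the log4j.properties file is assumed to be the one already referenced by -Dlog4j.configuration in your submit command):

# added to the spark-submit command
# (merge with the existing extraJavaOptions values rather than adding a second --conf for the same key):
--conf "spark.driver.extraJavaOptions=-Dsun.security.krb5.debug=true" \
--conf "spark.executor.extraJavaOptions=-Dsun.security.krb5.debug=true" \

# added to the application's log4j.properties:
log4j.logger.org.apache.spark.deploy.yarn.Client=DEBUG

# on an MIT KDC, inspect the principal; look at the "Maximum renewable life" field:
kadmin -q "getprinc appuser@EXAMPLE.COM"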

Another property to try out that may help, depending on your application's use case, is:

--conf mapreduce.job.complete.cancel.delegation.tokens=false
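Note that when a Hadoop/MapReduce property like this is passed through spark-submit, it generally needs the spark.hadoop. prefix to end up in the Hadoop configuration, e.g.:

--conf spark.hadoop.mapreduce.job.complete.cancel.delegation.tokens=false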

Expert Contributor

Spark submit:

spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=kafka_client_jaas.conf -Dlog4j.configuration=xxx -Djava.util.Arrays.useLegacyMergeSort=true" \
  --conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config=kafka_client_jaas.conf -Dlog4j.configuration=xxx -Djava.util.Arrays.useLegacyMergeSort=true" \
  --conf spark.ui.port=18086 \
  --conf spark.executor.memory=${executor_memory} \
  --conf spark.executor.instances=${num_executors} \
  --conf spark.executor.cores=${executor_cores} \
  --conf spark.driver.memory=4g \
  --conf spark.driver.maxResultSize=3g \
  --conf spark.kafka.broker.ingest=xxx \
  --conf spark.kafka.zookeeper.ingest=xxx \
  --conf spark.kafka.broker.egest=xxx \
  --conf spark.kafka.topic.input=xxx \
  --conf spark.kafka.topic.output=xxx \
  --conf spark.kafka.input.interval=10 \
  --conf spark.kafka.group=xxx \
  --conf spark.streaming.kafka.maxRetries=10 \
  --conf spark.kafka.security.protocol.ingress=SASL_PLAINTEXT \
  --conf spark.kafka.security.protocol.egress=SASL_PLAINTEXT \
  --conf spark.fetch.message.max.bytes=104857600 \
  --conf spark.hive.enable.stats=true \
  --conf spark.streaming.backpressure.enabled=true \
  --conf spark.streaming.kafka.maxRatePerPartition=1 \
  --conf spark.streaming.receiver.maxRate=10 \
  --conf spark.executor.heartbeatInterval=120s \
  --conf spark.network.timeout=600s \
  --conf spark.yarn.scheduler.heartbeat.interval-ms=1000 \
  --conf spark.sql.parquet.compression.codec=snappy \
  --conf spark.scheduler.minRegisteredResourcesRatio=1 \
  --conf spark.yarn.maxAppAttempts=10 \
  --conf spark.yarn.am.attemptFailuresValidityInterval=1h \
  --conf spark.yarn.max.executor.failures=$((8 * ${num_executors})) `# Increase max executor failures (Default: max(numExecutors * 2, 3))` \
  --conf spark.yarn.executor.failuresValidityInterval=1h \
  --conf spark.task.maxFailures=8 \
  --conf spark.yarn.submit.waitAppCompletion=false \
  --conf spark.yarn.principal=xxx \
  --conf spark.yarn.keytab=xxx \
  --conf spark.hadoop.fs.hdfs.impl.disable.cache=true \
  --queue default \
  ${APP_HOME}/xxx.jar

Master Mentor

@Ali

I think for a Kerberized cluster you need to use a jaas.conf file; see "Accessing kerberized Sources from Spark2 in Cluster mode on YARN".
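For illustration, the Kafka client section of such a jaas.conf might look roughly like this (the keytab path, principal, and service name are placeholders and depend on your environment):

KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  storeKey=true
  useTicketCache=false
  keyTab="/etc/security/keytabs/appuser.headless.keytab"
  serviceName="kafka"
  principal="appuser@EXAMPLE.COM";
};

The file would then be shipped with --files and pointed to via -Djava.security.auth.login.config in both the driver and executor extraJavaOptions, as is already done in the spark-submit command above.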

HTH