
How to reissue a delegated token after max lifetime passes for a spark streaming application on a Kerberized cluster


We are using a headless keytab to run our long-running Spark Streaming application. The token is renewed automatically every day until it hits the max-lifetime limit. The problem is that the token expires after the max lifetime (7 days) and we then need to restart the job. Is there any way we can re-issue the token and pass it to a job that is already running? It doesn't feel right at all to restart the job every 7 days just because of the token.



Hi @Ali,

You might want to add "--keytab /path/to/the/headless-keytab", "--principal principalNameAsPerTheKeytab" and "--conf spark.hadoop.fs.hdfs.impl.disable.cache=true" to the spark-submit command.
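A minimal sketch of such a spark-submit invocation (the keytab path, principal, and jar name below are placeholders, not values from this thread):

```shell
# Sketch only: keytab path, principal, and application jar are placeholders.
spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --keytab /etc/security/keytabs/app.headless.keytab \
  --principal app@EXAMPLE.COM \
  --conf spark.hadoop.fs.hdfs.impl.disable.cache=true \
  app.jar
```

With --keytab and --principal supplied, Spark ships the keytab to the application master so it can log in again and obtain fresh delegation tokens, rather than only renewing the original ones.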


This is exactly how we use spark-submit. A token is valid only for 24 hours. YARN renews the token every 24 hours automatically until it reaches the max lifetime (7 days); after that the token cannot be renewed any more and needs to be reissued. Hence we are restarting the job every 7 days, which doesn't seem like the right approach for a production environment!
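The renewable lifetime of the underlying credentials can be checked from a client shell; a sketch, assuming an MIT Kerberos client (the keytab path and principal are placeholders):

```shell
# Obtain a ticket from the headless keytab (placeholder path/principal)
kinit -kt /etc/security/keytabs/app.headless.keytab app@EXAMPLE.COM

# Show ticket flags plus the expiry and renew-until times;
# the "R" flag indicates the ticket is renewable
klist -f

# Renew the TGT; this succeeds only until the renewable
# (max) lifetime set by the KDC is reached
kinit -R
```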


Can you share (masked) the spark submit command and the full "delegation token has expired" stacktrace? Also what is the use case of your app?


Stack trace:

WARN Client: Exception encountered while connecting to the server : org.apache.hadoop.ipc.RemoteException(SecretManager$InvalidToken): token (HDFS_DELEGATION_TOKEN token 155456 for spark) can't be found in cache
Exception in thread "main" org.apache.hadoop.ipc.RemoteException(SecretManager$InvalidToken): token (HDFS_DELEGATION_TOKEN token 155456 for spark) can't be found in cache
	at org.apache.hadoop.ipc.Client.getRpcResponse(
	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(
	at com.sun.proxy.$Proxy10.getFileInfo(Unknown Source)
	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getFileInfo(
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(
	at java.lang.reflect.Method.invoke(
	at com.sun.proxy.$Proxy11.getFileInfo(Unknown Source)
	at org.apache.hadoop.hdfs.DFSClient.getFileInfo(
	at org.apache.hadoop.hdfs.DistributedFileSystem$26.doCall(
	at org.apache.hadoop.hdfs.DistributedFileSystem$26.doCall(
	at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(
	at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(
	at org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$6.apply(ApplicationMaster.scala:160)
	at org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$6.apply(ApplicationMaster.scala:157)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.deploy.yarn.ApplicationMaster.<init>(ApplicationMaster.scala:157)
	at org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$main$1.apply$mcV$sp(ApplicationMaster.scala:765)
	at org.apache.spark.deploy.SparkHadoopUtil$$anon$
	at org.apache.spark.deploy.SparkHadoopUtil$$anon$
	at Method)
	at org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:66)
	at org.apache.spark.deploy.yarn.ApplicationMaster$.main(ApplicationMaster.scala:764)
	at org.apache.spark.deploy.yarn.ApplicationMaster.main(ApplicationMaster.scala)


The spark-submit looks fine. This issue will take more than a forum to resolve; it would require code and log analysis, I'd say.

Meanwhile, I can only suggest passing "" in the extraJavaOptions, and it would also help if you can set the following in the file "", then restart the application, hoping it will print more pointers. Also, if your KDC is an MIT KDC, double-check that your principal does not have a 'Maximum Renewal Time' of 00:00:00, as explained here.
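The exact options suggested above did not survive in the post. As a hypothetical illustration only, extra JVM options are typically passed to the driver and executors like this (the -Dsun.security.krb5.debug=true flag is an assumption here, a commonly used Kerberos debug switch, not necessarily the one the answerer meant):

```shell
# Assumption: -Dsun.security.krb5.debug=true is shown as one commonly used
# Kerberos debug flag; the flag originally suggested in this thread was lost.
spark-submit \
  --conf "spark.driver.extraJavaOptions=-Dsun.security.krb5.debug=true" \
  --conf "spark.executor.extraJavaOptions=-Dsun.security.krb5.debug=true" \
  ...
```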

Another property to try out, which may help depending on your application's use case, is:

--conf mapreduce.job.complete.cancel.delegation.tokens=false


Spark submit:

spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --conf " -Dlog4j.configuration=xxx -Djava.util.Arrays.useLegacyMergeSort=true" \
  --conf " -Dlog4j.configuration=xxx -Djava.util.Arrays.useLegacyMergeSort=true" \
  --conf spark.ui.port=18086 \
  --conf spark.executor.memory=${executor_memory} \
  --conf spark.executor.instances=${num_executors} \
  --conf spark.executor.cores=${executor_cores} \
  --conf spark.driver.memory=4g \
  --conf spark.driver.maxResultSize=3g \
  --conf \
  --conf spark.kafka.zookeeper.ingest=xxx \
  --conf \
  --conf spark.kafka.topic.input=xxx \
  --conf spark.kafka.topic.output=xxx \
  --conf spark.kafka.input.interval=10 \
  --conf \
  --conf spark.streaming.kafka.maxRetries=10 \
  --conf \
  --conf \
  --conf spark.fetch.message.max.bytes=104857600 \
  --conf spark.hive.enable.stats=true \
  --conf spark.streaming.backpressure.enabled=true \
  --conf spark.streaming.kafka.maxRatePerPartition=1 \
  --conf spark.streaming.receiver.maxRate=10 \
  --conf spark.executor.heartbeatInterval=120s \
  --conf \
  --conf spark.yarn.scheduler.heartbeat.interval-ms=1000 \
  --conf spark.sql.parquet.compression.codec=snappy \
  --conf spark.scheduler.minRegisteredResourcesRatio=1 \
  --conf spark.yarn.maxAppAttempts=10 \
  --conf \
  --conf spark.yarn.max.executor.failures=$((8 * ${num_executors})) `# Increase max executor failures (Default: max(numExecutors * 2, 3))` \
  --conf spark.yarn.executor.failuresValidityInterval=1h \
  --conf spark.task.maxFailures=8 \
  --conf spark.yarn.submit.waitAppCompletion=false \
  --conf spark.yarn.principal=xxx \
  --conf spark.yarn.keytab=xxx \
  --conf spark.hadoop.fs.hdfs.impl.disable.cache=true \
  --queue default \
  ${APP_HOME}/xxx.jar



I think for a Kerberized cluster you need to use a jaas.conf file; see "Accessing kerberized Sources from Spark2 in Cluster mode on YARN".
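A minimal jaas.conf sketch for a Kafka client on a Kerberized cluster (the section name, keytab path, and principal below are hypothetical placeholders, not values from this thread):

```
// Hypothetical example: keytab path and principal are placeholders.
KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  storeKey=true
  keyTab="/etc/security/keytabs/app.headless.keytab"
  principal="app@EXAMPLE.COM";
};
```

The file is then typically shipped with --files and pointed to via -Djava.security.auth.login.config=jaas.conf in both spark.driver.extraJavaOptions and spark.executor.extraJavaOptions.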

