Created 12-20-2018 05:40 AM
We are using a headless keytab to run our long-running Spark Streaming application. The token is renewed automatically every day until it hits the max lifetime limit. The problem is that the token expires after the max lifetime (7 days) and we then need to restart the job. Is there any way to re-issue the token and pass it to a job that is already running? It doesn't feel right to restart the job every 7 days just because of the token.
Created 12-23-2018 06:56 PM
hi @Ali,
You might want to add "--keytab /path/to/the/headless-keytab", "--principal principalNameAsPerTheKeytab" and "--conf spark.hadoop.fs.hdfs.impl.disable.cache=true" to the spark-submit command.
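Put together, the invocation might look roughly like the sketch below. The principal, keytab path, and jar path are placeholders, not values from your environment, and the script only prints the command so it can be reviewed before running it for real.

```shell
# Placeholders (assumptions): replace with the values for your cluster.
PRINCIPAL="principalNameAsPerTheKeytab"
KEYTAB="/path/to/the/headless-keytab"
APP_JAR="/path/to/your-app.jar"

# Assemble the spark-submit command with the three suggested options.
cmd="spark-submit --master yarn --deploy-mode cluster"
cmd="$cmd --principal $PRINCIPAL --keytab $KEYTAB"
cmd="$cmd --conf spark.hadoop.fs.hdfs.impl.disable.cache=true"
cmd="$cmd $APP_JAR"

# Dry run: print the command instead of executing it.
echo "$cmd"
```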
Created 01-03-2019 12:52 AM
This is exactly how we use spark-submit. A token is valid for only 24 hours. YARN renews the token automatically every 24 hours until it reaches the max lifetime (7 days); after that the token can no longer be renewed and has to be reissued. Hence, we are restarting the job every 7 days. Restarting the job every 7 days doesn't seem like the right approach for a production environment!
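To make the arithmetic concrete, here is a small sketch of where the 24 hours and 7 days come from. It assumes the cluster uses the stock HDFS values for dfs.namenode.delegation.token.renew-interval and dfs.namenode.delegation.token.max-lifetime (both in milliseconds); the actual values in hdfs-site.xml may differ.

```shell
# Assumed HDFS defaults in milliseconds; check hdfs-site.xml on your cluster.
renew_interval_ms=86400000    # dfs.namenode.delegation.token.renew-interval
max_lifetime_ms=604800000     # dfs.namenode.delegation.token.max-lifetime

ms_per_hour=$((60 * 60 * 1000))

# How often the token must be renewed, in hours.
renew_hours=$((renew_interval_ms / ms_per_hour))
# Hard limit after which renewal is refused, in days.
max_days=$((max_lifetime_ms / (24 * ms_per_hour)))
# Number of renewal periods that fit inside the max lifetime.
renewal_periods=$((max_lifetime_ms / renew_interval_ms))

echo "token must be renewed every ${renew_hours}h"
echo "token cannot be renewed past ${max_days} days (${renewal_periods} renewal periods)"
```

Once the max lifetime has passed, renewal is no longer possible and a new token has to be obtained, which is the situation described above.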
Created 01-03-2019 06:57 PM
Can you share the (masked) spark-submit command and the full "delegation token has expired" stack trace? Also, what is the use case of your app?
Created 01-08-2019 05:54 AM
Stack trace:
WARN Client: Exception encountered while connecting to the server : org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken): token (HDFS_DELEGATION_TOKEN token 155456 for spark) can't be found in cache
Exception in thread "main" org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken): token (HDFS_DELEGATION_TOKEN token 155456 for spark) can't be found in cache
    at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1554)
    at org.apache.hadoop.ipc.Client.call(Client.java:1498)
    at org.apache.hadoop.ipc.Client.call(Client.java:1398)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:233)
    at com.sun.proxy.$Proxy10.getFileInfo(Unknown Source)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getFileInfo(ClientNamenodeProtocolTranslatorPB.java:818)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:291)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:203)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:185)
    at com.sun.proxy.$Proxy11.getFileInfo(Unknown Source)
    at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:2165)
    at org.apache.hadoop.hdfs.DistributedFileSystem$26.doCall(DistributedFileSystem.java:1442)
    at org.apache.hadoop.hdfs.DistributedFileSystem$26.doCall(DistributedFileSystem.java:1438)
    at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
    at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1438)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$6.apply(ApplicationMaster.scala:160)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$6.apply(ApplicationMaster.scala:157)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.deploy.yarn.ApplicationMaster.<init>(ApplicationMaster.scala:157)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$main$1.apply$mcV$sp(ApplicationMaster.scala:765)
    at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:67)
    at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:66)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1866)
    at org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:66)
    at org.apache.spark.deploy.yarn.ApplicationMaster$.main(ApplicationMaster.scala:764)
    at org.apache.spark.deploy.yarn.ApplicationMaster.main(ApplicationMaster.scala)
Created 01-08-2019 07:48 PM
The spark-submit command looks fine. This issue will take more than a forum thread to resolve; I'd say it requires code and log analysis.
Meanwhile, I can only suggest passing "-Dsun.security.krb5.debug=true" in the extraJavaOptions. It would also help to set "log4j.logger.org.apache.spark.deploy.yarn.Client=DEBUG" in the log4j.properties file and then restart the application, in the hope that it prints more pointers. Also, if your KDC is an MIT KDC, double-check that your principal does not have a 'Maximum Renewal Time' of 00:00:00, as explained here
Another property to try, which may help depending on your application's use case:
--conf mapreduce.job.complete.cancel.delegation.tokens=false
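For the debug settings mentioned above, a rough sketch of how they could be wired in follows. LOG4J_FILE is a placeholder for whichever log4j.properties your job already ships, and the JAAS option is copied from a typical Kafka setup purely as an example of an existing value; adjust both to your environment.

```shell
# Placeholder (assumption): path to the log4j.properties your job uses.
LOG4J_FILE="./log4j.properties"

# Turn on DEBUG logging for the Spark YARN client class.
echo "log4j.logger.org.apache.spark.deploy.yarn.Client=DEBUG" >> "$LOG4J_FILE"

# Kerberos debug flag. Append it to the existing extraJavaOptions value
# rather than passing a second --conf with the same key.
KRB_DEBUG="-Dsun.security.krb5.debug=true"
DRIVER_OPTS="-Djava.security.auth.login.config=kafka_client_jaas.conf ${KRB_DEBUG}"

# Print the option to paste into the spark-submit command.
echo "--conf \"spark.driver.extraJavaOptions=${DRIVER_OPTS}\""
```

The same flag can be appended to spark.executor.extraJavaOptions if executor-side Kerberos output is needed as well.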
Created 01-08-2019 06:00 AM
Spark submit:
spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=kafka_client_jaas.conf -Dlog4j.configuration=xxx -Djava.util.Arrays.useLegacyMergeSort=true" \
  --conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config=kafka_client_jaas.conf -Dlog4j.configuration=xxx -Djava.util.Arrays.useLegacyMergeSort=true" \
  --conf spark.ui.port=18086 \
  --conf spark.executor.memory=${executor_memory} \
  --conf spark.executor.instances=${num_executors} \
  --conf spark.executor.cores=${executor_cores} \
  --conf spark.driver.memory=4g \
  --conf spark.driver.maxResultSize=3g \
  --conf spark.kafka.broker.ingest=xxx \
  --conf spark.kafka.zookeeper.ingest=xxx \
  --conf spark.kafka.broker.egest=xxx \
  --conf spark.kafka.topic.input=xxx \
  --conf spark.kafka.topic.output=xxx \
  --conf spark.kafka.input.interval=10 \
  --conf spark.kafka.group=xxx \
  --conf spark.streaming.kafka.maxRetries=10 \
  --conf spark.kafka.security.protocol.ingress=SASL_PLAINTEXT \
  --conf spark.kafka.security.protocol.egress=SASL_PLAINTEXT \
  --conf spark.fetch.message.max.bytes=104857600 \
  --conf spark.hive.enable.stats=true \
  --conf spark.streaming.backpressure.enabled=true \
  --conf spark.streaming.kafka.maxRatePerPartition=1 \
  --conf spark.streaming.receiver.maxRate=10 \
  --conf spark.executor.heartbeatInterval=120s \
  --conf spark.network.timeout=600s \
  --conf spark.yarn.scheduler.heartbeat.interval-ms=1000 \
  --conf spark.sql.parquet.compression.codec=snappy \
  --conf spark.scheduler.minRegisteredResourcesRatio=1 \
  --conf spark.yarn.maxAppAttempts=10 \
  --conf spark.yarn.am.attemptFailuresValidityInterval=1h \
  --conf spark.yarn.max.executor.failures=$((8 * ${num_executors})) `# Increase max executor failures (Default: max(numExecutors * 2, 3))` \
  --conf spark.yarn.executor.failuresValidityInterval=1h \
  --conf spark.task.maxFailures=8 \
  --conf spark.yarn.submit.waitAppCompletion=false \
  --conf spark.yarn.principal=xxx \
  --conf spark.yarn.keytab=xxx \
  --conf spark.hadoop.fs.hdfs.impl.disable.cache=true \
  --queue default \
  ${APP_HOME}/xxx.jar
Created 01-08-2019 08:37 PM
I think for a kerberized cluster you need to use a jaas.conf file; see "Accessing kerberized Sources from Spark2 in Cluster mode on YARN".
HTH