Created 09-11-2017 07:47 AM
Hello,
I have got a spark streaming job on a HDP 2.3.4.7 kerberized cluster running on YARN that crashes randomly every few days. Note: I activated checkpointing on Spark. WAL are on HDFS.
The symptoms are :
- job still is "running" when I execute a "yarn application -list"
- no data is processed
- spark UI returns "Application application_xxxxxx could not be found in RM or history server"
- in the log YARN, I saw the following errors :
17/09/08 13:45:05 ERROR WriteAheadLogManager : Failed to write to write ahead log after 3 failures
exception loop :
17/09/08 13:27:17 WARN Client: Exception encountered while connecting to the server : 
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken): token (HDFS_DELEGATION_TOKEN token 7140339 for xxxxxx) can't be found in cache
    at org.apache.hadoop.security.SaslRpcClient.saslConnect(SaslRpcClient.java:375)
    at org.apache.hadoop.ipc.Client$Connection.setupSaslConnection(Client.java:558)
    at org.apache.hadoop.ipc.Client$Connection.access$1800(Client.java:373)
    at org.apache.hadoop.ipc.Client$Connection$2.run(Client.java:727)
    at org.apache.hadoop.ipc.Client$Connection$2.run(Client.java:723)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
    at org.apache.hadoop.ipc.Client$Connection.setupIOstreams(Client.java:722)
    at org.apache.hadoop.ipc.Client$Connection.access$2800(Client.java:373)
    at org.apache.hadoop.ipc.Client.getConnection(Client.java:1493)
    at org.apache.hadoop.ipc.Client.call(Client.java:1397)
    at org.apache.hadoop.ipc.Client.call(Client.java:1358)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
    at com.sun.proxy.$Proxy9.getListing(Unknown Source)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getListing(ClientNamenodeProtocolTranslatorPB.java:573)
    at sun.reflect.GeneratedMethodAccessor10.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:252)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:104)
    at com.sun.proxy.$Proxy10.getListing(Unknown Source)
    at org.apache.hadoop.hdfs.DFSClient.listPaths(DFSClient.java:2094)
    at org.apache.hadoop.hdfs.DFSClient.listPaths(DFSClient.java:2077)
    at org.apache.hadoop.hdfs.DistributedFileSystem.listStatusInternal(DistributedFileSystem.java:801)
    at org.apache.hadoop.hdfs.DistributedFileSystem.access$700(DistributedFileSystem.java:106)
    at org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:863)
    at org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:859)
    at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
    at org.apache.hadoop.hdfs.DistributedFileSystem.listStatus(DistributedFileSystem.java:859)
    at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1515)
    at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1555)
    at org.apache.spark.deploy.SparkHadoopUtil.listFilesSorted(SparkHadoopUtil.scala:275)
    at org.apache.spark.deploy.yarn.ExecutorDelegationTokenUpdater.updateCredentialsIfRequired(ExecutorDelegationTokenUpdater.scala:56)
    at org.apache.spark.deploy.yarn.ExecutorDelegationTokenUpdater$$anon$1$$anonfun$run$1.apply$mcV$sp(ExecutorDelegationTokenUpdater.scala:49)
    at org.apache.spark.deploy.yarn.ExecutorDelegationTokenUpdater$$anon$1$$anonfun$run$1.apply(ExecutorDelegationTokenUpdater.scala:49)
    at org.apache.spark.deploy.yarn.ExecutorDelegationTokenUpdater$$anon$1$$anonfun$run$1.apply(ExecutorDelegationTokenUpdater.scala:49)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
    at org.apache.spark.deploy.yarn.ExecutorDelegationTokenUpdater$$anon$1.run(ExecutorDelegationTokenUpdater.scala:49)
    at org.apache.spark.deploy.yarn.ExecutorDelegationTokenUpdater.updateCredentialsIfRequired(ExecutorDelegationTokenUpdater.scala:79)
    at org.apache.spark.deploy.yarn.ExecutorDelegationTokenUpdater$$anon$1$$anonfun$run$1.apply$mcV$sp(ExecutorDelegationTokenUpdater.scala:49)
    at org.apache.spark.deploy.yarn.ExecutorDelegationTokenUpdater$$anon$1$$anonfun$run$1.apply(ExecutorDelegationTokenUpdater.scala:49)
    at org.apache.spark.deploy.yarn.ExecutorDelegationTokenUpdater$$anon$1$$anonfun$run$1.apply(ExecutorDelegationTokenUpdater.scala:49)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)There seems to be a problem with the delegation token that cannot be found in cache.
Is it a bug in HDP 2.3.4.7 ?
Thanks in advance,
Jean-François
Created 09-11-2017 08:49 AM
You will need to create a jaas.conf and pass it like in the below example
java -Djava.security.auth.login.config=/home/hdfs-user/jaas.conf \ -Djava.security.krb5.conf=/etc/krb5.conf \ -Djavax.security.auth.useSubjectCredsOnly=false \ -cp "./hdfs-sample-1.0-SNAPSHOT.jar:/usr/hdp/current/hadoop-client/lib/*:/usr/hdp/current/hadoop-hdfs-client/*:/usr/hdp/current/hadoop-client/*" \ hdfs.sample.HdfsMain
Contents of the jaas.conf should look like this
---start of jaas.conf-------
com.sun.security.jgss.krb5.initiate {
    com.sun.security.auth.module.Krb5LoginModule required
    doNotPrompt=true
    principal="hdfs-user@YOUR_REALM"
    useKeyTab=true
    keyTab="/path/to/hdfs-user.keytab"
    storeKey=true;
};
---End of jaas.conf-------Now your jobs will run successfully by renewing the Kerberos tickets.
Created 09-11-2017 09:15 AM
@Geoffrey Shelton Okot, thanks for your feedback.
I already have a jaas.conf that I use for my Kafka client (my spark streaming job reads data in kerberized Kafka and write in Elasticsearch) in my spark-submit command :
spark-submit -v \
    --principal ${keytabPrincipal} \
    --keytab ${keytabPath} \
    --files ${configDir}/log4j-bdfsap.properties#log4j-bdfsap.properties,${configDir}/jaas.conf#jaas.conf,hdfs:///apps/spark/lib/spark-streaming-kafka-assembly_2.10-1.5.2.2.3.4.7-4.jar#spark-streaming-kafka-assembly_2.10-1.5.2.2.3.4.7-4.jar,hdfs:///apps/hive/conf/hive-site.xml#hive-site.xml,hdfs:///apps/spark/lib/elasticsearch-spark_2.10-2.4.0.jar#elasticsearch-spark_2.10-2.4.0.jar  \
    --driver-java-options "-Djava.security.auth.login.config=jaas.conf -Dlog4j.configuration=log4j-bdfsap.properties" \
 --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=jaas.conf -Dlog4j.configuration=log4j.properties" \
    --conf "spark.yarn.maxAppAttempts=4" \
    --conf "spark.yarn.am.attemptFailuresValidityInterval=1h" \
    --conf "spark.yarn.max.executor.failures=8" \
    --conf "spark.yarn.executor.failuresValidityInterval=1h" \
    --conf "spark.task.maxFailures=8" \
    --conf "spark.hadoop.fs.hdfs.impl.disable.cache=true" \
    --jars hdfs:///apps/spark/lib/spark-streaming-kafka-assembly_2.10-1.5.2.2.3.4.7-4.jar,hdfs:///apps/spark/lib/elasticsearch-spark_2.10-2.4.0.jar \
    --name ${APP_NAME} \
    --class ${APP_CLASS} \
    --master yarn-cluster \
    --driver-cores ${DRIVER_CORES} \
    --driver-memory ${DRIVER_MEMORY} \
    --num-executors ${NUM_EXECUTORS} \
    --executor-memory ${EXECUTOR_MEMORY} \
    --executor-cores ${EXECUTOR_CORES} \
    --queue ${queueNameSparkStreaming} \
    ${APP_LIB_HDFS_PATH} ${APP_CONF_HDFS_PATH}/${LOAD_CONF_FILE}jaas.conf :
KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useTicketCache=false
  useKeyTab=true
  principal="${userName}@${realm}"
  keyTab="${kerberos_path}"
  renewTicket=true
  storeKey=true
  serviceName="kafka";
};
Client {
  com.sun.security.auth.module.Krb5LoginModule required
  useTicketCache=false
  useKeyTab=true
  principal="${userName}@${realm}"
  keyTab="${kerberos_path}"
  renewTicket=true
  storeKey=true
  serviceName="zookeeper";
};Is it sufficient ?
Jean-François
Created 02-01-2019 10:58 AM
@Geoffrey Shelton Okot @Jean-François Vandemoortele
Does this issue resolved if yes.
Can you please suggest me the steps.
Thanks in advance.
Created 09-11-2017 10:11 AM
The workaround is to add "-- conf spark.hadoop.fs.hdfs.impl.disable.cache=true" to Spark job command line parameters to disable the token cache from spark side.
Please let me know it that worked out for you .
Created 09-13-2017 10:03 AM
Any updates?
Created 09-13-2017 02:37 PM
Hello,
The spark streaming still crashes on HDP 2.3.4 platform and I have to restart it frequently.
I installed my spark streaming job on a HDP 2.6.1 platform and the job has been running for 1 week without noticeable problem.
I will continue to investigate.
Jean-François
Created 02-01-2019 03:21 PM
Unfortunately, I never got feedback from Jean-François but I guess it worked, That's why it's important to close a thread and share the solution.
Maybe you open a new thread and tag me, what is your exact problem is it a kerberized cluster?
Please let me know
Created 02-01-2019 04:27 PM
At this time, with Spark 1.6, even with the "conf" line added, my streaming job was not very stable.
After a migration to Spark 2.1 (same HDP 2.3.4.7), I noticed that the streaming job is nowadays very stable and resilient.
I now use these conf lines :
--conf "spark.shuffle.service.enabled=true" \ --conf "spark.dynamicAllocation.enabled=true" \ --conf "spark.dynamicAllocation.initialExecutors=1" \ --conf "spark.dynamicAllocation.minExecutors=1" \ --conf "spark.dynamicAllocation.maxExecutors=20" \ --conf "spark.yarn.maxAppAttempts=4" \ --conf "spark.yarn.am.attemptFailuresValidityInterval=1h" \ --conf "spark.yarn.max.executor.failures=8" \ --conf "spark.yarn.executor.failuresValidityInterval=1h" \ --conf "spark.task.maxFailures=8" \ --conf "spark.yarn.submit.waitAppCompletion=false" \ --conf "spark.hadoop.fs.hdfs.impl.disable.cache=true" \ --conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config=jaas.conf -Dlog4j.configuration=log4j.properties" \ --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=jaas.conf -Dlog4j.configuration=log4j.properties" \
Jean-François
Created 02-01-2019 05:07 PM
Thanks for reverting to @kanna k question, I would just like to know your initial problem org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken): token (HDFS_DELEGATION_TOKEN token 7140339for xxxxxx) can't be found in cache was it resolved with the jaas.conf file because that was specific to the Kerberos error above.
 
					
				
				
			
		
