Long running Spark streaming job that crashes on HDP 2.3.4.7 (YARN secured HA cluster)

Explorer

Hello,

I have a Spark Streaming job running on YARN on a kerberized HDP 2.3.4.7 cluster. It crashes at random, every few days. Note: checkpointing is enabled in Spark, and the write-ahead logs (WAL) are stored on HDFS.

The symptoms are:

- the job is still listed as "running" when I execute "yarn application -list"

- no data is processed

- the Spark UI returns "Application application_xxxxxx could not be found in RM or history server"

- in the YARN logs, I see the following error:

17/09/08 13:45:05 ERROR WriteAheadLogManager : Failed to write to write ahead log after 3 failures

The following exception is logged in a loop:

17/09/08 13:27:17 WARN Client: Exception encountered while connecting to the server : 
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken): token (HDFS_DELEGATION_TOKEN token 7140339 for xxxxxx) can't be found in cache
    at org.apache.hadoop.security.SaslRpcClient.saslConnect(SaslRpcClient.java:375)
    at org.apache.hadoop.ipc.Client$Connection.setupSaslConnection(Client.java:558)
    at org.apache.hadoop.ipc.Client$Connection.access$1800(Client.java:373)
    at org.apache.hadoop.ipc.Client$Connection$2.run(Client.java:727)
    at org.apache.hadoop.ipc.Client$Connection$2.run(Client.java:723)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
    at org.apache.hadoop.ipc.Client$Connection.setupIOstreams(Client.java:722)
    at org.apache.hadoop.ipc.Client$Connection.access$2800(Client.java:373)
    at org.apache.hadoop.ipc.Client.getConnection(Client.java:1493)
    at org.apache.hadoop.ipc.Client.call(Client.java:1397)
    at org.apache.hadoop.ipc.Client.call(Client.java:1358)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
    at com.sun.proxy.$Proxy9.getListing(Unknown Source)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getListing(ClientNamenodeProtocolTranslatorPB.java:573)
    at sun.reflect.GeneratedMethodAccessor10.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:252)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:104)
    at com.sun.proxy.$Proxy10.getListing(Unknown Source)
    at org.apache.hadoop.hdfs.DFSClient.listPaths(DFSClient.java:2094)
    at org.apache.hadoop.hdfs.DFSClient.listPaths(DFSClient.java:2077)
    at org.apache.hadoop.hdfs.DistributedFileSystem.listStatusInternal(DistributedFileSystem.java:801)
    at org.apache.hadoop.hdfs.DistributedFileSystem.access$700(DistributedFileSystem.java:106)
    at org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:863)
    at org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:859)
    at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
    at org.apache.hadoop.hdfs.DistributedFileSystem.listStatus(DistributedFileSystem.java:859)
    at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1515)
    at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1555)
    at org.apache.spark.deploy.SparkHadoopUtil.listFilesSorted(SparkHadoopUtil.scala:275)
    at org.apache.spark.deploy.yarn.ExecutorDelegationTokenUpdater.updateCredentialsIfRequired(ExecutorDelegationTokenUpdater.scala:56)
    at org.apache.spark.deploy.yarn.ExecutorDelegationTokenUpdater$$anon$1$$anonfun$run$1.apply$mcV$sp(ExecutorDelegationTokenUpdater.scala:49)
    at org.apache.spark.deploy.yarn.ExecutorDelegationTokenUpdater$$anon$1$$anonfun$run$1.apply(ExecutorDelegationTokenUpdater.scala:49)
    at org.apache.spark.deploy.yarn.ExecutorDelegationTokenUpdater$$anon$1$$anonfun$run$1.apply(ExecutorDelegationTokenUpdater.scala:49)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
    at org.apache.spark.deploy.yarn.ExecutorDelegationTokenUpdater$$anon$1.run(ExecutorDelegationTokenUpdater.scala:49)
    at org.apache.spark.deploy.yarn.ExecutorDelegationTokenUpdater.updateCredentialsIfRequired(ExecutorDelegationTokenUpdater.scala:79)
    at org.apache.spark.deploy.yarn.ExecutorDelegationTokenUpdater$$anon$1$$anonfun$run$1.apply$mcV$sp(ExecutorDelegationTokenUpdater.scala:49)
    at org.apache.spark.deploy.yarn.ExecutorDelegationTokenUpdater$$anon$1$$anonfun$run$1.apply(ExecutorDelegationTokenUpdater.scala:49)
    at org.apache.spark.deploy.yarn.ExecutorDelegationTokenUpdater$$anon$1$$anonfun$run$1.apply(ExecutorDelegationTokenUpdater.scala:49)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)

The problem seems to be that the delegation token cannot be found in the cache.
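To see how often this error occurs, the aggregated container logs can be searched for the token message. This is a minimal sketch, assuming the logs were first saved to a local file (for example with `yarn logs -applicationId <application id>`). The function name `count_token_errors` and the file name `app.log` are illustrative only.

```shell
# Hypothetical helper: count the log lines that report an HDFS delegation
# token missing from the cache. Prints 0 when there is no match.
count_token_errors() {
    log_file=$1
    # grep -c prints the number of matching lines. It exits non-zero when
    # nothing matches, so '|| true' keeps the helper usable under 'set -e'.
    grep -c 'HDFS_DELEGATION_TOKEN.*found in cache' "$log_file" || true
}

# Example call, assuming the logs were saved to app.log beforehand:
#   count_token_errors app.log
```

A count that starts growing a fixed number of days after submission would suggest that the token expired rather than a transient NameNode problem.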

Is this a bug in HDP 2.3.4.7?

Thanks in advance,

Jean-François

9 REPLIES

Master Mentor

@Jean-François Vandemoortele

You will need to create a jaas.conf file and pass it to the JVM, as in the example below:

java -Djava.security.auth.login.config=/home/hdfs-user/jaas.conf \
-Djava.security.krb5.conf=/etc/krb5.conf \
-Djavax.security.auth.useSubjectCredsOnly=false \
-cp "./hdfs-sample-1.0-SNAPSHOT.jar:/usr/hdp/current/hadoop-client/lib/*:/usr/hdp/current/hadoop-hdfs-client/*:/usr/hdp/current/hadoop-client/*"  \
hdfs.sample.HdfsMain 

The contents of jaas.conf should look like this:

---start of jaas.conf-------
com.sun.security.jgss.krb5.initiate {
    com.sun.security.auth.module.Krb5LoginModule required
    doNotPrompt=true
    principal="hdfs-user@YOUR_REALM"
    useKeyTab=true
    keyTab="/path/to/hdfs-user.keytab"
    storeKey=true;
};
---End of jaas.conf-------

Your job should then be able to renew its Kerberos tickets from the keytab and keep running.
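Before relying on this file, it may be worth checking that the keytab it names exists and is readable by the user who runs the job. This is a minimal sketch, assuming the path is written as keyTab="..." on a single line, as in the example above. Both helper names are hypothetical.

```shell
# Hypothetical helper: print the first keyTab path declared in a JAAS file.
# Assumes the form keyTab="..." on one line.
jaas_keytab_path() {
    sed -n 's/.*keyTab="\([^"]*\)".*/\1/p' "$1" | head -n 1
}

# Hypothetical helper: succeed only if that keytab exists and is readable
# by the current user.
check_jaas_keytab() {
    keytab=$(jaas_keytab_path "$1")
    [ -n "$keytab" ] && [ -r "$keytab" ]
}

# Example call:
#   check_jaas_keytab /home/hdfs-user/jaas.conf || echo "keytab not readable"
```

This only checks the file on disk. To confirm that the keytab actually contains the expected principal, `klist -kt <keytab>` lists the entries it holds.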

Explorer

@Geoffrey Shelton Okot, thanks for your feedback.

I already pass a jaas.conf for my Kafka client in my spark-submit command (my Spark Streaming job reads data from a kerberized Kafka and writes to Elasticsearch):

spark-submit -v \
    --principal ${keytabPrincipal} \
    --keytab ${keytabPath} \
    --files ${configDir}/log4j-bdfsap.properties#log4j-bdfsap.properties,${configDir}/jaas.conf#jaas.conf,hdfs:///apps/spark/lib/spark-streaming-kafka-assembly_2.10-1.5.2.2.3.4.7-4.jar#spark-streaming-kafka-assembly_2.10-1.5.2.2.3.4.7-4.jar,hdfs:///apps/hive/conf/hive-site.xml#hive-site.xml,hdfs:///apps/spark/lib/elasticsearch-spark_2.10-2.4.0.jar#elasticsearch-spark_2.10-2.4.0.jar  \
    --driver-java-options "-Djava.security.auth.login.config=jaas.conf -Dlog4j.configuration=log4j-bdfsap.properties" \
    --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=jaas.conf -Dlog4j.configuration=log4j.properties" \
    --conf "spark.yarn.maxAppAttempts=4" \
    --conf "spark.yarn.am.attemptFailuresValidityInterval=1h" \
    --conf "spark.yarn.max.executor.failures=8" \
    --conf "spark.yarn.executor.failuresValidityInterval=1h" \
    --conf "spark.task.maxFailures=8" \
    --conf "spark.hadoop.fs.hdfs.impl.disable.cache=true" \
    --jars hdfs:///apps/spark/lib/spark-streaming-kafka-assembly_2.10-1.5.2.2.3.4.7-4.jar,hdfs:///apps/spark/lib/elasticsearch-spark_2.10-2.4.0.jar \
    --name ${APP_NAME} \
    --class ${APP_CLASS} \
    --master yarn-cluster \
    --driver-cores ${DRIVER_CORES} \
    --driver-memory ${DRIVER_MEMORY} \
    --num-executors ${NUM_EXECUTORS} \
    --executor-memory ${EXECUTOR_MEMORY} \
    --executor-cores ${EXECUTOR_CORES} \
    --queue ${queueNameSparkStreaming} \
    ${APP_LIB_HDFS_PATH} ${APP_CONF_HDFS_PATH}/${LOAD_CONF_FILE}

jaas.conf :

KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useTicketCache=false
  useKeyTab=true
  principal="${userName}@${realm}"
  keyTab="${kerberos_path}"
  renewTicket=true
  storeKey=true
  serviceName="kafka";
};
Client {
  com.sun.security.auth.module.Krb5LoginModule required
  useTicketCache=false
  useKeyTab=true
  principal="${userName}@${realm}"
  keyTab="${kerberos_path}"
  renewTicket=true
  storeKey=true
  serviceName="zookeeper";
};

Is that sufficient?

Jean-François

Contributor

@Geoffrey Shelton Okot @Jean-François Vandemoortele

Has this issue been resolved?

If so, could you please share the steps?

Thanks in advance.

Master Mentor

@Jean-François Vandemoortele

The workaround is to add "--conf spark.hadoop.fs.hdfs.impl.disable.cache=true" to the Spark job's command-line parameters. This disables the HDFS FileSystem cache on the Spark side, so that a cached client holding an expired delegation token is not reused.
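The property has to be passed as a single `--conf key=value` argument, with no space between the dashes and `conf`. This is a minimal sketch of a check on a saved submit script. The helper name `has_fs_cache_disabled` and the script name are hypothetical.

```shell
# Hypothetical helper: succeed if the given spark-submit script passes
# spark.hadoop.fs.hdfs.impl.disable.cache=true through a --conf option.
has_fs_cache_disabled() {
    # '--' ends grep's own options, because the pattern starts with dashes.
    grep -Eq -- '--conf[[:space:]]+"?spark\.hadoop\.fs\.hdfs\.impl\.disable\.cache=true' "$1"
}

# Example call:
#   has_fs_cache_disabled submit-streaming.sh || echo "workaround not set"
```

A mistyped "-- conf" is rejected by this check.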

Please let me know if that worked for you.

Explorer
@Geoffrey Shelton Okot

Hello,

The Spark Streaming job still crashes on the HDP 2.3.4 platform, and I have to restart it frequently.

I deployed the same job on an HDP 2.6.1 platform, and it has been running there for one week without any noticeable problem.

I will continue to investigate.

Jean-François

Master Mentor

@kanna k

Unfortunately, I never got feedback from Jean-François, but I assume it worked. That is why it is important to close a thread and share the solution.

You could open a new thread and tag me. What is your exact problem? Is it on a kerberized cluster?

Please let me know.

Explorer

At that time, with Spark 1.6, my streaming job was not very stable, even with the "conf" line added.

After migrating to Spark 2.1 (still on HDP 2.3.4.7), the streaming job is now very stable and resilient.

I now use these conf lines:

   --conf "spark.shuffle.service.enabled=true" \
   --conf "spark.dynamicAllocation.enabled=true" \
   --conf "spark.dynamicAllocation.initialExecutors=1" \
   --conf "spark.dynamicAllocation.minExecutors=1" \
   --conf "spark.dynamicAllocation.maxExecutors=20" \
   --conf "spark.yarn.maxAppAttempts=4" \
   --conf "spark.yarn.am.attemptFailuresValidityInterval=1h" \
   --conf "spark.yarn.max.executor.failures=8" \
   --conf "spark.yarn.executor.failuresValidityInterval=1h" \
   --conf "spark.task.maxFailures=8" \
   --conf "spark.yarn.submit.waitAppCompletion=false" \
   --conf "spark.hadoop.fs.hdfs.impl.disable.cache=true" \
   --conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config=jaas.conf -Dlog4j.configuration=log4j.properties" \
   --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=jaas.conf -Dlog4j.configuration=log4j.properties" \

Jean-François

Master Mentor

@Jean-François Vandemoortele

Thanks for getting back on @kanna k's question. I would just like to know whether your initial problem, org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken): token (HDFS_DELEGATION_TOKEN token 7140339 for xxxxxx) can't be found in cache, was resolved with the jaas.conf file, since that was specific to the Kerberos error above.