Long running Spark streaming job that crashes on HDP 2.3.4.7 (YARN secured HA cluster)

Explorer

Hello,

I have a Spark Streaming job running on YARN on a kerberized HDP 2.3.4.7 cluster, and it crashes randomly every few days. Note: I enabled checkpointing in Spark, and the write-ahead logs (WAL) are on HDFS.

The symptoms are:

- the job is still shown as "running" when I execute "yarn application -list"

- no data is processed

- the Spark UI returns "Application application_xxxxxx could not be found in RM or history server"

- in the YARN logs, I saw the following errors:

17/09/08 13:45:05 ERROR WriteAheadLogManager : Failed to write to write ahead log after 3 failures

The following exception repeats in a loop:

17/09/08 13:27:17 WARN Client: Exception encountered while connecting to the server : 
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken): token (HDFS_DELEGATION_TOKEN token 7140339 for xxxxxx) can't be found in cache
    at org.apache.hadoop.security.SaslRpcClient.saslConnect(SaslRpcClient.java:375)
    at org.apache.hadoop.ipc.Client$Connection.setupSaslConnection(Client.java:558)
    at org.apache.hadoop.ipc.Client$Connection.access$1800(Client.java:373)
    at org.apache.hadoop.ipc.Client$Connection$2.run(Client.java:727)
    at org.apache.hadoop.ipc.Client$Connection$2.run(Client.java:723)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
    at org.apache.hadoop.ipc.Client$Connection.setupIOstreams(Client.java:722)
    at org.apache.hadoop.ipc.Client$Connection.access$2800(Client.java:373)
    at org.apache.hadoop.ipc.Client.getConnection(Client.java:1493)
    at org.apache.hadoop.ipc.Client.call(Client.java:1397)
    at org.apache.hadoop.ipc.Client.call(Client.java:1358)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
    at com.sun.proxy.$Proxy9.getListing(Unknown Source)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getListing(ClientNamenodeProtocolTranslatorPB.java:573)
    at sun.reflect.GeneratedMethodAccessor10.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:252)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:104)
    at com.sun.proxy.$Proxy10.getListing(Unknown Source)
    at org.apache.hadoop.hdfs.DFSClient.listPaths(DFSClient.java:2094)
    at org.apache.hadoop.hdfs.DFSClient.listPaths(DFSClient.java:2077)
    at org.apache.hadoop.hdfs.DistributedFileSystem.listStatusInternal(DistributedFileSystem.java:801)
    at org.apache.hadoop.hdfs.DistributedFileSystem.access$700(DistributedFileSystem.java:106)
    at org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:863)
    at org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:859)
    at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
    at org.apache.hadoop.hdfs.DistributedFileSystem.listStatus(DistributedFileSystem.java:859)
    at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1515)
    at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1555)
    at org.apache.spark.deploy.SparkHadoopUtil.listFilesSorted(SparkHadoopUtil.scala:275)
    at org.apache.spark.deploy.yarn.ExecutorDelegationTokenUpdater.updateCredentialsIfRequired(ExecutorDelegationTokenUpdater.scala:56)
    at org.apache.spark.deploy.yarn.ExecutorDelegationTokenUpdater$$anon$1$$anonfun$run$1.apply$mcV$sp(ExecutorDelegationTokenUpdater.scala:49)
    at org.apache.spark.deploy.yarn.ExecutorDelegationTokenUpdater$$anon$1$$anonfun$run$1.apply(ExecutorDelegationTokenUpdater.scala:49)
    at org.apache.spark.deploy.yarn.ExecutorDelegationTokenUpdater$$anon$1$$anonfun$run$1.apply(ExecutorDelegationTokenUpdater.scala:49)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
    at org.apache.spark.deploy.yarn.ExecutorDelegationTokenUpdater$$anon$1.run(ExecutorDelegationTokenUpdater.scala:49)
    at org.apache.spark.deploy.yarn.ExecutorDelegationTokenUpdater.updateCredentialsIfRequired(ExecutorDelegationTokenUpdater.scala:79)
    at org.apache.spark.deploy.yarn.ExecutorDelegationTokenUpdater$$anon$1$$anonfun$run$1.apply$mcV$sp(ExecutorDelegationTokenUpdater.scala:49)
    at org.apache.spark.deploy.yarn.ExecutorDelegationTokenUpdater$$anon$1$$anonfun$run$1.apply(ExecutorDelegationTokenUpdater.scala:49)
    at org.apache.spark.deploy.yarn.ExecutorDelegationTokenUpdater$$anon$1$$anonfun$run$1.apply(ExecutorDelegationTokenUpdater.scala:49)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)

The problem seems to be that the HDFS delegation token can no longer be found in the cache.
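To check whether a stalled job is hitting this particular error, one option is to count the matching lines in the aggregated YARN logs. This is a minimal sketch, assuming YARN log aggregation is enabled and the application ID is known; the function name `count_token_errors` is hypothetical.

```shell
# Count log lines reporting an HDFS delegation token that is missing from the cache.
# Reads log text on stdin, so it works with any log source.
count_token_errors() {
  # grep -c prints the number of matching lines; "|| true" keeps the exit
  # status at 0 when there are no matches.
  grep -c "HDFS_DELEGATION_TOKEN.*can't be found in cache" || true
}

# Example usage (replace the placeholder application ID with a real one):
#   yarn logs -applicationId application_xxxxxx | count_token_errors
```

A non-zero count only confirms that the message is present in the logs; it does not by itself explain why the token expired.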

Is this a bug in HDP 2.3.4.7?

Thanks in advance,

Jean-François

Mentor

@Jean-François Vandemoortele

You will need to create a jaas.conf file and pass it to the JVM, as in the example below:

java -Djava.security.auth.login.config=/home/hdfs-user/jaas.conf \
-Djava.security.krb5.conf=/etc/krb5.conf \
-Djavax.security.auth.useSubjectCredsOnly=false \
-cp "./hdfs-sample-1.0-SNAPSHOT.jar:/usr/hdp/current/hadoop-client/lib/*:/usr/hdp/current/hadoop-hdfs-client/*:/usr/hdp/current/hadoop-client/*"  \
hdfs.sample.HdfsMain 

The contents of jaas.conf should look like this:

---start of jaas.conf-------
com.sun.security.jgss.krb5.initiate {
    com.sun.security.auth.module.Krb5LoginModule required
    doNotPrompt=true
    principal="hdfs-user@YOUR_REALM"
    useKeyTab=true
    keyTab="/path/to/hdfs-user.keytab"
    storeKey=true;
};
---End of jaas.conf-------

With this in place, your jobs should run successfully, since the Kerberos tickets can be renewed from the keytab.

Explorer

@Geoffrey Shelton Okot, thanks for your feedback.

I already have a jaas.conf that I use for my Kafka client (my Spark Streaming job reads data from kerberized Kafka and writes to Elasticsearch). I pass it in my spark-submit command:

spark-submit -v \
    --principal ${keytabPrincipal} \
    --keytab ${keytabPath} \
    --files ${configDir}/log4j-bdfsap.properties#log4j-bdfsap.properties,${configDir}/jaas.conf#jaas.conf,hdfs:///apps/spark/lib/spark-streaming-kafka-assembly_2.10-1.5.2.2.3.4.7-4.jar#spark-streaming-kafka-assembly_2.10-1.5.2.2.3.4.7-4.jar,hdfs:///apps/hive/conf/hive-site.xml#hive-site.xml,hdfs:///apps/spark/lib/elasticsearch-spark_2.10-2.4.0.jar#elasticsearch-spark_2.10-2.4.0.jar  \
    --driver-java-options "-Djava.security.auth.login.config=jaas.conf -Dlog4j.configuration=log4j-bdfsap.properties" \
    --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=jaas.conf -Dlog4j.configuration=log4j.properties" \
    --conf "spark.yarn.maxAppAttempts=4" \
    --conf "spark.yarn.am.attemptFailuresValidityInterval=1h" \
    --conf "spark.yarn.max.executor.failures=8" \
    --conf "spark.yarn.executor.failuresValidityInterval=1h" \
    --conf "spark.task.maxFailures=8" \
    --conf "spark.hadoop.fs.hdfs.impl.disable.cache=true" \
    --jars hdfs:///apps/spark/lib/spark-streaming-kafka-assembly_2.10-1.5.2.2.3.4.7-4.jar,hdfs:///apps/spark/lib/elasticsearch-spark_2.10-2.4.0.jar \
    --name ${APP_NAME} \
    --class ${APP_CLASS} \
    --master yarn-cluster \
    --driver-cores ${DRIVER_CORES} \
    --driver-memory ${DRIVER_MEMORY} \
    --num-executors ${NUM_EXECUTORS} \
    --executor-memory ${EXECUTOR_MEMORY} \
    --executor-cores ${EXECUTOR_CORES} \
    --queue ${queueNameSparkStreaming} \
    ${APP_LIB_HDFS_PATH} ${APP_CONF_HDFS_PATH}/${LOAD_CONF_FILE}

jaas.conf:

KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useTicketCache=false
  useKeyTab=true
  principal="${userName}@${realm}"
  keyTab="${kerberos_path}"
  renewTicket=true
  storeKey=true
  serviceName="kafka";
};
Client {
  com.sun.security.auth.module.Krb5LoginModule required
  useTicketCache=false
  useKeyTab=true
  principal="${userName}@${realm}"
  keyTab="${kerberos_path}"
  renewTicket=true
  storeKey=true
  serviceName="zookeeper";
};

Is that sufficient?

Jean-François

Contributor

@Geoffrey Shelton Okot @Jean-François Vandemoortele

Was this issue resolved? If so, could you please share the steps?

Thanks in advance.

Mentor

@Jean-François Vandemoortele

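The workaround is to add "--conf spark.hadoop.fs.hdfs.impl.disable.cache=true" to the Spark job command line parameters. This disables the Hadoop FileSystem cache for HDFS on the Spark side, so that a cached HDFS client holding an expired delegation token is not reused.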
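As a sketch of where this option sits in a submission, the snippet below prints a trimmed-down spark-submit command for review rather than running it. `APP_JAR`, its default path, and the function name `print_submit_command` are hypothetical placeholders; `--master yarn-cluster` is taken from the command posted earlier in the thread, and every other option is omitted. Note that the flag is written `--conf`, with no space after the dashes.

```shell
# Hypothetical placeholder for the application jar; not from the original post.
APP_JAR="${APP_JAR:-/path/to/app.jar}"

# Print the command for review instead of executing it.
print_submit_command() {
  echo spark-submit \
    --master yarn-cluster \
    --conf "spark.hadoop.fs.hdfs.impl.disable.cache=true" \
    "$APP_JAR"
}

print_submit_command
```

Printing the command first makes it easy to spot a stray space or a missing line continuation before the job is actually submitted.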

Please let me know if that worked for you.

Explorer

@Geoffrey Shelton Okot

Hello,

The Spark Streaming job still crashes on the HDP 2.3.4 platform, and I have to restart it frequently.

I also installed my Spark Streaming job on an HDP 2.6.1 platform, where it has been running for one week without any noticeable problem.

I will continue to investigate.

Jean-François

Mentor

@kanna k

Unfortunately, I never got feedback from Jean-François, but I guess it worked. That is why it is important to close a thread and share the solution.

Maybe you could open a new thread and tag me. What is your exact problem, and is it a kerberized cluster?

Please let me know

Explorer

At that time, with Spark 1.6, even with the "conf" line added, my streaming job was not very stable.

After migrating to Spark 2.1 (on the same HDP 2.3.4.7 cluster), the streaming job has become very stable and resilient.

I now use these conf lines:

   --conf "spark.shuffle.service.enabled=true" \
   --conf "spark.dynamicAllocation.enabled=true" \
   --conf "spark.dynamicAllocation.initialExecutors=1" \
   --conf "spark.dynamicAllocation.minExecutors=1" \
   --conf "spark.dynamicAllocation.maxExecutors=20" \
   --conf "spark.yarn.maxAppAttempts=4" \
   --conf "spark.yarn.am.attemptFailuresValidityInterval=1h" \
   --conf "spark.yarn.max.executor.failures=8" \
   --conf "spark.yarn.executor.failuresValidityInterval=1h" \
   --conf "spark.task.maxFailures=8" \
   --conf "spark.yarn.submit.waitAppCompletion=false" \
   --conf "spark.hadoop.fs.hdfs.impl.disable.cache=true" \
   --conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config=jaas.conf -Dlog4j.configuration=log4j.properties" \
   --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=jaas.conf -Dlog4j.configuration=log4j.properties" \

Jean-François

Mentor

@Jean-François Vandemoortele

Thanks for getting back on @kanna k's question. I would just like to know whether your initial problem, org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken): token (HDFS_DELEGATION_TOKEN token 7140339 for xxxxxx) can't be found in cache, was resolved with the jaas.conf file, because that suggestion was specific to the Kerberos error above.
