Kerberos Cross Realm HDFS Access Via Spark Application

Explorer

We have a two-cluster architecture, A and B. Both clusters are Kerberized using MIT KDC with Ranger enabled. Each cluster has its own KDC. We have set up cross-realm authentication between the two KDCs. We are able to list files and run distcp from cluster A to B and vice versa.

We are trying to run a Spark application in cluster A that writes to cluster B's HDFS (Kerberized). We are able to run the application in LOCAL mode and write to cluster B's HDFS. But when we run the same application in YARN-CLUSTER mode, it fails with an AccessControlException (Caused by: org.apache.hadoop.security.AccessControlException: Client cannot authenticate via:[TOKEN, KERBEROS]). We debugged the code and saw that in yarn-cluster mode the FileSystem object is created with SIMPLE authentication and the principal is just the username, whereas in LOCAL mode it uses KERBEROS authentication and the principal is the proper principal.

We understand that YARN uses delegation tokens to start the executors and the driver. We are not sure whether we are missing any configuration in Spark, HDFS, or YARN.

Since these are cross-realm KDCs, I am using cluster A's principal and keytab to submit the Spark application.

Below are some of the properties we have set in spark-submit.

SPARK:

  • spark.yarn.access.namenodes=hdfs://mycluster02
  • spark.authenticate=true
  • spark.yarn.access.hadoopFileSystems=hdfs://mycluster02
  • spark.yarn.principal=username@DOMAIN.COM
  • spark.yarn.keytab=user.keytab

YARN:

  • hadoop.registry.client.auth=kerberos
  • yarn.resourcemanager.webapp.delegation-token-auth-filter.enabled=false

All other YARN Kerberos principals and keytabs are also set.

HDFS:

  • hadoop.security.authentication=kerberos
  • and all the basic configuration for enabling Kerberos.

Below is code replicated from the application; it runs on the executor to create the FileSystem object.

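import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// args[0] is the config directory passed on the command line (/config_files/ in the spark-submit below)
Configuration conf = new Configuration();
conf.addResource(new Path(args[0] + "/core-site.xml"));
conf.addResource(new Path(args[0] + "/hdfs-site.xml"));
conf.set("hadoop.security.authentication", "kerberos");
FileSystem fs = FileSystem.get(conf);
FileStatus[] fsStatus = fs.listStatus(new Path("/"));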

spark-submit --name "HDFS_APP_DATA" --master yarn-cluster --conf "spark.yarn.access.namenodes=hdfs://mycluster02" --conf "spark.authenticate=true" --conf "spark.yarn.access.hadoopFileSystems=hdfs://mycluster02" --conf "spark.yarn.principal=user@EXAMPLE.COM" --conf "spark.yarn.keytab=/home/user/hdfs_test/user.princ.keytab" --driver-memory 2g --executor-memory 3g --num-executors 1 --executor-cores 1 --class com.test.spark.kafka.batch.util.HDFSApp spark-batch-util-jar-with-dependencies.jar /config_files/

Exception:

18/05/23 16:15:38 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, hostname.org, partition 1,PROCESS_LOCAL, 2092 bytes)
    18/05/23 16:15:38 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, hostname.org): java.io.IOException: Failed on local exception: java.io.IOException: org.apache.hadoop.security.AccessControlException: Client cannot authenticate via:[TOKEN, KERBEROS]; Host Details : local host is: "hostname.org/xx.xx.xx.xx"; destination host is: "hostname.org":8020; 
        at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:785)
        at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1558)
        at org.apache.hadoop.ipc.Client.call(Client.java:1498)
        at org.apache.hadoop.ipc.Client.call(Client.java:1398)
        at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:233)
        at com.sun.proxy.$Proxy12.getListing(Unknown Source)
        at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getListing(ClientNamenodeProtocolTranslatorPB.java:625)
        at sun.reflect.GeneratedMethodAccessor2.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:291)
        at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:203)
        at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:185)
        at com.sun.proxy.$Proxy13.getListing(Unknown Source)
        at org.apache.hadoop.hdfs.DFSClient.listPaths(DFSClient.java:2143)
        at org.apache.hadoop.hdfs.DFSClient.listPaths(DFSClient.java:2126)
        at org.apache.hadoop.hdfs.DistributedFileSystem.listStatusInternal(DistributedFileSystem.java:919)
        at org.apache.hadoop.hdfs.DistributedFileSystem.access$600(DistributedFileSystem.java:114)
        at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:985)
        at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:981)
        at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
        at org.apache.hadoop.hdfs.DistributedFileSystem.listStatus(DistributedFileSystem.java:992)
        at com.example.spark.kafka.batch.util.HDFSApp$1.call(HDFSApp.java:51)
        at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreach$1.apply(JavaRDDLike.scala:332)
        at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreach$1.apply(JavaRDDLike.scala:332)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
        at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$34.apply(RDD.scala:919)
        at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$34.apply(RDD.scala:919)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1857)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1857)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
        at org.apache.spark.scheduler.Task.run(Task.scala:89)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
    Caused by: java.io.IOException: org.apache.hadoop.security.AccessControlException: Client cannot authenticate via:[TOKEN, KERBEROS]
        at org.apache.hadoop.ipc.Client$Connection$1.run(Client.java:720)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1866)
        at org.apache.hadoop.ipc.Client$Connection.handleSaslConnectionFailure(Client.java:683)
        at org.apache.hadoop.ipc.Client$Connection.setupIOstreams(Client.java:770)
        at org.apache.hadoop.ipc.Client$Connection.access$3200(Client.java:397)
        at org.apache.hadoop.ipc.Client.getConnection(Client.java:1620)
        at org.apache.hadoop.ipc.Client.call(Client.java:1451)
        ... 34 more
    Caused by: org.apache.hadoop.security.AccessControlException: Client cannot authenticate via:[TOKEN, KERBEROS]
        at org.apache.hadoop.security.SaslRpcClient.selectSaslClient(SaslRpcClient.java:172)
        at org.apache.hadoop.security.SaslRpcClient.saslConnect(SaslRpcClient.java:396)
        at org.apache.hadoop.ipc.Client$Connection.setupSaslConnection(Client.java:595)
        at org.apache.hadoop.ipc.Client$Connection.access$2000(Client.java:397)
        at org.apache.hadoop.ipc.Client$Connection$2.run(Client.java:762)
        at org.apache.hadoop.ipc.Client$Connection$2.run(Client.java:758)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1866)
        at org.apache.hadoop.ipc.Client$Connection.setupIOstreams(Client.java:758)
        ... 37 more
1 ACCEPTED SOLUTION

Rising Star

Hi @Pramod GM,

- Is myclusterA also in NN HA configuration?

- Can you do a quick test and replace the myclusterB nameservice with the clusterB active NN fqdn:port? If it works, then you might want to check the document mentioned below, Step 1 to be more specific.

https://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.6.4/bk_administration/content/distcp_between_ha...

- Try to avoid the following, as these config files/properties are already provided by the cluster (in YARN mode):

  1. conf.addResource(new Path(args[0] + "/core-site.xml"));
  2. conf.addResource(new Path(args[0] + "/hdfs-site.xml"));
  3. conf.set("hadoop.security.authentication", "kerberos");

- These properties are specific to the Spark version:

  • (Spark 1.x-2.1) spark.yarn.access.namenodes=hdfs://mycluster02
  • (Spark 2.2+) spark.yarn.access.hadoopFileSystems=hdfs://mycluster02

- Are both clusters on the same version?

- Does it fail from B to A as well?

Basically, what you want is to make sure that you have followed the "distcp between HA clusters" setup, that both clusters are in NN HA, that you set the appropriate spark.yarn.access.namenodes/hadoopFileSystems property via --conf spark.yarn.access.namenodes/hadoopFileSystems=$remoteClusterNameservice based on your Spark version, and that you use the config files provided by your Hadoop cluster.
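
As a rough illustration of the version split listed above, here is a minimal, stdlib-only Java sketch that picks the property name from a Spark version string and builds the matching --conf argument. The class RemoteFsConf and its methods are hypothetical helpers, not part of Spark. It only models the two names mentioned in this thread, so for newer Spark releases the property name should be checked against the Spark documentation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: chooses between the two property names discussed in this thread.
final class RemoteFsConf {

    // Returns the property name for the given Spark version ("major.minor[...]").
    // Assumes the first two dot-separated components are plain integers.
    static String propertyFor(String sparkVersion) {
        String[] parts = sparkVersion.trim().split("\\.");
        int major = Integer.parseInt(parts[0]);
        int minor = parts.length > 1 ? Integer.parseInt(parts[1]) : 0;
        boolean atLeast22 = major > 2 || (major == 2 && minor >= 2);
        return atLeast22
                ? "spark.yarn.access.hadoopFileSystems"
                : "spark.yarn.access.namenodes";
    }

    // Builds the "--conf key=value" pair to append to a spark-submit command line.
    static List<String> confArgs(String sparkVersion, String remoteFsUri) {
        List<String> args = new ArrayList<>();
        args.add("--conf");
        args.add(propertyFor(sparkVersion) + "=" + remoteFsUri);
        return args;
    }

    public static void main(String[] args) {
        // Spark 1.6 with the remote nameservice from the question.
        // Prints: --conf spark.yarn.access.namenodes=hdfs://mycluster02
        System.out.println(String.join(" ", confArgs("1.6", "hdfs://mycluster02")));
    }
}
```

Passing only the property that matches the installed Spark version keeps the submit command unambiguous, rather than setting both names as in the original spark-submit.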


7 REPLIES


@Pramod GM Have you tried yarn-client mode? I would recommend testing with spark-shell using the same configuration arguments to see whether a simple sc.textFile("hdfs://...") works. Try pointing directly to the active NN, with and without the port. Are both clusters' NameNodes configured in HA?

HTH

Explorer

Yes @Felix Albani, yarn-client mode also gives me the same error. After adding the Spark property suggested by @dbompart, I am able to list as well as write files across clusters.

Explorer

Yes @dbompart, both clusters are in HA configuration and running HDP 2.6.3. We added the property spark.yarn.access.namenodes to spark-submit; the Spark version is 1.6. Now we are able to list the contents as well as write files across the two clusters. Thank you. But even after that, we are still confused about why the FileSystem object has SIMPLE authentication rather than the KERBEROS authentication we see in LOCAL mode.

Rising Star

I'm not following that part. Do you mean that, with ipc.client.fallback-to-simple-auth-allowed=true, it first tries Kerberos, fails, falls back to SIMPLE, and then fails as well, as expected?

Explorer

No, from the very first attempt it connects via SIMPLE authentication, and it works once we set the spark.yarn.access.namenodes property. The property ipc.client.fallback-to-simple-auth-allowed is set to false. Internally, it uses UGI to log in and determine the authentication type. In YARN mode, the principal used in UGI is the username (that is, the user under which the executor is initialized) without the domain name (NOT the full principal). It validates this and reports SIMPLE authentication, but in the end it gets the job done. I am confused about why it is still SIMPLE. In LOCAL mode, however, the username is the valid principal and the authentication is KERBEROS. I'm talking about the internals of creating the FileSystem object (Java). Is it because YARN is delegating tokens and initializing the JVMs?

E.g., the user field in the UserGroupInformation class is user1 and authenticationMethod is SIMPLE in yarn-cluster mode, whereas in LOCAL mode it is user1@EXAMPLE.COM and authenticationMethod is KERBEROS.

user1 is the user we run spark-submit as in both scenarios, and the containers are also launched as user1.
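
To make the relationship between the two values concrete: a Kerberos principal has the form primary[/instance]@REALM, and user1 is the primary part of user1@EXAMPLE.COM. The stdlib-only Java sketch below splits a principal into those parts. The class PrincipalParts is hypothetical and is not Hadoop's API; Hadoop derives short names through its auth_to_local rules, which this sketch does not model.

```java
// Hypothetical helper: splits "primary[/instance]@REALM" into its parts.
final class PrincipalParts {
    final String primary;   // e.g. "user1"
    final String instance;  // second component of the name, or "" if absent
    final String realm;     // e.g. "EXAMPLE.COM", or "" if no realm was given

    private PrincipalParts(String primary, String instance, String realm) {
        this.primary = primary;
        this.instance = instance;
        this.realm = realm;
    }

    static PrincipalParts parse(String principal) {
        String name = principal;
        String realm = "";
        int at = principal.lastIndexOf('@');
        if (at >= 0) {
            name = principal.substring(0, at);
            realm = principal.substring(at + 1);
        }
        String primary = name;
        String instance = "";
        int slash = name.indexOf('/');
        if (slash >= 0) {
            primary = name.substring(0, slash);
            instance = name.substring(slash + 1);
        }
        return new PrincipalParts(primary, instance, realm);
    }

    // True only when the string carried an explicit realm.
    boolean hasRealm() {
        return !realm.isEmpty();
    }

    public static void main(String[] args) {
        // The two values observed in LOCAL mode and in yarn-cluster mode.
        for (String p : new String[] {"user1@EXAMPLE.COM", "user1"}) {
            PrincipalParts parts = parse(p);
            System.out.println(p + " -> primary=" + parts.primary + ", hasRealm=" + parts.hasRealm());
        }
    }
}
```

Both strings resolve to the same primary, user1. Seeing only the short name inside a YARN container would be consistent with the container holding delegation tokens rather than a Kerberos ticket of its own, though that is an interpretation and not something confirmed in this thread.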


Hi @dbompart, I tried:


spark2-shell --conf spark.yarn.access.hadoopFileSystems=hdfs://ip-10-85-54-144.eu-west-1.compute.internal:8020


but it fails to launch with the error: Failed to renew token: Kind: HDFS_DELEGATION_TOKEN, Service: 10.85.54.144:8020, Ident: (token for dssetl: HDFS_DELEGATION_TOKEN .....

I had this issue before in distcp, and it was resolved in distcp by setting mapreduce.job.hdfs-servers.token-renewal.exclude=ip-10-85-54-144.eu-west-1.compute.internal


How can I set this in spark2-shell too? And how can I point the spark2-shell to use the custom conf files?

Thanks