Created on 03-14-2019 03:57 AM - edited 09-16-2022 07:13 AM
Hi,
In a two-cluster environment where each cluster has its own KDC and a trust is configured between the KDCs, I cannot read data from the remote cluster via Spark. Am I missing some property for spark-shell or spark-submit?
Local HDFS: devhanameservice
Remote HDFS: hanameservice
Running hdfs dfs -ls from the dev cluster against the prod (remote) HDFS works fine:
[centos@<dev-gateway> ~]$ hdfs dfs -ls hdfs://hanameservice/tmp
Found 6 items
d--------- - hdfs supergroup 0 2019-03-14 11:47 hdfs://hanameservice/tmp/.cloudera_health_monitoring_canary_files
...
But reading a file from the remote HDFS in spark-shell fails with this error:
[centos@<dev-gateway> ~]$ spark2-shell
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://<dev-gateway>.eu-west-1.compute.internal:4040
Spark context available as 'sc' (master = yarn, app id = application_1552545238536_0261).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.3.0.cloudera4
      /_/
Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_191)
Type in expressions to have them evaluated.
Type :help for more information.
scala> val t = sc.textFile("hdfs://hanameservice/tmp/external/test/file.csv")
t: org.apache.spark.rdd.RDD[String] = hdfs://hanameservice/tmp/external/test/file.csv MapPartitionsRDD[1] at textFile at <console>:24
scala> t.count()
[Stage 0:> (0 + 1) / 28]
19/03/14 11:45:04 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, <worker-node>, executor 28): java.io.IOException: Failed on local exception: java.io.IOException: org.apache.hadoop.security.AccessControlException: Client cannot authenticate via:[TOKEN, KERBEROS]; Host Details : local host is: "<worker-node>/10.85.150.22"; destination host is: "<remote-name-node>":8020;
at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:772)
at org.apache.hadoop.ipc.Client.call(Client.java:1508)
at org.apache.hadoop.ipc.Client.call(Client.java:1441)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:230)
at com.sun.proxy.$Proxy18.getBlockLocations(Unknown Source)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getBlockLocations(ClientNamenodeProtocolTranslatorPB.java:268)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:258)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:104)
at com.sun.proxy.$Proxy19.getBlockLocations(Unknown Source)
at org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:1324)
at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1311)
at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1299)
at org.apache.hadoop.hdfs.DFSInputStream.fetchLocatedBlocksAndGetLastBlockLength(DFSInputStream.java:315)
at org.apache.hadoop.hdfs.DFSInputStream.openInfo(DFSInputStream.java:280)
at org.apache.hadoop.hdfs.DFSInputStream.<init>(DFSInputStream.java:267)
at org.apache.hadoop.hdfs.DFSClient.open(DFSClient.java:1630)
at org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:339)
at org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:335)
at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at org.apache.hadoop.hdfs.DistributedFileSystem.open(DistributedFileSystem.java:335)
at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:784)
at org.apache.hadoop.mapred.LineRecordReader.<init>(LineRecordReader.java:109)
at org.apache.hadoop.mapred.TextInputFormat.getRecordReader(TextInputFormat.java:67)
at org.apache.spark.rdd.HadoopRDD$$anon$1.liftedTree1$1(HadoopRDD.scala:257)
at org.apache.spark.rdd.HadoopRDD$$anon$1.<init>(HadoopRDD.scala:256)
at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:214)
at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:94)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:49)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:381)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: org.apache.hadoop.security.AccessControlException: Client cannot authenticate via:[TOKEN, KERBEROS]
I am able to run MapReduce jobs against the remote cluster with this property:
mapreduce.job.hdfs-servers.token-renewal.exclude=hanameservice
Is this something I should add to the Spark settings? And if so, how?
Thanks
Created 04-10-2019 08:25 AM
Created 09-24-2021 03:53 AM
Hi @Tomas79
When launching spark-shell, you need to add the spark.yarn.access.hadoopFileSystems parameter, pointing it at the remote HDFS. Also make sure that the dfs.namenode.kerberos.principal.pattern parameter is set to * in the core-site.xml file.
For example,
# spark-shell --conf spark.yarn.access.hadoopFileSystems="hdfs://c1441-node2.coelab.cloudera.com:8020"
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
21/09/24 07:23:25 WARN cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: Attempted to request executors before the AM has registered!
Spark context Web UI available at http://c2441-node2.supportlab.cloudera.com:4040
Spark context available as 'sc' (master = yarn, app id = application_1632395260786_0004).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.0.7.1.6.0-297
      /_/
Using Scala version 2.11.12 (OpenJDK 64-Bit Server VM, Java 1.8.0_232)
Type in expressions to have them evaluated.
Type :help for more information.
scala> val textDF = spark.read.textFile("hdfs://c1441-node2.coelab.cloudera.com:8020/tmp/ranga_clusterb_test.txt")
textDF: org.apache.spark.sql.Dataset[String] = [value: string]
scala> textDF.show(false)
+---------------------+
|value |
+---------------------+
|Hello Ranga, |
| |
+---------------------+
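Applied to the clusters from the original question, the same idea could look roughly like the sketch below. It is only an illustration under a few assumptions: the remote nameservice hanameservice is the one from the question, spark2-shell is the launcher used there (spark2-submit should accept the same --conf flag), and the variable names are made up for this example. The script only builds and prints the command so it can be reviewed before running it.

```shell
#!/bin/sh
# Remote HDFS nameservice from the question (assumption: the local client
# config can resolve it, as the working "hdfs dfs -ls" above suggests).
REMOTE_FS="hdfs://hanameservice"

# Launcher from the question; swap in spark2-submit or spark-shell as needed.
SPARK_LAUNCHER="spark2-shell"

# Ask Spark on YARN to obtain delegation tokens for the remote filesystem too.
# The property takes a comma-separated list if there are several filesystems.
TOKEN_CONF="spark.yarn.access.hadoopFileSystems=${REMOTE_FS}"

# Build and print the command instead of running it directly.
LAUNCH_CMD="${SPARK_LAUNCHER} --conf ${TOKEN_CONF}"
echo "${LAUNCH_CMD}"
```

To avoid passing the flag every time, the same key and value can also go into spark-defaults.conf. Note that on Spark 3.x the property is named spark.kerberos.access.hadoopFileSystems, so check which name your Spark version expects.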