Created on 03-14-2019 03:57 AM - edited 09-16-2022 07:13 AM
Hi,
In a two-cluster environment, where each cluster has its own KDC and a trust is configured between the KDCs, I cannot read data from the remote cluster via Spark. Am I missing some property for spark-shell or spark-submit?
Local HDFS: devhanameservice
Remote HDFS: hanameservice
Running hdfs dfs -ls from dev and listing prod works fine:
[centos@<dev-gateway> ~]$ hdfs dfs -ls hdfs://hanameservice/tmp
Found 6 items
d--------- - hdfs supergroup 0 2019-03-14 11:47 hdfs://hanameservice/tmp/.cloudera_health_monitoring_canary_files
...
But trying to read a file from the remote HDFS in spark-shell fails with this:
[centos@<dev-gateway> ~]$ spark2-shell
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://<dev-gateway>.eu-west-1.compute.internal:4040
Spark context available as 'sc' (master = yarn, app id = application_1552545238536_0261).
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 2.3.0.cloudera4
/_/
Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_191)
Type in expressions to have them evaluated.
Type :help for more information.
scala> val t = sc.textFile("hdfs://hanameservice/tmp/external/test/file.csv")
t: org.apache.spark.rdd.RDD[String] = hdfs://hanameservice/tmp/external/test/file.csv MapPartitionsRDD[1] at textFile at <console>:24
scala> t.count()
[Stage 0:> (0 + 1) / 28]19/03/14 11:45:04 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, <worker-node>, executor 28): java.io.IOException: Failed on local exception: java.io.IOException: org.apache.hadoop.security.AccessControlException: Client cannot authenticate via:[TOKEN, KERBEROS]; Host Details : local host is: "<worker-node>/10.85.150.22"; destination host is: "<remote-name-node>":8020;
    at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:772)
    at org.apache.hadoop.ipc.Client.call(Client.java:1508)
    at org.apache.hadoop.ipc.Client.call(Client.java:1441)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:230)
    at com.sun.proxy.$Proxy18.getBlockLocations(Unknown Source)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getBlockLocations(ClientNamenodeProtocolTranslatorPB.java:268)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:258)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:104)
    at com.sun.proxy.$Proxy19.getBlockLocations(Unknown Source)
    at org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:1324)
    at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1311)
    at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1299)
    at org.apache.hadoop.hdfs.DFSInputStream.fetchLocatedBlocksAndGetLastBlockLength(DFSInputStream.java:315)
    at org.apache.hadoop.hdfs.DFSInputStream.openInfo(DFSInputStream.java:280)
    at org.apache.hadoop.hdfs.DFSInputStream.<init>(DFSInputStream.java:267)
    at org.apache.hadoop.hdfs.DFSClient.open(DFSClient.java:1630)
    at org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:339)
    at org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:335)
    at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
    at org.apache.hadoop.hdfs.DistributedFileSystem.open(DistributedFileSystem.java:335)
    at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:784)
    at org.apache.hadoop.mapred.LineRecordReader.<init>(LineRecordReader.java:109)
    at org.apache.hadoop.mapred.TextInputFormat.getRecordReader(TextInputFormat.java:67)
    at org.apache.spark.rdd.HadoopRDD$$anon$1.liftedTree1$1(HadoopRDD.scala:257)
    at org.apache.spark.rdd.HadoopRDD$$anon$1.<init>(HadoopRDD.scala:256)
    at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:214)
    at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:94)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:49)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:381)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: org.apache.hadoop.security.AccessControlException: Client cannot authenticate via:[TOKEN, KERBEROS]
I am able to run MapReduce jobs against the remote cluster with this property:
mapreduce.job.hdfs-servers.token-renewal.exclude=hanameservice
Is this something I should add to the Spark settings? And if so, how?
Thanks
Created 04-10-2019 08:25 AM
Created 09-24-2021 03:53 AM
Hi @Tomas79
When launching spark-shell, you need to add the spark.yarn.access.hadoopFileSystems parameter, which lists the additional secure filesystems Spark should obtain delegation tokens for. Also make sure the dfs.namenode.kerberos.principal.pattern parameter is set to * in the core-site.xml file.
For example,
# spark-shell --conf spark.yarn.access.hadoopFileSystems="hdfs://c1441-node2.coelab.cloudera.com:8020"
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
21/09/24 07:23:25 WARN cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: Attempted to request executors before the AM has registered!
Spark context Web UI available at http://c2441-node2.supportlab.cloudera.com:4040
Spark context available as 'sc' (master = yarn, app id = application_1632395260786_0004).
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 2.4.0.7.1.6.0-297
/_/
Using Scala version 2.11.12 (OpenJDK 64-Bit Server VM, Java 1.8.0_232)
Type in expressions to have them evaluated.
Type :help for more information.
scala> val textDF = spark.read.textFile("hdfs://c1441-node2.coelab.cloudera.com:8020/tmp/ranga_clusterb_test.txt")
textDF: org.apache.spark.sql.Dataset[String] = [value: string]
scala> textDF.show(false)
+---------------------+
|value |
+---------------------+
|Hello Ranga, |
| |
+---------------------+
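Applied to the setup in the question, the same option might look like the sketch below. It is only an illustration, not a tested configuration: it assumes the dev cluster's client configuration can resolve the remote nameservice hdfs://hanameservice (the working hdfs dfs -ls suggests it can), and the variable names REMOTE_FILESYSTEMS, ACCESS_LIST and SPARK_CONF are made up for this example. The script only prints the command, so it can be checked before anything is launched.

```shell
#!/bin/sh
# Remote filesystems the job will read from. hdfs://hanameservice is the
# remote nameservice from the question; add more URIs separated by spaces.
REMOTE_FILESYSTEMS="hdfs://hanameservice"

# Spark expects a comma-separated list, so turn the spaces into commas.
ACCESS_LIST=$(printf '%s' "$REMOTE_FILESYSTEMS" | tr -s ' ' ',')

# Build the option once so it can be reused for spark2-shell or spark2-submit.
SPARK_CONF="spark.yarn.access.hadoopFileSystems=${ACCESS_LIST}"

# Dry run: print the command instead of executing it.
echo "spark2-shell --conf ${SPARK_CONF}"
```

The same --conf value can be passed to spark2-submit. On Spark 3.x the property is named spark.kerberos.access.hadoopFileSystems, so the name would need adjusting there.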