Created 04-06-2017 09:08 AM
Hi,
I am trying to run a Spark 1.6 job (written in Java) on a Kerberized cluster.
The job reads data from a Hive table that uses HBase as its storage.
SparkConf conf = new SparkConf();
JavaSparkContext context = new JavaSparkContext(conf);
HiveContext hiveContext = new HiveContext(context.sc());
hiveContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict");
hiveContext.setConf("spark.sql.hive.convertMetastoreOrc", "false");
hiveContext.setConf("spark.sql.caseSensitive", "false");
DataFrame df = hiveContext.sql(task.getQuery());
df.show(100);
I am using the following spark-submit command to run the job on YARN:
spark-submit --master yarn --deploy-mode cluster --class <Main class name> \
  --num-executors 2 --executor-cores 1 --executor-memory 1g --driver-memory 1g \
  --jars application.json,/usr/hdp/current/hbase-client/lib/guava-12.0.1.jar,/usr/hdp/current/hbase-client/lib/hbase-client.jar,/usr/hdp/current/hbase-client/lib/hbase-common.jar,/usr/hdp/current/hbase-client/lib/hbase-protocol.jar,/usr/hdp/current/hbase-client/lib/hbase-server.jar,/usr/hdp/current/hive-client/lib/hive-hbase-handler.jar,/usr/hdp/current/spark-client/lib/datanucleus-api-jdo-3.2.6.jar,/usr/hdp/current/spark-client/lib/datanucleus-rdbms-3.2.9.jar,/usr/hdp/current/spark-client/lib/datanucleus-core-3.2.10.jar,/etc/hbase/conf/hbase-site.xml,/usr/hdp/current/spark-client/conf/hive-site.xml \
  data-refiner-1.0.jar
I have already performed a kinit before running the job.
The job is able to communicate with the Hive metastore and parse the query:
17/04/05 06:15:23 INFO ParseDriver: Parsing command: SELECT * FROM <db_name>.<table_name>
17/04/05 06:15:24 INFO ParseDriver: Parse Completed
But when it tries to communicate with HBase to fetch the data, it fails with the exception below:
17/04/05 06:15:26 WARN AbstractRpcClient: Exception encountered while connecting to the server : javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)]
17/04/05 06:15:26 ERROR AbstractRpcClient: SASL authentication failed. The most likely cause is missing or invalid credentials. Consider 'kinit'.
javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)]
    at com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:211)
    at org.apache.hadoop.hbase.security.HBaseSaslRpcClient.saslConnect(HBaseSaslRpcClient.java:179)
    at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.setupSaslConnection(RpcClientImpl.java:611)
    at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.access$600(RpcClientImpl.java:156)
    at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection$2.run(RpcClientImpl.java:737)
    at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection$2.run(RpcClientImpl.java:734)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1724)
    at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.setupIOstreams(RpcClientImpl.java:734)
    at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.writeRequest(RpcClientImpl.java:887)
    at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.tracedWriteRequest(RpcClientImpl.java:856)
    at org.apache.hadoop.hbase.ipc.RpcClientImpl.call(RpcClientImpl.java:1199)
    at org.apache.hadoop.hbase.ipc.AbstractRpcClient.callBlockingMethod(AbstractRpcClient.java:213)
    at org.apache.hadoop.hbase.ipc.AbstractRpcClient$BlockingRpcChannelImplementation.callBlockingMethod(AbstractRpcClient.java:287)
    at org.apache.hadoop.hbase.protobuf.generated.ClientProtos$ClientService$BlockingStub.execService(ClientProtos.java:32765)
    at org.apache.hadoop.hbase.protobuf.ProtobufUtil.execService(ProtobufUtil.java:1627)
    at org.apache.hadoop.hbase.ipc.RegionCoprocessorRpcChannel$1.call(RegionCoprocessorRpcChannel.java:104)
    at org.apache.hadoop.hbase.ipc.RegionCoprocessorRpcChannel$1.call(RegionCoprocessorRpcChannel.java:94)
    at org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithRetries(RpcRetryingCaller.java:126)
    at org.apache.hadoop.hbase.ipc.RegionCoprocessorRpcChannel.callExecService(RegionCoprocessorRpcChannel.java:107)
    at org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel.callBlockingMethod(CoprocessorRpcChannel.java:73)
    at org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos$AuthenticationService$BlockingStub.getAuthenticationToken(AuthenticationProtos.java:4512)
    at org.apache.hadoop.hbase.security.token.TokenUtil.obtainToken(TokenUtil.java:86)
    at org.apache.hadoop.hbase.security.token.TokenUtil$1.run(TokenUtil.java:111)
    at org.apache.hadoop.hbase.security.token.TokenUtil$1.run(TokenUtil.java:108)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1724)
    at org.apache.hadoop.hbase.security.User$SecureHadoopUser.runAs(User.java:313)
    at org.apache.hadoop.hbase.security.token.TokenUtil.obtainToken(TokenUtil.java:108)
    at org.apache.hadoop.hbase.security.token.TokenUtil.addTokenForJob(TokenUtil.java:329)
    at org.apache.hadoop.hive.hbase.HBaseStorageHandler.addHBaseDelegationToken(HBaseStorageHandler.java:496)
    at org.apache.hadoop.hive.hbase.HBaseStorageHandler.configureTableJobProperties(HBaseStorageHandler.java:441)
    at org.apache.hadoop.hive.hbase.HBaseStorageHandler.configureInputJobProperties(HBaseStorageHandler.java:342)
    at org.apache.spark.sql.hive.HiveTableUtil$.configureJobPropertiesForStorageHandler(TableReader.scala:304)
    at org.apache.spark.sql.hive.HadoopTableReader$.initializeLocalJobConfFunc(TableReader.scala:323)
    at org.apache.spark.sql.hive.HadoopTableReader$anonfun$12.apply(TableReader.scala:276)
    at org.apache.spark.sql.hive.HadoopTableReader$anonfun$12.apply(TableReader.scala:276)
    at org.apache.spark.rdd.HadoopRDD$anonfun$getJobConf$6.apply(HadoopRDD.scala:174)
    at org.apache.spark.rdd.HadoopRDD$anonfun$getJobConf$6.apply(HadoopRDD.scala:174)
    at scala.Option.map(Option.scala:145)
    at org.apache.spark.rdd.HadoopRDD.getJobConf(HadoopRDD.scala:174)
    at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:195)
    at org.apache.spark.rdd.RDD$anonfun$partitions$2.apply(RDD.scala:242)
    at org.apache.spark.rdd.RDD$anonfun$partitions$2.apply(RDD.scala:240)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:240)
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD$anonfun$partitions$2.apply(RDD.scala:242)
    at org.apache.spark.rdd.RDD$anonfun$partitions$2.apply(RDD.scala:240)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:240)
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD$anonfun$partitions$2.apply(RDD.scala:242)
    at org.apache.spark.rdd.RDD$anonfun$partitions$2.apply(RDD.scala:240)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:240)
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD$anonfun$partitions$2.apply(RDD.scala:242)
    at org.apache.spark.rdd.RDD$anonfun$partitions$2.apply(RDD.scala:240)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:240)
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD$anonfun$partitions$2.apply(RDD.scala:242)
    at org.apache.spark.rdd.RDD$anonfun$partitions$2.apply(RDD.scala:240)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:240)
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD$anonfun$partitions$2.apply(RDD.scala:242)
    at org.apache.spark.rdd.RDD$anonfun$partitions$2.apply(RDD.scala:240)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:240)
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD$anonfun$partitions$2.apply(RDD.scala:242)
    at org.apache.spark.rdd.RDD$anonfun$partitions$2.apply(RDD.scala:240)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:240)
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD$anonfun$partitions$2.apply(RDD.scala:242)
    at org.apache.spark.rdd.RDD$anonfun$partitions$2.apply(RDD.scala:240)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:240)
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD$anonfun$partitions$2.apply(RDD.scala:242)
    at org.apache.spark.rdd.RDD$anonfun$partitions$2.apply(RDD.scala:240)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:240)
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:190)
    at org.apache.spark.sql.execution.Limit.executeCollect(basicOperators.scala:165)
    at org.apache.spark.sql.execution.SparkPlan.executeCollectPublic(SparkPlan.scala:174)
    at org.apache.spark.sql.DataFrame$anonfun$org$apache$spark$sql$DataFrame$execute$1$1.apply(DataFrame.scala:1499)
    at org.apache.spark.sql.DataFrame$anonfun$org$apache$spark$sql$DataFrame$execute$1$1.apply(DataFrame.scala:1499)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
    at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:2086)
    at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$execute$1(DataFrame.scala:1498)
    at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$collect(DataFrame.scala:1505)
    at org.apache.spark.sql.DataFrame$anonfun$head$1.apply(DataFrame.scala:1375)
    at org.apache.spark.sql.DataFrame$anonfun$head$1.apply(DataFrame.scala:1374)
    at org.apache.spark.sql.DataFrame.withCallback(DataFrame.scala:2099)
    at org.apache.spark.sql.DataFrame.head(DataFrame.scala:1374)
    at org.apache.spark.sql.DataFrame.take(DataFrame.scala:1456)
    at org.apache.spark.sql.DataFrame.showString(DataFrame.scala:170)
    at org.apache.spark.sql.DataFrame.show(DataFrame.scala:350)
    at org.apache.spark.sql.DataFrame.show(DataFrame.scala:311)
    at com.hpe.eap.batch.EAPDataRefinerMain.main(EAPDataRefinerMain.java:88)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
The job runs fine when we query a normal Hive table, and it also works on a non-Kerberized cluster.
Kindly suggest whether we need to modify any configuration parameters or make code changes to resolve the issue.
Created 05-11-2017 08:27 PM
@Sudeep Mishra, in a secure environment you need to add all of the HBase dependency jars to the Spark classpath. Add this configuration to spark-env.sh:
export SPARK_CLASSPATH=<list of HBase jars separated by :>
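A minimal sketch of what that could look like, assuming the HDP jar locations already used in the spark-submit command above (versions and paths will differ per installation, so adjust them to match your nodes):

# spark-env.sh -- illustrative paths taken from the --jars list in the question
# /etc/hbase/conf is included so hbase-site.xml is on the classpath as well
export SPARK_CLASSPATH=/usr/hdp/current/hbase-client/lib/hbase-client.jar:\
/usr/hdp/current/hbase-client/lib/hbase-common.jar:\
/usr/hdp/current/hbase-client/lib/hbase-protocol.jar:\
/usr/hdp/current/hbase-client/lib/hbase-server.jar:\
/usr/hdp/current/hbase-client/lib/guava-12.0.1.jar:\
/usr/hdp/current/hive-client/lib/hive-hbase-handler.jar:\
/etc/hbase/conf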
Created 05-16-2017 10:11 PM
Please pass the user keytab along with the spark-submit command:
--files /<key_tab_location>/<user_keytab.keytab>
This happens because the executors are not authenticated to read data from the HBase RegionServers (or any other components).
By passing the keytab, every executor has it available and is able to authenticate and communicate.
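For reference, a sketch of how the original command might be extended (the keytab path is the placeholder from above, and the --jars list is unchanged from the question):

spark-submit --master yarn --deploy-mode cluster --class <Main class name> \
  --num-executors 2 --executor-cores 1 --executor-memory 1g --driver-memory 1g \
  --files /<key_tab_location>/<user_keytab.keytab>,/etc/hbase/conf/hbase-site.xml \
  --jars <same HBase/Hive jars as in the question> \
  data-refiner-1.0.jar

On YARN, the --principal and --keytab options of spark-submit are also worth trying, since they let Spark itself log in from the keytab and renew tickets for long-running jobs.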
Created 03-19-2018 12:43 PM
Hi, were you able to resolve this issue? I am also getting the same error and have tried all of the approaches above, but it is still not resolved. I only get the error in cluster mode; local mode works fine.
Created 09-06-2018 02:30 PM
On Spark 2, we were unable to get this working by setting SPARK_CLASSPATH, but the fix did work when we passed the same set of jars on the executor classpath.
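Presumably something along these lines (a sketch only; the exact jar list and paths are assumptions based on the HDP layout in the question):

# sketch: put the HBase jars and config on the executor classpath via Spark conf
spark-submit --master yarn --deploy-mode cluster --class <Main class name> \
  --conf "spark.executor.extraClassPath=/usr/hdp/current/hbase-client/lib/hbase-client.jar:/usr/hdp/current/hbase-client/lib/hbase-common.jar:/usr/hdp/current/hbase-client/lib/hbase-protocol.jar:/usr/hdp/current/hbase-client/lib/hbase-server.jar:/usr/hdp/current/hive-client/lib/hive-hbase-handler.jar:/etc/hbase/conf" \
  data-refiner-1.0.jar

spark.executor.extraClassPath is the replacement for the deprecated SPARK_CLASSPATH; the driver may need the same jars via spark.driver.extraClassPath.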