
Running Spark job to query Hive HBase tables in a Kerberized cluster

New Contributor

Hi,

I am trying to run a Spark 1.6 job (written in Java) on a Kerberized cluster.

In this job I am trying to read data from a Hive table that uses HBase for its storage:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.hive.HiveContext;

// Set up the Spark and Hive contexts
SparkConf conf = new SparkConf();
JavaSparkContext context = new JavaSparkContext(conf);
HiveContext hiveContext = new HiveContext(context.sc());

hiveContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict");
hiveContext.setConf("spark.sql.hive.convertMetastoreOrc", "false");
hiveContext.setConf("spark.sql.caseSensitive", "false");

// Run the query (e.g. SELECT * FROM <db_name>.<table_name>) and show the first 100 rows
DataFrame df = hiveContext.sql(task.getQuery());
df.show(100);

I am using the spark-submit command below to run the job on YARN:

spark-submit --master yarn --deploy-mode cluster \
  --class <Main class name> \
  --num-executors 2 --executor-cores 1 \
  --executor-memory 1g --driver-memory 1g \
  --jars application.json,/usr/hdp/current/hbase-client/lib/guava-12.0.1.jar,/usr/hdp/current/hbase-client/lib/hbase-client.jar,/usr/hdp/current/hbase-client/lib/hbase-common.jar,/usr/hdp/current/hbase-client/lib/hbase-protocol.jar,/usr/hdp/current/hbase-client/lib/hbase-server.jar,/usr/hdp/current/hive-client/lib/hive-hbase-handler.jar,/usr/hdp/current/spark-client/lib/datanucleus-api-jdo-3.2.6.jar,/usr/hdp/current/spark-client/lib/datanucleus-rdbms-3.2.9.jar,/usr/hdp/current/spark-client/lib/datanucleus-core-3.2.10.jar,/etc/hbase/conf/hbase-site.xml,/usr/hdp/current/spark-client/conf/hive-site.xml \
  data-refiner-1.0.jar

I have already performed a kinit before running the job.
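
For reference, roughly what that looks like on the submitting host (the keytab path and principal here are placeholders; adjust to your environment):

kinit -kt /etc/security/keytabs/myuser.keytab myuser@EXAMPLE.COM   # obtain a TGT from the keytab
klist                                                              # verify the ticket cache holds a valid TGT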

The job is able to communicate with the Hive metastore and parse the query:

17/04/05 06:15:23 INFO ParseDriver: Parsing command: SELECT * FROM <db_name>.<table_name>

17/04/05 06:15:24 INFO ParseDriver: Parse Completed

But when it tries to communicate with HBase to fetch the data, it fails with the exception below:

17/04/05 06:15:26 WARN AbstractRpcClient: Exception encountered while connecting to the server : javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)]
17/04/05 06:15:26 ERROR AbstractRpcClient: SASL authentication failed. The most likely cause is missing or invalid credentials. Consider 'kinit'.
javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)]
    at com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:211)
    at org.apache.hadoop.hbase.security.HBaseSaslRpcClient.saslConnect(HBaseSaslRpcClient.java:179)
    at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.setupSaslConnection(RpcClientImpl.java:611)
    at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.access$600(RpcClientImpl.java:156)
    at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection$2.run(RpcClientImpl.java:737)
    at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection$2.run(RpcClientImpl.java:734)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1724)
    at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.setupIOstreams(RpcClientImpl.java:734)
    at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.writeRequest(RpcClientImpl.java:887)
    at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.tracedWriteRequest(RpcClientImpl.java:856)
    at org.apache.hadoop.hbase.ipc.RpcClientImpl.call(RpcClientImpl.java:1199)
    at org.apache.hadoop.hbase.ipc.AbstractRpcClient.callBlockingMethod(AbstractRpcClient.java:213)
    at org.apache.hadoop.hbase.ipc.AbstractRpcClient$BlockingRpcChannelImplementation.callBlockingMethod(AbstractRpcClient.java:287)
    at org.apache.hadoop.hbase.protobuf.generated.ClientProtos$ClientService$BlockingStub.execService(ClientProtos.java:32765)
    at org.apache.hadoop.hbase.protobuf.ProtobufUtil.execService(ProtobufUtil.java:1627)
    at org.apache.hadoop.hbase.ipc.RegionCoprocessorRpcChannel$1.call(RegionCoprocessorRpcChannel.java:104)
    at org.apache.hadoop.hbase.ipc.RegionCoprocessorRpcChannel$1.call(RegionCoprocessorRpcChannel.java:94)
    at org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithRetries(RpcRetryingCaller.java:126)
    at org.apache.hadoop.hbase.ipc.RegionCoprocessorRpcChannel.callExecService(RegionCoprocessorRpcChannel.java:107)
    at org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel.callBlockingMethod(CoprocessorRpcChannel.java:73)
    at org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos$AuthenticationService$BlockingStub.getAuthenticationToken(AuthenticationProtos.java:4512)
    at org.apache.hadoop.hbase.security.token.TokenUtil.obtainToken(TokenUtil.java:86)
    at org.apache.hadoop.hbase.security.token.TokenUtil$1.run(TokenUtil.java:111)
    at org.apache.hadoop.hbase.security.token.TokenUtil$1.run(TokenUtil.java:108)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1724)
    at org.apache.hadoop.hbase.security.User$SecureHadoopUser.runAs(User.java:313)
    at org.apache.hadoop.hbase.security.token.TokenUtil.obtainToken(TokenUtil.java:108)
    at org.apache.hadoop.hbase.security.token.TokenUtil.addTokenForJob(TokenUtil.java:329) at 
org.apache.hadoop.hive.hbase.HBaseStorageHandler.addHBaseDelegationToken(HBaseStorageHandler.java:496) at org.apache.hadoop.hive.hbase.HBaseStorageHandler.configureTableJobProperties(HBaseStorageHandler.java:441) at org.apache.hadoop.hive.hbase.HBaseStorageHandler.configureInputJobProperties(HBaseStorageHandler.java:342) at org.apache.spark.sql.hive.HiveTableUtil$.configureJobPropertiesForStorageHandler(TableReader.scala:304) at org.apache.spark.sql.hive.HadoopTableReader$.initializeLocalJobConfFunc(TableReader.scala:323) at org.apache.spark.sql.hive.HadoopTableReader$anonfun$12.apply(TableReader.scala:276) at org.apache.spark.sql.hive.HadoopTableReader$anonfun$12.apply(TableReader.scala:276) at org.apache.spark.rdd.HadoopRDD$anonfun$getJobConf$6.apply(HadoopRDD.scala:174) at org.apache.spark.rdd.HadoopRDD$anonfun$getJobConf$6.apply(HadoopRDD.scala:174) at scala.Option.map(Option.scala:145) at org.apache.spark.rdd.HadoopRDD.getJobConf(HadoopRDD.scala:174) at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:195) at org.apache.spark.rdd.RDD$anonfun$partitions$2.apply(RDD.scala:242) at org.apache.spark.rdd.RDD$anonfun$partitions$2.apply(RDD.scala:240) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:240) at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD$anonfun$partitions$2.apply(RDD.scala:242) at org.apache.spark.rdd.RDD$anonfun$partitions$2.apply(RDD.scala:240) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:240) at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD$anonfun$partitions$2.apply(RDD.scala:242) at org.apache.spark.rdd.RDD$anonfun$partitions$2.apply(RDD.scala:240) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:240) at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD$anonfun$partitions$2.apply(RDD.scala:242) at org.apache.spark.rdd.RDD$anonfun$partitions$2.apply(RDD.scala:240) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:240) at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD$anonfun$partitions$2.apply(RDD.scala:242) at org.apache.spark.rdd.RDD$anonfun$partitions$2.apply(RDD.scala:240) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:240) at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD$anonfun$partitions$2.apply(RDD.scala:242) at org.apache.spark.rdd.RDD$anonfun$partitions$2.apply(RDD.scala:240) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:240) at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD$anonfun$partitions$2.apply(RDD.scala:242) at org.apache.spark.rdd.RDD$anonfun$partitions$2.apply(RDD.scala:240) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:240) at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD$anonfun$partitions$2.apply(RDD.scala:242) at org.apache.spark.rdd.RDD$anonfun$partitions$2.apply(RDD.scala:240) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:240) at 
org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:190) at org.apache.spark.sql.execution.Limit.executeCollect(basicOperators.scala:165) at org.apache.spark.sql.execution.SparkPlan.executeCollectPublic(SparkPlan.scala:174) at org.apache.spark.sql.DataFrame$anonfun$org$apache$spark$sql$DataFrame$execute$1$1.apply(DataFrame.scala:1499) at org.apache.spark.sql.DataFrame$anonfun$org$apache$spark$sql$DataFrame$execute$1$1.apply(DataFrame.scala:1499) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56) at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:2086) at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$execute$1(DataFrame.scala:1498) at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$collect(DataFrame.scala:1505) at org.apache.spark.sql.DataFrame$anonfun$head$1.apply(DataFrame.scala:1375) at org.apache.spark.sql.DataFrame$anonfun$head$1.apply(DataFrame.scala:1374) at org.apache.spark.sql.DataFrame.withCallback(DataFrame.scala:2099) at org.apache.spark.sql.DataFrame.head(DataFrame.scala:1374) at org.apache.spark.sql.DataFrame.take(DataFrame.scala:1456) at org.apache.spark.sql.DataFrame.showString(DataFrame.scala:170) at org.apache.spark.sql.DataFrame.show(DataFrame.scala:350) at org.apache.spark.sql.DataFrame.show(DataFrame.scala:311) at com.hpe.eap.batch.EAPDataRefinerMain.main(EAPDataRefinerMain.java:88) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:497)

The job runs fine when we query a regular Hive table, and also on a non-Kerberized cluster.

Please suggest any configuration or code changes we need to make to resolve the issue.

1 ACCEPTED SOLUTION

Guru

@Sudeep Mishra, in a secure environment you need to add all the HBase dependency jars to SPARK_CLASSPATH. Add this configuration to spark-env.sh:

export SPARK_CLASSPATH=<list of HBase jars separated by colons>
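
For example, a minimal spark-env.sh sketch that reuses the HDP jar locations from the question's --jars list (the paths and versions are assumptions, so adjust them to your cluster; adding /etc/hbase/conf as well is a common way to make hbase-site.xml visible):

# In spark-env.sh: colon-separated list of HBase client jars (and config dir)
export SPARK_CLASSPATH=/etc/hbase/conf:\
/usr/hdp/current/hbase-client/lib/hbase-client.jar:\
/usr/hdp/current/hbase-client/lib/hbase-common.jar:\
/usr/hdp/current/hbase-client/lib/hbase-protocol.jar:\
/usr/hdp/current/hbase-client/lib/hbase-server.jar:\
/usr/hdp/current/hbase-client/lib/guava-12.0.1.jar:\
/usr/hdp/current/hive-client/lib/hive-hbase-handler.jar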


4 REPLIES

Guru

@Sudeep Mishra, in a secure environment you need to add all the HBase dependency jars to SPARK_CLASSPATH. Add this configuration to spark-env.sh:

export SPARK_CLASSPATH=<list of HBase jars separated by colons>

Super Collaborator

@Sudeep Mishra

Please pass the user keytab along with the spark-submit command:

--files /<key_tab_location>/<user_keytab.keytab>

This is because the executors are not authenticated to extract data from the HBase RegionServers or any other component.

By passing the keytab, all the executors will have it and will be able to communicate.
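
For example, a sketch of the submit command with the keytab shipped to the executors; the keytab path and principal below are placeholders, and the other options (--jars, memory settings, and so on) stay as in the original command:

# Keytab path and principal are placeholders; adjust to your environment.
spark-submit --master yarn --deploy-mode cluster \
  --class <Main class name> \
  --files /etc/security/keytabs/myuser.keytab \
  data-refiner-1.0.jar

# On YARN, spark-submit can also obtain and renew the ticket itself:
#   --principal myuser@EXAMPLE.COM --keytab /etc/security/keytabs/myuser.keytab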

New Contributor

Hi, were you able to resolve this issue? I am also getting the same error and have tried all of the approaches above, but it is still not resolved. I only get the error in cluster mode; local mode works fine.

New Contributor

On Spark 2, we were unable to get this working by setting SPARK_CLASSPATH, but the fix did work when we passed the same set of jars on the executor classpath.
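
The reply does not name the exact setting, so as a sketch only: one common way to put jars on the executor (and driver) classpath is Spark's extraClassPath options, here reusing the HDP paths from the question; adjust the list to your cluster.

# Colon-separated HBase jars and config dir (paths assumed from the question's --jars list)
HBASE_JARS=/etc/hbase/conf:/usr/hdp/current/hbase-client/lib/hbase-client.jar:/usr/hdp/current/hbase-client/lib/hbase-common.jar:/usr/hdp/current/hbase-client/lib/hbase-protocol.jar:/usr/hdp/current/hbase-client/lib/hbase-server.jar:/usr/hdp/current/hive-client/lib/hive-hbase-handler.jar

spark-submit --master yarn --deploy-mode cluster \
  --conf spark.executor.extraClassPath=$HBASE_JARS \
  --conf spark.driver.extraClassPath=$HBASE_JARS \
  --class <Main class name> \
  data-refiner-1.0.jar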