
New in Cloudera Labs: SparkOnHBase

Master Collaborator

Announcing the newest component in Cloudera Labs: SparkOnHBase

 

SparkOnHBase is a simple reusable library for working with HBase and Spark. It makes HBase connections seamless, lets you apply any combination of HBase operations to an RDD, and more.
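For context, here is a minimal sketch of the kind of usage the library enables, written against the HBaseContext API from the Cloudera Labs repo; the table name, column family, and exact package/signatures are assumptions worth checking against the current code:

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.{SparkConf, SparkContext}
import com.cloudera.spark.hbase.HBaseContext

object BulkPutExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("BulkPutExample"))

    // An RDD of (rowKey, value) pairs to write to HBase.
    val rdd = sc.parallelize(Seq(("row1", "v1"), ("row2", "v2")))

    // HBaseContext owns the HBase configuration and manages connections
    // for the driver and the executors.
    val hbaseContext = new HBaseContext(sc, HBaseConfiguration.create())

    // bulkPut maps each record to a Put and writes it to the table.
    // "exampleTable" and column family "cf" are hypothetical.
    hbaseContext.bulkPut[(String, String)](
      rdd,
      "exampleTable",
      (r) => {
        val put = new Put(Bytes.toBytes(r._1))
        put.add(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes(r._2))
        put
      },
      false // autoFlush
    )
  }
}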

 

Read more here.

6 REPLIES

Re: New in Cloudera Labs: SparkOnHBase

New Contributor

Have you guys been able to figure out how to get this working on a secure HBase in --deploy-mode cluster?

I've only been able to get it to work in client mode.

 

In cluster mode, since the driver runs in the AM, I don't see any way for it to acquire the right credentials.

 

Thoughts?

Re: New in Cloudera Labs: SparkOnHBase

Cloudera Employee

Hey @ramblingpolak,

 

I would have to see the exception. The only issue I can think of offhand would be a configuration difference between the edge node and the datanode.

 

Send the output of yarn logs -applicationId XYZ and I will see what I can do.

 

Ted Malaska

Re: New in Cloudera Labs: SparkOnHBase

New Contributor

I've found a way to make this work. But first, the problem that I ran into...

 

When running Spark with --deploy-mode client everything is kosher because you have run kinit on your edge node and have a valid TGT. The driver code (your Spark job) runs on the edge node and is able to obtain a delegation token for HBase. These credentials are then shipped to the executors in a broadcast variable.
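To make that flow concrete, here is a minimal sketch of the client-mode token acquisition, using the same TokenUtil and UserGroupInformation calls that show up in the stack trace further down; treat it as an illustration rather than the library's exact code:

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.security.token.TokenUtil
import org.apache.hadoop.security.UserGroupInformation

// Driver side, client mode: the current user is backed by the TGT from kinit,
// so requesting an HBase delegation token succeeds.
val hbaseConf = HBaseConfiguration.create()
val user = UserGroupInformation.getCurrentUser
TokenUtil.obtainAndCacheToken(hbaseConf, user)

// The library then ships user.getCredentials (which now holds the HBase
// delegation token) to the executors in a broadcast variable.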

 

However, when you run Spark with --deploy-mode cluster your driver code is run within the application master, which is launched on some nodemanager. This means that you'll get the error below when you hit the code trying to request an HBase delegation token, because you're not authenticated on the nodemanager your driver code happens to launch on.

 

This is avoided when accessing things like HDFS because the org.apache.spark.deploy.yarn.Client (which submits your job to YARN) is aware of HDFS and requests the proper tokens. It does not do this for HBase. By the time your driver code is reached, it's too late because it's no longer running on the node you're running spark-submit on.

 

15/01/15 22:45:49 WARN security.UserGroupInformation: PriviledgedActionException as:adam (auth:SIMPLE) cause:javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)]
15/01/15 22:45:49 WARN ipc.RpcClient: Exception encountered while connecting to the server : javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)]
15/01/15 22:45:49 ERROR ipc.RpcClient: SASL authentication failed. The most likely cause is missing or invalid credentials. Consider 'kinit'.
javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)]
	at com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:212)
	at org.apache.hadoop.hbase.security.HBaseSaslRpcClient.saslConnect(HBaseSaslRpcClient.java:179)
	at org.apache.hadoop.hbase.ipc.RpcClient$Connection.setupSaslConnection(RpcClient.java:770)
	at org.apache.hadoop.hbase.ipc.RpcClient$Connection.access$600(RpcClient.java:357)
	at org.apache.hadoop.hbase.ipc.RpcClient$Connection$2.run(RpcClient.java:891)
	at org.apache.hadoop.hbase.ipc.RpcClient$Connection$2.run(RpcClient.java:888)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:415)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1642)
	at org.apache.hadoop.hbase.ipc.RpcClient$Connection.setupIOstreams(RpcClient.java:888)
	at org.apache.hadoop.hbase.ipc.RpcClient.getConnection(RpcClient.java:1543)
	at org.apache.hadoop.hbase.ipc.RpcClient.call(RpcClient.java:1442)
	at org.apache.hadoop.hbase.ipc.RpcClient.callBlockingMethod(RpcClient.java:1661)
	at org.apache.hadoop.hbase.ipc.RpcClient$BlockingRpcChannelImplementation.callBlockingMethod(RpcClient.java:1719)
	at org.apache.hadoop.hbase.protobuf.generated.ClientProtos$ClientService$BlockingStub.execService(ClientProtos.java:30352)
	at org.apache.hadoop.hbase.protobuf.ProtobufUtil.execService(ProtobufUtil.java:1623)
	at org.apache.hadoop.hbase.ipc.RegionCoprocessorRpcChannel$1.call(RegionCoprocessorRpcChannel.java:93)
	at org.apache.hadoop.hbase.ipc.RegionCoprocessorRpcChannel$1.call(RegionCoprocessorRpcChannel.java:90)
	at org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithRetries(RpcRetryingCaller.java:114)
	at org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithRetries(RpcRetryingCaller.java:90)
	at org.apache.hadoop.hbase.ipc.RegionCoprocessorRpcChannel.callExecService(RegionCoprocessorRpcChannel.java:96)
	at org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel.callBlockingMethod(CoprocessorRpcChannel.java:74)
	at org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos$AuthenticationService$BlockingStub.getAuthenticationToken(AuthenticationProtos.java:4512)
	at org.apache.hadoop.hbase.security.token.TokenUtil.obtainToken(TokenUtil.java:62)
	at org.apache.hadoop.hbase.security.token.TokenUtil$1.run(TokenUtil.java:98)
	at org.apache.hadoop.hbase.security.token.TokenUtil$1.run(TokenUtil.java:96)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:415)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1642)
	at org.apache.hadoop.hbase.security.token.TokenUtil.obtainAndCacheToken(TokenUtil.java:95)

 

So, without modifying the Spark YARN client to make it HBase-aware, the only solution I found is to require each user who wishes to submit a Spark-on-YARN HBase app to distribute a keytab to each node running a nodemanager (e.g. via Ansible / scp).

 

In the driver you can then authenticate via the keytab to get a TGT, and are thus allowed to obtain the proper HBase tokens.
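A minimal sketch of that driver-side step, reusing the calls from the snippet further down; the principal and keytab path are hypothetical:

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.security.token.TokenUtil
import org.apache.hadoop.security.UserGroupInformation

// Runs in the driver (inside the AM container). The keytab must already be
// present at this path on every node running a nodemanager.
val hbaseConf = HBaseConfiguration.create()
val ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(
  "adam@EXAMPLE.COM",
  "/home/adam/adam.keytab"
)

// With a TGT in hand, requesting the HBase delegation token now succeeds.
TokenUtil.obtainAndCacheToken(hbaseConf, ugi)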

Re: New in Cloudera Labs: SparkOnHBase

Cloudera Employee

Cool, so the problem is that I'm getting the keytab in the driver and the keytab is not on the datanode. I will look into how to get the token given to Spark through spark-submit.

 

 

Re: New in Cloudera Labs: SparkOnHBase

Cloudera Employee

So I updated the code today to work with 5.3 and added a workaround for broadcast variables breaking in Spark Streaming. But I haven't been able to figure out how to get the HBase creds without running in client mode.

 

I'm still working on it.  Let me know if you have any ideas.

Re: New in Cloudera Labs: SparkOnHBase

New Contributor

My solution was to ensure that the user you're running as has an accessible Kerberos keytab on each nodemanager.

 
e.g. /home/adam/adam.keytab on all machines running a nodemanager.
 
In that case, I've modified my code to rely on the API for keytab auth to provide credentials.
 
Snippet below. This is part of the Kiji Spark integration I'm working on, which should be open-sourced in the next couple of weeks.
 
// Imports added for completeness; the Kiji classes are assumed to live in
// org.kiji.schema, with KijiRDD and Log coming from the surrounding
// org.kiji.spark package.
import org.apache.hadoop.hbase.security.token.TokenUtil
import org.apache.hadoop.security.UserGroupInformation
import org.apache.spark.SparkContext
import org.kiji.schema.{KijiDataRequest, KijiURI}

/** Provides Kiji-specific methods on `SparkContext` */
class SparkContextFunctions(@transient val sc: SparkContext) extends Serializable {

  import SparkContextFunctions._

  /** Returns a view of a Kiji table as `KijiRDD[T]`.
    * This method is made available on `SparkContext` by importing `org.kiji.spark._`
    *
    * @param uri A KijiURI.
    * @param dataRequest A KijiDataRequest.
    * @param vClass ??? Need to talk to Adam.
    * @return An instance of a KijiRDD.
    */
  def kijiRDD[T](uri: KijiURI, dataRequest: KijiDataRequest, vClass: Class[_ <: T]): KijiRDD[T] = {
    val authMode = sc.hadoopConfiguration.get("hbase.security.authentication")
    Log.info(s"Running with $authMode authentication.")

    UserGroupInformation.setConfiguration(sc.hadoopConfiguration)

    val sparkConf = sc.getConf

    val kerberosUsername = sparkConf.getOption("spark.kiji.kerberos.username")
    val keytab = sparkConf.getOption("spark.kiji.kerberos.keytab")

    // If the user specified both properties, then attempt to authenticate
    val ugi = if (kerberosUsername.nonEmpty && keytab.nonEmpty) {
      val ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(
        kerberosUsername.get,
        keytab.get
      )
      // Even if we authenticated, only request a token if security is enabled.
      if (UserGroupInformation.isSecurityEnabled) {
        TokenUtil.obtainAndCacheToken(sc.hadoopConfiguration, ugi)
        Log.info("Obtained and cached auth token for HBase.")
      }
      ugi
    } else {
      // Otherwise assume we are either on a non-secure cluster or the HBase auth token
      // has already been cached by the user.
      UserGroupInformation.getCurrentUser
    }

    val credentials = ugi.getCredentials
    KijiRDD(sc, sc.hadoopConfiguration, credentials, uri, dataRequest).asInstanceOf[KijiRDD[T]]
  }
}
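Since the snippet reads both settings from SparkConf, they would presumably be passed at submit time as ordinary Spark properties, e.g. spark-submit --conf spark.kiji.kerberos.username=adam@EXAMPLE.COM --conf spark.kiji.kerberos.keytab=/home/adam/adam.keytab (values hypothetical).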