Spark Thrift Server is a service that allows JDBC and ODBC clients to run Spark SQL queries on Apache Spark.

By default, Spark Thrift Server runs queries under the identity of the operating system account running the Spark Thrift Server. In a multi-user environment, queries often need to run under the identity of the end user who originated the query; this capability is called "user impersonation".

This article describes the work done in HDP 2.6 to support user impersonation for the Spark Thrift Server. The feature is supported in HDP 2.5.x and later versions, for Apache Spark 1 versions 1.6.3 and later.

When user impersonation is enabled, Spark Thrift Server runs queries as the submitting user. Because queries run under the submitter's user account, the Thrift Server can enforce user-level permissions and access control lists (ACLs). Data cached in Spark on behalf of a user is visible only to queries from that user.

User impersonation enables granular access control to Spark SQL at the level of files or tables. For finer-grained access control, such as row- or column-level security, see this article.

The user impersonation feature is controlled with a property called doAs. When doAs is set to true, Spark Thrift Server launches an on-demand Spark application to handle user queries. This application is shared only among connections from the same user. Spark Thrift Server forwards incoming queries to the appropriate Spark application for execution, which keeps the Spark Thrift Server itself lightweight: it acts only as a proxy that forwards requests and responses. When all user connections for a Spark application are closed at the Spark Thrift Server, the corresponding Spark application also terminates.

Prerequisites

To enable storage-based authorization, follow the instructions in the Hive documentation.

Configuring and Enabling User Impersonation

To enable user impersonation for the Spark Thrift Server on an Ambari-managed cluster, complete the following steps:

  1. Enable doAs support. Navigate to the “Advanced spark-hive-site-override” section and set
    hive.server2.enable.doAs=true
  2. Add DataNucleus jars to the Spark Thrift Server classpath. Navigate to the “Custom spark-thrift-sparkconf” section and add:
    spark.jars=/usr/hdp/current/spark-thriftserver/lib/datanucleus-api-jdo-3.2.6.jar,/usr/hdp/current/spark-thriftserver/lib/datanucleus-core-3.2.10.jar,/usr/hdp/current/spark-thriftserver/lib/datanucleus-rdbms-3.2.9.jar
  3. (Optional) Disable the Spark YARN application for the Spark Thrift Server master. Navigate to the “Advanced spark-thrift-sparkconf” section and set
    spark.master=local

    This prevents launching an unused spark-client HiveThriftServer2 application master.

  4. Restart the Spark Thrift Server.
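
The settings from the steps above can be collected in one place before entering them in Ambari. The following is a minimal sketch, not part of any HDP tooling: the helper name spark_jars_value is hypothetical, and the jar file names are copied from step 2 (check them against the files actually present in the lib directory of your installation). It assembles the comma-separated spark.jars value without stray whitespace and prints each setting under the Ambari section it belongs to.

```python
# Jar directory and file names are taken from step 2 above; verify them
# against the lib directory of your own installation.
LIB_DIR = "/usr/hdp/current/spark-thriftserver/lib"
DATANUCLEUS_JARS = [
    "datanucleus-api-jdo-3.2.6.jar",
    "datanucleus-core-3.2.10.jar",
    "datanucleus-rdbms-3.2.9.jar",
]


def spark_jars_value(lib_dir, jar_names):
    """Join full jar paths with commas and no whitespace (hypothetical helper)."""
    return ",".join(f"{lib_dir}/{name}" for name in jar_names)


# Ambari section name -> settings to enter in that section.
SETTINGS_BY_SECTION = {
    "Advanced spark-hive-site-override": {"hive.server2.enable.doAs": "true"},
    "Custom spark-thrift-sparkconf": {
        "spark.jars": spark_jars_value(LIB_DIR, DATANUCLEUS_JARS)
    },
    # Optional (step 3).
    "Advanced spark-thrift-sparkconf": {"spark.master": "local"},
}

for section, settings in SETTINGS_BY_SECTION.items():
    print(section)
    for key, value in settings.items():
        print(f"  {key}={value}")
```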

To enable user impersonation for the Spark Thrift Server on a cluster not managed by Ambari, complete the following steps:

  1. Enable doAs support. Add the following setting to the /usr/hdp/current/spark-thriftserver/conf/hive-site.xml file:
    <property>
      <name>hive.server2.enable.doAs</name>
      <value>true</value>
    </property>
  2. Add DataNucleus jars to the Spark Thrift Server classpath. Add the following setting to the /usr/hdp/current/spark-thriftserver/conf/spark-thrift-sparkconf.conf file:
    spark.jars=/usr/hdp/current/spark-thriftserver/lib/datanucleus-api-jdo-3.2.6.jar,/usr/hdp/current/spark-thriftserver/lib/datanucleus-core-3.2.10.jar,/usr/hdp/current/spark-thriftserver/lib/datanucleus-rdbms-3.2.9.jar
  3. (Optional) Disable the Spark YARN application for the Spark Thrift Server master. Add the following setting to the /usr/hdp/current/spark-thriftserver/conf/spark-thrift-sparkconf.conf file:
    spark.master=local

    This prevents launching an unused spark-client HiveThriftServer2 application master.

  4. Restart the Spark Thrift Server.
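
On a cluster without Ambari, step 1 above amounts to adding or updating one property element in hive-site.xml. The following is a minimal sketch of that edit using Python's standard xml.etree.ElementTree module. The function name set_hadoop_property and the SAMPLE document are assumptions made for illustration, and the sketch works on a string rather than on the real file.

```python
import xml.etree.ElementTree as ET


def set_hadoop_property(xml_text, name, value):
    """Set a property in a Hadoop-style <configuration> document.

    Updates the existing <property> if one with this name is present,
    otherwise appends a new one. Returns the updated XML as a string.
    """
    root = ET.fromstring(xml_text)
    for prop in root.findall("property"):
        if prop.findtext("name") == name:
            value_el = prop.find("value")
            if value_el is None:
                value_el = ET.SubElement(prop, "value")
            value_el.text = value
            break
    else:
        # No matching property found: append a new one.
        prop = ET.SubElement(root, "property")
        ET.SubElement(prop, "name").text = name
        ET.SubElement(prop, "value").text = value
    return ET.tostring(root, encoding="unicode")


# Hypothetical hive-site.xml content, for illustration only.
SAMPLE = """<configuration>
  <property>
    <name>hive.server2.enable.doAs</name>
    <value>false</value>
  </property>
</configuration>"""

updated = set_hadoop_property(SAMPLE, "hive.server2.enable.doAs", "true")
print(updated)
```

ElementTree does not preserve XML comments or the XML declaration when parsing this way, so back up the original file before writing any generated output over it.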

Impersonation in Action

Permission and ACL Enforcement

When doAs is enabled, permissions and ACL restrictions are applied on behalf of the submitting user. In the following example, the “foo_db” database has a table “drivers”, which only user “foo” can access:

15147-initial-tables-acl.png

A Beeline session running as user “foo” can read the drivers table and create a new table based on it:

15148-basic-beeline.png

The Spark queries run in a YARN application as user “foo”:

15149-foo-am-spark-app.png

Hence all user permissions and ACLs are enforced when accessing tables, data, or other resources. In addition, all generated output is owned by user “foo”.

For the table created in the preceding Beeline session, the owner is user “foo”:

15150-tables-acl.png

The per-user Spark Application Master ("AM") also gives us the ability to cache data in memory without other users accessing it. The cached data and state are restricted to the Spark AM running the query. The data and state are not in Spark Thrift Server, hence they are not visible to other users.

Spark Thrift Server as Proxy

Spark Thrift Server does not execute the actual user queries, but forwards them to the appropriate user-specific Spark AM.

15151-multiple-am-spark-app.png

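Because query execution happens in the per-user Spark AMs rather than in the Thrift Server process, this improves the scalability and fault tolerance of Spark Thrift Server.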

When doAs is enabled for the Spark Thrift Server, the Thrift Server is responsible for the following features and capabilities:

  • Authorizing incoming user connections as per SASL rules.
  • Managing Spark applications launched on behalf of users:
    • Launching a Spark application if no appropriate application exists for the incoming request.
    • Terminating the Spark AM when all relevant user connections are closed at Spark Thrift Server.
  • Acting as a proxy and forwarding requests/responses to the appropriate user’s Spark AM.
  • Ensuring that users' long-running Spark SQL sessions are supported, by keeping the Kerberos state valid.
    • Spark Thrift Server and the Spark AMs launched on behalf of users can be long-running applications in secure, Kerberized clusters.
    • The submitter’s principal and keytab are not required for a long-running user Spark AM.
    • Spark Thrift Server itself continues to require the hive principal and keytab.

Enhancements for Connection URL Support

The connection URL format for Hive is documented here for reference. In doAs mode, Spark Thrift Server has been enhanced to support the following in the connection URL:

  • A default database.
  • Hive variables (hivevar).

Default Database in Connection URL

Specifying the connection URL as “jdbc:hive2://$HOST:$PORT/my_db” effectively results in an implicit “use my_db” when a user connects.

For an example, see the preceding Beeline session. The !connect command specified the connection URL for “foo_db”.
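
To make the relationship between the URL and the implicit “use” statement concrete, here is a minimal sketch that extracts the database segment from a connection URL. Both function names are hypothetical, and the parsing assumes the usual Hive URL layout in which the database name follows the first “/” after host:port and ends at the first “;”, “?”, or “#”. It illustrates the behavior described above and is not the parser Spark Thrift Server uses.

```python
def default_database(url):
    """Return the database segment of a jdbc:hive2 URL, or "" if absent."""
    prefix = "jdbc:hive2://"
    if not url.startswith(prefix):
        raise ValueError("not a jdbc:hive2 URL: " + url)
    # Everything after the first "/" that follows host:port.
    _, _, path = url[len(prefix):].partition("/")
    # The database name ends where session vars, conf, or hive vars begin.
    for separator in ";?#":
        path = path.split(separator, 1)[0]
    return path


def implicit_use_statement(url):
    """Return the statement the URL implies, or None for no default database."""
    database = default_database(url)
    return f"use {database}" if database else None


print(implicit_use_statement("jdbc:hive2://sandbox.hortonworks.com:10015/foo_db"))
print(implicit_use_statement("jdbc:hive2://sandbox.hortonworks.com:10015/"))
```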

Hive Variables Support

Hive variables can be used to parameterize queries.

To set a Hive variable, use the set hivevar command:

set hivevar:key=value

You can also set a Hive variable as part of the connection URL (similar to Hive connection URL format). In the following Beeline example, plan=miles is appended to the connection URL, and is referenced in the query as ${hivevar:plan}.

15152-hive-var.png
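
The following sketch shows one way the pieces fit together, assuming the standard Hive connection URL layout in which Hive variables follow a “#” at the end of the URL. The function names, the query text, and the wage_plan column are hypothetical, chosen only for illustration. The substitution function mimics how a ${hivevar:name} reference resolves; it is not Hive's own implementation.

```python
import re


def url_with_hivevars(base_url, hivevars):
    """Append Hive variables after '#', separated by ';'."""
    if not hivevars:
        return base_url
    pairs = ";".join(f"{key}={value}" for key, value in hivevars.items())
    return f"{base_url}#{pairs}"


_HIVEVAR_REF = re.compile(r"\$\{hivevar:([^}]+)\}")


def substitute_hivevars(query, hivevars):
    """Replace ${hivevar:name} references; unknown names are left untouched."""
    return _HIVEVAR_REF.sub(
        lambda match: hivevars.get(match.group(1), match.group(0)), query
    )


hivevars = {"plan": "miles"}
url = url_with_hivevars("jdbc:hive2://sandbox.hortonworks.com:10015/foo_db", hivevars)
# Hypothetical query: the wage_plan column is an assumption, not from the article.
query = "SELECT * FROM drivers WHERE wage_plan = '${hivevar:plan}'"

print(url)
print(substitute_hivevars(query, hivevars))
```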

Advanced Connection Management

By default, all connections for a user are forwarded to the same per-user Spark AM for query execution. In some cases, it is necessary to exercise finer-grained control.

Named Connections

In doAs mode, Spark Thrift Server supports named connections. A named connection is identified by a user-specified connectionId, which is passed as a Hive conf parameter in the connection URL.

Named connections are useful when you need to override Spark configuration, for example to use a different YARN queue or to specify different memory or cores for Spark executors. Named connections are scoped to a user.

For a user, an explicitly specified connectionId can be used to control which Spark AM executes the queries issued. If unspecified, a default implicit connectionId is associated with the Spark AM.

If Spark Thrift Server is unable to find a Spark AM for the given (user, connectionId) combination, it launches a new Spark AM. If one is already running, the user connection is associated with the existing Spark AM.

To name a connection explicitly, use the Hive conf parameter “spark.sql.thriftServer.connectionId”, as shown in the example session below.

15153-beeline-connectionid.png

Every Spark AM managed by Spark Thrift Server is associated with a user and a connectionId. Connection IDs are not globally unique; they are specific to the user.

Named connections allow users to specify their own Spark AM connections. They do not allow a user to access the Spark AM associated with another user.

15154-am-connectionid.png
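
The bookkeeping described in this section can be pictured with a small toy model. This is a sketch under stated assumptions, not the Thrift Server's actual code: the class name, method names, and the "<default>" placeholder for the implicit connectionId are all hypothetical. It shows that the lookup key is the (user, connectionId) pair, that a new Spark AM is launched only when no entry exists for that key, and that an AM terminates once its last connection closes.

```python
class ThriftServerModel:
    """Toy model of the (user, connectionId) -> Spark AM bookkeeping."""

    DEFAULT_ID = "<default>"  # hypothetical stand-in for the implicit connectionId

    def __init__(self):
        self._open = {}        # (user, connection_id) -> open connection count
        self.launched = []     # keys of AMs launched, in order
        self.terminated = []   # keys of AMs terminated, in order

    def connect(self, user, connection_id=None):
        key = (user, connection_id or self.DEFAULT_ID)
        if key not in self._open:
            # No Spark AM for this (user, connectionId): launch one.
            self.launched.append(key)
            self._open[key] = 0
        self._open[key] += 1
        return key

    def disconnect(self, key):
        self._open[key] -= 1
        if self._open[key] == 0:
            # Last connection closed: the Spark AM terminates.
            del self._open[key]
            self.terminated.append(key)

    def running(self):
        return sorted(self._open)


server = ThriftServerModel()
a = server.connect("foo")            # launches foo's default AM
b = server.connect("foo")            # reuses it
c = server.connect("foo", "conn1")   # launches a second AM for foo
d = server.connect("bar", "conn1")   # same name, different user: separate AM
print(server.running())

server.disconnect(a)
server.disconnect(b)
print(server.running())
```

Because the user is part of the key, two users choosing the same connectionId never end up on the same Spark AM.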

Data Sharing and Named Connections

Each connectionId for a user identifies a different Spark AM.

For a user, cached data is shared and available only within a single Spark AM, not across Spark AMs.

Different user connections on the same Spark AM can leverage previously cached data. Each user connection has its own Hive session (which maintains the current database, Hive variables, and so on), but shares the underlying cached data, executors, and Spark application.

To illustrate, here is a session for the first connection from user “foo” to named connection “conn1”:

15155-user-conn1-data-sharing.png

As expected, after caching the ‘drivers’ table, the query runs an order of magnitude faster.

A second connection to the same connectionId from user “foo” is able to leverage the cached table from the other active Beeline session and significantly speed up query execution:

15156-user-conn2-data-sharing.png
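
The split between per-connection state and per-AM state can be summarized in a toy model. This is only an illustrative sketch: the class and method names are hypothetical, and tracking cached tables as a set of qualified names is a simplification, not how Spark stores cached data.

```python
class SparkAppModel:
    """Stands in for one Spark AM, identified by (user, connectionId)."""

    def __init__(self, user, connection_id):
        self.user = user
        self.connection_id = connection_id
        self.cached_tables = set()  # shared by every session on this AM


class SessionModel:
    """Stands in for one user connection with its own Hive session state."""

    def __init__(self, app, database="default"):
        self.app = app
        self.database = database  # per-session: current database
        self.hivevars = {}        # per-session: Hive variables

    def cache_table(self, table):
        self.app.cached_tables.add(f"{self.database}.{table}")

    def is_cached(self, table):
        return f"{self.database}.{table}" in self.app.cached_tables


conn1 = SparkAppModel("foo", "conn1")
first = SessionModel(conn1, database="foo_db")
second = SessionModel(conn1, database="foo_db")
other = SessionModel(SparkAppModel("foo", "conn2"), database="foo_db")

first.hivevars["plan"] = "miles"
first.cache_table("drivers")

print(second.is_cached("drivers"))  # same AM: the cached table is visible
print(other.is_cached("drivers"))   # different AM: it is not
print(second.hivevars)              # Hive variables are not shared
```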

Overriding Spark Configuration Settings

If Spark Thrift Server is unable to find an existing Spark AM for a user connection, it launches a new Spark AM to service user queries. This applies to both named and unnamed connections.

When a new Spark AM is to be launched, you can override current Spark configuration settings by specifying them in the connection URL. Specify Spark configuration settings as hiveconf variables with the ‘sparkconf’ prefix (for example, sparkconf.spark.executor.memory):

15157-beeline-spark-conf.png

The following connection URL includes a spark.executor.memory setting of 4 GB:

jdbc:hive2://sandbox.hortonworks.com:10015/foo_db;principal=hive/_HOST@REALM.COM?spark.sql.thriftServer.connectionId=my_conn;sparkconf.spark.executor.memory=4g

The environment tab of the Spark application shows the appropriate value:

15158-ui-spark-conf.png
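
A connection URL like the one above can be assembled programmatically. The following is a minimal sketch: the function build_url and its parameter names are hypothetical, while the host, port, principal, connectionId, and memory values are the ones from the example URL in this section. Session variables follow “;” after the database name, Hive conf entries follow “?”, and each Spark setting is given the sparkconf. prefix.

```python
def build_url(host, port, database="", session_vars=None,
              hive_conf=None, spark_conf=None):
    """Assemble a jdbc:hive2 URL with optional Spark configuration overrides."""
    url = f"jdbc:hive2://{host}:{port}/{database}"
    for key, value in (session_vars or {}).items():
        url += f";{key}={value}"
    conf = dict(hive_conf or {})
    for key, value in (spark_conf or {}).items():
        conf[f"sparkconf.{key}"] = value  # Spark settings carry the prefix
    if conf:
        url += "?" + ";".join(f"{key}={value}" for key, value in conf.items())
    return url


url = build_url(
    "sandbox.hortonworks.com", 10015, "foo_db",
    session_vars={"principal": "hive/_HOST@REALM.COM"},
    hive_conf={"spark.sql.thriftServer.connectionId": "my_conn"},
    spark_conf={"spark.executor.memory": "4g"},
)
print(url)
```

Other Spark properties, such as spark.yarn.queue for the YARN queue, could be passed through spark_conf in the same way. As described above, the overrides take effect only when the connection causes a new Spark AM to be launched.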

Comments
Explorer

Does this article apply to Spark 2.2?


Currently this applies only to Apache Spark 1.6 in HDP.

Explorer

Another question: to follow this article, do I need to install and configure Livy? After following the steps, I get this error:

7/09/11 17:15:10 ERROR rsc.RSCClient: Failed to connect to context.
java.util.concurrent.TimeoutException: Timed out waiting for context to start.
    at org.sparkproject.shaded.livy.rsc.ContextLauncher.connectTimeout(ContextLauncher.java:133)
    at org.sparkproject.shaded.livy.rsc.ContextLauncher.access$200(ContextLauncher.java:62)
    at org.sparkproject.shaded.livy.rsc.ContextLauncher$2.run(ContextLauncher.java:121)
    at io.netty.util.concurrent.PromiseTask$RunnableAdapter.call(PromiseTask.java:38)
    at io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:120)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)

Please help.


You don't need to install Livy; the remote Spark context library required for doAs support comes bundled with Spark Thrift Server 1.6 in HDP.

The default timeout while waiting for the Spark session to come up is 90 seconds. You can increase it by setting 'server.connect.timeout' to a higher value in the Thrift Server config, which helps particularly when the cluster is very busy and a Spark application cannot be launched within 90 seconds.

Explorer

Does Livy support Kerberos? And what do the following exceptions mean?

17/09/12 10:36:52 INFO rsc.ContextLauncher: 17/09/12 10:36:52 INFO client.ConfiguredRMFailoverProxyProvider: Failing over to rm2
17/09/12 10:36:52 INFO rsc.ContextLauncher: 17/09/12 10:36:52 INFO retry.RetryInvocationHandler: java.net.ConnectException: Call From xxx/10.2.26.xxx to xxx.hadoop.fat.qa.nt.xxx.com:8032 failed on connection exception: java.net.ConnectException: Connection refused; For more details see: http://wiki.apache.org/hadoop/ConnectionRefused, while invoking ApplicationClientProtocolPBClientImpl.getClusterMetrics over rm2 after 1 failover attempts. Trying to failover after sleeping for 1339ms.
17/09/12 10:36:54 INFO rsc.ContextLauncher: 17/09/12 10:36:54 INFO client.ConfiguredRMFailoverProxyProvider: Failing over to rm1
17/09/12 10:36:54 INFO rsc.ContextLauncher: 17/09/12 10:36:54 INFO retry.RetryInvocationHandler: org.apache.hadoop.security.authorize.AuthorizationException: User: op1@DC.SH.xxx.COM is not allowed to impersonate op1, while invoking ApplicationClientProtocolPBClientImpl.getClusterMetrics over rm1 after 2 failover attempts. Trying to failover after sleeping for 1130ms.
17/09/12 10:36:55 INFO rsc.ContextLauncher: 17/09/12 10:36:55 INFO client.ConfiguredRMFailoverProxyProvider: Failing over to rm2
17/09/12 10:36:55 INFO rsc.ContextLauncher: 17/09/12 10:36:55 INFO retry.RetryInvocationHandler: java.net.ConnectException: Call From xxxx/10.2.26.xxx to xxx.hadoop.fat.qa.nt.xxx.com:8032 failed on connection exception: java.net.ConnectException: Connection refused; For more details see: http://wiki.apache.org/hadoop/ConnectionRefused, while invoking ApplicationClientProtocolPBClientImpl.getClusterMetrics over rm2 after 3 failover attempts. Trying to failover after sleeping for 2025ms.
17/09/12 10:36:57 INFO rsc.ContextLauncher: 17/09/12 10:36:57 INFO client.ConfiguredRMFailoverProxyProvider: Failing over to rm1
17/09/12 10:36:57 INFO rsc.ContextLauncher: 17/09/12 10:36:57 INFO retry.RetryInvocationHandler: org.apache.hadoop.security.authorize.AuthorizationException: User: op1@DC.SH.xxx.COM is not allowed to impersonate op1, while invoking ApplicationClientProtocolPBClientImpl.getClusterMetrics over rm1 after 4 failover attempts. Trying to failover after sleeping for 2748ms.

Please help.

Master Guru

@Mridul M

Can you please update point number 2 in Ambari managed section?

Current point - Add DataNucleus jars to the Spark Thrift Server classpath. Navigate to the “Advanced spark-hive-site-override” section and add:

Modification - Add DataNucleus jars to the Spark Thrift Server classpath. Navigate to the “Custom spark-thrift-sparkconf” section and add:

Thanks,

Kuldeep


Thanks @Kuldeep Kulkarni, that is a good catch! I have updated the document accordingly.

Master Guru

Thank you so much @Mridul M 🙂


The doc https://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.6.0/bk_spark-component-guide/content/config-sts... doesn't say Kerberos is required in Prerequisites, but do you know if Spark 1.6 impersonation requires Kerberos (unlike Hive)?