
Cloudera Data Engineering integrates with the Hive Warehouse Connector (HWC) so that Cloudera Data Engineering Spark can access and manage Hive-managed tables. The HWC dependencies are built into the Cloudera Data Engineering product.

HWC can be used to read data from Hive and write data to Hive through Cloudera Data Engineering Spark.

Data can be read and written in the following ways using HWC. Each read and write mode has its own advantages and limitations, so choose the mode carefully based on your use case.

Read modes

  • DIRECT_READER_V2
  • JDBC_CLUSTER
  • SECURE_ACCESS


Write modes

  • HIVE_WAREHOUSE_CONNECTOR
  • DATAFRAME_TO_STREAM
  • STREAM_TO_STREAM

Note: Cloudera recommends the SECURE_ACCESS read mode to enforce Fine-Grained Access Control (FGAC) through Ranger authorization in production environments with large workloads.

HWC provides a wide range of functionality to Cloudera Data Engineering, including the following (a short DDL sketch follows this list):

  • CREATE/DROP databases
  • CREATE External/Managed Hive Table
  • ACID transactions on Hive Managed Tables
  • CREATE/INSERT data to Iceberg tables
  • Enforce FGAC policies through Ranger using the SECURE_ACCESS read mode
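
As a minimal illustration of the DDL capabilities above, the following PySpark sketch creates a database and a table through the HWC session. It assumes the HWC jar and the pyspark_llap Python package are available to the job and that the HWC configurations described later in this article are set; the database and table names (demo_db, events) are hypothetical.

## DDL through the HWC session (illustrative sketch) ##
from pyspark.sql import SparkSession
from pyspark_llap import HiveWarehouseSession

spark = SparkSession.builder.appName("hwc-ddl-example").getOrCreate()
hive = HiveWarehouseSession.session(spark).build()

# DDL statements are executed through HiveServer2, so Ranger authorization applies.
hive.executeUpdate("CREATE DATABASE IF NOT EXISTS demo_db")
hive.executeUpdate("CREATE TABLE IF NOT EXISTS demo_db.events (id INT, name STRING) STORED AS ORC")
hive.sql("SHOW TABLES IN demo_db").show()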

A connection to HiveServer2 is required to enforce FGAC policies when using the SECURE_ACCESS or JDBC_CLUSTER read modes; DIRECT_READER_V2 does not require a connection to HiveServer2.

Read modes:
    -> DIRECT_READER_V2: HWC needs a connection to the Hive Metastore (HMS); a connection to HiveServer2 is not required. Use this mode only if the ETL jobs do not require FGAC. Spark reads the data directly from the managed table location using the transaction snapshot.
      Limitations

  • Cannot write data using HWC Direct Reader
  • No support for FGAC
  • Supports only single-table transaction consistency. The direct reader does not guarantee that multiple tables referenced in a query read the same snapshot of data.

      Configurations:
      spark.datasource.hive.warehouse.metastoreUri=thrift://<HMS-Host>:9083
      spark.hadoop.hive.zookeeper.quorum=<Zk-Host>:2181
      spark.sql.hive.hiveserver2.jdbc.url=<HS2-Url>
      spark.sql.hive.hiveserver2.jdbc.url.principal=hive/_HOST@<REALM>
      spark.security.credentials.hiveserver2.enabled=true
      spark.sql.extensions=com.hortonworks.spark.sql.rule.Extensions
      spark.kryo.registrator=com.qubole.spark.hiveacid.util.HiveAcidKyroRegistrator
      spark.datasource.hive.warehouse.read.mode=DIRECT_READER_V2
      spark.hadoop.secure.access.cache.disable=true
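
      Example (illustrative sketch): with the configurations above in place, reads typically go through plain spark.sql, since DIRECT_READER_V2 lets Spark read the table files directly; the table name demo_db.events is hypothetical.

      ## Read Using DIRECT_READER_V2 ##
      from pyspark.sql import SparkSession

      spark = SparkSession.builder.appName("hwc-direct-reader").getOrCreate()
      # Spark obtains the transaction snapshot from HMS and reads the managed
      # table files directly from the table location (no HS2, no FGAC).
      df = spark.sql("SELECT * FROM demo_db.events")
      df.show()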
     

    -> JDBC_CLUSTER: HWC connects to HiveServer2 (HS2) to get transaction information. This read mode is secured through Ranger authorization and supports FGAC, such as column masking.
      Limitations

  • Recommended only for workloads smaller than about 1 GB, because data is transferred through a single JDBC connection, which becomes a bottleneck for larger reads.
  • Requires a HiveServer2 connection.
  • Cannot correctly resolve queries that use the ORDER BY clause when run as hive.sql("<query>").
  • Cannot correctly resolve queries on tables that have complex column types such as ARRAY, STRUCT, and MAP; the type is incorrectly represented as String in the returned DataFrame.

      Configurations:
      spark.datasource.hive.warehouse.metastoreUri=thrift://<HMS-Host>:9083
      spark.hadoop.hive.zookeeper.quorum=<Zk-Host>:2181
      spark.sql.hive.hiveserver2.jdbc.url=<HS2-Url>
      spark.sql.hive.hiveserver2.jdbc.url.principal=hive/_HOST@<REALM>
      spark.security.credentials.hiveserver2.enabled=true
      spark.sql.extensions=com.hortonworks.spark.sql.rule.Extensions
      spark.kryo.registrator=com.qubole.spark.hiveacid.util.HiveAcidKyroRegistrator
      spark.datasource.hive.warehouse.read.mode=JDBC_CLUSTER
      spark.hadoop.secure.access.cache.disable=true
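
      Example (illustrative sketch): in JDBC_CLUSTER mode, queries are submitted through the HWC session so that HS2 and Ranger FGAC policies (for example, column masking) are in the read path; the table name demo_db.events is hypothetical.

      ## Read Using JDBC_CLUSTER ##
      from pyspark.sql import SparkSession
      from pyspark_llap import HiveWarehouseSession

      spark = SparkSession.builder.appName("hwc-jdbc-cluster").getOrCreate()
      hive = HiveWarehouseSession.session(spark).build()
      # The result set is transferred over a single JDBC connection, hence the ~1 GB guidance above.
      df = hive.sql("SELECT id, name FROM demo_db.events WHERE id > 100")
      df.show()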
  

    -> SECURE_ACCESS: Allows FGAC column masking and row filtering to secure the data of managed (ACID) or external Hive tables read from Cloudera Data Engineering Spark. It is recommended for large workloads and low-latency requirements. HWC creates external tables in a configured staging location using CREATE TABLE AS SELECT (CTAS); Ranger policies are applied and FGAC is enforced during the CTAS operation, and users then read the external tables from the secure staging location.
      Configurations:
      spark.hadoop.hive.metastore.uris=thrift://<HMS-Host>:9083
      spark.datasource.hive.warehouse.metastoreUri=thrift://<HMS-Host>:9083
      spark.hadoop.hive.zookeeper.quorum=<Zk-Host>:2181
      spark.sql.hive.hiveserver2.jdbc.url=<HS2-Url>
      spark.sql.hive.hiveserver2.jdbc.url.principal=hive/_HOST@<REALM>
      spark.security.credentials.hiveserver2.enabled=true
      spark.sql.extensions=com.hortonworks.spark.sql.rule.Extensions
      spark.kryo.registrator=com.qubole.spark.hiveacid.util.HiveAcidKyroRegistrator
      spark.datasource.hive.warehouse.read.mode=SECURE_ACCESS
      spark.datasource.hive.warehouse.load.staging.dir=<staging-dir-path>
      spark.hadoop.secure.access.cache.disable=true
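
      Example (illustrative sketch): the read API is the same hive.sql call as in JDBC_CLUSTER mode; only the read mode and the staging directory configurations differ. The table name demo_db.events is hypothetical.

      ## Read Using SECURE_ACCESS ##
      from pyspark.sql import SparkSession
      from pyspark_llap import HiveWarehouseSession

      spark = SparkSession.builder.appName("hwc-secure-access").getOrCreate()
      hive = HiveWarehouseSession.session(spark).build()
      # HWC runs a CTAS into the configured staging directory with Ranger FGAC applied,
      # and the resulting external table is read back as the DataFrame.
      df = hive.sql("SELECT * FROM demo_db.events")
      df.show()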

The following diagram shows the typical read authorisation process:

Read-Authorisation-Process.png

Write modes:
   -> HIVE_WAREHOUSE_CONNECTOR: HWC requires a connection to HiveServer2 (HS2) to perform batch writes from Cloudera Data Engineering Spark to Hive. HWC writes to an intermediate location defined by the spark.datasource.hive.warehouse.load.staging.dir configuration.

      Configurations:
      spark.sql.extensions=com.hortonworks.spark.sql.rule.Extensions
      spark.kryo.registrator=com.qubole.spark.hiveacid.util.HiveAcidKyroRegistrator
      spark.datasource.hive.warehouse.load.staging.dir=<staging-dir-path>
      spark.datasource.hive.warehouse.read.mode=secure_access
      spark.security.credentials.hiveserver2.enabled=true
      spark.sql.hive.hiveserver2.jdbc.url=<HS2-Url>
      spark.sql.hive.hiveserver2.jdbc.url.principal=hive/_HOST@<REALM>
      spark.hadoop.secure.access.cache.disable=true

    -> DATAFRAME_TO_STREAM: HWC does not rely on HiveServer2 for streaming writes. Instead, it interacts with HMS for transaction management and writes ORC bucket files directly to the table location. Make sure that the target Hive table is transactional and already exists before writing to it.
      Configurations:
      spark.datasource.hive.warehouse.metastoreUri=thrift://<HMS-Host>:9083
      spark.hadoop.hive.zookeeper.quorum=<Zk-Host>:2181
      spark.sql.extensions=com.hortonworks.spark.sql.rule.Extensions
      spark.kryo.registrator=com.qubole.spark.hiveacid.util.HiveAcidKyroRegistrator
      spark.sql.warehouse.dir=<Hive-Warehouse-Dir-S3-Path>

    -> STREAM_TO_STREAM: HWC supports streaming data from Spark into Hive tables, enabling real-time data ingestion. The target Hive table must be transactional and must exist before writing to it (a table-creation sketch follows the configurations below).
    Configurations:
      spark.datasource.hive.warehouse.metastoreUri=thrift://<HMS-Host>:9083
      spark.hadoop.hive.zookeeper.quorum=<Zk-Host>:2181
      spark.sql.extensions=com.hortonworks.spark.sql.rule.Extensions
      spark.kryo.registrator=com.qubole.spark.hiveacid.util.HiveAcidKyroRegistrator
      spark.sql.warehouse.dir=<Hive-Warehouse-Dir-Path>
      spark.sql.streaming.checkpointLocation=<Stream-checkpoint-dir-Path>
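
      Both streaming write modes require a pre-created transactional (ACID) target table. The following PySpark sketch creates one through the HWC session; it assumes the pyspark_llap package is available, and the table name demo_db.events_stream is hypothetical.

      ## Pre-create the transactional target table (illustrative) ##
      from pyspark.sql import SparkSession
      from pyspark_llap import HiveWarehouseSession

      spark = SparkSession.builder.appName("hwc-precreate-table").getOrCreate()
      hive = HiveWarehouseSession.session(spark).build()
      # Managed ORC table with ACID enabled, created before the streaming write starts.
      hive.executeUpdate("CREATE TABLE IF NOT EXISTS demo_db.events_stream (id INT, name STRING) "
                         "STORED AS ORC TBLPROPERTIES ('transactional'='true')")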

Example:
## Common import for the write examples (assumes the HWC pyspark_llap package is on the Python path) ##
from pyspark_llap import HiveWarehouseSession

## Write Using HIVE_WAREHOUSE_CONNECTOR ##
<DF>.write.format(HiveWarehouseSession().HIVE_WAREHOUSE_CONNECTOR).mode("append") \
    .option("database", db).option("table", tname).option("fileformat", "orc").save()


## Write Using DATAFRAME_TO_STREAM ##
<DF>.write.format(HiveWarehouseSession().DATAFRAME_TO_STREAM).mode("append") \
    .option("metastoreUri", "<MetaStoreUri>").option("metastoreKrbPrincipal", "<Principal>") \
    .option("database", "<db>").option("table", "<table>").option("fileformat", "orc").save()


## Write Using STREAM_TO_STREAM ##
query = <StreamDF>.writeStream.format(HiveWarehouseSession().STREAM_TO_STREAM).outputMode("append") \
    .option("metastoreUri", "<MetaStoreUri>").option("database", "<db>") \
    .option("table", "<pre-created-table>").trigger(processingTime='1 seconds').start()
query.awaitTermination()

The following diagram shows the typical write authorisation process.

Write-Authorisation-Process.png

Establishing connectivity to HiveServer2 

Based on your infrastructure setup, you can use one of the following methods to establish the HS2 connection required for HWC (a sample JDBC URL configuration follows the list):

  • Through Data Hub cluster: Cloudera Data Engineering can connect to HS2 provided by the Data Hub cluster. It can also enforce the FGAC policies through Ranger.
  • Through Cloudera Data Warehouse Data Service: Cloudera Data Engineering can connect to a Virtual Warehouse of Cloudera Data Warehouse through HS2. FGAC policies can also be enforced through Ranger. You can copy the JDBC URL from the Cloudera Data Warehouse UI, as shown in the following screenshot: CDW's-JDBC.png
  • Through Cloudera Base cluster (on premises): Cloudera Data Engineering can connect to HS2 provided by the Cloudera Base cluster as well.
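
Whichever method you use, the HS2 endpoint is passed to HWC through the JDBC URL configuration. The values below are placeholders for illustration only; copy the actual JDBC URL from your Data Hub or Cloudera Data Warehouse environment, as the transport mode, port, and SSL settings depend on your setup.

      spark.sql.hive.hiveserver2.jdbc.url=jdbc:hive2://<hs2-host>:<port>/default;transportMode=http;httpPath=cliservice;ssl=true
      spark.sql.hive.hiveserver2.jdbc.url.principal=hive/_HOST@<REALM>
      spark.security.credentials.hiveserver2.enabled=true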

 
