
Short Description: This article describes and demonstrates the Apache Hive Warehouse Connector, a newer-generation library for reading and writing data between Apache Spark and Apache Hive.

1. Motivation

Integration between Apache Spark and Apache Hive has always been an important use case and continues to be so. Both provide their own efficient ways to process data with SQL, and both are used for data stored in distributed file systems. Each also provides some compatibility with the other. As both systems evolve, it is critical to find a solution that provides the best of both worlds for data processing needs.

Apache Spark provides basic Hive compatibility. It allows access to tables in Apache Hive, and some basic use cases can be achieved this way. However, as of Spark 2, not all the modern features of Apache Hive are supported, for instance ACID tables, Apache Ranger integration, and Live Long And Process (LLAP).

Apache Spark supports a pluggable approach for various data sources, and Apache Hive itself can also be considered one data source. Therefore this library, the Hive Warehouse Connector, was implemented as a data source to overcome these limitations and provide those modern Apache Hive functionalities to Apache Spark users.


Note: From HDP 3.0, the catalogs for Apache Hive and Apache Spark are separated, and each system uses its own catalog; namely, they are mutually exclusive: the Apache Hive catalog can only be accessed by Apache Hive or this library, and the Apache Spark catalog can only be accessed by the existing APIs in Apache Spark. In other words, some features such as ACID tables or Apache Ranger with Apache Hive tables are only available in Apache Spark via this library. Those tables in Hive should not be directly accessible through the Apache Spark APIs themselves.

2. Introduction

This library provides both Scala (Java compatible) and Python APIs for:

  • SQL / DataFrame APIs interacting with both transactional and non-transactional tables in Apache Hive
  • SQL / DataFrame read support
  • SQL / DataFrame and Structured Streaming write support

2.1. SQL / DataFrame Read Support

Note: This diagram shows read execution path to illustrate the key points.

For instance, it is able to read an Apache Hive table in Apache Spark as below:

import com.hortonworks.hwc.HiveWarehouseSession
val hive = HiveWarehouseSession.session(spark).build()
val df = hive.executeQuery("SELECT * FROM tableA")

It leverages Apache Hive LLAP and retrieves data from the Hive table into a Spark DataFrame.

2.2. SQL / DataFrame & Structured Streaming Write Support

  • The Hive Streaming API, which Apache Hive introduced to continuously ingest data, is used for both batch and streaming writes.
  • Since it is implemented with Data Source V2, it supports a commit protocol and atomic write operations.
  • The native Apache ORC writer is used, which has many important fixes in terms of performance and stability.

The code below illustrates writing data from Apache Spark to an Apache Hive table with Structured Streaming.

import com.hortonworks.hwc.HiveWarehouseSession
val hive = HiveWarehouseSession.session(spark).build()
df.writeStream
  .format(HiveWarehouseSession.STREAM_TO_STREAM)
  .option("table", "hwx_table").start()

Internally, this fully utilizes Apache Hive's Streaming API to write an Apache Spark DataFrame out to an Apache Hive table.

Note: This does not need Hive LLAP daemons to be running.

3. Prerequisites

In Apache Hive:

  • 'Interactive Query' (LLAP) should be enabled (see 7. Appendix for Apache Ambari UI)

In Apache Spark:

  • spark.hadoop.hive.llap.daemon.service.hosts should be set to the application name of the LLAP service, since this library utilizes LLAP. For example, @llap0.
  • HiveServer2's JDBC URL should be specified in spark.sql.hive.hiveserver2.jdbc.url as well as configured in the cluster. For example, jdbc:hive2://localhost:10000. Use the HiveServer2 Interactive JDBC URL, rather than the traditional HiveServer2's JDBC URL.
  • Make sure spark.datasource.hive.warehouse.load.staging.dir points to a suitable HDFS-compatible staging directory, e.g. /tmp.
  • Also, ensure spark.datasource.hive.warehouse.metastoreUri is configured properly. For example, thrift://localhost:9083 to indicate Metastore URI.
  • Note that spark.security.credentials.hiveserver2.enabled should be set to false for YARN client deploy mode, and true for YARN cluster deploy mode (by default). This configuration is required for a Kerberized cluster.
  • When spark.security.credentials.hiveserver2.enabled is set to false, spark.sql.hive.hiveserver2.jdbc.url.principal can be optionally set if spark.sql.hive.hiveserver2.jdbc.url does not contain principal, for example, hive/_HOST@EXAMPLE.COM.
  • When spark.security.credentials.hiveserver2.enabled is set to true, spark.sql.hive.hiveserver2.jdbc.url.principal should be set, for example, hive/_HOST@EXAMPLE.COM. Also, spark.sql.hive.hiveserver2.jdbc.url should not contain principal.
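
The settings above can be gathered in one place. The following is a minimal Python sketch, assuming a Kerberized cluster and a hypothetical helper named hwc_spark_conf (it is not part of this library). It only assembles the property names listed above and applies the deploy-mode rule for spark.security.credentials.hiveserver2.enabled; the default values are the examples used in this article.

```python
def hwc_spark_conf(deploy_mode, jdbc_url, metastore_uri,
                   llap_app_name="@llap0", staging_dir="/tmp",
                   principal=None):
    """Assemble the Spark properties listed in 3. Prerequisites.

    hwc_spark_conf is a hypothetical helper for illustration only.
    """
    if deploy_mode not in ("client", "cluster"):
        raise ValueError("deploy_mode must be 'client' or 'cluster'")

    # false for YARN client deploy mode, true for YARN cluster deploy mode.
    credentials_enabled = deploy_mode == "cluster"

    conf = {
        "spark.hadoop.hive.llap.daemon.service.hosts": llap_app_name,
        "spark.sql.hive.hiveserver2.jdbc.url": jdbc_url,
        "spark.datasource.hive.warehouse.load.staging.dir": staging_dir,
        "spark.datasource.hive.warehouse.metastoreUri": metastore_uri,
        "spark.security.credentials.hiveserver2.enabled":
            str(credentials_enabled).lower(),
    }

    principal_key = "spark.sql.hive.hiveserver2.jdbc.url.principal"
    url_has_principal = "principal=" in jdbc_url
    if credentials_enabled:
        # Cluster mode: the principal goes in its own property, not in the URL.
        if principal is None:
            raise ValueError("principal should be set in cluster mode")
        if url_has_principal:
            raise ValueError("JDBC URL should not contain principal in cluster mode")
        conf[principal_key] = principal
    elif principal is not None and not url_has_principal:
        # Client mode: optional, and only when the URL does not already carry it.
        conf[principal_key] = principal
    return conf
```

The resulting dictionary can be turned into --conf key=value arguments or written to spark-defaults.conf; how it is applied depends on the deployment.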

4. User Scenarios

In this chapter, HDP 3.0.1.0 with Spark2, Hive and Ranger on a Kerberized cluster is used. Note that if you are running the examples on a Kerberized cluster, for instance in HDP, make sure to obtain (or renew) the Kerberos ticket-granting ticket with kinit and an appropriate keytab.

The following scenarios are run in the Spark shell as below. For other interaction paradigms, such as spark-submit, Livy, or Zeppelin, please see the subsequent sections for specific setup steps.

In case of Scala:

$ spark-shell --master yarn \
  --jars /usr/hdp/3.0.1.0-183/hive_warehouse_connector/hive-warehouse-connector-assembly-1.0.0.3.0.1.0-183.jar \
  --conf spark.security.credentials.hiveserver2.enabled=false

In case of Python:

$ pyspark --master yarn \
  --jars /usr/hdp/3.0.1.0-183/hive_warehouse_connector/hive-warehouse-connector-assembly-1.0.0.3.0.1.0-183.jar \
  --py-files /usr/hdp/3.0.1.0-183/hive_warehouse_connector/pyspark_hwc-1.0.0.3.0.1.0-183.zip \
  --conf spark.security.credentials.hiveserver2.enabled=false

4.1. SQL / DataFrame Read

This scenario demonstrates a bulk read operation, as a batch job, from a Hive table into a Spark DataFrame with SQL expression. For this scenario, FBI crime rate data is used (see 7. Appendix for SQL INSERT queries).

Before we start, we should initialize a HiveWarehouseSession instance.

In case of Scala:

import com.hortonworks.hwc.HiveWarehouseSession
val hive = HiveWarehouseSession.session(spark).build()

In case of Python:

from pyspark_llap.sql.session import HiveWarehouseSession
hive = HiveWarehouseSession.session(spark).build()

This scenario uses the hwc_db database. The code below shows the decreasing crime rate (per 100,000 inhabitants) between 2000 and 2010 in the USA:

hive.setDatabase("hwc_db")
hive.table("crimes").filter("year = 2000 OR year = 2010").show()
+----+----------+
|year|crime_rate|
+----+----------+
|2010|     404.5|
|2000|     506.5|
+----+----------+

SQL queries can also be executed directly via the executeQuery API, which interacts directly with the Hive LLAP daemons.

hive.executeQuery("SELECT avg(crime_rate) AS average_crime_rate FROM crimes").show()
+------------------+
|average_crime_rate|
+------------------+
|456.33500000000004|
+------------------+
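
As a cross-check, both results above can be reproduced in plain Python from the rows inserted in 7. Appendix. This is only a sketch of the arithmetic; it does not use Spark or Hive, and the variable names are chosen here for illustration.

```python
# Rows copied from the INSERT statements in 7. Appendix: (year, crime_rate).
crimes = [
    (1997, 611.0), (1998, 567.6), (1999, 523.0), (2000, 506.5),
    (2001, 504.5), (2002, 494.4), (2003, 475.8), (2004, 463.2),
    (2005, 469.0), (2006, 479.3), (2007, 471.8), (2008, 458.6),
    (2009, 431.9), (2010, 404.5), (2011, 387.1), (2012, 387.8),
    (2013, 369.1), (2014, 361.6), (2015, 373.7), (2016, 386.3),
]

# Same predicate as filter("year = 2000 OR year = 2010").
selected = [row for row in crimes if row[0] in (2000, 2010)]

# Same aggregate as SELECT avg(crime_rate) FROM crimes.
average_crime_rate = sum(rate for _, rate in crimes) / len(crimes)

print(selected)
print(round(average_crime_rate, 3))
```

The 20 rates sum to 9126.7, so the average is 456.335, which agrees with the value returned by executeQuery up to floating-point rounding.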

4.2. SQL / DataFrame Read (Apache Ranger Integration)

As of HDP 2.6, row/column-level access control for Apache Spark is available, enabling fine-grained security control through Apache Ranger. This feature can also be leveraged through this library; the Apache Spark APIs themselves do not support it.

For instance, the scenarios below demonstrate row-level control with two users, billing and datascience. The billing principal can access all rows and columns, while the datascience principal can access only filtered and masked data.

First, we destroy the existing ticket and obtain a ticket as the billing principal. Then we launch a Spark shell.

$ kdestroy
$ kinit billing/billing@EXAMPLE.COM
$ spark-shell --master yarn \
  --jars /usr/hdp/3.0.1.0-183/hive_warehouse_connector/hive-warehouse-connector-assembly-1.0.0.3.0.1.0-183.jar \
  --conf spark.security.credentials.hiveserver2.enabled=false

Since the billing principal can access every row, all data is shown as is.

sql("select * from db_spark.t_spark").show()
+------+-------+
|fruits|  color|
+------+-------+
| apple|    red|
| grape| purple|
|orange|scarlet|
+------+-------+

Now, we try the same query as the datascience principal.

$ kdestroy
$ kinit datascience/datascience@EXAMPLE.COM
$ spark-shell --master yarn \
  --jars /usr/hdp/3.0.1.0-183/hive_warehouse_connector/hive-warehouse-connector-assembly-1.0.0.3.0.1.0-183.jar \
  --conf spark.security.credentials.hiveserver2.enabled=false

Since the datascience principal can only access filtered and masked rows, only partial values are shown.

sql("select * from db_spark.t_spark").show()
+------+-----+
|fruits|color|
+------+-----+
| apxxx|  red|
+------+-----+
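
The effect of a row filter combined with a column mask can be pictured with a toy model in plain Python. The actual Ranger policies are not shown in this article, so the filter (color is red) and the mask (keep the first two characters) below are assumptions chosen only to reproduce the two outputs above; nothing here calls Ranger or Hive.

```python
# Table contents as seen by the billing principal above.
rows = [("apple", "red"), ("grape", "purple"), ("orange", "scarlet")]


def row_filter(row):
    # Assumed row filter: only rows whose color is "red" are visible.
    return row[1] == "red"


def mask_fruit(value, visible=2):
    # Assumed mask: keep the first `visible` characters, replace the rest with "x".
    return value[:visible] + "x" * (len(value) - visible)


def view_as(principal):
    """Return the rows a principal would see under the assumed policies."""
    if principal == "billing":
        return list(rows)
    return [(mask_fruit(fruit), color)
            for fruit, color in rows if row_filter((fruit, color))]


print(view_as("billing"))
print(view_as("datascience"))
```

In the real setup the filtering and masking are applied on the Hive side, so the Spark application never receives the unmasked values.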

For more information, please see 'Introducing Row/ Column Level Access Control for Apache Spark' in 7. Appendix. The Ranger security features previously available in the spark-llap connector are now covered by the Hive Warehouse Connector. Apache Hive and Apache Spark configurations should be followed as described in 3. Prerequisites.

4.3. SQL / DataFrame Write

This scenario demonstrates a bulk write operation, as a batch job, between an Apache Hive table and an Apache Spark DataFrame with SQL expression.

A table in Hive can be created as below:

hive.createTable("crimes_2010")
  .column("year", "int")
  .column("crime_rate", "double")
  .create()
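
For tables with many columns, chaining one column call per column becomes tedious. One possible alternative, sketched below, is to generate the CREATE TABLE statement from a list of (name, Hive type) pairs and run it on the Hive side. The helper create_table_ddl is hypothetical and not part of this library, and the column types must be supplied by the caller; no automatic Spark-to-Hive type mapping is attempted.

```python
def create_table_ddl(table, columns, if_not_exists=True):
    """Build a Hive CREATE TABLE statement from (name, hive_type) pairs.

    create_table_ddl is a hypothetical helper for illustration only.
    """
    if not columns:
        raise ValueError("at least one column is required")
    column_list = ", ".join(f"{name} {hive_type}" for name, hive_type in columns)
    guard = "IF NOT EXISTS " if if_not_exists else ""
    return f"CREATE TABLE {guard}{table} ({column_list})"


ddl = create_table_ddl("crimes_2010", [("year", "int"), ("crime_rate", "double")])
print(ddl)
```

The generated string could then be passed to hive.executeUpdate(ddl) in a session where hive has been initialized as in 4.1.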

Here, the crimes table (from 4.1. SQL / DataFrame Read) is written into a different Hive table after filtering the data in Spark. The code below writes the crime rate for 2010 into the table created above:

hive.table("crimes").filter("year = 2010")
  .write
  .format(HiveWarehouseSession.HIVE_WAREHOUSE_CONNECTOR)
  .option("table", "crimes_2010")
  .save()

The data can also be written in a streaming manner as below:

hive.table("crimes").filter("year = 2010")
  .write
  .format(HiveWarehouseSession.DATAFRAME_TO_STREAM)
  .option("table", "crimes_2010")
  .save()

Note: With this API, the job on the Apache Spark side is a batch write job, but internally it writes out data through the Apache Hive Streaming API.

Of course, we can also write data from an Apache Hive table out via Apache Spark data sources:

hive.table("crimes")
  .write
  .format("csv")
  .option("header", "true")
  .save("/tmp/zoo")

Likewise, an Apache Spark DataFrame can be written directly to an Apache Hive table:

spark.read.format("csv").option("inferSchema", "true").option("header", "true").load("/tmp/zoo")
  .filter("year = 2010")
  .write
  .format(HiveWarehouseSession.HIVE_WAREHOUSE_CONNECTOR)
  .option("table", "crimes_2010")
  .save()

4.4. Structured Streaming Write

This scenario demonstrates a streaming write operation, as a micro batch job, from Apache Spark DataFrame to Apache Hive table with SQL expression.

In this scenario, socket structured streaming is used as test data.

In case of Scala:

val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

In case of Python:

lines = spark.readStream \
  .format("socket") \
  .option("host", "localhost") \
  .option("port", 9999) \
  .load()

In another terminal, execute the command below to supply test data:

$ nc -lk 9999
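
Where nc is not available, a small Python script can play the same role. The sketch below is an assumption-laden stand-in rather than an equivalent of nc -lk: it listens on the given port, accepts a single client (the Spark socket source), sends a fixed list of lines, and then closes the connection. The function names are hypothetical.

```python
import socket
import threading


def serve_lines(lines, host="localhost", port=9999, on_listening=None):
    """Accept one client and send each line, newline-terminated."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server:
        server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        server.bind((host, port))
        server.listen(1)
        if on_listening is not None:
            # Report the port actually bound (useful when port=0 is requested).
            on_listening(server.getsockname()[1])
        conn, _ = server.accept()
        with conn:
            for line in lines:
                conn.sendall((line + "\n").encode("utf-8"))


def serve_lines_in_background(lines, host="localhost", port=9999):
    """Run serve_lines in a daemon thread and return (thread, bound_port)."""
    listening = threading.Event()
    bound = []

    def on_listening(actual_port):
        bound.append(actual_port)
        listening.set()

    thread = threading.Thread(
        target=serve_lines, args=(lines, host, port, on_listening), daemon=True)
    thread.start()
    if not listening.wait(timeout=5):
        raise RuntimeError("server did not start listening")
    return thread, bound[0]
```

For example, serve_lines(["Foo", "Bar", "Hortonworks"]) blocks until the streaming query connects and then sends the three words. Unlike an interactive nc session, the connection is closed after the last line.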

We create a target table to store the stream data. Note that this table can also be created on the Hive side, for instance with CREATE TABLE hwx_table(value STRING);

hive.createTable("hwx_table").column("value", "string").create()

Now, the stream data is ready. Next, import the library so that we can easily specify the format.

In case of Scala:

import com.hortonworks.hwc.HiveWarehouseSession

In case of Python:

from pyspark_llap.sql.session import HiveWarehouseSession

Next, we start the structured streaming job. In the terminal running nc -lk 9999 we can type arbitrary data; when Hortonworks is entered, the job saves it into the hwx_table table.

lines.filter("value = 'Hortonworks'")
  .writeStream
  .format(HiveWarehouseSession.STREAM_TO_STREAM)
  .option("database", "hwc_db")
  .option("table", "hwx_table")
  .option(
    "metastoreUri",
    spark.conf.get("spark.datasource.hive.warehouse.metastoreUri"))
  .option("checkpointLocation", "/tmp/checkpoint").start()

Note: There is an issue in Apache Spark with how spark.datasource.* configurations are interpreted internally as options, which currently requires the metastoreUri and database options to be set manually for this library. See SPARK-25460 for more details. Once this issue is resolved, both metastoreUri and database can be omitted.
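
Until then, the manual options can be built in one place. The following Python sketch assumes a hypothetical helper named hwc_stream_options (not part of this library) that takes the Spark configuration as a plain dictionary and returns the four options used in the streaming write above.

```python
METASTORE_URI_KEY = "spark.datasource.hive.warehouse.metastoreUri"


def hwc_stream_options(spark_conf, database, table, checkpoint_location):
    """Return the writeStream options used in 4.4 as a dictionary.

    spark_conf is a plain dict of Spark properties; hwc_stream_options is a
    hypothetical helper for illustration only.
    """
    if METASTORE_URI_KEY not in spark_conf:
        raise KeyError(METASTORE_URI_KEY + " is not configured")
    return {
        "database": database,
        "table": table,
        # Copied explicitly because of SPARK-25460.
        "metastoreUri": spark_conf[METASTORE_URI_KEY],
        "checkpointLocation": checkpoint_location,
    }


options = hwc_stream_options(
    {METASTORE_URI_KEY: "thrift://localhost:9083"},
    database="hwc_db", table="hwx_table",
    checkpoint_location="/tmp/checkpoint")
print(options)
```

In PySpark, the dictionary could be passed with .options(**options) on the writeStream builder, and the input dictionary could be obtained with dict(spark.sparkContext.getConf().getAll()).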

After that, type Hortonworks as well as arbitrary words in the terminal running nc -lk 9999:

Foo
Bar
Hortonworks

This continuously inserts the word Hortonworks into the table hwx_table.

hive.table("hwx_table").show()
+------------+
|       value|
+------------+
| Hortonworks|
+------------+

5. Interaction Paradigms

This library can be used not only from Apache Spark but also from Apache Zeppelin and Apache Livy. This chapter demonstrates the entry points of those interaction paradigms for using the functionality this library provides.

5.1. Spark Shell:

$ spark-shell --master yarn \
  --jars /usr/hdp/3.0.1.0-183/hive_warehouse_connector/hive-warehouse-connector-assembly-1.0.0.3.0.1.0-183.jar \
  --conf spark.security.credentials.hiveserver2.enabled=false

5.2. PySpark Shell:

$ pyspark --master yarn \
  --jars /usr/hdp/3.0.1.0-183/hive_warehouse_connector/hive-warehouse-connector-assembly-1.0.0.3.0.1.0-183.jar \
  --py-files /usr/hdp/3.0.1.0-183/hive_warehouse_connector/pyspark_hwc-1.0.0.3.0.1.0-183.zip \
  --conf spark.security.credentials.hiveserver2.enabled=false

5.3. Scala / Java Application Submission:

In case of client mode:

$ spark-submit --master yarn --deploy-mode client \
  --jars /usr/hdp/3.0.1.0-183/hive_warehouse_connector/hive-warehouse-connector-assembly-1.0.0.3.0.1.0-183.jar \
  --conf spark.security.credentials.hiveserver2.enabled=false [JAR_NAME]

If the Apache Spark settings in 3. Prerequisites were not configured globally, specify all configurations in the command, for instance:

$ spark-submit --master yarn --deploy-mode client \
  --jars /usr/hdp/3.0.1.0-183/hive_warehouse_connector/hive-warehouse-connector-assembly-1.0.0.3.0.1.0-183.jar \
  --conf spark.hadoop.hive.llap.daemon.service.hosts=@llap0 \
  --conf spark.sql.hive.hiveserver2.jdbc.url="jdbc:hive2://ctr-e138-1518143905142-509736-01-000007.hwx.site:2181,ctr-e138-1518143905142-509736-01-000009.hwx.site:2181,ctr-e138-1518143905142-509736-01-000006.hwx.site:2181,ctr-e138-1518143905142-509736-01-000005.hwx.site:2181,ctr-e138-1518143905142-509736-01-000008.hwx.site:2181/default;principal=hive/_HOST@HWX.COM;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2-interactive" \
  --conf spark.security.credentials.hiveserver2.enabled=false \
  --conf spark.datasource.hive.warehouse.load.staging.dir=/tmp \
  --conf spark.datasource.hive.warehouse.metastoreUri=thrift://ctr-e138-1518143905142-509736-01-000003.hwx.site:9083,thrift://ctr-e138-1518143905142-509736-01-000004.hwx.site:9083 [JAR_NAME]

In case of cluster mode:

$ spark-submit --master yarn --deploy-mode cluster \
  --jars /usr/hdp/3.0.1.0-183/hive_warehouse_connector/hive-warehouse-connector-assembly-1.0.0.3.0.1.0-183.jar [JAR_NAME]

If the Apache Spark settings in 3. Prerequisites were not configured globally for HWC, specify all configurations in the command, for instance:

$ spark-submit --master yarn --deploy-mode cluster \
  --jars /usr/hdp/3.0.1.0-183/hive_warehouse_connector/hive-warehouse-connector-assembly-1.0.0.3.0.1.0-183.jar \
  --conf spark.hadoop.hive.llap.daemon.service.hosts=@llap0 \
  --conf spark.sql.hive.hiveserver2.jdbc.url="jdbc:hive2://ctr-e138-1518143905142-509736-01-000007.hwx.site:2181,ctr-e138-1518143905142-509736-01-000009.hwx.site:2181,ctr-e138-1518143905142-509736-01-000006.hwx.site:2181,ctr-e138-1518143905142-509736-01-000005.hwx.site:2181,ctr-e138-1518143905142-509736-01-000008.hwx.site:2181/default;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2-interactive" \
  --conf spark.sql.hive.hiveserver2.jdbc.url.principal="hive/_HOST@EXAMPLE.COM" \
  --conf spark.datasource.hive.warehouse.load.staging.dir=/tmp \
  --conf spark.datasource.hive.warehouse.metastoreUri=thrift://ctr-e138-1518143905142-509736-01-000003.hwx.site:9083,thrift://ctr-e138-1518143905142-509736-01-000004.hwx.site:9083 [JAR_NAME]

5.4. Python Application Submission:

In case of client mode:

$ spark-submit --master yarn --deploy-mode client \
  --jars /usr/hdp/3.0.1.0-183/hive_warehouse_connector/hive-warehouse-connector-assembly-1.0.0.3.0.1.0-183.jar \
  --py-files /usr/hdp/3.0.1.0-183/hive_warehouse_connector/pyspark_hwc-1.0.0.3.0.1.0-183.zip \
  --conf spark.security.credentials.hiveserver2.enabled=false [PY_FILE]

In case of cluster mode:

$ spark-submit --master yarn --deploy-mode cluster \
  --jars /usr/hdp/3.0.1.0-183/hive_warehouse_connector/hive-warehouse-connector-assembly-1.0.0.3.0.1.0-183.jar \
  --py-files /usr/hdp/3.0.1.0-183/hive_warehouse_connector/pyspark_hwc-1.0.0.3.0.1.0-183.zip [PY_FILE]
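
The four submission commands in 5.3 and 5.4 differ only in the deploy mode, the --py-files argument, and the credentials setting. A minimal Python sketch that assembles them is shown below; spark_submit_args and to_command_line are hypothetical helpers, and the paths are the ones used in this article.

```python
import shlex

HWC_DIR = "/usr/hdp/3.0.1.0-183/hive_warehouse_connector"
HWC_JAR = HWC_DIR + "/hive-warehouse-connector-assembly-1.0.0.3.0.1.0-183.jar"
HWC_PY_ZIP = HWC_DIR + "/pyspark_hwc-1.0.0.3.0.1.0-183.zip"


def spark_submit_args(app_file, deploy_mode, python=False, extra_conf=None):
    """Return the spark-submit argument list for an HWC application."""
    if deploy_mode not in ("client", "cluster"):
        raise ValueError("deploy_mode must be 'client' or 'cluster'")
    args = ["spark-submit", "--master", "yarn",
            "--deploy-mode", deploy_mode, "--jars", HWC_JAR]
    if python:
        args += ["--py-files", HWC_PY_ZIP]
    conf = dict(extra_conf or {})
    if deploy_mode == "client":
        # Client mode needs the HiveServer2 credential provider disabled.
        conf.setdefault("spark.security.credentials.hiveserver2.enabled", "false")
    for key, value in conf.items():
        args += ["--conf", f"{key}={value}"]
    args.append(app_file)
    return args


def to_command_line(args):
    """Quote the arguments so the result can be pasted into a shell."""
    return " ".join(shlex.quote(arg) for arg in args)


print(to_command_line(spark_submit_args("app.py", "client", python=True)))
```

Additional properties from 3. Prerequisites can be passed through extra_conf when they are not configured globally; values containing characters such as semicolons are quoted by to_command_line.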

5.5. Apache Zeppelin

Go to the interpreter settings page, find the relevant interpreter, and set the configuration accordingly.

In case of Spark interpreter:

Add spark.jars for instance:

spark.jars=/usr/hdp/3.0.1.0-183/hive_warehouse_connector/hive-warehouse-connector-assembly-1.0.0.3.0.1.0-183.jar

For Python usage, the configuration below should also be added. For instance:

spark.submit.pyFiles=/usr/hdp/3.0.1.0-183/hive_warehouse_connector/pyspark_hwc-1.0.0.3.0.1.0-183.zip

If the interpreter is set to YARN client mode, spark.security.credentials.hiveserver2.enabled should be set to false as below:

spark.security.credentials.hiveserver2.enabled=false

If you use a Kerberized cluster, do not forget to set:

spark.yarn.keytab
spark.yarn.principal

In case of Livy interpreter:

Add livy.spark.jars for instance:

livy.spark.jars=file:/usr/hdp/3.0.1.0-183/hive_warehouse_connector/hive-warehouse-connector-assembly-1.0.0.3.0.1.0-183.jar

For Python usage, the configuration below should also be added. For instance:

livy.spark.submit.pyFiles=file:/usr/hdp/3.0.1.0-183/hive_warehouse_connector/pyspark_hwc-1.0.0.3.0.1.0-183.zip

Note: Local directories are disallowed by default for security reasons. Add the local directory to livy.file.local-dir-whitelist to allow files in that specific directory to be read. To avoid this setting, upload the JAR and zipped file to HDFS and use the HDFS paths.

If the interpreter is set to YARN client mode (see livy.spark.master and livy.spark.submit.deployMode), livy.spark.security.credentials.hiveserver2.enabled should be set to false as below:

livy.spark.security.credentials.hiveserver2.enabled=false
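
As the settings above show, the Livy interpreter properties are the Spark properties with a livy. prefix. A small Python sketch of that renaming follows; the helper name is hypothetical, and it only handles keys that start with spark.

```python
def to_livy_interpreter_properties(spark_properties):
    """Prefix Spark property names with 'livy.' for the Livy interpreter."""
    livy_properties = {}
    for key, value in spark_properties.items():
        if not key.startswith("spark."):
            raise ValueError("not a Spark property: " + key)
        livy_properties["livy." + key] = value
    return livy_properties


print(to_livy_interpreter_properties(
    {"spark.security.credentials.hiveserver2.enabled": "false"}))
```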

If you use a Kerberized cluster, do not forget to set:

zeppelin.livy.keytab
zeppelin.livy.principal

Note: To use HWC with Livy in a secure cluster please follow the documentation here.

5.6. Apache Livy

The code below shows an example of submitting a Python application through a Livy request with curl.

$ curl -X POST -H "Content-Type: application/json" -H "X-Requested-By: hive" --negotiate  \
  -u : `hostname`:8999/batches --data '{
  "conf": {
    "spark.master": "yarn-cluster",
    "spark.jars": "file:/usr/hdp/3.0.1.0-183/hive_warehouse_connector/hive-warehouse-connector-assembly-1.0.0.3.0.1.0-183.jar", 
    "spark.submit.pyFiles": "file:/usr/hdp/3.0.1.0-183/hive_warehouse_connector/pyspark_hwc-1.0.0.3.0.1.0-183.zip"
  }, 
  "file": "[PY_FILE]"}' | python -m json.tool

It shows the job information.

{
  "appId": null,
  "appInfo": {
    "driverLogUrl": null,
    "sparkUiUrl": null
  },
  "id": 1,
  "log": [
    "stdout: ",
    "\nstderr: ",
    "\nYARN Diagnostics: "
  ],
  "state": "starting"
}
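
The request body passed to curl above can also be assembled programmatically, which avoids hand-editing JSON. The following Python sketch uses a hypothetical helper named livy_batch_payload and only builds the body; sending it (with Kerberos negotiation) is left to curl or an HTTP client of your choice.

```python
import json

HWC_DIR = "file:/usr/hdp/3.0.1.0-183/hive_warehouse_connector"


def livy_batch_payload(py_file,
                       jar=HWC_DIR + "/hive-warehouse-connector-assembly-1.0.0.3.0.1.0-183.jar",
                       py_zip=HWC_DIR + "/pyspark_hwc-1.0.0.3.0.1.0-183.zip",
                       master="yarn-cluster"):
    """Build the JSON body for POST /batches, mirroring the curl example."""
    return {
        "conf": {
            "spark.master": master,
            "spark.jars": jar,
            "spark.submit.pyFiles": py_zip,
        },
        "file": py_file,
    }


# "/tmp/app.py" is a placeholder for [PY_FILE].
body = json.dumps(livy_batch_payload("/tmp/app.py"), indent=2)
print(body)
```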

To fetch job information, GET batches/[ID] is used.

$ curl --negotiate -u : `hostname`:8999/batches/1 | python -m json.tool

This request shows an output, for example, as below.

{
  "appId": "application_1539107884019_0071",
  "appInfo": {
    "driverLogUrl": "http://ctr-e138-1518143905142-512381-01-000003.hwx.site:8188/applicationhistory/logs/ctr-e138-1518143905142-512381-01-000006.hwx.site:25454/container_e02_1539107884019_0071_01_000001/container_e02_1539107884019_0071_01_000001/hive",
    "sparkUiUrl": "http://ctr-e138-1518143905142-512381-01-000004.hwx.site:8088/proxy/application_1539107884019_0071/"
  },
  "id": 1,
  "log": [
    "\t queue: default",
    "\t start time: 1539151855183",
    "\t final status: UNDEFINED",
    "\t tracking URL: http://ctr-e138-1518143905142-512381-01-000004.hwx.site:8088/proxy/application_1539107884019_0071/",
    "\t user: hive",
    "18/10/10 06:10:55 INFO ShutdownHookManager: Shutdown hook called",
    "18/10/10 06:10:55 INFO ShutdownHookManager: Deleting directory /tmp/spark-6e254b5c-50f9-40d0-af06-c37d8c1a6428",
    "18/10/10 06:10:55 INFO ShutdownHookManager: Deleting directory /tmp/spark-253eac26-e86e-4699-82e6-ad281702fb85",
    "\nstderr: ",
    "\nYARN Diagnostics: "
  ],
  "state": "success"
}
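
When polling the batch, only a few fields of this response are usually needed. The Python sketch below extracts them and reports whether the batch has finished. The set of terminal states is an assumption based on the state names Livy documents for sessions and batches; adjust it if your Livy version differs. The helper name is hypothetical.

```python
import json

# Assumed terminal states; other values such as "starting" or "running"
# are treated as still in progress.
TERMINAL_STATES = {"success", "dead", "killed", "error"}


def summarize_batch(response_text):
    """Pick the id, appId and state out of a GET /batches/[ID] response."""
    info = json.loads(response_text)
    state = info.get("state")
    return {
        "id": info.get("id"),
        "appId": info.get("appId"),
        "state": state,
        "finished": state in TERMINAL_STATES,
    }


starting = '{"appId": null, "id": 1, "log": [], "state": "starting"}'
done = '{"appId": "application_1539107884019_0071", "id": 1, "log": [], "state": "success"}'
print(summarize_batch(starting))
print(summarize_batch(done))
```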

Note: The example above used a Kerberized cluster; therefore, the --negotiate option is used in curl.

Note: Local directories are disallowed by default for security reasons. Add the local directory to livy.file.local-dir-whitelist to allow files in that specific directory to be read. To avoid this setting, upload the JAR and zipped file to HDFS and use the HDFS paths.

Note: To use HWC with Livy in a secure cluster please follow the documentation here.

6. Limitations

  • Missing SQL support, including USING syntax: the current Data Source V2 implementation does not support SQL syntax, including the USING syntax.
  • Missing R library support: this library currently supports only Scala, Java, and Python APIs.
  • Limitations in Apache Arrow integration and Data Source V2 are inherited. See SPARK-22386 for Data Source V2 and SPARK-21187 for Apache Arrow integration in Apache Spark to track ongoing efforts and limitations.
  • Supported/unsupported data types and their mapping between Apache Hive and Apache Spark can be found at this link.

7. Appendix

CREATE DATABASE hwc_db;
USE hwc_db;
CREATE TABLE crimes(year INT, crime_rate DOUBLE);
INSERT INTO crimes VALUES (1997, 611.0), (1998, 567.6), (1999, 523.0), (2000, 506.5), (2001, 504.5), (2002, 494.4), (2003, 475.8);
INSERT INTO crimes VALUES (2004, 463.2), (2005, 469.0), (2006, 479.3), (2007, 471.8), (2008, 458.6), (2009, 431.9), (2010, 404.5);
INSERT INTO crimes VALUES (2011, 387.1), (2012, 387.8), (2013, 369.1), (2014, 361.6), (2015, 373.7), (2016, 386.3);
Comments
Expert Contributor

Great article !!!

Cloudera Employee

Thanks, very helpful.

New Contributor

How can a DataFrame with hundreds of columns be saved as a table using HWC? This seems very impractical.

val hive = com.hortonworks.spark.sql.hive.llap.HiveWarehouseBuilder.session(spark).build()

hive.createTable("newTable")
  .ifNotExists()
  .column("ws_sold_time_sk", "bigint")
  ... // 500 columns
  .column("ws_ship_date_sk", "bigint")
  .create()

df.write.format(HIVE_WAREHOUSE_CONNECTOR)
  .option("table", "newTable")
  .save()
New Contributor

Hi Hyukjin Kwon and thank you for your article!

I'm trying to write to MongoDB, using Spark, a Dataset<Row> created by the HiveWarehouseSession.executeQuery method. The MongoSpark.save command is able to write the Dataset to the selected collection, but the job does not end. Do you know a better way to do that? I'm using an Ambari cluster. Thank you. https://community.hortonworks.com/questions/231385/save-dataframe-loaded-from-hive-to-mongodb-using-...

Contributor

Thank you for posting this very informative Hive Warehouse Connector article.

I followed your steps but got errors when running pyspark; see my steps and the error below.

Can you shed some light on this error? Thank you in advance for your help.

  • 1)Downloaded HDP Sandbox 3.0.1 and set it up

Hive: 3.1.0

Spark: 2.3.1

  • 2)Enabled ‘Interactive Query’ in Hive
  • 3)Appended the following to /usr/hdp/3.0.1.0-187/etc/spark2/conf/spark-defaults.conf

spark.hadoop.hive.llap.daemon.service.hosts @llap0

spark.sql.hive.hiveserver2.jdbc.url jdbc:hive2://sandbox-hdp.hortonworks.com:10000

spark.datasource.hive.warehouse.load.staging.dir /tmp

spark.datasource.hive.warehouse.metastoreUri thrift://sandbox-hdp.hortonworks.com:9083

spark.hadoop.hive.zookeeper.quorum sandbox-hdp.hortonworks.com:2181

  • 4)Ran pyspark:
pyspark --master yarn \
--jars /usr/hdp/current/hive_warehouse_connector/hive-warehouse-connector-assembly-1.0.0.3.0.1.0-187.jar \
--py-files /usr/hdp/current/hive_warehouse_connector/pyspark_hwc-1.0.0.3.0.1.0-187.zip \
--conf spark.security.credentials.hiveserver2.enabled=false

  • 5)Got below error:
[root@~]# pyspark --master yarn \
> --jars /usr/hdp/current/hive_warehouse_connector/hive-warehouse-connector-assembly-1.0.0.3.0.1.0-187.jar \
> --py-files /usr/hdp/current/hive_warehouse_connector/pyspark_hwc-1.0.0.3.0.1.0-187.zip \
> --conf spark.security.credentials.hiveserver2.enabled=false

SPARK_MAJOR_VERSION is set to 2, using Spark2

Python 2.7.5 (default, Jul 13 2018, 13:06:57)

[GCC 4.8.5 20150623 (Red Hat 4.8.5-28)] on linux2

Type "help", "copyright", "credits" or "license" for more information.

Setting default log level to "WARN".

To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).

19/01/10 22:57:49 ERROR SparkContext: Error initializing SparkContext.

org.apache.hadoop.yarn.exceptions.YarnException: Failed to submit application_1547158600691_0002 to YARN : org.apache.hadoop.security.AccessControlException: Queue root.default already has 0 applications, cannot accept submission of application: application_1547158600691_0002

at org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.submitApplication(YarnClientImpl.java:304)

at org.apache.spark.deploy.yarn.Client.submitApplication(Client.scala:174)

at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:57)

at org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:164)

at org.apache.spark.SparkContext.<init>(SparkContext.scala:500)

at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)

at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)

at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)

at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)

at java.lang.reflect.Constructor.newInstance(Constructor.java:423)

at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)

at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)

at py4j.Gateway.invoke(Gateway.java:238)

at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)

at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)

at py4j.GatewayConnection.run(GatewayConnection.java:238)

at java.lang.Thread.run(Thread.java:748)

19/01/10 22:57:49 WARN YarnSchedulerBackend$YarnSchedulerEndpoint: Attempted to request executors before the AM has registered!

19/01/10 22:57:49 WARN MetricsSystem: Stopping a MetricsSystem that is not running

19/01/10 22:57:49 WARN SparkContext: Another SparkContext is being constructed (or threw an exception in its constructor). This may indicate an error, since only one SparkContext may be running in this JVM (see SPARK-2243). The other SparkContext was created at:

org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)

sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)

sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)

sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)

java.lang.reflect.Constructor.newInstance(Constructor.java:423)

py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)

py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)

py4j.Gateway.invoke(Gateway.java:238)

py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)

py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)

py4j.GatewayConnection.run(GatewayConnection.java:238)

java.lang.Thread.run(Thread.java:748)

19/01/10 22:57:57 ERROR SparkContext: Error initializing SparkContext.

org.apache.hadoop.yarn.exceptions.YarnException: Failed to submit application_1547158600691_0003 to YARN : org.apache.hadoop.security.AccessControlException: Queue root.default already has 0 applications, cannot accept submission of application: application_1547158600691_0003

at org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.submitApplication(YarnClientImpl.java:304)

at org.apache.spark.deploy.yarn.Client.submitApplication(Client.scala:174)

at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:57)

at org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:164)

at org.apache.spark.SparkContext.<init>(SparkContext.scala:500)

at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)

at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)

Contributor

Continued previous post:

at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)

at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)

at java.lang.reflect.Constructor.newInstance(Constructor.java:423)

at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)

at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)

at py4j.Gateway.invoke(Gateway.java:238)

at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)

at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)

at py4j.GatewayConnection.run(GatewayConnection.java:238)

at java.lang.Thread.run(Thread.java:748)

19/01/10 22:57:57 WARN YarnSchedulerBackend$YarnSchedulerEndpoint: Attempted to request executors before the AM has registered!

19/01/10 22:57:57 WARN MetricsSystem: Stopping a MetricsSystem that is not running

Traceback (most recent call last):

File "/usr/hdp/current/spark2-client/python/pyspark/shell.py", line 54, in <module>

spark = SparkSession.builder.getOrCreate()

File "/usr/hdp/current/spark2-client/python/pyspark/sql/session.py", line 173, in getOrCreate

sc = SparkContext.getOrCreate(sparkConf)

File "/usr/hdp/current/spark2-client/python/pyspark/context.py", line 353, in getOrCreate

SparkContext(conf=conf or SparkConf())

File "/usr/hdp/current/spark2-client/python/pyspark/context.py", line 119, in __init__

conf, jsc, profiler_cls)

File "/usr/hdp/current/spark2-client/python/pyspark/context.py", line 181, in _do_init

self._jsc = jsc or self._initialize_context(self._conf._jconf)

File "/usr/hdp/current/spark2-client/python/pyspark/context.py", line 292, in _initialize_context

return self._jvm.JavaSparkContext(jconf)

File "/usr/hdp/current/spark2-client/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1525, in __call__

File "/usr/hdp/current/spark2-client/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value

py4j.protocol.Py4JJavaError: An error occurred while calling None.org.apache.spark.api.java.JavaSparkContext.

: org.apache.hadoop.yarn.exceptions.YarnException: Failed to submit application_1547158600691_0003 to YARN : org.apache.hadoop.security.AccessControlException: Queue root.default already has 0 applications, cannot accept submission of application: application_1547158600691_0003

at org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.submitApplication(YarnClientImpl.java:304)

at org.apache.spark.deploy.yarn.Client.submitApplication(Client.scala:174)

at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:57)

at org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:164)

at org.apache.spark.SparkContext.<init>(SparkContext.scala:500)

at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)

at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)

at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)

at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)

at java.lang.reflect.Constructor.newInstance(Constructor.java:423)

at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)

at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)

at py4j.Gateway.invoke(Gateway.java:238)

at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)

at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)

at py4j.GatewayConnection.run(GatewayConnection.java:238)

at java.lang.Thread.run(Thread.java:748)

Expert Contributor

Hi guys,

I followed the above steps and was able to execute commands such as show databases and show tables successfully. I also created a database from spark-shell, created a table, and inserted some data into it. However, I am not able to query data from either the newly created table or the tables that already exist in Hive, and I get this error:


java.lang.AbstractMethodError: Method com/hortonworks/spark/sql/hive/llap/HiveWarehouseDataSourceReader.createBatchDataReaderFactories()Ljava/util/List; is abstract

at com.hortonworks.spark.sql.hive.llap.HiveWarehouseDataSourceReader.createBatchDataReaderFactories(HiveWarehouseDataSourceReader.java)




The commands are as below:

import com.hortonworks.hwc.HiveWarehouseSession

val hive = HiveWarehouseSession.session(spark).build()

hive.createTable("hwx_table").column("value", "string").create()

hive.executeUpdate("insert into hwx_table values('1')")

hive.executeQuery("select * from hwx_table").show



Then the error appears. I am using the command below to start spark-shell:


spark-shell --master yarn --jars /usr/hdp/current/hive-warehouse-connector/hive-warehouse-connector_2.11-1.0.0.3.1.2.0-4.jar --conf spark.security.credentials.hiveserver2.enabled=false

Version history
Revision #:
1 of 1
Last update:
‎10-16-2018 05:14 AM