Created on 10-16-2018 05:14 AM - edited on 01-27-2021 09:41 AM by VidyaSargur
Short Description: This article describes and demonstrates the Apache Hive Warehouse Connector, a newer-generation library for reading and writing data between Apache Spark and Apache Hive.
1. Motivation
Apache Spark and Apache Hive integration has always been an important use case and continues to be so. Both systems provide their own efficient ways to process data with SQL, and both are used for data stored in distributed file systems. Both also provide some compatibility with each other. As both systems evolve, it is critical to find a solution that provides the best of both worlds for data processing needs.
Apache Spark provides basic Hive compatibility: it allows access to tables in Apache Hive, which covers some basic use cases. However, as of Spark 2, not all modern Apache Hive features are supported, for instance ACID tables, Apache Ranger integration, and Live Long And Process (LLAP).
Apache Spark supports a pluggable approach for various data sources, and Apache Hive itself can also be considered a data source. Therefore, this library, the Hive Warehouse Connector, was implemented as a data source to overcome these limitations and bring those modern Apache Hive functionalities to Apache Spark users.
Note: From HDP 3.0, Apache Hive and Apache Spark each use their own catalog, and the two are mutually exclusive: the Apache Hive catalog can only be accessed by Apache Hive or this library, and the Apache Spark catalog can only be accessed by the existing APIs in Apache Spark. In other words, some features, such as ACID tables or Apache Ranger with Apache Hive tables, are only available in Apache Spark via this library. Those Hive tables are not meant to be accessed directly through Apache Spark's own APIs.
2. Introduction
This library provides both Scala (Java compatible) and Python APIs for:
- SQL / DataFrame APIs interacting with both transactional and non-transactional tables in Apache Hive
- SQL / DataFrame read support
- SQL / DataFrame and Structured Streaming write support
2.1. SQL / DataFrame Read Support
- Live Long And Process (LLAP), which Apache Hive introduced for faster performance, is fully utilized
- Apache Spark's Apache Arrow integration is fully utilized for vectorized operations and faster, more compact data exchange
- It is implemented with Data Source V2, which supports columnar formats and various other functionalities
For instance, an Apache Hive table can be read in Apache Spark as below:
import com.hortonworks.hwc.HiveWarehouseSession
val hive = HiveWarehouseSession.session(spark).build()
val df = hive.executeQuery("SELECT * FROM tableA")
This leverages Apache Hive LLAP to retrieve data from the Hive table into a Spark DataFrame.
2.2. SQL / DataFrame & Structured Streaming Write Support
- The Hive Streaming API, which Apache Hive introduced to ingest data continuously, is used for both batch and streaming writes.
- Since it is implemented with Data Source V2, it supports a commit protocol and atomic write operations.
- The native Apache ORC writer is used, which has many important fixes in terms of performance and stability.
The code below illustrates writing data from Apache Spark to an Apache Hive table in Structured Streaming.
import com.hortonworks.hwc.HiveWarehouseSession
val hive = HiveWarehouseSession.session(spark).build()
df.writeStream
  .format(HiveWarehouseSession.STREAM_TO_STREAM)
  .option("table", "hwx_table")
  .start()
Internally, this fully utilizes Apache Hive's Streaming API to write Apache Spark's DataFrame out to the Apache Hive table.
Note: This does not require Hive LLAP daemons to be running.
3. Prerequisites
In Apache Hive:
- 'Interactive Query' (LLAP) should be enabled (see 7. Appendix for Apache Ambari UI)
In Apache Spark:
- spark.hadoop.hive.llap.daemon.service.hosts should be set to the application name of the LLAP service, since this library utilizes LLAP. For example, @llap0.
- HiveServer2's JDBC URL should be specified in spark.sql.hive.hiveserver2.jdbc.url as well as configured in the cluster. For example, jdbc:hive2://localhost:10000. Use the HiveServer2 Interactive JDBC URL rather than the traditional HiveServer2 JDBC URL.
- Make sure spark.datasource.hive.warehouse.load.staging.dir points to a suitable HDFS-compatible staging directory, e.g. /tmp.
- Also, ensure spark.datasource.hive.warehouse.metastoreUri is configured properly to indicate the Metastore URI. For example, thrift://localhost:9083.
- Note that spark.security.credentials.hiveserver2.enabled should be set to false for YARN client deploy mode, and true (the default) for YARN cluster deploy mode. This configuration is required for a Kerberized cluster.
- When spark.security.credentials.hiveserver2.enabled is set to false, spark.sql.hive.hiveserver2.jdbc.url.principal can optionally be set if spark.sql.hive.hiveserver2.jdbc.url does not contain principal, for example, hive/_HOST@EXAMPLE.COM.
- When spark.security.credentials.hiveserver2.enabled is set to true, spark.sql.hive.hiveserver2.jdbc.url.principal should be set, for example, hive/_HOST@EXAMPLE.COM. Also, spark.sql.hive.hiveserver2.jdbc.url should not contain principal.
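The Spark-side rules above can be captured in a small helper. The sketch below is a minimal, hypothetical Python function (hwc_spark_conf is not part of HWC) that assembles these settings and enforces the Kerberos rules for the chosen YARN deploy mode. The host names and ports in the usage are the example values from the list, not library defaults, and detecting an embedded principal by the substring principal= is a simplifying assumption.

```python
# Hypothetical helper, not part of HWC: collects the Spark-side settings
# listed in 3. Prerequisites and applies the deploy-mode/Kerberos rules.

def hwc_spark_conf(llap_app_name, jdbc_url, metastore_uri,
                   staging_dir="/tmp", deploy_mode="client", principal=None):
    """Return a dict of Spark settings for the Hive Warehouse Connector."""
    if deploy_mode not in ("client", "cluster"):
        raise ValueError("deploy_mode must be 'client' or 'cluster'")

    # false for YARN client mode, true (the default) for YARN cluster mode.
    credentials_enabled = deploy_mode == "cluster"
    # Simplifying assumption: an embedded principal appears as "principal=".
    url_has_principal = "principal=" in jdbc_url

    if credentials_enabled:
        # With credentials enabled, the principal is set separately
        # and must not be embedded in the JDBC URL.
        if url_has_principal:
            raise ValueError("jdbc_url must not contain principal in cluster mode")
        if principal is None:
            raise ValueError("principal is required in cluster mode")

    conf = {
        "spark.hadoop.hive.llap.daemon.service.hosts": llap_app_name,
        "spark.sql.hive.hiveserver2.jdbc.url": jdbc_url,
        "spark.datasource.hive.warehouse.load.staging.dir": staging_dir,
        "spark.datasource.hive.warehouse.metastoreUri": metastore_uri,
        "spark.security.credentials.hiveserver2.enabled":
            "true" if credentials_enabled else "false",
    }
    # In client mode the principal setting is optional and only meaningful
    # when the JDBC URL does not already carry one.
    if principal is not None and not url_has_principal:
        conf["spark.sql.hive.hiveserver2.jdbc.url.principal"] = principal
    return conf


if __name__ == "__main__":
    settings = hwc_spark_conf("@llap0", "jdbc:hive2://localhost:10000",
                              "thrift://localhost:9083")
    for key, value in settings.items():
        print(f"{key} {value}")  # spark-defaults.conf style: key, space, value
```

Printing the result in "key value" form lets it be pasted into spark-defaults.conf; the same dict could equally be turned into --conf flags.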
4. User Scenarios
In this chapter, HDP 3.0.1.0 with Spark2, Hive, and Ranger on a Kerberized cluster is used. If you are running the examples on a Kerberized cluster, for instance in HDP, make sure to obtain (or renew) a Kerberos ticket-granting ticket with kinit and an appropriate keytab.
The following scenarios are run in the Spark shell as below. For other interaction paradigms such as spark-submit, Livy, and Zeppelin, see 5. Interaction Paradigms for the specific setup steps.
In case of Scala:
$ spark-shell --master yarn \
  --jars /usr/hdp/3.0.1.0-183/hive_warehouse_connector/hive-warehouse-connector-assembly-1.0.0.3.0.1.0-183.jar \
  --conf spark.security.credentials.hiveserver2.enabled=false
In case of Python:
$ pyspark --master yarn \
  --jars /usr/hdp/3.0.1.0-183/hive_warehouse_connector/hive-warehouse-connector-assembly-1.0.0.3.0.1.0-183.jar \
  --py-files /usr/hdp/3.0.1.0-183/hive_warehouse_connector/pyspark_hwc-1.0.0.3.0.1.0-183.zip \
  --conf spark.security.credentials.hiveserver2.enabled=false
4.1. SQL / DataFrame Read
This scenario demonstrates a bulk read operation, as a batch job, from a Hive table into a Spark DataFrame with a SQL expression. For this scenario, FBI crime rate data is used (see 7. Appendix for the SQL INSERT queries).
Before we start, we should initialize a HiveWarehouseSession instance.
In case of Scala:
import com.hortonworks.hwc.HiveWarehouseSession
val hive = HiveWarehouseSession.session(spark).build()
In case of Python:
from pyspark_llap.sql.session import HiveWarehouseSession
hive = HiveWarehouseSession.session(spark).build()
In this scenario, the hwc_db database is used. The code below shows the decrease in the crime rate (per 100,000 inhabitants) in the USA between 2000 and 2010:
hive.setDatabase("hwc_db")
hive.table("crimes").filter("year = 2000 OR year = 2010").show()
+----+----------+
|year|crime_rate|
+----+----------+
|2010|     404.5|
|2000|     506.5|
+----+----------+
SQL queries can also be executed directly via the executeQuery API, which interacts directly with Hive LLAP daemons.
hive.executeQuery("SELECT avg(crime_rate) AS average_crime_rate FROM crimes").show()
+------------------+
|average_crime_rate|
+------------------+
|456.33500000000004|
+------------------+
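The two results above can be sanity-checked without a cluster. The sketch below loads the appendix data into an in-memory SQLite database, which stands in for Hive purely for checking the arithmetic; it does not use HWC or LLAP. An ORDER BY is added because the row order returned from Hive is not guaranteed.

```python
import sqlite3

# (year, crime_rate) rows copied from the INSERT statements in 7. Appendix.
CRIMES = [
    (1997, 611.0), (1998, 567.6), (1999, 523.0), (2000, 506.5),
    (2001, 504.5), (2002, 494.4), (2003, 475.8), (2004, 463.2),
    (2005, 469.0), (2006, 479.3), (2007, 471.8), (2008, 458.6),
    (2009, 431.9), (2010, 404.5), (2011, 387.1), (2012, 387.8),
    (2013, 369.1), (2014, 361.6), (2015, 373.7), (2016, 386.3),
]

conn = sqlite3.connect(":memory:")  # SQLite stands in for Hive here
conn.execute("CREATE TABLE crimes(year INT, crime_rate DOUBLE)")
conn.executemany("INSERT INTO crimes VALUES (?, ?)", CRIMES)

# Same predicate as the filter("year = 2000 OR year = 2010") call above.
rows = conn.execute(
    "SELECT year, crime_rate FROM crimes "
    "WHERE year = 2000 OR year = 2010 ORDER BY year DESC").fetchall()

# Same aggregate as the executeQuery example above.
(average,) = conn.execute(
    "SELECT avg(crime_rate) AS average_crime_rate FROM crimes").fetchone()
conn.close()

print(rows)     # [(2010, 404.5), (2000, 506.5)]
print(average)  # approximately 456.335
```

The 20 rates sum to 9126.7, so the average is 456.335; the trailing digits in Hive's output are ordinary floating-point rounding.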
4.2. SQL / DataFrame Read (Apache Ranger Integration)
As of HDP 2.6, Row/Column Level Access Control for Apache Spark is available, enabling fine-grained security control through Apache Ranger. This library can leverage this feature as well, whereas Apache Spark's own APIs do not support it.
For instance, the scenarios below demonstrate row-level control with two users, billing and datascience. The billing principal can access all rows and columns, while the datascience principal can only access filtered and masked data.
First, we destroy the existing ticket and obtain a ticket as the billing principal. Then, we start a Spark shell.
$ kdestroy
$ kinit billing/billing@EXAMPLE.COM
$ spark-shell --master yarn \
  --jars /usr/hdp/3.0.1.0-183/hive_warehouse_connector/hive-warehouse-connector-assembly-1.0.0.3.0.1.0-183.jar \
  --conf spark.security.credentials.hiveserver2.enabled=false
Since the billing principal can access every row, all data is shown as is.
sql("select * from db_spark.t_spark").show()
+------+-------+
|fruits|  color|
+------+-------+
| apple|    red|
| grape| purple|
|orange|scarlet|
+------+-------+
Now, we try the same query as the datascience principal.
$ kdestroy
$ kinit datascience/datascience@EXAMPLE.COM
$ spark-shell --master yarn \
  --jars /usr/hdp/3.0.1.0-183/hive_warehouse_connector/hive-warehouse-connector-assembly-1.0.0.3.0.1.0-183.jar \
  --conf spark.security.credentials.hiveserver2.enabled=false
Since the datascience principal can only access filtered and masked rows, only partial values are shown.
sql("select * from db_spark.t_spark").show()
+------+-----+
|fruits|color|
+------+-----+
| apxxx|  red|
+------+-----+
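To make "filtered and masked" concrete, the sketch below emulates in plain Python what the datascience principal sees. The policies are hypothetical: a row filter that keeps only red fruit and a mask that keeps the first two characters of fruits were chosen solely because they reproduce the output above. The actual Ranger policies for this scenario are not shown, and in practice Ranger enforces them on the Hive side, not in client code.

```python
# Hypothetical policies chosen only to reproduce the output above;
# they are not the Ranger policies used in the article.

def mask_keep_prefix(value, keep=2, mask_char="x"):
    """Keep the first `keep` characters and replace the rest with mask_char."""
    return value[:keep] + mask_char * max(len(value) - keep, 0)


def apply_policies(rows, row_filter, masks):
    """Apply a row filter first, then per-column masking functions."""
    visible = []
    for row in rows:
        if not row_filter(row):
            continue  # row-level filter: the row is hidden entirely
        visible.append({col: masks.get(col, lambda v: v)(val)
                        for col, val in row.items()})
    return visible


rows = [
    {"fruits": "apple", "color": "red"},
    {"fruits": "grape", "color": "purple"},
    {"fruits": "orange", "color": "scarlet"},
]

# The billing principal would see `rows` unchanged; datascience sees:
datascience_view = apply_policies(
    rows, row_filter=lambda r: r["color"] == "red",
    masks={"fruits": mask_keep_prefix})
print(datascience_view)  # [{'fruits': 'apxxx', 'color': 'red'}]
```

Filtering before masking mirrors the usual order in which row-level policies and column masks combine: the filter decides which rows exist, and the masks only change the values that remain.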
For more information, please see 'Introducing Row/ Column Level Access Control for Apache Spark' in 7. Appendix. The Ranger security features previously available in the spark-llap connector are now covered by the Hive Warehouse Connector. Apache Hive and Apache Spark configurations should be followed as described in 3. Prerequisites.
4.3. SQL / DataFrame Write
This scenario demonstrates a bulk write operation, as a batch job, between an Apache Hive table and an Apache Spark DataFrame with a SQL expression.
A table in Hive can be created as below:
hive.createTable("crimes_2010")
  .column("year", "int")
  .column("crime_rate", "double")
  .create()
Here, the crimes table (from 4.1. SQL / DataFrame Read) is written into a different Hive table after the data is filtered in Spark. The code below writes the crime rate for 2010 into the table created above:
hive.table("crimes").filter("year = 2010")
  .write
  .format(HiveWarehouseSession.HIVE_WAREHOUSE_CONNECTOR)
  .option("table", "crimes_2010")
  .save()
The data can also be written in a streaming manner as below:
hive.table("crimes").filter("year = 2010")
  .write
  .format(HiveWarehouseSession.DATAFRAME_TO_STREAM)
  .option("table", "crimes_2010")
  .save()
Note: With this API, the job on the Apache Spark side is a batch write job, but internally it writes out data through the Apache Hive Streaming API.
Of course, data from an Apache Hive table can also be written out via Apache Spark's data sources:
hive.table("crimes")
  .write
  .format("csv")
  .option("header", "true")
  .save("/tmp/zoo")
Likewise, an Apache Spark DataFrame can be written directly to an Apache Hive table:
spark.read.format("csv").option("inferSchema", "true").option("header", "true").load("/tmp/zoo")
  .filter("year = 2010")
  .write
  .format(HiveWarehouseSession.HIVE_WAREHOUSE_CONNECTOR)
  .option("table", "crimes_2010")
  .save()
4.4. Structured Streaming Write
This scenario demonstrates a streaming write operation, as a micro batch job, from Apache Spark DataFrame to Apache Hive table with SQL expression.
In this scenario, the socket source of Structured Streaming is used to provide test data.
In case of Scala:
val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()
In case of Python:
lines = spark.readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", 9999) \
    .load()
In another terminal, run the command below to feed test data:
$ nc -lk 9999
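If nc is not available, a scripted feeder can play the same role. The sketch below is a minimal, hypothetical stand-in using Python's standard socket module (send_lines is not part of Spark or HWC). Unlike nc -lk, which keeps listening, it serves a single client and closes the connection after sending, which is assumed to be enough for a quick test of the socket source.

```python
import socket


def send_lines(server, lines):
    """Accept one client on a listening socket and send newline-terminated lines.

    The Spark socket source connects as a client and treats each
    newline-terminated line as one row in the `value` column.
    """
    conn, _addr = server.accept()
    with conn:
        for line in lines:
            conn.sendall((line + "\n").encode("utf-8"))
```

For example, start it before the streaming query so that Spark has something to connect to: `send_lines(socket.create_server(("localhost", 9999)), ["Foo", "Bar", "Hortonworks"])` (socket.create_server requires Python 3.8 or later).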
We create a target table to store the stream data. Note that this table can also be created on the Hive side, for instance, CREATE TABLE hwx_table(value STRING);
hive.createTable("hwx_table").column("value", "string").create()
Now, the stream source is ready. Next, import the library so that the format can be specified easily.
In case of Scala:
import com.hortonworks.hwc.HiveWarehouseSession
In case of Python:
from pyspark_llap.sql.session import HiveWarehouseSession
Next, start the Structured Streaming job. Arbitrary data can be typed into the terminal running nc -lk 9999; whenever the line Hortonworks is entered, it is saved into the hwx_table table.
lines.filter("value = 'Hortonworks'")
  .writeStream
  .format(HiveWarehouseSession.STREAM_TO_STREAM)
  .option("database", "hwc_db")
  .option("table", "hwx_table")
  .option("metastoreUri", spark.conf.get("spark.datasource.hive.warehouse.metastoreUri"))
  .option("checkpointLocation", "/tmp/checkpoint")
  .start()
Note: Due to an issue with how Apache Spark internally interprets spark.datasource.* configurations as options, this library currently requires the metastoreUri and database options to be set manually. See SPARK-25460 for more details. Once this issue is resolved, both metastoreUri and database can be omitted.
After that, type Hortonworks along with arbitrary words in the terminal running nc -lk 9999:
Foo Bar Hortonworks
This continuously inserts the word Hortonworks into the hwx_table table.
hive.table("hwx_table").show()
+------------+
|       value|
+------------+
| Hortonworks|
+------------+
5. Interaction Paradigms
This library can be used not only from Apache Spark itself but also from Apache Zeppelin and Apache Livy. This chapter demonstrates the entry points of these interaction paradigms for leveraging the functionalities this library provides.
5.1. Spark Shell:
$ spark-shell --master yarn \
  --jars /usr/hdp/3.0.1.0-183/hive_warehouse_connector/hive-warehouse-connector-assembly-1.0.0.3.0.1.0-183.jar \
  --conf spark.security.credentials.hiveserver2.enabled=false
5.2. PySpark Shell:
$ pyspark --master yarn \
  --jars /usr/hdp/3.0.1.0-183/hive_warehouse_connector/hive-warehouse-connector-assembly-1.0.0.3.0.1.0-183.jar \
  --py-files /usr/hdp/3.0.1.0-183/hive_warehouse_connector/pyspark_hwc-1.0.0.3.0.1.0-183.zip \
  --conf spark.security.credentials.hiveserver2.enabled=false
5.3. Scala / Java Application Submission:
In case of client mode:
$ spark-submit --master yarn --deploy-mode client \
  --jars /usr/hdp/3.0.1.0-183/hive_warehouse_connector/hive-warehouse-connector-assembly-1.0.0.3.0.1.0-183.jar \
  --conf spark.security.credentials.hiveserver2.enabled=false \
  [JAR_NAME]
If the Apache Spark side of 3. Prerequisites was not configured globally, specify all configurations in the command, for instance:
$ spark-submit --master yarn --deploy-mode client \
  --jars /usr/hdp/3.0.1.0-183/hive_warehouse_connector/hive-warehouse-connector-assembly-1.0.0.3.0.1.0-183.jar \
  --conf spark.hadoop.hive.llap.daemon.service.hosts=@llap0 \
  --conf spark.sql.hive.hiveserver2.jdbc.url="jdbc:hive2://hostname.hwx.site:2181,hostname9.hwx.site:2181,hostname.hwx.site:2181,hostname.hwx.site:2181,hostname.hwx.site:2181/default;principal=hive/_HOST@HWX.COM;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2-interactive" \
  --conf spark.security.credentials.hiveserver2.enabled=false \
  --conf spark.datasource.hive.warehouse.load.staging.dir=/tmp \
  --conf spark.datasource.hive.warehouse.metastoreUri=thrift://hostname.hwx.site:9083,thrift://hostname.hwx.site:9083 \
  [JAR_NAME]
In case of cluster mode:
$ spark-submit --master yarn --deploy-mode cluster \
  --jars /usr/hdp/3.0.1.0-183/hive_warehouse_connector/hive-warehouse-connector-assembly-1.0.0.3.0.1.0-183.jar \
  [JAR_NAME]
If the Apache Spark side of 3. Prerequisites was not configured globally for HWC, specify all configurations in the command, for instance:
$ spark-submit --master yarn --deploy-mode cluster \
  --jars /usr/hdp/3.0.1.0-183/hive_warehouse_connector/hive-warehouse-connector-assembly-1.0.0.3.0.1.0-183.jar \
  --conf spark.hadoop.hive.llap.daemon.service.hosts=@llap0 \
  --conf spark.sql.hive.hiveserver2.jdbc.url="jdbc:hive2://hostname.hwx.site:2181,hostname.hwx.site:2181,hostname.hwx.site:2181,hostname.hwx.site:2181,hostname.hwx.site:2181/default;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2-interactive" \
  --conf spark.sql.hive.hiveserver2.jdbc.url.principal="hive/_HOST@EXAMPLE.COM" \
  --conf spark.datasource.hive.warehouse.load.staging.dir=/tmp \
  --conf spark.datasource.hive.warehouse.metastoreUri=thrift://hostname.hwx.site:9083,thrift://hostname.hwx.site:9083 \
  [JAR_NAME]
5.4. Python Application Submission:
In case of client mode:
$ spark-submit --master yarn --deploy-mode client \
  --jars /usr/hdp/3.0.1.0-183/hive_warehouse_connector/hive-warehouse-connector-assembly-1.0.0.3.0.1.0-183.jar \
  --py-files /usr/hdp/3.0.1.0-183/hive_warehouse_connector/pyspark_hwc-1.0.0.3.0.1.0-183.zip \
  --conf spark.security.credentials.hiveserver2.enabled=false \
  [PY_FILE]
In case of cluster mode:
$ spark-submit --master yarn --deploy-mode cluster \
  --jars /usr/hdp/3.0.1.0-183/hive_warehouse_connector/hive-warehouse-connector-assembly-1.0.0.3.0.1.0-183.jar \
  --py-files /usr/hdp/3.0.1.0-183/hive_warehouse_connector/pyspark_hwc-1.0.0.3.0.1.0-183.zip \
  [PY_FILE]
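The spark-submit commands in 5.3 and 5.4 differ only in a few flags: --py-files for Python applications, and spark.security.credentials.hiveserver2.enabled=false for client mode. The sketch below assembles those argument lists in Python. The helper name is hypothetical, the paths assume the HDP 3.0.1.0-183 layout used throughout this article, and treating a .py suffix as "Python application" is a simplifying assumption.

```python
import shlex

# Paths follow the HDP 3.0.1.0-183 layout used in the commands above.
HDP_VERSION = "3.0.1.0-183"
HWC_DIR = f"/usr/hdp/{HDP_VERSION}/hive_warehouse_connector"
HWC_JAR = f"{HWC_DIR}/hive-warehouse-connector-assembly-1.0.0.{HDP_VERSION}.jar"
HWC_ZIP = f"{HWC_DIR}/pyspark_hwc-1.0.0.{HDP_VERSION}.zip"


def hwc_submit_argv(app, deploy_mode="client", extra_conf=None):
    """Build a spark-submit argument list for an HWC application (hypothetical)."""
    if deploy_mode not in ("client", "cluster"):
        raise ValueError("deploy_mode must be 'client' or 'cluster'")
    argv = ["spark-submit", "--master", "yarn", "--deploy-mode", deploy_mode,
            "--jars", HWC_JAR]
    if app.endswith(".py"):
        argv += ["--py-files", HWC_ZIP]  # Python applications also need the zip
    conf = dict(extra_conf or {})
    if deploy_mode == "client":
        # Client mode requires this to be false (see 3. Prerequisites).
        conf.setdefault("spark.security.credentials.hiveserver2.enabled", "false")
    for key, value in conf.items():
        argv += ["--conf", f"{key}={value}"]
    argv.append(app)
    return argv


# Print a shell-ready command line (shlex.join needs Python 3.8+).
print(shlex.join(hwc_submit_argv("app.py")))
```

The resulting list can be passed directly to subprocess.run(argv, check=True), which avoids shell quoting problems with values such as the semicolon-separated JDBC URL.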
5.5. Apache Zeppelin
Go to the interpreter settings, find the relevant interpreter, and set the configuration accordingly.
In case of Spark interpreter:
Add spark.jars, for instance:
spark.jars=/usr/hdp/3.0.1.0-183/hive_warehouse_connector/hive-warehouse-connector-assembly-1.0.0.3.0.1.0-183.jar
For Python usage, the configuration below should also be added. For instance:
spark.submit.pyFiles=/usr/hdp/3.0.1.0-183/hive_warehouse_connector/pyspark_hwc-1.0.0.3.0.1.0-183.zip
If the interpreter is set to YARN client mode, spark.security.credentials.hiveserver2.enabled should be set to false as below:
spark.security.credentials.hiveserver2.enabled=false
If you use a Kerberized cluster, do not forget to set:
spark.yarn.keytab
spark.yarn.principal
In the case of Livy interpreter:
Add livy.spark.jars, for instance:
livy.spark.jars=file:/usr/hdp/3.0.1.0-183/hive_warehouse_connector/hive-warehouse-connector-assembly-1.0.0.3.0.1.0-183.jar
For Python usage, the configuration below should also be added. For instance:
livy.spark.submit.pyFiles=file:/usr/hdp/3.0.1.0-183/hive_warehouse_connector/pyspark_hwc-1.0.0.3.0.1.0-183.zip
Note: Local directories are disallowed by default for security reasons. Add the local directory to livy.file.local-dir-whitelist to allow it to be read. To avoid this setting, upload the JAR and zip files to HDFS and use HDFS paths instead.
If the interpreter is set to YARN client mode (see livy.spark.master and livy.spark.submit.deployMode), livy.spark.security.credentials.hiveserver2.enabled should be set to false as below:
livy.spark.security.credentials.hiveserver2.enabled=false
If you use a Kerberized cluster, do not forget to set:
zeppelin.livy.keytab
zeppelin.livy.principal
Note: To use HWC with Livy in a secure cluster please follow the documentation here.
5.6. Apache Livy
The code below shows an example of submitting a Python application as a Livy batch request with curl.
$ curl -X POST -H "Content-Type: application/json" -H "X-Requested-By: hive" --negotiate \
  -u : `hostname`:8999/batches --data '{
    "conf": {
      "spark.master": "yarn-cluster",
      "spark.jars": "file:/usr/hdp/3.0.1.0-183/hive_warehouse_connector/hive-warehouse-connector-assembly-1.0.0.3.0.1.0-183.jar",
      "spark.submit.pyFiles": "file:/usr/hdp/3.0.1.0-183/hive_warehouse_connector/pyspark_hwc-1.0.0.3.0.1.0-183.zip"
    },
    "file": "[PY_FILE]"
  }' | python -m json.tool
The response shows the job information.
{
    "appId": null,
    "appInfo": {
        "driverLogUrl": null,
        "sparkUiUrl": null
    },
    "id": 1,
    "log": [
        "stdout: ",
        "\nstderr: ",
        "\nYARN Diagnostics: "
    ],
    "state": "starting"
}
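The JSON body posted by the curl command can also be built and sent from Python. The sketch below uses only the standard library; the helper names are hypothetical, and it assumes an unsecured Livy endpoint, because urllib does not perform the SPNEGO negotiation that curl's --negotiate option provides. On a Kerberized cluster, keep using curl or a Kerberos-aware HTTP client instead.

```python
import json
import urllib.request

# Paths copied from the curl example; "file:" marks a local path on the Livy host.
HWC_DIR = "file:/usr/hdp/3.0.1.0-183/hive_warehouse_connector"


def livy_batch_body(py_file):
    """Build the same JSON body that the curl example posts to /batches."""
    return {
        "conf": {
            "spark.master": "yarn-cluster",
            "spark.jars": f"{HWC_DIR}/hive-warehouse-connector-assembly-1.0.0.3.0.1.0-183.jar",
            "spark.submit.pyFiles": f"{HWC_DIR}/pyspark_hwc-1.0.0.3.0.1.0-183.zip",
        },
        "file": py_file,
    }


def post_batch(livy_url, py_file):
    """POST a batch to an unsecured Livy server and return the parsed response."""
    request = urllib.request.Request(
        f"{livy_url}/batches",
        data=json.dumps(livy_batch_body(py_file)).encode("utf-8"),
        headers={"Content-Type": "application/json", "X-Requested-By": "hive"},
        method="POST",
    )
    with urllib.request.urlopen(request) as response:
        return json.load(response)
```

For example, post_batch("http://livy-host:8999", "hdfs:///apps/app.py") would return a dict like the response above, whose "id" field is then used to query the batch. Both the host and the application path in that call are placeholders.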
To fetch the job information, GET batches/[ID] is used.
$ curl --negotiate -u : `hostname`:8999/batches/1 | python -m json.tool
This request returns output such as the following:
{
    "appId": "application_1539107884019_0071",
    "appInfo": {
        "driverLogUrl": "http://hostname.hwx.site:8188/applicationhistory/logs/hostname.hwx.site:25454/container_e02_1539107884019_0071_01_000001/container_e02_1539107884019_0071_01_000001/hive",
        "sparkUiUrl": "http://hostname.hwx.site:8088/proxy/application_1539107884019_0071/"
    },
    "id": 1,
    "log": [
        "\t queue: default",
        "\t start time: 1539151855183",
        "\t final status: UNDEFINED",
        "\t tracking URL: http://hostname.hwx.site:8088/proxy/application_1539107884019_0071/",
        "\t user: hive",
        "18/10/10 06:10:55 INFO ShutdownHookManager: Shutdown hook called",
        "18/10/10 06:10:55 INFO ShutdownHookManager: Deleting directory /tmp/spark-6e254b5c-50f9-40d0-af06-c37d8c1a6428",
        "18/10/10 06:10:55 INFO ShutdownHookManager: Deleting directory /tmp/spark-253eac26-e86e-4699-82e6-ad281702fb85",
        "\nstderr: ",
        "\nYARN Diagnostics: "
    ],
    "state": "success"
}
Note: The example above used a Kerberized cluster; therefore, the --negotiate option is used in curl.
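Repeating GET batches/[ID] until the "state" field reaches a final value can be sketched as a small polling loop. The helper names are hypothetical, and the set of terminal states is an assumption based on Livy's documented batch states, so check it against your Livy version. The HTTP call assumes an unsecured Livy server, since the standard library cannot do the --negotiate (SPNEGO) step. The state lookup is passed in as a function so that a Kerberos-aware client can be swapped in.

```python
import json
import time
import urllib.request

# Assumed final batch states; verify against the Livy REST API docs.
TERMINAL_STATES = {"success", "dead", "killed", "error"}


def get_batch_state(livy_url, batch_id):
    """GET batches/[ID] from an unsecured Livy server and return its "state"."""
    with urllib.request.urlopen(f"{livy_url}/batches/{batch_id}") as response:
        return json.load(response)["state"]


def wait_for_batch(fetch_state, poll_seconds=5.0, timeout_seconds=600.0,
                   sleep=time.sleep):
    """Poll fetch_state() until a terminal state is seen or the timeout passes."""
    waited = 0.0
    while True:
        state = fetch_state()
        if state in TERMINAL_STATES:
            return state
        if waited >= timeout_seconds:
            raise TimeoutError(f"batch still {state!r} after {waited} seconds")
        sleep(poll_seconds)
        waited += poll_seconds
```

Typical use would be wait_for_batch(lambda: get_batch_state("http://livy-host:8999", 1)), where the host is a placeholder. A "success" result corresponds to the final "state" in the response above.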
Note: Local directories are disallowed by default for security reasons. Add the local directory to livy.file.local-dir-whitelist to allow it to be read. To avoid this setting, upload the JAR and zip files to HDFS and use HDFS paths instead.
Note: To use HWC with Livy in a secure cluster please follow the documentation here.
6. Limitations
- Missing SQL support, including the USING syntax: the current Data Source V2 implementation does not support SQL syntax such as USING.
- Missing R library support: this library currently supports only Scala, Java, and Python APIs.
- Limitations in Apache Arrow integration and Data Source V2 are inherited. See SPARK-22386 for Data Source V2 and SPARK-21187 for Apache Arrow integration in Apache Spark to track ongoing efforts and limitations.
- Supported/unsupported data types and their mapping between Apache Hive and Apache Spark can be found at this link.
7. Appendix
- HDP 3.0.1 documentation
- Hive Warehouse Connector Github
- Introducing Row/ Column Level Access Control for Apache Spark
SQL INSERT queries to insert the data from https://ucr.fbi.gov/crime-in-the-u.s/2016/crime-in-the-u.s.-2016/topic-pages/tables/table-1
CREATE DATABASE hwc_db;
USE hwc_db;
CREATE TABLE crimes(year INT, crime_rate DOUBLE);
INSERT INTO crimes VALUES (1997, 611.0), (1998, 567.6), (1999, 523.0), (2000, 506.5), (2001, 504.5), (2002, 494.4), (2003, 475.8);
INSERT INTO crimes VALUES (2004, 463.2), (2005, 469.0), (2006, 479.3), (2007, 471.8), (2008, 458.6), (2009, 431.9), (2010, 404.5);
INSERT INTO crimes VALUES (2011, 387.1), (2012, 387.8), (2013, 369.1), (2014, 361.6), (2015, 373.7), (2016, 386.3);
Created on 10-17-2018 12:11 PM
Great article !!!
Created on 10-17-2018 02:35 PM
Thanks, very helpful.
Created on 11-16-2018 11:47 AM
How do I save a DataFrame with hundreds of columns to a table using HWC? This seems very impractical.
val hive = com.hortonworks.spark.sql.hive.llap.HiveWarehouseBuilder.session(spark).build()
hive.createTable("newTable")
  .ifNotExists()
  .column("ws_sold_time_sk", "bigint")
  ... // 500 columns
  .column("ws_ship_date_sk", "bigint")
  .create()
df.write.format(HIVE_WAREHOUSE_CONNECTOR)
  .option("table", "newTable")
  .save()
Created on 01-07-2019 11:38 AM
Hi Hyukjin Kwon and thank you for your article!
I'm trying to use Spark to write a DataSet<Row>, created with HiveWarehouseSession.executeQuery, to MongoDB. The MongoSpark.save command writes the DataSet to the selected collection, but the job never ends. Do you know a better way to do this? I'm using an Ambari cluster. Thank you. https://community.hortonworks.com/questions/231385/save-dataframe-loaded-from-hive-to-mongodb-using-...
Created on 01-11-2019 02:36 PM
Thank you for posting this very informative Hive Warehouse Connector article.
I followed your steps but got errors when running pyspark; see my steps and the error below.
Can you shed some light on this error? Thank you in advance for your help.
- 1) Downloaded HDP Sandbox 3.0.1 and set it up
Hive: 3.1.0
Spark: 2.3.1
- 2) Enabled 'Interactive Query' in Hive
- 3) Appended the following to /usr/hdp/3.0.1.0-187/etc/spark2/conf/spark-defaults.conf
spark.hadoop.hive.llap.daemon.service.hosts @llap0
spark.sql.hive.hiveserver2.jdbc.url jdbc:hive2://sandbox-hdp.hortonworks.com:10000
spark.datasource.hive.warehouse.load.staging.dir /tmp
spark.datasource.hive.warehouse.metastoreUri thrift://sandbox-hdp.hortonworks.com:9083
spark.hadoop.hive.zookeeper.quorum sandbox-hdp.hortonworks.com:2181
- 4) Ran pyspark:
pyspark --master yarn \
  --jars /usr/hdp/current/hive_warehouse_connector/hive-warehouse-connector-assembly-1.0.0.3.0.1.0-187.jar \
  --py-files /usr/hdp/current/hive_warehouse_connector/pyspark_hwc-1.0.0.3.0.1.0-187.zip \
  --conf spark.security.credentials.hiveserver2.enabled=false
- 5) Got the error below:
[root@~]# pyspark --master yarn \
> --jars /usr/hdp/current/hive_warehouse_connector/hive-warehouse-connector-assembly-1.0.0.3.0.1.0-187.jar \
> --py-files /usr/hdp/current/hive_warehouse_connector/pyspark_hwc-1.0.0.3.0.1.0-187.zip \
> --conf spark.security.credentials.hiveserver2.enabled=false
SPARK_MAJOR_VERSION is set to 2, using Spark2
Python 2.7.5 (default, Jul 13 2018, 13:06:57)
[GCC 4.8.5 20150623 (Red Hat 4.8.5-28)] on linux2
Type "help", "copyright", "credits" or "license" for more information.
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
19/01/10 22:57:49 ERROR SparkContext: Error initializing SparkContext.
org.apache.hadoop.yarn.exceptions.YarnException: Failed to submit application_1547158600691_0002 to YARN : org.apache.hadoop.security.AccessControlException: Queue root.default already has 0 applications, cannot accept submission of application: application_1547158600691_0002
at org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.submitApplication(YarnClientImpl.java:304)
at org.apache.spark.deploy.yarn.Client.submitApplication(Client.scala:174)
at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:57)
at org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:164)
at org.apache.spark.SparkContext.<init>(SparkContext.scala:500)
at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:238)
at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
19/01/10 22:57:49 WARN YarnSchedulerBackend$YarnSchedulerEndpoint: Attempted to request executors before the AM has registered!
19/01/10 22:57:49 WARN MetricsSystem: Stopping a MetricsSystem that is not running
19/01/10 22:57:49 WARN SparkContext: Another SparkContext is being constructed (or threw an exception in its constructor). This may indicate an error, since only one SparkContext may be running in this JVM (see SPARK-2243). The other SparkContext was created at:
org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
java.lang.reflect.Constructor.newInstance(Constructor.java:423)
py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
py4j.Gateway.invoke(Gateway.java:238)
py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
py4j.GatewayConnection.run(GatewayConnection.java:238)
java.lang.Thread.run(Thread.java:748)
19/01/10 22:57:57 ERROR SparkContext: Error initializing SparkContext.
org.apache.hadoop.yarn.exceptions.YarnException: Failed to submit application_1547158600691_0003 to YARN : org.apache.hadoop.security.AccessControlException: Queue root.default already has 0 applications, cannot accept submission of application: application_1547158600691_0003
at org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.submitApplication(YarnClientImpl.java:304)
at org.apache.spark.deploy.yarn.Client.submitApplication(Client.scala:174)
at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:57)
at org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:164)
at org.apache.spark.SparkContext.<init>(SparkContext.scala:500)
at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
Created on 01-11-2019 02:36 PM
Continued previous post:
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:238)
at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
19/01/10 22:57:57 WARN YarnSchedulerBackend$YarnSchedulerEndpoint: Attempted to request executors before the AM has registered!
19/01/10 22:57:57 WARN MetricsSystem: Stopping a MetricsSystem that is not running
Traceback (most recent call last):
File "/usr/hdp/current/spark2-client/python/pyspark/shell.py", line 54, in <module>
spark = SparkSession.builder.getOrCreate()
File "/usr/hdp/current/spark2-client/python/pyspark/sql/session.py", line 173, in getOrCreate
sc = SparkContext.getOrCreate(sparkConf)
File "/usr/hdp/current/spark2-client/python/pyspark/context.py", line 353, in getOrCreate
SparkContext(conf=conf or SparkConf())
File "/usr/hdp/current/spark2-client/python/pyspark/context.py", line 119, in __init__
conf, jsc, profiler_cls)
File "/usr/hdp/current/spark2-client/python/pyspark/context.py", line 181, in _do_init
self._jsc = jsc or self._initialize_context(self._conf._jconf)
File "/usr/hdp/current/spark2-client/python/pyspark/context.py", line 292, in _initialize_context
return self._jvm.JavaSparkContext(jconf)
File "/usr/hdp/current/spark2-client/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1525, in __call__
File "/usr/hdp/current/spark2-client/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling None.org.apache.spark.api.java.JavaSparkContext.
: org.apache.hadoop.yarn.exceptions.YarnException: Failed to submit application_1547158600691_0003 to YARN : org.apache.hadoop.security.AccessControlException: Queue root.default already has 0 applications, cannot accept submission of application: application_1547158600691_0003
at org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.submitApplication(YarnClientImpl.java:304)
at org.apache.spark.deploy.yarn.Client.submitApplication(Client.scala:174)
at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:57)
at org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:164)
at org.apache.spark.SparkContext.<init>(SparkContext.scala:500)
at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:238)
at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Created on 04-09-2019 02:44 PM
Hi guys,
I followed the above steps and was able to execute commands like show databases and show tables successfully. I also created a database from spark-shell, created a table, and inserted some data into it. However, I am not able to query data from either the newly created table or the tables that already exist in Hive, and I get this error:
java.lang.AbstractMethodError: Method com/hortonworks/spark/sql/hive/llap/HiveWarehouseDataSourceReader.createBatchDataReaderFactories()Ljava/util/List; is abstract
at com.hortonworks.spark.sql.hive.llap.HiveWarehouseDataSourceReader.createBatchDataReaderFactories(HiveWarehouseDataSourceReader.java)
The commands are as below:
import com.hortonworks.hwc.HiveWarehouseSession
val hive = HiveWarehouseSession.session(spark).build()
hive.createTable("hwx_table").column("value", "string").create()
hive.executeUpdate("insert into hwx_table values('1')")
hive.executeQuery("select * from hwx_table").show
Then the error appears. I am using the command below to start spark-shell:
spark-shell --master yarn --jars /usr/hdp/current/hive-warehouse-connector/hive-warehouse-connector_2.11-1.0.0.3.1.2.0-4.jar --conf spark.security.credentials.hiveserver2.enabled=false