02-15-2019 02:45 PM
Hi @Raj Zalavadia
As described in SPARK-16996 and SPARK-15348, Spark currently doesn't support Hive ACID tables, either v1 (Hive 1.x) or v2 (Hive 3.x).
To work around that, you can use the Hive Warehouse Connector. It creates the necessary link between the two components by getting Spark to connect through HiveServer2. I'm not sure whether it's bundled directly into HDI (it should be). In any case, it's available publicly at:
https://github.com/hortonworks/hive-warehouse-connector-release/tree/HDP-3.0.1.10-7-tag
You'll find the documentation here:
https://docs.hortonworks.com/HDPDocuments/HDP3/HDP-3.1.0/integrating-hive/content/hive_hivewarehouseconnector_for_handling_apache_spark_data.html
Here's another HCC article that gives a concrete example of how to use it: https://community.hortonworks.com/articles/223626/integrating-apache-hive-with-apache-spark-hive-war.html
To get you started, here's a quick example of how to use it:
1. The Hive Warehouse Connector must be given as a dependency to Spark:
spark.jars=[path to the Hive Warehouse Connector]
usually: /usr/hdp/current/hive_warehouse_connector/hive-warehouse-connector-assembly-[Build version].jar
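For example, to make the connector available in an interactive shell (the exact assembly file name depends on your HDP build, so check that directory on your cluster):
spark-shell --jars /usr/hdp/current/hive_warehouse_connector/hive-warehouse-connector-assembly-[Build version].jar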
2. It also requires a few more configuration settings, basically describing where the Hive metastore and HiveServer2 instances reside:
spark.datasource.hive.warehouse.metastoreUri=thrift://[YOUR METASTORE URI]:9083
spark.hadoop.hive.llap.daemon.service.hosts=@llap0
spark.hadoop.hive.zookeeper.quorum=[YOUR HIVE ZOOKEEPER QUORUM]:2181
spark.sql.hive.hiveserver2.jdbc.url=[YOUR HIVE LLAP JDBC URL]
These can be passed as Spark conf items:
--conf spark.hadoop.hive.llap.daemon.service.hosts=@llap0
or as extra configuration parameters for Spark notebooks (e.g. Zeppelin). Putting steps 1 and 2 together, a full launch command is sketched below.
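Here's what a spark-submit launch could look like (all host names and the application jar are placeholders for your own cluster; the JDBC URL is the HiveServer2 Interactive one, which you can copy from Ambari):
spark-submit \
--jars /usr/hdp/current/hive_warehouse_connector/hive-warehouse-connector-assembly-[Build version].jar \
--conf spark.datasource.hive.warehouse.metastoreUri=thrift://metastore.example.com:9083 \
--conf spark.hadoop.hive.llap.daemon.service.hosts=@llap0 \
--conf spark.hadoop.hive.zookeeper.quorum=zk1.example.com:2181,zk2.example.com:2181,zk3.example.com:2181 \
--conf spark.sql.hive.hiveserver2.jdbc.url="jdbc:hive2://hs2.example.com:10500/" \
my_app.jar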
3. Create a HiveWarehouse context:
import com.hortonworks.hwc.HiveWarehouseSession
import com.hortonworks.hwc.HiveWarehouseSession._
val hive = HiveWarehouseSession.session(spark).build()
//set a database
hive.setDatabase("airline_ontime")
//show tables
hive.showTables().show(100)
4. Query Data
val flights_df = hive.executeQuery("SELECT * FROM flights WHERE year = 1989")
flights_df.createOrReplaceTempView("flights_1989")
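Once registered, the view can be used from plain Spark SQL like any other temp view; for example, showing a single row produces output like the sample below:
//peek at one row of the queried data
spark.sql("SELECT * FROM flights_1989").show(1)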
+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+----+
|month|dayofmonth|dayofweek|deptime|crsdeptime|arrtime|crsarrtime|uniquecarrier|flightnum|tailnum|actualelapsedtime|crselapsedtime|airtime|arrdelay|depdelay|origin|dest|distance|taxiin|taxiout|cancelled|cancellationcode|diverted|carrierdelay|weatherdelay|nasdelay|securitydelay|lateaircraftdelay|year|
+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+----+
| 12| 25| 1| 1415| 1415| 1547| 1552| US| 1478| NA| 92| 97| null| -5| 0| TPA| CLT| 508| null| null| 0| NA| 0| null| null| null| null| null|1989|
+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+----+
5. Write data back to Hive (in ACID format)
hive.table("flights").filter("month = 01")
.write
.format(HiveWarehouseSession.HIVE_WAREHOUSE_CONNECTOR)
.option("table", "flight_2019_01")
.save()
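As a quick sanity check, you can then read the freshly written table back through the same session (table name taken from the write above):
//count the rows that were just written
hive.executeQuery("SELECT count(*) FROM flight_2019_01").show()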