Created 02-12-2019 11:23 AM
Hi There,
I am trying to insert data into a partitioned Hive managed table.
SHOW CREATE TABLE output for reference:
CREATE TABLE `part_test08`(
  `id` string,
  `name` string,
  `baseamount` double,
  `billtoaccid` string,
  `extendedamount` double,
  `netamount` decimal(19,5),
  `netunitamount` decimal(19,5),
  `pricingdate` timestamp,
  `quantity` int,
  `invoiceid` string,
  `shiptoaccid` string,
  `soldtoaccid` string,
  `ingested_on` timestamp,
  `external_id` string)
PARTITIONED BY (
  `productid` string)
ROW FORMAT SERDE
  'org.apache.hadoop.hive.ql.io.orc.OrcSerde'
STORED AS INPUTFORMAT
  'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat'
LOCATION
  'wasb://default@rebatehdidefault3.blob.core.windows.net/hive/warehouse/db_103.db/part_test08'
TBLPROPERTIES (
  'bucketing_version'='2',
  'transactional'='true',
  'transactional_properties'='default',
  'transient_lastDdlTime'='1549962363')
I am executing the SQL statement below to insert records into the partitioned table.
sparkSession.sql("INSERT INTO TABLE db_103.part_test08 PARTITION(ProductId) SELECT reflect('java.util.UUID', 'randomUUID'),stg_name,stg_baseamount,stg_billtoaccid,stg_extendedamount,stg_netamount,stg_netunitamount,stg_pricingdate,stg_quantity,stg_invoiceid,stg_shiptoaccid,stg_soldtoaccid,'2019-02-12 09:06:07.566',stg_id,stg_ProductId FROM tmp_table WHERE part_id IS NULL");
If we run just the SELECT query, without the INSERT, we get the data below.
+-----------------------------------+--------+--------------+--------------------+------------------+-------------+-----------------+-------------------+------------+-------------+--------------------+--------------------+-----------------------+------+-------------+
|reflect(java.util.UUID, randomUUID)|stg_name|stg_baseamount|     stg_billtoaccid|stg_extendedamount|stg_netamount|stg_netunitamount|    stg_pricingdate|stg_quantity|stg_invoiceid|     stg_shiptoaccid|     stg_soldtoaccid|2019-02-12 09:06:07.566|stg_id|stg_ProductId|
+-----------------------------------+--------+--------------+--------------------+------------------+-------------+-----------------+-------------------+------------+-------------+--------------------+--------------------+-----------------------+------+-------------+
|               4e0b4331-b551-42d...|    OLI6|          16.0|2DD4E682-6B4F-E81...|            34.567|   1166.74380|        916.78000|2018-10-18 05:06:22|          13|           I1|2DD4E682-6B4F-E81...|2DD4E682-6B4F-E81...|   2019-02-12 09:06:...|     6|           P3|
|               8b327a8e-dd3c-445...|    OLI7|          16.0|2DD4E682-6B4F-E81...|            34.567|    766.74380|       1016.78000|2018-10-18 05:06:22|          13|           I6|2DD4E682-6B4F-E81...|2DD4E682-6B4F-E81...|   2019-02-12 09:06:...|     7|           P4|
|               c0e14b9a-8d1a-426...|    OLI5|       14.6555|                null|             34.56|    500.87000|        814.65000|2018-10-11 05:06:22|          45|           I4|29B73C4E-846B-E71...|29B73C4E-846B-E71...|   2019-02-12 09:06:...|     5|           P1|
+-----------------------------------+--------+--------------+--------------------+------------------+-------------+-----------------+-------------------+------------+-------------+--------------------+--------------------+-----------------------+------+-------------+
With the managed table, we get the error below.
Exception in thread "main" org.apache.spark.sql.AnalysisException: org.apache.hadoop.hive.ql.metadata.HiveException:
Exception when loading 3 in table part_test04 with loadPath=wasb://default@rebatehdidefault3.blob.core.windows.net/hive/warehouse/db_103.db/part_test04/.hive-staging_hive_2019-02-12_07-22-38_389_7974756741328135674-1/-ext-10000;
  at org.apache.hadoop.hive.ql.metadata.Hive.loadDynamicPartitions(Hive.java:2481)

Exception when loading partition with parameters partPath=wasb://default@rebatehdidefault3.blob.core.windows.net/hive/warehouse/db_103.db/part_test04/.hive-staging_hive_2019-02-12_07-22-38_389_7974756741328135674-1/-ext-10000/productid=P1,
  table=part_test04, partSpec={productid=P1}, loadFileType=KEEP_EXISTING, listBucketingLevel=0, isAcid=false, hasFollowingStatsTask=false
java.lang.NullPointerException
  at org.apache.hadoop.hive.ql.metadata.Hive.addWriteNotificationLog(Hive.java:2915)
  at org.apache.hadoop.hive.ql.metadata.Hive.loadPartition(Hive.java:2005)
I can also see some warnings, as below; I am not sure whether they are related to the error.
Cannot get ACID state for db_103.part_test08 from null
One more note: if I use an external table instead, it works fine.
We are using an Azure HDInsight Spark 2.3 (HDI 4.0 Preview) cluster. The service stack versions are below.
HDFS : 3.1.1
Hive : 3.1.0
Spark2 : 2.3.1
Created on 02-15-2019 02:45 PM - edited 08-17-2019 02:36 PM
As described in SPARK-16996 and SPARK-15348, Spark currently doesn't support Hive ACID tables, either v1 (Hive 1.x) or v2 (Hive 3.x).
To work around that, you can use the Hive Warehouse Connector.
It creates the necessary link between the two components by getting Spark to connect via HiveServer2.
I'm not sure if it's directly bundled into HDI (it should be).
In any case, it's publicly available at:
https://github.com/hortonworks/hive-warehouse-connector-release/tree/HDP-3.0.1.10-7-tag
You'll find the documentation here:
Here's another HCC article that gives a concrete example of how to use it:
To get you started, here's a quick example of how to use it:
1. The Hive Warehouse Connector must be given as a dependency to Spark:
spark.jars=[path to the Hive warehouse connector]
usually: /usr/hdp/current/hive_warehouse_connector/hive-warehouse-connector-assembly-[Build version].jar
2. It also requires a few more configuration items, basically describing where the Hive metastore and HiveServer2 instances reside:
spark.datasource.hive.warehouse.metastoreUri=thrift://[YOUR METASTORE URI]:9083
spark.hadoop.hive.llap.daemon.service.hosts=@llap0
spark.hadoop.hive.zookeeper.quorum=[YOUR HIVE ZOOKEEPER QUORUM]:2181
spark.sql.hive.hiveserver2.jdbc.url=[YOUR HIVE LLAP JDBC URL]
These can be passed as Spark conf items:
--conf spark.hadoop.hive.llap.daemon.service.hosts=@llap0
or as extra configuration parameters for Spark notebooks (e.g. Zeppelin); a combined launch example follows.
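Putting 1. and 2. together, a spark-shell launch might look roughly like this (a sketch; the placeholders are kept as-is, and the assembly jar version depends on your build):

spark-shell \
  --jars /usr/hdp/current/hive_warehouse_connector/hive-warehouse-connector-assembly-[Build version].jar \
  --conf spark.datasource.hive.warehouse.metastoreUri=thrift://[YOUR METASTORE URI]:9083 \
  --conf spark.hadoop.hive.llap.daemon.service.hosts=@llap0 \
  --conf spark.hadoop.hive.zookeeper.quorum=[YOUR HIVE ZOOKEEPER QUORUM]:2181 \
  --conf spark.sql.hive.hiveserver2.jdbc.url="[YOUR HIVE LLAP JDBC URL]"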
3. Create a HiveWarehouseSession context
import com.hortonworks.hwc.HiveWarehouseSession
import com.hortonworks.hwc.HiveWarehouseSession._

val hive = HiveWarehouseSession.session(spark).build()

// set a database
hive.setDatabase("airline_ontime")

// show tables
hive.showTables().show(100)
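A couple of other handy calls on the same session, per the HWC API in the repository linked above (treat this as a sketch):

// Inspect a table's schema through HiveServer2
hive.describeTable("flights").show()

// Create a database only if it doesn't exist yet
hive.createDatabase("airline_ontime", true)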
4. Query Data
val flights_df = hive.executeQuery("SELECT * FROM flights WHERE year = 1989")
flights_df.createOrReplaceTempView("flights_1989")

+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+----+
|month|dayofmonth|dayofweek|deptime|crsdeptime|arrtime|crsarrtime|uniquecarrier|flightnum|tailnum|actualelapsedtime|crselapsedtime|airtime|arrdelay|depdelay|origin|dest|distance|taxiin|taxiout|cancelled|cancellationcode|diverted|carrierdelay|weatherdelay|nasdelay|securitydelay|lateaircraftdelay|year|
+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+----+
|   12|        25|        1|   1415|      1415|   1547|      1552|           US|     1478|     NA|               92|            97|   null|      -5|       0|   TPA| CLT|     508|  null|   null|        0|              NA|       0|        null|        null|    null|         null|             null|1989|
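Since executeQuery returns a regular Spark DataFrame, the temp view registered above can then be mixed freely with plain Spark SQL:

// The HWC result behaves like any other DataFrame / temp view
spark.sql("SELECT origin, count(*) AS n FROM flights_1989 GROUP BY origin ORDER BY n DESC").show(10)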
5. Write data back to Hive (in ACID format)
hive.table("flights")
  .filter("month = 01")
  .write
  .format(HiveWarehouseSession.HIVE_WAREHOUSE_CONNECTOR)
  .option("table", "flight_2019_01")
  .save()
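Applied back to your case, the same pattern should let you load part_test08: build the staging DataFrame with plain Spark, then hand it to the connector so Hive itself performs the ACID load. A minimal sketch, assuming the column aliases below match your target schema and that the partition column stays last in the select list (check the connector docs for partition handling on your build):

import com.hortonworks.hwc.HiveWarehouseSession

val hive = HiveWarehouseSession.session(sparkSession).build()
// Depending on the version, the target database may come from setDatabase or a connector option
hive.setDatabase("db_103")

// Build the staging DataFrame in Spark, as in your original SELECT
val staged = sparkSession.sql(
  "SELECT reflect('java.util.UUID', 'randomUUID') AS id, stg_name AS name, " +
  "stg_baseamount AS baseamount, stg_billtoaccid AS billtoaccid, " +
  "stg_extendedamount AS extendedamount, stg_netamount AS netamount, " +
  "stg_netunitamount AS netunitamount, stg_pricingdate AS pricingdate, " +
  "stg_quantity AS quantity, stg_invoiceid AS invoiceid, stg_shiptoaccid AS shiptoaccid, " +
  "stg_soldtoaccid AS soldtoaccid, '2019-02-12 09:06:07.566' AS ingested_on, " +
  "stg_id AS external_id, stg_ProductId AS productid " +
  "FROM tmp_table WHERE part_id IS NULL")

// Let HiveServer2 perform the ACID write instead of sparkSession.sql's INSERT
staged.write
  .format(HiveWarehouseSession.HIVE_WAREHOUSE_CONNECTOR)
  .option("table", "part_test08")
  .save()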