
Unable to append overwrite Hive ACID table using spark and HWC

New Contributor

I'm trying to use HWC (Hive Warehouse Connector) with Spark:
- to append to an existing Hive ACID table
- to overwrite an existing Hive ACID table
- to append to a new Hive ACID table that did not previously exist
- to overwrite a new Hive ACID table that did not previously exist

Writing to a new Hive ACID table works, but appending to or overwriting an existing table does not.

 

Please let me know how to use HWC to append to or overwrite existing ACID tables.

 

The steps I followed on a CDP 7.1.7 SP1 single-node cluster are given below:

1) Ran kinit <username>.
2) Ran the following commands in the beeline Hive client:
drop database testDatabase cascade;
drop database tpcds_bin_partitioned_orc_1000 cascade;
create database tpcds_bin_partitioned_orc_1000;
use tpcds_bin_partitioned_orc_1000;
create managed table web_sales(ws_sold_time_sk bigint, ws_ship_date_sk bigint) stored as orc;
insert into web_sales values(80000,1), (80001,2), (80002,3);
create database testDatabase;

3) I prepared a file called hwc_example2.scala containing the same example as in
https://docs.cloudera.com/cdp-private-cloud-base/7.1.7/integrating-hive-and-bi/topics/hive-hwc-confi...
import com.hortonworks.hwc.HiveWarehouseSession
import com.hortonworks.hwc.HiveWarehouseSession._
val hive = HiveWarehouseSession.session(spark).build();
hive.setDatabase("tpcds_bin_partitioned_orc_1000");
val df = hive.sql("select * from web_sales");
df.createOrReplaceTempView("web_sales");
hive.setDatabase("testDatabase");
hive.createTable("newTable").ifNotExists().column("ws_sold_time_sk", "bigint").column("ws_ship_date_sk", "bigint").create();
sql("SELECT ws_sold_time_sk, ws_ship_date_sk FROM web_sales WHERE ws_sold_time_sk > 80000").write.format(HIVE_WAREHOUSE_CONNECTOR).mode("append").option("table", "newTable").save();

4) I started the spark-shell with the following command:
spark-shell --master yarn --deploy-mode client \
  --jars /opt/cloudera/parcels/CDH-7.1.7-1.cdh7.1.7.p1000.24102687/jars/hive-warehouse-connector-assembly-1.0.0.7.1.7.1000-141.jar \
  --conf spark.datasource.hive.warehouse.read.mode=secure_access \
  --conf spark.datasource.hive.warehouse.load.staging.dir=hdfs://ip-nn-nn-nn-nn.ec2.internal:8020/tmp/staging/hwc \
  --conf spark.yarn.dist.files=/etc/hive/conf.cloudera.hive/hive-site.xml,/etc/hive/conf/hive-env.sh \
  --conf spark.driver.extraJavaOptions=-Djavax.security.auth.useSubjectCredsOnly=false \
  --conf spark.sql.crossJoin.enabled=true \
  --conf spark.hadoop.hive.enforce.bucketing=false \
  --conf spark.hadoop.hive.enforce.sorting=false \
  --conf spark.sql.hive.hiveserver2.jdbc.url=jdbc:hive2://ip-10-203-4-139.ec2.internal:10000/ \
  --conf spark.sql.hive.hiveserver2.jdbc.url.principal=hive/ip-nn-nn-nn-nn.ec2.internal@CLOUDERALABS.COM

5) At the spark-shell prompt I ran ":load hwc_example2.scala" and got an exception saying the table already exists. The exception trace is given below.
scala> :load hwc_example2.scala
Loading hwc_example2.scala...
import com.hortonworks.hwc.HiveWarehouseSession
import com.hortonworks.hwc.HiveWarehouseSession._
hive: com.hortonworks.spark.sql.hive.llap.HiveWarehouseSessionImpl = com.hortonworks.spark.sql.hive.llap.HiveWarehouseSessionImpl@a15e4fd
22/11/15 08:50:44 WARN conf.HiveConf: HiveConf of name hive.masking.algo does not exist
df: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [ws_sold_time_sk: bigint, ws_ship_date_sk: bigint]
22/11/15 08:50:54 WARN conf.HiveConf: HiveConf of name hive.masking.algo does not exist
Hive Session ID = 90af6bc3-1efa-4ab4-982c-61d2deb46ece
22/11/15 08:51:04 ERROR v2.WriteToDataSourceV2Exec: Data source writer com.hortonworks.spark.sql.hive.llap.writers.HiveWarehouseDataSourceWriter@517c2be7 is aborting.
22/11/15 08:51:04 ERROR writers.HiveWarehouseDataSourceWriter: Aborted DataWriter job 69330822-9d06-48ac-baac-8c7eacfffebc
22/11/15 08:51:04 ERROR v2.WriteToDataSourceV2Exec: Data source writer com.hortonworks.spark.sql.hive.llap.writers.HiveWarehouseDataSourceWriter@517c2be7 aborted.
org.apache.spark.SparkException: Writing job aborted.
at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.doExecute(WriteToDataSourceV2Exec.scala:92)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:141)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:137)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:165)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:162)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:137)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:93)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:91)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:704)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:704)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:80)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:704)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:280)
... 78 elided
Caused by: java.lang.RuntimeException: Error while compiling statement: FAILED: Execution Error, return code 40000 from org.apache.hadoop.hive.ql.ddl.DDLTask. AlreadyExistsException(message:Table hive.testDatabase.newTable already exists)
at com.hortonworks.spark.sql.hive.llap.writers.HiveWarehouseDataSourceWriter.commit(HiveWarehouseDataSourceWriter.java:225)
at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.doExecute(WriteToDataSourceV2Exec.scala:76)
... 93 more
Caused by: java.sql.SQLException: Error while compiling statement: FAILED: Execution Error, return code 40000 from org.apache.hadoop.hive.ql.ddl.DDLTask. AlreadyExistsException(message:Table hive.testDatabase.newTable already exists)
at org.apache.hive.jdbc.HiveStatement.waitForOperationToComplete(HiveStatement.java:411)
at org.apache.hive.jdbc.HiveStatement.execute(HiveStatement.java:276)
at org.apache.hive.jdbc.HivePreparedStatement.execute(HivePreparedStatement.java:101)
at org.apache.commons.dbcp2.DelegatingPreparedStatement.execute(DelegatingPreparedStatement.java:94)
at org.apache.commons.dbcp2.DelegatingPreparedStatement.execute(DelegatingPreparedStatement.java:94)
at com.hortonworks.spark.sql.hive.llap.wrapper.PreparedStatementWrapper.execute(PreparedStatementWrapper.java:37)
at com.hortonworks.spark.sql.hive.llap.JDBCWrapper.executeUpdate(HS2JDBCWrapper.scala:370)
at com.hortonworks.spark.sql.hive.llap.DefaultJDBCWrapper.executeUpdate(HS2JDBCWrapper.scala)
at com.hortonworks.spark.sql.hive.llap.writers.HiveWarehouseDataSourceWriter.handleWriteWithSaveMode(HiveWarehouseDataSourceWriter.java:330)
at com.hortonworks.spark.sql.hive.llap.writers.HiveWarehouseDataSourceWriter.commit(HiveWarehouseDataSourceWriter.java:223)
... 94 more

scala>

 

Thanks
Radhakrishnan

1 ACCEPTED SOLUTION

Expert Contributor

Basic pyspark command for using HWC in JDBC_CLUSTER mode:

pyspark --master yarn \
  --jars /opt/cloudera/parcels/CDH/lib/hive_warehouse_connector/hive-warehouse-connector-assembly-1.0.0.7.1.8.0-801.jar \
  --py-files /opt/cloudera/parcels/CDH/lib/hive_warehouse_connector/pyspark_hwc-1.0.0.7.1.8.0-801.zip \
  --conf spark.sql.hive.hiveserver2.jdbc.url='jdbc:hive2://c3757-node2.coelab.cloudera.com:2181,c3757-node3.coelab.cloudera.com:2181,c3757-node4.coelab.cloudera.com:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2' \
  --conf spark.datasource.hive.warehouse.read.mode='JDBC_CLUSTER' \
  --conf spark.datasource.hive.warehouse.load.staging.dir='/tmp' \
  --conf spark.sql.extensions=com.hortonworks.spark.sql.rule.Extensions \
  --conf spark.kryo.registrator=com.qubole.spark.hiveacid.util.HiveAcidKyroRegistrator



To append data to an existing Hive ACID table, ensure that you specify the save mode as 'append'.

Example 

Using Python version 2.7.5 (default, Jun 28 2022 15:30:04)
SparkSession available as 'spark'.
>>> from pyspark_llap import HiveWarehouseSession
>>> hive = HiveWarehouseSession.session(spark).build()
>>> df=hive.sql("select * from spark_hwc.employee")
23/10/10 17:20:00 WARN conf.HiveConf: HiveConf of name hive.masking.algo does not exist
23/10/10 17:20:08 INFO rule.HWCSwitchRule: Registering Listeners
>>> df.write.mode("append").format(HiveWarehouseSession().HIVE_WAREHOUSE_CONNECTOR).option("table", "spark_hwc.employee_new").save()
>>>
>>>
>>> hive.sql("select count(*) from spark_hwc.employee_new").show()
23/10/10 17:22:04 WARN conf.HiveConf: HiveConf of name hive.masking.algo does not exist
+---+
|_c0|
+---+
|  5|
+---+

>>>

To overwrite data in an existing Hive ACID table, ensure that you specify the save mode as 'overwrite'.

Example 

>>> df.write.mode("overwrite").format(HiveWarehouseSession().HIVE_WAREHOUSE_CONNECTOR).option("table", "spark_hwc.employee_new").save()
>>>


To append to or overwrite a new (not yet existing) Hive ACID table, there is no need to specify the save mode explicitly. HWC will automatically create the new ACID table based on the DataFrame's structure and internally trigger the LOAD DATA INPATH command.
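Putting the three cases together, the save-mode behavior can be summarized as a small decision function. This is an illustrative sketch of Spark's general SaveMode contract, not HWC's actual implementation (the real logic lives in HiveWarehouseDataSourceWriter.handleWriteWithSaveMode, as seen in the stack trace above); the function name and return strings are made up for illustration:

```python
def plan_write(table_exists: bool, mode: str = "errorifexists") -> str:
    """Illustrative sketch of Spark SaveMode semantics for a table sink.

    Hypothetical helper, not HWC source code: it only mirrors the
    behavior described in this thread.
    """
    if not table_exists:
        # New table: created from the DataFrame schema regardless of mode.
        return "create table from schema, then load data"
    if mode == "append":
        return "load data into existing table"
    if mode == "overwrite":
        return "replace existing data, then load"
    if mode == "ignore":
        return "no-op"
    # Default mode: writing to an existing table fails, which matches the
    # AlreadyExistsException in the question's stack trace.
    return "error: table already exists"


# The failing case from the question vs. the accepted fixes:
print(plan_write(True))               # error: table already exists
print(plan_write(True, "append"))     # load data into existing table
print(plan_write(True, "overwrite"))  # replace existing data, then load
```

The question's script hit the error path because its write targeted an existing table in a code path that issued CREATE TABLE; with mode("append") or mode("overwrite") against the existing table, the write proceeds as in the examples above.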


Ref - https://docs.cloudera.com/cdp-private-cloud-base/7.1.8/integrating-hive-and-bi/topics/hive-read-writ...

