Created 11-15-2022 02:56 AM
I'm trying to use HWC (Hive Warehouse Connector) with Spark:
- to append to an existing Hive ACID table
- to overwrite an existing Hive ACID table
- to append to a new Hive ACID table that does not exist yet
- to overwrite a new Hive ACID table that does not exist yet
Writing to a new Hive ACID table works, but appending to or overwriting an existing table fails in both modes.
Please let me know how to use HWC to append to or overwrite existing ACID tables.
The steps I followed on a CDP 7.1.7 SP1 single-node cluster are listed below:
1) Ran kinit <username>
2) Ran the following commands in the Beeline Hive client:
drop database testDatabase cascade;
drop database tpcds_bin_partitioned_orc_1000 cascade;
create database tpcds_bin_partitioned_orc_1000;
use tpcds_bin_partitioned_orc_1000;
create managed table web_sales(ws_sold_time_sk bigint, ws_ship_date_sk bigint) stored as orc;
insert into web_sales values(80000,1), (80001,2), (80002,3);
create database testDatabase;
3) I prepared a file called hwc_example2.scala containing the same example as in
https://docs.cloudera.com/cdp-private-cloud-base/7.1.7/integrating-hive-and-bi/topics/hive-hwc-confi...
import com.hortonworks.hwc.HiveWarehouseSession
import com.hortonworks.hwc.HiveWarehouseSession._
val hive = HiveWarehouseSession.session(spark).build();
hive.setDatabase("tpcds_bin_partitioned_orc_1000");
val df = hive.sql("select * from web_sales");
df.createOrReplaceTempView("web_sales");
hive.setDatabase("testDatabase");
hive.createTable("newTable").ifNotExists().column("ws_sold_time_sk", "bigint").column("ws_ship_date_sk", "bigint").create();
sql("SELECT ws_sold_time_sk, ws_ship_date_sk FROM web_sales WHERE ws_sold_time_sk > 80000").write.format(HIVE_WAREHOUSE_CONNECTOR).mode("append").option("table", "newTable").save();
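For the overwrite case I ran the same write with only the save mode changed (shown here as a sketch split over several lines for readability; against an existing table it fails the same way as append):

```scala
// Overwrite variant of the same HWC write; only the SaveMode string changes.
sql("SELECT ws_sold_time_sk, ws_ship_date_sk FROM web_sales WHERE ws_sold_time_sk > 80000").
  write.format(HIVE_WAREHOUSE_CONNECTOR).
  mode("overwrite").
  option("table", "newTable").
  save()
```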
4) I started spark-shell using the command below:
spark-shell --master yarn --deploy-mode client \
  --jars /opt/cloudera/parcels/CDH-7.1.7-1.cdh7.1.7.p1000.24102687/jars/hive-warehouse-connector-assembly-1.0.0.7.1.7.1000-141.jar \
  --conf spark.datasource.hive.warehouse.read.mode=secure_access \
  --conf spark.datasource.hive.warehouse.load.staging.dir=hdfs://ip-nn-nn-nn-nn.ec2.internal:8020/tmp/staging/hwc \
  --conf spark.yarn.dist.files=/etc/hive/conf.cloudera.hive/hive-site.xml,/etc/hive/conf/hive-env.sh \
  --conf spark.driver.extraJavaOptions=-Djavax.security.auth.useSubjectCredsOnly=false \
  --conf spark.sql.crossJoin.enabled=true \
  --conf spark.hadoop.hive.enforce.bucketing=false \
  --conf spark.hadoop.hive.enforce.sorting=false \
  --conf spark.sql.hive.hiveserver2.jdbc.url=jdbc:hive2://ip-10-203-4-139.ec2.internal:10000/ \
  --conf spark.sql.hive.hiveserver2.jdbc.url.principal=hive/ip-nn-nn-nn-nn.ec2.internal@CLOUDERALABS.COM
5) At the spark-shell prompt I ran ":load hwc_example2.scala" and got an AlreadyExistsException saying the table already exists. The exception trace is given below:
scala> :load hwc_example2.scala
Loading hwc_example2.scala...
import com.hortonworks.hwc.HiveWarehouseSession
import com.hortonworks.hwc.HiveWarehouseSession._
hive: com.hortonworks.spark.sql.hive.llap.HiveWarehouseSessionImpl = com.hortonworks.spark.sql.hive.llap.HiveWarehouseSessionImpl@a15e4fd
22/11/15 08:50:44 WARN conf.HiveConf: HiveConf of name hive.masking.algo does not exist
df: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [ws_sold_time_sk: bigint, ws_ship_date_sk: bigint]
22/11/15 08:50:54 WARN conf.HiveConf: HiveConf of name hive.masking.algo does not exist
Hive Session ID = 90af6bc3-1efa-4ab4-982c-61d2deb46ece
22/11/15 08:51:04 ERROR v2.WriteToDataSourceV2Exec: Data source writer com.hortonworks.spark.sql.hive.llap.writers.HiveWarehouseDataSourceWriter@517c2be7 is aborting.
22/11/15 08:51:04 ERROR writers.HiveWarehouseDataSourceWriter: Aborted DataWriter job 69330822-9d06-48ac-baac-8c7eacfffebc
22/11/15 08:51:04 ERROR v2.WriteToDataSourceV2Exec: Data source writer com.hortonworks.spark.sql.hive.llap.writers.HiveWarehouseDataSourceWriter@517c2be7 aborted.
org.apache.spark.SparkException: Writing job aborted.
at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.doExecute(WriteToDataSourceV2Exec.scala:92)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:141)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:137)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:165)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:162)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:137)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:93)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:91)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:704)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:704)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:80)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:704)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:280)
... 78 elided
Caused by: java.lang.RuntimeException: Error while compiling statement: FAILED: Execution Error, return code 40000 from org.apache.hadoop.hive.ql.ddl.DDLTask. AlreadyExistsException(message:Table hive.testDatabase.newTable already exists)
at com.hortonworks.spark.sql.hive.llap.writers.HiveWarehouseDataSourceWriter.commit(HiveWarehouseDataSourceWriter.java:225)
at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.doExecute(WriteToDataSourceV2Exec.scala:76)
... 93 more
Caused by: java.sql.SQLException: Error while compiling statement: FAILED: Execution Error, return code 40000 from org.apache.hadoop.hive.ql.ddl.DDLTask. AlreadyExistsException(message:Table hive.testDatabase.newTable already exists)
at org.apache.hive.jdbc.HiveStatement.waitForOperationToComplete(HiveStatement.java:411)
at org.apache.hive.jdbc.HiveStatement.execute(HiveStatement.java:276)
at org.apache.hive.jdbc.HivePreparedStatement.execute(HivePreparedStatement.java:101)
at org.apache.commons.dbcp2.DelegatingPreparedStatement.execute(DelegatingPreparedStatement.java:94)
at org.apache.commons.dbcp2.DelegatingPreparedStatement.execute(DelegatingPreparedStatement.java:94)
at com.hortonworks.spark.sql.hive.llap.wrapper.PreparedStatementWrapper.execute(PreparedStatementWrapper.java:37)
at com.hortonworks.spark.sql.hive.llap.JDBCWrapper.executeUpdate(HS2JDBCWrapper.scala:370)
at com.hortonworks.spark.sql.hive.llap.DefaultJDBCWrapper.executeUpdate(HS2JDBCWrapper.scala)
at com.hortonworks.spark.sql.hive.llap.writers.HiveWarehouseDataSourceWriter.handleWriteWithSaveMode(HiveWarehouseDataSourceWriter.java:330)
at com.hortonworks.spark.sql.hive.llap.writers.HiveWarehouseDataSourceWriter.commit(HiveWarehouseDataSourceWriter.java:223)
... 94 more
scala>
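From the trace, the writer's commit path (handleWriteWithSaveMode) issues a CREATE TABLE through HiveServer2 even in append mode, which fails because testDatabase.newTable already exists. Since writes to tables that do not yet exist succeed, one workaround I am considering is to write to a fresh staging table and then move the rows into the existing ACID table via a pass-through statement. This is only a sketch under my assumptions: it reuses the hive session and df from the script above, the staging table name is illustrative, and it relies on executeUpdate, which is part of the documented HiveWarehouseSession API.

```scala
// Sketch of a possible append workaround: write to a staging table that
// does not exist yet (new-table writes are known to work in my setup),
// then append into the existing ACID table through HiveServer2.
// Assumes `hive` and `df` from hwc_example2.scala; "newTable_stage" is illustrative.
import com.hortonworks.hwc.HiveWarehouseSession._

val stage = "newTable_stage"

df.filter(df("ws_sold_time_sk") > 80000).
  write.format(HIVE_WAREHOUSE_CONNECTOR).
  mode("append").
  option("table", stage).   // staging table is new, so the write succeeds
  save()

// Move the staged rows into the existing table, then clean up.
hive.executeUpdate(s"INSERT INTO newTable SELECT * FROM $stage")
hive.executeUpdate(s"DROP TABLE IF EXISTS $stage")
```

This avoids the CREATE TABLE collision entirely, but I would prefer a way to make mode("append")/mode("overwrite") work directly against the existing table if one exists.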
Thanks
Radhakrishnan