Support Questions

Find answers, ask questions, and share your expertise

Hive Warehouse Connector concurrent write to Hive table issue using spark

avatar
Contributor

Hi All,

I am trying to write my spark dataframe to hive table using HWC - Hive Warehouse connector. My spark application is in pySpark and i have 5 concurrent spark application running at same time and trying to write to same hive table probably at same time. I am getting following issue 


Error- caused by: java.lang.RuntimeException: Error while compiling statement: FAILED: Execution Error, return code 40000 from org.apache.hadoop.hive.ql.exec.MoveTask. java.io.FileNotFoundException: File hdfs://tablespace/managed/hive/my_db_name.db/hive_table_name/.hive-staging_hive_2024-08-05_15-55-26_488_5678420092852048777-45678

Does HWC won't allow concurrent write to same hive table? or its limitation with hive table?

 

 

1 ACCEPTED SOLUTION

avatar
Master Collaborator

When writing to a statically partitioned table using HWC, the following query is internally fired to Hive through JDBC after writing data to a temporary location:

Spark write statement:

df.write.format(HIVE_WAREHOUSE_CONNECTOR).mode("append").option("partition", "c1='val1',c2='val2'").option("table", "t1").save();

 HWC internal query:

LOAD DATA INPATH '<spark.datasource.hive.warehouse.load.staging.dir>' [OVERWRITE] INTO TABLE db.t1 PARTITION (c1='val1',c2='val2');

 During static partitioning, the partition information is known during compile time, resulting in the creation of a staging directory in the partition directory.

 

On the other hand, when writing to a dynamically partitioned table using HWC, the following query is internally fired to Hive through JDBC after writing data to a temporary location:

Spark write statement:

df.write.format(HIVE_WAREHOUSE_CONNECTOR).mode("append").option("partition", "c1='val1',c2").option("table", "t1").save();

HWC internal query:

CREATE TEMPORARY EXTERNAL TABLE db.job_id_table(cols....) STORED AS ORC LOCATION '<spark.datasource.hive.warehouse.load.staging.dir>'; 
INSERT INTO TABLE t1 PARTITION (c1='val1',c2) SELECT <cols> FROM db.job_id_table;


During dynamic partitioning, the partition information is known during runtime, hence the staging directory is created at the table level. Once the DAG  is completed, the MOVE TASK will move the files to the respective partitions. 

View solution in original post

9 REPLIES 9

avatar
Master Collaborator

Could you please share the cluster version and spark-submit command . 

What's the HWC execution mode?
Can you please share the complete StackTrace?

As the issue is with respect to MoveTask , HIVE-24163  can be a problem. 


avatar
Contributor

Hi @ggangadharan , thanks for your replay

so basically my multiple spark jobs writes dataframes to same hive table via HWC. each spark job is different set of ingestion/transformations applied and am trying to  write to hive table - kind of audit / logs table for each spark job ingestion status , time etc.. these two tables are common across all spark jobs. I am executing the spark job via oozie workflow. Following stack trace i could see :

 

 

Error creating/checking hive table An error occurred while calling o117.save. : org.apache.spark.SparkException: Writing job aborted. at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.doExecute(WriteToDataSourceV2Exec.scala:92) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:146) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:142) at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:170) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:167) at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:142) at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:93) at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:91) at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:704) at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:704) at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:80) at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75) at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:704) at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:280) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.base/java.lang.reflect.Method.invoke(Method.java:566) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) at py4j.Gateway.invoke(Gateway.java:282) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:238) at java.base/java.lang.Thread.run(Thread.java:829) Caused by: java.lang.RuntimeException: Error while compiling statement: FAILED: Execution Error, return code 40000 from org.apache.hadoop.hive.ql.exec.MoveTask. java.io.FileNotFoundException: File hdfs://HDFS-HA/warehouse/tablespace/managed/hive/my_db_name.db/my_log_table_name/.hive-staging_hive_2024-08-09_16-16-49_474_6465304774056330032-46249 does not exist. at com.hortonworks.spark.sql.hive.llap.writers.HiveWarehouseDataSourceWriter.commit(HiveWarehouseDataSourceWriter.java:232) at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.doExecute(WriteToDataSourceV2Exec.scala:76) ... 26 more Caused by: java.sql.SQLException: Error while compiling statement: FAILED: Execution Error, return code 40000 from org.apache.hadoop.hive.ql.exec.MoveTask.
java.io.FileNotFoundException: File hdfs://HDFS-HA/warehouse/tablespace/managed/hive/my_db_name.db/my_log_table_name/.hive-staging_hive_2024-08-09_16-16-49_474_6465304774056330032-46249 does not exist. at org.apache.hive.jdbc.HiveStatement.waitForOperationToComplete(HiveStatement.java:411) at org.apache.hive.jdbc.HiveStatement.execute(HiveStatement.java:276) at org.apache.hive.jdbc.HivePreparedStatement.execute(HivePreparedStatement.java:101) at com.hortonworks.spark.sql.hive.llap.wrapper.PreparedStatementWrapper.execute(PreparedStatementWrapper.java:48) at com.hortonworks.spark.sql.hive.llap.JDBCWrapper.executeUpdate(HS2JDBCWrapper.scala:396) at com.hortonworks.spark.sql.hive.llap.DefaultJDBCWrapper.executeUpdate(HS2JDBCWrapper.scala) at com.hortonworks.spark.sql.hive.llap.writers.HiveWarehouseDataSourceWriter.handleWriteWithSaveMode(HiveWarehouseDataSourceWriter.java:345) at com.hortonworks.spark.sql.hive.llap.writers.HiveWarehouseDataSourceWriter.commit(HiveWarehouseDataSourceWriter.java:230

 

HIVE version -  Hive 3.1.3000.7.1.8.55-1

 

 

This is the way I am trying to ingest using spark.
df.write.mode("append").format(HiveWarehouseSession.HIVE_WAREHOUSE_CONNECTOR)\
.option("table", table_name).option("database", atabase_name).save()

avatar
Master Collaborator

Can you please share the stacktrace from HiveServer2 logs  and spark-submit command used

avatar
Contributor

sure. I will share. It seems this hive staging directory is being created at table level ? I have tried to add partitions and ingest to these partitions . my partitions was (batch name, date). so each spark job write to their respective batch name, However that also failed since hive staging temp dir. is been created at table level not in partition level

avatar
Master Collaborator

make sure below is enabled at cluster level. 

hive.acid.direct.insert.enabled


Also use below format to insert into partitioned tables. 

Static partition

 

df.write.format(HIVE_WAREHOUSE_CONNECTOR).mode("append").option("partition", "c1='val1',c2='val2'").option("table", "t1").save();

 


Dynamic partition

 

df.write.format(HIVE_WAREHOUSE_CONNECTOR).mode("append").option("partition", "c1,c2").option("table", "t1").save(); 

 

avatar
Contributor

Thank you. it seems hive warehouse connector creating these tmp/staging directories in table level rather than partition level. 

avatar
Master Collaborator

Since the partition related information not mentioned in the write statement staging directory created in the table directory instead of partition directory. 

avatar
Contributor

I have tried with partition statement (dynamic) and still staging directory has been creatde at table level. it seems this works for only static partition

avatar
Master Collaborator

When writing to a statically partitioned table using HWC, the following query is internally fired to Hive through JDBC after writing data to a temporary location:

Spark write statement:

df.write.format(HIVE_WAREHOUSE_CONNECTOR).mode("append").option("partition", "c1='val1',c2='val2'").option("table", "t1").save();

 HWC internal query:

LOAD DATA INPATH '<spark.datasource.hive.warehouse.load.staging.dir>' [OVERWRITE] INTO TABLE db.t1 PARTITION (c1='val1',c2='val2');

 During static partitioning, the partition information is known during compile time, resulting in the creation of a staging directory in the partition directory.

 

On the other hand, when writing to a dynamically partitioned table using HWC, the following query is internally fired to Hive through JDBC after writing data to a temporary location:

Spark write statement:

df.write.format(HIVE_WAREHOUSE_CONNECTOR).mode("append").option("partition", "c1='val1',c2").option("table", "t1").save();

HWC internal query:

CREATE TEMPORARY EXTERNAL TABLE db.job_id_table(cols....) STORED AS ORC LOCATION '<spark.datasource.hive.warehouse.load.staging.dir>'; 
INSERT INTO TABLE t1 PARTITION (c1='val1',c2) SELECT <cols> FROM db.job_id_table;


During dynamic partitioning, the partition information is known during runtime, hence the staging directory is created at the table level. Once the DAG  is completed, the MOVE TASK will move the files to the respective partitions.