Created 08-10-2024 04:53 PM
Hi All,
I am trying to write my Spark dataframe to a Hive table using HWC (Hive Warehouse Connector). My application is in pySpark, and I have 5 concurrent Spark applications running at the same time, all trying to write to the same Hive table, probably at the same moment. I am getting the following issue:
Error- caused by: java.lang.RuntimeException: Error while compiling statement: FAILED: Execution Error, return code 40000 from org.apache.hadoop.hive.ql.exec.MoveTask. java.io.FileNotFoundException: File hdfs://tablespace/managed/hive/my_db_name.db/hive_table_name/.hive-staging_hive_2024-08-05_15-55-26_488_5678420092852048777-45678
Does HWC not allow concurrent writes to the same Hive table? Or is this a limitation of Hive tables?
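For reference, each job does roughly the following. This is a minimal sketch rather than my exact code; the app name, table and database names, and the dataframe schema are placeholders:

# Minimal sketch of what each of the 5 concurrent jobs does (names are placeholders).
from pyspark.sql import SparkSession
from pyspark_llap import HiveWarehouseSession  # HWC entry point for pySpark

spark = SparkSession.builder.appName("ingestion-job").getOrCreate()
hive = HiveWarehouseSession.session(spark).build()

# Placeholder dataframe; each job builds its own audit/status rows.
df = spark.createDataFrame([("batch_1", "SUCCESS")], ["batch_name", "status"])

# All 5 jobs append to the same managed Hive table, potentially at the same time.
df.write.format(HiveWarehouseSession.HIVE_WAREHOUSE_CONNECTOR) \
    .mode("append") \
    .option("database", "my_db_name") \
    .option("table", "hive_table_name") \
    .save()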
Created 08-11-2024 11:37 PM
Could you please share the cluster version and the spark-submit command?
What's the HWC execution mode?
Can you please share the complete StackTrace?
As the issue is with respect to MoveTask, HIVE-24163 could be the problem.
Created on 08-12-2024 01:38 AM - edited 08-12-2024 01:42 AM
Hi @ggangadharan, thanks for your reply.
Basically, my multiple Spark jobs write dataframes to the same Hive table via HWC. Each Spark job applies a different set of ingestion/transformations, and each one writes to the same Hive table, a kind of audit/log table recording each Spark job's ingestion status, time, etc. These two tables are common across all Spark jobs. I am executing the Spark jobs via an Oozie workflow. I could see the following stack trace:
Error creating/checking hive table
An error occurred while calling o117.save.
: org.apache.spark.SparkException: Writing job aborted.
    at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.doExecute(WriteToDataSourceV2Exec.scala:92)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:146)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:142)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:170)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:167)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:142)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:93)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:91)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:704)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:704)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:80)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:704)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:280)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.RuntimeException: Error while compiling statement: FAILED: Execution Error, return code 40000 from org.apache.hadoop.hive.ql.exec.MoveTask. java.io.FileNotFoundException: File hdfs://HDFS-HA/warehouse/tablespace/managed/hive/my_db_name.db/my_log_table_name/.hive-staging_hive_2024-08-09_16-16-49_474_6465304774056330032-46249 does not exist.
    at com.hortonworks.spark.sql.hive.llap.writers.HiveWarehouseDataSourceWriter.commit(HiveWarehouseDataSourceWriter.java:232)
    at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.doExecute(WriteToDataSourceV2Exec.scala:76)
    ... 26 more
Caused by: java.sql.SQLException: Error while compiling statement: FAILED: Execution Error, return code 40000 from org.apache.hadoop.hive.ql.exec.MoveTask. java.io.FileNotFoundException: File hdfs://HDFS-HA/warehouse/tablespace/managed/hive/my_db_name.db/my_log_table_name/.hive-staging_hive_2024-08-09_16-16-49_474_6465304774056330032-46249 does not exist.
    at org.apache.hive.jdbc.HiveStatement.waitForOperationToComplete(HiveStatement.java:411)
    at org.apache.hive.jdbc.HiveStatement.execute(HiveStatement.java:276)
    at org.apache.hive.jdbc.HivePreparedStatement.execute(HivePreparedStatement.java:101)
    at com.hortonworks.spark.sql.hive.llap.wrapper.PreparedStatementWrapper.execute(PreparedStatementWrapper.java:48)
    at com.hortonworks.spark.sql.hive.llap.JDBCWrapper.executeUpdate(HS2JDBCWrapper.scala:396)
    at com.hortonworks.spark.sql.hive.llap.DefaultJDBCWrapper.executeUpdate(HS2JDBCWrapper.scala)
    at com.hortonworks.spark.sql.hive.llap.writers.HiveWarehouseDataSourceWriter.handleWriteWithSaveMode(HiveWarehouseDataSourceWriter.java:345)
    at com.hortonworks.spark.sql.hive.llap.writers.HiveWarehouseDataSourceWriter.commit(HiveWarehouseDataSourceWriter.java:230)
Hive version: 3.1.3000.7.1.8.55-1
This is the way I am trying to ingest using Spark:
df.write.mode("append").format(HiveWarehouseSession.HIVE_WAREHOUSE_CONNECTOR)\
    .option("table", table_name).option("database", database_name).save()
Created 08-12-2024 01:51 AM
Can you please share the stack trace from the HiveServer2 logs and the spark-submit command used?
Created 08-12-2024 02:02 AM
Sure, I will share. It seems this Hive staging directory is being created at the table level? I have tried to add partitions and ingest into those partitions; my partitions were (batch name, date), so each Spark job writes to its own batch-name partition. However, that also failed, since the Hive staging temp directory was still created at the table level, not at the partition level.
Created 08-22-2024 05:25 AM
Make sure the following is enabled at the cluster level:
hive.acid.direct.insert.enabled
Also, use the format below to insert into partitioned tables.
Static partition
df.write.format(HIVE_WAREHOUSE_CONNECTOR).mode("append").option("partition", "c1='val1',c2='val2'").option("table", "t1").save();
Dynamic partition
df.write.format(HIVE_WAREHOUSE_CONNECTOR).mode("append").option("partition", "c1,c2").option("table", "t1").save();
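If you want to confirm the current value before relying on it, one way (assuming you have Beeline access to HiveServer2) is to print the effective setting for your session:

-- Prints hive.acid.direct.insert.enabled=<true|false> for the current session.
SET hive.acid.direct.insert.enabled;

Note that changing it cluster-wide is normally done through the HiveServer2 configuration (for example via Cloudera Manager), not per session.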
Created 08-23-2024 01:54 AM
Thank you. It seems the Hive Warehouse Connector creates these tmp/staging directories at the table level rather than at the partition level.
Created on 08-23-2024 06:09 AM - edited 08-23-2024 06:10 AM
Since the partition-related information is not mentioned in the write statement, the staging directory is created in the table directory instead of the partition directory.
Created 08-26-2024 01:09 AM
I have tried with the partition statement (dynamic), and the staging directory is still created at the table level. It seems this works only for static partitions.
Created 08-28-2024 12:40 AM
When writing to a statically partitioned table using HWC, the following query is internally fired to Hive through JDBC after writing data to a temporary location:
Spark write statement:
df.write.format(HIVE_WAREHOUSE_CONNECTOR).mode("append").option("partition", "c1='val1',c2='val2'").option("table", "t1").save();
HWC internal query:
LOAD DATA INPATH '<spark.datasource.hive.warehouse.load.staging.dir>' [OVERWRITE] INTO TABLE db.t1 PARTITION (c1='val1',c2='val2');
During static partitioning, the partition information is known at compile time, so the staging directory is created inside the partition directory.
On the other hand, when writing to a dynamically partitioned table using HWC, the following query is internally fired to Hive through JDBC after writing data to a temporary location:
Spark write statement:
df.write.format(HIVE_WAREHOUSE_CONNECTOR).mode("append").option("partition", "c1='val1',c2").option("table", "t1").save();
HWC internal query:
CREATE TEMPORARY EXTERNAL TABLE db.job_id_table(cols....) STORED AS ORC LOCATION '<spark.datasource.hive.warehouse.load.staging.dir>';
INSERT INTO TABLE t1 PARTITION (c1='val1',c2) SELECT <cols> FROM db.job_id_table;
During dynamic partitioning, the partition information is known only at runtime, hence the staging directory is created at the table level. Once the DAG is completed, the MoveTask will move the files to the respective partitions.
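Given that, one way to keep your 5 concurrent jobs from colliding is to have each job write to its own static partition, so that each staging directory is created inside a different partition directory. A sketch, assuming a batch_name partition column with a unique value per application (the table, database, and variable names are placeholders):

# Each job supplies its own static batch_name, so the staging directories land
# in separate partition directories and cannot clash with the other jobs.
from pyspark_llap import HiveWarehouseSession

batch = "job_1"  # unique per Spark application

df.write.format(HiveWarehouseSession.HIVE_WAREHOUSE_CONNECTOR) \
    .mode("append") \
    .option("partition", "batch_name='{0}'".format(batch)) \
    .option("database", "my_db_name") \
    .option("table", "my_log_table_name") \
    .save()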