Created 06-22-2020 03:05 PM
Hi All,
I am receiving the below error while running my Spark job, which does an insert overwrite of a table.
I have configured the below Python code in Oozie to perform the insert overwrite for multiple types of files.
import argparse
import re
from pyspark.sql import SparkSession


def read_file(src_path, file_typ):
    # read the source files as a DataFrame, based on the file type
    df = None
    if file_typ.lower() == "parquet":
        df = spark.read.parquet(src_path)
    elif file_typ.lower() == "avro":
        df = spark.read.format("com.databricks.spark.avro").load(src_path)
    return df


def create_query(db_nm, tab_nm, db_dir, comp_typ, file_typ):
    file_typ = file_typ.lower()
    # get the columns as a comma-separated string with their data types
    cols_df = spark.sql("describe local_tab_" + tab_nm)
    col_lst = cols_df.rdd.map(lambda x: str(x.col_name + ' ' + x.data_type)).collect()
    cols = ",".join(col_lst)
    cols = cols.lower()
    # query to create the table
    query = ("create table if not exists " + db_nm + "." + tab_nm + "(" + cols + ")"
             + " stored as " + file_typ + " location '" + db_dir + tab_nm
             + "' tblproperties('" + file_typ + ".compress'='" + comp_typ + "')")
    out = spark.sql(query)
    return query


def insert_cols_query(db_nm, tab_nm):
    # get the columns as a comma-separated string
    cols_df = spark.sql("show columns in local_tab_" + tab_nm)
    col_lst = cols_df.rdd.map(lambda x: str(x.col_name)).collect()
    cols = ",".join(col_lst)
    cols = cols.lower()
    spark.sql("set hive.exec.dynamic.partition.mode=nonstrict")
    # insert overwrite the target table from the temp view, without the p_col column
    query = ("insert overwrite table " + db_nm + "." + tab_nm
             + " select " + cols + " from local_tab_" + tab_nm)
    out = spark.sql(query)
    return query


def get_args():
    parser = argparse.ArgumentParser()
    parser.add_argument("--tabname", help="table name")
    parser.add_argument("--fileformat", help="file format")
    args = parser.parse_args()
    tabname = None
    fileformat = None
    if args.tabname:
        tabname = args.tabname
    if args.fileformat:
        fileformat = args.fileformat
    return (tabname, fileformat)


if __name__ == "__main__":
    arg = get_args()
    tab_nm = arg[0]
    file_typ = arg[1]
    #### code ########
    src_path = "/appldigi/data/raw/landing/" + tab_nm
    db_nm = "db_appldigi"
    db_dir = "/appldigi/hive/db_appldigi/"
    comp_typ = "snappy"
    spark = SparkSession.builder.enableHiveSupport().getOrCreate()
    df = read_file(src_path, file_typ)
    df.createOrReplaceTempView("local_tab_" + tab_nm)
    chk_tab = spark.sql("show tables in " + db_nm)
    flag_tab_exists = (chk_tab.filter(chk_tab.isTemporary == False)
                              .filter(chk_tab.tableName == tab_nm.lower())
                              .count())
    out = ""
    if flag_tab_exists == 0:  # should not happen
        out = create_query(db_nm, tab_nm, db_dir, comp_typ, file_typ)
    out = insert_cols_query(db_nm, tab_nm)
    print(out)
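For reference, this is roughly how the script is invoked; a minimal sketch only, where the script name and argument values are placeholders I am using for illustration:
spark-submit ingest_insert_overwrite.py --tabname customer_events --fileformat parquet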
Created 06-26-2020 07:00 AM
It seems there is an issue with removing trash from the user folder, for which a disk space quota is enabled, leading to errors about exceeding the quota.
What is the Spark version? Support for purge was added in Spark 2.0.
As a workaround, you can alter the table properties to enable auto purge:
ALTER TABLE table_name SET TBLPROPERTIES('auto.purge'='TRUE');
Hope this helps,
Paras
Created 06-27-2020 08:39 AM
Hi Paras,
auto.purge=true will not work for external tables. I tried this and it is not working.
In Spark there is a configuration, --conf spark.hadoop.dfs.user.home.dir.prefix=/tmp --executor-memory 5G --num-executors 5, which redirected the .Trash files to a temporary directory.
Now I want to implement the same for HiveQL through an Oozie action, and I am looking for parameters to redirect the temporary directory to /tmp.
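For reference, this is a rough sketch of how that configuration is passed in my Spark Oozie action; the action name and script name below are placeholders, and the relevant part is the --conf entry inside spark-opts:
<!-- Sketch only: action and script names are placeholders -->
<action name="SparkIngest">
    <spark xmlns="uri:oozie:spark-action:0.2">
        <job-tracker>${resourceManager}</job-tracker>
        <name-node>${nameNode}</name-node>
        <master>yarn</master>
        <mode>cluster</mode>
        <name>SparkIngest</name>
        <jar>ingest_insert_overwrite.py</jar>
        <spark-opts>--conf spark.hadoop.dfs.user.home.dir.prefix=/tmp --executor-memory 5G --num-executors 5</spark-opts>
    </spark>
    <ok to="end"/>
    <error to="kill"/>
</action>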
Created 06-29-2020 01:56 AM
You should consider clearing the user's .Trash directory or increasing the disk space quota.
You can try changing 'yarn.app.mapreduce.am.staging-dir' in the YARN configuration. 'yarn.app.mapreduce.am.staging-dir' is the property used to set the root HDFS directory of the staging area for users' MR2 jobs. By default this is set to /user. You can change it to a different location that does not have a quota. The YARN staging directories will always be created under the new path, named after the user.
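As an illustration only, assuming the override goes into mapred-site.xml (or the corresponding configuration override in your cluster manager) and using /tmp as a placeholder path, the property entry would look roughly like this:
<!-- Sketch only: the exact file or safety valve depends on your distribution -->
<property>
    <name>yarn.app.mapreduce.am.staging-dir</name>
    <value>/tmp</value>
</property>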
Created 06-29-2020 09:14 AM
@paras , thanks a lot for your reply. The solution you provided was for the Spark Oozie action.
I was able to solve that using the same configuration, --conf spark.hadoop.dfs.user.home.dir.prefix=/tmp, two days ago.
That was for the ingestion part of the flow. So ultimately my Sqoop and Spark jobs redirect any .Trash to my tmp directory, which has enough quota. Now I am facing this issue with the Hive action, where I am not aware of a configuration equivalent to --conf spark.hadoop.dfs.user.home.dir.prefix=/appldigi/tmp or
-Dyarn.app.mapreduce.am.staging-dir=/tmp.
Can you please guide me on this? I am unable to solve it.
I am trying to execute a HiveQL insert overwrite script. I have already tried the auto.purge=true option, and it is not working.
Created 06-30-2020 05:44 AM
Can you please share your workflow.xml so I can understand exactly how you are trying to execute the operations?
What is the behaviour you observe when you run the insert overwrite query from Beeline?
You can use the query below to set the Hive scratch directory to a temporary output path.
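A minimal sketch, assuming the property in question is hive.exec.scratchdir and that /appldigi/tmp is a placeholder path:
-- Sketch only: some clusters restrict this property via hive.conf.restricted.list,
-- in which case it has to be set server-side rather than per session.
SET hive.exec.scratchdir=/appldigi/tmp;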
Let me know if this helps.
Created 07-05-2020 02:49 PM
@paras , please find the workflow.xml below.
<workflow-app name="Customer_Journey_FirstActions_Events_Set2" xmlns="uri:oozie:workflow:0.5">
    <credentials>
        <credential name="hs2-cred" type="hive2">
            <property>
                <name>hive2.jdbc.url</name>
                <value>jdbc:hive2://adcuxxxx.adcbmis.local:10001/db_appldigi;transportMode=http;httpPath=cliservice</value>
            </property>
            <property>
                <name>hive2.server.principal</name>
                <value>hive/adcuxxxx.adcbmis.local@xxxxxMIS.LOCAL</value>
            </property>
        </credential>
    </credentials>
    <start to="Trash1"/>
    <action name="Trash1" cred="hs2-cred">
        <fs>
            <name-node>${nameNode}</name-node>
            <delete path="/user/anja21614/.Trash"/>
        </fs>
        <ok to="FirstActions2"/>
        <error to="kill"/>
    </action>
    <action name="FirstActions2" cred="hs2-cred">
        <hive2 xmlns="uri:oozie:hive2-action:0.2">
            <job-tracker>${resourceManager}</job-tracker>
            <name-node>${nameNode}</name-node>
            <configuration>
                <property>
                    <name>fs.trash.interval</name>
                    <value>0</value>
                </property>
                <property>
                    <name>hive.exec.scratchdir</name>
                    <value>/appldigi/tmp</value>
                </property>
            </configuration>
            <jdbc-url>jdbc:hive2://adcuxxxx.adcbmis.local:10001/db_appldigi;transportMode=http;httpPath=cliservice</jdbc-url>
            <script>FirstActions2.hql</script>
            <param>-Dyarn.app.mapreduce.am.staging-dir=/appldigi/tmp</param>
            <file>/user/anja21614/CustomerJourney/FirstActions2.hql</file>
        </hive2>
        <ok to="Trash3"/>
        <error to="kill"/>
    </action>
    <action name="Trash3" cred="hs2-cred">
        <fs>
            <name-node>${nameNode}</name-node>
            <delete path="/user/anja21614/.Trash"/>
        </fs>
        <ok to="end"/>
        <error to="kill"/>
    </action>
    <kill name="kill">
        <message>${wf:errorMessage(wf:lastErrorNode())}</message>
    </kill>
    <end name="end"/>
</workflow-app>
Please note that I just added the scratch directory configuration to the workflow. Please check and let me know whether the configuration is specified correctly. I had already tried this configuration in Ambari Hive, and it was giving me an error.