I am running a Spark application that loads two tables as DataFrames, does a left outer join, and generates a row number for the records that are missing from the right table.
My code and spark-submit command are below.
spark-submit --master yarn --deploy-mode client --num-executors 16 --driver-memory 10g --executor-memory 7g --executor-cores 5 --class CLASS_NAME PATH_TO_JAR
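import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.expressions.Window;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.hive.HiveContext;

public static void main(String[] args) {
    SparkConf conf = new SparkConf().setAppName("Spark Sequence Test");
    JavaSparkContext jsc = new JavaSparkContext(conf);
    HiveContext hc = new HiveContext(jsc.sc());
    DataFrame cvxDataFrame = hc.sql("select * from Table_A");
    DataFrame skDataFrame = hc.sql("select * from Table_B");
    // Left outer join: Table_A rows with no match in Table_B get null Table_B columns
    DataFrame new_df = cvxDataFrame.join(skDataFrame, cvxDataFrame.col("cvx_code").equalTo(skDataFrame.col("xref_code_id")), "left_outer");
    // Keep only rows where xref_code_sk is null, i.e. codes missing from Table_B
    DataFrame fordf = new_df.select(new_df.col("cvx_code").as("xref_code_id"), new_df.col("xref_code_sk")).filter(new_df.col("xref_code_sk").isNull());
    // Number the remaining rows 1..n in xref_code_id order (window has orderBy but no partitionBy)
    Column rowNum = functions.row_number().over(Window.orderBy(fordf.col("xref_code_id")));
    DataFrame df = fordf.select(fordf.col("xref_code_id"), rowNum.as("xref_code_sk"));
    df.registerTempTable("final_result");
    hc.sql("INSERT INTO TABLE TABLE_C SELECT xref_code_id, xref_code_sk, 'CVX' as xref_code_tp_cd from final_result");
}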
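To make the intended result concrete, here is a plain-Java model (no Spark) of what the job computes. It assumes xref_code_id is unique in Table_B and that no codes are null; the class and method names are made up for illustration and are not part of my actual job.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of the Spark job: left outer join,
// keep rows with a null xref_code_sk, then row_number() ordered by code.
final class UnmatchedRowNumbering {

    // tableA: cvx_code values from Table_A (assumed non-null).
    // tableB: xref_code_id -> xref_code_sk from Table_B (ids assumed unique).
    // Returns "code:rowNumber" strings, numbered from 1 in ascending code order.
    static List<String> number(List<String> tableA, Map<String, Long> tableB) {
        // Left outer join + filter(xref_code_sk IS NULL): keep codes whose lookup
        // yields no xref_code_sk (no match in Table_B, or a null sk value).
        List<String> unmatched = new ArrayList<>();
        for (String code : tableA) {
            if (tableB.get(code) == null) {
                unmatched.add(code);
            }
        }
        // row_number() over (order by xref_code_id): sort, then number from 1.
        Collections.sort(unmatched);
        List<String> result = new ArrayList<>();
        for (int i = 0; i < unmatched.size(); i++) {
            result.add(unmatched.get(i) + ":" + (i + 1));
        }
        return result;
    }
}
```

For example, with Table_A codes c3, c1, c2 and Table_B containing only c2, the result is c1:1 and c3:2. With an empty Table_B every Table_A row survives the filter, so every row has to go through the row_number() step instead of only the unmatched ones.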
This works when Table_A and Table_B both have 50 million records, but it fails when Table_A has 50 million records and Table_B has 0 records. The error I get is “Executor heartbeat timed out…”:
ERROR cluster.YarnScheduler: Lost executor 7 on sas-hdp-d03.devapp.domain: Executor heartbeat timed out after 161445 ms
16/09/14 11:23:58 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 3.0 (TID 232, sas-hdp-d03.devapp.domain): ExecutorLostFailure (executor 7 exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 161445 ms
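One thing that may be relevant: a window with Window.orderBy(...) but no partitionBy makes Spark move all rows into a single partition (it logs a warning to that effect), so with an empty Table_B a single task has to sort and number all 50 million rows, which could plausibly keep that executor busy long enough to miss heartbeats. RDD.zipWithIndex() avoids the single partition by first counting the rows in each partition and then giving each partition a starting offset. Below is a plain-Java sketch of that offset idea only, with List<List<String>> standing in for sorted, range-partitioned data; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of zipWithIndex-style numbering: rows are numbered across
// several partitions without first collecting them into one partition.
final class PartitionOffsetNumbering {

    // partitions: each inner list is one partition, assumed already sorted, and
    // every key in partition p assumed <= every key in partition p + 1 (as a
    // range-partitioned sort would produce). Returns "key:rowNumber" per partition.
    static List<List<String>> number(List<List<String>> partitions) {
        // Pass 1: count rows per partition and turn the counts into start offsets.
        long[] offsets = new long[partitions.size()];
        long running = 0;
        for (int p = 0; p < partitions.size(); p++) {
            offsets[p] = running;
            running += partitions.get(p).size();
        }
        // Pass 2: each partition numbers its own rows from its offset, so in a
        // cluster this loop body could run as independent parallel tasks.
        List<List<String>> numbered = new ArrayList<>();
        for (int p = 0; p < partitions.size(); p++) {
            List<String> rows = partitions.get(p);
            List<String> out = new ArrayList<>();
            for (int i = 0; i < rows.size(); i++) {
                // +1 because row_number() starts at 1 while zipWithIndex starts at 0.
                out.add(rows.get(i) + ":" + (offsets[p] + i + 1));
            }
            numbered.add(out);
        }
        return numbered;
    }
}
```

For partitions [a, b], [], [c, d, e] this gives [a:1, b:2], [], [c:3, d:4, e:5]. In Spark itself the rough equivalent would be sorting with JavaRDD.sortBy (which range-partitions the data) and then calling zipWithIndex() and adding 1 to each index; I have not verified that approach on this data.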
I would really appreciate any suggestions on how to get around this.
Thanks