
Spark SQL job stuck indefinitely at last task of a stage -- Shows INFO: BlockManagerInfo : Removed broadcast in memory

Explorer

Hi,

I am working on HDP 2.4.2 (Hadoop 2.7, Hive 1.2.1, JDK 1.8, Scala 2.10.5). My Spark/Scala job reads Hive tables (using Spark SQL) into DataFrames, performs a few left joins, and inserts the final results into a partitioned Hive table. The source tables have approximately 50 million records each. Spark creates 74 stages for this job. It executes 72 stages successfully but hangs at the 499th task of the 73rd stage and never reaches the final stage, number 74. I can see many messages on the console such as "INFO: BlockManagerInfo : Removed broadcast in memory".

It doesn't show any error or exception. Even after 1 hour it doesn't finish, and the only way out is to kill the job.

I have 15 nodes in total, each with 40 GB RAM and 6 cores. I am using spark-submit in yarn-client mode. Scheduling is configured as FIFO and my job is consuming 79% of resources.

Can anybody advise on this? What could be the issue?

Regards

Praveen Khare


Contributor

Could you share more details, such as the command used to run the job and the input size?

Explorer

Thanks, Puneet, for the reply. Here is my command and other information:

spark-submit --master yarn-client --driver-memory 15g --num-executors 25 --total-executor-cores 60 --executor-memory 15g --driver-cores 2 --conf "spark.executor.memory=-XX:+UseG1GC -XX:+PrintFlagsFinal -XX:+PrintReferenceGC -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintAdaptiveSizePolicy -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -Xms10g -Xmx10g -XX:InitiatingHeapOccupancyPercent=35 -XX:ConcGCThread=20" --class logicdriver logic.jar
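One thing worth checking in the command above, offered as a hedged sketch rather than a confirmed fix for the hang: the JVM flags are passed under the key spark.executor.memory, which expects a size such as 15g, while executor JVM options normally go in spark.executor.extraJavaOptions. The sketch below assumes that key was intended. It leaves out -Xms/-Xmx (the executor heap comes from --executor-memory, and Spark does not accept heap size settings in extraJavaOptions), assumes -XX:ConcGCThread was meant to be -XX:ConcGCThreads, and drops --total-executor-cores, which is a standalone/Mesos option (on YARN the per-executor flag is --executor-cores; no value is given here because the thread does not state one). It only prints the arguments so the quoting can be checked before launching.

```shell
# GC and diagnostic flags taken from the original command, minus -Xms/-Xmx.
# ConcGCThreads is an assumed correction of the original "ConcGCThread".
GC_OPTS="-XX:+UseG1GC -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:InitiatingHeapOccupancyPercent=35 -XX:ConcGCThreads=20"

# Build the argument list; the whole flag string stays one --conf value.
set -- --master yarn-client \
  --driver-memory 15g --num-executors 25 --executor-memory 15g --driver-cores 2 \
  --conf "spark.executor.extraJavaOptions=${GC_OPTS}" \
  --class logicdriver logic.jar

# Dry run: print one argument per line to verify the quoting.
printf '%s\n' "$@"

# To launch for real: spark-submit "$@"
```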

Configuration:

ContextService.getHiveContext.sql("SET hive.execution.engine=tez");
ContextService.getHiveContext.sql("SET hive.optimize.tez=true");
ContextService.getHiveContext.sql("set hive.vectorized.execution.enabled = true ");
ContextService.getHiveContext.sql("set hive.vectorized.execution.reduce.enabled = true ");
ContextService.getHiveContext.sql("set spark.sql.shuffle.partitions=2050");
ContextService.getHiveContext.sql("SET spark.sql.hive.metastore.version=0.14.0.2.2.4.10-1");
ContextService.getHiveContext.sql("SET hive.warehouse.data.skipTrash=true ");
ContextService.getHiveContext.sql("SET hive.exec.dynamic.partition = true ");
ContextService.getHiveContext.sql("SET hive.exec.dynamic.partition.mode=nonstrict ");
ContextService.getHiveContext.sql("SET spark.driver.maxResultSize= 8192");
ContextService.getHiveContext.sql("SET spark.default.parallelism = 350");
ContextService.getHiveContext.sql("SET spark.yarn.executor.memoryOverhead=1024");

Data:

The job reads data from 2 tables, performs a join, and puts the result in a DataFrame. It then reads another table and joins it with the previous DataFrame. This cycle repeats 7-8 times, and finally the job inserts the result into Hive.

The first table has 63245969 records.

The second table has 49275922 records. All the tables have record counts in this range.

Contributor

Hi Praveen,

Here are a few points to help:

1. Try running your job without options like "--driver-memory 15g --num-executors 25 --total-executor-cores 60 --executor-memory 15g --driver-cores 2" and check the logs for the memory allocated to RDDs/DataFrames.

2. The driver doesn't need 15g of memory if you are not collecting data on the driver. Try setting it to 4g instead. I hope you are not using .collect() or similar operations that pull all the data to the driver.

3. The issue calls for tuning the balance between executor memory and driver memory. The total number of executors (25) is rather high considering the memory allocated to each (15g). Reduce the number of executors and consider allocating less memory (4g to start with).
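A minimal sketch of points 2 and 3 as a command. The 4g values come from the points above; the executor count of 10 is only an assumed placeholder, since the right number depends on the cluster. It prints the arguments rather than launching, so the values can be reviewed first.

```shell
# Starting values from points 2 and 3.
DRIVER_MEMORY=4g
EXECUTOR_MEMORY=4g
# Assumption: any value below the original 25; tune for your cluster.
NUM_EXECUTORS=10

# Build the spark-submit argument list.
set -- --master yarn-client \
  --driver-memory "$DRIVER_MEMORY" \
  --num-executors "$NUM_EXECUTORS" \
  --executor-memory "$EXECUTOR_MEMORY" \
  --class logicdriver logic.jar

# Dry run: print one argument per line.
printf '%s\n' "$@"

# To launch for real: spark-submit "$@"
```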

Thanks,

Puneet

Explorer

Okay, I will try these options and update. Thank you.

Explorer

Before your suggestion, I had started a run with the same configuration. I got the issues below in my logs:

16/07/18 09:24:52 INFO RetryInvocationHandler: Exception while invoking renewLease of class ClientNamenodeProtocolTranslatorPB over . Trying to fail over immediately.

java.io.IOException: Failed on local exception: java.io.IOException: Connection reset by peer; Host Details :

:

:

Already tried 8 time(s); retry policy is RetryPolicy[MultipleLinearRandomRetry[500x2000ms], TryOnceThenFail]

org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException): Operation category WRITE is not supported in state standby

It seems there are some issues with HDFS/NameNode or the cluster itself.

Explorer

Hi Puneet, as per your suggestion I tried with

--driver-memory 4g --num-executors 15 --total-executor-cores 30 --executor-memory 10g --driver-cores 2

and it failed with

Exception in thread "dispatcher-event-loop-3" java.lang.OutOfMemoryError: Java heap space.

What I suspect is that partitioning is pushing a huge amount of data onto one or more executors, and they fail. I looked at the Spark job environment and saw

spark.yarn.driver.memoryOverhead = 384
spark.yarn.executor.memoryOverhead = 384

which is very low. I referred to the documentation, and it says:
spark.yarn.executor.memoryOverhead: executorMemory * 0.10, with minimum of 384
How can we set it to 1G or more?

Explorer

I asked this question because I am running my job in client mode and I am not sure whether the settings below work in client mode:

ContextService.getHiveContext.sql("SET spark.yarn.executor.memoryOverhead = 3000 ");
ContextService.getHiveContext.sql("SET spark.yarn.am.memoryOverhead = 3000");

spark.yarn.executor.memoryOverhead works in cluster mode...

spark.yarn.am.memoryOverhead is the same as spark.yarn.driver.memoryOverhead, but for the YARN Application Master in client mode.
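For reference, a hedged sketch of setting these at submit time. The memoryOverhead properties size the YARN containers when they are requested, so a SET issued through the HiveContext after the application has started is not expected to change them; passing them with --conf on spark-submit is the usual route. spark.yarn.executor.memoryOverhead applies to executors in both client and cluster mode, while spark.yarn.am.memoryOverhead covers the Application Master in client mode. The sketch first works out the documented default (10% of executor memory, minimum 384 MB) for the 10g executors of the last run, then builds the arguments with the 3000 MB values from the post. It reuses the flags of the last run without --total-executor-cores, and only prints the arguments.

```shell
# --executor-memory 10g from the last run, expressed in MB.
EXECUTOR_MEMORY_MB=10240

# Documented default: 10% of executor memory, with a floor of 384 MB.
DEFAULT_OVERHEAD_MB=$(( EXECUTOR_MEMORY_MB / 10 ))
if [ "$DEFAULT_OVERHEAD_MB" -lt 384 ]; then
  DEFAULT_OVERHEAD_MB=384
fi
echo "default executor overhead: ${DEFAULT_OVERHEAD_MB} MB"

# Explicit overrides in MB, using the values from the post.
EXECUTOR_OVERHEAD_MB=3000
AM_OVERHEAD_MB=3000

# Build the spark-submit argument list with the overrides as --conf pairs.
set -- --master yarn-client \
  --driver-memory 4g --num-executors 15 --executor-memory 10g --driver-cores 2 \
  --conf "spark.yarn.executor.memoryOverhead=${EXECUTOR_OVERHEAD_MB}" \
  --conf "spark.yarn.am.memoryOverhead=${AM_OVERHEAD_MB}" \
  --class logicdriver logic.jar

# Dry run: print one argument per line.
printf '%s\n' "$@"

# To launch for real: spark-submit "$@"
```

Keep in mind that each executor container then asks YARN for executor memory plus overhead (here 10g plus 3000 MB), so the total has to fit within the memory YARN can allocate on a node.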
