Created 07-17-2016 02:07 PM
Hi,
I am working on HDP 2.4.2 (Hadoop 2.7, Hive 1.2.1, JDK 1.8, Scala 2.10.5). My Spark/Scala job reads Hive tables (using Spark SQL) into DataFrames, performs a few left joins, and inserts the final result into a partitioned Hive table. The source tables have approximately 50 million records each. Spark creates 74 stages for this job. It executes 72 stages successfully but hangs at the 499th task of stage 73 and never reaches the final stage, 74. I can see many messages on the console like "INFO: BlockManagerInfo : Removed broadcast in memory".
It doesn't show any error or exception, and even after an hour it doesn't come out; the only option is to kill the job.
I have 15 nodes in total, each with 40 GB RAM and 6 cores. I am using spark-submit in yarn-client mode. Scheduling is configured as FIFO and my job is consuming 79% of the cluster's resources.
Can anybody advise on this? What could be the issue?
Regards
Praveen Khare
Created 07-18-2016 04:57 AM
Could you share more details, like the command used to execute the job and the input size?
Created 07-18-2016 05:37 AM
Thanks Puneet for the reply. Here is my command and other information:
spark-submit --master yarn-client --driver-memory 15g --num-executors 25 \
  --total-executor-cores 60 --executor-memory 15g --driver-cores 2 \
  --conf "spark.executor.extraJavaOptions=-XX:+UseG1GC -XX:+PrintFlagsFinal -XX:+PrintReferenceGC -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintAdaptiveSizePolicy -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35 -XX:ConcGCThreads=20" \
  --class logicdriver logic.jar
Configuration:
ContextService.getHiveContext.sql("SET hive.execution.engine=tez"); ContextService.getHiveContext.sql("SET hive.optimize.tez=true"); ContextService.getHiveContext.sql("set hive.vectorized.execution.enabled = true "); ContextService.getHiveContext.sql("set hive.vectorized.execution.reduce.enabled = true "); ContextService.getHiveContext.sql("set spark.sql.shuffle.partitions=2050"); ContextService.getHiveContext.sql("SET spark.sql.hive.metastore.version=0.14.0.2.2.4.10-1"); ContextService.getHiveContext.sql("SET hive.warehouse.data.skipTrash=true "); ContextService.getHiveContext.sql("SET hive.exec.dynamic.partition = true "); ContextService.getHiveContext.sql("SET hive.exec.dynamic.partition.mode=nonstrict "); ContextService.getHiveContext.sql("SET spark.driver.maxResultSize= 8192"); ContextService.getHiveContext.sql("SET spark.default.parallelism = 350"); ContextService.getHiveContext.sql("SET spark.yarn.executor.memoryOverhead=1024");
Data:
It reads data from 2 tables, performs a join, and puts the result in a DataFrame; then it reads new tables and joins them against the previous DataFrame. This cycle repeats 7-8 times, and finally it inserts the result into Hive (sketched below).
The first table has 63,245,969 records.
The second table has 49,275,922 records. All the tables have record counts in this range.
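For context, the processing roughly follows the pattern below. This is a minimal sketch; the database, table, and column names are made up for illustration:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

val sc = new SparkContext(new SparkConf().setAppName("logicdriver"))
val hc = new HiveContext(sc)

// First pair of tables (hypothetical names).
var result = hc.sql("SELECT * FROM db.table1")
  .join(hc.sql("SELECT * FROM db.table2"), Seq("join_key"), "left_outer")

// The read-and-join cycle repeats for the remaining tables (7-8 rounds in total).
for (t <- Seq("db.table3", "db.table4", "db.table5")) {
  result = result.join(hc.sql(s"SELECT * FROM $t"), Seq("join_key"), "left_outer")
}

// Final insert into the partitioned Hive table; this relies on the
// dynamic-partition settings shown in the configuration above.
result.write.mode("append").insertInto("db.final_table")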
Created 07-18-2016 08:09 AM
Hi Praveen,
Here are a few points to help:
1. Try running your application without options like "--driver-memory 15g --num-executors 25 --total-executor-cores 60 --executor-memory 15g --driver-cores 2" and check the logs for the memory allocated to RDDs/DataFrames.
2. The driver doesn't need 15g of memory if you are not collecting data on the driver. Try setting it to 4g instead. I hope you are not using .collect() or similar operations that pull all the data to the driver (see the sketch after this list).
3. This error calls for fine-tuning your executor-memory and driver-memory configuration. The number of executors (25) is quite high considering the memory allocated to each (15g). Reduce the number of executors and consider allocating less memory (4g to start with).
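On point 2, the safe pattern is to keep the result distributed and let the executors write it straight to Hive rather than routing it through the driver. A minimal sketch (finalDf and the table name are illustrative):

// Avoid this: collect() pulls the entire result into driver memory.
// val rows = finalDf.collect()

// Prefer this: the write runs on the executors and nothing is collected.
finalDf.write.mode("append").insertInto("db.final_table")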
Thanks,
Puneet
Created 07-18-2016 09:03 AM
Okay, I will try these options and update. Thank you.
Created 07-18-2016 01:07 PM
Before your suggestion, I had started a run with the same configuration, and I got the issues below in my logs:
16/07/18 09:24:52 INFO RetryInvocationHandler: Exception while invoking renewLease of class ClientNamenodeProtocolTranslatorPB over . Trying to fail over immediately.
java.io.IOException: Failed on local exception: java.io.IOException: Connection reset by peer; Host Details :
:
:
Already tried 8 time(s); retry policy is RetryPolicy[MultipleLinearRandomRetry[500x2000ms], TryOnceThenFail]
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException): Operation category WRITE is not supported in state standby
It seems there are some issues in HDFS/NameNode or the cluster itself...
Created 07-18-2016 01:11 PM
You can refer to https://community.hortonworks.com/questions/9790/orgapachehadoopipcstandbyexception.html for this issue.
Created 07-19-2016 09:48 AM
Hi Puneet, as per your suggestion I tried with
--driver-memory 4g --num-executors 15 --total-executor-cores 30 --executor-memory 10g --driver-cores 2
and it failed with: Exception in thread "dispatcher-event-loop-3" java.lang.OutOfMemoryError: Java heap space.
What I suspect is that partitioning is pushing huge amounts of data onto one or more executors, which then fail. I looked at the Spark job environment and saw:
spark.yarn.driver.memoryOverhead = 384
spark.yarn.executor.memoryOverhead = 384
(default for spark.yarn.executor.memoryOverhead: executorMemory * 0.10, with a minimum of 384)
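So with --executor-memory 10g the default works out to max(10240 MB * 0.10, 384 MB) = 1024 MB per executor, meaning each YARN container asks for roughly 11 GB.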
Created 07-19-2016 10:00 AM
The reason I asked this question is that I am running my job in client mode, and I am not sure whether the settings below work in client mode:
ContextService.getHiveContext.sql("SET spark.yarn.executor.memoryOverhead = 3000 "); ContextService.getHiveContext.sql("SET spark.yarn.am.memoryOverhead = 3000");
spark.yarn.executor.memoryOverhead works in cluster mode... spark.yarn.am.memoryOverhead is the same as spark.yarn.driver.memoryOverhead, but for the YARN Application Master in client mode.
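From the docs it sounds like these have to be in place before the SparkContext starts, i.e. something like the sketch below rather than a runtime SET (a minimal sketch; the 3000 MB values are the ones from above):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

// These must be set before the SparkContext is created;
// issuing them later through sql("SET ...") has no effect.
val conf = new SparkConf()
  .set("spark.yarn.executor.memoryOverhead", "3000")
  .set("spark.yarn.am.memoryOverhead", "3000") // used for the AM in yarn-client mode

val sc = new SparkContext(conf)
val hiveContext = new HiveContext(sc)

The equivalent at submission time would be --conf spark.yarn.executor.memoryOverhead=3000 --conf spark.yarn.am.memoryOverhead=3000 on spark-submit.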