Support Questions

YARN / Spark: strange behaviour between 2 clusters

New Contributor

Hello,

We are experiencing strange behaviour on one of our clusters: our biggest cluster is running much more slowly than the other one. I'm talking about a factor of 4, even though its nodes are 4 times bigger.

Both clusters have 3 machines running the same Linux distribution and configuration. Both clusters are running the same Ambari / HDP versions (Ambari 2.2.2.0 / HDP 2.4.2) with the same service layout. I am not trying to compare the time spent running the submitted jobs, but I want to understand whether everything is behaving as expected.

First cluster hardware (3 nodes):

  • 16 logical cores
  • 128 GB RAM
  • 2 TB HDD

Second cluster hardware (3 nodes):

  • 64 logical cores
  • 256 GB RAM
  • 2 TB HDD

I know that spark-pi is not the best way to monitor or benchmark, but it reflects the behaviour we observed. On our first (the smaller) cluster, while running 10,000,000 iterations, all 3 nodes show a very high CPU load.

On the second cluster, while running the same job, only one node shows a high CPU load and the time to get the results is approximately 3 times longer.
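
For reference, we launch the spark-pi job with something along these lines (the flags and the jar path here are illustrative rather than our exact command; on HDP the examples jar usually lives under /usr/hdp/current/spark-client/lib):

    # illustrative SparkPi submission; the last argument controls how much work is generated
    spark-submit --master yarn --deploy-mode cluster \
      --class org.apache.spark.examples.SparkPi \
      --num-executors 6 --executor-cores 4 --executor-memory 8G \
      /usr/hdp/current/spark-client/lib/spark-examples.jar 10000000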

What is even stranger is that the containers are correctly balanced between the nodes and complete the same number of tasks. The network does not seem to be the bottleneck either, as only a few hundred kilobytes are transferred over a 10 Gb/s network that can sustain 8 Gb/s transfers with 0.3 ms of latency.
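
(Those throughput and latency figures come from a quick manual check between two of the worker nodes, roughly like the following; iperf3 and the target hostname are just an illustration of how it was measured, not our exact commands:)

    # rough throughput check between two worker nodes (iperf3 -s running on the target)
    iperf3 -c node02-bench161.tellmeplus.lan -t 30
    # rough latency check
    ping -c 20 node02-bench161.tellmeplus.lan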

PS: I ran the "word count" benchmark on both clusters and observed the same behaviour as with spark-pi.

Do you think this behaviour is correct?

3 REPLIES


This seems to be more of a job tuning problem. Kindly share the following parameters used while submitting the jobs on both clusters: --num-executors? --executor-cores? --executor-memory? --driver-memory?

New Contributor

Hi @Aditya Deshpande, and thanks for taking a look at our problem 😉

We tried many configurations with the following ranges:

  • --num-executors [3-99]
  • --executor-cores [1-8]
  • --executor-memory [1g-32g]
  • --driver-memory [1g-32g]

However, while digging deeper today, we found the following error in the container logs (not the application logs, but the containers):

WARN NettyRpcEndpointRef: Error sending message [message = Heartbeat(26,[Lscala.Tuple2;@1b43683e,BlockManagerId(26, node02-bench161.tellmeplus.lan, 33144))] in 1 attempts
org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [10 seconds]. This timeout is controlled by spark.executor.heartbeatInterval
    at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:48)
    at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:63)
    at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:33)
    at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:76)
    at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:101)
    at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$reportHeartBeat(Executor.scala:449)
    at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply$mcV$sp(Executor.scala:470)
    at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:470)
    at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:470)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1765)
    at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:470)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [10 seconds]
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
    at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
    at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
    at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
    at scala.concurrent.Await$.result(package.scala:107)
    at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
    ... 14 more

WARN TransportResponseHandler: Ignoring response for RPC 8585110775140253074 from /172.16.34.200:33796 (81 bytes) since it is not outstanding

Increasing the timeouts just postpones the problem.
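
(For reference, we raised the timeouts roughly like this on the submit line; the values are just the ones we experimented with, and spark.executor.heartbeatInterval / spark.network.timeout are the standard Spark settings involved in this error:)

    # illustrative: larger heartbeat interval and overall RPC timeout
    spark-submit --master yarn --deploy-mode cluster \
      --conf spark.executor.heartbeatInterval=60s \
      --conf spark.network.timeout=600s \
      --class org.apache.spark.examples.SparkPi \
      /usr/hdp/current/spark-client/lib/spark-examples.jar 10000000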

Any tips?

Best regards


I see the problem.

Second cluster hardware (3 nodes):

  • 64 logical cores
  • 256 GB RAM
  • 2 TB HDD

As per my maths, total cores = 64 * 3 = 192 and total RAM = 256 * 3 = 768 GB.

The NodeManager capacities, yarn.nodemanager.resource.memory-mb and yarn.nodemanager.resource.cpu-vcores, should probably be set to 252 * 1024 = 258048 (megabytes) and 60 respectively. We avoid allocating 100% of the resources to YARN containers because each node needs some resources to run the OS and the Hadoop daemons. In this case, leave 4 GB and 4 cores for these system processes. Check yarn-site.xml; it could already be set, and if not, set it yourself.
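
In yarn-site.xml that would look roughly like the following (the values simply reflect the arithmetic above, i.e. 252 GB and 60 vcores left for containers; adjust them if you reserve a different amount for the system):

    <!-- 252 GB per node expressed in MB, leaving 4 GB for the OS and Hadoop daemons -->
    <property>
      <name>yarn.nodemanager.resource.memory-mb</name>
      <value>258048</value>
    </property>
    <!-- 60 of the 64 logical cores, leaving 4 for the system -->
    <property>
      <name>yarn.nodemanager.resource.cpu-vcores</name>
      <value>60</value>
    </property>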

Now, a better option would be to use --num-executors 34 --executor-cores 5 --executor-memory 19G --driver-memory 32G (a sketch of the full submit line follows the points below). Why?

  • This config results in twelve executors on each node except for the one running the AM, which will have ten, for 34 executors in total.
    • --executor-memory was derived as 252 GB / 12 executors per node = 21 GB; taking off ~7% for memory overhead gives 21 - 1.47 ≈ 19 GB.
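
Putting it together, the submit line would look roughly like this sketch (the SparkPi class and jar path are only placeholders for your actual job):

    # sketch only: substitute your own application class and jar
    spark-submit --master yarn --deploy-mode cluster \
      --num-executors 34 \
      --executor-cores 5 \
      --executor-memory 19G \
      --driver-memory 32G \
      --class org.apache.spark.examples.SparkPi \
      /usr/hdp/current/spark-client/lib/spark-examples.jar 10000000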