Created on 09-18-2017 10:39 AM - edited 09-18-2017 11:57 AM
I have a 4-node Cloudera cluster (1 master node and 3 data nodes):
Version: Cloudera Express 5.11.0 (#101 built by jenkins on 20170412-1255 git: 70cb1442626406432a6e7af5bdf206a384ca3f98)
Java VM Name: Java HotSpot(TM) 64-Bit Server VM
Java VM Vendor: Oracle Corporation
Java Version: 1.8.0_131
The data nodes have 256 GB of memory each and 22 cores (2 sockets, hyperthreading enabled).
I am running the BigBench benchmark using Hive on Spark with 3 user streams. When I run the throughput test, which executes the 3 user streams in parallel (each query within a user stream runs sequentially), some of the queries fail with the errors shown below, while others succeed. I have tried increasing executor memory to 30 GB, disabling dynamic executor allocation, and setting the number of executors to 4 and to 8 (roughly as sketched below). Even when I ran 3 user streams with only 4 queries in total, they still failed. It seems like there is some resource contention that I am unable to identify.
I would appreciate any pointers on where I should look.
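For reference, the attempts above were made by overriding the Spark properties for the Hive session, roughly like this (a sketch only; the property names are the standard Hive-on-Spark ones, and the values are the ones mentioned above):

set hive.execution.engine=spark;            -- run Hive queries on Spark
set spark.executor.memory=30g;              -- tried raising executor memory to 30 GB
set spark.dynamicAllocation.enabled=false;  -- tried with dynamic allocation disabled
set spark.executor.instances=4;             -- tried 4 and 8 executors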
Here is what I see in the benchmark log:
Failed to monitor Job[ 0] with exception 'java.lang.IllegalStateException(RPC channel is closed.)'
FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.spark.SparkTask
In the YARN web UI for the Spark application, the log for attempt 1 shows this:
Log Type: stderr
Log Upload Time: Sun Sep 17 20:55:59 -0700 2017
Log Length: 377320 (showing the last 4096 bytes)

…g task 163.0 in stage 0.0 (TID 225, datanode3.myhost.com, executor 1, partition 163, NODE_LOCAL, 4880 bytes)
17/09/17 20:55:28 INFO scheduler.TaskSetManager: Finished task 151.0 in stage 0.0 (TID 207) in 39287 ms on datanode3.myhost.com (executor 1) (210/695)
17/09/17 20:55:28 INFO scheduler.TaskSetManager: Starting task 515.0 in stage 0.0 (TID 226, datanode2.myhost.com, executor 3, partition 515, NODE_LOCAL, 4880 bytes)
17/09/17 20:55:28 INFO scheduler.TaskSetManager: Finished task 491.0 in stage 0.0 (TID 214) in 31926 ms on datanode2.myhost.com (executor 3) (211/695)
17/09/17 20:55:28 INFO scheduler.TaskSetManager: Starting task 518.0 in stage 0.0 (TID 227, datanode2.myhost.com, executor 4, partition 518, NODE_LOCAL, 4880 bytes)
17/09/17 20:55:28 INFO scheduler.TaskSetManager: Finished task 473.0 in stage 0.0 (TID 208) in 37462 ms on datanode2.myhost.com (executor 4) (212/695)
17/09/17 20:55:28 INFO scheduler.TaskSetManager: Starting task 521.0 in stage 0.0 (TID 228, datanode2.myhost.com, executor 2, partition 521, NODE_LOCAL, 4880 bytes)
17/09/17 20:55:28 INFO scheduler.TaskSetManager: Finished task 485.0 in stage 0.0 (TID 212) in 33996 ms on datanode2.myhost.com (executor 2) (213/695)
17/09/17 20:55:29 INFO scheduler.TaskSetManager: Starting task 524.0 in stage 0.0 (TID 229, datanode2.myhost.com, executor 3, partition 524, NODE_LOCAL, 4880 bytes)
17/09/17 20:55:29 INFO scheduler.TaskSetManager: Finished task 482.0 in stage 0.0 (TID 211) in 34862 ms on datanode2.myhost.com (executor 3) (214/695)
17/09/17 20:55:31 INFO scheduler.TaskSetManager: Starting task 527.0 in stage 0.0 (TID 230, datanode2.myhost.com, executor 4, partition 527, NODE_LOCAL, 4880 bytes)
17/09/17 20:55:31 INFO scheduler.TaskSetManager: Finished task 488.0 in stage 0.0 (TID 213) in 36576 ms on datanode2.myhost.com (executor 4) (215/695)
17/09/17 20:55:34 INFO scheduler.TaskSetManager: Starting task 530.0 in stage 0.0 (TID 231, datanode2.myhost.com, executor 2, partition 530, NODE_LOCAL, 4880 bytes)
17/09/17 20:55:34 INFO scheduler.TaskSetManager: Finished task 494.0 in stage 0.0 (TID 216) in 33409 ms on datanode2.myhost.com (executor 2) (216/695)
17/09/17 20:55:35 INFO scheduler.TaskSetManager: Starting task 166.0 in stage 0.0 (TID 232, datanode3.myhost.com, executor 1, partition 166, NODE_LOCAL, 4880 bytes)
17/09/17 20:55:35 INFO scheduler.TaskSetManager: Finished task 154.0 in stage 0.0 (TID 215) in 37992 ms on datanode3.myhost.com (executor 1) (217/695)
17/09/17 20:55:38 INFO scheduler.TaskSetManager: Starting task 533.0 in stage 0.0 (TID 233, datanode2.myhost.com, executor 4, partition 533, NODE_LOCAL, 4880 bytes)
17/09/17 20:55:38 INFO scheduler.TaskSetManager: Finished task 500.0 in stage 0.0 (TID 218) in 34060 ms on datanode2.myhost.com (executor 4) (218/695)
17/09/17 20:55:39 INFO scheduler.TaskSetManager: Starting task 536.0 in stage 0.0 (TID 234, datanode2.myhost.com, executor 2, partition 536, NODE_LOCAL, 4880 bytes)
17/09/17 20:55:39 INFO scheduler.TaskSetManager: Finished task 503.0 in stage 0.0 (TID 219) in 32503 ms on datanode2.myhost.com (executor 2) (219/695)
17/09/17 20:55:39 INFO scheduler.TaskSetManager: Starting task 539.0 in stage 0.0 (TID 235, datanode2.myhost.com, executor 2, partition 539, NODE_LOCAL, 4880 bytes)
17/09/17 20:55:39 INFO scheduler.TaskSetManager: Finished task 497.0 in stage 0.0 (TID 217) in 36073 ms on datanode2.myhost.com (executor 2) (220/695)
17/09/17 20:55:54 INFO yarn.ApplicationMaster: Final app status: FAILED, exitCode: 16
17/09/17 20:55:54 ERROR yarn.ApplicationMaster: RECEIVED SIGNAL 15: SIGTERM
17/09/17 20:55:54 INFO spark.SparkContext: Invoking stop() from shutdown hook
17/09/17 20:55:55 INFO scheduler.TaskSetManager: Starting task 169.0 in stage 0.0 (TID 236, datanode3.myhost.com, executor 1, partition 169, NODE_LOCAL, 4880 bytes)
17/09/17 20:55:55 INFO scheduler.TaskSetManager: Finished task 160.0 in stage 0.0 (TID 221) in 36667 ms on datanode3.myhost.com (executor 1) (221/695)

Log Type: stdout
Log Upload Time: Sun Sep 17 20:55:59 -0700 2017
Log Length: 0
The log for attempt 2 shows this:
Log Type: stderr
Log Upload Time: Sun Sep 17 20:55:59 -0700 2017
Log Length: 5150 (showing the last 4096 bytes)

…122:45081
java.util.concurrent.ExecutionException: java.net.ConnectException: Connection refused: masternode.myhost.com/masternode.ip.address:45081
	at io.netty.util.concurrent.AbstractFuture.get(AbstractFuture.java:37)
	at org.apache.hive.spark.client.RemoteDriver.<init>(RemoteDriver.java:156)
	at org.apache.hive.spark.client.RemoteDriver.main(RemoteDriver.java:556)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:552)
Caused by: java.net.ConnectException: Connection refused: masternode.myhost.com/masternode.ip.address:45081
	at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
	at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
	at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:224)
	at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:289)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
	at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
	at java.lang.Thread.run(Thread.java:748)
17/09/17 20:55:57 INFO yarn.ApplicationMaster: Final app status: FAILED, exitCode: 15, (reason: User class threw exception: java.util.concurrent.ExecutionException: java.net.ConnectException: Connection refused: masternode.myhost.com/masternode.ip.address:45081)
17/09/17 20:55:57 ERROR yarn.ApplicationMaster: Uncaught exception: java.util.concurrent.ExecutionException: java.net.ConnectException: Connection refused: masternode.myhost.com/masternode.ip.address:45081
	at io.netty.util.concurrent.AbstractFuture.get(AbstractFuture.java:37)
	at org.apache.hive.spark.client.RemoteDriver.<init>(RemoteDriver.java:156)
	at org.apache.hive.spark.client.RemoteDriver.main(RemoteDriver.java:556)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:552)
Caused by: java.net.ConnectException: Connection refused: masternode.myhost.com/masternode.ip.address:45081
	at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
	at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
	at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:224)
	at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:289)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
	at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
	at java.lang.Thread.run(Thread.java:748)
17/09/17 20:55:57 INFO yarn.ApplicationMaster: Unregistering ApplicationMaster with FAILED (diag message: User class threw exception: java.util.concurrent.ExecutionException: java.net.ConnectException: Connection refused: masternode.myhost.com/masternode.ip.address:45081)
17/09/17 20:55:57 INFO yarn.ApplicationMaster: Deleting staging directory .sparkStaging/application_1505703713655_0006
17/09/17 20:55:57 INFO util.ShutdownHookManager: Shutdown hook called

Log Type: stdout
Log Upload Time: Sun Sep 17 20:55:59 -0700 2017
Log Length: 0
Created 09-26-2017 09:14 AM
Have you tried a configuration similar to:
- 3 executors (1 per datanode)
- Setting a very low initial memory for the executors (e.g. 1 GB)
- Limiting the executors to 1 vcore each (at least initially)
- Maybe you can try 2 vcores and 2 GB for the driver initially
- It's important to allow a generous memory overhead on top of the executors' heap (beyond the 1 GB we specified above); YARN reserves it for the container, but it is only used when/if the executors actually need it. So set the following parameters accordingly (see the sketch after the doc links below for the full set of properties):
spark.yarn.executor.memoryOverhead = 8192 (the value is in MB, i.e. 8 GB)
spark.yarn.driver.memoryOverhead = 8192 (8 GB)
- You can read the following docs to get a better grasp of the concepts behind resource allocation:
https://www.cloudera.com/documentation/enterprise/5-11-x/topics/cdh_ig_yarn_tuning.html
https://www.cloudera.com/documentation/enterprise/latest/topics/cdh_ig_running_spark_on_yarn.html
The first doc also includes a link to a "YARN Tuning Spreadsheet". The docs contain details on how to configure all of the features mentioned just above :)
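Concretely, that would look something like the following session-level overrides (just a sketch with the values suggested above; you can equally set these in the Spark/Hive service configuration through Cloudera Manager, and you should tune the numbers to your nodes):

set hive.execution.engine=spark;
set spark.executor.instances=3;               -- 1 executor per data node
set spark.executor.cores=1;                   -- 1 vcore per executor, at least initially
set spark.executor.memory=1g;                 -- very low initial executor heap
set spark.driver.cores=2;
set spark.driver.memory=2g;                   -- small driver to start with
set spark.yarn.executor.memoryOverhead=8192;  -- overhead values are in MB (8 GB)
set spark.yarn.driver.memoryOverhead=8192;

With 3 streams sharing the cluster, the idea is to start each query's Spark application small, leave plenty of headroom in YARN, and grow the numbers only once the throughput run is stable.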
HTH