New Contributor
Posts: 1
Registered: ‎09-18-2017

Hive on Spark using BigBench benchmark throwing java.util.concurrent.ExecutionException

I have a 4-node Cloudera cluster (1 master node and 3 data nodes):

Version: Cloudera Express 5.11.0 (#101 built by jenkins on 20170412-1255 git: 70cb1442626406432a6e7af5bdf206a384ca3f98)

Java VM Name: Java HotSpot(TM) 64-Bit Server VM

Java VM Vendor: Oracle Corporation

Java Version: 1.8.0_131

Each data node has 256 GB of memory, 22 cores, 2 sockets, and hyperthreading enabled.

 

I am running the BigBench benchmark using Hive on Spark with 3 user streams. When I run the throughput test, which executes the 3 user streams in parallel (each query within a user stream runs sequentially), some of the queries fail with the errors below, while others succeed. I have tried increasing executor memory to 30 GB, disabling dynamic executor allocation, and setting the number of executors to 4 and 8. Even when I ran the 3 user streams with only 4 queries in total, some still failed. It seems like there is some resource contention that I am unable to identify.
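For reference, these are roughly the session-level settings I was changing between attempts (assuming the standard Spark property names used by Hive on Spark; the exact values varied per run, and this is only a sketch of the knobs, not the full BigBench driver configuration):

set hive.execution.engine=spark;
set spark.executor.memory=30g;              -- tried increasing up to 30g
set spark.executor.instances=8;             -- also tried 4
set spark.dynamicAllocation.enabled=false;  -- also tried leaving dynamic allocation on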

 

I would appreciate any pointers on where I should look.

 

Here is what I see in the benchmark log:

 

Failed to monitor Job[ 0] with exception 'java.lang.IllegalStateException(RPC channel is closed.)'
FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.spark.SparkTask

 

In the Spark application's logs in the YARN web UI, the log file for attempt 1 shows this:

 

 

Log Type: stderr
Log Upload Time: Sun Sep 17 20:55:59 -0700 2017
Log Length: 377320
Showing 4096 bytes of 377320 total.
g task 163.0 in stage 0.0 (TID 225, datanode3.myhost.com, executor 1, partition 163, NODE_LOCAL, 4880 bytes)
17/09/17 20:55:28 INFO scheduler.TaskSetManager: Finished task 151.0 in stage 0.0 (TID 207) in 39287 ms on datanode3.myhost.com (executor 1) (210/695)
17/09/17 20:55:28 INFO scheduler.TaskSetManager: Starting task 515.0 in stage 0.0 (TID 226, datanode2.myhost.com, executor 3, partition 515, NODE_LOCAL, 4880 bytes)
17/09/17 20:55:28 INFO scheduler.TaskSetManager: Finished task 491.0 in stage 0.0 (TID 214) in 31926 ms on datanode2.myhost.com (executor 3) (211/695)
17/09/17 20:55:28 INFO scheduler.TaskSetManager: Starting task 518.0 in stage 0.0 (TID 227, datanode2.myhost.com, executor 4, partition 518, NODE_LOCAL, 4880 bytes)
17/09/17 20:55:28 INFO scheduler.TaskSetManager: Finished task 473.0 in stage 0.0 (TID 208) in 37462 ms on datanode2.myhost.com (executor 4) (212/695)
17/09/17 20:55:28 INFO scheduler.TaskSetManager: Starting task 521.0 in stage 0.0 (TID 228, datanode2.myhost.com, executor 2, partition 521, NODE_LOCAL, 4880 bytes)
17/09/17 20:55:28 INFO scheduler.TaskSetManager: Finished task 485.0 in stage 0.0 (TID 212) in 33996 ms on datanode2.myhost.com (executor 2) (213/695)
17/09/17 20:55:29 INFO scheduler.TaskSetManager: Starting task 524.0 in stage 0.0 (TID 229, datanode2.myhost.com, executor 3, partition 524, NODE_LOCAL, 4880 bytes)
17/09/17 20:55:29 INFO scheduler.TaskSetManager: Finished task 482.0 in stage 0.0 (TID 211) in 34862 ms on datanode2.myhost.com (executor 3) (214/695)
17/09/17 20:55:31 INFO scheduler.TaskSetManager: Starting task 527.0 in stage 0.0 (TID 230, datanode2.myhost.com, executor 4, partition 527, NODE_LOCAL, 4880 bytes)
17/09/17 20:55:31 INFO scheduler.TaskSetManager: Finished task 488.0 in stage 0.0 (TID 213) in 36576 ms on datanode2.myhost.com (executor 4) (215/695)
17/09/17 20:55:34 INFO scheduler.TaskSetManager: Starting task 530.0 in stage 0.0 (TID 231, datanode2.myhost.com, executor 2, partition 530, NODE_LOCAL, 4880 bytes)
17/09/17 20:55:34 INFO scheduler.TaskSetManager: Finished task 494.0 in stage 0.0 (TID 216) in 33409 ms on datanode2.myhost.com (executor 2) (216/695)
17/09/17 20:55:35 INFO scheduler.TaskSetManager: Starting task 166.0 in stage 0.0 (TID 232, datanode3.myhost.com, executor 1, partition 166, NODE_LOCAL, 4880 bytes)
17/09/17 20:55:35 INFO scheduler.TaskSetManager: Finished task 154.0 in stage 0.0 (TID 215) in 37992 ms on datanode3.myhost.com (executor 1) (217/695)
17/09/17 20:55:38 INFO scheduler.TaskSetManager: Starting task 533.0 in stage 0.0 (TID 233, datanode2.myhost.com, executor 4, partition 533, NODE_LOCAL, 4880 bytes)
17/09/17 20:55:38 INFO scheduler.TaskSetManager: Finished task 500.0 in stage 0.0 (TID 218) in 34060 ms on datanode2.myhost.com (executor 4) (218/695)
17/09/17 20:55:39 INFO scheduler.TaskSetManager: Starting task 536.0 in stage 0.0 (TID 234, datanode2.myhost.com, executor 2, partition 536, NODE_LOCAL, 4880 bytes)
17/09/17 20:55:39 INFO scheduler.TaskSetManager: Finished task 503.0 in stage 0.0 (TID 219) in 32503 ms on datanode2.myhost.com (executor 2) (219/695)
17/09/17 20:55:39 INFO scheduler.TaskSetManager: Starting task 539.0 in stage 0.0 (TID 235, datanode2.myhost.com, executor 2, partition 539, NODE_LOCAL, 4880 bytes)
17/09/17 20:55:39 INFO scheduler.TaskSetManager: Finished task 497.0 in stage 0.0 (TID 217) in 36073 ms on datanode2.myhost.com (executor 2) (220/695)
17/09/17 20:55:54 INFO yarn.ApplicationMaster: Final app status: FAILED, exitCode: 16
17/09/17 20:55:54 ERROR yarn.ApplicationMaster: RECEIVED SIGNAL 15: SIGTERM
17/09/17 20:55:54 INFO spark.SparkContext: Invoking stop() from shutdown hook
17/09/17 20:55:55 INFO scheduler.TaskSetManager: Starting task 169.0 in stage 0.0 (TID 236, datanode3.myhost.com, executor 1, partition 169, NODE_LOCAL, 4880 bytes)
17/09/17 20:55:55 INFO scheduler.TaskSetManager: Finished task 160.0 in stage 0.0 (TID 221) in 36667 ms on datanode3.myhost.com (executor 1) (221/695)

Log Type: stdout
Log Upload Time: Sun Sep 17 20:55:59 -0700 2017
Log Length: 0

 

 

In the Spark application's logs in the YARN web UI, the log file for attempt 2 shows this:

 

Log Type: stderr
Log Upload Time: Sun Sep 17 20:55:59 -0700 2017
Log Length: 5150
Showing 4096 bytes of 5150 total.
122:45081
java.util.concurrent.ExecutionException: java.net.ConnectException: Connection refused: masternode.myhost.com/masternode.ip.address:45081
	at io.netty.util.concurrent.AbstractFuture.get(AbstractFuture.java:37)
	at org.apache.hive.spark.client.RemoteDriver.<init>(RemoteDriver.java:156)
	at org.apache.hive.spark.client.RemoteDriver.main(RemoteDriver.java:556)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:552)
Caused by: java.net.ConnectException: Connection refused: masternode.myhost.com/masternode.ip.address:45081
	at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
	at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
	at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:224)
	at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:289)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
	at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
	at java.lang.Thread.run(Thread.java:748)
17/09/17 20:55:57 INFO yarn.ApplicationMaster: Final app status: FAILED, exitCode: 15, (reason: User class threw exception: java.util.concurrent.ExecutionException: java.net.ConnectException: Connection refused: masternode.myhost.com/masternode.ip.address:45081)
17/09/17 20:55:57 ERROR yarn.ApplicationMaster: Uncaught exception: 
java.util.concurrent.ExecutionException: java.net.ConnectException: Connection refused: masternode.myhost.com/masternode.ip.address:45081
	at io.netty.util.concurrent.AbstractFuture.get(AbstractFuture.java:37)
	at org.apache.hive.spark.client.RemoteDriver.<init>(RemoteDriver.java:156)
	at org.apache.hive.spark.client.RemoteDriver.main(RemoteDriver.java:556)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:552)
Caused by: java.net.ConnectException: Connection refused: masternode.myhost.com/masternode.ip.address:45081
	at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
	at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
	at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:224)
	at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:289)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
	at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
	at java.lang.Thread.run(Thread.java:748)
17/09/17 20:55:57 INFO yarn.ApplicationMaster: Unregistering ApplicationMaster with FAILED (diag message: User class threw exception: java.util.concurrent.ExecutionException: java.net.ConnectException: Connection refused: masternode.myhost.com/masternode.ip.address:45081)
17/09/17 20:55:57 INFO yarn.ApplicationMaster: Deleting staging directory .sparkStaging/application_1505703713655_0006
17/09/17 20:55:57 INFO util.ShutdownHookManager: Shutdown hook called

Log Type: stdout
Log Upload Time: Sun Sep 17 20:55:59 -0700 2017
Log Length: 0

 

 

Contributor
Posts: 44
Registered: ‎01-05-2016

Re: Hive on Spark using BigBench benchmark throwing java.util.concurrent.ExecutionException

Have you tried a configuration similar to:

 

- 3 executors (1 per data node)

- A very low initial memory allocation for the executors (e.g. 1 GB)

- Limiting the executors to 1 vCPU each (at least initially)

- For the driver, maybe try 2 vCPUs and 2 GB initially

- It's important to allow a sizeable memory overhead on top of the RAM allocated to the executors, so that they can draw on extra memory when/if they outgrow their initial allocation (the 1 GB specified above), but only when/if they need it. So set the following parameters accordingly (see the example session after this list):

 

spark.yarn.executor.memoryOverhead = 8 GB

spark.yarn.driver.memoryOverhead = 8 GB

 

- You can read the following docs to get a better grasp of the concepts behind resource allocation:

 

https://www.cloudera.com/documentation/enterprise/5-11-x/topics/cdh_ig_yarn_tuning.html

 

https://www.cloudera.com/documentation/enterprise/latest/topics/cdh_ig_running_spark_on_yarn.html

 

The first doc also includes a link to a "YARN Tuning Spreadsheet". The docs contain details on how to configure all the settings mentioned above :)
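As a concrete sketch of the suggestions above (these are the standard Spark-on-YARN property names, set here in a Hive session; the values are only starting points, and the *.memoryOverhead properties take their value in MB):

set hive.execution.engine=spark;
set spark.executor.instances=3;               -- 1 executor per data node
set spark.executor.memory=1g;                 -- very low initial executor memory
set spark.executor.cores=1;                   -- 1 vCPU per executor to start with
set spark.driver.memory=2g;
set spark.driver.cores=2;
set spark.yarn.executor.memoryOverhead=8192;  -- 8 GB, specified in MB
set spark.yarn.driver.memoryOverhead=8192;    -- 8 GB, specified in MB

You can also put these in hive-site.xml (or set them via Cloudera Manager) if you want them applied to every session rather than per query.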

 

HTH
