Created 06-26-2018 06:13 AM
I am running a Spark job on YARN. The job runs properly on Amazon EMR (1 master and 2 slaves, m4.xlarge).
I have set up a similar infrastructure with the HDP 2.6 distribution on AWS EC2 machines. While running a Spark job there, it gets stuck partway through,
and the container throws the following error.
18/06/25 07:15:31 INFO spark.MapOutputTrackerWorker: Doing the fetch; tracker endpoint = NettyRpcEndpointRef(spark://MapOutputTracker@10.210.150.150:44343)
18/06/25 07:15:31 INFO spark.MapOutputTrackerWorker: Don't have map outputs for shuffle 9, fetching them
18/06/25 07:15:31 INFO spark.MapOutputTrackerWorker: Don't have map outputs for shuffle 9, fetching them
18/06/25 07:15:31 INFO spark.MapOutputTrackerWorker: Got the output locations
18/06/25 07:15:31 INFO storage.ShuffleBlockFetcherIterator: Getting 5 non-empty blocks out of 1000 blocks
18/06/25 07:15:31 INFO storage.ShuffleBlockFetcherIterator: Started 1 remote fetches in 0 ms
18/06/25 07:15:31 INFO storage.ShuffleBlockFetcherIterator: Getting 5 non-empty blocks out of 1000 blocks
18/06/25 07:15:31 INFO storage.ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
18/06/25 07:15:31 INFO storage.ShuffleBlockFetcherIterator: Getting 5 non-empty blocks out of 1000 blocks
18/06/25 07:15:31 INFO storage.ShuffleBlockFetcherIterator: Started 1 remote fetches in 0 ms
18/06/25 07:15:31 INFO storage.ShuffleBlockFetcherIterator: Getting 5 non-empty blocks out of 1000 blocks
18/06/25 07:15:31 INFO storage.ShuffleBlockFetcherIterator: Started 1 remote fetches in 1 ms
18/06/25 07:15:31 INFO codegen.CodeGenerator: Code generated in 4.822611 ms
18/06/25 07:15:31 INFO codegen.CodeGenerator: Code generated in 8.430244 ms
18/06/25 07:17:31 ERROR server.TransportChannelHandler: Connection to ip-10-210-150-180.********/10.210.150.180:7447 has been quiet for 120000 ms while there are outstanding requests. Assuming connection is dead; please adjust spark.network.timeout if this is wrong.
18/06/25 07:17:31 ERROR client.TransportResponseHandler: Still have 307 requests outstanding when connection from ip-10-210-150-180.********/10.210.150.180:7447 is closed
18/06/25 07:17:31 INFO shuffle.RetryingBlockFetcher: Retrying fetch (1/3) for 197 outstanding blocks after 5000 ms
18/06/25 07:17:31 ERROR shuffle.OneForOneBlockFetcher: Failed while starting block fetches
java.io.IOException: Connection from ip-10-210-150-180.********/10.210.150.180:7447 closed
    at org.apache.spark.network.client.TransportResponseHandler.channelInactive(TransportResponseHandler.java:146)
    at org.apache.spark.network.server.TransportChannelHandler.channelInactive(TransportChannelHandler.java:108)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:241)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:227)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:220)
    at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:75)
    at io.netty.handler.timeout.IdleStateHandler.channelInactive(IdleStateHandler.java:278)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:241)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:227)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:220)
    at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:75)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:241)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:227)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:220)
    at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:75)
    at org.apache.spark.network.util.TransportFrameDecoder.channelInactive(TransportFrameDecoder.java:182)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:241)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:227)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:220)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1289)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:241)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:227)
    at io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:893)
    at io.netty.channel.AbstractChannel$AbstractUnsafe$7.run(AbstractChannel.java:691)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:399)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:446)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
    at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
    at java.lang.Thread.run(Thread.java:748)
18/06/25 07:17:31 INFO shuffle.RetryingBlockFetcher: Retrying fetch (1/3) for 166 outstanding blocks after 5000 ms
18/06/25 07:17:31 ERROR shuffle.OneForOneBlockFetcher: Failed while starting block fetches
java.io.IOException: Connection from ip-10-210-150-180.********/10.210.150.180:7447 closed
    at org.apache.spark.network.client.TransportResponseHandler.channelInactive(TransportResponseHandler.java:146)
    at org.apache.spark.network.server.TransportChannelHandler.channelInactive(TransportChannelHandler.java:108)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:241)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:227)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:220)
    at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:75)
    at io.netty.handler.timeout.IdleStateHandler.channelInactive(IdleStateHandler.java:278)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:241)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:227)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:220)
    at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:75)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:241)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:227)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:220)
    at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:75)
    at org.apache.spark.network.util.TransportFrameDecoder.channelInactive(TransportFrameDecoder.java:182)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:241)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:227)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:220)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1289)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:241)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:227)
    at io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:893)
    at io.netty.channel.AbstractChannel$AbstractUnsafe$7.run(AbstractChannel.java:691)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:399)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:446)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
    at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
    at java.lang.Thread.run(Thread.java:748)
According to the error, the connection between the shuffle service and the container is idle for more than 120 s. This happens mainly during shuffle, and I have tried increasing the timeout to a larger value, but with no luck.
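(For reference, a larger timeout can be passed at submit time roughly like this; the values are only illustrative, and your-app.jar is a placeholder for the actual job.)

# Illustrative only: raise the network idle timeout (Spark's default is 120s)
# and give shuffle fetches more retries before giving up.
spark-submit \
  --master yarn \
  --conf spark.network.timeout=600s \
  --conf spark.shuffle.io.maxRetries=10 \
  --conf spark.shuffle.io.retryWait=30s \
  your-app.jar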
I am currently running Spark on a YARN cluster with the following configuration.
spark-defaults.conf on the master machine:
spark.eventLog.dir=hdfs:///user/spark/applicationHistory
spark.eventLog.enabled=true
spark.yarn.historyServer.address=ppv-qa12-tenant8-spark-cluster-master.periscope-solutions.local:18080
spark.shuffle.service.enabled=true
spark.dynamicAllocation.enabled=true
spark.driver.extraLibraryPath=/usr/hdp/current/hadoop-client/lib/native:/usr/hdp/current/hadoop-client/lib/native/Linux-amd64-64
spark.executor.extraLibraryPath=/usr/hdp/current/hadoop-client/lib/native:/usr/hdp/current/hadoop-client/lib/native/Linux-amd64-64
spark.driver.maxResultSize=0
spark.driver.extraJavaOptions=-XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=70 -XX:MaxHeapFreeRatio=70 -XX:+CMSClassUnloadingEnabled -XX:OnOutOfMemoryError='kill -9 %p'
spark.executor.extraJavaOptions=-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=70 -XX:MaxHeapFreeRatio=70 -XX:+CMSClassUnloadingEnabled -XX:OnOutOfMemoryError='kill -9 %p'
spark.executor.memory=5g
spark.driver.memory=1g
spark.executor.cores=4
yarn-site.xml on the slave machines:
<configuration>
  <property>
    <name>yarn.application.classpath</name>
    <value>/usr/hdp/current/spark2-client/aux/*,/etc/hadoop/conf,/usr/hdp/current/hadoop-client/*,/usr/hdp/current/hadoop-client/lib/*,/usr/hdp/current/hadoop-hdfs-client/*,/usr/hdp/current/hadoop-hdfs-client/lib/*,/usr/hdp/current/hadoop-yarn-client/*,/usr/hdp/current/hadoop-yarn-client/lib/*</value>
  </property>
  <property>
    <name>yarn.nodemanager.aux-services</name>
    <value>spark2_shuffle</value>
  </property>
  <property>
    <name>yarn.nodemanager.aux-services.mapreduce_shuffle.class</name>
    <value>org.apache.hadoop.mapred.ShuffleHandler</value>
  </property>
  <property>
    <name>yarn.nodemanager.aux-services.spark2_shuffle.class</name>
    <value>org.apache.spark.network.yarn.YarnShuffleService</value>
  </property>
  <property>
    <name>yarn.nodemanager.container-manager.thread-count</name>
    <value>64</value>
  </property>
  <property>
    <name>yarn.nodemanager.localizer.client.thread-count</name>
    <value>20</value>
  </property>
  <property>
    <name>yarn.nodemanager.vmem-pmem-ratio</name>
    <value>5</value>
  </property>
  <property>
    <name>yarn.resourcemanager.hostname</name>
    <value>************</value>
  </property>
  <property>
    <name>yarn.resourcemanager.resource-tracker.client.thread-count</name>
    <value>64</value>
  </property>
  <property>
    <name>yarn.resourcemanager.scheduler.client.thread-count</name>
    <value>64</value>
  </property>
  <property>
    <name>yarn.scheduler.increment-allocation-mb</name>
    <value>32</value>
  </property>
  <property>
    <name>yarn.scheduler.increment-allocation-vcores</name>
    <value>1</value>
  </property>
  <property>
    <name>yarn.scheduler.maximum-allocation-vcores</name>
    <value>128</value>
  </property>
  <property>
    <name>yarn.scheduler.minimum-allocation-mb</name>
    <value>32</value>
  </property>
  <property>
    <name>yarn.timeline-service.enabled</name>
    <value>true</value>
  </property>
  <property>
    <name>yarn.nodemanager.resource.cpu-vcores</name>
    <value>8</value>
  </property>
  <property>
    <name>yarn.nodemanager.resource.memory-mb</name>
    <value>11520</value>
  </property>
  <property>
    <name>yarn.scheduler.maximum-allocation-mb</name>
    <value>11520</value>
  </property>
  <property>
    <name>yarn.nodemanager.hostname</name>
    <value>*************</value>
  </property>
</configuration>
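(As a quick sanity check, not something from the original post: you can confirm on a slave that the YARN shuffle service is actually listening on the port the executors fetch from. The port below is taken from the logs above; Spark's default spark.shuffle.service.port would be 7337. The NodeManager log path is only a typical HDP location.)

# Assumption: shuffle service port from the logs above (7447); default is 7337.
sudo ss -tlnp | grep 7447
# Assumption: typical HDP NodeManager log location; adjust to your install.
grep -i "yarnshuffleservice" /var/log/hadoop-yarn/yarn/yarn-yarn-nodemanager-*.log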
Created 07-05-2018 06:28 PM
Short Answer:
Turn off scatter-gather.
Long Version:
The data transfer between the container and the shuffle service happens through RPC calls (ChunkFetchRequest, ChunkFetchSuccess, and ChunkFetchFailure).
On further debugging with trace-level logs, we found that RPC calls were indeed happening between the container and the shuffle service, and that after some time the RPC calls abruptly stopped (no more RPC calls were logged) on both the shuffle service and the container.
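(If you want to reproduce this kind of debugging, trace logging for Spark's network layer can be turned on roughly as sketched below; the file name and the exact logger list are just one option, not a verbatim recipe. Note that passing spark.executor.extraJavaOptions on the command line overrides the value from spark-defaults.conf, so merge in the GC flags if you still need them; the shuffle service itself logs through the NodeManager's own log4j configuration.)

# Sketch: a log4j.properties with TRACE enabled for Spark's network/shuffle classes.
cat > log4j-trace.properties <<'EOF'
log4j.rootCategory=INFO, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n
log4j.logger.org.apache.spark.network=TRACE
log4j.logger.org.apache.spark.storage.ShuffleBlockFetcherIterator=TRACE
EOF
# Ship it with the job and point the executors at it (your-app.jar is a placeholder).
spark-submit \
  --files log4j-trace.properties \
  --conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=log4j-trace.properties" \
  your-app.jar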
On looking into the kernel and system activity logs, we found the following:
xen_netfront: xennet: skb rides the rocket: 19 slots
This indicated that our EC2 machines were dropping network packets.
More information on this message can be found in the following post:
http://www.brendangregg.com/blog/2014-09-11/perf-kernel-line-tracing.html
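(To check whether you are hitting the same symptom, the kernel message and the interface drop counters can be inspected with something like the following; the interface name eth0 is an assumption.)

# Look for the xen_netfront "skb rides the rocket" message in the kernel log.
dmesg | grep -i "rides the rocket"
# Check drop/error counters on the interface (eth0 assumed).
ip -s link show eth0
ethtool -S eth0 | grep -iE "drop|err"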
So we tried turning off scatter-gather with the following command:
sudo ethtool -K eth0 sg off
The error went away after that.
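(Two follow-up notes: ethtool -k shows whether scatter-gather is currently enabled, and the change above does not survive a reboot, so it has to be reapplied at boot; the rc.local line below is just one way to do that.)

# Verify the offload settings (look for "scatter-gather: off").
ethtool -k eth0
# One way (among several) to reapply the setting at boot.
echo 'ethtool -K eth0 sg off' | sudo tee -a /etc/rc.local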