reason: org.apache.spark.shuffle.FetchFailedException: Unable to create Channel from NioSocketChanne


Dear all,

I am running a Spark Streaming application in yarn-cluster mode. After running for about 17.5 hours, the application was killed and threw the following exception.

 

User class threw exception: org.apache.spark.SparkException: Job aborted due to stage failure: ShuffleMapStage 4896 (foreachRDD at SparkStreamingApp.scala:73) has failed the maximum allowable number of times: 4. Most recent failure reason: org.apache.spark.shuffle.FetchFailedException: Unable to create Channel from class class io.netty.channel.socket.nio.NioSocketChannel
  

My config:

spark-submit

--master yarn-cluster

--executor-cores 6

--num-executors 5

--driver-memory 10G

--conf "spark.sql.shuffle.partitions=50"

--conf "spark.storage.memoryFraction=0.2"

--conf "spark.shuffle.consolidateFiles=true"

--conf "spark.ui.showConsoleProgress=false"

--conf "spark.driver.extraJavaOptions=-XX:MaxPermSize=1024M -XX:+UseConcMarkSweepGC"

--conf "spark.streaming.backpressure.enabled=true"

--conf "spark.sql.tungsten.enabled=false"

--conf "spark.locality.wait=1s"

--conf "spark.streaming.blockInterval=1000ms"

 

and spark-defaults.conf:

spark.network.timeout=300
spark.akka.timeout=300
spark.storage.blockManagerSlaveTimeoutMs=300
spark.shuffle.io.connectionTimeout=300
spark.rpc.askTimeout=300
spark.rpc.lookupTimeout=300
spark.executor.memory=12g
spark.driver.memory=4g
spark.yarn.driver.memoryOverhead=102
spark.yarn.executor.memoryOverhead=409
spark.dynamicAllocation.initialExecutors=1
spark.dynamicAllocation.minExecutors=1
spark.rdd.compress=true
spark.io.compression.codec=org.apache.spark.io.SnappyCompressionCodec
spark.executor.extraJavaOptions="-verbose:gc -XX: MaxPermSize=256MB -XX: +UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=70 -XX:+CMSParallelRemarkEnabled -XX: +PrintGCDetails -XX: +PrintGCTimeStamps -Xloggc:/var/log/spark/spark_gc_1.log"
spark.shuffle.sort.bypassMergeThreshold=50
spark.scheduler.executorTaskBlacklistTime=30000
#spark.cleaner.ttl=43200
spark.shuffle.manager=sort
spark.streaming.receiver.writeAheadLog.enable=true

 

My code at SparkStreamingApp.scala:73 is:

 val logDataFrame = rdd.map(w => DCLog(w._1, w._2, w._3)).toDF()

 

val kafkaStream = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap, StorageLevel.MEMORY_AND_DISK_SER).map(_._2)
  .map(line => parseData(line)).filter(line => line._4)
  .flatMap(data => {
    var outData: Seq[(String, String, String)] = Seq[(String, String, String)]()
    val (uid, realid, docType, isValid) = data
    if (isValid) {
      val rids = realid.split(",")
      outData = for (i <- 0 until rids.length if rids(i).trim.length > 0 && uid.trim.length > 0) yield (uid, rids(i), docType)
    }
    outData
  }).repartition(rddRepartitionNum)

kafkaStream.foreachRDD((rdd: RDD[(String, String, String)], time: Time) => {
  val hiveContext = HiveContextFactory.getInstance(rdd.sparkContext)
  import hiveContext.implicits._
  val currTimeIdx = Util.getCurrTimeIndex()
  println("currTimeIdx=" + currTimeIdx)
  val logDataFrame = rdd.map(w => DCLog(w._1, w._2, w._3)).toDF()
  val windowTableName = "inveno_zhizi_windown"
  logDataFrame.registerTempTable(windowTableName)

 

 

Diagnostics:

User class threw exception: org.apache.spark.SparkException: Job aborted due to stage failure: ShuffleMapStage 4896 (foreachRDD at SparkStreamingApp.scala:73) has failed the maximum allowable number of times: 4. Most recent failure reason: org.apache.spark.shuffle.FetchFailedException: Unable to create Channel from class class io.netty.channel.socket.nio.NioSocketChannel
at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:321)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:306)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:51)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:217)
at org.apache.spark.sql.execution.ExternalSort$$anonfun$doExecute$2$$anonfun$apply$4.apply(sort.scala:78)
at org.apache.spark.sql.execution.ExternalSort$$anonfun$doExecute$2$$anonfun$apply$4.apply(sort.scala:75)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:706)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:706)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:99)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:99)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: io.netty.channel.ChannelException: Unable to create Channel from class class io.netty.channel.socket.nio.NioSocketChannel
at io.netty.bootstrap.AbstractBootstrap$BootstrapChannelFactory.newChannel(AbstractBootstrap.java:455)
at io.netty.bootstrap.AbstractBootstrap.initAndRegister(AbstractBootstrap.java:306)
at io.netty.bootstrap.Bootstrap.doConnect(Bootstrap.java:134)
at io.netty.bootstrap.Bootstrap.connect(Bootstrap.java:116)
at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:188)
at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156)
at org.apache.spark.network.shuffle.ExternalShuffleClient$1.createAndStart(ExternalShuffleClient.java:104)
at org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
at org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120)
at org.apache.spark.network.shuffle.ExternalShuffleClient.fetchBlocks(ExternalShuffleClient.java:113)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.sendRequest(ShuffleBlockFetcherIterator.scala:152)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.initialize(ShuffleBlockFetcherIterator.scala:265)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.<init>(ShuffleBlockFetcherIterator.scala:112)
at org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:43)
at org.apache.spark.sql.execution.ShuffledRowRDD.compute(ShuffledRowRDD.scala:71)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
... 25 more
Caused by: io.netty.channel.ChannelException: Failed to open a socket.
at io.netty.channel.socket.nio.NioSocketChannel.newSocket(NioSocketChannel.java:62)
at io.netty.channel.socket.nio.NioSocketChannel.<init>(NioSocketChannel.java:72)
at sun.reflect.GeneratedConstructorAccessor99.newInstance(Unknown Source)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at java.lang.Class.newInstance(Class.java:383)
at io.netty.bootstrap.AbstractBootstrap$BootstrapChannelFactory.newChannel(AbstractBootstrap.java:453)
... 41 more
Caused by: java.net.SocketException: Too many open files
at sun.nio.ch.Net.socket0(Native Method)
at sun.nio.ch.Net.socket(Net.java:441)
at sun.nio.ch.Net.socket(Net.java:434)
at sun.nio.ch.SocketChannelImpl.<init>(SocketChannelImpl.java:105)
at sun.nio.ch.SelectorProviderImpl.openSocketChannel(SelectorProviderImpl.java:60)
at io.netty.channel.socket.nio.NioSocketChannel.newSocket(NioSocketChannel.java:60)
... 47 more
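
The innermost cause in the trace above is `java.net.SocketException: Too many open files`, so it may help to watch how many file descriptors the JVM holds as the job runs. Below is a minimal sketch of one way to log that, assuming a HotSpot/OpenJDK JVM on a Unix-like host where `com.sun.management.UnixOperatingSystemMXBean` is available. The object name `FdUsage` and its methods are hypothetical helpers for illustration only, not part of Spark or of the application above.

```scala
import java.lang.management.ManagementFactory
import com.sun.management.UnixOperatingSystemMXBean

// Hypothetical helper (not part of Spark): reports file descriptor usage
// of the JVM it runs in.
object FdUsage {

  /** Returns Some((open, max)) descriptor counts, or None if the JVM
    * does not expose the Unix-specific MXBean (assumption: HotSpot/OpenJDK on Unix). */
  def snapshot(): Option[(Long, Long)] =
    ManagementFactory.getOperatingSystemMXBean match {
      case unix: UnixOperatingSystemMXBean =>
        Some((unix.getOpenFileDescriptorCount, unix.getMaxFileDescriptorCount))
      case _ =>
        None
    }

  /** Formats the snapshot as a single log line. */
  def describe(): String = snapshot() match {
    case Some((open, max)) => s"open file descriptors: $open / $max"
    case None              => "file descriptor counts not available on this JVM"
  }

  def main(args: Array[String]): Unit =
    println(describe())
}
```

Calling `println(FdUsage.describe())` next to the existing `println("currTimeIdx=" + currTimeIdx)` would report the driver's usage once per batch. Running it inside an operation such as `rdd.foreachPartition` would report the executor JVMs instead, which is where the shuffle fetch in the trace fails. A count that climbs steadily over the hours would suggest a descriptor leak rather than a limit that is merely set too low.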

 

 

Thanks for your help!
