Created on 03-24-2016 08:57 PM - edited 09-16-2022 03:10 AM
dear:
i am run spark streaming application in yarn-cluster and run 17.5 hour application killed and throw Exception.
User class threw exception: org.apache.spark.SparkException: Job aborted due to stage failure: ShuffleMapStage 4896 (foreachRDD at SparkStreamingApp.scala:73) has failed the maximum allowable number of times: 4. Most recent failure reason: org.apache.spark.shuffle.FetchFailedException: Unable to create Channel from class class io.netty.channel.socket.nio.NioSocketChannel
my config:
spark-submit
--master yarn-cluster
--executor-cores 6
--num-executors 5
--driver-memory 10G
--conf "spark.sql.shuffle.partitions=50"
--conf "spark.storage.memoryFraction=0.2"
--conf "spark.shuffle.consolidateFiles=true"
--conf "spark.ui.showConsoleProgress=false"
--conf "spark.driver.extraJavaOptions=-XX:MaxPermSize=1024M -XX:+UseConcMarkSweepGC"
--conf "spark.streaming.backpressure.enabled=true"
--conf "spark.sql.tungsten.enabled=false"
--conf "spark.locality.wait=1s"
--conf "spark.streaming.blockInterval=1000ms"
and
spark.defaults.conf:
spark.network.timeout=300
spark.akka.timeout=300
spark.storage.blockManagerSlaveTimeoutMs=300
spark.shuffle.io.connectionTimeout=300
spark.rpc.askTimeout=300
spark.rpc.lookupTimeout=300
spark.executor.memory=12g
spark.driver.memory=4g
spark.yarn.driver.memoryOverhead=102
spark.yarn.executor.memoryOverhead=409
spark.dynamicAllocation.initialExecutors=1
spark.dynamicAllocation.minExecutors=1
spark.rdd.compress=true
spark.io.compression.codec=org.apache.spark.io.SnappyCompressionCodec
spark.executor.extraJavaOptions="-verbose:gc -XX: MaxPermSize=256MB -XX: +UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=70 -XX:+CMSParallelRemarkEnabled -XX: +PrintGCDetails -XX: +PrintGCTimeStamps -Xloggc:/var/log/spark/spark_gc_1.log"
spark.shuffle.sort.bypassMergeThreshold=50
spark.scheduler.executorTaskBlacklistTime=30000
#spark.cleaner.ttl=43200
spark.shuffle.manager=sort
spark.streaming.receiver.writeAheadLog.enable=true
my code SparkStreamingApp.scala:73 in
val logDataFrame = rdd.map(w => DCLog(w._1, w._2, w._3)).toDF()
val kafkaStream = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap, StorageLevel.MEMORY_AND_DISK_SER).map(_._2)
.map(line => parseData(line)).filter(line => line._4)
.flatMap(data => {
var outData: Seq[(String, String, String)] = Seq[(String, String, String)]()
val (uid, realid, docType, isValid) = data
if (isValid) {
val rids = realid.split(",")
outData = for (i <- 0 until rids.length if rids(i).trim.length > 0 && uid.trim.length > 0) yield (uid, rids(i), docType)
}
outData
}).repartition(rddRepartitionNum)
kafkaStream.foreachRDD((rdd: RDD[(String, String, String)], time: Time) => {
val hiveContext = HiveContextFactory.getInstance(rdd.sparkContext)
import hiveContext.implicits._
val currTimeIdx = Util.getCurrTimeIndex()
println("currTimeIdx=" + currTimeIdx)
val logDataFrame = rdd.map(w => DCLog(w._1, w._2, w._3)).toDF()
val windowTableName = "inveno_zhizi_windown"
logDataFrame.registerTempTable(windowTableName)
Diagnostics:
User class threw exception: org.apache.spark.SparkException: Job aborted due to stage failure: ShuffleMapStage 4896 (foreachRDD at SparkStreamingApp.scala:73) has failed the maximum allowable number of times: 4. Most recent failure reason: org.apache.spark.shuffle.FetchFailedException: Unable to create Channel from class class io.netty.channel.socket.nio.NioSocketChannel at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:321) at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:306) at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:51) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:217) at org.apache.spark.sql.execution.ExternalSort$$anonfun$doExecute$2$$anonfun$apply$4.apply(sort.scala:78) at org.apache.spark.sql.execution.ExternalSort$$anonfun$doExecute$2$$anonfun$apply$4.apply(sort.scala:75) at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:706) at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:706) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297) at org.apache.spark.rdd.RDD.iterator(RDD.scala:264) at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:99) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297) at org.apache.spark.rdd.RDD.iterator(RDD.scala:264) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297) at org.apache.spark.rdd.RDD.iterator(RDD.scala:264) at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:99) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297) at org.apache.spark.rdd.RDD.iterator(RDD.scala:264) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297) at org.apache.spark.rdd.RDD.iterator(RDD.scala:264) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297) at org.apache.spark.rdd.RDD.iterator(RDD.scala:264) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) at org.apache.spark.scheduler.Task.run(Task.scala:88) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) Caused by: io.netty.channel.ChannelException: Unable to create Channel from class class io.netty.channel.socket.nio.NioSocketChannel at io.netty.bootstrap.AbstractBootstrap$BootstrapChannelFactory.newChannel(AbstractBootstrap.java:455) at io.netty.bootstrap.AbstractBootstrap.initAndRegister(AbstractBootstrap.java:306) at io.netty.bootstrap.Bootstrap.doConnect(Bootstrap.java:134) at io.netty.bootstrap.Bootstrap.connect(Bootstrap.java:116) at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:188) at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156) at org.apache.spark.network.shuffle.ExternalShuffleClient$1.createAndStart(ExternalShuffleClient.java:104) at org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140) at org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120) at org.apache.spark.network.shuffle.ExternalShuffleClient.fetchBlocks(ExternalShuffleClient.java:113) at org.apache.spark.storage.ShuffleBlockFetcherIterator.sendRequest(ShuffleBlockFetcherIterator.scala:152) at org.apache.spark.storage.ShuffleBlockFetcherIterator.initialize(ShuffleBlockFetcherIterator.scala:265) at org.apache.spark.storage.ShuffleBlockFetcherIterator.<init>(ShuffleBlockFetcherIterator.scala:112) at org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:43) at org.apache.spark.sql.execution.ShuffledRowRDD.compute(ShuffledRowRDD.scala:71) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297) at org.apache.spark.rdd.RDD.iterator(RDD.scala:264) ... 25 more Caused by: io.netty.channel.ChannelException: Failed to open a socket. at io.netty.channel.socket.nio.NioSocketChannel.newSocket(NioSocketChannel.java:62) at io.netty.channel.socket.nio.NioSocketChannel.<init>(NioSocketChannel.java:72) at sun.reflect.GeneratedConstructorAccessor99.newInstance(Unknown Source) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:526) at java.lang.Class.newInstance(Class.java:383) at io.netty.bootstrap.AbstractBootstrap$BootstrapChannelFactory.newChannel(AbstractBootstrap.java:453) ... 41 more Caused by: java.net.SocketException: Too many open files at sun.nio.ch.Net.socket0(Native Method) at sun.nio.ch.Net.socket(Net.java:441) at sun.nio.ch.Net.socket(Net.java:434) at sun.nio.ch.SocketChannelImpl.<init>(SocketChannelImpl.java:105) at sun.nio.ch.SelectorProviderImpl.openSocketChannel(SelectorProviderImpl.java:60) at io.netty.channel.socket.nio.NioSocketChannel.newSocket(NioSocketChannel.java:60) ... 47 more |
Thanks for your help!