03-24-2016 08:57 PM
Hi,

I am running a Spark Streaming application in yarn-cluster mode. After about 17.5 hours the application is killed and throws the following exception:

User class threw exception: org.apache.spark.SparkException: Job aborted due to stage failure: ShuffleMapStage 4896 (foreachRDD at SparkStreamingApp.scala:73) has failed the maximum allowable number of times: 4. Most recent failure reason: org.apache.spark.shuffle.FetchFailedException: Unable to create Channel from class class io.netty.channel.socket.nio.NioSocketChannel

My spark-submit command:

spark-submit --master yarn-cluster --executor-cores 6 --num-executors 5 --driver-memory 10G \
  --conf "spark.sql.shuffle.partitions=50" \
  --conf "spark.storage.memoryFraction=0.2" \
  --conf "spark.shuffle.consolidateFiles=true" \
  --conf "spark.ui.showConsoleProgress=false" \
  --conf "spark.driver.extraJavaOptions=-XX:MaxPermSize=1024M -XX:+UseConcMarkSweepGC" \
  --conf "spark.streaming.backpressure.enabled=true" \
  --conf "spark.sql.tungsten.enabled=false" \
  --conf "spark.locality.wait=1s" \
  --conf "spark.streaming.blockInterval=1000ms"

My spark-defaults.conf:

spark.network.timeout=300
spark.akka.timeout=300
spark.storage.blockManagerSlaveTimeoutMs=300
spark.shuffle.io.connectionTimeout=300
spark.rpc.askTimeout=300
spark.rpc.lookupTimeout=300
spark.executor.memory=12g
spark.driver.memory=4g
spark.yarn.driver.memoryOverhead=102
spark.yarn.executor.memoryOverhead=409
spark.dynamicAllocation.initialExecutors=1
spark.dynamicAllocation.minExecutors=1
spark.rdd.compress=true
spark.io.compression.codec=org.apache.spark.io.SnappyCompressionCodec
spark.executor.extraJavaOptions="-verbose:gc -XX:MaxPermSize=256MB -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=70 -XX:+CMSParallelRemarkEnabled -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -Xloggc:/var/log/spark/spark_gc_1.log"
spark.shuffle.sort.bypassMergeThreshold=50
spark.scheduler.executorTaskBlacklistTime=30000
#spark.cleaner.ttl=43200
spark.shuffle.manager=sort
spark.streaming.receiver.writeAheadLog.enable=true

My code (SparkStreamingApp.scala:73 is the line "val logDataFrame = rdd.map(w => DCLog(w._1, w._2, w._3)).toDF()"):

val kafkaStream = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap, StorageLevel.MEMORY_AND_DISK_SER)
  .map(_._2)
  .map(line => parseData(line))
  .filter(line => line._4)
  .flatMap(data => {
    var outData: Seq[(String, String, String)] = Seq[(String, String, String)]()
    val (uid, realid, docType, isValid) = data
    if (isValid) {
      val rids = realid.split(",")
      outData = for (i <- 0 until rids.length if rids(i).trim.length > 0 && uid.trim.length > 0)
        yield (uid, rids(i), docType)
    }
    outData
  })
  .repartition(rddRepartitionNum)

kafkaStream.foreachRDD((rdd: RDD[(String, String, String)], time: Time) => {
  val hiveContext = HiveContextFactory.getInstance(rdd.sparkContext)
  import hiveContext.implicits._
  val currTimeIdx = Util.getCurrTimeIndex()
  println("currTimeIdx=" + currTimeIdx)
  val logDataFrame = rdd.map(w => DCLog(w._1, w._2, w._3)).toDF()   // SparkStreamingApp.scala:73
  val windowTableName = "inveno_zhizi_windown"
  logDataFrame.registerTempTable(windowTableName)
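(HiveContextFactory is not shown above. For context, a minimal sketch of the usual "one context per JVM" singleton it is called like, following the SQLContext singleton example in the Spark Streaming programming guide; the body below is an assumption, not the actual class:)

import org.apache.spark.SparkContext
import org.apache.spark.sql.hive.HiveContext

object HiveContextFactory {
  // Lazily create a single HiveContext per JVM so each micro-batch in
  // foreachRDD reuses the same instance instead of constructing a new one.
  @transient private var instance: HiveContext = _

  def getInstance(sparkContext: SparkContext): HiveContext = synchronized {
    if (instance == null) {
      instance = new HiveContext(sparkContext)
    }
    instance
  }
}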
Full diagnostics:

User class threw exception: org.apache.spark.SparkException: Job aborted due to stage failure: ShuffleMapStage 4896 (foreachRDD at SparkStreamingApp.scala:73) has failed the maximum allowable number of times: 4.
Most recent failure reason: org.apache.spark.shuffle.FetchFailedException: Unable to create Channel from class class io.netty.channel.socket.nio.NioSocketChannel
  at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:321)
  at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:306)
  at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:51)
  at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
  at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
  at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
  at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
  at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:217)
  at org.apache.spark.sql.execution.ExternalSort$$anonfun$doExecute$2$$anonfun$apply$4.apply(sort.scala:78)
  at org.apache.spark.sql.execution.ExternalSort$$anonfun$doExecute$2$$anonfun$apply$4.apply(sort.scala:75)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:706)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:706)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
  at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:99)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
  at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:99)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
  at org.apache.spark.scheduler.Task.run(Task.scala:88)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
  at java.lang.Thread.run(Thread.java:745)
Caused by: io.netty.channel.ChannelException: Unable to create Channel from class class io.netty.channel.socket.nio.NioSocketChannel
  at io.netty.bootstrap.AbstractBootstrap$BootstrapChannelFactory.newChannel(AbstractBootstrap.java:455)
  at io.netty.bootstrap.AbstractBootstrap.initAndRegister(AbstractBootstrap.java:306)
  at io.netty.bootstrap.Bootstrap.doConnect(Bootstrap.java:134)
  at io.netty.bootstrap.Bootstrap.connect(Bootstrap.java:116)
  at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:188)
  at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156)
  at org.apache.spark.network.shuffle.ExternalShuffleClient$1.createAndStart(ExternalShuffleClient.java:104)
  at org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
  at org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120)
  at org.apache.spark.network.shuffle.ExternalShuffleClient.fetchBlocks(ExternalShuffleClient.java:113)
  at org.apache.spark.storage.ShuffleBlockFetcherIterator.sendRequest(ShuffleBlockFetcherIterator.scala:152)
  at org.apache.spark.storage.ShuffleBlockFetcherIterator.initialize(ShuffleBlockFetcherIterator.scala:265)
  at org.apache.spark.storage.ShuffleBlockFetcherIterator.<init>(ShuffleBlockFetcherIterator.scala:112)
  at org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:43)
  at org.apache.spark.sql.execution.ShuffledRowRDD.compute(ShuffledRowRDD.scala:71)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
  ... 25 more
Caused by: io.netty.channel.ChannelException: Failed to open a socket.
  at io.netty.channel.socket.nio.NioSocketChannel.newSocket(NioSocketChannel.java:62)
  at io.netty.channel.socket.nio.NioSocketChannel.<init>(NioSocketChannel.java:72)
  at sun.reflect.GeneratedConstructorAccessor99.newInstance(Unknown Source)
  at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
  at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
  at java.lang.Class.newInstance(Class.java:383)
  at io.netty.bootstrap.AbstractBootstrap$BootstrapChannelFactory.newChannel(AbstractBootstrap.java:453)
  ... 41 more
Caused by: java.net.SocketException: Too many open files
  at sun.nio.ch.Net.socket0(Native Method)
  at sun.nio.ch.Net.socket(Net.java:441)
  at sun.nio.ch.Net.socket(Net.java:434)
  at sun.nio.ch.SocketChannelImpl.<init>(SocketChannelImpl.java:105)
  at sun.nio.ch.SelectorProviderImpl.openSocketChannel(SelectorProviderImpl.java:60)
  at io.netty.channel.socket.nio.NioSocketChannel.newSocket(NioSocketChannel.java:60)
  ... 47 more

Thanks for your help!
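P.S. The bottom-most cause above is java.net.SocketException: Too many open files, i.e. the executor process is hitting its file-descriptor limit. A rough, Linux-only sketch for logging how many descriptors an executor JVM currently holds (purely illustrative, not code from the application above):

import java.io.File

// Count the file descriptors held by this JVM by listing /proc/self/fd
// (Linux only); compare the number against the `ulimit -n` soft limit
// of the user running the NodeManager/executor.
def openFdCount(): Int =
  Option(new File("/proc/self/fd").listFiles()).map(_.length).getOrElse(-1)

// e.g. log it once per partition from inside the streaming job:
// rdd.foreachPartition { _ => println("open fds in this executor: " + openFdCount()) }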
Labels: Apache Spark, Apache YARN