
org.apache.spark.shuffle.FetchFailedException: Failed to send request StreamChunkId{streamId=4329156

New Contributor

Hi,

I am aggregating a 1.3 TB dataset using Spark (Scala). After the aggregation step, I get a connection-lost error while writing the result to a CSV file. Please help.

Details are below.

Code:


spark-shell --master yarn \
  --driver-memory 10g \
  --executor-memory 100g \
  --num-executors 8 \
  --executor-cores 20 \
  --queue QueueB

val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
val result = sqlContext.sql("FROM ps.detail_ufdr_other_17920 SELECT *")
result.registerTempTable("DFUO")
val sdf=sqlContext.sql("select rat,sum(L4_DW_THROUGHPUT) from DFUO group by rat")
sdf.write.mode("overwrite").format("com.databricks.spark.csv").option("delimiter", "|").save("/root/test/other_file/")
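
A common first mitigation for shuffle fetch failures on an aggregation this size is to raise the shuffle partition count, so each block fetched during the shuffle is smaller, and to collapse the tiny grouped result before writing. A minimal sketch against the same Spark 1.x API used above; the partition count of 2000 is an assumption to tune for the cluster, not a tested value:

// Hypothetical value -- more, smaller shuffle partitions reduce the
// size of each block fetched during the aggregation shuffle.
sqlContext.setConf("spark.sql.shuffle.partitions", "2000")

val sdf = sqlContext.sql("select rat, sum(L4_DW_THROUGHPUT) from DFUO group by rat")

// The grouped result has only one row per rat, so it can be collapsed
// to a single partition (and a single output file) before the CSV write.
sdf.coalesce(1)
  .write.mode("overwrite")
  .format("com.databricks.spark.csv")
  .option("delimiter", "|")
  .save("/root/test/other_file/")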

Error details:

org.apache.spark.shuffle.FetchFailedException: Failed to send request StreamChunkId{streamId=432915604128, chunkIndex=27602} to /172.16.5.7:23516: java.io.IOException: Connection reset by peer
	at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:458)
	at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:443)
	at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:59)
	at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:389)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
	at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
	at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
	at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1.org$apache$spark$sql$execution$aggregate$TungstenAggregate$$anonfun$$executePartition$1(TungstenAggregate.scala:122)
	at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$3.apply(TungstenAggregate.scala:144)
	at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$3.apply(TungstenAggregate.scala:144)
	at org.apache.spark.rdd.MapPartitionsWithPreparationRDD.compute(MapPartitionsWithPreparationRDD.scala:64)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:334)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:267)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:334)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:267)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:334)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:267)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:334)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:267)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:68)
	at org.apache.spark.scheduler.Task.run(Task.scala:90)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:253)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Failed to send request StreamChunkId{streamId=432915604128, chunkIndex=27602} to /172.16.5.7:23516: java.io.IOException: Connection reset by peer
	at org.apache.spark.network.client.TransportClient$1.operationComplete(TransportClient.java:139)
	at org.apache.spark.network.client.TransportClient$1.operationComplete(TransportClient.java:125)
	at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:680)
	at io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:603)
	at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:563)
	at io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:424)
	at io.netty.channel.ChannelOutboundBuffer.safeFail(ChannelOutboundBuffer.java:678)
	at io.netty.channel.ChannelOutboundBuffer.remove0(ChannelOutboundBuffer.java:298)
	at io.netty.channel.ChannelOutboundBuffer.failFlushed(ChannelOutboundBuffer.java:621)
	at io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:765)
	at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.forceFlush(AbstractNioChannel.java:317)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:519)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
	at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
	... 1 more
Caused by: java.io.IOException: Connection reset by peer
	at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
	at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
	at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
	at sun.nio.ch.IOUtil.write(IOUtil.java:65)
	at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
	at org.apache.spark.network.sasl.SaslEncryption$EncryptedMessage.transferTo(SaslEncryption.java:219)
	at io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:254)
	at io.netty.channel.nio.AbstractNioByteChannel.doWrite(AbstractNioByteChannel.java:237)
	at io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:281)
	at io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:761)
	... 7 more
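
A "Connection reset by peer" in the middle of a shuffle fetch usually means the executor serving the block died mid-transfer, often because YARN killed its container for exceeding the memory overhead allowance. One way to probe this is to relaunch with a larger memory overhead and more forgiving shuffle retry and timeout settings; the values below are assumptions to adjust, not tested ones:

spark-shell --master yarn \
  --driver-memory 10g \
  --executor-memory 100g \
  --num-executors 8 \
  --executor-cores 20 \
  --queue QueueB \
  --conf spark.yarn.executor.memoryOverhead=8192 \
  --conf spark.network.timeout=600s \
  --conf spark.shuffle.io.maxRetries=10 \
  --conf spark.shuffle.io.retryWait=30s

Reducing executor cores (for example, from 20 to 5) may also help, since 20 concurrent tasks per 100 GB executor can spike the off-heap memory that shuffle transfers use.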

 

2 Replies

New Contributor

@Zaka Were you able to resolve this? I am facing the same issue.

Expert Contributor

Hello @Zaka,

Has this job ever run successfully? Have you upgraded any components recently?