Created on 02-06-2019 05:35 AM - edited 09-16-2022 07:07 AM
Hi,
I am aggregating a 1.3 TB table using Spark Scala. The aggregation step runs, but while writing the result to a CSV file I get a connection-lost error. Please help.
Below are the details.
Code:
spark-shell --master yarn \
  --driver-memory 10g \
  --executor-memory 100g \
  --num-executors 8 \
  --executor-cores 20 \
  --queue QueueB
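For reference, a variant of the same launch with network and retry settings that are commonly raised when shuffle fetches fail on large jobs. The config keys are standard Spark settings; the values are only starting points to tune, and executor-cores is dropped to 5 on the assumption that very wide executors (20 cores each) add to the fetch pressure:

spark-shell --master yarn \
  --driver-memory 10g \
  --executor-memory 100g \
  --num-executors 8 \
  --executor-cores 5 \
  --queue QueueB \
  --conf spark.network.timeout=600s \
  --conf spark.shuffle.io.maxRetries=10 \
  --conf spark.shuffle.io.retryWait=30s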
// Spark 1.x: create a HiveContext so the Hive table is visible
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
// Load the source table (HiveQL's FROM ... SELECT form)
val result = sqlContext.sql("FROM ps.detail_ufdr_other_17920 SELECT *")
result.registerTempTable("DFUO")
// Aggregate downlink throughput per RAT (this triggers the shuffle)
val sdf = sqlContext.sql("select rat, sum(L4_DW_THROUGHPUT) from DFUO group by rat")
// Write the aggregated result as pipe-delimited CSV
sdf.write.mode("overwrite").format("com.databricks.spark.csv").option("delimiter", "|").save("/root/test/other_file/")
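One note on the write step: under --master yarn that save path resolves to HDFS, not the local /root directory. Also, the com.databricks.spark.csv package is only needed on Spark 1.x; a minimal sketch of the same write on Spark 2.x+, where the csv source is built in (the column alias is an addition for readability):

// Sketch, assuming a Spark 2.x+ shell where `spark` (SparkSession) replaces HiveContext
val sdf2 = spark.sql("select rat, sum(L4_DW_THROUGHPUT) as total_dw from DFUO group by rat")
sdf2.write
  .mode("overwrite")
  .option("delimiter", "|")      // "delimiter" is an accepted alias for "sep"
  .csv("/root/test/other_file/") // resolves to HDFS when running on YARN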
Error Details:
org.apache.spark.shuffle.FetchFailedException: Failed to send request StreamChunkId{streamId=432915604128, chunkIndex=27602} to /172.16.5.7:23516: java.io.IOException: Connection reset by peer
	at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:458)
	at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:443)
	at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:59)
	at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:389)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
	at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
	at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
	at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1.org$apache$spark$sql$execution$aggregate$TungstenAggregate$$anonfun$$executePartition$1(TungstenAggregate.scala:122)
	at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$3.apply(TungstenAggregate.scala:144)
	at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$3.apply(TungstenAggregate.scala:144)
	at org.apache.spark.rdd.MapPartitionsWithPreparationRDD.compute(MapPartitionsWithPreparationRDD.scala:64)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:334)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:267)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:334)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:267)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:334)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:267)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:334)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:267)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:68)
	at org.apache.spark.scheduler.Task.run(Task.scala:90)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:253)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Failed to send request StreamChunkId{streamId=432915604128, chunkIndex=27602} to /172.16.5.7:23516: java.io.IOException: Connection reset by peer
	at org.apache.spark.network.client.TransportClient$1.operationComplete(TransportClient.java:139)
	at org.apache.spark.network.client.TransportClient$1.operationComplete(TransportClient.java:125)
	at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:680)
	at io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:603)
	at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:563)
	at io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:424)
	at io.netty.channel.ChannelOutboundBuffer.safeFail(ChannelOutboundBuffer.java:678)
	at io.netty.channel.ChannelOutboundBuffer.remove0(ChannelOutboundBuffer.java:298)
	at io.netty.channel.ChannelOutboundBuffer.failFlushed(ChannelOutboundBuffer.java:621)
	at io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:765)
	at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.forceFlush(AbstractNioChannel.java:317)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:519)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
	at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
	... 1 more
Caused by: java.io.IOException: Connection reset by peer
	at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
	at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
	at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
	at sun.nio.ch.IOUtil.write(IOUtil.java:65)
	at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
	at org.apache.spark.network.sasl.SaslEncryption$EncryptedMessage.transferTo(SaslEncryption.java:219)
	at io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:254)
	at io.netty.channel.nio.AbstractNioByteChannel.doWrite(AbstractNioByteChannel.java:237)
	at io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:281)
	at io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:761)
	... 7 more
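For context: the exception means a reducer lost its connection to the node at 172.16.5.7 while fetching shuffle blocks, so the failure surfaces at the write stage even though it originates in the group-by shuffle. A minimal sketch of one commonly tried mitigation, raising shuffle parallelism so each fetched block is smaller (setConf is standard SQLContext API; the value 2000 is only an assumption to tune):

// Assumption: the same Spark 1.x shell session as above; run before the group-by query.
// More shuffle partitions -> smaller blocks per fetch, which often avoids
// "Connection reset by peer" on multi-TB shuffles. The default is 200.
sqlContext.setConf("spark.sql.shuffle.partitions", "2000")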
Created 05-09-2019 01:35 AM
@Zaka Were you able to resolve this? I am facing the same issue.
Created 06-07-2019 02:51 AM