
terminated with error org.apache.spark.SparkException: Job aborted due to stage failure: ResultStage

Explorer

While executing the Spark application on YARN, I have been facing the following issue two or three times a day recently.


ERROR MicroBatchExecution:91 - Query [id = 0ec965b7-5dda-43be-940c-3ec8672bcd5c, runId = 17c15719-ab20-4488-ba37-ccf6a6ca27e1] terminated with error
org.apache.spark.SparkException: Job aborted due to stage failure: ResultStage 185575 (start at DeviceLocationDataListener.scala:148) has failed the maximum allowable number of times: 4. Most recent failure reason: org.apache.spark.shuffle.FetchFailedException: Stream is corrupted
at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:554)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:470)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:64)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.agg_doAggregateWithKeys_0$(generated.java:184)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(generated.java:206)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at scala.collection.Iterator$class.isEmpty(Iterator.scala:331)
at scala.collection.AbstractIterator.isEmpty(Iterator.scala:1334)
at scala.collection.TraversableOnce$class.nonEmpty(TraversableOnce.scala:111)
at scala.collection.AbstractIterator.nonEmpty(Iterator.scala:1334)
at com.mongodb.spark.MongoSpark$$anonfun$save$1.apply(MongoSpark.scala:117)
at com.mongodb.spark.MongoSpark$$anonfun$save$1.apply(MongoSpark.scala:117)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:935)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:935)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

Caused by: java.io.IOException: Stream is corrupted
at net.jpountz.lz4.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:202)
at net.jpountz.lz4.LZ4BlockInputStream.read(LZ4BlockInputStream.java:157)
at net.jpountz.lz4.LZ4BlockInputStream.read(LZ4BlockInputStream.java:170)
at org.apache.spark.util.Utils$$anonfun$copyStream$1.apply$mcJ$sp(Utils.scala:361)
at org.apache.spark.util.Utils$$anonfun$copyStream$1.apply(Utils.scala:348)
at org.apache.spark.util.Utils$$anonfun$copyStream$1.apply(Utils.scala:348)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.util.Utils$.copyStream(Utils.scala:369)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:462)
... 32 more
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)


I suppose it was caused when executing the following code:

df.writeStream
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    batchDF.persist()

    // ------ mongo insert 1 ---------
    // ------ mongo insert 2 ---------

    batchDF
      .select(DeviceIOIDS.TENANTGROUPUID.value, DeviceIOIDS.WORKERID.value, DeviceIOIDS.TASKUID.value)
      .groupBy(DeviceIOIDS.TENANTGROUPUID.value, DeviceIOIDS.WORKERID.value, DeviceIOIDS.TASKUID.value)
      .count()
      .withColumnRenamed("count", "recordcount")
      .withColumn(DeviceIOIDS.ISTRANSFERRED.value, lit(0))
      .withColumn(DeviceIOIDS.INSERTDATETIME.value, current_timestamp())
      .withColumn(DeviceIOIDS.INSERTDATE.value, current_date())
      .write
      .mode("Append")
      .mongo(
        WriteConfig(
          "mongo.dbname".getConfigValue,
          "mongo.devicelocationtransferredstatus".getConfigValue
        )
      )

    batchDF.unpersist()
  }

Thanks for the help

1 ACCEPTED SOLUTION

Master Collaborator

Hi @nagababu

There are a couple of issues reported for this kind of behaviour, e.g. SPARK-34790, SPARK-18105, SPARK-32658.

You can try the following things (a sample spark-submit command showing how such settings can be passed is sketched after the steps):

Step 1: Change the compression codec and run the application. For example, spark.io.compression.codec=snappy

Step 2: If Step 1 does not resolve the issue, try setting spark.file.transferTo=false and rerun the application.

Step 3: Set the following parameter and rerun the application:

--conf spark.sql.adaptive.fetchShuffleBlocksInBatch=false

Step 4: If none of the above steps resolves your issue, set the following parameters (trying them both true and false) and rerun the application:

spark.network.crypto.enabled=true
spark.authenticate=true
spark.io.encryption.enabled=true

Step 5: If none of the above steps resolves your issue, you need to tune the shuffle operation.
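For reference, here is a minimal sketch of how these configurations could be passed at submission time. The master/deploy-mode, class name and JAR name are placeholders, not values from the original post, and the configs should be tried one step at a time as described above; they are combined here only to show the syntax.

# Sketch only: class/JAR names are placeholders; apply the configs one step at a time
spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --conf spark.io.compression.codec=snappy \
  --conf spark.file.transferTo=false \
  --conf spark.sql.adaptive.fetchShuffleBlocksInBatch=false \
  --class com.example.DeviceLocationApp \
  your-application.jar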


4 REPLIES

Community Manager

@nagababu, welcome to our community! To help you get the best possible answer, I have tagged our Spark expert @RangaReddy, who may be able to assist you further.

Please feel free to provide any additional information or details about your query, and we hope that you will find a satisfactory solution to your question.



Regards,

Vidya Sargur,
Community Manager




Explorer

Hi @RangaReddy, thank you very much for your response and suggestions. I tried the steps you recommended, and while they were helpful, the issue was ultimately resolved by increasing the executor memory and setting spark.file.transferTo=false (a sketch of the submit command is shown below).

I appreciate your assistance.
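For anyone hitting the same problem, the fix looks roughly like the following submit command; the memory size, class name and JAR name are illustrative placeholders, not the values actually used here.

# Sketch only: memory value, class and JAR names are placeholders
spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --executor-memory 8g \
  --conf spark.file.transferTo=false \
  --class com.example.DeviceLocationApp \
  your-application.jar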

Community Manager

@nagababu, Did the response assist in resolving your query? If it did, kindly mark the relevant reply as the solution, as it will aid others in locating the answer more easily in the future. 



Regards,

Vidya Sargur,
Community Manager

