Explorer
Posts: 27
Registered: 05-15-2015

CDH 5.5, Spark 1.5.0 Generates negative file segment sizes during repartition for large numbers of partitions

We have a job that reads data from bzip2-formatted files. Because recomputing partitions is expensive, we wanted to go from roughly 400 partitions to a larger number, so I added the second parameter (minPartitions) to the textFile call that loads the input. I inadvertently used 20000 tasks/partitions (which is not excessive relative to the number of records, but is large relative to the 500 executors allocated), and several executors hit errors like the following:
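
For reference, here is a minimal sketch of the kind of job I mean (the path, the object name, and the per-record work are placeholders, not our actual code); the second argument to textFile is the minPartitions hint that I set to 20000:

import org.apache.spark.{SparkConf, SparkContext}

// Minimal sketch only; assumes Spark 1.5 on CDH 5.5. Path and logic are made up.
object RepartitionSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("bzip2-repartition-sketch"))

    // The second argument to textFile is minPartitions; since bzip2 is splittable,
    // this hint raises the number of input splits (and therefore tasks).
    val lines = sc.textFile("hdfs:///data/input/*.bz2", 20000) // 20000 triggered the errors; 4000 did not

    // ... expensive per-record work would go here ...
    val totalChars = lines.map(_.length.toLong).reduce(_ + _)
    println(totalChars)

    sc.stop()
  }
}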

 

2016:06:17:12:51:43.888 [Executor task launch worker-0] ERROR org.apache.spark.executor.Executor.logError:96 - Exception in task 96.3 in stage 0.0 (TID 471) 
java.lang.IllegalArgumentException: requirement failed: File segment length cannot be negative (got -30617298)
        at scala.Predef$.require(Predef.scala:233) ~[main.jar:na]
        at org.apache.spark.storage.FileSegment.<init>(FileSegment.scala:28) ~[main.jar:na]
        at org.apache.spark.storage.DiskBlockObjectWriter.fileSegment(DiskBlockObjectWriter.scala:216) ~[main.jar:na]
        at org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$2.apply(ExternalSorter.scala:697) ~[main.jar:na]
        at org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$2.apply(ExternalSorter.scala:689) ~[main.jar:na]
        at scala.collection.Iterator$class.foreach(Iterator.scala:727) ~[main.jar:na]
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) ~[main.jar:na]
        at org.apache.spark.util.collection.ExternalSorter.writePartitionedFile(ExternalSorter.scala:689) ~[main.jar:na]
        at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:80) ~[main.jar:na]
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73) ~[main.jar:na]
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) ~[main.jar:na]
        at org.apache.spark.scheduler.Task.run(Task.scala:88) ~[main.jar:na]
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) ~[main.jar:na]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_45]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_45]
        at java.lang.Thread.run(Thread.java:745) [na:1.8.0_45]
2016:06:17:12:51:43.930 [sparkExecutor-akka.actor.default-dispatcher-5] ERROR o.apache.spark.rpc.akka.ErrorMonitor.logError:96 - AssociationError [akka.tcp://sparkExecutor@XXXX.myCompany.com:37245] -> [akka.tcp://sparkDriver@aa.bb.cc.dd:49709]: Error [Shut down address: akka.tcp://sparkDriver@aa.bb.cc.dd:49709] [

 

After reducing the number of tasks/partitions to 4000, the problem seems to have gone away, but I must admit I did not expect this symptom; in fact, for some inputs of similar scale, the original 20000 tasks/partitions worked.

In spite of my user error, 20000 partitions should work for inputs of the size we have, so this could also be a Spark bug.

New Contributor
Posts: 1
Registered: 06-22-2016

Re: CDH 5.5, Spark 1.5.0 Generates negative file segment sizes during repartition for large numbers of partitions

Hi All,

 
I am running a Spark application over 1.8 TB of data stored in Hive tables. I am reading the data with a HiveContext and processing it.
The cluster has 5 nodes, with 25 cores and 250 GB of memory per node. I launch the application with 25 executors, 5 cores each and 45 GB of memory per executor, and I also set spark.yarn.executor.memoryOverhead=2024.
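
For context, a rough sketch of how the application is launched and how the table is read (the table name, key column, and output path below are placeholders, not the real job):

// spark-submit --master yarn --num-executors 25 --executor-cores 5 --executor-memory 45g \
//   --conf spark.yarn.executor.memoryOverhead=2024 --class HiveReadSketch main.jar
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

object HiveReadSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("hive-read-sketch"))
    val hiveContext = new HiveContext(sc)

    // Read the ~1.8 TB Hive table and run a shuffle-heavy aggregation on it.
    val df = hiveContext.sql("SELECT * FROM my_db.my_table") // placeholder table
    val grouped = df.groupBy("some_key").count()             // placeholder key column
    grouped.write.parquet("hdfs:///tmp/sketch_output")       // placeholder output path

    sc.stop()
  }
}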
 
During execution, tasks are lost and ShuffleMapTasks are resubmitted. Tasks are failing with the following exception:
 
 
java.lang.IllegalArgumentException: requirement failed: File segment length cannot be negative (got -27045427)
at scala.Predef$.require(Predef.scala:233)
at org.apache.spark.storage.FileSegment.<init>(FileSegment.scala:28)
at org.apache.spark.storage.DiskBlockObjectWriter.fileSegment(DiskBlockObjectWriter.scala:220)
at org.apache.spark.shuffle.sort.ShuffleExternalSorter.writeSortedFile(ShuffleExternalSorter.java:184)
at org.apache.spark.shuffle.sort.ShuffleExternalSorter.closeAndGetSpills(ShuffleExternalSorter.java:398)
at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.closeAndWriteOutput(UnsafeShuffleWriter.java:206)
at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:166)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)