CDH 5.5, Spark 1.5.0 Generates negative file segment sizes during repartition for large numbers of partitions

Contributor

We have a job that reads data from bzip2-formatted files, and because recomputing partitions is expensive, we wanted to go from roughly 400 partitions to a larger number (so I added the second parameter when I load the textFile input). I inadvertently asked for 20000 tasks/partitions, which is not especially large relative to the number of records but is large relative to the 500 executors allocated, and several executors fail with errors like the following:

 

2016:06:17:12:51:43.888 [Executor task launch worker-0] ERROR org.apache.spark.executor.Executor.logError:96 - Exception in task 96.3 in stage 0.0 (TID 471) 
java.lang.IllegalArgumentException: requirement failed: File segment length cannot be negative (got -30617298)
        at scala.Predef$.require(Predef.scala:233) ~[main.jar:na]
        at org.apache.spark.storage.FileSegment.<init>(FileSegment.scala:28) ~[main.jar:na]
        at org.apache.spark.storage.DiskBlockObjectWriter.fileSegment(DiskBlockObjectWriter.scala:216) ~[main.jar:na]
        at org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$2.apply(ExternalSorter.scala:697) ~[main.jar:na]
        at org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$2.apply(ExternalSorter.scala:689) ~[main.jar:na]
        at scala.collection.Iterator$class.foreach(Iterator.scala:727) ~[main.jar:na]
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) ~[main.jar:na]
        at org.apache.spark.util.collection.ExternalSorter.writePartitionedFile(ExternalSorter.scala:689) ~[main.jar:na]
        at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:80) ~[main.jar:na]
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73) ~[main.jar:na]
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) ~[main.jar:na]
        at org.apache.spark.scheduler.Task.run(Task.scala:88) ~[main.jar:na]
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) ~[main.jar:na]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_45]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_45]
        at java.lang.Thread.run(Thread.java:745) [na:1.8.0_45]
2016:06:17:12:51:43.930 [sparkExecutor-akka.actor.default-dispatcher-5] ERROR o.apache.spark.rpc.akka.ErrorMonitor.logError:96 - AssociationError [akka.tcp://sparkExecutor@XXXX.myCompany.com:37245] -> [akka.tcp://sparkDriver@aa.bb.cc.dd:49709]: Error [Shut down address: akka.tcp://sparkDriver@aa.bb.cc.dd:49709] [

 

After reducing the number of tasks/partitions to 4000, the problem seems to have gone away. I must admit I did not expect this symptom, though; in fact, for some inputs of similar scale, the original 20000 tasks/partitions worked.
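For reference, the change was along these lines. This is a minimal PySpark sketch of the load, assuming a plain textFile call; the input path and app name are placeholders I've made up, and the original job may well have been written in Scala:

```python
from pyspark import SparkContext

sc = SparkContext(appName="bzip2-load")

# The optional second argument to textFile is the minimum number of
# partitions to request at load time. Asking for 20000 triggered the
# negative-file-segment failures on this input; 4000 did not.
# lines = sc.textFile("hdfs:///placeholder/input/*.bz2", 20000)  # failed
lines = sc.textFile("hdfs:///placeholder/input/*.bz2", 4000)
```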

My user error aside, 20000 partitions should work for the size of the inputs we have, so this could also be a Spark bug.
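One plausible (unconfirmed) explanation for the negative value is 32-bit truncation: -30617298 is exactly what a length of 4,264,349,998 bytes (just under 4 GiB) becomes when wrapped into a signed 32-bit integer, which would fit a shuffle segment somewhere in the write path growing past the 2 GB mark. A quick illustration of the wraparound arithmetic (pure Python, not Spark code; the byte count is hypothetical, chosen only to reproduce the logged value):

```python
def to_int32(n):
    """Wrap an arbitrary integer into a signed 32-bit value, as a JVM int would."""
    n &= 0xFFFFFFFF
    return n - 0x100000000 if n >= 0x80000000 else n

# A hypothetical segment length just under 4 GiB wraps to the value in the log.
length = 4_264_349_998
print(to_int32(length))  # -30617298
```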

1 REPLY

Re: CDH 5.5, Spark 1.5.0 Generates negative file segment sizes during repartition for large numbers of partitions

New Contributor

Hi All,

 
I am running a Spark application over 1.8 TB of data stored in Hive tables. I am reading the data using HiveContext and processing it.
The cluster has 5 nodes in total, with 25 cores and 250 GB of memory per node. I am launching the application with 25 executors, each with 5 cores and 45 GB of memory. I have also set the property spark.yarn.executor.memoryOverhead=2024.
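For reference, that launch configuration corresponds roughly to the following spark-submit invocation (a sketch only; the YARN master and the application jar name are placeholders I've assumed, not taken from the post):

```shell
spark-submit \
  --master yarn \
  --num-executors 25 \
  --executor-cores 5 \
  --executor-memory 45g \
  --conf spark.yarn.executor.memoryOverhead=2024 \
  my-application.jar
```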
 
During execution, tasks are lost and ShuffleMapTasks are re-submitted. I am seeing tasks fail with the following message:
 
 
java.lang.IllegalArgumentException: requirement failed: File segment length cannot be negative (got -27045427)
at scala.Predef$.require(Predef.scala:233)
at org.apache.spark.storage.FileSegment.<init>(FileSegment.scala:28)
at org.apache.spark.storage.DiskBlockObjectWriter.fileSegment(DiskBlockObjectWriter.scala:220)
at org.apache.spark.shuffle.sort.ShuffleExternalSorter.writeSortedFile(ShuffleExternalSorter.java:184)
at org.apache.spark.shuffle.sort.ShuffleExternalSorter.closeAndGetSpills(ShuffleExternalSorter.java:398)
at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.closeAndWriteOutput(UnsafeShuffleWriter.java:206)
at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:166)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)