Created 07-29-2016 08:45 AM
I am processing a Scala script on top of Spark and I get the next error message:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 4.0 failed 1 times, most recent failure: Lost task 0.0 in stage 4.0 (TID 40, localhost): java.lang.OutOfMemoryError: GC overhead limit exceeded at sun.reflect.GeneratedSerializationConstructorAccessor103.newInstance(Unknown Source) at java.lang.reflect.Constructor.newInstance(Constructor.java:422) at java.io.ObjectStreamClass.newInstance(ObjectStreamClass.java:967) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1785) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1707) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1345) at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1707) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1345) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371) at scala.collection.immutable.$colon$colon.readObject(List.scala:362) at sun.reflect.GeneratedMethodAccessor18.invoke(Unknown Source) Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1929) at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:927) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111) at org.apache.spark.rdd.RDD.withScope(RDD.scala:316) at org.apache.spark.rdd.RDD.collect(RDD.scala:926) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$collectAsMap$1.apply(PairRDDFunctions.scala:741) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$collectAsMap$1.apply(PairRDDFunctions.scala:740) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111) at org.apache.spark.rdd.RDD.withScope(RDD.scala:316) at org.apache.spark.rdd.PairRDDFunctions.collectAsMap(PairRDDFunctions.scala:740) at org.apache.spark.mllib.tree.DecisionTree$.findBestSplits(DecisionTree.scala:651) at org.apache.spark.mllib.tree.RandomForest.run(RandomForest.scala:233) at org.apache.spark.mllib.tree.RandomForest$.trainClassifier(RandomForest.scala:289) at org.apache.spark.mllib.tree.RandomForest$.trainClassifier(RandomForest.scala:331) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:50) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:55) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:57) at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:59) at $iwC$$iwC$$iwC$$iwC.<init>(<console>:61) at $iwC$$iwC$$iwC.<init>(<console>:63) at $iwC$$iwC.<init>(<console>:65) at $iwC.<init>(<console>:67) at <init>(<console>:69) at .<init>(<console>:73) at .<clinit>(<console>) at .<init>(<console>:7) at .<clinit>(<console>) at $print(<console>) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:497) at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065) at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1346) at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840) at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871) at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819) at org.apache.zeppelin.spark.SparkInterpreter.interpretInput(SparkInterpreter.java:709) at org.apache.zeppelin.spark.SparkInterpreter.interpret(SparkInterpreter.java:673) at org.apache.zeppelin.spark.SparkInterpreter.interpret(SparkInterpreter.java:666) at org.apache.zeppelin.interpreter.ClassloaderInterpreter.interpret(ClassloaderInterpreter.java:57) at org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:93) at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:295) at org.apache.zeppelin.scheduler.Job.run(Job.java:171) at org.apache.zeppelin.scheduler.FIFOScheduler$1.run(FIFOScheduler.java:139) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded at sun.reflect.GeneratedSerializationConstructorAccessor103.newInstance(Unknown Source) at java.lang.reflect.Constructor.newInstance(Constructor.java:422) at java.io.ObjectStreamClass.newInstance(ObjectStreamClass.java:967) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1785) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1707) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1345) at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1707) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1345) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371) at scala.collection.immutable.$colon$colon.readObject(List.scala:362) at sun.reflect.GeneratedMethodAccessor18.invoke(Unknown Source)
This is the executed code:
import org.apache.spark.mllib.tree.RandomForest import org.apache.spark.mllib.tree.model.RandomForestModel import org.apache.spark.mllib.util.MLUtils import org.apache.spark.mllib.linalg.Vectors import org.apache.spark.mllib.regression.LabeledPoint import org.apache.spark.mllib.evaluation.MulticlassMetrics val unparseddata = sc.textFile("hdfs:///tmp/new_data_binary_5.csv") val data = unparseddata.map { line => val parts = line.split(',').map(_.toDouble) LabeledPoint(parts.last%2, Vectors.dense(parts.slice(0, parts.length - 1))) } // Load and parse the data file. //val data = MLUtils.loadLibSVMFile(sc, "/tmp/sample_libsvm_data.txt") // Split the data into training and test sets (30% held out for testing) val splits = data.randomSplit(Array(0.7, 0.3)) val (trainingData, testData) = (splits(0), splits(1)) // Train a RandomForest model. // Empty categoricalFeaturesInfo indicates all features are continuous. val numClasses = 3 val categoricalFeaturesInfo = Map[Int, Int]() val numTrees = 3 // Use more in practice. val featureSubsetStrategy = "auto" // Let the algorithm choose. val impurity = "gini" val maxDepth = 4 val maxBins = 32 val model = RandomForest.trainClassifier(trainingData, numClasses, categoricalFeaturesInfo, numTrees, featureSubsetStrategy, impurity, maxDepth, maxBins) println("Learned classification forest model:\n" + model.toDebugString) // Compute raw scores on the test set val predictionAndLabels = test.map { case LabeledPoint(label, features) => val prediction = model.predict(features) (prediction, label) } // Instantiate metrics object val metrics = new MulticlassMetrics(predictionAndLabels) // Confusion matrix println("Confusion matrix:") println(metrics.confusionMatrix) // Overall Statistics val precision = metrics.precision val recall = metrics.recall // same as true positive rate val f1Score = metrics.fMeasure println("Summary Statistics") println(s"Precision = $precision") println(s"Recall = $recall") println(s"F1 Score = $f1Score") // Precision by label val labels = metrics.labels labels.foreach { l => println(s"Precision($l) = " + metrics.precision(l)) } // Recall by label labels.foreach { l => println(s"Recall($l) = " + metrics.recall(l)) } // False positive rate by label labels.foreach { l => println(s"FPR($l) = " + metrics.falsePositiveRate(l)) } // F-measure by label labels.foreach { l => println(s"F1-Score($l) = " + metrics.fMeasure(l)) } // Weighted stats println(s"Weighted precision: ${metrics.weightedPrecision}") println(s"Weighted recall: ${metrics.weightedRecall}") println(s"Weighted F1 score: ${metrics.weightedFMeasure}") println(s"Weighted false positive rate: ${metrics.weightedFalsePositiveRate}")
We are running it from a Zeppelin Notebook.
I changed various parameters of Yarn and Spark configuration but still getting this message. Can some one help me solve this?
Thank you.
Created 07-29-2016 09:15 AM
Please provide your code and command to execute it
Created 07-29-2016 11:14 AM
message updated, thank you.
Created 08-01-2016 02:15 AM
If I had to guess your using Spark 1.5.2 or earlier. What is happening is you run out of memory. I think youre running out of executor memory, so you're probably doing a map-side aggregate. How many keys do you have? I think we can fix this pretty simply. Are you caching data? If not set spark.suffle.fraction to a number higher than .4.
Created 08-01-2016 02:37 AM
How much memory do you have? How much is assigned to Spark? Do you have logging on so you can check logs and history UI?
Turn off everything else you can.
For debugging run through the Spark shell, Zeppelin adds over head and takes a decent amount of YARN resources and RAM.
Run on Spark 1.6 / HDP 2.4.2 if you can. Allocate as much memory as possible. Spark is an all memory beast.
sparkConf.set("spark.cores.max", "16") // all the cores you can |
sparkConf.set("spark.serializer", classOf[KryoSerializer].getName) |
sparkConf.set("spark.sql.tungsten.enabled", "true") |
sparkConf.set("spark.eventLog.enabled", "true") |
sparkConf.set("spark.app.id", "YourID") |
sparkConf.set("spark.io.compression.codec", "snappy") |
sparkConf.set("spark.rdd.compress", "true") |
I like to maximize my resources and performance.