Created on 09-04-2014 11:07 PM - edited 09-16-2022 02:06 AM
Hi,
I am working on a spark cluster with more than 20 worker nodes and each node with a memory of 512 MB.
I am trying to access a file in HDFS from Spark.
I am facing issues even when accessing files of size around 250 MB using spark (both with and without caching). No other Spark processes
are running when I am doing this and no upper limit is set for Spark Resources in YARN.
I am accessing spark through spark shell using the command “spark-shell --master yarn-client”
Below are the error messages and the corresponding code. Can you provide some direction on how to solve this? Thanks!
Error - 1
---------
val k = sc.textFile("/user/usera/dira/masterbigd")
k.collect()

14/09/04 09:55:47 WARN TaskSetManager: Loss was due to java.lang.OutOfMemoryError
java.lang.OutOfMemoryError: Java heap space
    at java.util.Arrays.copyOf(Arrays.java:2271)
    at java.io.ByteArrayOutputStream.toByteArray(ByteArrayOutputStream.java:178)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:197)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0:0 failed 4 times, most recent failure: TID 6 on host host1.com failed for unknown reason
Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:633)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1207)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
    at akka.actor.ActorCell.invoke(ActorCell.scala:456)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Error - 2
---------
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext._
case class Master(sor: String, accnum: Int, accnumkey: String)
val masterrdd = sc.textFile("/user/usera/dira/masterbigd").map(_.split(",")).map(k => Master(k(0), k(1).toInt, k(2)))
masterrdd.registerAsTable("masterTBL")
val entrdd = sql("Select sor, accnum, accnumkey from masterTBL")
entrdd.collect()

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0:1 failed 4 times, most recent failure: TID 6 on host host2.com failed for unknown reason
Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:633)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1207)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
    at akka.actor.ActorCell.invoke(ActorCell.scala:456)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Error - 3 (Caching used)
------------------------
val k = sc.textFile("/user/usera/dira/masterbigd")
val cached = k.cache()
cached.collect()

java.net.SocketTimeoutException: 75000 millis timeout while waiting for channel to be ready for read. ch : java.nio.channels.SocketChannel[connected local=/10.122.123.14:42968 remote=/10.123.123.345:1004]
    at org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:164)
    at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:161)
    at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:131)
    at java.io.FilterInputStream.read(FilterInputStream.java:133)
    at java.io.DataInputStream.read(DataInputStream.java:149)
    at java.io.DataInputStream.readFully(DataInputStream.java:195)
    at java.io.DataInputStream.readFully(DataInputStream.java:169)
    at org.apache.hadoop.security.SaslInputStream.readMoreData(SaslInputStream.java:96)
    at org.apache.hadoop.security.SaslInputStream.read(SaslInputStream.java:201)
    at java.io.FilterInputStream.read(FilterInputStream.java:83)
    at org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed(PBHelper.java:1986)
    at org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck.readFields(PipelineAck.java:176)
    at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer$ResponseProcessor.run(DFSOutputStream.java:796)
14/09/04 10:55:31 WARN DFSClient: Error Recovery for block BP-822007753-10.25.36.66-1400458079994:blk_1074486472_783590 in pipeline 10.123.123.32:1004, 10.123.123.33:1004, 10.123.123.34:1004: bad datanode 10.123.123.35:1004
Created 09-06-2014 04:07 AM
You can see all of the methods of RDD in http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.package and the PairRDDFunctions class. Look at what does and doesn't return an RDD.
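To make the "returns an RDD or not" distinction concrete, here is a minimal sketch. The object name, the helper `secondFieldAbove`, the local master, and the in-memory sample rows are all assumptions for illustration; they are not from the original posts.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object TransformationsVsActions {
  // Transformation chain: every step returns an RDD, and nothing runs yet.
  def secondFieldAbove(lines: RDD[String], min: Int): RDD[Array[String]] =
    lines.map(_.split(",")).filter(fields => fields(1).toInt > min)

  def main(args: Array[String]): Unit = {
    // A local master and in-memory rows are assumptions so the sketch runs without a cluster.
    val sc = new SparkContext(new SparkConf().setAppName("rdd-demo").setMaster("local[2]"))
    try {
      val lines: RDD[String] = sc.parallelize(Seq("a,1,k1", "b,2,k2", "c,3,k3"))
      val filtered = secondFieldAbove(lines, 1)

      // Actions: these trigger the job and return plain values to the driver.
      val matching: Long = filtered.count()      // a single number
      val sample: Array[String] = lines.take(2)  // a bounded number of rows
      println(s"matching=$matching sample=${sample.mkString(" | ")}")
    } finally {
      sc.stop()
    }
  }
}
```

Methods whose return type is an RDD keep the data on the cluster; methods that return an Array, a Long, or another plain value bring that value to the driver, so only small results should come back that way.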
Created 09-05-2014 12:01 AM
In the first example, you're collecting the entire data set into memory on your driver process. I don't know how much memory you gave it, but if your machines have 512MB of memory (total?), a 250MB data set, accounting for Java overhead, probably blows through all of that, so an OOM is expected.
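As a rough way to sanity-check this from the shell, the sketch below compares a file size against the heap of the JVM it runs in. The object name, the helper names, and especially the overhead factor of 3.0 are assumptions for illustration, not measured values.

```scala
object DriverHeapCheck {
  // Rough multiplier for how much larger text becomes once held as JVM objects.
  // The value 3.0 is an assumption for illustration, not a measured figure.
  val AssumedOverheadFactor: Double = 3.0

  /** Maximum heap the current JVM (the driver, when run inside spark-shell) may use, in bytes. */
  def maxHeapBytes: Long = Runtime.getRuntime.maxMemory

  /** True if `dataBytes` of input, inflated by `overhead`, is smaller than `heapBytes`. */
  def likelyFits(dataBytes: Long, heapBytes: Long, overhead: Double = AssumedOverheadFactor): Boolean =
    dataBytes * overhead < heapBytes

  def main(args: Array[String]): Unit = {
    val mb = 1024L * 1024L
    println(s"Max heap of this JVM: ${maxHeapBytes / mb} MB")
    println(s"250 MB file likely fits in a 512 MB heap? ${likelyFits(250 * mb, 512 * mb)}")
  }
}
```

If the driver really does need the whole data set, its heap can be raised with the `--driver-memory` option of spark-shell or spark-submit; otherwise it is usually better to avoid bringing the data back at all.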
The second looks like some kind of error during execution of the Spark task. It's not clear from that log what the error is, just that a task failed repeatedly. It could be a Spark SQL problem, but you'd have to look at the task log to determine why that did not work. (There is the same potential problem with collecting all data to the driver, but it didn't get that far.)
The third instance also has a similar problem, but the error you see is an HDFS error. It sounds like datanodes are not working. Are these nodes trying to run Spark and HDFS in 512MB? Or do you mean Spark has 512MB? I'd check the health of HDFS.
Created 09-05-2014 01:23 AM
Hi,
Thanks for the quick response.
For the first error - Every worker has 512 MB of RAM allocated to it, and I am working on a Spark gateway node and trying to load a 250 MB file. My understanding was that Spark would distribute this data workload across its workers (more than 20 workers, each with 512 MB RAM). But I am getting an error. What do you think can be done here? Should I increase the RAM allocated to each worker? In that case, am I not limited by the RAM on a single worker node even though I have a large number of worker nodes? Will this occur even if I use spark-submit instead of spark-shell?
For the Second Error - I will look into the logs for this, but this one was also related to Memory.
For the third error - HDFS health is fine and the data nodes have 16 GB RAM each. What I meant was every Spark worker has 512 MB RAM and I have more than 20 workers.
Thanks.
Created 09-05-2014 02:58 AM
Yes, but your call to collect() says "please copy all of the results into memory on the driver". I believe that's what is running out of memory. I don't see any evidence that the workers have a problem.
Created 09-05-2014 03:15 AM
Even with "please copy all of the results into memory on the driver" (i.e. the .collect() call), a 250 MB file is supposed to work in a Spark cluster with 20 worker nodes of 512 MB each, right? Is the data not supposed to be distributed across all the worker nodes? If not, are we limited by the memory available on a single node?
Or is there any other way in Spark to handle this 250 MB file and bigger files and cache them?
Created 09-05-2014 03:24 AM
I don't follow... workers are irrelevant to the problem I am suggesting. You could have 1000 workers with 1TB memory and still fail if you try to copy 250MB into memory on your driver process, and the driver does not have enough memory.
Spark can certainly hold the data in memory on workers, but that is not what your code asks it to do.
Why are you calling collect()?
Created 09-05-2014 04:21 AM
I am calling .collect() just to display the data in the file.
So, what is the method to distribute the data to the workers, instead of overloading the driver, and then access the data?
Should I use a function other than .collect()?
How can we cache a 1 GB file into memory in Spark?
Please let me know.
Created 09-05-2014 05:13 AM
Ah if you just want to see a bit of the data, try something like .take(10).foreach(println).
Data is already distributed by virtue of being in HDFS. Spark will send computation to the workers. So it's all inherently distributed. The exceptions are methods whose purpose is explicitly to return data to the driver, like collect().
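One way to see this distribution without pulling the data back is to count records per partition on the workers and return only the counts. This is a minimal sketch; the object and helper names, the local master, and the generated sample lines are assumptions for illustration.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object PartitionSizes {
  // Counts records inside each partition where it lives; only small
  // (partitionIndex, recordCount) pairs are collected to the driver.
  def recordsPerPartition(rdd: RDD[String]): Array[(Int, Int)] =
    rdd.mapPartitionsWithIndex((index, records) => Iterator((index, records.size))).collect()

  def main(args: Array[String]): Unit = {
    // Local master and generated lines stand in for a real HDFS file (assumption).
    val sc = new SparkContext(new SparkConf().setAppName("partitions-demo").setMaster("local[2]"))
    try {
      val lines = sc.parallelize((1 to 1000).map(i => s"line $i"), 4)
      println(s"partitions: ${lines.partitions.length}")
      recordsPerPartition(lines).foreach { case (index, n) =>
        println(s"partition $index holds $n records")
      }
    } finally {
      sc.stop()
    }
  }
}
```

Here collect() is harmless because the result has one small pair per partition, regardless of how large the underlying file is.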
You don't need to tell Spark to keep data in memory or not. It will manage without any intervention.
However, you can call methods like .cache() to explicitly keep the RDD's computed partitions as blocks in memory, so that later actions reuse them instead of recomputing them from its lineage. (You can do the same and put it on disk, or in a combination of disk and memory.) This is appropriate when you are reusing an RDD many times, but otherwise not necessary for you to manage.
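A minimal sketch of reusing a persisted RDD across two actions follows. The object and helper names, the local master, and the generated rows are assumptions for illustration. `persist(StorageLevel.MEMORY_AND_DISK)` is the memory-plus-disk variant mentioned above, and `cache()` is shorthand for `persist(StorageLevel.MEMORY_ONLY)`.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

object CacheReuse {
  // Parses the lines once, keeps the parsed partitions on the executors,
  // and runs two actions against them. Only two numbers reach the driver.
  def totalAndEven(lines: RDD[String]): (Long, Long) = {
    // MEMORY_AND_DISK keeps partitions in memory and spills those that do not fit to local disk.
    val parsed = lines.map(_.split(",")).persist(StorageLevel.MEMORY_AND_DISK)
    try {
      val total = parsed.count() // first action computes and stores the partitions
      val even = parsed.filter(fields => fields(1).toInt % 2 == 0).count() // reuses stored partitions
      (total, even)
    } finally {
      parsed.unpersist() // release the stored blocks once they are no longer needed
    }
  }

  def main(args: Array[String]): Unit = {
    // Local master and generated rows stand in for the real file (assumption).
    val sc = new SparkContext(new SparkConf().setAppName("cache-demo").setMaster("local[2]"))
    try {
      val lines = sc.parallelize((1 to 1000).map(i => s"SOR$i,$i,key$i"), 4)
      val (total, even) = totalAndEven(lines)
      println(s"total=$total even=$even")
    } finally {
      sc.stop()
    }
  }
}
```

Persisting pays off here only because the parsed RDD is used by more than one action; for a single pass it adds storage cost without saving work.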
Created 09-05-2014 05:47 AM
Thanks a lot for the explanation!
So, what if there is a situation where my final result data set is large?
Will I always be restricted by the memory of the driver when getting the output?
Created 09-05-2014 06:16 AM
No, your data stays out on the cluster. What happens to it depends on what you want to do with it. For example, if you want to save it to HDFS, you simply call a save method such as saveAsTextFile() (or saveAsHadoopFile() on a pair RDD). This writes the distributed data to the distributed file system.
You do not in general pull back data to the driver, certainly not a whole large data set. It's the cluster doing work, not the driver, in general.
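A minimal sketch of writing a result out instead of collecting it is shown below. The object and helper names, the local master, the sample rows, and the temporary output directory are assumptions for illustration; on a cluster the output path would be an HDFS directory that does not exist yet.

```scala
import java.nio.file.Files

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object SaveResult {
  // Each task writes its own part file under outputDir; the rows never pass through the driver.
  def saveUpperCased(lines: RDD[String], outputDir: String): Unit =
    lines.map(_.toUpperCase).saveAsTextFile(outputDir)

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("save-demo").setMaster("local[2]"))
    try {
      // Hypothetical output location: a not-yet-existing folder inside a temp directory.
      val out = Files.createTempDirectory("spark-out").resolve("result").toString
      saveUpperCased(sc.parallelize(Seq("a,1,k1", "b,2,k2", "c,3,k3")), out)

      // Only a small count is returned to the driver to confirm the write.
      println(s"wrote ${sc.textFile(out).count()} lines to $out")
    } finally {
      sc.stop()
    }
  }
}
```

saveAsTextFile() fails if the target directory already exists, which is why the sketch writes to a fresh subdirectory.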