Memory Issues While Accessing Files in Spark
- Labels:
Apache Hadoop
Apache Spark
Apache YARN
Created on 09-04-2014 11:07 PM - edited 09-16-2022 02:06 AM
I am working on a Spark cluster with more than 20 worker nodes, each with 512 MB of memory.
I am trying to access a file in HDFS from Spark.
I am facing issues even when accessing files of around 250 MB using Spark (both with and without caching). No other Spark processes
are running when I do this, and no upper limit is set for Spark resources in YARN.
I am accessing Spark through the Spark shell using the command "spark-shell --master yarn-client".
Below are the error messages and the corresponding code. Can you provide some direction on how to solve this? Thanks!
Error - 1
---------
val k = sc.textFile("/user/usera/dira/masterbigd")
k.collect()

14/09/04 09:55:47 WARN TaskSetManager: Loss was due to java.lang.OutOfMemoryError
java.lang.OutOfMemoryError: Java heap space
    at java.util.Arrays.copyOf(Arrays.java:2271)
    at java.io.ByteArrayOutputStream.toByteArray(ByteArrayOutputStream.java:178)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:197)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0:0 failed 4 times, most recent failure: TID 6 on host host1.com failed for unknown reason
Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:633)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1207)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
    at akka.actor.ActorCell.invoke(ActorCell.scala:456)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Error - 2
---------
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext._
case class Master(sor: String, accnum: Int, accnumkey: String)
val masterrdd = sc.textFile("/user/usera/dira/masterbigd").map(_.split(",")).map(k => Master(k(0), k(1).toInt, k(2)))
masterrdd.registerAsTable("masterTBL")
val entrdd = sql("Select sor, accnum, accnumkey from masterTBL")
entrdd.collect()

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0:1 failed 4 times, most recent failure: TID 6 on host host2.com failed for unknown reason
Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:633)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1207)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
    at akka.actor.ActorCell.invoke(ActorCell.scala:456)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Error - 3 (Caching used)
------------------------
val k = sc.textFile("/user/usera/dira/masterbigd")
val cached = k.cache()
cached.collect()

java.net.SocketTimeoutException: 75000 millis timeout while waiting for channel to be ready for read. ch : java.nio.channels.SocketChannel[connected local=/ remote=/]
    at org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:164)
    at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:161)
    at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:131)
    at java.io.FilterInputStream.read(FilterInputStream.java:133)
    at java.io.DataInputStream.read(DataInputStream.java:149)
    at java.io.DataInputStream.readFully(DataInputStream.java:195)
    at java.io.DataInputStream.readFully(DataInputStream.java:169)
    at org.apache.hadoop.security.SaslInputStream.readMoreData(SaslInputStream.java:96)
    at org.apache.hadoop.security.SaslInputStream.read(SaslInputStream.java:201)
    at java.io.FilterInputStream.read(FilterInputStream.java:83)
    at org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed(PBHelper.java:1986)
    at org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck.readFields(PipelineAck.java:176)
    at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer$ResponseProcessor.run(DFSOutputStream.java:796)
14/09/04 10:55:31 WARN DFSClient: Error Recovery for block BP-822007753- in pipeline,, bad datanode
Created 09-05-2014 07:56 AM
I really appreciate all the answers you gave today! They clarify a lot! Thanks!
Just one final question: I believe collect(), take() and print() are the only functions that put load on the driver.
Is my understanding correct? Is there any documentation on this?
Created 09-06-2014 04:06 AM
If you mean which functions don't return an RDD, there are more. All of the count* functions and take* functions, first(), print() I suppose, and reduce(). Anything which conceptually should return a small value returns to the driver. I wouldn't describe it as putting load on the driver necessarily, but it of course returns a value into memory in the driver.
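To make the distinction concrete, here is a minimal sketch rather than anything taken from the poster's job. It assumes a local SparkContext; the `local[2]` master, the app name, the object name `ActionsVsTransformations`, and the sample numbers are all illustrative. `map` and `filter` return another RDD and leave the data on the executors, while `count`, `first`, `take`, and `reduce` each return a plain local value into driver memory.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

// Hypothetical helper object, for illustration only.
object ActionsVsTransformations {
  // Returns (count, first, take(3), sum); each element is a local driver-side value.
  def summarize(sc: SparkContext): (Long, Int, List[Int], Int) = {
    val numbers: RDD[Int] = sc.parallelize(1 to 10)

    // Transformations: the result type is still RDD, nothing is sent to the driver.
    val doubled: RDD[Int] = numbers.map(_ * 2)
    val large: RDD[Int] = doubled.filter(_ > 10)

    // Actions: the result types are ordinary Scala values held by the driver.
    val n: Long = large.count()
    val head: Int = large.first()
    val firstThree: List[Int] = large.take(3).toList
    val total: Int = large.reduce(_ + _)

    (n, head, firstThree, total)
  }

  def main(args: Array[String]): Unit = {
    // Assumption: a local master for experimenting; a cluster would use YARN instead.
    val conf = new SparkConf().setAppName("actions-vs-transformations").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try println(summarize(sc)) finally sc.stop()
  }
}
```

All four results here are small. `collect()` is also an action, but it returns every element, which is why it only suits results that fit in driver memory.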
Created 09-06-2014 04:07 AM
You can see all of the methods of RDD in http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.package and the PairRDDFunctions class. Look at what does and doesn't return an RDD.
Created 08-17-2016 09:05 AM
This might not be the right thread, but I think these are the right people to ask.
I am having an issue with caching a DataFrame in Spark.
(step 1). I read a Hive table as a DataFrame. Let's say its count is 2.
(step 2). I cache this DataFrame.
(step 3). I add 2 additional records to the Hive table.
(step 4). I do a count on the cached DataFrame again.
If caching works as I expect, the counts in step 1 and step 4 should both be 2. This works when I add the additional records to the table from outside the Spark application. However, it does not work if I do step 3 from within the application. I do not understand why.
If I do step 3 from the same application, I get a count of 4 in step 4. But why?
I think I am missing something.
Created 08-17-2016 09:28 AM
Yes, I don't think this is related, but the quick answer is that "cache" just means "cache this thing whenever you get around to computing it", and you are adding 2 records before it is computed. Hence count is 4, not 2.
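The lazy behaviour described here can be observed with a small sketch. It uses a plain RDD and an accumulator rather than a Hive-backed DataFrame, so it illustrates the general semantics of `cache()` only and does not reproduce the scenario above. The object name `LazyCacheSketch`, the `local[2]` master, and the sample data are assumptions, and `longAccumulator` assumes Spark 2.0 or later.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical helper object, for illustration only.
object LazyCacheSketch {
  // Returns the number of element evaluations seen after cache(),
  // after the first count(), and after the second count().
  def evaluationCounts(sc: SparkContext): (Long, Long, Long) = {
    val evaluations = sc.longAccumulator("evaluations")
    val data = sc.parallelize(1 to 4, 2).map { x => evaluations.add(1L); x }

    // cache() only marks the RDD for caching; no job runs yet.
    val cached = data.cache()
    val afterCache = evaluations.sum

    // The first action computes the partitions and stores them.
    cached.count()
    val afterFirstCount = evaluations.sum

    // The second action reads the stored partitions instead of recomputing,
    // provided they fit in memory and no task was retried.
    cached.count()
    val afterSecondCount = evaluations.sum

    (afterCache, afterFirstCount, afterSecondCount)
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("lazy-cache-sketch").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try println(evaluationCounts(sc)) finally sc.stop()
  }
}
```

In a local run with this tiny dataset the counters should read 0, then 4, then 4: nothing is computed when `cache()` is called, and whatever the source contains at the time of the first action is what gets cached.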
Created 08-17-2016 09:42 AM
Hello srowen,
I am doing a count in step 1 as well (right after caching the DataFrame). So my expectation is that the DataFrame should have only 2 records even if we insert records into the table in between. If that is true, then the count on the cached DataFrame at the end should be 2, but why is it 4? This is what is confusing me.
Thanks in advance
Created 08-17-2016 09:47 AM
(Please start a separate thread.) My last response explained why it's 4.
Created on 09-05-2018 03:33 AM - edited 09-05-2018 03:36 AM
Thanks for this clarification. I had the same query regarding memory issues while loading data. Your answer cleared up my doubt about loading files from HDFS.
I have a similar question, but where the source is a local server or cloud storage and the data size is larger than the driver memory (let's say 1 GB of data with 250 MB of driver memory). If I run the command
val file_rdd = sc.textFile("/path or local or S3")
should Spark load the data or, as you mentioned above, will it throw an exception?
Also, is there a way to print the driver's available memory in the terminal?
Many Thanks,
Siddharth Saraf
