
Memory Issues While Accessing Files in Spark

Explorer

Hi,

 

I am working on a Spark cluster with more than 20 worker nodes, each with 512 MB of memory.

 

I am trying to access a file in HDFS from Spark.

 

I am facing issues even when accessing files of around 250 MB in size using Spark (both with and without caching). No other Spark processes are running when I do this, and no upper limit is set for Spark resources in YARN.

 

I am accessing Spark through the Spark shell, using the command "spark-shell --master yarn-client".

 

Below are the error messages and the corresponding code. Can you provide some direction on how to solve this? Thanks!

 

Error - 1
---------

val k = sc.textFile("/user/usera/dira/masterbigd")
k.collect()

14/09/04 09:55:47 WARN TaskSetManager: Loss was due to java.lang.OutOfMemoryError
java.lang.OutOfMemoryError: Java heap space
        at java.util.Arrays.copyOf(Arrays.java:2271)
        at java.io.ByteArrayOutputStream.toByteArray(ByteArrayOutputStream.java:178)
        at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:197)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)


org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0:0 failed 4 times, most recent failure: TID 6 on host host1.com failed for unknown reason
Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:633)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1207)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
        at akka.actor.ActorCell.invoke(ActorCell.scala:456)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
        at akka.dispatch.Mailbox.run(Mailbox.scala:219)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

 

Error - 2
---------

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext._
case class Master(sor: String,accnum: Int, accnumkey: String)
val masterrdd = sc.textFile("/user/usera/dira/masterbigd").map(_.split(",")).map(k => Master(k(0),k(1).toInt,k(2)))
masterrdd.registerAsTable("masterTBL")
val entrdd = sql("Select sor, accnum, accnumkey from masterTBL")          
entrdd.collect()


org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0:1 failed 4 times, most recent failure: TID 6 on host host2.com failed for unknown reason
Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:633)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1207)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
        at akka.actor.ActorCell.invoke(ActorCell.scala:456)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
        at akka.dispatch.Mailbox.run(Mailbox.scala:219)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

 

Error - 3 (Caching used)
------------------------

val k = sc.textFile("/user/usera/dira/masterbigd")
val cached = k.cache()
cached.collect()

java.net.SocketTimeoutException: 75000 millis timeout while waiting for channel to be ready for read. ch : java.nio.channels.SocketChannel[connected local=/10.122.123.14:42968 remote=/10.123.123.345:1004]
        at org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:164)
        at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:161)
        at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:131)
        at java.io.FilterInputStream.read(FilterInputStream.java:133)
        at java.io.DataInputStream.read(DataInputStream.java:149)
        at java.io.DataInputStream.readFully(DataInputStream.java:195)
        at java.io.DataInputStream.readFully(DataInputStream.java:169)
        at org.apache.hadoop.security.SaslInputStream.readMoreData(SaslInputStream.java:96)
        at org.apache.hadoop.security.SaslInputStream.read(SaslInputStream.java:201)
        at java.io.FilterInputStream.read(FilterInputStream.java:83)
        at org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed(PBHelper.java:1986)
        at org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck.readFields(PipelineAck.java:176)
        at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer$ResponseProcessor.run(DFSOutputStream.java:796)

14/09/04 10:55:31 WARN DFSClient: Error Recovery for block BP-822007753-10.25.36.66-1400458079994:blk_1074486472_783590 in pipeline 10.123.123.32:1004, 10.123.123.33:1004, 10.123.123.34:1004: bad datanode 10.123.123.35:1004

 

1 ACCEPTED SOLUTION

Master Collaborator

You can see all of the methods of RDD at http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.package, and in the PairRDDFunctions class. Look at which methods do and don't return an RDD.


17 REPLIES

Explorer

 

I really appreciate all the answers you have given today! They clarify a lot! Thanks!

 

Just one final question - am I right that collect(), take(), and print() are the only functions that put load on the driver?

 

Is my understanding correct? Or is there documentation on this?

Master Collaborator

If you mean which functions don't return an RDD, there are more. All of the count* and take* functions, first(), print() I suppose, and reduce(). Anything which conceptually should return a small value returns to the driver. I wouldn't describe it as putting load on the driver necessarily, but it of course returns a value into memory in the driver.
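
To make that concrete, a minimal sketch (the path is hypothetical):

val rdd = sc.textFile("/some/hdfs/path")  // transformation: returns an RDD; nothing runs yet
val upper = rdd.map(_.toUpperCase)        // transformation: still an RDD, still distributed
val n = upper.count()                     // action: one Long comes back to the driver (small, safe)
val sample = upper.take(5)                // action: a 5-element Array comes back (small, safe)
val all = upper.collect()                 // action: the ENTIRE dataset comes back to the driver (can OOM)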

Master Collaborator

You can see all of the methods of RDD at http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.package, and in the PairRDDFunctions class. Look at which methods do and don't return an RDD.
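
For instance, reading the signatures in that Scaladoc tells you where a result ends up (a sketch; signatures paraphrased):

// def map[U](f: T => U): RDD[U]  -- returns an RDD, so the result stays distributed
// def collect(): Array[T]        -- returns a local Array, so the whole dataset lands in the driver
// def count(): Long              -- returns a single number, cheap for the driver
val k = sc.textFile("/some/hdfs/path")   // hypothetical path
val lengths = k.map(_.length)            // RDD[Int]: still out on the executors
val total = lengths.count()              // Long: a single value returned to the driver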

Explorer

This might not be the relevant topic, but I think it's the right people.

I am having an issue with caching a DataFrame in Spark.

 

(Step 1) I am reading a Hive table as a DataFrame. Let's say the count is 2.

(Step 2) I am caching this DataFrame.

(Step 3) I am adding 2 additional records to the Hive table.

(Step 4) I am doing a count on the cached DataFrame again.

 

If caching is working as I expect, the counts in step 1 and step 4 should both be 2. This works when I add the additional records to the table from outside the Spark application. However, it does not work if I do step 3 from within the application. I AM NOT UNDERSTANDING WHY.

 

If I do step 3 from the same application, I get a count of 4 in step 4. But why??

 

I think I am missing something.

 

 

Master Collaborator

Yes, I don't think this is related, but the quick answer is that "cache" just means "cache this thing whenever you get around to computing it", and you are adding 2 records before it is computed. Hence count is 4, not 2.
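
In code, the sequence plays out roughly like this (a sketch: spark is the SparkSession, and db.tbl is a hypothetical two-row Hive table):

val df = spark.table("db.tbl")                  // step 1: builds a plan; nothing is read yet
df.cache()                                      // step 2: lazy; this only MARKS df for caching
spark.sql("INSERT INTO db.tbl VALUES (3), (4)") // step 3: 2 rows added before df was ever computed
df.count()                                      // step 4: first action; reads the table now (4 rows) and caches that
df.count()                                      // later actions are served from the cache: still 4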

Explorer

Hello srowen,

 

I am doing a count in step 1 as well (right after caching the DataFrame). So my expectation is that the DataFrame should have only 2 records even if we insert records into the table in between. If that is true, then when we do a count on the cached DataFrame at the end, it should be 2. But why is it 4? This is what is confusing me.

 

Thanks in advance

Master Collaborator

(Please start a separate thread.) My last response explained why it's 4.

New Contributor

Thanks for this clarification. I also had the same query regarding a memory issue while loading data. Here you cleared up my doubt about loading files from HDFS.

I have a similar question, but the source is a local server or cloud storage where the data size is larger than the driver memory (let's say 1 GB in this case, where the driver memory is 250 MB). If I fire the command

val file_rdd = sc.textFile("/path or local or S3")

 

should Spark load the data, or, as you mentioned above, will it throw an exception?

 

Also, is there a way to print the available driver memory in the terminal?

 

Many Thanks,

Siddharth Saraf