Spark driver memory keeps growing
Labels: Apache Spark
Created ‎08-04-2016 01:01 PM
Hi,
I'm running a job on Spark 1.5.2 and I get an OutOfMemoryError when accessing a broadcast variable. What I don't understand is why the broadcast keeps growing, and why it does so at this point in the code.
Basically, I have a large input file where each line has a key. I group my lines by key to get one object holding all the data for a given key. Then I do a map to iterate over my objects, and for each object I iterate over a collection that is a broadcast variable. The exception is thrown while iterating over the broadcast variable.
Here is a quick example:
Input file:
key1,row1
key2,row1
key1,row2
key2,row2
The broadcast variable is a list: List(1, 2, 3)
I group my input by key:
key1, (row1,row2)
key2, (row1,row2)
Then I do a map
myRdd.map { obj => for (item <- myBroadcast.value) executeFunction(obj, item) }
I know groupByKey is a very costly operation, but I'm not sure I can avoid it, since executeFunction needs all the lines for a given key in order to run. Besides, the stage where the groupByKey is performed completes successfully before the exception is thrown.
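For reference, here is a minimal Scala sketch of the pipeline described above; executeFunction, the input path, and the broadcast contents are placeholders taken from the question, not real code:

```scala
// Sketch only: mirrors the groupByKey + broadcast pattern from the question.
val smallList = sc.broadcast(List(1, 2, 3))

val grouped = sc.textFile("hdfs:///path/to/input")   // hypothetical path
  .map { line =>
    val Array(key, row) = line.split(",", 2)
    (key, row)
  }
  .groupByKey()   // one (key, Iterable[row]) pair per key

val results = grouped.map { case (key, rows) =>
  // The broadcast value is first read lazily inside the task;
  // this is where the OutOfMemoryError in the stack trace is thrown.
  smallList.value.map(item => executeFunction(key, rows, item))
}
```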
Here is an extract from the logs:
16/08/04 03:17:50 WARN storage.MemoryStore: Not enough space to cache broadcast_90 in memory! (computed 2.3 GB so far)
16/08/04 03:18:37 WARN storage.MemoryStore: Not enough space to cache broadcast_90 in memory! (computed 2.3 GB so far)
16/08/04 03:19:22 WARN storage.MemoryStore: Not enough space to cache broadcast_90 in memory! (computed 2.4 GB so far)
16/08/04 03:20:07 WARN storage.MemoryStore: Not enough space to cache broadcast_90 in memory! (computed 2.5 GB so far)
16/08/04 03:20:53 WARN storage.MemoryStore: Not enough space to cache broadcast_90 in memory! (computed 2.6 GB so far)
16/08/04 03:21:11 WARN storage.MemoryStore: Not enough space to cache broadcast_90 in memory! (computed 2.7 GB so far)
16/08/04 03:21:15 WARN storage.MemoryStore: Not enough space to cache broadcast_90 in memory! (computed 2.5 GB so far)
16/08/04 03:44:22 WARN storage.MemoryStore: Not enough space to cache broadcast_90 in memory! (computed 3.4 GB so far)
16/08/04 03:53:03 WARN storage.MemoryStore: Not enough space to cache broadcast_90 in memory! (computed 3.8 GB so far)
16/08/04 04:02:00 WARN storage.MemoryStore: Not enough space to cache broadcast_90 in memory! (computed 4.1 GB so far)
16/08/04 04:20:52 ERROR executor.CoarseGrainedExecutorBackend: RECEIVED SIGNAL 15: SIGTERM
16/08/04 04:20:52 ERROR executor.Executor: Exception in task 1.0 in stage 62.1 (TID 1109)
java.lang.OutOfMemoryError: Java heap space
    at java.util.IdentityHashMap.resize(IdentityHashMap.java:469)
    at java.util.IdentityHashMap.put(IdentityHashMap.java:445)
    at org.apache.spark.util.SizeEstimator$SearchState.enqueue(SizeEstimator.scala:159)
    at org.apache.spark.util.SizeEstimator$.visitArray(SizeEstimator.scala:229)
    at org.apache.spark.util.SizeEstimator$.visitSingleObject(SizeEstimator.scala:194)
    at org.apache.spark.util.SizeEstimator$.org$apache$spark$util$SizeEstimator$$estimate(SizeEstimator.scala:186)
    at org.apache.spark.util.SizeEstimator$.estimate(SizeEstimator.scala:54)
    at org.apache.spark.util.collection.SizeTracker$class.takeSample(SizeTracker.scala:78)
    at org.apache.spark.util.collection.SizeTracker$class.afterUpdate(SizeTracker.scala:70)
    at org.apache.spark.util.collection.SizeTrackingVector.$plus$eq(SizeTrackingVector.scala:31)
    at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:278)
    at org.apache.spark.storage.MemoryStore.putIterator(MemoryStore.scala:165)
    at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:550)
    at org.apache.spark.storage.BlockManager.getLocal(BlockManager.scala:429)
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:168)
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1175)
    at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:165)
    at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
    at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
    at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:88)
    at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
    ...
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at org.apache.spark.util.collection.CompactBuffer$$anon$1.foreach(CompactBuffer.scala:115)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at org.apache.spark.util.collection.CompactBuffer.foreach(CompactBuffer.scala:30)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at org.apache.spark.util.collection.CompactBuffer.map(CompactBuffer.scala:30)
Any suggestions as to what could explain this behavior?
Thanks!
Created ‎08-04-2016 02:13 PM
Broadcast is done in blocks of data; see the spark.broadcast.blockSize property here. That is why the computed size grows block by block in the log output.
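If you want to experiment with the block size, it can be set on the SparkConf; the 8m value below is purely illustrative, and whether changing it helps in this case is an assumption, not a confirmed fix:

```scala
// Illustrative only: overriding the broadcast block size.
val conf = new SparkConf()
  .setAppName("broadcast-example")          // hypothetical app name
  .set("spark.broadcast.blockSize", "8m")   // arbitrary example value
val sc = new SparkContext(conf)
```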
How big is the file you are broadcasting? You can use the SizeEstimator to get a sense of what your object will really occupy in memory. Then make sure your "--driver-memory" and "--executor-memory" have enough breathing room. Guidance for tuning can be found here: https://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.4.2/bk_spark-guide/content/ch_tuning-spark.html
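A quick way to check, assuming the value is available on the driver before broadcasting (the payload below is a stand-in for the real object):

```scala
import org.apache.spark.util.SizeEstimator

// Estimate the in-memory footprint of the object before broadcasting it.
val payload = List(1, 2, 3)   // placeholder for the real broadcast value
println(s"Estimated size: ${SizeEstimator.estimate(payload)} bytes")
```

Then give the JVMs headroom at submission time, for example (sizes are placeholders to adjust to your cluster): spark-submit --driver-memory 4g --executor-memory 8g ...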
Created ‎08-04-2016 02:42 PM
I'm not sure I understand your point. I'm probably missing something about how Spark handles broadcasts, but my understanding is that broadcasts are immutable, read-only objects. Why would one keep growing? What is Spark doing with broadcasts?
I used the SizeEstimator (thanks for the tip) and what I am broadcasting is a couple of MB.
Created ‎08-08-2016 04:33 AM
In what mode are you running the Spark application? spark-shell, yarn-client, etc.?
Created ‎08-08-2016 05:47 AM
The job is launched in yarn cluster mode.
Created ‎08-08-2016 09:03 AM
Hi Pierre,
How is the object defined and serialized?
If any field of your object refers to the RDD, the full RDD gets copied and shuffled along with it.
Would you be able to do a persist/cache before the broadcast join and share the Spark UI DAG and Storage pages?
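For instance, something like the following (grouped stands for the RDD in question, and the storage level shown is just one common choice; spilling to disk avoids re-triggering the OOM while you inspect sizes):

```scala
import org.apache.spark.storage.StorageLevel

// Persist so the RDD shows up on the Spark UI Storage page.
grouped.persist(StorageLevel.MEMORY_AND_DISK)
grouped.count()   // force materialization so the Storage tab is populated
```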
Cheers.
Amit
Created on ‎08-08-2016 01:24 PM - edited ‎08-18-2019 04:23 AM
The job is coded in Scala and uses the default serialization parameters.
The object has no reference to the RDD itself.
What do you mean by "broadcast join"?
Here is the current DAG:
Created ‎08-09-2016 07:35 AM
Hi Pierre,
We would need to look at the code.
Can you do a persist just before stage 63, and before stage 65 check the Spark UI Storage tab and Executors tab for data skew? If there is data skew, you will need to add a salt to your key.
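Salting usually looks like the sketch below; the salt factor of 10 is arbitrary, and the downstream logic would have to merge or tolerate the split groups, which may not fit this use case since executeFunction expects all rows for a key:

```scala
import scala.util.Random

// Append a random salt so a single hot key is spread over several partitions.
val salted = myRdd.map { case (key, row) =>
  (s"$key#${Random.nextInt(10)}", row)   // 10 is an arbitrary salt factor
}
// Groups are now per salted key; a second aggregation pass would be
// needed to recombine them per original key (assumption, not shown).
val groupedBySalt = salted.groupByKey()
```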
You could also look at creating a DataFrame from the RDD with rdd.toDF() and applying a UDF to it. DataFrames manage memory more efficiently.
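A sketch of that approach on Spark 1.5, where the column names and the UDF body are made up for illustration:

```scala
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions.udf

val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

// Hypothetical: turn the (key, row) RDD into a DataFrame and apply a UDF.
val df = myRdd.toDF("key", "row")
val process = udf { (row: String) => row.length }   // placeholder logic
val result = df.withColumn("processed", process($"row"))
```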
Best,
Amit
