Spark driver memory keeps growing

Hi,

I'm running a job on Spark 1.5.2 and I get an OutOfMemoryError when accessing a broadcast variable. I don't understand why the broadcast keeps growing, or why it happens at this point in the code.

Basically, I have a large input file in which each line has a key. I group the lines by key to get one object holding all the data related to a given key. Then I map over these objects and, for each object, iterate over a collection that is a broadcast variable. The exception is thrown while iterating over the broadcast variable.

Here is a quick example:

Input file:

key1,row1

key2,row1

key1,row2

key2,row2

The broadcast variable is a list: List(1,2,3)

I group my input by key:

key1, (row1,row2)

key2, (row1,row2)

Then I do a map:

myRdd.map(obj => for (item <- myBroadcast.value) yield executeFunction(obj, item))

I know groupByKey is a very costly operation, but I am not sure I can avoid it, since 'executeFunction' needs all the lines for a given key in order to run. Besides, the stage that performs the groupByKey has already completed successfully when the exception is thrown.
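For readers who want to reproduce the pattern described above, here is a minimal sketch against the Spark 1.5 RDD API. It assumes Spark is on the classpath and a local master; `executeFunction` here is a hypothetical stand-in (the real one is not shown in the post), and the object name is made up for illustration. It reads the broadcast value once per group rather than once per row.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object GroupAndBroadcastSketch {
  // Hypothetical stand-in for the poster's executeFunction, which is not shown.
  def executeFunction(rows: Iterable[String], item: Int): String =
    s"${rows.size} rows x item $item"

  def main(args: Array[String]): Unit = {
    // Local master is an assumption for the sketch; the original job runs on YARN.
    val conf = new SparkConf().setAppName("group-and-broadcast-sketch").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      val lines = sc.parallelize(Seq("key1,row1", "key2,row1", "key1,row2", "key2,row2"))
      val myBroadcast = sc.broadcast(List(1, 2, 3))

      // Split each line into (key, row) and gather all rows of a key together.
      val grouped = lines
        .map { line => val Array(k, v) = line.split(",", 2); (k, v) }
        .groupByKey()

      // Apply the function to every (group, broadcast item) pair.
      val results = grouped.mapValues { rows =>
        val items = myBroadcast.value // read the broadcast once per group
        items.map(item => executeFunction(rows, item))
      }

      results.collect().foreach(println)
    } finally {
      sc.stop()
    }
  }
}
```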

Here is an extract from the logs:

16/08/04 03:17:50 WARN storage.MemoryStore: Not enough space to cache broadcast_90 in memory! (computed 2.3 GB so far)
16/08/04 03:18:37 WARN storage.MemoryStore: Not enough space to cache broadcast_90 in memory! (computed 2.3 GB so far)
16/08/04 03:19:22 WARN storage.MemoryStore: Not enough space to cache broadcast_90 in memory! (computed 2.4 GB so far)
16/08/04 03:20:07 WARN storage.MemoryStore: Not enough space to cache broadcast_90 in memory! (computed 2.5 GB so far)
16/08/04 03:20:53 WARN storage.MemoryStore: Not enough space to cache broadcast_90 in memory! (computed 2.6 GB so far)
16/08/04 03:21:11 WARN storage.MemoryStore: Not enough space to cache broadcast_90 in memory! (computed 2.7 GB so far)
16/08/04 03:21:15 WARN storage.MemoryStore: Not enough space to cache broadcast_90 in memory! (computed 2.5 GB so far)
16/08/04 03:44:22 WARN storage.MemoryStore: Not enough space to cache broadcast_90 in memory! (computed 3.4 GB so far)
16/08/04 03:53:03 WARN storage.MemoryStore: Not enough space to cache broadcast_90 in memory! (computed 3.8 GB so far)
16/08/04 04:02:00 WARN storage.MemoryStore: Not enough space to cache broadcast_90 in memory! (computed 4.1 GB so far)
16/08/04 04:20:52 ERROR executor.CoarseGrainedExecutorBackend: RECEIVED SIGNAL 15: SIGTERM
16/08/04 04:20:52 ERROR executor.Executor: Exception in task 1.0 in stage 62.1 (TID 1109)
java.lang.OutOfMemoryError: Java heap space
    at java.util.IdentityHashMap.resize(IdentityHashMap.java:469)
    at java.util.IdentityHashMap.put(IdentityHashMap.java:445)
    at org.apache.spark.util.SizeEstimator$SearchState.enqueue(SizeEstimator.scala:159)
    at org.apache.spark.util.SizeEstimator$.visitArray(SizeEstimator.scala:229)
    at org.apache.spark.util.SizeEstimator$.visitSingleObject(SizeEstimator.scala:194)
    at org.apache.spark.util.SizeEstimator$.org$apache$spark$util$SizeEstimator$$estimate(SizeEstimator.scala:186)
    at org.apache.spark.util.SizeEstimator$.estimate(SizeEstimator.scala:54)
    at org.apache.spark.util.collection.SizeTracker$class.takeSample(SizeTracker.scala:78)
    at org.apache.spark.util.collection.SizeTracker$class.afterUpdate(SizeTracker.scala:70)
    at org.apache.spark.util.collection.SizeTrackingVector.$plus$eq(SizeTrackingVector.scala:31)
    at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:278)
    at org.apache.spark.storage.MemoryStore.putIterator(MemoryStore.scala:165)
    at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:550)
    at org.apache.spark.storage.BlockManager.getLocal(BlockManager.scala:429)
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:168)
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1175)
    at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:165)
    at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
    at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
    at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:88)
    at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
    ...
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at org.apache.spark.util.collection.CompactBuffer$$anon$1.foreach(CompactBuffer.scala:115)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at org.apache.spark.util.collection.CompactBuffer.foreach(CompactBuffer.scala:30)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at org.apache.spark.util.collection.CompactBuffer.map(CompactBuffer.scala:30)

Any suggestions as to what could explain this behavior?

Thanks!

7 REPLIES

avatar
Super Collaborator

Broadcast data is transferred in blocks. See the spark.broadcast.blockSize property in the Spark configuration documentation. This explains why the value grows in the log output.

How big is the file you are broadcasting? You can use the SizeEstimator to get a sense of how much memory your object will really occupy. Then make sure your "--driver-memory" and "--executor-memory" settings leave enough breathing room. Guidance for tuning can be found here: https://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.4.2/bk_spark-guide/content/ch_tuning-spark.html
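As a rough illustration of the SizeEstimator suggestion, the sketch below calls `org.apache.spark.util.SizeEstimator.estimate` on the value before broadcasting it. It assumes Spark is on the classpath; the object name, the `estimateMb` helper, and the sample list are made up for illustration, and the number returned is an estimate of the deserialized in-memory footprint, not the serialized size.

```scala
import org.apache.spark.util.SizeEstimator

object BroadcastSizeCheck {
  // Hypothetical helper: estimated in-memory size of a value, in megabytes.
  def estimateMb(value: AnyRef): Double =
    SizeEstimator.estimate(value).toDouble / (1024 * 1024)

  def main(args: Array[String]): Unit = {
    // Stand-in for the collection that would be passed to sc.broadcast(...).
    val candidate: List[Int] = List(1, 2, 3)
    println(f"Estimated in-memory size: ${estimateMb(candidate)}%.4f MB")
  }
}
```

Comparing this figure with the executor heap gives a first idea of whether the broadcast itself can plausibly account for the memory pressure.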

avatar

I'm not sure I understand your point. I am probably missing something about how Spark works with broadcasts, but my understanding is that broadcasts are immutable, read-only objects, so why would one keep growing? What is Spark doing with broadcasts?

I used the SizeEstimator (thanks for the tip), and what I am broadcasting is only a couple of MB.

avatar
Rising Star

In what mode are you running the Spark application: spark-shell, yarn-client, etc.?

avatar

The job is launched in yarn-cluster mode.

avatar
Rising Star

Hi Pierre,

How is the object defined and serialized?

If any field of your object refers to the RDD, Spark copies the full RDD and shuffles it.

Would you be able to do a persist/cache before the broadcast join and share the Spark UI DAG and Storage pages?

Cheers.

Amit

avatar

The job is written in Scala and uses the default serialization settings.

The object has no reference to the RDD itself.

What do you mean by "broadcast join"?

Here is the current DAG:

[Attached image: 6472-dag.png]

avatar
Rising Star

Hi Pierre,

We would need to look at the code.

Can you do a persist just before stage 63 and before stage 65, then check the Spark UI Storage and Executors tabs for data skew? If there is data skew, you will need to add a salt to your key.

You could also create a DataFrame from the RDD with rdd.toDF() and apply a UDF to it. DataFrames manage memory more efficiently.
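To make the salting idea concrete, here is a small sketch in plain Scala collections. The `salt` and `unsalt` helpers, the `#` separator, and `numSalts` are all hypothetical choices for illustration. Note that salting spreads the rows of one key over several groups, so it only helps when the per-key work can be split into a partial step and a combine step; it may not fit a function that needs every line of a key at once.

```scala
import scala.util.Random

object SaltedKeys {
  // Append a random suffix in [0, numSalts) so one hot key maps to several groups.
  // numSalts is a hypothetical tuning knob.
  def salt(key: String, numSalts: Int, rng: Random): String =
    s"$key#${rng.nextInt(numSalts)}"

  // Recover the original key by dropping everything from the last '#'.
  def unsalt(salted: String): String =
    salted.substring(0, salted.lastIndexOf('#'))

  def main(args: Array[String]): Unit = {
    val rng = new Random(42)
    val rows = Seq("key1" -> "row1", "key2" -> "row1", "key1" -> "row2", "key2" -> "row2")

    // Step 1: salt the keys. With an RDD, the same function would be applied
    // in rdd.map(...) before the shuffle.
    val salted = rows.map { case (k, v) => (salt(k, 4, rng), v) }

    // Step 2: partial aggregation per salted key (here, a simple row count).
    // toSeq avoids collapsing entries when two salted keys unsalt to the same key.
    val partial = salted.groupBy(_._1).toSeq.map { case (sk, kvs) => (unsalt(sk), kvs.size) }

    // Step 3: combine the partial results per original key.
    val totals = partial.groupBy(_._1).map { case (k, counts) => (k, counts.map(_._2).sum) }

    totals.foreach(println)
  }
}
```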
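A minimal sketch of the DataFrame route, assuming the Spark 1.5 SQLContext API and a local master. The column names, the object name, and the `rowLength` function are hypothetical placeholders, not the poster's actual logic.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions.udf

object DataFrameUdfSketch {
  // Hypothetical per-row function standing in for the real processing.
  def rowLength(row: String): Int = row.length

  def main(args: Array[String]): Unit = {
    // Local master is an assumption for the sketch.
    val sc = new SparkContext(new SparkConf().setAppName("df-udf-sketch").setMaster("local[2]"))
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._ // enables rdd.toDF(...) and the $"col" syntax

    try {
      val rdd = sc.parallelize(Seq(("key1", "row1"), ("key2", "row1"), ("key1", "row2"), ("key2", "row2")))

      // Convert the RDD of pairs to a DataFrame with named columns.
      val df = rdd.toDF("key", "row")

      // Wrap the plain Scala function as a UDF and apply it to a column.
      val rowLengthUdf = udf(rowLength _)
      df.withColumn("row_length", rowLengthUdf($"row")).show()
    } finally {
      sc.stop()
    }
  }
}
```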

Best,

Amit