<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>question Spark driver memory keeps growing in Support Questions</title>
    <link>https://community.cloudera.com/t5/Support-Questions/Spark-driver-memory-keeps-growing/m-p/165005#M127372</link>
    <description>&lt;P&gt;Hi,&lt;/P&gt;&lt;P&gt;I'm running a job on Spark 1.5.2 and I get an OutOfMemoryError when accessing a broadcast variable. I don't understand why the broadcast keeps growing, or why it grows at this particular point in the code.&lt;/P&gt;&lt;P&gt;Basically, I have a large input file where each line carries a key. I group my lines by key so that I end up with one object holding all the data for a given key. Then I do a map over those objects, and for each object I iterate over a collection that is a broadcast variable. The exception is thrown while iterating over the broadcast variable.&lt;/P&gt;&lt;P&gt;Here is a quick example:&lt;/P&gt;&lt;P&gt;Input file:&lt;/P&gt;&lt;PRE&gt;key1,row1

key2,row1

key1,row2

key2,row2&lt;/PRE&gt;&lt;P&gt;The broadcast variable is a list: List(1,2,3)&lt;/P&gt;&lt;P&gt;I group my input by key:&lt;/P&gt;&lt;PRE&gt;key1, (row1,row2)

key2, (row1,row2)&lt;/PRE&gt;&lt;P&gt;Then I do a map:&lt;/P&gt;&lt;PRE&gt;myRdd.map( obj =&amp;gt; for (item &amp;lt;- myBroadcast.value) executeFunction(obj, item) )&lt;/PRE&gt;&lt;P&gt;I know groupByKey is a very costly operation, but I am not sure I can avoid it, since 'executeFunction' needs all the lines for a given key in order to run. Besides, the stage where the groupByKey is performed has already completed successfully by the time the exception is thrown.&lt;/P&gt;&lt;P&gt;Here is an extract from the logs:&lt;/P&gt;&lt;P&gt;16/08/04 03:17:50 WARN storage.MemoryStore: Not enough space to cache broadcast_90 in memory! (computed 2.3 GB so far)
16/08/04 03:18:37 WARN storage.MemoryStore: Not enough space to cache broadcast_90 in memory! (computed 2.3 GB so far)
16/08/04 03:19:22 WARN storage.MemoryStore: Not enough space to cache broadcast_90 in memory! (computed 2.4 GB so far)
16/08/04 03:20:07 WARN storage.MemoryStore: Not enough space to cache broadcast_90 in memory! (computed 2.5 GB so far)
16/08/04 03:20:53 WARN storage.MemoryStore: Not enough space to cache broadcast_90 in memory! (computed 2.6 GB so far)
16/08/04 03:21:11 WARN storage.MemoryStore: Not enough space to cache broadcast_90 in memory! (computed 2.7 GB so far)
16/08/04 03:21:15 WARN storage.MemoryStore: Not enough space to cache broadcast_90 in memory! (computed 2.5 GB so far)
16/08/04 03:44:22 WARN storage.MemoryStore: Not enough space to cache broadcast_90 in memory! (computed 3.4 GB so far)
16/08/04 03:53:03 WARN storage.MemoryStore: Not enough space to cache broadcast_90 in memory! (computed 3.8 GB so far)
16/08/04 04:02:00 WARN storage.MemoryStore: Not enough space to cache broadcast_90 in memory! (computed 4.1 GB so far)
16/08/04 04:20:52 ERROR executor.CoarseGrainedExecutorBackend: RECEIVED SIGNAL 15: SIGTERM
16/08/04 04:20:52 ERROR executor.Executor: Exception in task 1.0 in stage 62.1 (TID 1109)
java.lang.OutOfMemoryError: Java heap space
  at java.util.IdentityHashMap.resize(IdentityHashMap.java:469)
  at java.util.IdentityHashMap.put(IdentityHashMap.java:445)
  at org.apache.spark.util.SizeEstimator$SearchState.enqueue(SizeEstimator.scala:159)
  at org.apache.spark.util.SizeEstimator$.visitArray(SizeEstimator.scala:229)
  at org.apache.spark.util.SizeEstimator$.visitSingleObject(SizeEstimator.scala:194)
  at org.apache.spark.util.SizeEstimator$.org$apache$spark$util$SizeEstimator$$estimate(SizeEstimator.scala:186)
  at org.apache.spark.util.SizeEstimator$.estimate(SizeEstimator.scala:54)
  at org.apache.spark.util.collection.SizeTracker$class.takeSample(SizeTracker.scala:78)
  at org.apache.spark.util.collection.SizeTracker$class.afterUpdate(SizeTracker.scala:70)
  at org.apache.spark.util.collection.SizeTrackingVector.$plus$eq(SizeTrackingVector.scala:31)
  at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:278)
  at org.apache.spark.storage.MemoryStore.putIterator(MemoryStore.scala:165)
  at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:550)
  at org.apache.spark.storage.BlockManager.getLocal(BlockManager.scala:429)
  at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:168)
  at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1175)
  at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:165)
  at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
  at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
  at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:88)
  at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)&lt;/P&gt;&lt;P&gt;...
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
  at scala.collection.Iterator$class.foreach(Iterator.scala:727)
  at org.apache.spark.util.collection.CompactBuffer$$anon$1.foreach(CompactBuffer.scala:115)
  at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
  at org.apache.spark.util.collection.CompactBuffer.foreach(CompactBuffer.scala:30)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
  at org.apache.spark.util.collection.CompactBuffer.map(CompactBuffer.scala:30)&lt;/P&gt;&lt;P&gt;Any suggestions as to what could explain this behavior?&lt;/P&gt;&lt;P&gt;Thanks!&lt;/P&gt;</description>
    <pubDate>Thu, 04 Aug 2016 20:01:49 GMT</pubDate>
    <dc:creator>pvillard</dc:creator>
    <dc:date>2016-08-04T20:01:49Z</dc:date>
  </channel>
</rss>

