
Spark Broadcast Hash Join failing on an 800+ million to 1.5 million row join

Expert Contributor

I'm a beginner in Spark, trying to join a 1.5-million-row data set (100.3 MB) to an 800+ million-row data set (15.6 GB) using a broadcast hash join with the Spark DataFrame API. The application completes in about 5 seconds with 80 tasks. But when I run "joinDF.show()" or "collect" as the very last step, the application's tasks fully complete, yet my console hangs right after, and after some time I get all these errors.

The first line in the error:

Exception in thread "broadcast-hash-join-0" java.lang.OutOfMemoryError: GC overhead limit exceeded

Full Error log:

https://www.dropbox.com/s/te59cxnm4j5rb3p/log.txt?dl=0

Spark-Shell (Fire up command):

spark-shell \
  --executor-memory 16G \
  --num-executors 800

Spark-Scala Code:

import org.apache.spark.sql.functions.broadcast  // needed for the broadcast() hint

case class Small(col_1: String, col_2: String, col_3: String, col_4: Int, col_5: Int, col_6: String)

val sm_data = sc.textFile("/small_hadoop_data")

// split() already yields Strings, so no .toString is needed on the string columns
val smallDataframe = sm_data.map(_.split("\\|")).map(attr => Small(attr(0), attr(1), attr(2), attr(3).toInt, attr(4).toInt, attr(5))).toDF()

smallDataframe.registerTempTable("Small")  // Row Count: 1,518,933


val lg_data = sc.textFile("/very_large_hadoop_data")

case class Large(col_1: Int, col_2: String, col_3: Int)

val LargeDataFrame = lg_data.map(_.split("\\|")).map(attr => Large(attr(0).toInt, attr(2), attr(3).toInt)).toDF()

LargeDataFrame.registerTempTable("Very_Large") // Row Count: 849,064,470

val joinDF = LargeDataFrame.join(broadcast(smallDataframe), "key")

joinDF.show()  // this is where the console hangs
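To double-check that the hint is actually picked up, the physical plan can be inspected before running the action; in Spark 1.5 it should list a BroadcastHashJoin operator:

joinDF.explain()  // the physical plan should contain BroadcastHashJoin rather than a shuffle-based join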
1 ACCEPTED SOLUTION

Master Guru

How big is your cluster? It sounds like you may need more RAM.

That's a big join once the two sides come together.

What does the history UI show?

Try 895 executors and 32 GB of RAM.

How many nodes do you have in the cluster? How big are these files in gigabytes?

How much RAM is available on the cluster?

Do NOT run this from the shell. Run it as compiled code and submit it to yarn-cluster. The shell is not designed for large jobs; it is meant more for developing and testing parts of your code.
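A rough sketch of what that submit could look like (the class and jar names are placeholders, the driver-memory figure is my own assumption, and the executor numbers are just the ones suggested above):

spark-submit \
  --master yarn-cluster \
  --num-executors 895 \
  --executor-memory 32G \
  --driver-memory 8G \
  --class com.example.BroadcastJoinJob \
  broadcast-join-job.jar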

Can you upgrade to 1.6.2? Newer Spark is faster and more efficient.

Here are some settings to try:

https://community.hortonworks.com/articles/34209/spark-16-tips-in-code-and-submission.html


8 REPLIES

New Contributor

Quick question: have you tried without the broadcast? 1.5 million records is not so small that you should send it to 800 executors.

Also, shouldn't you be doing something with joinDF, e.g. at least a joinDF.count()?

The join by "key" looks interesting as well. Have you considered trying your logic with a smaller dataset first, say 8,000 records?
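Something like this, for instance (just dropping the hint and forcing the join with an action; "key" stands for whatever your real join column is):

val plainJoinDF = LargeDataFrame.join(smallDataframe, "key")  // no broadcast() hint
plainJoinDF.explain()          // see which join strategy the planner actually picks
println(plainJoinDF.count())   // an action that runs the join without pulling rows back to the driver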

Expert Contributor

I tried without a broadcast. It gets divided into 895 tasks, which should take around 10 minutes to finish.

Spark UI screenshot: http://2.1m.yt/e_tnx0K.png

But it actually doesn't finish; I get an "out of space" memory error.


Expert Contributor

Thanks for all your help. I'll try 32 GB and 895 executors, run it as compiled code, and let you know.

And we are running Spark 1.5.2 here at our company.

Here is the cluster config.

(screenshot attachment: 8937-cluster-matrics.png)

Expert Contributor

Regarding the size of the data: the small dataset is only 100.3 MB, while the larger dataset is 15.6 GB.
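A side note, if I understand the docs correctly: 100.3 MB is well above Spark's default auto-broadcast threshold of 10 MB, so the small table is only broadcast because of the explicit broadcast() hint. The threshold itself can also be raised from the shell; the 200 MB figure below is just an illustration:

// assumes the sqlContext that spark-shell predefines
// raise the auto-broadcast threshold so ~100 MB tables qualify on their own
sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", (200 * 1024 * 1024).toString)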

Expert Contributor

Late reply, but running it on a cluster and increasing memory worked like a charm!

Master Guru

Run it on a cluster so you have more RAM. Running on one machine won't support that data size.

16/10/25 18:30:41 INFO BlockManager: Reporting 4 blocks to the master.
Exception in thread "qtp1394524874-84" java.lang.OutOfMemoryError: GC overhead limit exceeded
       	at java.util.HashMap$KeySet.iterator(HashMap.java:912)
       	at java.util.HashSet.iterator(HashSet.java:172)
       	at sun.nio.ch.Util$2.iterator(Util.java:243)
       	at org.spark-project.jetty.io.nio.SelectorManager$SelectSet.doSelect(SelectorManager.java:600)
       	at org.spark-project.jetty.io.nio.SelectorManager$1.run(SelectorManager.java:290)
       	at org.spark-project.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:608)
       	at org.spark-project.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:543)
       	at java.lang.Thread.run(Thread.java:745)
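
One more thing to double-check (an assumption on my part, based on the launch command posted above having no --master flag): unless spark-defaults.conf sets one, spark-shell without --master runs in local mode on a single machine. Pointing it explicitly at YARN would look something like:

spark-shell \
  --master yarn \
  --num-executors 800 \
  --executor-memory 16G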

Expert Contributor

Yes, if you check my config above, I am running on a cluster. 🙂