Created 10-26-2016 07:13 PM
I'm a beginner in Spark, trying to join a 1.5-million-row data set (100.3 MB) with an 800+-million-row data set (15.6 GB) using a broadcast hash join with the Spark DataFrame API. The application completes in about 5 seconds with 80 tasks. When I run "joinDF.show()" or "collect" as the very last step, all the tasks complete, but my console hangs right after and I get the errors below after some time.
The first line in the error:
Exception in thread "broadcast-hash-join-0" java.lang.OutOfMemoryError: GC overhead limit exceeded
Full Error log:
https://www.dropbox.com/s/te59cxnm4j5rb3p/log.txt?dl=0
Spark-Shell (Fire up command):
spark-shell \
  --executor-memory 16G \
  --num-executors 800 \
Spark-Scala Code:
import org.apache.spark.sql.functions.broadcast  // needed for the broadcast() hint

case class Small(col_1: String, col_2: String, col_3: String, col_4: Int, col_5: Int, col_6: String)
val sm_data = sc.textFile("/small_hadoop_data")
val smallDataFrame = sm_data.map(_.split("\\|")).map(attr => Small(attr(0).toString, attr(1).toString, attr(2).toString, attr(3).toInt, attr(4).toInt, attr(5).toString)).toDF()
smallDataFrame.registerTempTable("Small") // Row Count: 1,518,933

val lg_data = sc.textFile("/very_large_hadoop_data")
case class Large(col_1: Int, col_2: String, col_3: Int)
val LargeDataFrame = lg_data.map(_.split("\\|")).map(attr => Large(attr(0).toInt, attr(2).toString, attr(3).toInt)).toDF()
LargeDataFrame.registerTempTable("Very_Large") // Row Count: 849,064,470

val joinDF = LargeDataFrame.join(broadcast(smallDataFrame), "key")
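For reference, here is a rough sketch of how the join key could be made explicit and the plan checked before collecting anything. The col_1-to-col_4 pairing below is only an assumed placeholder (the post doesn't say which columns actually match, and neither DataFrame defines a column literally named "key"):

import org.apache.spark.sql.functions.broadcast

// Sketch only: replace col_1 / col_4 with the real join columns.
val joinDF = LargeDataFrame.join(
  broadcast(smallDataFrame),
  LargeDataFrame("col_1") === smallDataFrame("col_4"))

// Confirm the planner picked a broadcast hash join before collecting anything.
joinDF.explain() // look for BroadcastHashJoin in the physical plan

// Prefer an aggregate over show()/collect() while testing; collect() pulls the
// whole joined result back to the driver and can OOM it.
println(joinDF.count())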
Created 10-26-2016 08:42 PM
Quick question: have you tried it without the broadcast? 1.5 million records is not so small that you should be sending it to 800 executors.
Also, shouldn't you be doing something with joinDF, e.g. at least a joinDF.count()?
The join by "key" looks interesting as well. Have you considered trying your logic with a smaller dataset first, say 8,000 records (something like the sketch below)?
Created 10-27-2016 05:53 PM
I tried without a broadcast. It gets divided into 895 tasks, which should take around 10 minutes to finish.
Spark UI screenshot: http://2.1m.yt/e_tnx0K.png
But it never actually finishes; I get an "out of space" memory error.
Created 10-27-2016 06:43 PM
How big is your cluster? Sounds like you may need more RAM.
That's a big join once the two datasets come together.
What does the history UI show?
Try 895 executors and 32 GB of RAM.
How many nodes do you have in the cluster? How big are these files in gigabytes?
How much RAM is available on the cluster?
Do NOT run this from the shell. Run it as compiled code and submit it to yarn-cluster (there's a rough sketch of that below). The shell is not designed for large jobs; it's more for developing and testing parts of your code.
Can you upgrade to 1.6.2? Newer Spark is faster and more efficient.
Here are some settings to try:
https://community.hortonworks.com/articles/34209/spark-16-tips-in-code-and-submission.html
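To make the "run it as compiled code" suggestion concrete, here is a minimal sketch of the same join packaged as a standalone Spark 1.x application. The class name, jar name, and the resource numbers in the submit command are placeholders, not values from this thread:

// Minimal sketch of the join as a compiled Spark 1.x application.
// Class/jar names and the resource settings below are placeholders.
//
// Submit with something like:
//   spark-submit --class BroadcastJoinJob \
//     --master yarn-cluster \
//     --num-executors 100 --executor-memory 16G --driver-memory 8G \
//     broadcast-join-job.jar
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions.broadcast

case class Small(col_1: String, col_2: String, col_3: String, col_4: Int, col_5: Int, col_6: String)
case class Large(col_1: Int, col_2: String, col_3: Int)

object BroadcastJoinJob {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("BroadcastJoinJob"))
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    val smallDataFrame = sc.textFile("/small_hadoop_data")
      .map(_.split("\\|"))
      .map(a => Small(a(0), a(1), a(2), a(3).toInt, a(4).toInt, a(5)))
      .toDF()

    val largeDataFrame = sc.textFile("/very_large_hadoop_data")
      .map(_.split("\\|"))
      .map(a => Large(a(0).toInt, a(2), a(3).toInt))
      .toDF()

    // Same broadcast join as the shell version above (still joining on "key").
    val joinDF = largeDataFrame.join(broadcast(smallDataFrame), "key")
    println(joinDF.count())

    sc.stop()
  }
}

In yarn-cluster mode the driver also runs on the cluster, and the broadcast table is built on the driver, so give it enough headroom via --driver-memory.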
Created on 10-28-2016 02:09 AM - edited 08-18-2019 05:52 AM
Thanks for all your help. I'll try 32 GB and 895 executors, run it as compiled code, and let you know.
We are running Spark 1.5.2 here at our company.
Here is the cluster config.
Created 10-28-2016 02:46 AM
Regarding the size of the data, the small dataset is only 100.3 MB, while the larger dataset is 15.6 GB.
Created 02-21-2017 07:52 PM
Late reply, but running it on a cluster and increasing the memory worked like a charm!
Created 10-27-2016 06:44 PM
Run it on a cluster so you have more RAM. Running on one machine won't support that data size.
16/10/25 18:30:41 INFO BlockManager: Reporting 4 blocks to the master.
Exception in thread "qtp1394524874-84" java.lang.OutOfMemoryError: GC overhead limit exceeded
	at java.util.HashMap$KeySet.iterator(HashMap.java:912)
	at java.util.HashSet.iterator(HashSet.java:172)
	at sun.nio.ch.Util$2.iterator(Util.java:243)
	at org.spark-project.jetty.io.nio.SelectorManager$SelectSet.doSelect(SelectorManager.java:600)
	at org.spark-project.jetty.io.nio.SelectorManager$1.run(SelectorManager.java:290)
	at org.spark-project.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:608)
	at org.spark-project.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:543)
	at java.lang.Thread.run(Thread.java:745)
Created 10-28-2016 02:10 AM
Yes, if you check my config above, I am running it on a cluster. 🙂