RDD/DataFrame/Dataset Performance Tuning
Labels: Apache Spark
Created 07-23-2016 03:33 PM
I'm joining two DataFrames. The first, df1, is very large (many gigabytes) compared to the second, df2 (about 250 MB).
Right now I'm running this on a cluster of 5 nodes, 16 cores each, 90 GB RAM each. It is taking me about 1 hour and 40 minutes to perform the groupBy, count, and join, which seems very slow to me. Currently I have set the following in my spark-defaults.conf:

| Property | Value |
| --- | --- |
| spark.executor.instances | 24 |
| spark.executor.memory | 10g |
| spark.executor.cores | 3 |
| spark.driver.memory | 5g |
| spark.sql.autoBroadcastJoinThreshold | 200Mb |
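The exact code isn't shown in the thread, but the job is presumably of roughly this shape (the column name "key" and the order of the aggregation and the join are assumptions, not from the post):

```scala
// Hypothetical shape of the job described above; "key" is a placeholder column name.
val counts = df1.groupBy("key").count()
val result = counts.join(df2, Seq("key"), "outer")   // a full outer join, as mentioned later in the thread
```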
I have a couple of questions regarding tuning for performance as a beginner.
- Right now I'm running Spark 1.6.0. Would moving to Spark 2.0 DataSet (or even DataFrames) be much better?
- What if I used RDDs instead? I know that reduceByKey is better than groupByKey, and DataFrames don't have that method.
- I think I can do a broadcast join and have set a threshold. Do I need to set it above my second DataFrame size? Do I need to explicitly call broadcast(df2)?
- What's the point of driver memory?
- Can anyone point out something wrong with my tuning numbers, or any additional parameters worth checking out?
Thanks a lot!
Sincerely, Jestin
Created 07-26-2016 03:21 PM
Some thoughts/questions:
- What does the key distribution look like? If it's lumpy, a repartition might help. Looking at the Spark UI should give some insight into where the bottleneck is.
- Bumping spark.sql.autoBroadcastJoinThreshold up to 300M might help ensure that the map-side join (broadcast join) happens. Check the Spark SQL documentation, though, because it notes "...that currently statistics are only supported for Hive Metastore tables where the command `ANALYZE TABLE <tableName> COMPUTE STATISTICS noscan` has been run." (A sketch of both settings follows this list.)
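A minimal sketch of those two suggestions, assuming Spark 1.6 with a HiveContext and a Hive Metastore table named my_table (both the context setup and the table name are assumptions, not from the thread):

```scala
import org.apache.spark.sql.hive.HiveContext

// sc is the existing SparkContext (e.g. from spark-shell).
val sqlContext = new HiveContext(sc)

// Raise the broadcast threshold above df2's size; the value is in bytes.
sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", (300L * 1024 * 1024).toString)

// Table statistics are only picked up for Hive Metastore tables where this has been run.
sqlContext.sql("ANALYZE TABLE my_table COMPUTE STATISTICS noscan")
```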
Other answers:
- Right now I'm running Spark 1.6.0. Would moving to Spark 2.0 DataSet (or even DataFrames) be much better?
- I doubt it. There are probably tuning opportunities with what you have now that would still be needed in 2.0.
- What if I used RDDs instead? I know that reduceByKey is better than groupByKey, and DataFrames don't have that method.
- If you want to post more of your code, we can comment on that. It's hard to tell whether the RDD API's more granular control would help without the bigger picture. (A reduceByKey sketch follows this list.)
- I think I can do a broadcast join and have set a threshold. Do I need to set it above my second DataFrame size? Do I need to explicitly call broadcast(df2)?
- Yes, the threshold matters and should be above the size of the smaller DataFrame. Think of this like a map-side join. No, you should not need to call broadcast explicitly. However, if you did, you could not broadcast the DataFrame itself; it would have to be a collection loaded up on the driver. See the documentation on broadcast variables: https://spark.apache.org/docs/0.8.1/scala-programming-guide.html#broadcast-variables
- What's the point of driver memory?
- When you perform something like collect, the results are brought back to the driver. If you're collecting a lot of results, you'll need to worry about that driver-memory setting.
- Can anyone point out something wrong with my tuning numbers, or any additional parameters worth checking out?
- They look reasonable, but we could give more assistance with the full code. Also, look at the Spark UI and walk the DAG to see where the bottleneck is.
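To illustrate the RDD alternative mentioned above, here is a minimal sketch of computing per-key counts with reduceByKey instead of groupBy().count(), assuming the join key is the first column of df1 (an assumption, since the real schema isn't shown):

```scala
// Hypothetical sketch: per-key counts via reduceByKey on the underlying RDD.
// Assumes the join key is the first column of df1; adjust for the real schema.
val counts = df1.rdd
  .map(row => (row.get(0), 1L))   // emit (key, 1) for each row
  .reduceByKey(_ + _)             // combines counts locally before shuffling
```

Whether this actually beats the DataFrame groupBy depends on the rest of the job, which is why the full code would help.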
Created 07-23-2016 06:01 PM
I wonder if doing a filter rather than a join would help and achieve the same results. So instead of a join, is it possible to do something like this (one way to express it is sketched below)?
df1.filter(df2).groupBy(key).count()
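DataFrame.filter doesn't accept another DataFrame, but one concrete way to express "keep only the df1 rows whose key appears in df2" is a left semi join; the column name "key" is a placeholder, not from the thread:

```scala
// Hypothetical sketch: filter df1 by the keys present in df2, then count per key.
// "key" is a placeholder column name.
val filtered = df1.join(df2, Seq("key"), "leftsemi")
val counts   = filtered.groupBy("key").count()
```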
Created 07-25-2016 10:17 PM
Unfortunately, I'm doing a full outer join, so I can't filter.
Created 07-23-2016 11:05 PM
Use a broadcast variable for the smaller table to join it to the larger table. This implements a broadcast join, the same as a map-side join, and saves you quite a bit of network IO and time. A rough sketch is below.
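A rough sketch of that approach, assuming df2 fits comfortably in driver memory and both tables join on a single string column named "key" (placeholder assumptions, not from the thread). Note that this expresses a plain lookup; a full outer join, as mentioned above, would need extra handling for df2 keys that never appear in df1:

```scala
// Hypothetical sketch: collect the small table to the driver, broadcast it,
// and do the lookup map-side so df1 is never shuffled for the join.
val smallLookup = df2.rdd
  .map(row => row.getAs[String]("key") -> row)   // key -> full df2 row
  .collectAsMap()
val bcSmall = sc.broadcast(smallLookup)

val joined = df1.rdd.map { row =>
  val key = row.getAs[String]("key")
  (key, row, bcSmall.value.get(key))   // Option[Row] from df2; None if the key is absent
}
```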