RDD/DataFrame/Dataset Performance Tuning

Contributor
Hello, right now I'm using DataFrames to perform a df1.groupBy(key).count() on one DataFrame and then join it with another DataFrame, df2.

The first, df1, is very large (many gigabytes) compared to df2 (250 MB).

Right now I'm running this on a cluster of 5 nodes, 16 cores each, 90 GB RAM each. It is taking me about 1 hour and 40 minutes to perform the groupBy, count, and join, which seems very slow to me. Currently I have set the following in my spark-defaults.conf:
spark.executor.instances               24
spark.executor.memory                  10g
spark.executor.cores                   3
spark.driver.memory                    5g
spark.sql.autoBroadcastJoinThreshold   200Mb
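
For reference, the job itself is roughly the following (sketch only; key is a placeholder for my actual join column):

  val counts = df1.groupBy("key").count()    // aggregate the large DataFrame first
  val joined = counts.join(df2, Seq("key"))  // then join the (much smaller) result with df2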

I have a couple of questions regarding tuning for performance as a beginner.

  1. Right now I'm running Spark 1.6.0. Would moving to Spark 2.0 DataSet (or even DataFrames) be much better?
  2. What if I used RDDs instead? I know that reduceByKey is better than groupByKey, and DataFrames don't have that method.
  3. I think I can do a broadcast join and have set a threshold. Do I need to set it above my second DataFrame size? Do I need to explicitly call broadcast(df2)?
  4. What's the point of driver memory?
  5. Can anyone point out something wrong with my tuning numbers, or any additional parameters worth checking out?

Thanks a lot!

Sincerely, Jestin

1 ACCEPTED SOLUTION

Super Collaborator

Some thoughts/questions:

  • What does the key distribution look like? If it's lumpy (skewed), perhaps a repartition would help (see the sketch below these bullets). Looking at the Spark UI might give some insight into the bottleneck.
  • Bumping up spark.sql.autoBroadcastJoinThreshold to 300 MB might help ensure that the map-side (broadcast) join happens. Check the Spark SQL docs though, because they note "...that currently statistics are only supported for Hive Metastore tables where the command ANALYZE TABLE <tableName> COMPUTE STATISTICS noscan has been run."
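
For example, a quick way to eyeball the key distribution and to raise the threshold from code (sketch only; key and the sqlContext name are placeholders):

  import org.apache.spark.sql.functions.desc

  // Look for heavily skewed keys before deciding whether to repartition
  df1.groupBy("key").count().orderBy(desc("count")).show(20)

  // The Spark 1.6 docs list this threshold in bytes (default 10485760 = 10 MB),
  // so 300 MB would be:
  sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", (300L * 1024 * 1024).toString)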

Other answers:

  1. Right now I'm running Spark 1.6.0. Would moving to Spark 2.0 DataSet (or even DataFrames) be much better?
    1. Doubt it. There are probably tuning opportunities with what you have that would still be needed in 2.0.
  2. What if I used RDDs instead? I know that reduceByKey is better than groupByKey, and DataFrames don't have that method.
    1. If you want to post more of your code, we can comment on it. It's hard to tell whether the RDD API's more granular control would help you without seeing the bigger picture.
  3. I think I can do a broadcast join and have set a threshold. Do I need to set it above my second DataFrame size? Do I need to explicitly call broadcast(df2)?
    1. Yes, the threshold matters and should be set above the size of the smaller DataFrame; think of this as a map-side join. No, you should not need to call broadcast explicitly. However, if you did, you could not broadcast the DataFrame itself as a broadcast variable; it would have to be a collection loaded up in the driver (see the sketch after this list). More on broadcast variables here: https://spark.apache.org/docs/0.8.1/scala-programming-guide.html#broadcast-variables
  4. What's the point of driver memory?
    1. Operations like collect() bring results back to the driver, so if you're collecting a lot of results you'll need to worry about that driver-memory setting.
  5. Can anyone point out something wrong with my tuning numbers, or any additional parameters worth checking out?
    1. Looks good, but we could give more assistance if we have the full code. Also, look at the Spark UI and walk the DAG to see where the bottleneck is.
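
To illustrate point 3: the page linked above covers RDD-level broadcast variables, but the DataFrame API also has a broadcast hint in org.apache.spark.sql.functions that marks the small side for a broadcast join without collecting anything yourself. A minimal sketch, assuming a shared column named key:

  import org.apache.spark.sql.functions.broadcast

  // Hint the planner to broadcast df2 (the ~250 MB side) when joining
  val joined = df1.groupBy("key").count().join(broadcast(df2), Seq("key"))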


4 REPLIES

Super Guru

@jestin ma

I wonder if doing a filter rather than a join would help and achieve the same results. So instead of the join, is it possible to do something like this?

df1.filter(df2).groupBy(key).count().
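
If the idea is to keep only the df1 rows whose key appears in df2 before aggregating, a left semi join is the closest valid form of that sketch. Minimal example, assuming a shared column named key (a placeholder):

  // Keep only df1 rows whose key exists in df2, then aggregate; none of df2's columns are carried along
  val counted = df1.join(df2, Seq("key"), "leftsemi").groupBy("key").count()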

Contributor

Unfortunately, I'm doing a full outer join, so I can't filter.

Super Collaborator

Use a broadcast variable for the smaller table to join it to the larger table. This implements a broadcast join, the same idea as a map-side join, and will save you quite a bit of network I/O and time.
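
A rough sketch of that idea with the RDD API (sc is the SparkContext; column positions and types are assumed, and this only covers the df1 side of the join; a full outer join would also need the df2 keys that never appear in df1):

  // df2 is only ~250 MB, so it can be collected to the driver and broadcast to every executor
  val small = df2.rdd.map(r => (r.getString(0), r)).collectAsMap()
  val smallBc = sc.broadcast(small)

  // Map-side join: look each df1 key up in the broadcast map, so df1 itself is never shuffled
  val joined = df1.rdd.map { r =>
    val key = r.getString(0)
    (key, r, smallBc.value.get(key))   // the Option is None where df2 has no matching key
  }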
