Support Questions


Is there a way to broadcast a Dataframe/RDD without a collect first?

Contributor

Is there a way to broadcast a Dataframe/RDD without doing the collect first?

I am thinking this could avoid a copy to the driver first.

I did notice that there is a broadcast function that is used in the broadcast join for the DataFrame.

   public static DataFrame broadcast(DataFrame df)
   // Marks a DataFrame as small enough for use in broadcast joins.
   // The following example marks the right DataFrame for a broadcast
   // hash join using joinKey:

   // left and right are DataFrames
   left.join(broadcast(right), "joinKey")

It seems that Spark automatically determines when broadcasting is needed once it finds that a join operation is required.

What I am wondering is whether the broadcast function above still works if I want to use it in some other, more general context, i.e., whether the broadcast still occurs.

The other thing is: after the broadcasting, does the partition concept still exist for the DataFrame? E.g., can I still apply functions like mapPartitions to it?

Thanks

1 ACCEPTED SOLUTION

Super Collaborator

Sorry for the late response.

I'm not sure of a way to broadcast the data without collecting it to the driver first. Because we assume the broadcasted table is small, the collect and broadcast to and from the driver should be fairly quick. You would see about the same network traffic even if you could somehow skip the collect, since a full copy of the smaller table is needed on each worker anyway.
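For reference, the collect-then-broadcast pattern described above can be sketched like this (a minimal sketch against a local SparkSession on a recent Spark version; the names `small` and `lookup` are illustrative):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]").appName("collect-then-broadcast").getOrCreate()
import spark.implicits._

// A small table we want available on every executor.
val small = Seq((1, "a"), (2, "b")).toDF("id", "label")

// collect() copies the rows to the driver; broadcast() then ships one
// read-only copy of the resulting map to each executor.
val lookup = spark.sparkContext.broadcast(
  small.collect().map(r => r.getInt(0) -> r.getString(1)).toMap)

// Executors can now read lookup.value without any shuffle.
val tagged = spark.sparkContext.parallelize(Seq(1, 2, 1))
  .map(id => lookup.value.getOrElse(id, "?"))
println(tagged.collect().mkString(","))
```

The broadcast variable holds plain driver-side data (here a Map), not a distributed DataFrame, which is why the collect step is unavoidable.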

The join(broadcast(right), ...) gives Spark a hint to do a broadcast join, overriding spark.sql.autoBroadcastJoinThreshold (10 MB by default). Don't try to broadcast anything larger than 2 GB: that is the limit for a single block in Spark, and you will get an OOM or overflow exception, since the underlying block data structure is capped at 2 GB. Right now the autoBroadcastJoinThreshold only applies to Hive tables that have had statistics computed on them, so the broadcast hint is what you use for DataFrames not backed by Hive, or ones where statistics haven't been run.
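As a concrete sketch of the hint (a local SparkSession on a recent Spark version; the table and column names are illustrative):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.broadcast

val spark = SparkSession.builder()
  .master("local[*]").appName("broadcast-hint").getOrCreate()
import spark.implicits._

val left  = Seq((1, "x"), (2, "y"), (3, "z")).toDF("joinKey", "lval")
val right = Seq((1, "a"), (2, "b")).toDF("joinKey", "rval")

// The hint forces a broadcast hash join even though `right` is not a
// Hive table with statistics, regardless of autoBroadcastJoinThreshold.
val joined = left.join(broadcast(right), "joinKey")

// The physical plan should show a BroadcastHashJoin operator.
joined.explain()
```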

The general Spark Core broadcast function will still work. In fact, under the hood, the DataFrame broadcast join is doing the same collect and broadcast that you would do with the general API.

The concept of partitions is still there, so after you do a broadcast join, you're free to run mapPartitions on it.
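For instance, partition-wise processing still works on the join result (a sketch on a local SparkSession; `joined` is the result of a broadcast join as above):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.broadcast

val spark = SparkSession.builder()
  .master("local[*]").appName("partitions-after-join").getOrCreate()
import spark.implicits._

val left  = Seq((1, "x"), (2, "y")).toDF("joinKey", "lval")
val right = Seq((1, "a"), (2, "b")).toDF("joinKey", "rval")
val joined = left.join(broadcast(right), "joinKey")

// The join result is still a partitioned dataset, so mapPartitions
// applies as usual; here each partition is reduced to its row count.
val perPartitionCounts = joined.rdd
  .mapPartitions(rows => Iterator(rows.size))
  .collect()
println(perPartitionCounts.sum)  // total rows across all partitions
```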

