Created 11-18-2016 05:11 PM
Is there a way to broadcast a DataFrame/RDD without doing the collect first?
I am thinking this could avoid a copy to the driver first.
I did notice that there is a broadcast function that is used in the broadcast join for the DataFrame.
    public static DataFrame broadcast(DataFrame df)

Marks a DataFrame as small enough for use in broadcast joins. The following example marks the right DataFrame for a broadcast hash join using joinKey:

    // left and right are DataFrames
    left.join(broadcast(right), "joinKey")
It seems that Spark determines automatically when broadcasting is needed once it finds that a join operation is required.
What I am wondering is whether, if I wanted to use it in some other, more general context, the above broadcast function would still work, i.e., would the broadcast still occur?
The other thing is, after the broadcasting, does the partition concept still exist for the DataFrame? E.g., can I still apply functions like mapPartitions to it?
Thanks
Created 11-22-2016 07:24 PM
Sorry for the late response.
I'm not sure of a way to broadcast the data without collecting to the driver first. Because we assume the size of the broadcasted table is small, the collect and broadcast to and from the driver should be fairly quick. You would have about the same network traffic if you were to somehow skip the collect, since we need a full copy of the smaller table on each worker anyway.
The join(broadcast(right), ...) gives Spark a hint to do a broadcast join, so it will override spark.sql.autoBroadcastJoinThreshold, which is 10 MB by default. Don't try to broadcast anything larger than 2 GB, as this is the limit for a single block in Spark and you will get an OOM or overflow exception; the data structures backing blocks are capped at 2 GB. The autoBroadcastJoinThreshold currently only applies to Hive tables that have had statistics run on them. So the broadcast hint is there for DataFrames that aren't in Hive, or ones where statistics haven't been run.
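As a rough sketch of how the hint and the threshold interact (names like left, right, and joinKey are placeholders from the example above, and the setConf call assumes a Spark 1.6-era SQLContext):

```scala
import org.apache.spark.sql.functions.broadcast

// Disable the automatic threshold so that only the explicit hint
// triggers a broadcast join; 10485760 (10 MB) is the default value.
sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", "-1")

// left and right are DataFrames; right is assumed to be small
// (comfortably under the 2 GB single-block limit).
val joined = left.join(broadcast(right), "joinKey")
```

With the threshold set to -1, Spark will only broadcast the table you explicitly wrap in broadcast(), which is handy for DataFrames where no statistics are available.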
The general Spark Core broadcast function will still work. In fact, under the hood, the DataFrame is calling the same collect and broadcast that you would with the general API.
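Doing it by hand looks roughly like this (a hedged sketch: smallDF and bigRDD are hypothetical names, and the filter-by-lookup at the end is just one illustrative use of the broadcast value):

```scala
// Collect the small DataFrame to the driver, then broadcast it --
// essentially what the DataFrame broadcast-join path does internally.
val smallRows = smallDF.collect()     // copy to the driver
val bc = sc.broadcast(smallRows)      // one read-only copy per executor

val result = bigRDD.mapPartitions { iter =>
  // Build a lookup table once per partition from the broadcast value,
  // keyed here (as an assumption) on the first column of each Row.
  val lookup = bc.value.map(r => (r.getString(0), r)).toMap
  iter.filter { case (key, _) => lookup.contains(key) }
}
```

So if you need the data in a more general context than a join, sc.broadcast on the collected rows gives you the same single-copy-per-worker behavior.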
The concept of partitions is still there, so after you do a broadcast join, you're free to run mapPartitions on it.
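For example (a sketch reusing the placeholder names left, right, and joinKey from above; in the 1.6-era API, DataFrame.mapPartitions hands you an Iterator[Row] per partition):

```scala
import org.apache.spark.sql.functions.broadcast

val joined = left.join(broadcast(right), "joinKey")

// Partitions survive the broadcast join, so per-partition work still runs,
// e.g. counting the rows that landed in each partition.
val rowsPerPartition = joined.mapPartitions { iter =>
  Iterator(iter.size)
}
```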