Does a broadcast variable work for a DataFrame?

Hi All,

Can someone please clarify how to create a broadcast variable for a DataFrame?

An example would be a great help.

Thanks in advance.

Regards,

Vijay

5 REPLIES

Explorer

Hi @Vijay Kumar J,

You can't create a broadcast variable for a DataFrame. However, if you are using Spark 1.5 or newer, you can still do broadcast joins, like this:

from pyspark.sql import SQLContext
from pyspark.sql.functions import broadcast

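# sc is assumed to be an existing SparkContext
sqlContext = SQLContext(sc)
df_tiny = sqlContext.sql('select * from tiny_table')
df_large = sqlContext.sql('select * from massive_table')
# broadcast() hints that df_tiny should be sent whole to every executor
df3 = df_large.join(broadcast(df_tiny), df_large.some_sort_of_key == df_tiny.key)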

Calling the broadcast function on a DataFrame marks it as small enough to be used in a broadcast join.
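To make the idea concrete, here is a conceptual sketch in plain Python of what a broadcast join does. It is not Spark's actual implementation, and the function name and sample rows are made up for illustration: the small side is turned into one lookup table that every partition of the large side can probe, so the large side never has to be shuffled.

```python
from collections import defaultdict

def broadcast_hash_join(large_partitions, small_rows, large_key, small_key):
    """Inner-join each partition of the large side against one shared lookup table."""
    # Build the lookup table once from the small side. In Spark, this is the
    # piece that would be copied to every executor.
    lookup = defaultdict(list)
    for row in small_rows:
        lookup[row[small_key]].append(row)

    joined = []
    # Each partition is processed on its own, so the large side stays where it is.
    for partition in large_partitions:
        for row in partition:
            for match in lookup.get(row[large_key], []):
                joined.append({**row, **match})
    return joined

# Hypothetical sample data standing in for massive_table and tiny_table.
massive_partitions = [
    [{"some_sort_of_key": 1, "amount": 10}, {"some_sort_of_key": 2, "amount": 20}],
    [{"some_sort_of_key": 1, "amount": 30}, {"some_sort_of_key": 3, "amount": 40}],
]
tiny_rows = [{"key": 1, "label": "a"}, {"key": 2, "label": "b"}]

result = broadcast_hash_join(massive_partitions, tiny_rows, "some_sort_of_key", "key")
for row in result:
    print(row)
```

The row with key 3 has no match in the small table, so an inner join drops it.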

@ymoiseev

Thanks for the response. In fact, I am using a Hive context, joining the two DataFrames and applying analytic functions such as ranking and row number over a partition.

Could you please show me how to use this broadcast for these DataFrames in Java? That would be very helpful.

Expert Contributor

Broadcast joins are done automatically in Spark. There is a parameter, "spark.sql.autoBroadcastJoinThreshold", which is set to 10 MB by default. You should be able to do the join as you normally would and increase the parameter to the size of the smaller DataFrame.
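As a rough illustration of how the threshold is applied, the sketch below models the decision in plain Python. It is a simplification under stated assumptions: Spark's planner works from its own size estimates for each side of the join, and the function name and table sizes here are hypothetical.

```python
DEFAULT_THRESHOLD_BYTES = 10 * 1024 * 1024  # the 10 MB default, expressed in bytes

def would_auto_broadcast(estimated_size_bytes, threshold_bytes=DEFAULT_THRESHOLD_BYTES):
    """Simplified rule: broadcast a side only if its estimated size fits under the threshold."""
    if threshold_bytes < 0:
        # Setting the threshold to -1 disables automatic broadcast joins.
        return False
    return estimated_size_bytes <= threshold_bytes

small_table_bytes = 5 * 1024 * 1024    # hypothetical 5 MB table
medium_table_bytes = 50 * 1024 * 1024  # hypothetical 50 MB table

print(would_auto_broadcast(small_table_bytes))                       # fits under the default
print(would_auto_broadcast(medium_table_bytes))                      # too big for the default
print(would_auto_broadcast(medium_table_bytes, 64 * 1024 * 1024))    # fits once the threshold is raised
```

The value is a byte count, so raising it means setting it to at least the estimated size of the smaller side.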

@Joe Widen

Thanks for the response. I did try increasing it, but there was no change in performance.

Please find below the parameters I have used. I have two datasets: a daily dataset of about 200 MB and a master dataset of about 20 GB. Please note that the master dataset grows daily.

conf.set("spark.shuffle.blockTransferService", "nio"); 
conf.set("spark.files.overwrite","true"); 
conf.set("spark.kryoserializer.buffer", "70"); 
conf.set("spark.driver.extraJavaOptions", "-XX:+UseG1GC");
conf.set("spark.executor.extraJavaOptions", "-XX:+UseG1GC");
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); 
conf.set("spark.broadcast.compress", "true"); 
conf.set("spark.shuffle.compress", "true"); 
conf.set("spark.shuffle.spill.compress", "true");
conf.set("spark.io.compression.codec","org.apache.spark.io.LZ4CompressionCodec");
conf.set("spark.sql.inMemoryColumnarStorage.compressed", "true"); 
conf.set("spark.sql.autoBroadcastJoinThreshold","100485760");




Please advise on the above scenario.

Expert Contributor

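Your auto broadcast join threshold is set to roughly 96 MB (100485760 bytes), which is below the ~200 MB size of your smaller dataset. Run the code below, then check in the Environment tab of the Spark UI that it is being set correctly.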

conf.set("spark.sql.autoBroadcastJoinThreshold", String.valueOf(1024 * 1024 * 200)); // 200 MB, in bytes
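The threshold values in this thread are plain byte counts, so a quick conversion shows how they compare with the dataset size. This is only an arithmetic check in Python, reading the "~200MB" daily dataset as 200 * 1024 * 1024 bytes, which is an assumption; the real on-disk or in-memory size may differ.

```python
MIB = 1024 * 1024

configured = 100485760         # threshold value from the question's configuration
suggested = 1024 * 1024 * 200  # threshold value suggested in the reply
daily_dataset = 200 * MIB      # "~200MB" daily dataset, assumed to be exactly 200 MiB

configured_mib = round(configured / MIB, 2)
suggested_mib = suggested // MIB

print("configured threshold (MiB):", configured_mib)
print("suggested threshold (MiB):", suggested_mib)
print("daily dataset fits under configured threshold:", daily_dataset <= configured)
print("daily dataset fits under suggested threshold:", daily_dataset <= suggested)
```

With the originally configured value, the daily dataset is about twice the threshold, so it would not be broadcast automatically.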