Can someone please clarify how to create a broadcast variable for a DataFrame?
An example would also be a great help.
Thanks in advance.
Hi @Vijay Kumar J,
You can't create a broadcast variable for a DataFrame. However, if what you want is a broadcast join and you are on Spark 1.5 or newer, you can still get one like this:
from pyspark.sql import SQLContext
from pyspark.sql.functions import broadcast

sqlContext = SQLContext(sc)
df_tiny = sqlContext.sql('select * from tiny_table')
df_large = sqlContext.sql('select * from massive_table')
df3 = df_large.join(broadcast(df_tiny), df_large.some_sort_of_key == df_tiny.key)
Calling the broadcast function on a DataFrame marks it as small enough to be used in a broadcast join.
Thanks for the response. In fact I am using HiveContext, joining the two DataFrames, and applying analytic functions such as rank and row_number over partitions.
Could you please show how to use this broadcast approach for these DataFrames in Java code? That would be very helpful.
Broadcast joins are done automatically in Spark. There is a parameter, "spark.sql.autoBroadcastJoinThreshold", which is set to 10 MB by default. You should be able to write the join as you normally would and increase the parameter to at least the size of the smaller DataFrame.
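For the Java side of this, here is a minimal sketch using the broadcast hint from org.apache.spark.sql.functions, available since Spark 1.5. The table names and the join key are assumptions for illustration; substitute your own:

```java
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.hive.HiveContext;
import static org.apache.spark.sql.functions.broadcast;

public class BroadcastJoinSketch {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("broadcast-join-sketch");
        JavaSparkContext sc = new JavaSparkContext(conf);
        HiveContext hiveContext = new HiveContext(sc.sc());

        // Hypothetical tables: the small daily set and the large master set
        DataFrame daily = hiveContext.sql("select * from daily_table");
        DataFrame master = hiveContext.sql("select * from master_table");

        // Explicitly mark the smaller DataFrame for broadcast instead of
        // relying only on spark.sql.autoBroadcastJoinThreshold
        DataFrame joined = master.join(
                broadcast(daily),
                master.col("key").equalTo(daily.col("key")));

        // Inspect the physical plan: a BroadcastHashJoin here confirms
        // the hint took effect
        joined.explain();
    }
}
```

The explicit hint sidesteps the threshold entirely, which is useful when the small table sits just above the configured limit.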
Thanks for the response. I did try increasing it, but there was no change in performance.
Please find below the parameters I have used. I have two datasets: one is ~200 MB per day, and the master dataset is ~20 GB. Please note the master dataset grows daily.
conf.set("spark.shuffle.blockTransferService", "nio");
conf.set("spark.files.overwrite", "true");
conf.set("spark.kryoserializer.buffer", "70");
conf.set("spark.driver.extraJavaOptions", "-XX:+UseG1GC");
conf.set("spark.executor.extraJavaOptions", "-XX:+UseG1GC");
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
conf.set("spark.broadcast.compress", "true");
conf.set("spark.shuffle.compress", "true");
conf.set("spark.shuffle.spill.compress", "true");
conf.set("spark.io.compression.codec", "org.apache.spark.io.LZ4CompressionCodec");
conf.set("spark.sql.inMemoryColumnarStorage.compressed", "true");
conf.set("spark.sql.autoBroadcastJoinThreshold", "100485760");
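One thing worth checking in the conf above: 100485760 bytes is not 100 MB. A quick arithmetic check in plain Java (no Spark needed):

```java
public class ThresholdMath {
    public static void main(String[] args) {
        long configured = 100485760L;        // value from the conf above
        long intended = 100L * 1024 * 1024;  // 100 MB is 104857600 bytes

        // The configured value works out to roughly 95.8 MB
        System.out.println(configured / (1024.0 * 1024.0));
        System.out.println(intended); // prints 104857600
    }
}
```

More importantly, the daily dataset is ~200 MB, which is above even the intended 100 MB threshold, so the automatic broadcast would not trigger for it. The threshold would need to exceed the daily table's size (say, 209715200 bytes for 200 MB), or the broadcast hint would need to be applied explicitly.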
Kindly suggest what I should do for the above scenario.
Your auto broadcast join threshold is set to about 96 MB (100485760 bytes). Run the code below and then check in the Spark UI Environment tab that it's getting set correctly.
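Besides the Environment tab, the value actually in effect can be read back from the SQL context itself. A small sketch, assuming the same HiveContext setup as in the Java snippets above; the 209715200 value (200 MB exactly) is an illustrative choice sized above the ~200 MB daily table:

```java
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.hive.HiveContext;

public class ConfCheck {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("conf-check");
        JavaSparkContext sc = new JavaSparkContext(conf);
        HiveContext sqlContext = new HiveContext(sc.sc());

        // SQL settings can be set on the SQL context directly,
        // not only on the SparkConf at startup
        sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", "209715200");

        // Read the setting back to confirm what is actually in effect
        System.out.println(
            sqlContext.getConf("spark.sql.autoBroadcastJoinThreshold"));
    }
}
```

If the printed value is not what you set, the conf is being overridden somewhere (for example by spark-defaults.conf or a spark-submit flag).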