05-03-2018 01:06 AM
I have a multi-way join: the biggest dataframe is largeDF, the b and b_tag dataframes are smaller (~950MB each), and the remaining dataframes are much smaller:
result = largeDF.join(a, largeDF.first == a.key, "left_outer") \
    .join(b, largeDF.second == b.b_key, "left_outer") \
    .join(b_tag, largeDF.third == b_tag.tag_key, "left_outer") \
    .join(c, largeDF.forth == c.key, "left_outer") \
    .join(d, largeDF.fifth == d.key, "left_outer")
b = broadcast(bDF.select(*(col(x).alias('b_' + x) for x in bDF.columns)))
b_tag = broadcast(bDF.select(*(col(x).alias('tag_' + x) for x in bDF.columns)))
I broadcasted all the small dataframes successfully, but failed to broadcast the b and b_tag dataframes.
How can I broadcast the bDF dataframe only once? The only difference between b and b_tag is the key they are joined on with largeDF. The reason I duplicated it is that Spark raises a duplicate-column error for bDF's columns when the same dataframe is joined twice.
When I tried to broadcast it once and alias b as b_tag, it failed with a "same lineage" error for the two dataframes. The other issue is the size of bDF (~950MB): it is larger than spark.sql.autoBroadcastJoinThreshold (default is 10MB), and raising that value only worked for one of the two usages of bDF (b was broadcast, but b_tag was not), probably because of the driver memory (I work in client mode). In addition, a broadcast is sent to every executor, right? So it will be quite heavy.
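For reference, the knobs involved can be set at submit time; the values below are illustrative, not recommendations, and my_job.py is a placeholder name. In client mode, driver memory must be set on the command line, before the driver JVM starts, not from inside the application:

```shell
# Illustrative spark-submit flags.
# Note: an explicit broadcast() hint bypasses autoBroadcastJoinThreshold,
# but the broadcast table must still fit in driver and executor memory.
spark-submit \
  --driver-memory 8g \
  --conf spark.sql.autoBroadcastJoinThreshold=1073741824 \
  my_job.py
```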
The other thing I thought of is to repartition/coalesce largeDF before the join. The question is into how many partitions, and on which columns. The data in largeDF is very skewed, and the default partitioner hash-partitions on all its keys, so I don't believe that helps. Would largeDF.partitionBy(hashFunction), where hashFunction hashes a random number, help or not?
Can I use bDF once, maybe duplicate its key column (how?), and then join it only once? The problem is that I need to interpret bDF's value (it has a key,value format) differently depending on which join matched: if largeDF.first == bDF.key it means one thing, and if largeDF.third == bDF.key it means another.
The meaning affects the resulting dataframe:
result2 = result.select(bDF.value.alias("first_meaning"), bDF.value.alias("second_meaning"))