New Contributor
Posts: 1
Registered: 05-03-2018

How to do the right partitioning with multiple joins on multiple keys

A multi-way join: the biggest dataframe is largeDF; b and b_tag are smaller (~950MB, both derived from the same bDF); and the other dataframes (a, c, d) are much smaller:

result = (largeDF.join(a, largeDF.first == a.key, "left_outer")
                 .join(b, largeDF.second == b.b_key, "left_outer")
                 .join(b_tag, largeDF.third == b_tag.tag_key, "left_outer")
                 .join(c, largeDF.forth == c.key, "left_outer")
                 .join(d, largeDF.fifth == d.key, "left_outer"))


b     = broadcast(bDF.select(*[col(x).alias('b_' + x) for x in bDF.columns]))
b_tag = broadcast(bDF.select(*[col(x).alias('tag_' + x) for x in bDF.columns]))

I broadcast all the small dataframes successfully, but failed to broadcast the b and b_tag dataframes.

How can I broadcast the bDF dataframe only once? The only difference between b and b_tag is the key they are joined on with largeDF. The reason I duplicated it is that Spark raises a duplicate-column error when the same bDF columns appear twice in the join.

When I tried to broadcast it once and alias b as b_tag, it failed with a "same lineage" error for the two dataframes. The other issue is the size of bDF (~950MB): it is larger than spark.sql.autoBroadcastJoinThreshold (default 10MB), and raising that value did not work for both usages of bDF (only b was broadcast, not b_tag). I suspect this is because of driver memory (I work in client mode). In addition, a broadcast is shipped to every executor, right? So it will be quite heavy.
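For reference, a minimal sketch of the settings involved in this route (the values are illustrative, not recommendations; note that an explicit broadcast() hint bypasses the threshold, which only governs automatic broadcast decisions, so for hinted joins the limiting factor is usually driver and executor memory):

```python
# Sketch: session-level setting, assuming a SparkSession named `spark`.
# The threshold only affects *automatic* broadcast joins; an explicit
# broadcast() hint forces the broadcast regardless of this value.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", str(1024 * 1024 * 1024))  # ~1GB

# Memory must be set at launch time, e.g. in client mode:
#   spark-submit --driver-memory 8g --executor-memory 8g ...
# Two ~950MB broadcasts are materialized on the driver first and then
# shipped to every executor, so the budget has to cover both copies.
```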

The other thing I thought of is to repartition/coalesce largeDF before the join. The question is into how many partitions, and on which columns. The data in largeDF is very skewed, and the default partitioner hash-aggregates on all the keys, so I don't believe it helps. Would largeDF.partitionBy(hashFunction), hashing a random number, help or not?
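On the skew idea: salting with a random number only pays off when the join actually shuffles (e.g. sort-merge); a broadcast join has no shuffle to skew. A minimal sketch of salting, assuming the b join falls back to a shuffle join (NUM_SALTS and the column names from the join above are assumptions; the helper names are mine):

```python
from pyspark.sql import functions as F

NUM_SALTS = 16  # tune to the observed skew; illustrative value

# Add a random salt (0..NUM_SALTS-1) to every row of the skewed, large side,
# so one hot key is spread over NUM_SALTS partitions instead of one.
salted_large = largeDF.withColumn("salt", (F.rand() * NUM_SALTS).cast("int"))

# Replicate the smaller side once per salt value so every salted key
# still finds its match.
salts = spark.range(NUM_SALTS).withColumnRenamed("id", "salt")
salted_b = b.crossJoin(salts)

result = (salted_large.join(
              salted_b,
              (salted_large.second == salted_b.b_key)
              & (salted_large.salt == salted_b.salt),
              "left_outer")
          .drop(salted_large.salt)
          .drop(salted_b.salt))
```

The cost is NUM_SALTS copies of the small side, so this trades memory for a more even shuffle.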

Can I use bDF once, maybe duplicate its key column (how?), and then join it once? The problem is that I need to interpret the value of bDF (it has a key,value format) differently according to the join that matched: if largeDF.first == bDF.key it means one thing, and if largeDF.third == bDF.key it means another thing.

That meaning has an impact on the resulting dataframe.
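One way to join bDF only once is to unpivot the lookup key columns of largeDF into rows, join a single broadcast of bDF, tag each row with which key matched, and pivot back. This is a sketch under assumptions, not a tested solution: it uses the keys `second`/`third` from the join above, assumes bDF is (key, value), assumes largeDF rows are unique over their own columns, and casts both keys to string so they fit one array; `kv`, `join_key`, and `which` are names I made up:

```python
from pyspark.sql import functions as F

# Unpivot the two lookup keys into (join_key, which) rows: each largeDF row
# becomes two rows, one per key column.
melted = (largeDF
          .withColumn("kv", F.explode(F.array(
              F.struct(F.col("second").cast("string").alias("join_key"),
                       F.lit("second").alias("which")),
              F.struct(F.col("third").cast("string").alias("join_key"),
                       F.lit("third").alias("which")))))
          .select("*", "kv.join_key", "kv.which")
          .drop("kv"))

# Single broadcast join against bDF (key, value) - bDF is shipped once.
joined = melted.join(F.broadcast(bDF), melted.join_key == bDF.key, "left_outer")

# Pivot back: one value column per key. The "which" tag carries the meaning,
# so value_second and value_third can be interpreted differently downstream.
result = (joined
          .groupBy(*largeDF.columns)
          .pivot("which", ["second", "third"])
          .agg(F.first("value")))
```

The explode doubles the row count of largeDF before the join, so this trades shuffle/compute for doing the broadcast only once.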