
How to do the right partitioning with multiple joins on multiple keys


I have a multi-way join: one very large dataframe (largeDF), two smaller dataframes b and b_tag (~950MB), and several much smaller dataframes:

result = (largeDF
    .join(a, largeDF.first == a.key, "left_outer")
    .join(b, largeDF.second == b.b_key, "left_outer")
    .join(b_tag, largeDF.third == b_tag.tag_key, "left_outer")
    .join(c, largeDF.forth == c.key, "left_outer")
    .join(d, largeDF.fifth == d.key, "left_outer"))

where

from pyspark.sql.functions import broadcast, col

b     = broadcast(bDF.select(*(col(x).alias('b_' + x) for x in bDF.columns)))
b_tag = broadcast(bDF.select(*(col(x).alias('tag_' + x) for x in bDF.columns)))

I have broadcast all the small dataframes successfully, but failed to broadcast the b and b_tag dataframes.
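A quick way to see which of the joins actually broadcast is the physical plan:

result.explain()

BroadcastHashJoin nodes mean the broadcast hint was honored; SortMergeJoin means that side fell back to a shuffle join.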

How can I broadcast the bDF dataframe only once, since the only difference between b and b_tag is the key they are joined on with largeDF? The reason I duplicated it is that Spark otherwise raises an error about duplicated bDF columns.
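One thing worth trying is aliasing the same broadcast dataframe twice and qualifying every column reference, so bDF is defined once but joined on two keys. A minimal sketch, assuming bDF has columns key and value as described later in the question (b1/b2 are arbitrary alias names; whether the aliases resolve cleanly depends on the Spark version, and some versions still raise the lineage error described next):

from pyspark.sql.functions import broadcast, col

b1 = broadcast(bDF.alias("b1"))
b2 = broadcast(bDF.alias("b2"))

result = (largeDF
    .join(b1, largeDF.second == col("b1.key"), "left_outer")
    .join(b2, largeDF.third == col("b2.key"), "left_outer")
    .select(largeDF["*"],
            col("b1.value").alias("first_meaning"),
            col("b2.value").alias("second_meaning")))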

When I tried to broadcast it once and alias b as b_tag, it failed with a "same lineage" error for the two dataframes. The other issue is the size of the bDF dataframe (~950MB): it is above spark.sql.autoBroadcastJoinThreshold (default 10MB), and raising that value didn't succeed for both usages of bDF (it worked only when I broadcast b, not b_tag). I assume that is because of driver memory (I work in client mode). In addition, a broadcast is sent to every executor, right? So it will be quite heavy.
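For reference, spark.sql.autoBroadcastJoinThreshold only governs automatic broadcast decisions; an explicit broadcast() hint is generally honored regardless of it, so a failure with a ~950MB table usually points at memory rather than the threshold. The sizes below are illustrative only, not recommendations:

# Runtime-settable SQL conf; ~1GB here is an illustrative value.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", str(1024 * 1024 * 1024))

# In client mode the driver JVM is already running when the app starts, so
# driver memory has to be set at submission time, not from inside the app:
#   spark-submit --driver-memory 8g ...

And yes: the broadcast table is shipped to, and held in memory on, every executor, so two ~950MB broadcasts cost roughly double that per executor on top of execution memory.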

The other thing I thought of is to repartition/coalesce largeDF before the join. The question is how many partitions to use, and on which columns. The data in largeDF is very skewed, and the default partitioner hash-partitions on all its key values, so I believe it doesn't help. Would largeDF.partitionBy(hashFunction), with a hash function that mixes in a random number, help or not?
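That random-number idea is usually called salting: add a random salt column to the skewed side and replicate the smaller side once per salt value, so one hot key spreads across several partitions. It only pays off for shuffle joins (a broadcast join never shuffles largeDF, so its skew barely matters there). A minimal sketch against the renamed b from above; N_SALTS is a made-up tuning knob, and the explode multiplies b's size by N_SALTS:

from pyspark.sql.functions import array, explode, floor, lit, rand

N_SALTS = 16  # hypothetical; tune to the observed skew

# Each row of the big, skewed side lands in one of N_SALTS buckets.
salted_large = largeDF.withColumn("salt", floor(rand() * N_SALTS).cast("int"))

# Replicate every row of the small side once per salt value.
salted_b = b.withColumn("salt", explode(array(*[lit(i) for i in range(N_SALTS)])))

result = (salted_large
    .join(salted_b,
          (salted_large.second == salted_b.b_key) &
          (salted_large.salt == salted_b.salt),
          "left_outer")
    .drop(salted_b.salt)
    .drop(salted_large.salt))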

Can I use bDF only once, maybe by duplicating its key column (how?), and then join it once? (One possible approach is sketched after the code below.) The problem is that I need to interpret the value from bDF (it has a key/value format) differently according to which join matched: if largeDF.first == bDF.key it means one thing, and if largeDF.third == bDF.key it means another.

The meaning has an impact on the resulting dataframe:

result2 = result.select(col("b_value").alias("first_meaning"),
                        col("tag_value").alias("second_meaning"))
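On the "use bDF once" idea: one way is to unpivot the two lookup keys into rows, join bDF a single time, and pivot the values back. A sketch, assuming bDF has columns key and value; row_id, which, stacked and looked_up are made-up names. Note it trades the second join for a groupBy/pivot shuffle, so it only wins if the duplicated broadcast is the real bottleneck:

from pyspark.sql.functions import broadcast, first, monotonically_increasing_id

# cache() because monotonically_increasing_id() is not stable across recomputations
keyed = largeDF.withColumn("row_id", monotonically_increasing_id()).cache()

# stack() unpivots: one output row per (label, lookup key) pair.
stacked = keyed.selectExpr(
    "row_id",
    "stack(2, 'first_meaning', `first`, 'second_meaning', `third`) as (which, key)")

looked_up = (stacked
    .join(broadcast(bDF), "key", "left_outer")   # bDF is broadcast exactly once
    .groupBy("row_id")
    .pivot("which", ["first_meaning", "second_meaning"])
    .agg(first("value")))

result2 = keyed.join(looked_up, "row_id", "left_outer").drop("row_id")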

Thanks