spark sql poor join performance

I'm using Spark SQL to build a fact table from 5 dimensions. I'm facing a performance issue (the job takes several hours to complete), and even after exhaustive googling I haven't found a solution. These are the settings I have tried tuning, without success:

sqlContext.sql("set spark.sql.shuffle.partitions=10"); // varied between 10 and 5000
sqlContext.sql("set spark.sql.autoBroadcastJoinThreshold=500000000"); // 500 MB, tried 1 GB
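As a rough mental model (a simplification, not Spark's actual planner code), `spark.sql.autoBroadcastJoinThreshold` is a size cutoff in bytes: if the planner's size estimate for one side of an equi-join is at or below it, that side is sent to every executor and joined in memory; otherwise both sides are shuffled. A value of -1 disables broadcasting. The class and method names below are hypothetical.

```java
// Hypothetical sketch of a size-based join-strategy decision, modeled loosely on
// how spark.sql.autoBroadcastJoinThreshold is used. This is NOT Spark's planner code.
public class BroadcastDecision {

    enum Strategy { BROADCAST_HASH_JOIN, SHUFFLED_JOIN }

    /**
     * @param estimatedSizeBytes planner's size estimate for the smaller join side
     * @param thresholdBytes     value of spark.sql.autoBroadcastJoinThreshold; -1 disables broadcasting
     */
    static Strategy choose(long estimatedSizeBytes, long thresholdBytes) {
        // A non-positive threshold turns broadcasting off entirely.
        if (thresholdBytes > 0 && estimatedSizeBytes <= thresholdBytes) {
            return Strategy.BROADCAST_HASH_JOIN;
        }
        return Strategy.SHUFFLED_JOIN;
    }

    public static void main(String[] args) {
        long threshold = 500_000_000L; // the 500 MB value from the question
        System.out.println(choose(10_000L, threshold));        // prints BROADCAST_HASH_JOIN
        System.out.println(choose(Long.MAX_VALUE, threshold)); // prints SHUFFLED_JOIN (size unknown or huge)
        System.out.println(choose(10_000L, -1L));              // prints SHUFFLED_JOIN (broadcasting disabled)
    }
}
```

The practical consequence is that raising the threshold only helps if the planner's estimate for a table falls below it. For temp tables registered from RDDs, the estimate may be a conservative default rather than the real size, in which case raising the threshold changes nothing; comparing the `explain` output before and after the change is the way to check.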

Most of the RDDs are nicely partitioned (500 partitions each); however, the largest dimension is not partitioned at all (images). Could this be the cause? Below is the code I used to build the fact table.

    resultDmn1.registerTempTable("Dmn1");
    resultDmn2.registerTempTable("Dmn2");
    resultDmn3.registerTempTable("Dmn3");
    resultDmn4.registerTempTable("Dmn4");
    resultDmn5.registerTempTable("Dmn5");

    DataFrame resultFact = sqlContext.sql("SELECT DISTINCT\n" +
            "    0 AS FactId,\n" +
            "    rs.c28 AS c28,\n" +
            "    dop.DmnId AS dmn_id_dim4,\n" +
            "    dh.DmnId AS dmn_id_dim5,\n" +
            "    op.DmnId AS dmn_id_dim3,\n" +
            "    du.DmnId AS dmn_id_dim2,\n" +
            "    dc.DmnId AS dmn_id_dim1\n" +
            "FROM\n" +
            "    t10 rs\n" +
            "        JOIN\n" +
            "    t11 r ON rs.c29 = r.id\n" +
            "        JOIN\n" +
            "    Dmn4 dop ON dop.c26 = r.c25\n" +
            "        JOIN\n" +
            "    Dmn5 dh ON dh.Date = r.c27\n" +
            "        JOIN\n" +
            "    Dmn3 du ON du.c9 = r.c16\n" +
            "        JOIN\n" +
            "    t1 d ON r.c5 = d.id\n" +
            "        JOIN\n" +
            "    t2 di ON d.id = di.c5\n" +
            "        JOIN\n" +
            "    t3 s ON d.c6 = s.id\n" +
            "        JOIN\n" +
            "    t4 p ON s.c7 = p.id\n" +
            "        JOIN\n" +
            "    t5 o ON p.c8 = o.id\n" +
            "        JOIN\n" +
            "    Dmn1 op ON op.c1 = di.c1\n" +
            "        JOIN\n" +
            "    t9 ci ON ci.id = r.c24\n" +
            "        JOIN\n" +
            "    Dmn3 dc ON dc.c18 = ci.c23\n" +
            "WHERE\n" +
            "    op.c2 = di.c2\n" +
            "        AND o.name = op.c30\n" +
            "        AND di.c3 = op.c3\n" +
            "        AND di.c4 = op.c4");

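    resultFact.cache(); // mark for caching before the first action
    resultFact.count(); // action that computes the result and materializes the cache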

Dmn1 has 56 rows, Dmn2 11, Dmn3 10, Dmn4 12, and Dmn5 1,275,533 rows prior to this join. Everything is running on one master and 2 slaves.
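With dimensions of 10 to 56 rows, the joins against Dmn1 to Dmn4 are good candidates for a broadcast (map-side) hash join: build an in-memory hash table from the small table once, then stream the large table through it without shuffling the large side. The plain-Java sketch below shows that technique only; the class name, the two-column row layout (a join key and a DmnId), and the sample values are hypothetical stand-ins for something like Dmn1 joined on c1.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of a hash join where the small side is held in memory (the idea behind a
// broadcast hash join). Plain Java, no Spark; table layouts are hypothetical.
public class BroadcastHashJoinSketch {

    /**
     * @param dimension rows of a small dimension table as {joinKey, dmnId}
     * @param facts     rows of the large table as {joinKey, payload}
     * @return joined rows as {payload, dmnId}; unmatched fact rows are dropped (inner join)
     */
    static List<String[]> join(List<String[]> dimension, List<String[]> facts) {
        // Build phase: index the small side by join key. With ~56 rows this is tiny,
        // which is why copying it to every executor is cheap.
        Map<String, List<String>> index = new HashMap<>();
        for (String[] row : dimension) {
            index.computeIfAbsent(row[0], k -> new ArrayList<>()).add(row[1]);
        }
        // Probe phase: stream the large side once; it never has to be shuffled.
        List<String[]> out = new ArrayList<>();
        for (String[] fact : facts) {
            List<String> matches = index.get(fact[0]);
            if (matches == null) {
                continue; // no dimension row for this key
            }
            for (String dmnId : matches) {
                out.add(new String[] {fact[1], dmnId});
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String[]> dmn = new ArrayList<>();
        dmn.add(new String[] {"k1", "1"});
        dmn.add(new String[] {"k2", "2"});

        List<String[]> facts = new ArrayList<>();
        facts.add(new String[] {"k1", "row-a"});
        facts.add(new String[] {"k2", "row-b"});
        facts.add(new String[] {"k3", "row-c"}); // no matching dimension row

        // prints "row-a -> 1" then "row-b -> 2"
        for (String[] r : join(dmn, facts)) {
            System.out.println(r[0] + " -> " + r[1]);
        }
    }
}
```

Whether Spark actually chose this strategy for each join can be read from the `explain` output: a broadcast join appears as a `BroadcastHashJoin` node, while a shuffle-based one appears as `ShuffledHashJoin` or `SortMergeJoin`, depending on the Spark version.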

Here is the output of explain: http://pastebin.com/ZRUdUuYT
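On the point that the largest input is not partitioned: Spark runs one task per partition, so any stage that reads a single-partition input before a shuffle is processed by one task on one core. The plain-Java sketch below mimics the key-to-partition rule used by Spark's `HashPartitioner` (a non-negative `hashCode() % numPartitions`) to show how the same rows land in one bucket versus several. The class name, keys, and partition counts are illustrative assumptions; in Spark itself, repartitioning that input (for example with `DataFrame.repartition(int)`) is one option worth trying.

```java
import java.util.Arrays;
import java.util.List;

// Illustrative sketch of hash partitioning; not Spark code.
public class HashPartitionSketch {

    // Maps a key to a partition index in [0, numPartitions), even for negative hash codes.
    static int partitionOf(Object key, int numPartitions) {
        int raw = key.hashCode() % numPartitions;
        return raw < 0 ? raw + numPartitions : raw;
    }

    // Counts how many keys fall into each partition.
    static int[] histogram(List<?> keys, int numPartitions) {
        int[] counts = new int[numPartitions];
        for (Object key : keys) {
            counts[partitionOf(key, numPartitions)]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("2015-01-01", "2015-01-02", "2015-01-03",
                                          "2015-01-04", "2015-01-05", "2015-01-06");
        // One partition: every row is handled by a single task. Prints [6].
        System.out.println(Arrays.toString(histogram(keys, 1)));
        // Four partitions: the same rows are spread across up to four parallel tasks.
        System.out.println(Arrays.toString(histogram(keys, 4)));
    }
}
```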