Spark SQL 2.0 - performance of Plain SQL query in spark vs Dataframe or Dataset

Just wondering: if we write an inner join query in plain SQL, or use the DataFrame API to perform the join, do we get the same performance? I can see from the diagram that both the query and the DataFrame are passed to the Catalyst optimizer, but I wanted to confirm: if I write plain SQL queries, can I use them for big data production use cases, or should I use DataFrames for better performance? Thank you.

1 ACCEPTED SOLUTION

You can try it yourself and let Spark explain the query (i.e. ask Catalyst what it is doing).

I recommend running "explain" on your own query; see the examples below.

1) With SQL query

spark.sql("select * from employees e, departments d, dept_emp de where e.emp_no = de.emp_no and d.dept_no = de.dept_no").explain

Result:

== Physical Plan ==
*Project [emp_no#38, birth_date#39, first_name#40, last_name#41, gender#42, hire_date#43, dept_no#44, dept_name#45, emp_no#46, dept_no#47, from_date#48, to_date#49]
+- *BroadcastHashJoin [dept_no#47], [dept_no#44], Inner, BuildRight
   :- *BroadcastHashJoin [emp_no#38], [emp_no#46], Inner, BuildRight
   :  :- *Filter isnotnull(emp_no#38)
   :  :  +- HiveTableScan [emp_no#38, birth_date#39, first_name#40, last_name#41, gender#42, hire_date#43], MetastoreRelation employees, employees, e
   :  +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
   :     +- *Filter (isnotnull(dept_no#47) && isnotnull(emp_no#46))
   :        +- HiveTableScan [emp_no#46, dept_no#47, from_date#48, to_date#49], MetastoreRelation employees, dept_emp, de
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false]))
      +- *Filter isnotnull(dept_no#44)
         +- HiveTableScan [dept_no#44, dept_name#45], MetastoreRelation employees, departments, d

2) Read via Hive, then join with the DataFrame API

val e = spark.sql("select * from employees.employees")
val d = spark.sql("select * from employees.departments")
val de = spark.sql("select * from employees.dept_emp")
d.join(de, Seq("dept_no")).join(e, Seq("emp_no")).explain

Result:

== Physical Plan ==
*Project [emp_no#265, dept_no#254, dept_name#255, from_date#267, to_date#268, birth_date#244, first_name#245, last_name#246, gender#247, hire_date#248]
+- *BroadcastHashJoin [emp_no#265], [emp_no#243], Inner, BuildRight
   :- *Project [dept_no#254, dept_name#255, emp_no#265, from_date#267, to_date#268]
   :  +- *BroadcastHashJoin [dept_no#254], [dept_no#266], Inner, BuildRight
   :     :- *Filter isnotnull(dept_no#254)
   :     :  +- HiveTableScan [dept_no#254, dept_name#255], MetastoreRelation employees, departments
   :     +- BroadcastExchange HashedRelationBroadcastMode(List(input[1, string, false]))
   :        +- *Filter (isnotnull(dept_no#266) && isnotnull(emp_no#265))
   :           +- HiveTableScan [emp_no#265, dept_no#266, from_date#267, to_date#268], MetastoreRelation employees, dept_emp
   +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
      +- *Filter isnotnull(emp_no#243)
         +- HiveTableScan [emp_no#243, birth_date#244, first_name#245, last_name#246, gender#247, hire_date#248], MetastoreRelation employees, employees

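It is effectively the same: both plans use two inner BroadcastHashJoins over HiveTableScans; only the join order and the projected columns differ.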

3) Pure dataframes:

val d = spark.read.format("orc")
             .load("/apps/hive/warehouse/employees.db/departments")
             .toDF(Seq("dept_no", "dept_name"):_*)
val e = spark.read.format("orc")
             .load("/apps/hive/warehouse/employees.db/employees")
             .toDF(Seq("emp_no","birth_date","first_name","last_name","gender","hire_date"):_*)
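// Note: this loads the departments path again (the InputPaths in the plan below confirm it);
// employees.db/dept_emp was probably intended.
val de = spark.read.format("orc")
             .load("/apps/hive/warehouse/employees.db/departments")
             .toDF(Seq("emp_no", "dept_no"):_*)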

d.join(de, Seq("dept_no")).join(e, Seq("emp_no")).explain

Result:

== Physical Plan ==
*Project [emp_no#137, dept_no#96, dept_name#97, birth_date#115, first_name#116, last_name#117, gender#118, hire_date#119]
+- *BroadcastHashJoin [cast(emp_no#137 as double)], [cast(emp_no#114 as double)], Inner, BuildRight
   :- *Project [dept_no#96, dept_name#97, emp_no#137]
   :  +- *BroadcastHashJoin [dept_no#96], [dept_no#138], Inner, BuildRight
   :     :- *Project [_col0#91 AS dept_no#96, _col1#92 AS dept_name#97]
   :     :  +- *Filter isnotnull(_col0#91)
   :     :     +- *Scan orc [_col0#91,_col1#92] Format: ORC, InputPaths: hdfs://node1:8020/apps/hive/warehouse/employees.db/departments, PushedFilters: [IsNotNull(_col0)], ReadSchema: struct<_col0:string,_col1:string>
   :     +- BroadcastExchange HashedRelationBroadcastMode(List(input[1, string, true]))
   :        +- *Project [_col0#132 AS emp_no#137, _col1#133 AS dept_no#138]
   :           +- *Filter (isnotnull(_col1#133) && isnotnull(_col0#132))
   :              +- *Scan orc [_col0#132,_col1#133] Format: ORC, InputPaths: hdfs://node1:8020/apps/hive/warehouse/employees.db/departments, PushedFilters: [IsNotNull(_col1), IsNotNull(_col0)], ReadSchema: struct<_col0:string,_col1:string>
   +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as double)))
      +- *Project [_col0#101 AS emp_no#114, _col1#102 AS birth_date#115, _col2#103 AS first_name#116, _col3#104 AS last_name#117, _col4#105 AS gender#118, _col5#106 AS hire_date#119]
         +- *Filter isnotnull(_col0#101)
            +- *Scan orc [_col0#101,_col1#102,_col2#103,_col3#104,_col4#105,_col5#106] Format: ORC, InputPaths: hdfs://node1:8020/apps/hive/warehouse/employees.db/employees, PushedFilters: [IsNotNull(_col0)], ReadSchema: struct<_col0:int,_col1:string,_col2:string,_col3:string,_col4:string,_col5:string>

Very similar; only the read is done differently (a native ORC scan instead of a HiveTableScan). In particular, the joins are of the same type (BroadcastHashJoin).
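
If you want to check "same type of joins" without reading the plans by eye, a small helper can pull the join operator names out of the explain text and compare them. This is only a sketch in plain Scala: the `JoinOperators` object, the `joinOperators` function and the regex are my own assumptions (not a Spark API), and the two plan strings are hand-shortened excerpts of the outputs above.

```scala
object JoinOperators {
  // Matches physical operator names ending in "Join", e.g. BroadcastHashJoin.
  // Assumption: join operators in the explain text follow this naming pattern.
  private val JoinPattern = """\b(\w+Join)\b""".r

  /** Join operator names in the order they appear in the explain text. */
  def joinOperators(planText: String): List[String] =
    JoinPattern.findAllMatchIn(planText).map(_.group(1)).toList

  // Shortened excerpt of the plan from example 1 (SQL query).
  val sqlPlan: String =
    """+- *BroadcastHashJoin [dept_no#47], [dept_no#44], Inner, BuildRight
      |   :- *BroadcastHashJoin [emp_no#38], [emp_no#46], Inner, BuildRight
      |""".stripMargin

  // Shortened excerpt of the plan from example 2 (DataFrame join).
  val dataFramePlan: String =
    """+- *BroadcastHashJoin [emp_no#265], [emp_no#243], Inner, BuildRight
      |   :  +- *BroadcastHashJoin [dept_no#254], [dept_no#266], Inner, BuildRight
      |""".stripMargin

  def main(args: Array[String]): Unit = {
    val fromSql = joinOperators(sqlPlan)
    val fromDataFrame = joinOperators(dataFramePlan)
    println(s"SQL plan joins:       $fromSql")
    println(s"DataFrame plan joins: $fromDataFrame")
    println(s"Same join operators:  ${fromSql == fromDataFrame}")
  }
}
```

In spark-shell, the plan text could presumably be taken from `df.queryExecution.executedPlan.toString` instead of a pasted excerpt. Note that this only compares operator names, not join keys or join order.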

3 REPLIES

@Bernhard Walter thanks for the reply.

If it answers your question, please mark it as accepted so that it shows as resolved in the overview. Thanks.