Archives of Support Questions (Read Only)

This is an archived board kept for historical reference; information and links may no longer be available or relevant. To ask a new question, please post a new topic on the appropriate active board.

Spark SQL 2.0 - performance of Plain SQL query in spark vs Dataframe or Dataset

Expert Contributor

Just wondering: if we write an inner join query in plain SQL, or if we use the DataFrame API to perform the join, do we get the same performance? I can see from the diagram that both the query and the DataFrame are pushed through the Catalyst optimizer, but I just wanted to confirm: if I write plain SQL queries, can I use them for big-data production use cases, or should I use DataFrames for better performance? Thank you.

1 ACCEPTED SOLUTION


You can try it yourself and let Spark explain the query (i.e. ask Catalyst what it is doing).

I would recommend running "explain" on your own query; see the examples below.
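As a minimal sketch of how to do that (assuming a running SparkSession named `spark` with the same `employees` database used in the examples below), note that `explain(true)` additionally prints the parsed, analyzed and optimized logical plans, not just the physical plan:

```scala
// Sketch, assuming a SparkSession `spark` with the `employees` database
// available, as in the examples below.
// explain()      -> physical plan only
// explain(true)  -> parsed + analyzed + optimized logical plans, then physical plan
val df = spark.sql("select * from employees.employees")
df.explain(true)
```

Comparing the optimized logical plan across the SQL and DataFrame variants is often the quickest way to see that Catalyst has converged on the same thing.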

1) With SQL query

spark.sql("select * from employees e, departments d, dept_emp de where e.emp_no = de.emp_no and d.dept_no = de.dept_no").explain

Result:

== Physical Plan ==
*Project [emp_no#38, birth_date#39, first_name#40, last_name#41, gender#42, hire_date#43, dept_no#44, dept_name#45, emp_no#46, dept_no#47, from_date#48, to_date#49]
+- *BroadcastHashJoin [dept_no#47], [dept_no#44], Inner, BuildRight
   :- *BroadcastHashJoin [emp_no#38], [emp_no#46], Inner, BuildRight
   :  :- *Filter isnotnull(emp_no#38)
   :  :  +- HiveTableScan [emp_no#38, birth_date#39, first_name#40, last_name#41, gender#42, hire_date#43], MetastoreRelation employees, employees, e
   :  +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
   :     +- *Filter (isnotnull(dept_no#47) && isnotnull(emp_no#46))
   :        +- HiveTableScan [emp_no#46, dept_no#47, from_date#48, to_date#49], MetastoreRelation employees, dept_emp, de
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false]))
      +- *Filter isnotnull(dept_no#44)
         +- HiveTableScan [dept_no#44, dept_name#45], MetastoreRelation employees, departments, d

2) Read via Hive and then execute a DataFrame join

val e = spark.sql("select * from employees.employees")
val d = spark.sql("select * from employees.departments")
val de = spark.sql("select * from employees.dept_emp")
d.join(de, Seq("dept_no")).join(e, Seq("emp_no")).explain

Result:

== Physical Plan ==
*Project [emp_no#265, dept_no#254, dept_name#255, from_date#267, to_date#268, birth_date#244, first_name#245, last_name#246, gender#247, hire_date#248]
+- *BroadcastHashJoin [emp_no#265], [emp_no#243], Inner, BuildRight
   :- *Project [dept_no#254, dept_name#255, emp_no#265, from_date#267, to_date#268]
   :  +- *BroadcastHashJoin [dept_no#254], [dept_no#266], Inner, BuildRight
   :     :- *Filter isnotnull(dept_no#254)
   :     :  +- HiveTableScan [dept_no#254, dept_name#255], MetastoreRelation employees, departments
   :     +- BroadcastExchange HashedRelationBroadcastMode(List(input[1, string, false]))
   :        +- *Filter (isnotnull(dept_no#266) && isnotnull(emp_no#265))
   :           +- HiveTableScan [emp_no#265, dept_no#266, from_date#267, to_date#268], MetastoreRelation employees, dept_emp
   +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
      +- *Filter isnotnull(emp_no#243)
         +- HiveTableScan [emp_no#243, birth_date#244, first_name#245, last_name#246, gender#247, hire_date#248], MetastoreRelation employees, employees

The plan is essentially the same: the same broadcast hash joins, just assembled in a different order.

3) Pure dataframes:

val d = spark.read.format("orc")
             .load("/apps/hive/warehouse/employees.db/departments")
             .toDF(Seq("dept_no", "dept_name"):_*)
val e = spark.read.format("orc")
             .load("/apps/hive/warehouse/employees.db/employees")
             .toDF(Seq("emp_no","birth_date","first_name","last_name","gender","hire_date"):_*)
val de = spark.read.format("orc")
              .load("/apps/hive/warehouse/employees.db/departments")
              .toDF(Seq("emp_no", "dept_no"):_*)

d.join(de, Seq("dept_no")).join(e, Seq("emp_no")).explain

Result:

== Physical Plan ==
*Project [emp_no#137, dept_no#96, dept_name#97, birth_date#115, first_name#116, last_name#117, gender#118, hire_date#119]
+- *BroadcastHashJoin [cast(emp_no#137 as double)], [cast(emp_no#114 as double)], Inner, BuildRight
   :- *Project [dept_no#96, dept_name#97, emp_no#137]
   :  +- *BroadcastHashJoin [dept_no#96], [dept_no#138], Inner, BuildRight
   :     :- *Project [_col0#91 AS dept_no#96, _col1#92 AS dept_name#97]
   :     :  +- *Filter isnotnull(_col0#91)
   :     :     +- *Scan orc [_col0#91,_col1#92] Format: ORC, InputPaths: hdfs://node1:8020/apps/hive/warehouse/employees.db/departments, PushedFilters: [IsNotNull(_col0)], ReadSchema: struct<_col0:string,_col1:string>
   :     +- BroadcastExchange HashedRelationBroadcastMode(List(input[1, string, true]))
   :        +- *Project [_col0#132 AS emp_no#137, _col1#133 AS dept_no#138]
   :           +- *Filter (isnotnull(_col1#133) && isnotnull(_col0#132))
   :              +- *Scan orc [_col0#132,_col1#133] Format: ORC, InputPaths: hdfs://node1:8020/apps/hive/warehouse/employees.db/departments, PushedFilters: [IsNotNull(_col1), IsNotNull(_col0)], ReadSchema: struct<_col0:string,_col1:string>
   +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as double)))
      +- *Project [_col0#101 AS emp_no#114, _col1#102 AS birth_date#115, _col2#103 AS first_name#116, _col3#104 AS last_name#117, _col4#105 AS gender#118, _col5#106 AS hire_date#119]
         +- *Filter isnotnull(_col0#101)
            +- *Scan orc [_col0#101,_col1#102,_col2#103,_col3#104,_col4#105,_col5#106] Format: ORC, InputPaths: hdfs://node1:8020/apps/hive/warehouse/employees.db/employees, PushedFilters: [IsNotNull(_col0)], ReadSchema: struct<_col0:int,_col1:string,_col2:string,_col3:string,_col4:string,_col5:string>

Very similar; only the read is done differently. In particular, the same join types (broadcast hash joins) are chosen.
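Since the question's title also mentions Datasets: as a hedged sketch (assuming the same SparkSession `spark` and Hive tables as above, with hypothetical case class names), the typed Dataset API goes through the same Catalyst planning, which you can verify with `explain` as well:

```scala
// Sketch: typed Dataset variant of the same join.
// Assumes a SparkSession `spark` and the Hive tables used above;
// the case class names here are illustrative, not from the original post.
import spark.implicits._

case class Department(dept_no: String, dept_name: String)
case class DeptEmp(emp_no: Int, dept_no: String, from_date: String, to_date: String)

val dDs  = spark.table("employees.departments").as[Department]
val deDs = spark.table("employees.dept_emp").as[DeptEmp]

// joinWith keeps the typed API; explain should again show a
// BroadcastHashJoin, as in the SQL and DataFrame versions.
dDs.joinWith(deDs, dDs("dept_no") === deDs("dept_no")).explain
```

One caveat: typed transformations that take Scala lambdas (e.g. `filter(f: T => Boolean)`) are opaque to Catalyst, so they can prevent optimizations such as filter pushdown; joins and column-based expressions like the one above are optimized the same way as DataFrame code.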


3 REPLIES


Expert Contributor

@Bernhard Walter thanks for the reply.


If it solves your question, please mark it as accepted so that it shows as resolved in the overview. Thanks!