
Spark SQL 2.0 - performance of Plain SQL query in spark vs Dataframe or Dataset


Contributor

Just wondering: if we write an inner join in plain SQL, or use the DataFrame API to perform the join, do we get the same performance? I can see from the diagram that both the query and the DataFrame are pushed to the Catalyst optimizer, but I just wanted to confirm: if I write plain SQL queries, can I use them for big-data production use cases, or should I use DataFrames for better performance? Thank you.

1 ACCEPTED SOLUTION


Re: Spark SQL 2.0 - performance of Plain SQL query in spark vs Dataframe or Dataset

You can try it yourself and let Spark explain the query (i.e., ask Catalyst what it is doing).

I would recommend running "explain" on your own query; see the examples below.

1) With a SQL query

spark.sql("select * from employees e, departments d, dept_emp de where e.emp_no = de.emp_no and d.dept_no = de.dept_no").explain

Result:

== Physical Plan ==
*Project [emp_no#38, birth_date#39, first_name#40, last_name#41, gender#42, hire_date#43, dept_no#44, dept_name#45, emp_no#46, dept_no#47, from_date#48, to_date#49]
+- *BroadcastHashJoin [dept_no#47], [dept_no#44], Inner, BuildRight
   :- *BroadcastHashJoin [emp_no#38], [emp_no#46], Inner, BuildRight
   :  :- *Filter isnotnull(emp_no#38)
   :  :  +- HiveTableScan [emp_no#38, birth_date#39, first_name#40, last_name#41, gender#42, hire_date#43], MetastoreRelation employees, employees, e
   :  +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
   :     +- *Filter (isnotnull(dept_no#47) && isnotnull(emp_no#46))
   :        +- HiveTableScan [emp_no#46, dept_no#47, from_date#48, to_date#49], MetastoreRelation employees, dept_emp, de
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false]))
      +- *Filter isnotnull(dept_no#44)
         +- HiveTableScan [dept_no#44, dept_name#45], MetastoreRelation employees, departments, d
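
If you also want to see what Catalyst did before settling on this physical plan, you can ask for the extended output. A minimal sketch, same query as above; explain(true) additionally prints the parsed, analyzed, and optimized logical plans:

// Extended explain: logical plans plus the physical plan
spark.sql("select * from employees e, departments d, dept_emp de where e.emp_no = de.emp_no and d.dept_no = de.dept_no").explain(true)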

2) Read via Hive, then join the DataFrames

val e = spark.sql("select * from employees.employees")
val d = spark.sql("select * from employees.departments")
val de = spark.sql("select * from employees.dept_emp")
d.join(de, Seq("dept_no")).join(e, Seq("emp_no")).explain

Result:

== Physical Plan ==
*Project [emp_no#265, dept_no#254, dept_name#255, from_date#267, to_date#268, birth_date#244, first_name#245, last_name#246, gender#247, hire_date#248]
+- *BroadcastHashJoin [emp_no#265], [emp_no#243], Inner, BuildRight
   :- *Project [dept_no#254, dept_name#255, emp_no#265, from_date#267, to_date#268]
   :  +- *BroadcastHashJoin [dept_no#254], [dept_no#266], Inner, BuildRight
   :     :- *Filter isnotnull(dept_no#254)
   :     :  +- HiveTableScan [dept_no#254, dept_name#255], MetastoreRelation employees, departments
   :     +- BroadcastExchange HashedRelationBroadcastMode(List(input[1, string, false]))
   :        +- *Filter (isnotnull(dept_no#266) && isnotnull(emp_no#265))
   :           +- HiveTableScan [emp_no#265, dept_no#266, from_date#267, to_date#268], MetastoreRelation employees, dept_emp
   +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
      +- *Filter isnotnull(emp_no#243)
         +- HiveTableScan [emp_no#243, birth_date#244, first_name#245, last_name#246, gender#247, hire_date#248], MetastoreRelation employees, employees

The physical plan is the same.
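
Equivalently, you can let the metastore resolve the tables without writing any SQL strings. A small sketch using spark.table; it should yield the same HiveTableScan-based plan as above:

// spark.table resolves the table through the metastore,
// just like "select * from ...", but without the SQL string
val e  = spark.table("employees.employees")
val d  = spark.table("employees.departments")
val de = spark.table("employees.dept_emp")
d.join(de, Seq("dept_no")).join(e, Seq("emp_no")).explain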

3) Pure DataFrames:

val d = spark.read.format("orc")
             .load("/apps/hive/warehouse/employees.db/departments")
             .toDF(Seq("dept_no", "dept_name"):_*)
val e = spark.read.format("orc")
             .load("/apps/hive/warehouse/employees.db/employees")
             .toDF(Seq("emp_no","birth_date","first_name","last_name","gender","hire_date"):_*)
val de = spark.read.format("orc")
              .load("/apps/hive/warehouse/employees.db/dept_emp")
              .toDF(Seq("emp_no", "dept_no", "from_date", "to_date"):_*)

d.join(de, Seq("dept_no")).join(e, Seq("emp_no")).explain

Result:

== Physical Plan ==
*Project [emp_no#137, dept_no#96, dept_name#97, birth_date#115, first_name#116, last_name#117, gender#118, hire_date#119]
+- *BroadcastHashJoin [cast(emp_no#137 as double)], [cast(emp_no#114 as double)], Inner, BuildRight
   :- *Project [dept_no#96, dept_name#97, emp_no#137]
   :  +- *BroadcastHashJoin [dept_no#96], [dept_no#138], Inner, BuildRight
   :     :- *Project [_col0#91 AS dept_no#96, _col1#92 AS dept_name#97]
   :     :  +- *Filter isnotnull(_col0#91)
   :     :     +- *Scan orc [_col0#91,_col1#92] Format: ORC, InputPaths: hdfs://node1:8020/apps/hive/warehouse/employees.db/departments, PushedFilters: [IsNotNull(_col0)], ReadSchema: struct<_col0:string,_col1:string>
   :     +- BroadcastExchange HashedRelationBroadcastMode(List(input[1, string, true]))
   :        +- *Project [_col0#132 AS emp_no#137, _col1#133 AS dept_no#138]
   :           +- *Filter (isnotnull(_col1#133) && isnotnull(_col0#132))
   :              +- *Scan orc [_col0#132,_col1#133] Format: ORC, InputPaths: hdfs://node1:8020/apps/hive/warehouse/employees.db/departments, PushedFilters: [IsNotNull(_col1), IsNotNull(_col0)], ReadSchema: struct<_col0:string,_col1:string>
   +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as double)))
      +- *Project [_col0#101 AS emp_no#114, _col1#102 AS birth_date#115, _col2#103 AS first_name#116, _col3#104 AS last_name#117, _col4#105 AS gender#118, _col5#106 AS hire_date#119]
         +- *Filter isnotnull(_col0#101)
            +- *Scan orc [_col0#101,_col1#102,_col2#103,_col3#104,_col4#105,_col5#106] Format: ORC, InputPaths: hdfs://node1:8020/apps/hive/warehouse/employees.db/employees, PushedFilters: [IsNotNull(_col0)], ReadSchema: struct<_col0:int,_col1:string,_col2:string,_col3:string,_col4:string,_col5:string>

Very similar; only the read is done differently (direct ORC scans with pushed filters instead of HiveTableScan). In particular, the join strategy is the same: all three plans use BroadcastHashJoin, because these tables are small enough to broadcast (see spark.sql.autoBroadcastJoinThreshold). SQL and DataFrame code end up in the same Catalyst optimizer either way, so there is no inherent performance difference between the two.
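
Since the title also mentions Datasets: relational operations on a typed Dataset go through the same Catalyst optimizer, but typed lambda transformations are opaque to it. A sketch, assuming case classes matching the tables above (the field types here are my assumption; adjust them to your actual schema):

// Hypothetical case classes for the tables above
case class Employee(emp_no: Int, birth_date: String, first_name: String,
                    last_name: String, gender: String, hire_date: String)
case class DeptEmp(emp_no: Int, dept_no: String, from_date: String, to_date: String)

import spark.implicits._
val eDs  = spark.table("employees.employees").as[Employee]
val deDs = spark.table("employees.dept_emp").as[DeptEmp]

// A relational join on typed Datasets is still planned by Catalyst,
// so the physical plan should match the SQL/DataFrame versions:
eDs.join(deDs, Seq("emp_no")).explain

// A typed lambda filter, by contrast, is a black box to the optimizer,
// so the predicate cannot be pushed down into the scan:
eDs.filter(emp => emp.gender == "M").explain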


3 REPLIES


Re: Spark SQL 2.0 - performance of Plain SQL query in spark vs Dataframe or Dataset

Contributor

@Bernhard Walter thanks for the reply.

Re: Spark SQL 2.0 - performance of Plain SQL query in spark vs Dataframe or Dataset

If it solves your question, please mark it as accepted so that it shows as resolved in the overview. Thanks!