Created 12-06-2016 05:33 PM
Just wondering: if we write an inner join query in plain SQL, or use the DataFrame API to perform the join, do we get the same performance? I can see from the diagram that both the query and the DataFrame are pushed through the Catalyst optimizer, but I just wanted to confirm: if I write plain SQL queries, can I use them for big data production use cases, or should I use DataFrames for better performance? Thank you.
Created 12-08-2016 09:55 AM
You can try it yourself and let Spark explain the query (i.e. ask Catalyst what it is doing).
I would recommend running "explain" on your own query; see the examples below.
1) With a SQL query
spark.sql("select * from employees e, departments d, dept_emp de where e.emp_no = de.emp_no and d.dept_no = de.dept_no").explainResult:
== Physical Plan ==
*Project [emp_no#38, birth_date#39, first_name#40, last_name#41, gender#42, hire_date#43, dept_no#44, dept_name#45, emp_no#46, dept_no#47, from_date#48, to_date#49]
+- *BroadcastHashJoin [dept_no#47], [dept_no#44], Inner, BuildRight
:- *BroadcastHashJoin [emp_no#38], [emp_no#46], Inner, BuildRight
: :- *Filter isnotnull(emp_no#38)
: : +- HiveTableScan [emp_no#38, birth_date#39, first_name#40, last_name#41, gender#42, hire_date#43], MetastoreRelation employees, employees, e
: +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
: +- *Filter (isnotnull(dept_no#47) && isnotnull(emp_no#46))
: +- HiveTableScan [emp_no#46, dept_no#47, from_date#48, to_date#49], MetastoreRelation employees, dept_emp, de
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false]))
+- *Filter isnotnull(dept_no#44)
+- HiveTableScan [dept_no#44, dept_name#45], MetastoreRelation employees, departments, d
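By the way, if you want to see what Catalyst does at each stage, not just the final physical plan, you can pass true to explain to also print the parsed, analyzed, and optimized logical plans. A minimal sketch using the same query:
// extended explain: prints the parsed, analyzed, and optimized logical plans
// in addition to the physical plan shown above
spark.sql("select * from employees e, departments d, dept_emp de where e.emp_no = de.emp_no and d.dept_no = de.dept_no").explain(true)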
2) Read via Hive, then execute a DataFrame join
val e = spark.sql("select * from employees.employees")
val d = spark.sql("select * from employees.departments")
val de = spark.sql("select * from employees.dept_emp")
d.join(de, Seq("dept_no")).join(e, Seq("emp_no")).explainResult:
== Physical Plan ==
*Project [emp_no#265, dept_no#254, dept_name#255, from_date#267, to_date#268, birth_date#244, first_name#245, last_name#246, gender#247, hire_date#248]
+- *BroadcastHashJoin [emp_no#265], [emp_no#243], Inner, BuildRight
:- *Project [dept_no#254, dept_name#255, emp_no#265, from_date#267, to_date#268]
: +- *BroadcastHashJoin [dept_no#254], [dept_no#266], Inner, BuildRight
: :- *Filter isnotnull(dept_no#254)
: : +- HiveTableScan [dept_no#254, dept_name#255], MetastoreRelation employees, departments
: +- BroadcastExchange HashedRelationBroadcastMode(List(input[1, string, false]))
: +- *Filter (isnotnull(dept_no#266) && isnotnull(emp_no#265))
: +- HiveTableScan [emp_no#265, dept_no#266, from_date#267, to_date#268], MetastoreRelation employees, dept_emp
+- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
+- *Filter isnotnull(emp_no#243)
+- HiveTableScan [emp_no#243, birth_date#244, first_name#245, last_name#246, gender#247, hire_date#248], MetastoreRelation employees, employees
The plan is the same as with the SQL query.
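If you prefer to verify this programmatically instead of eyeballing the output, every Dataset exposes its QueryExecution. A minimal sketch (the optimized plans should match up to expression IDs and column ordering):
val fromSql = spark.sql("select * from employees e, departments d, dept_emp de where e.emp_no = de.emp_no and d.dept_no = de.dept_no")
val fromApi = d.join(de, Seq("dept_no")).join(e, Seq("emp_no"))
// both variants go through Catalyst, so the optimized logical plans should be equivalent
println(fromSql.queryExecution.optimizedPlan)
println(fromApi.queryExecution.optimizedPlan)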
3) Pure DataFrames:
val d = spark.read.format("orc")
.load("/apps/hive/warehouse/employees.db/departments")
.toDF(Seq("dept_no", "dept_name"):_*)
val e = spark.read.format("orc")
.load("/apps/hive/warehouse/employees.db/employees")
.toDF(Seq("emp_no","birth_date","first_name","last_name","gender","hire_date"):_*)
val de = spark.read.format("orc")
.load("/apps/hive/warehouse/employees.db/departments")
.toDF(Seq("emp_no", "dept_no"):_*)
d.join(de, Seq("dept_no")).join(e, Seq("emp_no")).explainResult:
== Physical Plan ==
*Project [emp_no#137, dept_no#96, dept_name#97, birth_date#115, first_name#116, last_name#117, gender#118, hire_date#119]
+- *BroadcastHashJoin [cast(emp_no#137 as double)], [cast(emp_no#114 as double)], Inner, BuildRight
:- *Project [dept_no#96, dept_name#97, emp_no#137]
: +- *BroadcastHashJoin [dept_no#96], [dept_no#138], Inner, BuildRight
: :- *Project [_col0#91 AS dept_no#96, _col1#92 AS dept_name#97]
: : +- *Filter isnotnull(_col0#91)
: : +- *Scan orc [_col0#91,_col1#92] Format: ORC, InputPaths: hdfs://node1:8020/apps/hive/warehouse/employees.db/departments, PushedFilters: [IsNotNull(_col0)], ReadSchema: struct<_col0:string,_col1:string>
: +- BroadcastExchange HashedRelationBroadcastMode(List(input[1, string, true]))
: +- *Project [_col0#132 AS emp_no#137, _col1#133 AS dept_no#138]
: +- *Filter (isnotnull(_col1#133) && isnotnull(_col0#132))
: +- *Scan orc [_col0#132,_col1#133] Format: ORC, InputPaths: hdfs://node1:8020/apps/hive/warehouse/employees.db/departments, PushedFilters: [IsNotNull(_col1), IsNotNull(_col0)], ReadSchema: struct<_col0:string,_col1:string>
+- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as double)))
+- *Project [_col0#101 AS emp_no#114, _col1#102 AS birth_date#115, _col2#103 AS first_name#116, _col3#104 AS last_name#117, _col4#105 AS gender#118, _col5#106 AS hire_date#119]
+- *Filter isnotnull(_col0#101)
+- *Scan orc [_col0#101,_col1#102,_col2#103,_col3#104,_col4#105,_col5#106] Format: ORC, InputPaths: hdfs://node1:8020/apps/hive/warehouse/employees.db/employees, PushedFilters: [IsNotNull(_col0)], ReadSchema: struct<_col0:int,_col1:string,_col2:string,_col3:string,_col4:string,_col5:string>
Very similar; only the read is done differently. In particular, the same types of joins are used.
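One difference worth noting in this third plan: the join keys are compared as doubles (cast(emp_no#137 as double)), because reading the ORC files directly loses the metastore column types, and Spark coerces the mismatched string/int keys to a common type before joining. If you want to avoid that implicit cast, you can align the key types yourself. A minimal sketch, assuming emp_no is really an integer:
import org.apache.spark.sql.functions.col

// cast the key explicitly so Catalyst does not have to coerce both sides to double
val deTyped = de.withColumn("emp_no", col("emp_no").cast("int"))
d.join(deTyped, Seq("dept_no")).join(e, Seq("emp_no")).explain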
Created 12-08-2016 10:34 AM
@Bernhard Walter thanks for the reply.
Created 12-08-2016 11:02 AM
If it answers your question, please mark it as accepted so that it shows as resolved in the overview. Thanks!