Created 12-06-2016 05:33 PM
Just wondering: if we write an inner join query in plain SQL, or use the DataFrame API to perform the join, do we get the same performance? I can see from the diagram that both the query and the DataFrame are pushed to the Catalyst optimizer, but I just wanted to confirm: if I write plain SQL queries, can I use them for big data production use cases, or should I use DataFrames for better performance? Thank you.
Created 12-08-2016 09:55 AM
You can try it yourself and let Spark explain the query (i.e. ask Catalyst what it is doing).
I would recommend running "explain" on your own query; see the examples below.
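(Side note, a minimal sketch that is not part of the original examples, assuming a SparkSession named spark and the same employees database used below: explain can also be called with the extended flag, which prints the parsed, analyzed and optimized logical plans in addition to the physical plan shown in the examples.)

// Extended explain shows every Catalyst phase, not just the physical plan.
val q = spark.sql(
  "select * from employees e, departments d, dept_emp de " +
  "where e.emp_no = de.emp_no and d.dept_no = de.dept_no")

q.explain(true)  // true = extended: parsed, analyzed, optimized and physical plans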
1) With SQL query
spark.sql("select * from employees e, departments d, dept_emp de where e.emp_no = de.emp_no and d.dept_no = de.dept_no").explain
Result:
== Physical Plan ==
*Project [emp_no#38, birth_date#39, first_name#40, last_name#41, gender#42, hire_date#43, dept_no#44, dept_name#45, emp_no#46, dept_no#47, from_date#48, to_date#49]
+- *BroadcastHashJoin [dept_no#47], [dept_no#44], Inner, BuildRight
   :- *BroadcastHashJoin [emp_no#38], [emp_no#46], Inner, BuildRight
   :  :- *Filter isnotnull(emp_no#38)
   :  :  +- HiveTableScan [emp_no#38, birth_date#39, first_name#40, last_name#41, gender#42, hire_date#43], MetastoreRelation employees, employees, e
   :  +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
   :     +- *Filter (isnotnull(dept_no#47) && isnotnull(emp_no#46))
   :        +- HiveTableScan [emp_no#46, dept_no#47, from_date#48, to_date#49], MetastoreRelation employees, dept_emp, de
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false]))
      +- *Filter isnotnull(dept_no#44)
         +- HiveTableScan [dept_no#44, dept_name#45], MetastoreRelation employees, departments, d
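Both smaller tables end up on the build side of a BroadcastHashJoin because Catalyst estimates them to be below the broadcast threshold. If you want to check or tune that behaviour (a hedged sketch; the 10 MB figure is just the Spark 2.x default for this setting, not something measured in this example):

// The optimizer broadcasts a relation when its estimated size is below this
// threshold (default about 10 MB); setting it to -1 disables broadcast joins.
spark.conf.get("spark.sql.autoBroadcastJoinThreshold")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 50 * 1024 * 1024)  // e.g. raise to 50 MB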
2) Read via hive and then execute dataframe join
val e = spark.sql("select * from employees.employees")
val d = spark.sql("select * from employees.departments")
val de = spark.sql("select * from employees.dept_emp")

d.join(de, Seq("dept_no")).join(e, Seq("emp_no")).explain
Result:
== Physical Plan ==
*Project [emp_no#265, dept_no#254, dept_name#255, from_date#267, to_date#268, birth_date#244, first_name#245, last_name#246, gender#247, hire_date#248]
+- *BroadcastHashJoin [emp_no#265], [emp_no#243], Inner, BuildRight
   :- *Project [dept_no#254, dept_name#255, emp_no#265, from_date#267, to_date#268]
   :  +- *BroadcastHashJoin [dept_no#254], [dept_no#266], Inner, BuildRight
   :     :- *Filter isnotnull(dept_no#254)
   :     :  +- HiveTableScan [dept_no#254, dept_name#255], MetastoreRelation employees, departments
   :     +- BroadcastExchange HashedRelationBroadcastMode(List(input[1, string, false]))
   :        +- *Filter (isnotnull(dept_no#266) && isnotnull(emp_no#265))
   :           +- HiveTableScan [emp_no#265, dept_no#266, from_date#267, to_date#268], MetastoreRelation employees, dept_emp
   +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
      +- *Filter isnotnull(emp_no#243)
         +- HiveTableScan [emp_no#243, birth_date#244, first_name#245, last_name#246, gender#247, hire_date#248], MetastoreRelation employees, employees
It is essentially the same plan: two BroadcastHashJoins over the same HiveTableScans, just applied in a slightly different order.
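If you want to go beyond eyeballing the explain output, you can also look at the optimized logical plans programmatically via queryExecution (a rough sketch, assuming the d, de and e DataFrames from variant 2 are still in scope):

// Both APIs funnel into the same Catalyst representation, so the optimized
// logical plans can be printed side by side and compared.
val sqlPlan = spark.sql(
  "select * from employees e, departments d, dept_emp de " +
  "where e.emp_no = de.emp_no and d.dept_no = de.dept_no")
  .queryExecution.optimizedPlan

val dfPlan = d.join(de, Seq("dept_no")).join(e, Seq("emp_no"))
  .queryExecution.optimizedPlan

println(sqlPlan.treeString)
println(dfPlan.treeString)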
3) Pure dataframes:
val d = spark.read.format("orc")
  .load("/apps/hive/warehouse/employees.db/departments")
  .toDF(Seq("dept_no", "dept_name"):_*)

val e = spark.read.format("orc")
  .load("/apps/hive/warehouse/employees.db/employees")
  .toDF(Seq("emp_no","birth_date","first_name","last_name","gender","hire_date"):_*)

val de = spark.read.format("orc")
  .load("/apps/hive/warehouse/employees.db/departments")
  .toDF(Seq("emp_no", "dept_no"):_*)

d.join(de, Seq("dept_no")).join(e, Seq("emp_no")).explain
Result:
== Physical Plan ==
*Project [emp_no#137, dept_no#96, dept_name#97, birth_date#115, first_name#116, last_name#117, gender#118, hire_date#119]
+- *BroadcastHashJoin [cast(emp_no#137 as double)], [cast(emp_no#114 as double)], Inner, BuildRight
   :- *Project [dept_no#96, dept_name#97, emp_no#137]
   :  +- *BroadcastHashJoin [dept_no#96], [dept_no#138], Inner, BuildRight
   :     :- *Project [_col0#91 AS dept_no#96, _col1#92 AS dept_name#97]
   :     :  +- *Filter isnotnull(_col0#91)
   :     :     +- *Scan orc [_col0#91,_col1#92] Format: ORC, InputPaths: hdfs://node1:8020/apps/hive/warehouse/employees.db/departments, PushedFilters: [IsNotNull(_col0)], ReadSchema: struct<_col0:string,_col1:string>
   :     +- BroadcastExchange HashedRelationBroadcastMode(List(input[1, string, true]))
   :        +- *Project [_col0#132 AS emp_no#137, _col1#133 AS dept_no#138]
   :           +- *Filter (isnotnull(_col1#133) && isnotnull(_col0#132))
   :              +- *Scan orc [_col0#132,_col1#133] Format: ORC, InputPaths: hdfs://node1:8020/apps/hive/warehouse/employees.db/departments, PushedFilters: [IsNotNull(_col1), IsNotNull(_col0)], ReadSchema: struct<_col0:string,_col1:string>
   +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as double)))
      +- *Project [_col0#101 AS emp_no#114, _col1#102 AS birth_date#115, _col2#103 AS first_name#116, _col3#104 AS last_name#117, _col4#105 AS gender#118, _col5#106 AS hire_date#119]
         +- *Filter isnotnull(_col0#101)
            +- *Scan orc [_col0#101,_col1#102,_col2#103,_col3#104,_col4#105,_col5#106] Format: ORC, InputPaths: hdfs://node1:8020/apps/hive/warehouse/employees.db/employees, PushedFilters: [IsNotNull(_col0)], ReadSchema: struct<_col0:int,_col1:string,_col2:string,_col3:string,_col4:string,_col5:string>
Very similar; only the read is done differently (a direct ORC file scan instead of a HiveTableScan through the metastore). In particular, the same types of joins are used.
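As an aside (a small sketch that is not part of the original examples): if you want DataFrame code but the metastore-backed reads from variant 2, you can also go through the catalog with spark.table instead of pointing at the warehouse files directly, which should give essentially the same plan as variant 2 because column names and types come from the table definition rather than from the raw ORC files:

// Read through the Hive metastore, then join with the DataFrame API.
val d2  = spark.table("employees.departments")
val e2  = spark.table("employees.employees")
val de2 = spark.table("employees.dept_emp")

d2.join(de2, Seq("dept_no")).join(e2, Seq("emp_no")).explain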
Created 12-08-2016 10:34 AM
@Bernhard Walter thanks for the reply.
Created 12-08-2016 11:02 AM
If it solves your question, please mark it as accepted so that it shows as resolved in the overview. Thanks!