Created 02-10-2020 01:23 PM
Hi community
I am trying to debug a simple query in Spark SQL that is returning incorrect data.
In this instance the query is a simple join between two Hive tables. The issue seems tied to the physical plan that Spark has generated (with Catalyst optimization): it looks corrupted, in that some of the steps have not been assigned an evaluation order id, so the evaluation on the right side of the join is not completed in the Spark query.
This error is on an HDP 3.1.4 cluster running:
>>> sc.version
u'2.3.2.3.1.4.0-315'
Here is the example query:
from pyspark_llap import HiveWarehouseSession
hive = HiveWarehouseSession.session(spark).build()
filter_1 = hive.executeQuery('select * from 03_score where scores = 5 or scores = 6')
filter_2 = hive.executeQuery('select * from 03_score where scores = 8')
joined_df = filter_1.alias('o').join(filter_2.alias('po'), filter_1.encntr_id == filter_2.encntr_id, how='inner')
joined_df.count() ### shows incorrect value ###
joined_df.explain(True)
And here is the output of the plan evaluation:
== Physical Plan ==
SortMergeJoin [encntr_id#0], [encntr_id#12], Inner
:- *(2) Sort [encntr_id#0 ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(encntr_id#0, 200)
:     +- *(1) Filter isnotnull(encntr_id#0)
:        +- *(1) DataSourceV2Scan [encntr_id#0, scores_datetime#1, scores#2], com.hortonworks.spark.sql.hive.llap.HiveWarehouseDataSourceReader@a6df563
+- Sort [encntr_id#12 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(encntr_id#12, 200)
      +- Filter isnotnull(encntr_id#12)
         +- DataSourceV2Scan [encntr_id#12, dateofbirth#13, postcode#14, event_desc#15, event_performed_dt_tm#16], com.hortonworks.spark.sql.hive.llap.HiveWarehouseDataSourceReader@60dd22d9
Note that the DataSourceV2Scan, Filter, Exchange and Sort steps on the right side of the join have not been assigned an order id and are therefore never computed.
Can anyone shed some light on this issue for me? Why would a physical plan that looks correct not be assigned an evaluation order id?
Created 02-12-2020 02:25 AM
Figured this out internally.
It turns out the Spark optimization routine can be affected by the configuration setting
spark.sql.codegen.maxFields
which can change how Spark optimizes the read from 'fat' tables.
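For anyone wanting to check this on their own cluster, here is a minimal sketch of how the setting could be inspected and changed from the pyspark shell. The helper names `codegen_settings` and `raise_max_fields` are made up for illustration; the only Spark calls used are `spark.conf.get` and `spark.conf.set` on an existing session.

```python
# Hypothetical helpers (names are illustrative, not part of Spark or HWC).
# They only rely on spark.conf.get / spark.conf.set of an existing SparkSession.

CODEGEN_KEYS = (
    "spark.sql.codegen.wholeStage",  # whether whole-stage codegen is enabled
    "spark.sql.codegen.maxFields",   # field-count limit before codegen is skipped
)


def codegen_settings(spark):
    """Return the current whole-stage codegen settings of the session as a dict."""
    return {key: spark.conf.get(key) for key in CODEGEN_KEYS}


def raise_max_fields(spark, new_limit):
    """Set spark.sql.codegen.maxFields for this session and return the old value."""
    old_value = spark.conf.get("spark.sql.codegen.maxFields")
    spark.conf.set("spark.sql.codegen.maxFields", str(new_limit))
    return old_value
```

In the shell this would be used as `codegen_settings(spark)` to see the current values, then something like `raise_max_fields(spark, 200)` (200 is an arbitrary example value), after which the DataFrames need to be rebuilt and `explain(True)` re-run to see whether the plan changes.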
In my case the setting was set low, which meant the DAG stages of the read from the right side of the join (the "fat" table) were performed without being assigned to a whole-stage codegen stage.
It is important to note that the read of the Hive data returned the same results in either case, just with a different optimization applied to the physical plan.
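To make the point about the plan concrete: the `*(n)` prefix in the explain output is the whole-stage codegen stage id, so an operator without it has simply not been compiled into a codegen stage. A rough sketch of a plain-Python helper that lists which operators carry the marker is below. `codegen_report` is a hypothetical name, and it assumes the plan text has been pasted in from the output of `explain()`.

```python
import re

# Matches the "*(n) " whole-stage codegen marker printed in front of an operator.
_CODEGEN_MARKER = re.compile(r"\*\((\d+)\)\s+")


def codegen_report(plan_text):
    """Return a list of (operator_name, codegen_stage_id or None) per plan line."""
    report = []
    for line in plan_text.splitlines():
        # Drop the tree-drawing prefix (":", "+-", ":-" and indentation).
        body = line.lstrip(" \t:+-")
        if not body or body.startswith("=="):
            continue  # skip blank lines and section headers
        match = _CODEGEN_MARKER.match(body)
        stage_id = int(match.group(1)) if match else None
        if match:
            body = body[match.end():]
        report.append((body.split()[0], stage_id))
    return report


def operators_without_codegen(plan_text):
    """Names of operators that were not assigned to a whole-stage codegen stage."""
    return [name for name, stage_id in codegen_report(plan_text) if stage_id is None]
```

Note that Exchange never carries the marker, since a shuffle is a boundary between codegen stages, so only the unmarked Sort, Filter and scan operators are of interest here.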