Support Questions

Find answers, ask questions, and share your expertise
Announcements
Celebrating as our community reaches 100,000 members! Thank you!

spark physical plan not generating correct sql DAG

avatar
Contributor

Hi community 

I am trying to debug a simple query in spark SQL that is returning incorrect data.

In this instance the query is a simple join between two hive tables .. The issue seems tied to the fact that a the physical plan that spark has generated (with catalyst  optimization) looks to be corrupted where some of the steps in the physical plan have not been assigned an evaluation order id and thus all evaluation on the right side of the join is not completed in the spark query

This error is on a HDP3.1.4 cluster running

 

 

 

>>> sc.version
u'2.3.2.3.1.4.0-315'

 

 

 

here is the example query .. 

 

 

from pyspark_llap import HiveWarehouseSession
hive = HiveWarehouseSession.session(spark).build()

filter_1 = hive.executeQuery('select * from 03_score where scores = 5 or scores = 6')
filter_2 = hive.executeQuery('select * from 03_score where scores = 8')


joined_df = filter_1.alias('o').join(filter_2.alias('po'), filter_1.encntr_id == filter_2.encntr_id, how='inner')
joined_df.count() ### shows incorrect value ### 
joined_df.explain(True)

 

 

and here is the output of plan evaluation

 

 

== Physical Plan ==
 SortMergeJoin [encntr_id#0], [encntr_id#12], Inner
:- *(2) Sort [encntr_id#0 ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(encntr_id#0, 200)
:     +- *(1) Filter isnotnull(encntr_id#0)
:        +- *(1) DataSourceV2Scan [encntr_id#0, scores_datetime#1, scores#2], com.hortonworks.spark.sql.hive.llap.HiveWarehouseDataSourceReader@a6df563
+-  Sort [encntr_id#12 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(encntr_id#12, 200)
      +-  Filter isnotnull(encntr_id#12)
         +- DataSourceV2Scan [encntr_id#12, dateofbirth#13, postcode#14, event_desc#15, event_performed_dt_tm#16], com.hortonworks.spark.sql.hive.llap.HiveWarehouseDataSourceReader@60dd22d9

 

 

Note that all datasourceV2scan , filter exchange and sort on the right side of the join have not been assigned an order id  and therefore never computed. 

Can anyone shed some light on this issue for me .. Why would the physical plan which looks correct not be assigned an evaluation order id ?

 

1 ACCEPTED SOLUTION

avatar
Contributor

Figured this out internally . 

Turns out the spark optimization routine can be affected by the configuration setting 

spark.sql.codegen.Maxfields

which  can have implications in how spark will optimize the  read  from 'fat' tables .

In my case the setting was set low which means DAG stages of the read from the right side of the join (the "fat" table) were performed without being assigned to a wholestage codegen .

Important to note that the read of the hive data in either case returned the same results just with a different optimization applied to the physical plan . 

 

View solution in original post

1 REPLY 1

avatar
Contributor

Figured this out internally . 

Turns out the spark optimization routine can be affected by the configuration setting 

spark.sql.codegen.Maxfields

which  can have implications in how spark will optimize the  read  from 'fat' tables .

In my case the setting was set low which means DAG stages of the read from the right side of the join (the "fat" table) were performed without being assigned to a wholestage codegen .

Important to note that the read of the hive data in either case returned the same results just with a different optimization applied to the physical plan .