Impala on Kudu SQL with inner joins multiple times to same table

Explorer

Query:

 


SELECT
SUM(fpe.measure_EncounterInstanceCount) AS TOTAL_INSTANCE_COUNT
FROM
fact_PatientEncounter fpe
inner join
(
SELECT DISTINCT
CC1.DispatchedDate_Encoded,
CC1.FactPatientEncounter_HashKey
FROM
dim_PatientEncounter_CategorizedCollections cc1
WHERE
((cc1.dispatcheddate_encoded >= 20170101) and (cc1.dispatcheddate_encoded <= 20180417))
AND cc1.Collection = 'TEST'
AND ((CC1.Value = 'val1') OR (CC1.Value = 'val2'))
) M
ON fPE.DispatchedDate_Encoded = M.DispatchedDate_Encoded
AND fPE.FactPatientEncounter_HashKey = M.FactPatientEncounter_HashKey
INNER JOIN
(
SELECT DISTINCT
cc2.DispatchedDate_Encoded,
cc2.FactPatientEncounter_HashKey
FROM
dim_PatientEncounter_CategorizedCollections cc2
WHERE
((cc2.dispatcheddate_encoded >= 20170101) and (cc2.dispatcheddate_encoded <= 20180417))
AND cc2.Collection = 'TEST'
AND ((cc2.Value = 'val3') OR (cc2.Value = 'val4'))
) N
ON fpe.DispatchedDate_Encoded = N.DispatchedDate_Encoded
AND fpe.FactPatientEncounter_HashKey = N.FactPatientEncounter_HashKey

where
((fpe.RecordStatusDeleted IS NULL) and (fpe.RecordStatus IS NULL))
AND ((fpe.dispatcheddate_encoded >= 20170101) and (fpe.dispatcheddate_encoded <= 20180417))

 

The query returns NULL. The same query on the same dataset returns data on SQL Server, and each component of the query works fine when run separately. Are there any Impala/Kudu bugs related to joining the same table multiple times?

 

Thanks.


Expert Contributor

Hi,

 

Is it possible to reproduce this with a smaller sample data set that you can share? I'm not aware of any such bugs but it's possible you've discovered something new.

 

-Todd

Master Collaborator

Might be a known Impala planner bug. What version of Impala are you running? Can you share the EXPLAIN output?

Explorer
Using Cloudera 5.14 and whatever Impala comes with it.


Explain String
Max Per-Host Resource Reservation: Memory=11.75MB
Per-Host Resource Estimates: Memory=63.88MB
PLAN-ROOT SINK
|
15:AGGREGATE [FINALIZE]
|  output: sum:merge(fpe.measure_EncounterInstanceCount)
|
14:EXCHANGE [UNPARTITIONED]
|
07:AGGREGATE
|  output: sum(fpe.measure_EncounterInstanceCount)
|
06:HASH JOIN [INNER JOIN, PARTITIONED]
|  hash predicates: cc2.DispatchedDate_Encoded = fpe.DispatchedDate_Encoded, cc2.FactPatientEncounter_HashKey = fpe.FactPatientEncounter_HashKey
|  runtime filters: RF002 <- fpe.DispatchedDate_Encoded, RF003 <- fpe.FactPatientEncounter_HashKey
|
|--13:EXCHANGE [HASH(fpe.DispatchedDate_Encoded,fpe.FactPatientEncounter_HashKey)]
|  |
|  05:HASH JOIN [INNER JOIN, BROADCAST]
|  |  hash predicates: fPE.DispatchedDate_Encoded = CC1.DispatchedDate_Encoded, fPE.FactPatientEncounter_HashKey = CC1.FactPatientEncounter_HashKey
|  |  runtime filters: RF006 <- CC1.DispatchedDate_Encoded, RF007 <- CC1.FactPatientEncounter_HashKey
|  |
|  |--12:EXCHANGE [BROADCAST]
|  |  |
|  |  11:AGGREGATE [FINALIZE]
|  |  |  group by: CC1.DispatchedDate_Encoded, CC1.FactPatientEncounter_HashKey
|  |  |
|  |  10:EXCHANGE [HASH(CC1.DispatchedDate_Encoded,CC1.FactPatientEncounter_HashKey)]
|  |  |
|  |  02:AGGREGATE [STREAMING]
|  |  |  group by: CC1.DispatchedDate_Encoded, CC1.FactPatientEncounter_HashKey
|  |  |
|  |  01:SCAN KUDU [kudu_40m.dim_patientencounter_categorizedcollections cc1]
|  |     kudu predicates: CC1.Value IN ('Morphine', 'Fentanyl'), (cc1.dispatcheddate_encoded <= 20180417), (cc1.dispatcheddate_encoded >= 20170101), cc1.Collection = 'EMS_INTERVENTIONS'
|  |
|  00:SCAN KUDU [kudu_40m.fact_patientencounter fpe]
|     kudu predicates: (fpe.RecordStatus IS NULL), (fpe.RecordStatusDeleted IS NULL), (fpe.dispatcheddate_encoded <= 20180417), (fpe.dispatcheddate_encoded >= 20170101)
|     runtime filters: RF006 -> fPE.DispatchedDate_Encoded, RF007 -> fPE.FactPatientEncounter_HashKey
|
09:AGGREGATE [FINALIZE]
|  group by: cc2.DispatchedDate_Encoded, cc2.FactPatientEncounter_HashKey
|
08:EXCHANGE [HASH(cc2.DispatchedDate_Encoded,cc2.FactPatientEncounter_HashKey)]
|
04:AGGREGATE [STREAMING]
|  group by: cc2.DispatchedDate_Encoded, cc2.FactPatientEncounter_HashKey
|
03:SCAN KUDU [kudu_40m.dim_patientencounter_categorizedcollections cc2]
   kudu predicates: cc2.Value IN ('Naloxone', 'Narcan'), (cc2.dispatcheddate_encoded <= 20180417), (cc2.dispatcheddate_encoded >= 20170101), cc2.Collection = 'EMS_INTERVENTIONS'
   runtime filters: RF002 -> cc2.dispatcheddate_encoded, RF003 -> cc2.factpatientencounter_hashkey

Master Collaborator

Thanks. The plan looks good to me. I'm not aware of any issue like this one.

 

Would you be able to help us debug the query? This might be helpful:

- Are the results deterministically wrong or are they non-deterministic?

- Do you think column values are wrong, or are we producing an incorrect number of rows somewhere?

- Looking at the rows produced by different operators in the SUMMARY of the query profile might help pinpoint where things are going wrong

- Try running the query with straight_join and broadcast hints and see if the query results change. Like this:

 

SELECT /* +straight_join */ SUM(...)
FROM fpe
 INNER JOIN /* +broadcast */ (SELECT DISTINCT ...) M ON (...)
 INNER JOIN /* +broadcast */ (SELECT DISTINCT ...) N ON (...)
WHERE ...
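
To make the row-count question concrete, here is a minimal sketch that counts the rows surviving each stage of the join. It is only an illustration: Python's sqlite3 stands in for Impala, and the tables, shortened column names (d, k, cnt) and rows are made up rather than taken from the real dataset. The same four COUNT(*) queries could be run against the real tables to see which stage drops to zero.

```python
import sqlite3

# SQLite stands in for Impala; tables and rows are hypothetical toy data.
conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE fact (d INTEGER, k TEXT, cnt INTEGER);
CREATE TABLE dim  (d INTEGER, k TEXT, collection TEXT, value TEXT);
INSERT INTO fact VALUES (20170105, 'a', 1), (20170106, 'b', 1), (20170107, 'c', 1);
INSERT INTO dim VALUES
  (20170105, 'a', 'TEST', 'val1'),
  (20170105, 'a', 'TEST', 'val3'),
  (20170106, 'b', 'TEST', 'val1'),
  (20170107, 'c', 'TEST', 'val4');
""")

# The two DISTINCT subqueries from the original query, in shortened form.
M = "(SELECT DISTINCT d, k FROM dim WHERE collection = 'TEST' AND value IN ('val1', 'val2'))"
N = "(SELECT DISTINCT d, k FROM dim WHERE collection = 'TEST' AND value IN ('val3', 'val4'))"

# One COUNT(*) per stage, so the stage where rows disappear is visible.
stages = {
    "M": f"SELECT COUNT(*) FROM {M} m",
    "N": f"SELECT COUNT(*) FROM {N} n",
    "fact JOIN M": f"SELECT COUNT(*) FROM fact f JOIN {M} m ON f.d = m.d AND f.k = m.k",
    "fact JOIN M JOIN N": (
        f"SELECT COUNT(*) FROM fact f JOIN {M} m ON f.d = m.d AND f.k = m.k "
        f"JOIN {N} n ON f.d = n.d AND f.k = n.k"
    ),
}
counts = {name: conn.execute(sql).fetchone()[0] for name, sql in stages.items()}
for name, rows in counts.items():
    print(f"{name}: {rows} rows")
```

If the last count is zero on the real data while the first three are not, the joins are eliminating every row and the NULL comes from SUM running over an empty input, not from wrong column values.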

 

 

 

Explorer

Adding the broadcast query hint did not help; the query still returns NULL. Thanks.

Expert Contributor

Since Kudu does not support ACID transactions, I wonder whether you could be hitting a data consistency issue. Is the fact_PatientEncounter table actively receiving writes? If so, fpe.measure_EncounterInstanceCount could be NULL for some rows while fpe.RecordStatusDeleted or fpe.RecordStatus is also NULL (even for a short period of time).
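
One way to tell these cases apart is to return COUNT(*) and COUNT(measure) next to the SUM. SUM yields NULL both when no rows reach the aggregate and when every measure value it sees is NULL, but the two counts differ between those cases. A minimal sketch, again using Python's sqlite3 as a stand-in for Impala with a made-up three-row table:

```python
import sqlite3

# SQLite stands in for Impala; the table and rows are hypothetical.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE fact (k TEXT, cnt INTEGER)")
conn.executemany("INSERT INTO fact VALUES (?, ?)", [("a", 1), ("b", None), ("c", 2)])

def aggregate(where):
    """Return (SUM(cnt), COUNT(*), COUNT(cnt)) for rows matching the filter."""
    sql = f"SELECT SUM(cnt), COUNT(*), COUNT(cnt) FROM fact WHERE {where}"
    return conn.execute(sql).fetchone()

no_rows = aggregate("k = 'zzz'")   # nothing reaches the aggregate
only_nulls = aggregate("k = 'b'")  # a row arrives, but its measure is NULL
mixed = aggregate("1 = 1")         # NULL measures are skipped by SUM

print(no_rows, only_nulls, mixed)
```

A NULL sum with COUNT(*) = 0 points at the joins or filters removing every row; a NULL sum with COUNT(*) > 0 and COUNT(measure) = 0 points at NULL measure values.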

New Contributor

Please check the query again.

 

Subqueries M and N have the exact same filters except that in M you filter for Value = 'val1' or 'val2' and in N you filter for Value = 'val3' or 'val4'.

 

So this would make the rows returned by M and N mutually exclusive, hence the NULL when you do an inner join between them.
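
This hypothesis can be checked directly against the data. M and N return only the key columns (date and hash key), not Value, so whether they overlap depends on whether any single encounter has rows with values from both lists. A minimal sketch of such a check, assuming Python's sqlite3 as a stand-in for Impala and made-up rows; keys_in_both is a hypothetical helper, not something from the thread:

```python
import sqlite3

def keys_in_both(dim_rows):
    """Count (d, k) keys that have a value from M's list and one from N's list."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE dim (d INTEGER, k TEXT, value TEXT)")
    conn.executemany("INSERT INTO dim VALUES (?, ?, ?)", dim_rows)
    sql = """
    SELECT COUNT(*) FROM (
      SELECT d, k
      FROM dim
      GROUP BY d, k
      HAVING MAX(CASE WHEN value IN ('val1', 'val2') THEN 1 ELSE 0 END) = 1
         AND MAX(CASE WHEN value IN ('val3', 'val4') THEN 1 ELSE 0 END) = 1
    ) AS t
    """
    return conn.execute(sql).fetchone()[0]

# No encounter has values from both lists: M and N share no keys.
disjoint = keys_in_both([(20170105, "a", "val1"), (20170106, "b", "val3")])

# Encounter 'a' has both val1 and val3: it appears in M and in N.
overlapping = keys_in_both([
    (20170105, "a", "val1"),
    (20170105, "a", "val3"),
    (20170106, "b", "val2"),
])

print(disjoint, overlapping)
```

If the equivalent query returns 0 on the real table, the inner joins leave no rows and the NULL is expected; if it returns a positive number, M and N do overlap and the cause lies elsewhere.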