How to maintain a true aggregation in Kudu ingesting via Impala query

Historically we have kept our daily aggregation tables in Parquet and maintained them by doing an INSERT INTO every hour, which means they are not true aggregations, as there can be up to 24 entries for a given dim combination in a full day.

So we 'compact' them every now and then by re-aggregating entire monthly partitions, which is extremely resource-intensive and time-consuming.
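That periodic re-aggregation could look roughly like this. This is only a sketch: the `agg_parquet` name and the `month` partition column are assumptions, and in practice the result may need to be staged in a separate table first, since the statement reads the partition it overwrites.

```sql
-- Hypothetical monthly compaction of a Parquet aggregate:
-- collapse the up-to-24 hourly rows per dimension combination into one.
INSERT OVERWRITE agg_parquet PARTITION (month = '2019-06')
SELECT
  dimcol1,
  dimcol2,
  CAST(SUM(some_count) AS INT) some_count
FROM agg_parquet
WHERE month = '2019-06'
GROUP BY dimcol1, dimcol2;
```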

We are trying to switch some of these aggregations to Kudu, fully expecting that we could update existing records in place, but sadly it's not working for us, and I'm hoping the community has some ideas. This is what we have tried:


1) UPSERTing with something like this:

UPSERT INTO agg_table
SELECT
  agged_facts.dimcol1,
  agged_facts.dimcol2,
  CAST(agged_facts.some_count + IFNULL(agg_table.some_count, 0) AS INT) some_count
FROM (
  SELECT
    IFNULL(dimcol1, 1) dimcol1,
    IFNULL(dimcol2, '') dimcol2,
    CAST(SUM(IFNULL(some_count, 0)) AS INT) some_count
  FROM fact_table
  WHERE ...
  GROUP BY 1, 2
) agged_facts
LEFT JOIN agg_table ON
  agged_facts.dimcol1 = agg_table.dimcol1 AND
  agged_facts.dimcol2 = agg_table.dimcol2

This works fine initially, but as the table grows, the join becomes slower and slower (and uses more memory). We thought dynamic filtering would fix the issue, so we changed the LEFT JOIN to agg_table to:

LEFT JOIN (
  SELECT *
  FROM agg_table
  WHERE
    dimcol1 IN (
      SELECT DISTINCT dimcol1
      FROM fact_table fact
      WHERE ...
    ) AND
    dimcol2 IN (
      SELECT DISTINCT dimcol2
      FROM fact_table fact
      WHERE ...
    )
) agg_table ON

But that only helped marginally because, sadly, dynamic filtering against Kudu scans uses min/max filters rather than Bloom filters, so it doesn't restrict the scan to the specific values of dimcol1 and dimcol2 in the batch, but to anything in the range between the min and max values seen, which in our case is usually pretty much all of them.
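To make the min/max point concrete (the values below are hypothetical): if a batch of facts touches only a few keys, but those keys are spread across the key space, the pushed-down filter still admits almost everything.

```sql
-- Suppose the hour's facts touch only dimcol1 values 3 and 98000.
-- A value-based (Bloom) filter would prune the Kudu scan to roughly:
--   dimcol1 IN (3, 98000)
-- but the min/max runtime filter actually pushed to Kudu is equivalent to:
--   dimcol1 >= 3 AND dimcol1 <= 98000
-- which matches nearly every row in agg_table.
```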


2) We tried to avoid joining altogether:

UPSERT INTO agg_table
SELECT
  agged_facts.dimcol1,
  agged_facts.dimcol2,
  CAST(agged_facts.some_count + IFNULL(agg_table.some_count, 0) AS INT) some_count
FROM (
  SELECT
    IFNULL(dimcol1, 1) dimcol1,
    IFNULL(dimcol2, '') dimcol2,
    CAST(SUM(IFNULL(some_count, 0)) AS INT) some_count
  FROM fact_table
  WHERE ...
  GROUP BY 1, 2
) agged_facts

But that's invalid syntax: you can't sum columns from the source and target tables without joining them.
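For anyone following along, a join-free additive UPSERT can't work even conceptually (the values below are hypothetical): Kudu's UPSERT replaces the non-key columns of an existing row rather than combining them with the incoming values.

```sql
-- agg_table currently holds the row (dimcol1=1, dimcol2='X', some_count=50).
UPSERT INTO agg_table VALUES (1, 'X', 30);
-- The row is now (1, 'X', 30): the old 50 was overwritten, not summed to 80.
```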


3) Do updates additively, and compact.

Yes, like for the Parquet aggs, but the big difference is that we don't need to rewrite entire partitions/tablets, just the specific records with multiple entries, so it's much less resource-intensive and can be done more often.

To enable this we add a 'version' column, so we can have multiple rows for the same combination of dimension ids, and a 'to_delete' column to identify rows that are ready for cleanup. So we'd end up with records like:

dimcol1  dimcol2  version  to_delete  some_count
1        X        1        false      50
1        X        999      false      30

And after compaction the aggregation would look like:

dimcol1  dimcol2  version  to_delete  some_count
1        X        1        true       0
1        X        999      false      80

And finally we delete the zeroed-out records:

dimcol1  dimcol2  version  to_delete  some_count
1        X        999      false      80
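
The compaction for this scheme could be sketched as three statements. Column names come from the tables above, but the "version 999 is the current row" convention and the exact predicates are assumptions, and the first statement may need a staging step if reading and upserting the same table in one statement is a problem.

```sql
-- Step 1: fold all live versions of each key into the version-999 row.
UPSERT INTO agg_table
SELECT dimcol1, dimcol2, 999 AS version, false AS to_delete,
       CAST(SUM(some_count) AS INT) AS some_count
FROM agg_table
WHERE NOT to_delete
GROUP BY dimcol1, dimcol2;

-- Step 2: zero out and flag the superseded versions.
UPDATE agg_table SET some_count = 0, to_delete = true WHERE version != 999;

-- Step 3: clean up the flagged rows.
DELETE FROM agg_table WHERE to_delete;
```

Note that a failure between the first two statements leaves both the old rows and the combined row counted, which is exactly the reliability gap described in the post.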


This works, and we may go with it, BUT we can't figure out a way to make it entirely reliable, because a compaction query failure could update only one of the two records above, leaving the data incorrect. If we adopt this, it's only because we think that, due to the ordering of operations, a failure would affect at most one entry per tablet, and in our use case that may be OK.
