Historically we have kept our daily aggregation tables in Parquet and maintained them with hourly INSERT INTO statements, which means they are not true aggregations: there can be up to 24 rows for a given dim combination in a full day.
So every now and then we 'compact' them by re-aggregating entire monthly partitions, which is extremely resource-intensive and time-consuming.
We are trying to switch some of these aggregations to Kudu, fully expecting that we could update existing records in place, but sadly it's not working for us and I'm hoping the community has some ideas. This is what we have tried:
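(For reference, the statements below assume a Kudu aggregate table shaped roughly like this; a simplified sketch, the real tables have more dimension and measure columns:)

CREATE TABLE agg_table (
  dimcol1 INT,
  dimcol2 STRING,
  some_count INT,
  PRIMARY KEY (dimcol1, dimcol2)
)
PARTITION BY HASH (dimcol1, dimcol2) PARTITIONS 16
STORED AS KUDU;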
1) UPSERTing with something like this:
UPSERT INTO agg_table
SELECT
  agged_facts.dimcol1,
  agged_facts.dimcol2,
  CAST(agged_facts.some_count + IFNULL(agg_table.some_count, 0) AS INT) some_count
FROM (
  SELECT
    IFNULL(dimcol1, 1) dimcol1,
    IFNULL(dimcol2, '') dimcol2,
    CAST(SUM(IFNULL(some_count, 0)) AS INT) some_count
  FROM fact_table
  WHERE ...
  GROUP BY 1, 2
) agged_facts
LEFT JOIN agg_table
  ON agged_facts.dimcol1 = agg_table.dimcol1
  AND agged_facts.dimcol2 = agg_table.dimcol2
Which works fine initially, but as the table grows the join becomes slower and slower (and uses more and more memory). We thought dynamic filtering would fix the issue, so we changed the LEFT JOIN to agg_table to:
LEFT JOIN (
  SELECT * FROM agg_table
  WHERE dimcol1 IN (SELECT DISTINCT dimcol1 FROM fact_table fact WHERE ...)
    AND dimcol2 IN (SELECT DISTINCT dimcol2 FROM fact_table fact WHERE ...)
) agg_table ON ...
But that only helped marginally, because sadly dynamic filtering against Kudu scans uses min/max filters, not Bloom filters, so it doesn't restrict the scan to the specific values of dimcol1 and dimcol2 in the batch, but to everything in the range between the min and max values seen. For example, if an hour's facts happen to touch dimcol1 values 5 and 9000000, the runtime filter only prunes the scan to 5 <= dimcol1 <= 9000000, which in our case is usually pretty much the whole table.
2) Tried to avoid joining altogether:
UPSERT INTO agg_table
SELECT
  agged_facts.dimcol1,
  agged_facts.dimcol2,
  CAST(agged_facts.some_count + IFNULL(agg_table.some_count, 0) AS INT) some_count
FROM (
  SELECT
    IFNULL(dimcol1, 1) dimcol1,
    IFNULL(dimcol2, '') dimcol2,
    CAST(SUM(IFNULL(some_count, 0)) AS INT) some_count
  FROM fact_table
  WHERE ...
  GROUP BY 1, 2
) agged_facts
But that's incorrect syntax: agg_table isn't in the FROM clause, so agg_table.some_count can't be referenced, and there's no way to add up columns from the source and target tables without the join.
3) Do updates additively, and compact.
Yes, just like for the Parquet aggs, but the big difference is that we don't need to rewrite entire partitions/tablets, only the specific records with multiple entries, so it's much less resource-intensive and can be done more often.
To enable this we add a 'version' column, so we can have multiple rows for the same combination of dimension ids, and a 'delete' column to identify rows ready for cleanup, so we'd end up with records like:
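Using the sketch table from above, with the primary key extended to (dimcol1, dimcol2, version) and the flag named is_deleted here (DELETE is a reserved word), illustrative values:

 dimcol1 | dimcol2 | version | is_deleted | some_count
---------+---------+---------+------------+-----------
       1 | 'a'     |       0 | false      |        100
       1 | 'a'     |       1 | false      |         25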
And after compaction the same records would look like:
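Continuing the illustration:

 dimcol1 | dimcol2 | version | is_deleted | some_count
---------+---------+---------+------------+-----------
       1 | 'a'     |       0 | false      |        125
       1 | 'a'     |       1 | true       |          0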
And finally we delete zeroed out records:
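In Impala that's just a Kudu DELETE (again with the hypothetical is_deleted name):

DELETE FROM agg_table WHERE is_deleted = true;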
This works and we may go with it, BUT we can't figure out a way to make it entirely reliable, because a compaction query failure could update only one of the two records above, leaving the data incorrect. If we adopt this, it's only because we think that, due to the ordering of operations, a failure would affect at most one entry per tablet, and in our use case that may be OK. (See the sketch of the two compaction statements below.)
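For concreteness, the compaction is two statements along these lines (a sketch using the illustrative names from above), and the failure window sits between them:

-- Step 1: fold every version of each key into the version-0 row.
UPSERT INTO agg_table (dimcol1, dimcol2, version, is_deleted, some_count)
SELECT dimcol1, dimcol2, 0 AS version, false AS is_deleted,
       CAST(SUM(some_count) AS INT) some_count
FROM agg_table
WHERE ...
GROUP BY dimcol1, dimcol2;

-- Step 2: zero out and flag the delta rows that were folded in.
-- If the job dies between the two statements, the deltas are still
-- present as version > 0 rows, so reads double-count them, and
-- re-running step 1 would fold them in a second time.
UPDATE agg_table
SET some_count = 0, is_deleted = true
WHERE version > 0 AND ...;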
Updating here myself for others' benefit: I tracked it down to the fact that Bloom filters are not supported in dynamic (runtime) filters when joining to a Kudu table, only min/max filters, which are not much help when joining on many different ids/keys at a time. The relevant work to add Bloom filter support is in KUDU-2483 and IMPALA-3741. Some contributors have expressed interest in picking this up, and I've heard Cloudera may have some people working on it soon, but nothing concrete at the moment.