Sharded Aggregation for high cardinality data #99581
for (size_t i = 0; i < num_rows; ++i)
{
    size_t row = row_indices[i];
    size_t current_offset = offsets[static_cast<ssize_t>(row) - 1];
❌ addBatchArrayForRows reads offsets[row - 1] without handling row == 0.
row_indices can include the first row of the chunk, so for row = 0 this becomes offsets[-1], which is an out-of-bounds read (undefined behavior).
Please guard the first-row case explicitly, e.g.:
size_t current_offset = (row == 0) ? 0 : offsets[row - 1];

throw Exception(
    ErrorCodes::LOGICAL_ERROR,
    "Sharded aggregation requires prealloc serialized methods, but row_sizes is empty. "
    "Sharded aggregation should be avoided for non-prealloc methods should be in AggregatingStep");
💡 The exception text has a grammar typo and reads awkwardly:
Sharded aggregation should be avoided for non-prealloc methods should be in AggregatingStep
Please reword it to something clearer, for example:
Sharded aggregation should be avoided for non-prealloc methods in AggregatingStep.
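For the first review comment above (the offsets[row - 1] read), the guarded lookup can be illustrated in isolation. This is only a minimal sketch under stated assumptions: it uses a plain std::vector, which has no readable element before index 0, and the names Range, arrayRange and totalElements are hypothetical helpers, not functions from the PR. It assumes offsets[i] stores the cumulative end position of array i, so the start of row 0 is 0.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Half-open range [begin, end) of the elements belonging to one array row.
struct Range
{
    size_t begin;
    size_t end;
};

// Hypothetical helper: offsets[i] is assumed to hold the cumulative end
// position of array i, so array i starts where array i - 1 ended.
Range arrayRange(const std::vector<size_t> & offsets, size_t row)
{
    assert(row < offsets.size());
    // Guard the first row explicitly: with std::vector there is no valid
    // element at index -1, so row 0 must start at position 0.
    size_t begin = (row == 0) ? 0 : offsets[row - 1];
    return {begin, offsets[row]};
}

// Counts the elements covered by a selection of rows, which may include row 0.
size_t totalElements(const std::vector<size_t> & offsets, const std::vector<size_t> & row_indices)
{
    size_t total = 0;
    for (size_t row : row_indices)
    {
        Range range = arrayRange(offsets, row);
        total += range.end - range.begin;
    }
    return total;
}
```

With offsets = {2, 5, 5, 9}, row 0 covers positions [0, 2) and row 2 is an empty array, so a selection that includes the first row is handled without reading before the start of the vector.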
Our MVP grew into a MegaVP. Let's break it into a few steps that will be easier to review and discuss in detail. Here is the plan we came up with: I'd perhaps limit ourselves to 4 or even 3 stages instead of 6, but it is up to you. My only big ask is to move iteratively.
- [GROUP BY optimization](/sql-reference/statements/select/group-by#group-by-optimization-depending-on-table-sorting-key)
)", 0) \
    DECLARE(Bool, optimize_aggregation_by_sharding, true, R"(
A new user-facing setting is added (optimize_aggregation_by_sharding) with new behavior, but the PR still has the "Documentation is written" checkbox unchecked.
Please add user docs for this setting (when to enable it, limitations such as lack of external aggregation spill support, and expected trade-offs), then mark the template checkbox accordingly.
LLVM Coverage Report
Changed lines: 80.38% (762/948) | lost baseline coverage: 15 line(s) · Uncovered code |

Changelog category (leave one):
Changelog entry (a user-readable short description of the changes that goes into CHANGELOG.md):
New GROUP BY optimization for high cardinality keys that distributes rows across threads by hashing the grouping key, so each thread aggregates a disjoint subset of keys without a merge phase. Set optimize_aggregation_by_sharding = 1 to enable it.
Documentation entry for user-facing changes
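The sharding idea in the changelog entry above can be sketched in a few lines. This is not the PR's implementation, only an illustration under stated assumptions: the aggregate is a simple SUM over string keys, std::hash stands in for whatever hash function the real code uses, and Row, Partial and shardedSum are hypothetical names introduced for this example.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <string>
#include <thread>
#include <unordered_map>
#include <utility>
#include <vector>

using Row = std::pair<std::string, uint64_t>;               // (grouping key, value)
using Partial = std::unordered_map<std::string, uint64_t>;  // key -> running sum

// Hypothetical sketch of SUM(value) GROUP BY key with key sharding.
std::vector<Partial> shardedSum(const std::vector<Row> & rows, size_t num_shards)
{
    assert(num_shards > 0);

    // Step 1: route every row to the shard selected by the hash of its key.
    // All rows with the same key land in the same shard.
    std::vector<std::vector<const Row *>> buckets(num_shards);
    std::hash<std::string> hasher;
    for (const Row & row : rows)
        buckets[hasher(row.first) % num_shards].push_back(&row);

    // Step 2: one thread per shard aggregates its bucket into its own table.
    // Threads write to distinct elements of `results`, so no locking is needed.
    std::vector<Partial> results(num_shards);
    std::vector<std::thread> workers;
    for (size_t shard = 0; shard < num_shards; ++shard)
    {
        workers.emplace_back([&buckets, &results, shard]
        {
            for (const Row * row : buckets[shard])
                results[shard][row->first] += row->second;
        });
    }
    for (std::thread & worker : workers)
        worker.join();

    // Step 3: no merge phase. Each key exists in exactly one shard's table,
    // so the final result is the concatenation of the per-shard tables.
    return results;
}
```

The trade-off the sketch makes visible is that the routing pass in step 1 touches every row before any aggregation starts; in exchange, the per-thread tables never need to be combined, which matters most when the number of distinct keys is large.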