Naive Sharded Aggregation for high cardinality data#104233
Conversation
| params.group_by_two_level_threshold_bytes = 0; | ||
|
|
||
| /// Sharded aggregation does not implement temporary-file spill/merge yet. | ||
| params.max_bytes_before_external_group_by = 0; |
There was a problem hiding this comment.
Disabling params.max_bytes_before_external_group_by here means the sharded path ignores external aggregation and always keeps hash tables in memory. With optimize_aggregation_by_sharding enabled by default, this is a behavior change for queries that previously spilled to disk: they can now hit memory-limit exceptions instead.
Please either (1) keep sharded aggregation disabled by default until spill support exists, or (2) do not take the sharded path when external aggregation settings are configured, so existing memory-safety behavior is preserved.
There was a problem hiding this comment.
params.max_bytes_before_external_group_by is still unconditionally reset to 0 in the sharded path (AggregatingStep::transformPipeline, line 344 in current head), so the earlier concern is still live.
The regression is now visible in this PR itself: multiple integration tests had to explicitly set enable_sharding_aggregator = 0 to keep external GROUP BY behavior (test_max_bytes_ratio_before_external_order_group_by_for_server, test_temporary_data_in_cache, test_tmp_policy, etc.). That means enabling sharded aggregation by default currently removes an existing spill-to-disk safety mechanism for eligible queries.
Please keep sharded aggregation off unless external aggregation is unsupported for that query shape, or gate this path when external-aggregation settings are active.
tbh i noticed this pr while working on perf comparison dashboard, and it looks like we have a lot of degradations, especially in quantile and some group by , some by 5-10x |
Yes, this is expected especially for low cardinality and skewed data. My comment was mainly comparing difference between this PR and #99581 (which is this PR + some optimizations). Both have many degradations. |
|
|
||
| /// Try to pull a new input chunk. | ||
| input.setNeeded(); | ||
| if (input.hasData()) |
There was a problem hiding this comment.
We need to limit the max queue length and don't accept new chunks while already at this limit.
There was a problem hiding this comment.
Yes, good catch. Actually, the original optimized sharded aggregation had it but later I removed it for this naive version because I thought since we split the chunks into smaller parts the memory will be equivalent whether the content is in hash table or in IColumn chunks. But then realized, for low cardinality case, the memory difference would be quite high between queued chunks in BufferedShardByHashTransform.cpp and their memory in hash tables after aggregation.
I have added a limit of 10 chunks at most. Maybe we can also put a limit in terms of rows or bytes.
| /// causing them to cluster into a small subset of hash table buckets. | ||
| /// The golden ratio constant ensures thorough bit mixing with a single multiply. | ||
| /// Combine the mix with Lemire fastrange to map into [0, num_shards) without a divide. | ||
| static constexpr size_t fibonacci_hash_multiplier = 0x9e3779b97f4a7c15ULL; |
There was a problem hiding this comment.
We already have similar logic implemented in scatterBlockByHashGeneric. Maybe we could reuse it.
There was a problem hiding this comment.
I could not use scatterBlockByHashGeneric directly because the signature requires Block and but we only have Chunk and there would be conversion overhead if used. I decided to reuse only JoinCommon::hashToSelector to avoid doing hashing manually.
nickitat
left a comment
There was a problem hiding this comment.
All good, modulo a few comments.
LLVM Coverage Report
Changed lines: 89.34% (218/244) · Uncovered code |
4775f2d
…gation Naive Sharded Aggregation for high cardinality data





Simplified version of #99581 that does not have many of the optimizations like no materialization during scatter, single hashing and reuse of hash, single serialization and reuse of serialized bytes.
There are mostly regressions since data is not usually high cardinality and evenly distributed but If enabled for perf tests, some tests have sped up:









Changelog category (leave one):
Changelog entry (a user-readable short description of the changes that goes into CHANGELOG.md):
New
GROUP BYoptimization for high cardinality evenly distributed keys that scatters rows across threads by hashing the grouping key, so each thread aggregates a disjoint subset of keys without a merge phase. Setenable_sharding_aggregator = 1to enable it.Documentation entry for user-facing changes
Version info
26.6.1.128