Implement fused TopNAggregation operator for GROUP BY ... ORDER BY aggregate LIMIT K #98607
murphy-4o wants to merge 85 commits into
…gregate LIMIT K

Add `TopNAggregatingStep` and `TopNAggregatingTransform` that fuse aggregation, sorting, and limiting into a single pass for queries like:

    SELECT key, max(ts) AS m FROM t GROUP BY key ORDER BY m DESC LIMIT 5

The optimization replaces the Aggregating → Sorting → Limit plan chain with a single `TopNAggregating` step. Two modes are supported:
- Mode 1 (sorted input): when the table's ORDER BY key matches the aggregate argument, the operator requests reverse reading from `ReadFromMergeTree` and stops after K unique groups, giving O(K) memory and minimal rows read.
- Mode 2 (unsorted input): processes all rows, aggregates per group, then partial-sorts to select the top K, which eliminates the full sort.

Supported aggregates: `min`, `max`, `any`, `argMin`, `argMax`. Each declares its compatibility via a new `getTopKAggregateInfo` virtual method on `IAggregateFunction`.

The optimization is gated by the `optimize_topn_aggregation` setting (default off).
Made-with: Cursor
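As a rough illustration of the Mode 2 idea in this commit (aggregate per group, then partial-sort for the top K), here is a minimal standalone sketch. It is not the PR's transform: `Row`, `GroupResult`, and `topKByMax` are hypothetical names, and the standard-library hash map stands in for ClickHouse's own hash tables.

```cpp
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

// Hypothetical row type for illustration: (group key, timestamp).
struct Row { std::string key; std::int64_t ts; };
using GroupResult = std::pair<std::string, std::int64_t>;

// Returns the top `k` groups ordered by max(ts) descending (ties broken by key).
std::vector<GroupResult> topKByMax(const std::vector<Row> & rows, std::size_t k)
{
    // Step 1: aggregate max(ts) per group over all rows.
    std::unordered_map<std::string, std::int64_t> max_per_group;
    for (const Row & row : rows)
    {
        auto [it, inserted] = max_per_group.try_emplace(row.key, row.ts);
        if (!inserted && row.ts > it->second)
            it->second = row.ts;
    }

    // Step 2: partial sort; only the first k positions end up ordered.
    std::vector<GroupResult> groups(max_per_group.begin(), max_per_group.end());
    k = std::min(k, groups.size());
    std::partial_sort(
        groups.begin(), groups.begin() + static_cast<std::ptrdiff_t>(k), groups.end(),
        [](const GroupResult & a, const GroupResult & b)
        {
            return a.second != b.second ? a.second > b.second : a.first < b.first;
        });
    groups.resize(k);
    return groups;
}
```

The partial sort costs roughly O(G log K) for G groups instead of O(G log G) for a full sort, which is the saving the commit message refers to.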
1. Block optimization when OFFSET > 0: the optimizer now checks `LimitStep::getOffset` and bails out, preventing incorrect results for `LIMIT K OFFSET N` queries.
2. Eliminate per-row arena growth in key serialization: replaced `serializeValueIntoArena` (which allocated into the arena on every row) with a pure `SipHash`-128 key. Arena is now only used for aggregate state allocation, bounded by number of groups.
3. Preserve full `SortColumnDescription`: the optimizer now copies the original `nulls_direction` and `collator` from the `SortingStep` instead of rebuilding with defaults. Mode 2's `compareAt` uses the correct `nulls_direction` for NULL ordering.
4. Document mode-2 as intentional v1 simplification: the header comment now states that threshold-based pruning is not yet implemented.
5. Add test coverage for OFFSET (negative case) and NULLS FIRST/LAST (correctness comparison between optimized and unoptimized paths).
Made-with: Cursor
1. Collision-safe group keys: Replace hash-only `std::string` key with `UInt128` SipHash + exact column comparison via `unordered_multimap`. On hash match, compares actual key column values against stored groups in `result_columns`, eliminating the theoretical collision risk.
2. Collation-aware compare in Mode 2: `generateMode2` now checks for a `collator` in `SortColumnDescription` and uses `compareAtWithCollation` when present, ensuring correct ordering for string ORDER BY with explicit collation (e.g., COLLATE 'en').
3. Mode 1 safety for argMin/argMax/any: Add `output_ordered_by_sort_key` flag to `TopKAggregateInfo`. Only min/max set it to true (their output is monotonically related to the sort key). The optimizer now only enables Mode 1 (early termination with sorted input) when this flag is true, preventing incorrect results for argMin/argMax where the output column differs from the sort key.
4. Extended test coverage:
   - Collation-sensitive ordering (COLLATE 'en') optimized vs reference
   - Mode 1 EXPLAIN PLAN + correctness with small LIMIT
   - argMin/argMax ASC/DESC correctness parity
   - Tie-heavy dataset (100 groups, deterministic values)
   - Multi-aggregate (max + argMax) correctness
   - EXPLAIN PLAN for argMin optimization
Made-with: Cursor
The `.cpp` was out of sync with the `.h` header due to a file-write tool failure in the previous session. The header declared the new collision-safe API (`hashGroupKey`, `findGroupIndex`, `unordered_multimap<UInt128, size_t, UInt128Hash>`), but the `.cpp` still contained the old `serializeGroupKey` returning `std::string` with hash-only identity.

This commit makes `.cpp` consistent with `.h`:
- `hashGroupKey`: returns `UInt128` SipHash digest of key columns.
- `findGroupIndex`: on hash match, performs exact column-by-column comparison against stored keys in `result_columns` via `compareAt`, eliminating the hash-collision correctness risk.
- Both `consumeMode1` and `consumeMode2` now use the new `unordered_multimap::emplace` / `equal_range` API.
Made-with: Cursor
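The hash-then-exact-compare lookup described in this commit can be sketched in isolation as follows. This is an assumption-laden simplification: `GroupTable` and `findOrInsert` are hypothetical names, keys are plain strings rather than columns, and the hash is injectable only so that collisions can be forced in a test.

```cpp
#include <cstddef>
#include <cstdint>
#include <functional>
#include <string>
#include <unordered_map>
#include <vector>

// Illustrative stand-in for the transform's group table (not the PR's class).
struct GroupTable
{
    std::vector<std::string> keys;                              // stored group keys, by group index
    std::unordered_multimap<std::uint64_t, std::size_t> index;  // key hash -> group index
    std::function<std::uint64_t(const std::string &)> hash;     // injectable hash function

    // Returns the index of the group for `key`, creating the group if needed.
    std::size_t findOrInsert(const std::string & key)
    {
        const std::uint64_t h = hash(key);
        auto [first, last] = index.equal_range(h);
        for (auto it = first; it != last; ++it)
            if (keys[it->second] == key)   // exact comparison on hash match
                return it->second;
        keys.push_back(key);
        index.emplace(h, keys.size() - 1);
        return keys.size() - 1;
    }
};
```

Even with a degenerate hash that maps every key to the same value, distinct keys still get distinct groups; the cost is a longer scan of the `equal_range` bucket.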
Key changes:
1. Periodic threshold refresh: After each chunk, `refreshThresholdFromStates` calls `insertResultInto` for all groups' ORDER BY aggregate, uses `IColumn::getPermutation` with limit K to find the K-th best actual aggregate value, and sets a tight boundary. This raises the threshold from the ~1st percentile (first-seen values) to the ~99.99th percentile (actual group maxes), enabling 95-99% of subsequent rows to be skipped. Capped at `limit * 10000` groups to prevent O(N_groups) blowup.
2. Mode 2 gating: Only applies when a `__topKFilter` prewhere can be pushed into `ReadFromMergeTree` (numeric, non-nullable aggregate argument, no existing prewhere). Without prewhere, falls through to the standard Aggregator pipeline which is faster due to type-dispatched hashing.
3. Mode 1 gating fix: Correctly requires `output_ordered_by_sort_key` for Mode 1 (early termination), preventing incorrect results for `argMin`/`argMax` whose output has different sort order than the sort key.

Performance results on 10M rows, 100K groups:
- Mode 2 unsorted (uniform): 7ms vs 13ms baseline (1.8x faster, was 3.7x slower)
- Mode 2 skewed (50M rows): 9ms vs 26ms baseline (2.9x faster)
- Mode 2 memory: 4 MiB vs 271 MiB (67x less)
- High cardinality (5M+ groups): 4-5x slower (known limitation)
- Non-MergeTree: zero regression (falls through to standard pipeline)
Made-with: Cursor
…, comparator caching

Major changes to the TopNAggregation unsorted-input path (Mode 2):
- Replace old `consumeMode2` (find + insert, per-row threshold check from aggregate states, periodic `refreshThresholdFromStates`) with a simpler direct loop using `HashMap::emplace`, per-chunk threshold estimation from raw input data, and `TopKThresholdTracker::testAndSet`.
- Accumulate keys in dedicated `mode2_accumulated_keys` columns instead of reusing `result_columns`, avoiding confusion between Mode 1 and Mode 2.
- `generateMode2Partial` emits only local top-K groups as intermediate aggregate state columns (ColumnAggregateFunction), drastically reducing fan-in to the merge transform.
- Rewrite `TopNAggregatingMergeTransform`: pre-compute `agg_state_offsets`, `key_column_indices`, `agg_column_indices`; use `ArenaPtr` instead of value `Arena`; inline hash/create/destroy instead of separate methods.
- Simplify destructor to unconditionally sweep `group_states`.
- Remove unused `refreshThresholdFromStates`, `isBelowThreshold`, `order_agg_arg_col_idx`, `threshold_active` members.

Mode 1 improvements:
- Use `MergingSortedTransform` to merge N sorted streams instead of `pipeline.resize(1)`, preserving parallelism up to the merge point.
- Pass `order_arg_col_name` through `TopNAggregatingStep` for sort desc.

Optimizer (`optimizeTopNAggregation`):
- Add `group_by_matches_sorting_prefix` guard: skip Mode 2 when GROUP BY keys match a prefix of the table sorting key (standard Aggregator is faster in this case due to two-level hash table with sorted output).
- Add `unqualifyColumnName` helper for robust key matching.

`FunctionTopKFilter`:
- Cache the built comparator function + converted threshold, keyed by a monotonic version counter on `TopKThresholdTracker`. Rebuilds only when the threshold actually changes, avoiding redundant `build` + `convertFieldToType`.
- Fix comparator to use `lessOrEquals`/`greaterOrEquals` (was `less`/`greater`), so rows exactly equal to the threshold are kept (required for correctness when the boundary group's argument equals the threshold).

`TopKThresholdTracker`:
- Add `version` atomic counter, incremented on every `testAndSet` that actually updates the threshold. Enables lock-free staleness detection.
Made-with: Cursor
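The interaction between the versioned threshold and the inclusive comparison can be sketched as below. This is a hypothetical simplification for descending order over `int64` values, not the actual `TopKThresholdTracker`; the class name `ThresholdTracker` and its methods other than `testAndSet` are assumptions made for illustration.

```cpp
#include <atomic>
#include <cstdint>
#include <mutex>
#include <optional>

// Hypothetical simplification: larger values are better (ORDER BY ... DESC).
class ThresholdTracker
{
public:
    // Tightens the threshold if `candidate` is stricter; returns true when it changed.
    bool testAndSet(std::int64_t candidate)
    {
        std::lock_guard<std::mutex> lock(mutex);
        if (threshold && candidate <= *threshold)
            return false;
        threshold = candidate;
        version.fetch_add(1, std::memory_order_release);  // bumped only on a real update
        return true;
    }

    // Readers can compare versions without taking the mutex to detect staleness.
    std::uint64_t getVersion() const { return version.load(std::memory_order_acquire); }

    // Inclusive comparison: a row equal to the threshold is kept.
    bool keep(std::int64_t value) const
    {
        std::lock_guard<std::mutex> lock(mutex);
        return !threshold || value >= *threshold;
    }

private:
    mutable std::mutex mutex;
    std::optional<std::int64_t> threshold;
    std::atomic<std::uint64_t> version{0};
};
```

A filter that caches its comparator would store the version it was built against and rebuild only when `getVersion()` returns a different number.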
Made-with: Cursor
1. Collision-safe group identity: replace `HashMap<UInt128, size_t>` (SipHash-only key) with `HashMapWithSavedHash<std::string_view, size_t>` backed by `IColumn::serializeValueIntoArena`. This gives exact key comparison with automatic arena rollback for non-new keys via `SerializedKeyHolder`, eliminating the theoretical hash-collision correctness risk.
2. Safe threshold pruning: remove the per-chunk row-level threshold update from `consumeMode2`. Row-level K-th values can be stricter than the true group-level K-th aggregate, which would incorrectly filter rows needed by groups in the final top K. The threshold is now updated only from group-level aggregates in `generateMode2Partial`, which is provably safe (a partial's K-th group aggregate is always <= the global K-th).
3. Fix Mode 2 gating: remove the `group_by_matches_sorting_prefix` check that blocked Mode 2 even when Mode 1 was inapplicable (e.g. GROUP BY matches the sorting key but the ORDER BY aggregate argument is a different column). The `!sorted_input` guard already suffices.
4. Add documentation comments explaining why Mode 2 is intentionally not applied as a generic unsorted fallback, and noting that Mode 2 memory is O(unique groups) without in-stream eviction.
5. Add stress test for large composite keys, skewed ties, and parallel Mode 2 threshold pruning.
Made-with: Cursor
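To see why a row-level K-th value can be too strict, consider this small sketch. `kthLargest` is a hypothetical helper (not from the PR) that returns the K-th largest element of a list; applying it to raw rows versus per-group maxima gives different boundaries.

```cpp
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <optional>
#include <vector>

// Returns the k-th largest value (1-based), or nullopt if there are fewer than k values.
std::optional<std::int64_t> kthLargest(std::vector<std::int64_t> values, std::size_t k)
{
    if (k == 0 || values.size() < k)
        return std::nullopt;
    auto kth = values.begin() + static_cast<std::ptrdiff_t>(k - 1);
    std::nth_element(values.begin(), kth, values.end(), std::greater<>());
    return *kth;
}
```

Take rows `a:10, a:9, a:8, b:1, c:2` with `max` as the aggregate and `K = 2`. The 2nd largest row value is 9, but the 2nd largest group maximum (over `{10, 1, 2}`) is 2. Pruning with 9 would discard every row of `b` and `c`, although `c` belongs in the top 2; pruning with the group-level boundary does not.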
The test uses `COLLATE 'en'` which requires ICU. The fast test build is compiled without ICU, causing `SUPPORT_IS_DISABLED` error. Made-with: Cursor
Add a new `topn_aggregation_pruning_level` (UInt64, 0-2) setting to control Mode 2 optimizations independently:
- Level 0: direct compute only (no threshold, no filter)
- Level 1: + in-transform threshold pruning
- Level 2: + dynamic `__topKFilter` prewhere pushdown

The dynamic filter (level 2) now respects `use_top_k_dynamic_filtering`, which defaults to false, so it is no longer injected unconditionally.

Re-add in-transform threshold pruning (`refreshThresholdFromStates`, `isBelowThreshold`) to `TopNAggregatingTransform` for level 1+ support.

Test changes:
- Add correctness tests for all three pruning levels on MergeTree table
- Add EXPLAIN tests verifying plan differences across levels
- Existing EXPLAIN prewhere test now explicitly sets `use_top_k_dynamic_filtering = 1`
Made-with: Cursor
The static analyzer cannot prove that `enable_threshold_pruning` being true guarantees `order_arg_col` is non-null. Check the pointer directly instead of the boolean flag to satisfy clang-analyzer-core.NonNullParamChecker. Made-with: Cursor
- Setting description for `topn_aggregation_pruning_level` now explicitly states that level 2 requires `use_top_k_dynamic_filtering` to inject the prewhere, and that level 0 is not recommended (slower than baseline).
- Mode 2 now requires `pruning_level >= 1` to activate. Level 0 (direct compute without threshold) is known to regress vs the standard Aggregator pipeline, so it should not silently activate.
- EXPLAIN reference for level 0 updated: now shows the standard Aggregating + Sorting + Limit pipeline (no TopNAggregating).
Made-with: Cursor
Pass 0 (no limit) to `MergingSortedTransform` instead of the user's LIMIT K. K refers to the number of distinct groups, not total rows. With the previous code, if K consecutive rows belonged to the same group, MergingSorted would stop after K rows and the downstream transform would see fewer than K distinct groups. The backpressure from `TopNAggregatingTransform::consumeMode1` calling `finishConsume` is sufficient to stop reading once K groups are found. Made-with: Cursor
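The distinction between K rows and K distinct groups can be shown with a minimal sketch of Mode 1 style early termination. This is an illustration only: `Row` and `firstKGroups` are hypothetical names, and the input is assumed to be already sorted by `ts` descending, as a reverse read of a table ordered by `ts` would deliver it.

```cpp
#include <cstddef>
#include <cstdint>
#include <string>
#include <unordered_set>
#include <utility>
#include <vector>

// Hypothetical row type for illustration: (group key, timestamp).
struct Row { std::string key; std::int64_t ts; };

// Input must be sorted by ts descending. The first row seen for a group
// carries that group's max(ts), so scanning stops after k distinct groups.
std::vector<std::pair<std::string, std::int64_t>>
firstKGroups(const std::vector<Row> & rows_sorted_desc, std::size_t k)
{
    std::vector<std::pair<std::string, std::int64_t>> result;
    std::unordered_set<std::string> seen;
    for (const Row & row : rows_sorted_desc)
    {
        if (result.size() >= k)
            break;                          // early termination: k groups found
        if (seen.insert(row.key).second)    // first occurrence of this group
            result.emplace_back(row.key, row.ts);
    }
    return result;
}
```

With rows `a:9, a:8, a:7, b:5, c:3` and `k = 2`, a row limit of 2 applied upstream would deliver only group `a`, whereas the loop above keeps reading until it has seen `a` and `b`.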
Header: regroup members into logical sections (configuration, column indices, state layout, per-group state, Mode 1/2 columns, threshold).

Implementation: extract shared helpers to eliminate duplication:
- `computeStateLayout`: state offset/alignment (was duplicated in both constructors)
- `findOrderByAggIndex`: ORDER BY aggregate lookup (was duplicated in both constructors and `initColumnIndices`)
- `sortAndLimitColumns`: sort-permute-limit pattern (was triplicated in `generateMode2`, `generateMode2Partial`, merge `generate`)
- `prepareArgColumnPtrs`: arg column pointer setup (was duplicated in `consumeMode1` and `consumeMode2`)

Also make `consumeMode1` use the shared aggregate state helpers (`createAggregateStates`/`addRowToAggregateStates`/etc.) instead of inline state management, and deduplicate the merge loop in `TopNAggregatingMergeTransform::consume`.

Net: -20 lines, fewer copy-paste surfaces for future changes.
Made-with: Cursor
Avoid large-`LIMIT` regressions by adding a conservative `topn_aggregation_max_limit` gate, and add `EXPLAIN` coverage that verifies default fallback and explicit override behavior. Also reduce threshold refresh overhead in `TopNAggregatingTransform` with adaptive cadence and document `GroupState` intent for future per-group metadata. Made-with: Cursor
Add `buildThresholdKeepMask` to `TopNAggregatingTransform` which uses `IColumn::compareColumn` for vectorized threshold filtering in Mode 2 instead of per-row `isBelowThreshold` calls. This amortizes virtual call overhead and enables SIMD-friendly comparison paths. Add "Design tradeoffs" and "Future improvements" sections to `topngroupby.md` covering: serialized-key HashMap vs type-dispatched tables, threshold refresh cost and adaptive backoff, dynamic filter effectiveness by data distribution, partial top-K correctness, and planned improvements (type-dispatched tables, group eviction, mixed-aggregate two-pass, distributed support, etc.). Made-with: Cursor
Made-with: Cursor
The test uses EXPLAIN to verify query plan structure, which changes when ParallelReplicas is enabled (adds MergingAggregated/Union/ ReadFromRemoteParallelReplicas nodes). This caused the reference output mismatch in the ParallelReplicas stateless test CI job. Made-with: Cursor
@cursor review
…lumns The companion aggregate compatibility loop only checked `determined_by_first_row_direction` but never verified that each companion's determining argument column matches the ORDER BY aggregate's argument column. This allowed, e.g., `argMax(payload, val)` alongside `max(ts)` (val != ts) to be incorrectly optimized: Mode 1 only processes the first row per group sorted by `ts`, missing the row with max `val`; Mode 2 threshold pruning on `ts` can skip rows carrying the true max `val`. Add a check that every non-`any` companion aggregate's determining argument (`argument_names.back()`) equals the ORDER BY aggregate's determining argument. `any()` (`INT_MAX`) remains exempt since it accepts any row by definition. Made-with: Cursor
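A minimal sketch of the added companion check, under stated assumptions: `AggregateCall` and `companionsCompatible` are hypothetical names, aggregates are identified by function name rather than by `determined_by_first_row_direction`, and the determining argument is taken to be the last argument name, as the commit message describes.

```cpp
#include <string>
#include <vector>

// Hypothetical description of an aggregate call, e.g. argMax(payload, val).
struct AggregateCall
{
    std::string function;
    std::vector<std::string> argument_names;
};

// True when every non-`any` companion is determined by the same column
// as the ORDER BY aggregate.
bool companionsCompatible(const AggregateCall & order_by_agg,
                          const std::vector<AggregateCall> & companions)
{
    if (order_by_agg.argument_names.empty())
        return false;
    const std::string & determining = order_by_agg.argument_names.back();
    for (const AggregateCall & companion : companions)
    {
        if (companion.function == "any")
            continue;  // any() accepts any row by definition
        if (companion.argument_names.empty()
            || companion.argument_names.back() != determining)
            return false;
    }
    return true;
}
```

For example, `max(ts)` with `argMax(payload, val)` is rejected because `val != ts`, while `max(ts)` with `argMax(payload, ts)` or `any(x)` passes.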
clang-tidy cppcoreguidelines-init-variables flagged uninitialized `it` (LookupResult) and `group_idx` declarations as warnings treated as errors. Initialize them to `nullptr` and `0` respectively. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
# Conflicts: # src/Processors/QueryPlan/ReadFromLocalReplica.h
Three issues reported by the AI reviewer / Cursor Bugbot:
1. Mode 1 (sorted-input early termination) ignored the MergeTree sorting-key reverse flags. `required_direction` is the SQL direction relative to the column value, while `requestReadingInOrder` expects the direction relative to the storage sorting key. For a descending sorting key (e.g. `ORDER BY val DESC`), this requested the wrong read direction, so `TopNSortedAggregatingTransform` could treat the wrong end of each group as its extreme and stop after the wrong K groups. Now multiply by `getSortingKeyReverseFlags()[0]`, mirroring the standard read-in-order optimizer.
2. The rewrite bypasses `Aggregator`, so it cannot honor the GROUP BY overflow limit (`max_rows_to_group_by` with `group_by_overflow_mode`) nor external aggregation (`max_bytes_before_external_group_by`). Bail out to the standard pipeline when either is configured, instead of silently changing semantics and memory behavior.
3. `TopNAggregatingStep` does not implement plan serialization, so an optimized plan would throw `NOT_IMPLEMENTED` under `serialize_query_plan`. Disable the rewrite when serialized plans are required.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
The previous commit gated the rewrite on both `max_rows_to_group_by` and `max_bytes_before_external_group_by`. The latter is non-zero by default: it is derived from `max_bytes_ratio_before_external_group_by` (default 0.5), so `Aggregator::Params::max_bytes_before_external_group_by` resolves to a large value (e.g. ~80 GB) for ordinary queries. Gating on it therefore disabled the optimization for virtually every query. Keep only the `max_rows_to_group_by` guard, which is the genuine semantic gap (`group_by_overflow_mode = 'throw'`/`'break'` cannot be honored by the TopN transforms). Verified that the rewrite now triggers again, that `max_rows_to_group_by` with `throw` correctly falls back and throws `TOO_MANY_ROWS`, and that the descending sorting-key Mode 1 path returns correct results. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Covers the three issues fixed in the previous commits:
1. `Mode 1` early termination on a descending MergeTree sorting key
(`ORDER BY val DESC`) returns the correct top-K.
2. `max_rows_to_group_by` with `group_by_overflow_mode = 'throw'` skips
the rewrite and enforces the limit (`TOO_MANY_ROWS`).
3. `serialize_query_plan = 1` skips the rewrite and runs without a
`NOT_IMPLEMENTED` exception.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Picked this up to get CI green and address the outstanding review blockers. Pushed
Merged Fixed the three review blockers flagged by the AI review / Cursor Bugbot:
New regression test Note: the PR description still says I did not approve, since this isn't my PR. |
📊 Cloud Performance Report
🔴 AI verdict: 4 regressed query(s). K_source=6 K_base=30 flagged=8/65
clickbench: 🔴 3 regressed · 🟢 2 improved · Flagged queries (6 of 43)
tpch_adapted_1_official: 🔴 1 regressed · 🟢 2 improved · Flagged queries (3 of 22)
q-value = BH-FDR adjusted p; smaller is stronger evidence. MIRAI flags a query when q < fdr_q (default 0.10), the value the verdict is based on.
…gregation

Two correctness gaps found in review of the TopNAggregation optimization:
1. `Mode 1` (sorted-input early termination) did not guard floating-point determining columns. A `NaN` sorts at one physical end of the MergeTree data, but `ORDER BY ... NULLS FIRST/LAST` may expect it at the other end, so a reverse read could surface `NaN` groups first and early termination after `K` groups would return the wrong top-K. Mirror the standard `optimizeReadInOrder` guard, which treats floats like nullable keys when `nulls_direction == -1`.
2. The parallel-replica partial path rejected only plain `Nullable(T)` via `isNullable`, which is always false for `DataTypeLowCardinality`. So `LowCardinality(Nullable(UInt64))` enabled local threshold pruning and could drop all-null groups that SQL ordering places first. Use `isNullableOrLowCardinalityNullable` to match the single-node `Mode 2` gate.

Add `05000_topn_aggregation_float_nan_mode1` covering the float `NaN` guard for both ascending and descending (reverse-key) float sorting keys.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Pushed bb38714 (after merging CI failures resolved. The two Addressed two open review correctness threads:
Added The plan-serialization thread ( |
The optimizer bailed out of the `TopNAggregating` rewrite whenever `max_rows_to_group_by` was nonzero. The CI test profile (`tests/config/users.d/limits.yaml`) sets `max_rows_to_group_by: 10G` globally, so the optimization was disabled for every query in the test environment - all `TopNAggregating` EXPLAIN assertions in `03467_topn_aggregation_explain` and `04312_topn_aggregation_review_regressions` returned 0 in the Fast test. The same would happen on any deployment with a profile-level safety limit. Instead of bailing out, pass `max_rows_to_group_by` / `group_by_overflow_mode` from `Aggregator::Params` into the TopN transforms and enforce them there, mirroring `Aggregator::checkLimits`: `throw` raises a `TOO_MANY_ROWS` exception, `break` stops consuming input, `any` keeps aggregating existing groups without creating new ones. The count is checked per transform (= per stream), like the per-thread check in `Aggregator`, and counts only groups actually created: Mode 1 early termination and Mode 2 threshold pruning can legitimately stay under a limit the standard pipeline would exceed, in the same way rows skipped by a primary-key index do not count toward `max_rows_to_read`. CI: https://s3.amazonaws.com/clickhouse-test-reports/json.html?PR=98607&sha=bb3871493c3a57cd8ca6145984d2bac827e05b7a&name_0=PR&name_1=Fast%20test Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
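The three overflow modes described here can be sketched as a small decision function. This is a hedged illustration, not the PR's code: `OverflowMode`, `Action`, and `onNewGroup` are hypothetical names, the exact boundary condition (checking before creating a group, with `>=`) is an assumption, and a plain `std::runtime_error` stands in for ClickHouse's `TOO_MANY_ROWS` exception.

```cpp
#include <cstddef>
#include <stdexcept>

enum class OverflowMode { Throw, Break, Any };
enum class Action { CreateGroup, SkipNewGroup, StopConsuming };

// Decides what to do when a row would create a new group.
// `groups_created` counts only groups this transform actually created.
Action onNewGroup(std::size_t groups_created,
                  std::size_t max_rows_to_group_by,
                  OverflowMode mode)
{
    // 0 means "no limit" in this sketch.
    if (max_rows_to_group_by == 0 || groups_created < max_rows_to_group_by)
        return Action::CreateGroup;

    switch (mode)
    {
        case OverflowMode::Throw:
            throw std::runtime_error("TOO_MANY_ROWS");  // stand-in for the real exception
        case OverflowMode::Break:
            return Action::StopConsuming;               // stop reading input
        case OverflowMode::Any:
            return Action::SkipNewGroup;                // keep updating existing groups only
    }
    return Action::CreateGroup;  // unreachable; keeps compilers quiet
}
```

Because only created groups are counted, a query that terminates early or prunes rows may stay under a limit that the standard pipeline would exceed, which matches the behavior the commit message describes.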
Pushed
Verified locally: all 8 runnable TopN stateless tests pass against their references with |
The TopNAggregation rewrite is intentionally disabled under `serialize_query_plan` because `TopNAggregatingStep` does not implement plan serialization yet. The `distributed plan` CI configuration enables `serialize_query_plan` globally via `tests/config/users.d/distributed_plan.xml`, so the EXPLAIN assertions that expect `TopNAggregating` to be present (`03467_topn_aggregation_explain`, `03469_topn_aggregation_mismatched_companion_arg`, `03470_topn_aggregation_parallel_replicas`, `04312_topn_aggregation_review_regressions`) saw the rewrite disabled and failed with `0` instead of `1`. Pin `serialize_query_plan = 0` at the session level in these tests so the optimization is exercised regardless of the CI configuration. Section (3) of `04312` keeps its explicit per-query `serialize_query_plan = 1` override, which still verifies the fallback path. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
LLVM Coverage Report
Changed lines: Changed C/C++ lines covered by tests: 861/994 (86.62%) | Lost baseline coverage: none
Found during review: `optimizeTopNAggregation` did not honor `LimitStep::alwaysReadTillEnd`. When `exact_rows_before_limit = 1` (and in the totals-preservation cases) the `LimitTransform` must keep consuming upstream data after producing `K` rows to report the full pre-limit row count and preserve totals semantics. The rewrite drops the `LimitStep`, and `Mode 1` calls `finishConsume` after `K` groups, so an eligible query would stop reading early and violate that contract. Skip the optimization when `limit_step->alwaysReadTillEnd()` is true, the same guard `limitPushDown` and `topKThroughJoin` already use. Added `04337_topn_aggregation_exact_rows_before_limit` as a regression test. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Found during review: Mode 2 threshold pruning was enabled for floating-point determining columns even when the query orders `NULLS FIRST` (`nulls_direction != direction`). The `min`/`max` update rule ranks `NaN` at the worst end — any numeric value replaces a `NaN` accumulator (`SingleValueDataFixed::setIfGreater`/`setIfSmaller`) — while `NULLS FIRST` ordering ranks `NaN` best. Once the K-th boundary became `NaN`, every numeric row was "below the boundary" and got pruned, including rows that should replace an existing group's `NaN` state, so the group incorrectly kept `NaN` and could stay in the top-K. At level 2 the same bad boundary was published to the `__topKFilter` prewhere. Reject floating-point Mode 2 when `nulls_direction != direction`, in both the single-node gate and the parallel-replica per-replica partial path, mirroring the existing `Mode 1` `NaN` guard. Added `04338_topn_aggregation_float_nan_nulls_first_mode2` with a group that receives `NaN` first and a numeric value in a later block. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Found during review: under parallel replicas the per-replica partial rewrite mutates the inner plan of `ReadFromLocalParallelReplicaStep`, but that plan executes under the subquery context — the same SETTINGS shipped to the remote replicas. The rewrite used the initiator's `optimization_settings`, so a subquery-level `optimize_topn_aggregation = 0` or `serialize_query_plan = 1` (or a different `topn_aggregation_pruning_level` / `topn_aggregation_max_limit`) could still give the local replica `TopNAggregating(partial_only)` while the remotes ran the standard aggregation path. Derive the TopN gates from `local_replica->getContext()` before mutating the inner plan, and add the TopN fields to the local-plan override list in `optimizeTree` so the local replica follows the same subquery contracts as the remotes. Added `04339_topn_aggregation_parallel_replicas_subquery_settings`. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…termination)

For queries of the form `GROUP BY ... ORDER BY aggregate LIMIT K`, insert a single `TopNAggregatingStep` (below a kept `LimitStep`) in place of the aggregation + sort when the MergeTree table is physically sorted by the ORDER BY aggregate's argument. Reading in physical order, each group's aggregate is determined by its first occurrence, so the scan stops after K distinct groups: O(K) rows and O(K) memory instead of a full aggregate + sort.

Supports max/min as the ORDER BY aggregate with any/argMax/argMin companions on the same determining argument. Aggregation is delegated to the standard Aggregator / AggregatedDataVariants (type-dispatched and LowCardinality-aware hash methods, arena-managed states). The LimitStep is kept above the operator so `rows_before_limit_at_least` is still reported.

Cardinality gate (early termination only pays off when K is small relative to the GROUP BY cardinality), controlled by two settings:
- `topn_aggregation_max_ndv_ratio` (default 0.5): with column `uniq` statistics, apply when `LIMIT <= ndv * ratio`; 0 disables the statistics path.
- `topn_aggregation_max_limit` (default 1000): without statistics, apply when `LIMIT <= max_limit`; 0 requires statistics.

Falls back to the standard pipeline for nullable sorting-key columns, collation-sensitive ORDER BY, reverse-flagged (DESC) sorting keys, WITH TOTALS / exact_rows_before_limit, max_rows_to_group_by, grouping sets, OFFSET, WITH TIES, and non-MergeTree engines. External aggregation is disabled for the Mode-1 Aggregator (bounded-memory by ~K groups; max_memory_usage still applies).

Controlled by `optimize_topn_aggregation` (enabled by default). This is the Mode 1 slice of ClickHouse#98607; Mode 2 (unsorted-input hash aggregation with threshold pruning and dynamic `__topKFilter` prewhere) follows in a separate PR.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
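The cardinality gate described in this commit can be read as the following decision function. This is a sketch under assumptions: `passesCardinalityGate` is a hypothetical name, and the behavior when statistics exist but the ratio is 0 (falling through to the `max_limit` path) is my reading of "0 disables the statistics path", not something the commit message states outright.

```cpp
#include <cstdint>
#include <optional>

// `ndv` is the number of distinct GROUP BY values from column statistics, if available.
bool passesCardinalityGate(std::uint64_t limit,
                           std::optional<std::uint64_t> ndv,
                           double max_ndv_ratio,
                           std::uint64_t max_limit)
{
    // Statistics path: apply when LIMIT <= ndv * ratio.
    if (ndv && max_ndv_ratio > 0)
        return static_cast<double>(limit) <= static_cast<double>(*ndv) * max_ndv_ratio;

    // No usable statistics: max_limit == 0 means statistics are required.
    if (max_limit == 0)
        return false;
    return limit <= max_limit;
}
```

With the defaults (ratio 0.5, max limit 1000), `LIMIT 5` on a column with 100 distinct values passes, `LIMIT 60` on the same column does not, and without statistics any `LIMIT` up to 1000 passes.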

Summary
This PR introduces a fused `TopNAggregating` query plan operator for queries of the form `GROUP BY ... ORDER BY aggregate LIMIT K`.
It replaces the `AggregatingStep` -> `SortingStep` -> `LimitStep` chain with a single `TopNAggregatingStep` and supports two execution modes:
- … `K` unique groups.
- … `__topKFilter` to `PREWHERE` for storage-level skipping.

Closes #75098

Newly introduced settings
- `optimize_topn_aggregation` (default `0`): enables `TopNAggregation` rewrite.
- `topn_aggregation_pruning_level` (default `2`): controls Mode 2 pruning stack:
  - `0`: direct compute only
  - `1`: + in-transform threshold pruning
  - `2`: + dynamic `__topKFilter` pushdown (when `use_top_k_dynamic_filtering=1`)
- `topn_aggregation_max_limit` (default `1000`): conservative Mode 2 gate; for larger `LIMIT` values optimizer falls back to standard aggregation/sort pipeline to avoid known large-`K` regressions.

Performance (short)
- `topn_aggregation_max_limit=1000` defaults to conservative fallback and avoids worst cases.

Changelog category:
Changelog entry:
Added `optimize_topn_aggregation` for fused `GROUP BY ... ORDER BY aggregate LIMIT K` execution, with `topn_aggregation_pruning_level` controls for Mode 2 pruning and `topn_aggregation_max_limit` for conservative large-`K` fallback.

Documentation entry for user-facing changes
Made with Cursor

Note
High Risk
Adds a new query-plan rewrite and execution operator that changes aggregation/sorting behavior (including MergeTree PREWHERE injection) behind new settings, so correctness/perf regressions are possible across many queries. It also extends the `IAggregateFunction` interface and touches parallel-replica planning paths.

Overview
Introduces a fused `TopNAggregating` query-plan optimization for `GROUP BY ... ORDER BY <min/max/any/argMin/argMax> ... LIMIT K`, replacing the `Aggregating` → `Sorting` → `Limit` chain with a single `TopNAggregatingStep` that can either early-terminate on sorted MergeTree input or do hash aggregation with threshold pruning (optionally pushing a dynamic `__topKFilter` into `PREWHERE`).
Adds new settings (`optimize_topn_aggregation`, `topn_aggregation_pruning_level`, `topn_aggregation_max_limit`), extends aggregates with `getTopKAggregateInfo()`, and updates threshold/filter logic for float/NaN handling and wrapper columns (Const/Sparse/LowCardinality). Includes extensive stateless tests/EXPLAIN coverage, documentation updates for the optimization, and CI workflow additions (`OptimizeClickHouse`, `NightlyUpload`, plus a PR Mintlify docs check).
Reviewed by Cursor Bugbot for commit b9065dc.