Sharded Aggregation for high cardinality data by nihalzp · Pull Request #99581 · ClickHouse/ClickHouse · GitHub
Skip to content

Sharded Aggregation for high cardinality data#99581

Open
nihalzp wants to merge 108 commits into
ClickHouse:masterfrom
nihalzp:sharded-aggregation
Open

Sharded Aggregation for high cardinality data#99581
nihalzp wants to merge 108 commits into
ClickHouse:masterfrom
nihalzp:sharded-aggregation

Conversation

@nihalzp

@nihalzp nihalzp commented Mar 16, 2026

Copy link
Copy Markdown
Member

Changelog category (leave one):

  • Performance Improvement

Changelog entry (a user-readable short description of the changes that goes into CHANGELOG.md):

New GROUP BY optimization for high cardinality keys that distributes rows across threads by hashing the grouping key, so each thread aggregates a disjoint subset of keys without a merge phase. Set optimize_aggregation_by_sharding = 1 to enable it.

Documentation entry for user-facing changes

  • Documentation is written (mandatory for new features)

@clickhouse-gh

clickhouse-gh Bot commented Mar 16, 2026

Copy link
Copy Markdown
Contributor

@clickhouse-gh clickhouse-gh Bot added the pr-performance Pull request with some performance improvements label Mar 16, 2026
Comment thread src/Core/Settings.cpp
for (size_t i = 0; i < num_rows; ++i)
{
size_t row = row_indices[i];
size_t current_offset = offsets[static_cast<ssize_t>(row) - 1];

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

addBatchArrayForRows reads offsets[row - 1] without handling row == 0.

row_indices can include the first row of the chunk, so for row = 0 this becomes offsets[-1], which is an out-of-bounds read (undefined behavior).

Please guard the first-row case explicitly, e.g.:

size_t current_offset = (row == 0) ? 0 : offsets[row - 1];

throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Sharded aggregation requires prealloc serialized methods, but row_sizes is empty. "
"Sharded aggregation should be avoided for non-prealloc methods should be in AggregatingStep");

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 The exception text has a grammar typo and reads awkwardly:

Sharded aggregation should be avoided for non-prealloc methods should be in AggregatingStep

Please reword it to something clearer, for example:

Sharded aggregation should be avoided for non-prealloc methods in AggregatingStep.

@nickitat nickitat self-assigned this May 5, 2026
@nickitat

nickitat commented May 5, 2026

Copy link
Copy Markdown
Member

Our MVP grew into a MegaVP. Let's break it into a few steps that will be easier to review and discuss in detail. Here is the plan we came up with:

● What gets removed if scatter materializes contiguous per-shard columns

  ┌──────────────────────────────────────────────────────────────────────────────────────────────────┬──────────────┐
  │                                            Component                                             │  LoC saved   │
  ├──────────────────────────────────────────────────────────────────────────────────────────────────┼──────────────┤
  │ *ForRows virtuals + default impls in IAggregateFunction.h                                        │ ~78          │
  ├──────────────────────────────────────────────────────────────────────────────────────────────────┼──────────────┤
  │ 8 function overrides (Sum, Avg, MinMax, Any, Bitwise, ArgMinMax, Quantile, -If)                  │ ~286         │
  ├──────────────────────────────────────────────────────────────────────────────────────────────────┼──────────────┤
  │ findExtreme*ForRows (h+cpp)                                                                      │ ~55          │
  ├──────────────────────────────────────────────────────────────────────────────────────────────────┼──────────────┤
  │ SingleValueData*ForRows (h+cpp)                                                                  │ ~164         │
  ├──────────────────────────────────────────────────────────────────────────────────────────────────┼──────────────┤
  │ Aggregator::executeForRows + executeImpl*ForRows family                                          │ ~200         │
  ├──────────────────────────────────────────────────────────────────────────────────────────────────┼──────────────┤
  │ ShardedChunkInfo::row_indices + selector-passing through processors                              │ ~10          │
  ├──────────────────────────────────────────────────────────────────────────────────────────────────┼──────────────┤
  │ ScatterByHashTransform simplification (no shared-payload trick, no per-row Selector bookkeeping) │ ~30-50       │
  ├──────────────────────────────────────────────────────────────────────────────────────────────────┼──────────────┤
  │ ShardedAggregatingTransform simplification (gets normal Chunks, can call executeOnBlock)         │ ~30-50       │
  ├──────────────────────────────────────────────────────────────────────────────────────────────────┼──────────────┤
  │ Subtotal                                                                                         │ ~850-900 LoC │
  └──────────────────────────────────────────────────────────────────────────────────────────────────┴──────────────┘

  If you also drop the precomputed-hash and pre-serialized-keys plumbing (separate optimizations, can be deferred):

  ┌────────────────────────────────────────────────────────────────────────────────────────────┬───────────┐
  │                                         Additional                                         │ LoC saved │
  ├────────────────────────────────────────────────────────────────────────────────────────────┼───────────┤
  │ StringHashTable.h dispatchImpl<Tag> refactor + PrecomputedHashTag                          │ ~210      │
  ├────────────────────────────────────────────────────────────────────────────────────────────┼───────────┤
  │ TwoLevelStringHashTable.h matching plumbing                                                │ ~7        │
  ├────────────────────────────────────────────────────────────────────────────────────────────┼───────────┤
  │ ColumnsHashingImpl.h emplaceKeyWithHash + LastElementCache::saved_hash                     │ ~114      │
  ├────────────────────────────────────────────────────────────────────────────────────────────┼───────────┤
  │ Aggregator.cpp consecutive-keys-with-hash + equal-keys range opt for sharding path         │ ~80       │
  ├────────────────────────────────────────────────────────────────────────────────────────────┼───────────┤
  │ SerializedKeyBuffer.h + HashMethodSerialized::external_serialized_keys hook                │ ~55       │
  ├────────────────────────────────────────────────────────────────────────────────────────────┼───────────┤
  │ Aggregator::prepareHashesAndKeysForSharding and serialization in prepareColumnsForSharding │ ~50       │
  ├────────────────────────────────────────────────────────────────────────────────────────────┼───────────┤
  │ Additional subtotal                                                                        │ ~516 LoC  │
  └────────────────────────────────────────────────────────────────────────────────────────────┴───────────┘

  Total removable: ~1350-1400 LoC out of ~2350 src LoC (≈ 60% of src). With tests untouched (~1444 LoC), the patch goes from ~3790 → ~2400 total, and src code from ~2350 → ~950-1000.

  What's left in the MVP

  ┌───────────────────────────────────────────────────────────────────────────────┬──────────┬────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐
  │                                   Component                                   │   LoC    │                                                                           What it is                                                                           │
  ├───────────────────────────────────────────────────────────────────────────────┼──────────┼────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┤
  │ ScatterByHashTransform.{h,cpp} (simplified, uses IColumn::scatter)            │ ~150     │ The 1→N processor: hash each row, partition with Fibonacci mixing, call IColumn::scatter on each column. Per-shard FIFO queues for back-pressure still needed. │
  ├───────────────────────────────────────────────────────────────────────────────┼──────────┼────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┤
  │ ShardedAggregatingTransform.{h,cpp} (thin wrapper)                            │ ~80-100  │ One per shard. Receives normal Chunks, calls Aggregator::executeOnBlock (existing), emits aggregated chunks.                                                   │
  ├───────────────────────────────────────────────────────────────────────────────┼──────────┼────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┤
  │ AggregatingStep.cpp integration                                               │ ~150     │ Eligibility checks, the Scatter → Resize(N→1) → ShardedAgg × N wiring.                                                                                         │
  ├───────────────────────────────────────────────────────────────────────────────┼──────────┼────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┤
  │ Aggregator::chooseAggregationMethod static refactor + getShardedPayloadHeader │ ~25      │ Planner needs to probe method type to reject incompatible keys.                                                                                                │
  ├───────────────────────────────────────────────────────────────────────────────┼──────────┼────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┤
  │ HashTablesStatistics.h ShardedStatsCollector + ShardedAggregationEntry        │ ~90      │ Per-shard prealloc hints survive the simplification — they work on key counts, not column layout.                                                              │
  ├───────────────────────────────────────────────────────────────────────────────┼──────────┼────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┤
  │ Settings.cpp + serialization plumbing                                         │ ~25      │ The optimize_aggregation_by_sharding setting end-to-end.                                                                                                       │
  ├───────────────────────────────────────────────────────────────────────────────┼──────────┼────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┤
  │ Planner.cpp / PlannerCorrelatedSubqueries.cpp / InterpreterSelectQuery.cpp    │ ~12      │ Pass setting into step.                                                                                                                                        │
  ├───────────────────────────────────────────────────────────────────────────────┼──────────┼────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┤
  │ 8 test suites                                                                 │ ~1444    │ Unchanged — same correctness + EXPLAIN PIPELINE + negative-case + Distributed coverage.                                                                        │
  ├───────────────────────────────────────────────────────────────────────────────┼──────────┼────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┤
  │ MVP src                                                                       │ ~530-550 │                                                                                                                                                                │
  ├───────────────────────────────────────────────────────────────────────────────┼──────────┼────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┤
  │ MVP total                                                                     │ ~2000    │                                                                                                                                                                │
  └───────────────────────────────────────────────────────────────────────────────┴──────────┴────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘

  Is this still substantial enough to be a meaningful MVP?

  Yes — and arguably it's a better MVP. Here's why:

  The architectural payoff is fully preserved. The whole point of sharded aggregation is "no merge phase" — each shard owns a disjoint key range. That's still there. The *ForRows machinery, the precomputed hashes, the pre-serialized keys are all
  micro-optimizations on top of that core idea. Stripping them doesn't reduce the qualitative win; it reduces the perf ceiling.

  What you're trading away (perf-wise):

  ┌──────────────────────────────────────────────┬──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐
  │             Optimization removed             │                                                             Expected perf cost on shard hot path                                                             │
  ├──────────────────────────────────────────────┼──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┤
  │ Precomputed-hash reuse during emplace        │ ~5-15% throughput loss (hash recomputed twice instead of once)                                                                                               │
  ├──────────────────────────────────────────────┼──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┤
  │ IColumn::scatter materialization cost        │ One extra column copy per chunk per shard. For numeric columns: cheap memcpy. For strings: O(total_bytes) gather.                                            │
  ├──────────────────────────────────────────────┼──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┤
  │ *ForRows SIMD-aware MinMax                   │ Falls back to existing range-based vectorized findExtreme (which is fast — actually possibly faster than the scalar findExtreme*ForRows added in the branch) │
  ├──────────────────────────────────────────────┼──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┤
  │ *ForRows ArgMinMax single-pass index search  │ Bigger loss — falls back to N×add calls with potential repeated arena allocations                                                                            │
  ├──────────────────────────────────────────────┼──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┤
  │ *ForRows Any/AnyLast O(1) shortcut           │ Bigger loss — O(N) in fallback                                                                                                                               │
  ├──────────────────────────────────────────────┼──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┤
  │ Pre-serialized keys for HashMethodSerialized │ Re-serializes during emplace. For long string keys, this is real (~10-20%).                                                                                  │
  └──────────────────────────────────────────────┴──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘

  So expect roughly 10-25% throughput regression on perf benchmarks vs. the full branch, depending on the query — but still a substantial speedup vs. master (which has full merge-phase overhead). The point of MVP is to ship the architecture; you reclaim those %
  via follow-up PRs.

  What you gain (review-wise):

  - The reviewer reads ~550 LoC of new src code instead of ~2350. Two days of review becomes two hours.
  - The StringHashTable.h dispatch refactor — the trickiest piece — is not in the MVP. It can land as its own focused PR with a unit test.
  - Each follow-up perf PR (ForRows overrides, precomputed hashes, pre-serialized keys) becomes individually reviewable: "here's the change, here's the perf number, OK or not OK". Decoupled benchmarks.
  - Bisect bandwidth: if a regression shows up after ship, you can identify which optimization caused it. With the current branch, anything wrong with sharding requires reverting the whole thing.

  What stays substantial:

  - A new query-plan path with non-trivial pipeline construction (Scatter → Resize → per-shard agg).
  - Two new processors (ScatterByHashTransform, ShardedAggregatingTransform) including a non-trivial back-pressure scheme.
  - A new statistics infrastructure for per-shard preallocation.
  - Eligibility logic that interacts with all the existing aggregation paths (in-order, two-level, external, grouping sets, overflow guards).
  - Eight test suites covering correctness + EXPLAIN + Distributed + edge cases.
  - A user-facing setting with serialization through query-plan IO.

  That's a complete, defensible feature. Not a "draft" or "stub". Ships real value, leaves a clean follow-up plan.

  Recommendation

  I'd go further than my earlier 3-stage proposal:

  PR-1   Sharded aggregation MVP via IColumn::scatter, default OFF    ~550 src + 1444 tests
  PR-2a  ForRows API in IAggregateFunction (just the framework)        ~78
  PR-2b  ForRows fast paths for must-keep (MinMax/Any/ArgMinMax)       ~250 (with perf #s)
  PR-2c  ForRows fast paths for nice-to-have (Sum/Avg/Bitwise)         ~100 (with perf #s)
  PR-3   Precomputed-hash plumbing + StringHashTable refactor          ~330 (with unit test + perf #s)
  PR-4   Pre-serialized keys                                            ~85  (with perf #s)
  PR-5   Per-shard prealloc stats integration                           ~90  (already in MVP, can be split)
  PR-6   Flip default to ON                                             ~1   (with perf-test PR link)

  Six small, focused PRs over one ~600-LoC monolith. Each one reviewable in <1 hour with a clear "what does this change buy us?" answer. The MVP carries the architectural complexity; each follow-up carries one optimization with a benchmark number attached.

  This is honestly how I'd ship a feature this size if I wanted it merged quickly with high reviewer confidence.

I'd perhaps limit ourselves to 4 or even 3 stages instead of 6, but it is up to you. My only big ask is to move iteratively.

Comment thread src/Core/Settings.cpp

- [GROUP BY optimization](/sql-reference/statements/select/group-by#group-by-optimization-depending-on-table-sorting-key)
)", 0) \
DECLARE(Bool, optimize_aggregation_by_sharding, true, R"(

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ This introduces a new user-facing setting (optimize_aggregation_by_sharding) with new behavior, but the PR still has the "Documentation is written" checkbox unchecked.

Please add user docs for this setting (when to enable it, limitations such as lack of external aggregation spill support, and expected trade-offs), then mark the template checkbox accordingly.

@clickhouse-gh

clickhouse-gh Bot commented May 6, 2026

Copy link
Copy Markdown
Contributor

LLVM Coverage Report

Metric Baseline Current Δ
Lines 84.10% 84.00% -0.10%
Functions 91.10% 90.70% -0.40%
Branches 76.60% 76.40% -0.20%

Changed lines: 80.38% (762/948) | lost baseline coverage: 15 line(s) · Uncovered code

Full report · Diff report

@clickhouse-gh

clickhouse-gh Bot commented Jun 5, 2026

Copy link
Copy Markdown
Contributor

@nickitat nickitat self-assigned this Jun 5, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

pr-performance Pull request with some performance improvements

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants