Add PartialAggregateCache for part-level aggregate caching.#93757
Add PartialAggregateCache for part-level aggregate caching.#93757zex-hyd wants to merge 20 commits into
Conversation
ffeb3a4 to
8a4aebe
Compare
|
Hi @alexey-milovidov @rschu1ze, This PR is related to my feature request #93756 for partial aggregate caching (from Intern Tasks 2025/2026). I've started with a skeleton implementation to show the proposed cache structure. Would appreciate feedback on the approach! Happy to adjust based on your suggestions. |
|
Workflow [PR], commit [4f92f09] Summary: ❌
AI ReviewSummaryThis PR introduces an experimental Findings❌ Blockers
Tests
Final VerdictStatus: ❌ Block
|
7bce5fe to
9fa7ee0
Compare
9fa7ee0 to
9a03c03
Compare
|
Hi ClickHouse Team (@alexey-milovidov @rschu1ze @Avogar @clickhouse-admin), PR is ready for review. The failing checks are unrelated to this PR. All relevant checks pass: builds (amd/arm), style, stateless tests (including 03800_partial_aggregate_cache), and unit tests. Failures are in integration test infrastructure, fuzzer edge cases, and unrelated features. Stack traces show nothing related to aggregation or caching. The partial aggregate cache test passes. PR is ready for review. Thanks in advance for any comments, suggestions, or approval. |
9a03c03 to
66170ba
Compare
| /// produce the same partial aggregates on every execution, so we cache and reuse them. | ||
| /// | ||
| /// Enable with: SET use_partial_aggregate_cache = 1 | ||
| class PartialAggregateCache |
There was a problem hiding this comment.
The new cache will live side-by-side to the regular query cache which isn't really great. I would rather like to see the normal query cache develop into an intermediate result cache.
Would you please check if the existing query result cache can be extended into this direction? Perhaps we can have just another key/entry type.
There was a problem hiding this comment.
Thanks for your review! I spent some time looking into this.
So bascially the QueryResultCache and PartialAggregateCache both use CacheBase, but the key/entry and invalidation story is quite different: the query cache is AST + user/roles with TTL, partial aggregates are semantic hash + part + mutation version, and they hook into the pipeline in different places. Folding that into QueryResultCache as “just another entry kind” would need a proper design pass, not a small tweak.
I suggest kept them split here to keep the PR focused. And possible in the future open a tracking issue for a unified intermediate-result cache if that direction still makes sense.
|
@rschu1ze Thank you so much for the code review! This really helps a lot! I will be working on them based on the feedback in the next few days and continuously making contribution to ClickHouse! |
005c8f2 to
d17e874
Compare
| @@ -264,6 +265,16 @@ MergeTreeSelectProcessor::readCurrentTask(MergeTreeReadTask & current_task, IMer | |||
| res.read_mark_ranges)); | |||
| } | |||
|
|
|||
| if (reader_settings.use_partial_aggregate_cache) | |||
There was a problem hiding this comment.
We can know the set of parts to read on the planning stage. This logic is both weird and inefficient, because we shouldn't spend time reading this data at all.
There was a problem hiding this comment.
plan-time decisions now happen in ReadFromMergeTree using the known part set, and MergeTreeSelectProcessor::buildPartialAggregateInfoFromCurrentTask only attaches part identity to chunks for the execution-time path.
nickitat
left a comment
There was a problem hiding this comment.
Please outline the entire design, and let's first make sure it will work. Then we can talk about the implementation.
Thank you for reviewing this! I am currently working on it and will upload the design soon! |
@nickitat @rschu1ze @alexey-milovidov I agree that making the reuse decision in Goal The original goal for this PR is to reuse per-part intermediate GROUP BY states for eligible MergeTree queries. For the same semantic query key, immutable parts produce the same intermediate aggregate states, so cached states can be reused instead of re-scanning those parts. Design There are two places involved in my design:
Correctness rules
Cache key
Invalidation is key-based: merges and mutations produce new part names or mutation versions, so stale entries are not reused for changed data. Parallelism When execution-time cache population is enabled, the aggregation pipeline is resized to one stream so one transform observes the complete input for each part before writing it to cache. If Disabled cases Currently disabled for in-order aggregation, grouping sets at plan-time, Tests The tests cover the legacy path, analyzer path, plan-time hits, execution-time population, grouping sets disabling, parallel-stream behavior, early-flush / skip-cache-put for incomplete states, and the Relevant tests: We would really appreciate a check on whether this design matches your expectations! |
LLVM Coverage Report
Changed lines: 92.90% (746/803) | lost baseline coverage: 22 line(s) · Uncovered code |
| logging_context = context; | ||
| /// Skip internal functions (e.g. `_CAST` from the analyzer); user query lists `CAST` and logging both is noisy. | ||
| if (logging_context && !logging_context->isGlobalContext() && logging_context->getSettingsRef()[Setting::log_queries] | ||
| && !name.starts_with('_')) |
There was a problem hiding this comment.
FunctionFactory logging skips all underscore-prefixed functions
Low Severity
The new filter !name.starts_with('_') suppresses addQueryFactoriesInfo logging for ALL functions whose resolved name starts with underscore, not just the internal _CAST. This changes existing query log behavior — previously all resolved functions were logged. Any user-defined or built-in function beginning with _ will silently disappear from query_log.used_functions, affecting observability and audit trails.
Reviewed by Cursor Bugbot for commit 5e94dd5. Configure here.
There was a problem hiding this comment.
Cursor Bugbot has reviewed your changes using default effort and found 2 potential issues.
There are 3 total unresolved issues (including 1 from previous review).
Reviewed by Cursor Bugbot for commit 4f92f09. Configure here.
| key.part_mutation_version = part_info->part_mutation_version; | ||
|
|
||
| if (partial_aggregate_cache_parts_served.contains(key)) | ||
| return; |
There was a problem hiding this comment.
Execution-time hit silently drops chunk data without tracking
Low Severity
When partial_aggregate_cache_parts_served.contains(key) is true (duplicate chunk from an already-served part), consume returns immediately without updating src_rows, src_bytes, or rows_before_aggregation. This makes the aggregation progress/logging counters inaccurate, under-reporting the volume of data processed. The same issue applies to the plan-hit deduplication path.
Additional Locations (1)
Reviewed by Cursor Bugbot for commit 4f92f09. Configure here.
| } | ||
|
|
||
| partial_aggregate_miss_buffers.clear(); | ||
| partial_aggregate_miss_buffered_input_bytes = 0; |
There was a problem hiding this comment.
Early flush puts incomplete per-part states into global variants separately
Medium Severity
tryFlushPartialAggregateMissBuffersIfNeeded calls flushPartialAggregateMissBuffers(false) which clears partial_aggregate_miss_buffers. Subsequent chunks from the same part create a new buffer entry. While the skip-cache-put guard prevents caching incomplete states, this means a single part's data is split across multiple mergeOnBlock calls with separately-aggregated intermediate states. For non-commutative aggregation edge cases or if mergeOnBlock triggers is_consume_finished on the first partial merge, the remaining chunks for that part still arrive and create a new local aggregator, potentially producing subtly wrong final results when no_more_keys becomes true mid-part.
Reviewed by Cursor Bugbot for commit 4f92f09. Configure here.
| const auto & settings = context->getSettingsRef(); | ||
| const bool partial_cache_compatible_with_group_by_limits | ||
| = settings[Setting::max_rows_to_group_by] == 0 || settings[Setting::group_by_overflow_mode] == OverflowMode::THROW; | ||
| const bool can_plan_partial_aggregate_cache_hits = build_pipeline_settings.partial_aggregate_cache_query_hash.has_value() |
There was a problem hiding this comment.
❌ Plan-time partial-cache hits are unsafe when pre-aggregation JOIN is present.
ReadFromMergeTree can remove hit parts and replace them with zero-row PartialAggregatePlanHitInfo chunks, but JoiningTransform::transform drops non-virtual zero-row chunks (it sets output_chunk only when res.rows() > 0, and only preserves ChunkInfos on the has_virtual_row branch).
So for queries like SELECT ... FROM t JOIN d ... GROUP BY ..., a hit part can be excluded from reads while its cached state metadata is lost before AggregatingTransform, which drops that part's contribution from the result.
Can we gate plan-time probing to query shapes without pre-aggregation joins (or otherwise guarantee PartialAggregatePlanHitInfo survives all pre-aggregation transforms, not only FilterTransform)?
|
Dear @nickitat, you haven't been active on this PR for 30 days. You will be unassigned. Will you continue working on it? If so, please feel free to reassign yourself. |



Add experimental part-level partial aggregate cache for
GROUP BYonMergeTreetables, with planning-stage and execution-stage integration. Motivation: avoid re-reading unchanged parts by reusing mergeable intermediate aggregate states keyed by query semantics and part identity.Related issue: #93756
Changelog category (leave one):
Changelog entry (a user-readable short description of the changes that goes into CHANGELOG.md):
Add experimental setting
use_partial_aggregate_cacheforMergeTreeGROUP BYqueries to cache per-part intermediate aggregate states and reuse them either at planning stage inReadFromMergeTreeor at execution stage inAggregatingTransform; include a correctness fix so planning-stage hit chunks carryingPartialAggregatePlanHitInfoare preserved through pre-aggregationFilterTransformand are not dropped before aggregation.Documentation entry for user-facing changes
How it works
Two integration points:
ReadFromMergeTree): probePartialAggregateCacheby part before readers are built. On hit, emitPartialAggregatePlanHitChunkSourceand exclude the part from the read pipeline.AggregatingTransform): on miss, buffer rows per part and put mergeable states into cache when complete; on hit, merge cached states viamergeOnBlockand skip subsequent chunks from that part.Cache stores mergeable intermediate states, not finalized results.
Cache key:
(query_hash, table_uuid, part_name, part_mutation_version)Invalidation is key-based: merged/mutated parts naturally produce new keys.
Current limits / behavior
GROUPING SETSuse execution-stage caching only.AggregatingInOrderTransform).partial_aggregate_cache_allow_parallel_aggregation_streams=1keeps parallelism and disables only execution-stage cache (planning-stage hits still apply).Tests
03800_partial_aggregate_cache.sql04064_partial_aggregate_cache_analyzer.sql04065_partial_aggregate_cache_grouping_sets.sql04131_prometheus_query_parseris reproducible without random settings and appears unrelated to this change set.Note
High Risk
Touches core query planning/execution paths (
ReadFromMergeTree,AggregatingStep,AggregatingTransform) and adds new caching behavior and invalidation logic, which risks subtle correctness/performance regressions despite being gated behind experimental settings.Overview
Adds an experimental partial aggregate cache for repeated
GROUP BYqueries onMergeTree, caching per-part intermediate aggregate states and reusing them across semantically identical queries.Integrates the cache at both planning time (probing in
ReadFromMergeTreeto skip reading parts on cache hits viaPartialAggregatePlanHitChunkSource) and execution time (buffering per-part misses inAggregatingTransform, then merging andput-ing completed intermediate states). Cache keys incorporate query semantics (incl.current_database,apply_deleted_mask, and guards for subqueries/row policies/additional filters) plus part identity (table_uuid,part_name,part_mutation_version), with hit/miss metrics/events added.Introduces new settings (
use_partial_aggregate_cache,partial_aggregate_cache_allow_parallel_aggregation_streams), server config knobs/defaults for cache sizing,SYSTEM CLEAR|DROP AGGREGATE CACHEsupport with a new privilege, and documentation/tests (including analyzer and grouping-sets coverage); also tightens function-factory query logging and makes query-cache nondeterminism checks robust to gated/deprecated functions.Reviewed by Cursor Bugbot for commit 4f92f09. Bugbot is set up for automated code reviews on this repo. Configure here.