Cascades cost-based optimizer for distributed query plans by davenger · Pull Request #86353 · ClickHouse/ClickHouse · GitHub
Skip to content

Cascades cost-based optimizer for distributed query plans#86353

Draft
davenger wants to merge 262 commits into
masterfrom
wip_cascades
Draft

Cascades cost-based optimizer for distributed query plans#86353
davenger wants to merge 262 commits into
masterfrom
wip_cascades

Conversation

@davenger

@davenger davenger commented Aug 28, 2025

Copy link
Copy Markdown
Member

A Cascades-style cost-based optimizer that chooses distribution strategies for multi-stage distributed query plans (#106020). It explores alternatives in a memo with top-down, goal-directed search and picks the cheapest plan satisfying the required distribution and sorting properties, inserting exchange operators as needed.

Implemented:

  • Join strategies: shuffle hash join, broadcast hash join (with ReplicatedRead — every worker repeats the same read of a small table instead of a network broadcast, assuming shared storage where all workers see the same data), local join.
  • Aggregation strategies: two-phase (partial + merge), shuffle by group keys, local.
  • Top-N: two-stage distributed top-N (per-node bounded sort, sorted-merge gather, coordinator limit).
  • Read strategies: parallel N-way read, replicated read, local read.
  • Properties and enforcers: distribution (node count, replication, partitioning columns with equivalence classes and hash cast types) and sorting, bridged by Gather/Shuffle/Broadcast/ScatterExchange and Sort enforcers.
  • Transformations: join commutativity, two-phase aggregation split, two-stage top-N split.
  • Cost model: work, network, and sequential components with configurable weights, a fixed per-exchange overhead, broadcast costed per receiving node, and statistics clamped to join kind and strictness semantics.
  • Fail-closed behavior: an unconvertible plan with exchange steps, an exhausted optimization task budget, and an invalid cost-config override all reject the query with a clear error instead of degrading silently.

Design, a worked example on TPC-H data (a simplified 3-table query traced through the memo), and current limitations are documented in src/Processors/QueryPlan/Optimizations/Cascades/ARCHITECTURE.md. Plan-shape tests cover the actual TPC-H queries (03836_tpch_join_order_plans).

Disabled by default. Requires the analyzer and the multi-stage distributed execution configuration (stateless workers):

SET enable_cascades_optimizer = 1, make_distributed_plan = 1;

For tests, param__internal_cascades_cluster_node_count overrides the cluster size, param__internal_cascades_cost_config overrides the cost weights, param__internal_join_table_stat_hints injects table statistics, and distributed_plan_execute_locally = 1 runs the distributed stages in-process.

Related: #106020

Changelog category (leave one):

  • Experimental Feature

Changelog entry (a user-readable short description of the changes that goes into CHANGELOG.md):

Added an experimental Cascades cost-based optimizer for distributed query plans, enabled by enable_cascades_optimizer = 1 together with make_distributed_plan = 1. It chooses between shuffle, broadcast, and local join strategies, two-phase, shuffle, and local aggregation, two-stage distributed top-N, and parallel and replicated reads by estimated cost, inserting exchange operators as needed.

Documentation entry for user-facing changes

  • Documentation written in /docs

@davenger davenger marked this pull request as draft August 28, 2025 11:27
@clickhouse-gh

clickhouse-gh Bot commented Aug 28, 2025

Copy link
Copy Markdown
Contributor

@clickhouse-gh clickhouse-gh Bot added the pr-not-for-changelog This PR should not be mentioned in the changelog label Aug 28, 2025
@novikd novikd self-assigned this Aug 28, 2025
…ut under randomized settings

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Comment thread src/Processors/QueryPlan/Optimizations/Cascades/Cost.h
davenger and others added 6 commits July 1, 2026 08:38
…zero weights

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…un on a worker

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…s, derive join width from output header

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…ther on L*node_count

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…p to fix clang-tidy

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…agment keeps its stream cap

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

ExpressionStatistics StatisticsDerivation::deriveFilterStatistics(const FilterStep & filter_step, const ExpressionStatistics & input_statistics)
{
ExpressionStatistics result_statistics = input_statistics;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

deriveFilterStatistics still models every FilterStep as selectivity = 1: it remaps NDVs but leaves estimated_row_count and max_row_count unchanged. After the earlier pushdown pass there are still realistic filters on computed columns or post-join expressions, so broadcast-vs-shuffle and downstream exchange costs can ignore a large cardinality reduction entirely and pick the opposite plan. Please derive at least a heuristic row-count reduction here (or reuse the existing selectivity estimator when the filter still sits on a ReadFromMergeTree path) instead of copying the input cardinality unchanged.

davenger and others added 6 commits July 2, 2026 09:46
…pper bound in prepopulation

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
… mismatched-type USING join swap in 04332

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
…cades check for in-order aggregation

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
writeStringBinary(part_name, ctx.out);
}

bool has_input_order = query_info.input_order_info != nullptr;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now that read-in-order metadata is serialized for distributed reads, distributed_read_part_names also becomes the worker-side execution-order contract for requestReadingInOrder: deserialization rebuilds result.parts_with_ranges in this exact sequence, and spreadMarkRangesAmongStreamsWithOrder then consumes that vector from the back to decide which ranges each ordered reader sees first. The std::sort(part_names...) above changes the coordinator's selectRangesToRead() order into lexicographic part-name order, which is not the same thing when the ORDER BY key is unrelated to part naming or partitioning. A distributed SortedRead can therefore claim sorted output while actually reading parts in the wrong order. Please preserve the coordinator's original parts_with_ranges order instead of sorting the names.

davenger and others added 7 commits July 2, 2026 21:05
… clone, note rule-based-planner scope in distributed_plan setting descriptions

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
… steps to one node, keep query sort settings and serialize the sort limit, price bounded sorts on input rows

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
…mixed work units

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
… cost formulas onto strategies

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
… and price exchanges on the selected child

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
…e cost config

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
…d format-reparse check and crashed CI on the LOCAL JOIN test

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
@@ -4876,9 +4883,6 @@ void ReadFromMergeTree::serialize(Serialization & ctx) const
/// non-bucket read is rebuilt and re-optimized on the worker, which re-derives them.
if (distributed_read_bucket_count > 0)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

assertFragmentSerializable still trusts ReadFromMergeTree::isSerializable(), but that method always returns true. These new bucketed-read rejections therefore slip past the pre-check and only fail later when serializeQueryPlan walks the fragment. That regresses the intended fail-closed path not only for projections, but also for deferred FINAL filters and direct text-index tasks. Please either make isSerializable() reflect these bucketed-read constraints, or move the checks into the earlier distributed-read support pass so make_distributed_plan rejects them before building the distributed plan.

…he swapped join tag

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
davenger and others added 2 commits July 4, 2026 10:25
… and keep 04310 stat hints authoritative

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
@clickhouse-gh

clickhouse-gh Bot commented Jul 4, 2026

Copy link
Copy Markdown
Contributor

LLVM Coverage Report

Metric Baseline Current Δ
Lines 85.60% 85.60% +0.00%
Functions 92.70% 92.70% +0.00%
Branches 77.80% 77.80% +0.00%

Changed lines: Changed C/C++ lines covered: 2410/2681 (89.89%) · Uncovered code

Full report · Diff report

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

pr-experimental Experimental Feature

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants