Cascades cost-based optimizer for distributed query plans #86353
davenger wants to merge 262 commits into
…ut under randomized settings Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…zero weights Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…un on a worker Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…s, derive join width from output header Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…ther on L*node_count Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…p to fix clang-tidy Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…agment keeps its stream cap Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
ExpressionStatistics StatisticsDerivation::deriveFilterStatistics(const FilterStep & filter_step, const ExpressionStatistics & input_statistics)
{
    ExpressionStatistics result_statistics = input_statistics;
deriveFilterStatistics still models every FilterStep as selectivity = 1: it remaps NDVs but leaves estimated_row_count and max_row_count unchanged. After the earlier pushdown pass there are still realistic filters on computed columns or post-join expressions, so broadcast-vs-shuffle and downstream exchange costs can ignore a large cardinality reduction entirely and pick the opposite plan. Please derive at least a heuristic row-count reduction here (or reuse the existing selectivity estimator when the filter still sits on a ReadFromMergeTree path) instead of copying the input cardinality unchanged.
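A minimal sketch of the heuristic row-count reduction requested above. `SketchStatistics`, `applyFilterHeuristic`, and the default selectivity of 0.1 are hypothetical stand-ins, not the PR's `ExpressionStatistics` or its selectivity estimator; the sketch also assumes `max_row_count` is a hard upper bound that a filter leaves valid.

```cpp
#include <algorithm>
#include <cstdint>

// Hypothetical stand-in for ExpressionStatistics: only the two row-count
// fields named in the review comment are modelled.
struct SketchStatistics
{
    double estimated_row_count = 0.0;
    uint64_t max_row_count = 0;
};

// Assumed default for filters with no better estimate; not a value from the PR.
constexpr double default_filter_selectivity = 0.1;

SketchStatistics applyFilterHeuristic(
    const SketchStatistics & input, double selectivity = default_filter_selectivity)
{
    // Keep the selectivity inside [0, 1] so a bad estimate cannot grow the input.
    const double clamped = std::clamp(selectivity, 0.0, 1.0);

    SketchStatistics result = input;
    result.estimated_row_count = input.estimated_row_count * clamped;

    // A filter never adds rows, so the upper bound stays as it was and the
    // estimate must not exceed it.
    result.estimated_row_count
        = std::min(result.estimated_row_count, static_cast<double>(input.max_row_count));
    return result;
}
```

Even a crude constant like this lets the broadcast-vs-shuffle comparison see some cardinality reduction; a per-predicate estimate would replace the constant where one is available.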
…pper bound in prepopulation Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
… mismatched-type USING join swap in 04332 Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
…cades check for in-order aggregation Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
    writeStringBinary(part_name, ctx.out);
}

bool has_input_order = query_info.input_order_info != nullptr;
Now that read-in-order metadata is serialized for distributed reads, distributed_read_part_names also becomes the worker-side execution-order contract for requestReadingInOrder: deserialization rebuilds result.parts_with_ranges in this exact sequence, and spreadMarkRangesAmongStreamsWithOrder then consumes that vector from the back to decide which ranges each ordered reader sees first. The std::sort(part_names...) above changes the coordinator's selectRangesToRead() order into lexicographic part-name order, which is not the same thing when the ORDER BY key is unrelated to part naming or partitioning. A distributed SortedRead can therefore claim sorted output while actually reading parts in the wrong order. Please preserve the coordinator's original parts_with_ranges order instead of sorting the names.
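To illustrate the ordering concern, here is a small self-contained sketch. `readOrderFromBack` and `sortedCopy` are hypothetical helpers that only model "consume the vector from the back" and "sort the names"; the part names are illustrative and nothing here calls the real MergeTree code.

```cpp
#include <algorithm>
#include <string>
#include <vector>

// Hypothetical model of a reader that takes parts from the back of the vector,
// as the comment above describes for the ordered-read path.
std::vector<std::string> readOrderFromBack(std::vector<std::string> parts)
{
    std::vector<std::string> visited;
    while (!parts.empty())
    {
        visited.push_back(parts.back());
        parts.pop_back();
    }
    return visited;
}

// Models the std::sort on part names: plain lexicographic string order.
std::vector<std::string> sortedCopy(std::vector<std::string> names)
{
    std::sort(names.begin(), names.end());
    return names;
}
```

With an illustrative coordinator order of `all_2_2_0`, `all_10_10_0`, `all_1_1_0`, sorting yields `all_10_10_0`, `all_1_1_0`, `all_2_2_0` (the character `0` compares lower than `_`), so a worker consuming from the back visits the parts in a different sequence than the coordinator planned.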
… clone, note rule-based-planner scope in distributed_plan setting descriptions Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
… steps to one node, keep query sort settings and serialize the sort limit, price bounded sorts on input rows Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
…mixed work units Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
… cost formulas onto strategies Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
… and price exchanges on the selected child Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
…e cost config Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
…d format-reparse check and crashed CI on the LOCAL JOIN test Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
@@ -4876,9 +4883,6 @@ void ReadFromMergeTree::serialize(Serialization & ctx) const
    /// non-bucket read is rebuilt and re-optimized on the worker, which re-derives them.
    if (distributed_read_bucket_count > 0)
assertFragmentSerializable still trusts ReadFromMergeTree::isSerializable(), but that method always returns true. These new bucketed-read rejections therefore slip past the pre-check and only fail later when serializeQueryPlan walks the fragment. That regresses the intended fail-closed path not only for projections, but also for deferred FINAL filters and direct text-index tasks. Please either make isSerializable() reflect these bucketed-read constraints, or move the checks into the earlier distributed-read support pass so make_distributed_plan rejects them before building the distributed plan.
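One possible shape for the first option, as a sketch only. `SketchReadStep`, its flag names, and `assertSerializableSketch` are hypothetical; the sketch assumes the three rejections apply only when `distributed_read_bucket_count > 0`, which is how the diff context above reads.

```cpp
#include <cstddef>
#include <stdexcept>

// Hypothetical stand-in for the read step; the flags mirror the three cases
// named in the comment above, not real ReadFromMergeTree members.
struct SketchReadStep
{
    std::size_t distributed_read_bucket_count = 0;
    bool uses_projection = false;
    bool has_deferred_final_filter = false;
    bool has_direct_text_index_task = false;

    // Reflects the bucketed-read constraints instead of always returning true.
    bool isSerializable() const
    {
        if (distributed_read_bucket_count == 0)
            return true; // assumed: non-bucket reads are rebuilt on the worker
        return !uses_projection && !has_deferred_final_filter && !has_direct_text_index_task;
    }
};

// Fail-closed pre-check: reject before any distributed plan is built.
void assertSerializableSketch(const SketchReadStep & step)
{
    if (!step.isSerializable())
        throw std::logic_error("bucketed read cannot be serialized for distributed execution");
}
```

Keeping the predicate on the step means the pre-check and the serializer cannot drift apart, provided the serializer consults the same method.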
…he swapped join tag Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
… and keep 04310 stat hints authoritative Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
LLVM Coverage Report
Changed C/C++ lines covered: 2410/2681 (89.89%)

A Cascades-style cost-based optimizer that chooses distribution strategies for multi-stage distributed query plans (#106020). It explores alternatives in a memo with top-down, goal-directed search and picks the cheapest plan satisfying the required distribution and sorting properties, inserting exchange operators as needed.
Implemented:
- … ReplicatedRead: every worker repeats the same read of a small table instead of a network broadcast, assuming shared storage where all workers see the same data), local join.
- Gather/Shuffle/Broadcast/ScatterExchange and Sort enforcers.
- … work, network, and sequential components with configurable weights, a fixed per-exchange overhead, broadcast costed per receiving node, and statistics clamped to join kind and strictness semantics.

Design, a worked example on TPC-H data (a simplified 3-table query traced through the memo), and current limitations are documented in src/Processors/QueryPlan/Optimizations/Cascades/ARCHITECTURE.md. Plan-shape tests cover the actual TPC-H queries (03836_tpch_join_order_plans).

Disabled by default. Requires the analyzer and the multi-stage distributed execution configuration (stateless workers):

For tests, param__internal_cascades_cluster_node_count overrides the cluster size, param__internal_cascades_cost_config overrides the cost weights, param__internal_join_table_stat_hints injects table statistics, and distributed_plan_execute_locally = 1 runs the distributed stages in-process.

Related: #106020
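As a rough illustration of the cost components listed under "Implemented", the sketch below combines work, network, and sequential costs with weights and a fixed per-exchange overhead, and charges a broadcast once per receiving node. All type and function names are hypothetical, the weights are placeholders, and the real formulas live in the PR's cost model; treating a shuffle as sending each byte once is an assumption of this sketch.

```cpp
#include <cstddef>

// Hypothetical weights; the PR makes them configurable but this text does not list defaults.
struct SketchCostWeights
{
    double work = 1.0;
    double network = 1.0;
    double sequential = 1.0;
    double per_exchange_overhead = 0.0;
};

// Hypothetical cost vector with the three named components.
struct SketchCost
{
    double work = 0.0;
    double network = 0.0;
    double sequential = 0.0;
    std::size_t exchange_count = 0;
};

// Weighted sum of the components plus a fixed charge per exchange.
double weightedTotal(const SketchCost & cost, const SketchCostWeights & weights)
{
    return weights.work * cost.work
        + weights.network * cost.network
        + weights.sequential * cost.sequential
        + weights.per_exchange_overhead * static_cast<double>(cost.exchange_count);
}

// Broadcast: every receiving node gets a full copy of the data.
SketchCost broadcastExchangeCost(double bytes, std::size_t receiving_nodes)
{
    SketchCost cost;
    cost.network = bytes * static_cast<double>(receiving_nodes);
    cost.exchange_count = 1;
    return cost;
}

// Shuffle (assumed): each byte is sent to exactly one node.
SketchCost shuffleExchangeCost(double bytes)
{
    SketchCost cost;
    cost.network = bytes;
    cost.exchange_count = 1;
    return cost;
}
```

Under these assumptions, broadcasting a 100-byte side to 4 nodes costs 400 network units, so it beats shuffling a 1000-byte side only while the small side times the node count stays below the shuffled volume.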
Changelog category (leave one):
Changelog entry (a user-readable short description of the changes that goes into CHANGELOG.md):
Added an experimental Cascades cost-based optimizer for distributed query plans, enabled by enable_cascades_optimizer = 1 together with make_distributed_plan = 1. By estimated cost, it chooses among shuffle, broadcast, and local join strategies; two-phase, shuffle, and local aggregation; two-stage distributed top-N; and parallel and replicated reads, inserting exchange operators as needed.
Documentation entry for user-facing changes