Push ORDER BY from outer query into simple VIEWs for distributed optimization#94102
Conversation
…mization When querying a VIEW over a distributed table with ORDER BY + LIMIT, ClickHouse was performing a full sort on the coordinator instead of using the efficient 'merge sorted streams' optimization. The issue occurs because the VIEW's inner query has no ORDER BY clause. Without it, shards return unsorted data, forcing the coordinator to do a full sort rather than merging pre-sorted streams. This fix injects the outer query's ORDER BY into the VIEW's inner query AST before it's passed to the interpreter. The fix: 1. Detects when the outer query has ORDER BY and the table expression is a VIEW 2. Clones the VIEW's inner query AST 3. Builds ORDER BY elements from the outer query's sort columns 4. Injects the ORDER BY into the VIEW's inner query 5. Sets this modified query as view_query in SelectQueryInfo This enables: - Each shard to sort its data (or use read-in-order optimization) - The coordinator to merge sorted streams efficiently - Significant performance improvement for ORDER BY + LIMIT queries The optimization only applies to simple VIEWs without existing ORDER BY, GROUP BY, DISTINCT, LIMIT, HAVING, or WINDOW clauses. Example: CREATE VIEW v AS SELECT * FROM distributed_table; SELECT * FROM v ORDER BY col LIMIT 100; Before: Full sort on coordinator (O(N log N)) After: Sort on shards + merge sort on coordinator (more efficient)
|
Workflow [PR], commit [3b7bc18] Summary: ✅
AI ReviewSummaryThis PR pushes outer Final Verdict✅ Approve |
For simple views over a single table (no UNION, no JOIN), StorageView now delegates getQueryProcessingStage() to the underlying storage. This preserves the underlying storage's capabilities, such as: - Distributed tables can sort on shards with merge sort at coordinator - Other storages that support higher processing stages This fixes the bug where ORDER BY pushdown optimization was not applied to VIEWs over distributed tables, causing inefficient full sorts instead of merge sorted streams. The fix makes VIEW transparent with respect to query processing stage, which is a cleaner approach than injecting ORDER BY into the view's inner query.
40343e1 to
fdbc3ba
Compare
The previous commit made StorageView delegate getQueryProcessingStage() to the underlying storage for ORDER BY optimization. However, this broke queries with aggregation (e.g., SELECT sum(a) FROM view) because: - The Distributed table returns stages like WithMergeableState expecting partial aggregate function states from shards - But the VIEW's inner query produces raw columns, not aggregate states - This caused: "Conversion from UInt64 to AggregateFunction(sum, UInt64) is not supported" Fix: Only delegate to underlying storage when the outer query doesn't require aggregation. We check: - query_info.need_aggregate (captures GROUP BY and aggregate functions) - query_node->isDistinct() (DISTINCT has similar merging requirements) This preserves the ORDER BY merge-sort optimization while fixing the aggregation bug. Test results: - SELECT * FROM view ORDER BY x LIMIT 10 -> Uses merge sorted streams ✓ - SELECT sum(a) FROM view -> Works correctly (returns 52) ✓ - SELECT DISTINCT a FROM view -> Falls back to FetchColumns ✓
Add NOLINT comment to empty catch block in getQueryProcessingStage. The empty catch is intentional - it allows fallback to default behavior when table lookup fails during query processing stage determination. This follows the established pattern used throughout the codebase for intentional exception swallowing.
…r-distributed-table-order-by-pushdown
- Return FetchColumns early when query_tree is not available (analyzer disabled) - Use query tree for outer query analysis (aggregation, DISTINCT checks) - Simplify control flow with early returns instead of nested conditionals - Keep metadata-based approach for finding underlying table in view definition The optimization delegates getQueryProcessingStage to the underlying storage for simple views (single SELECT, no UNION, no JOIN), enabling ORDER BY optimizations like merge sorted streams for distributed tables.
When querying a VIEW over a Distributed table with ORDER BY, the optimization that merges pre-sorted streams from shards was not applied. This is because the ORDER BY clause wasn't visible to the underlying distributed table. This change pushes ORDER BY (and LIMIT) from the outer query into the VIEW's inner query AST, enabling the distributed table to see the sort requirement and apply the merge-sorted-streams optimization instead of doing a full sort after receiving all data. Also fix the test to use EXPLAIN instead of EXPLAIN PIPELINE, since "Merge sorted streams" is a step description that appears in query plan output, not pipeline output.
When querying a VIEW over a Distributed table with ORDER BY, the optimization that merges pre-sorted streams from shards was not applied. This is because the ORDER BY clause wasn't visible to the underlying distributed table. This change pushes ORDER BY (and LIMIT) from the outer query into the VIEW's inner query AST in PlannerJoinTree, enabling the distributed table to see the sort requirement and apply the merge-sorted-streams optimization instead of doing a full sort after receiving all data. Simplified approach: Instead of complex stage delegation in StorageView, we just push ORDER BY into view_query which is used by StorageView::read(). The underlying Distributed table then naturally sees the ORDER BY. Also fix the test to use EXPLAIN instead of EXPLAIN PIPELINE, since "Merge sorted streams" is a step description that appears in query plan output, not pipeline output.
…b.com:matanper/ClickHouse into view-over-distributed-table-order-by-pushdown
e926237 to
3a3d6db
Compare
3a3d6db to
d71d487
Compare
d71d487 to
513fb91
Compare
The setup dropped the `definer_*_04241` users before dropping any leftover `test_view_definer_*_04241` views from a previous interrupted run. Since the views depend on the users, `DROP USER IF EXISTS` would throw `HAVE_DEPENDENT_OBJECTS` while a leftover view still exists. Drop the dependent views before the users (both in the initial cleanup block and right before the `DROP USER` statements). Addresses review feedback on ClickHouse#94102 Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
|
Continued work on this PR (pushed `736e525ce64..e0449a6`): Merged latest `master` (the PR was conflicting). The only conflict was in `src/Planner/PlannerJoinTree.cpp`: this PR and master each added independent helper functions at the same spot (`pushOrderByIntoView`/`containsWindowFunction` here, `parallelReplicasEnabledForStorage`/`allowParallelReplicasForJoinTree` on master). Kept both; the net diff of the PR is unchanged (the same 3 files). Addressed both remaining review findings (AI Review “
Added a regression test (8250201): Local validation (against a The only CI failure on the prior commit was |
# Conflicts: # src/Planner/PlannerJoinTree.cpp
`arrayJoin` used as a projection function changes row cardinality after the source read: it expands each input row into one row per array element and drops rows whose array is empty. Unlike an `ARRAY JOIN` clause (which the analyzer lowers to a separate table expression, so the outer query is no longer a single-table read and never reaches the optimization), `arrayJoin` in the select list keeps `is_single_table_expression` true and slips past the existing guards in `pushOrderByIntoView`. Pushing `ORDER BY`/`LIMIT` into the view then truncates source rows before the expansion runs, so for a query like `SELECT arrayJoin(arr) FROM v ORDER BY ts DESC LIMIT 10` where the top ordered rows have empty arrays, the rewritten query would return too few rows instead of continuing to lower ordered rows to fill the `LIMIT`. Reject the optimization when the outer projection contains `arrayJoin`, mirroring the existing guard in `mainQueryNodeBlockSizeByLimit`, and add a focused regression to `04241_view_orderby_distributed_optimization`. Addresses AI Review finding on PlannerJoinTree.cpp:940. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
|
Continued work on this PR (pushed Addressed the AI Review
The C++ change passes Note on the prior CI report for |
…ed-table-order-by-pushdown
|
Merged the latest
No unresolved review threads; the AI Review verdict is ✅ Approve. |
The merge to `master` made `explain_query_plan_default = 'pretty'` the default, so `EXPLAIN PLAN` now decorates each step with tree-drawing characters (`└──`/`├──`/`│`). The step-extraction query in `04241_view_orderby_distributed_optimization` stripped only leading whitespace, so the extracted step became `└──Sorting (Merge sorted streams after aggregation stage for ORDER BY)` instead of the reference `Sorting (Merge sorted streams ...)`. Strip leading whitespace and the tree-drawing prefix characters so the extracted step text is stable regardless of the step's depth and position in the plan tree (the surrounding plan shape can vary across builds, which is why the test matches only this one line). CI report: https://s3.amazonaws.com/clickhouse-test-reports/json.html?PR=94102&sha=c4f8f90b2482e0d92bfb192abca00c57171466e4&name_0=PR&name_1=Fast%20test Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…ed-table-order-by-pushdown
|
Pushed Fixed the sole CI failure ( The regex now strips leading whitespace and the pretty tree-drawing prefix characters ( Also merged the latest No unresolved review threads; the AI Review verdict is ✅ Approve. The remaining |
|
The only non-green check on the current head
CI for the fresh- @groeneai, please confirm/track this failure (report) and link the in-progress fix here if one of the above PRs covers it. |
|
A second red appeared on the current head It is unrelated to this planner-only change (the PR touches only
So both non-green checks on
No unresolved review threads; AI Review verdict ✅ Approve. The rest of CI on the fresh- @groeneai, please confirm/track the Hung-check sighting and link the in-flight fix if one exists. |
|
Merged the latest
No unresolved review threads; the AI Review verdict is ✅ Approve. The remaining |
Address review: `pushOrderByIntoView` rebuilt the pushed-down inner query from the view's stored definition (`metadata->getSelectQuery().inner_query`), which does not carry the `SAMPLE`/`FINAL` modifiers applied to the view in the outer query. Pushing the outer `ORDER BY`/`LIMIT` into that inner query would order and truncate the full, unsampled, non-final row set below the point where the modifier applies, so a query such as `SELECT id FROM v SAMPLE 0.1 ORDER BY ts DESC LIMIT 10` could return the top-N of the whole view instead of the top-N of the sampled subset. Skip the optimization whenever the view table expression carries a `SAMPLE` or `FINAL` modifier, mirroring the existing modifier guard used for the trivial-count optimization in the same file. Added `FINAL`/`SAMPLE` rejection cases to `04241_view_orderby_distributed_optimization`. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
|
Addressed the AI review blocker and pushed Fix — skip the pushdown when the view carries CI on the previous head
|
LLVM Coverage Report
Changed lines: Changed C/C++ lines covered: 111/118 (94.07%) · Uncovered code |

When querying a VIEW over a distributed table with ORDER BY + LIMIT, ClickHouse was performing a full sort on the coordinator instead of using the efficient 'merge sorted streams' optimization.
The root cause is that the Distributed table doesn't see the outer query's ORDER BY clause when reading through a VIEW, so it can't enable the optimization where shards sort their data and the coordinator merges sorted streams.
This fix pushes ORDER BY/LIMIT from the outer query into the VIEW's inner query AST in
PlannerJoinTree.cpp:view_queryso the Distributed table sees the ORDER BYThis enables:
The pushdown is intentionally conservative and is skipped whenever it could change results: outer-query filtering (
WHERE/PREWHERE/QUALIFY), aggregation/DISTINCT/window functions,LIMIT BY,OFFSET,WITH TIES,WITH FILL, fractionalLIMIT,extremes, row policies,additional_table_filters, view/inner-query window orQUALIFY, a column type that differs between the view's declared structure and the inner query, andlimit/offset/prefer_column_name_to_aliasset either in the view's ownSETTINGSclause or in the effective view context (e.g. through aSQL SECURITY DEFINERview's definer settings profile).Example:
Before: Full sort on coordinator (Sorting (Sorting for ORDER BY))
After: Sort on shards + merge sort on coordinator (Sorting (Merge sorted streams...))
Performance evidence
This optimization removes work on the coordinator: without it the view is an opaque subquery, so every shard streams all of its rows to the coordinator, which then performs the global
ORDER BY/LIMIT. With it, each shard appliesORDER BY/LIMITlocally and the coordinator only merges pre-sorted streams (Merge sorted streams), receiving just the per-shard top-n.Measured on
test_cluster_two_shards_localhost(2 shards, 10M rows each = 20M total),SELECT id FROM v ORDER BY ts DESC LIMIT 10,max_threads = 4, median of 7 runs. The A/B was done on the same binary by toggling only the optimization viaprefer_column_name_to_alias = 1— a pure analysis-time flag that disables the pushdown without changing execution; the two outputs are byte-identical (verified by hashing the result), so the comparison isolates this change with no other commits as confounds.(id, ts)rows ≈ 228.9 MiB transferred over the network pathOn a single machine the wall time is unchanged (equal within run-to-run noise):
test_cluster_two_shards_localhostruns its shards in-process (NetworkReceiveBytes = 0), so there is no transfer to save and the per-shard table scan — identical on both paths — dominates. This is exactly why the standard/cloud benchmark suites reportno_change: they contain no view-over-networked-DistributedORDER BY/LIMITscenario. The transferred-bytes figure (measured by forcing the real network path withprefer_localhost_replica = 0, where the coordinator ingests every shard's rows) quantifies what the optimization eliminates; the benefit scales with row count × shard count and with network latency/bandwidth on real multi-node clusters. The plan-level change (coordinator full sort →Merge sorted streams) is asserted by the stateless test04241_view_orderby_distributed_optimization.Changelog category (leave one):
Changelog entry (a user-readable short description of the changes that goes into CHANGELOG.md):
Queries with
ORDER BYandLIMITonVIEWs over distributed tables now use the merge-sorted-streams optimization. The outerORDER BY/LIMITis pushed into a simple view's inner query so each shard sorts its data locally and the coordinator merges pre-sorted streams, instead of performing a full sort on the coordinator.Closes: #92746
Note
Medium Risk
Touches query planning/AST rewriting for
VIEWreads; guarded by many semantic checks but regressions are possible in edge cases around filtering/limits/order modifiers.Overview
Enables an optimization where outer
ORDER BY+LIMITis pushed into a simpleVIEW’s inner SELECT (by rewritingSelectQueryInfo::view_query) so underlyingDistributedreads can produce merge-sorted streams instead of forcing a full coordinator sort.The pushdown is intentionally conservative: it only applies to transparent views (single SELECT, no JOIN/GROUP BY/DISTINCT, no existing ORDER BY/LIMIT) and is disabled for outer-query features that can change semantics (filters, OFFSET,
LIMIT BY,WITH TIES,WITH FILL, windows, row policies,additional_table_filters), with new stateless tests covering both the positive case and the main rejection cases.Reviewed by Cursor Bugbot for commit 7f63adb. Bugbot is set up for automated code reviews on this repo. Configure here.
Version info
26.7.1.492