Reimplement reading in order for parallel replicas by nickitat · Pull Request #101434 · ClickHouse/ClickHouse · GitHub

Reimplement reading in order for parallel replicas #101434

Merged: nickitat merged 74 commits into master from read_in_order on Jul 2, 2026
nickitat (Member) commented Mar 31, 2026
Changelog category (leave one):

  • Performance Improvement

Changelog entry (a user-readable short description of the changes that goes into CHANGELOG.md):

Reading in order with parallel replicas now splits the table into max_threads parts, using the same logic as local reading, for better parallelism.


Private test results.

Claude PR summary
  1. Protocol (src/Core/Protocol.h, ProtocolDefines.h, MergeTree/RequestResponse.{h,cpp})

  - New client packet MergeTreeAllRangesAnnouncementResponse (= 14), protocol bumped to DBMS_PARALLEL_REPLICAS_PROTOCOL_VERSION = 8 with gate
  DBMS_PARALLEL_REPLICAS_MIN_VERSION_WITH_ANNOUNCEMENT_RESPONSE.
  - New struct InitialAllRangesAnnouncementResponse { parts, stream_id }: initiator's reply to a follower's announcement, carrying the authoritative parts list for that stream.
  - MergeTreeAllRangesCallback signature changes from void(...) → std::optional<...Response>(...). nullopt = old initiator, no response sent. Engaged-but-empty parts = stream
  doesn't exist on the coordinator (over-announce) → follower's pool finishes immediately.
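
The three outcomes of the new callback return value can be sketched as follows. This is a minimal illustration, not the PR's code: `AnnouncementResponse`, `FollowerAction`, and `interpretResponse` are hypothetical names, and the real `parts` field is a parts description rather than a list of strings.

```cpp
#include <cassert>
#include <optional>
#include <string>
#include <vector>

// Hypothetical stand-in for InitialAllRangesAnnouncementResponse { parts, stream_id }.
struct AnnouncementResponse
{
    std::vector<std::string> parts;
    std::string stream_id;
};

enum class FollowerAction
{
    LegacyNoResponse,     // nullopt: old initiator, no response was sent
    FinishImmediately,    // engaged but empty: stream unknown to the coordinator
    UseAuthoritativeParts // engaged and non-empty: read what the initiator returned
};

// Maps the callback result to the follower behaviour described in the summary.
FollowerAction interpretResponse(const std::optional<AnnouncementResponse> & response)
{
    if (!response)
        return FollowerAction::LegacyNoResponse;
    if (response->parts.empty())
        return FollowerAction::FinishImmediately;
    return FollowerAction::UseAuthoritativeParts;
}
```

Returning `std::optional` lets one signature distinguish "no reply at all" from "a reply that says the stream does not exist", which a plain empty list could not express.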

  2. Coordinator (ParallelReplicasReadingCoordinator.{h,cpp})

  - handleInitialAllRangesAnnouncement now returns InitialAllRangesAnnouncementResponse.
  - New setSnapshotReplicaNum(replica_num): lets the initiator pin itself as the snapshot replica BEFORE any announcement arrives. Once pinned, only the snapshot replica can
  create a new stream coordinator; announcements from other replicas for unknown streams are silently dropped (empty parts returned).
  - Per-stream getRegisteredParts() virtual on ImplInterface: the working set registered for the stream. Per a later commit in this PR, it equals the first announcement one-to-one, because parts within a single replica's announcement are non-overlapping by construction. Cached in stream_to_registered_parts on first announcement and echoed back to every subsequent announcer.
  - InOrderCoordinator::isReadingCompleted() newly implemented; pure virtual on the base.
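
The snapshot pinning and per-stream caching described in this section could look roughly like the toy model below. It is a sketch under simplifying assumptions: `ToyCoordinator`, `handleAnnouncement`, and the string-based `Parts` type are hypothetical, and the real coordinator delegates to a per-stream implementation instead of storing the announcement directly.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <optional>
#include <string>
#include <vector>

using Parts = std::vector<std::string>; // hypothetical stand-in for the parts description

class ToyCoordinator
{
public:
    // Pin the snapshot replica before any announcement arrives.
    void setSnapshotReplicaNum(size_t replica_num) { snapshot_replica = replica_num; }

    // Returns the authoritative parts for the stream; empty means the stream does not exist.
    Parts handleAnnouncement(size_t replica_num, const std::string & stream_id, const Parts & announced)
    {
        auto it = stream_to_registered_parts.find(stream_id);
        if (it != stream_to_registered_parts.end())
            return it->second; // echo the cached working set to every later announcer

        // Once pinned, only the snapshot replica may create a new stream.
        if (snapshot_replica && *snapshot_replica != replica_num)
            return {};

        return stream_to_registered_parts.emplace(stream_id, announced).first->second;
    }

private:
    std::optional<size_t> snapshot_replica;
    std::map<std::string, Parts> stream_to_registered_parts;
};
```

In this model a follower that announces a stream before the snapshot replica has created it gets an empty list back, which matches the "silently dropped" behaviour above.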

  3. Planner (ReadFromMergeTree.{h,cpp}, ParallelReplicasLocalPlan.cpp, ClusterProxy/executeQuery.{h,cpp})

  - spreadMarkRangesAmongStreamsWithOrder refactored into three branches gated by isParallelReplicasLocalPlanForInitiator / isParallelReplicasLocalPlanForFollower:
    - Initiator + local plan: split ranges into num_streams per-stream pools (genuine splitting). Each split gets its own stream_id = <table>#split_{i}.
    - Follower + local plan: construct num_streams pools, each over ALL local parts. Each pool's announcement gets back from the coordinator the authoritative sub-set; non-owned
  parts are filtered out during source construction so no phantom consumers are built. Streams owning no parts on this follower produce empty pipes and are dropped.
    - parallel_replicas_local_plan=0: preserved old single-pool behavior.
  - make_per_split_pool_settings: divides pool_settings.threads evenly across splits to avoid min_marks_per_request = min_marks_per_task × threads being inflated num_splits-fold.
  - ParallelReplicasLocalPlan::createLocalPlanForParallelReplicas calls coordinator->setSnapshotReplicaNum(replica_number) before any announcement is sent.
  - New helper ClusterProxy::canUseLocalPlanForParallelReplicas(context): extracts the 4-clause check that gates the local-plan branch in executeQueryWithParallelReplicas
  (analyzer + parallel_replicas_local_plan + parallel_replicas_prefer_local_replica + _shard_num == 0). Now also called by ReadFromMergeTree so followers keep lock-step with the
  initiator's topology decision. This is the fix in the last two commits — without it, followers inside a Distributed sub-query took the 32-pool over-announce path while the
  initiator skipped local plan entirely, causing every split's coordinator to register the full part view and amplify reads ~`num_streams`×.
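
The arithmetic behind make_per_split_pool_settings can be illustrated with a small sketch. `PoolSettings`, `makePerSplitPoolSettings`, and `aggregateMinMarksPerRequest` are hypothetical names, the rounding rule (integer division, at least one thread) is an assumption, and summing the per-pool request size over all split pools is one reading of "inflated num_splits-fold".

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>

struct PoolSettings // hypothetical, reduced to the one field that matters here
{
    size_t threads = 1;
};

// Each split pool gets its share of the threads, but never fewer than one.
PoolSettings makePerSplitPoolSettings(PoolSettings settings, size_t num_splits)
{
    assert(num_splits > 0);
    settings.threads = std::max<size_t>(1, settings.threads / num_splits);
    return settings;
}

// min_marks_per_request = min_marks_per_task * threads, added up over all split pools.
size_t aggregateMinMarksPerRequest(size_t min_marks_per_task, const PoolSettings & per_pool, size_t num_splits)
{
    return min_marks_per_task * per_pool.threads * num_splits;
}
```

With 16 threads, 16 splits, and 8 marks per task, the divided settings give an aggregate of 8 × 1 × 16 = 128 marks, whereas reusing the undivided settings in every pool would give 8 × 16 × 16 = 2048.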

  4. Read pools (MergeTreeReadPoolParallelReplicas{,InOrder}.{h,cpp}, MergeTreeReadPoolBase.{h,cpp})

  - MergeTreeReadPoolBase::buildAnnouncementDescriptions(): lifted from the InOrder pool to be shared (fills in per-part min_marks_per_task). The announcement is now sent by
  ReadFromMergeTree directly (not the pool constructor), so the response can be consumed at the call site.
  - MergeTreeReadPoolParallelReplicasInOrder:
    - Steady-state task size now capped at min_marks_per_task (matching the Default pool) rather than max_block_size / index_granularity — fixes a long-standing over-small task
  issue while keeping warmup growth for early-LIMIT termination.
    - Response matching is now keyed by (part_info, projection_name) instead of by position, since coordinators can return parts in arbitrary order.
  - ParallelReadingExtension::sendInitialRequest returns the response. Split into sendReadRequest (Default) / sendReadInOrderRequest (positional → keyed).
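
Keyed response matching, as opposed to positional matching, might be sketched like this. The types are deliberately simplified and hypothetical: `PartKey` reduces (part_info, projection_name) to a pair of strings, and a mark count stands in for the mark ranges.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Hypothetical simplification of the (part_info, projection_name) key.
using PartKey = std::pair<std::string, std::string>;

struct PartResponse
{
    PartKey key;
    size_t marks;
};

// Match response entries to locally known parts by key rather than by position.
// Returns marks per local part, in local order; parts absent from the response get 0.
std::vector<size_t> matchByKey(const std::vector<PartKey> & local_parts, const std::vector<PartResponse> & response)
{
    std::map<PartKey, size_t> by_key;
    for (const auto & entry : response)
        by_key[entry.key] += entry.marks;

    std::vector<size_t> result;
    result.reserve(local_parts.size());
    for (const auto & key : local_parts)
    {
        auto it = by_key.find(key);
        result.push_back(it == by_key.end() ? 0 : it->second);
    }
    return result;
}
```

Because the lookup goes through the key, the result does not depend on the order in which the coordinator lists the parts.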

  5. Connection plumbing

  - IServerConnection / IConnections / Connection / MultiplexedConnections / LocalConnection / HedgedConnections: new sendMergeTreeAllRangesAnnouncementResponse virtual + impl.
  - TCPHandler: the announcement callback now blocks for the response (when client protocol ≥ 8) via new receiveAllRangesAnnouncementResponse. Errors set stop_query = true and
  rethrow.
  - RemoteQueryExecutor::processMergeTreeInitialReadAnnouncement: relays the coordinator's response back to the announcing replica.

  6. Tests

  - New 04073_parallel_replicas_in_order_splits.sql: verifies that a single part is genuinely split into ≥ max_threads MergeTreeSelect sources for both WithOrder and
  ReverseOrder.
  - New integration test test_parallel_replicas_protocol (separate fixture under tests/integration/).
  - Modified 00177_memory_bound_merging.sh (test4): runs the in-order query with count() (which amplifies under work duplication, unlike the idempotent max(URL) in test1/test2) and compares the result against a single-node baseline. max_threads=16 and parallel_replicas_local_plan=1 are pinned so that randomized settings can't mask the regression. This catches the over-announce bug that the last two commits fix.
  - Reference updates in 02404_memory_bound_merging, 02883_parallel_replicas_join_algo_and_analyzer_{2,3}, 03222_parallel_replicas_memory_bound_merging_projection,
  03724_parallel_replicas_duplicate_requests for the new split topology.

  ---
  Suggested review order

  1. Protocol + coordinator (sections 1–2) — sets the contract.
  2. Planner (section 3) — most of the new code; the three-branch split in spreadMarkRangesAmongStreamsWithOrder plus the snapshot pinning are the heart of the change.
  2a. The last two commits in isolation (0d3f0456c06, a92a094ca72) — small, self-contained canUseLocalPlanForParallelReplicas factoring + regression test.
  3. Read pool + connection plumbing (sections 4–5).
  4. Tests (section 6) — 04073 is the cleanest functional spec of the new behavior.

Version info

  • Merged into: 26.7.1.426

clickhouse-gh (Bot, Contributor) commented Mar 31, 2026

clickhouse-gh Bot added the pr-performance label (Pull request with some performance improvements) Mar 31, 2026
alexey-milovidov (Member) commented:

The Stress test (arm_msan) failure is fixed by #101239, which should be merged first. After it is merged, please update the branch to include the fix.

nickitat and others added 3 commits June 22, 2026 17:08
`all_parts_for_replicas = parts_with_ranges` previously ran for every
non-initiator, including `parallel_replicas_local_plan=0`. The legacy
single-pool branch consumes the list once with `std::move` — the copy
was a pure full-vector waste on the in-order parallel-replica hot path.

Only the local-plan follower path actually needs the copy (each split
reads from a copy and filters down to its assigned subset). For the
single-pool branch, move `parts_with_ranges` directly into the only pool
that consumes it: that path never reached the split builder, so
`parts_with_ranges` is still intact and there's no second consumer to
worry about.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
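
The copy-versus-move distinction in the commit message above can be sketched as follows. `PlanInputs` and `prepareParts` are hypothetical names used only for illustration, and a vector of strings stands in for `parts_with_ranges`.

```cpp
#include <cassert>
#include <string>
#include <utility>
#include <vector>

using Parts = std::vector<std::string>; // hypothetical stand-in for parts_with_ranges

struct PlanInputs
{
    Parts all_parts_for_replicas; // filled only when every split filters from its own copy
    Parts single_pool_parts;      // filled only on the legacy single-pool path
};

PlanInputs prepareParts(Parts & parts_with_ranges, bool is_local_plan_follower)
{
    PlanInputs inputs;
    if (is_local_plan_follower)
        inputs.all_parts_for_replicas = parts_with_ranges; // copy: the original stays intact
    else
        inputs.single_pool_parts = std::move(parts_with_ranges); // move: there is a single consumer
    return inputs;
}
```

After the move branch runs, the caller should treat `parts_with_ranges` as consumed and not read from it again.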
The cluster was bumped from 25.12 to 26.5 in 35881bb, but the
inline comment on the third (current-build) node still mentioned 25.12.
Update it so the rolling-upgrade rationale stays consistent.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
The existing pipeline-shape checks (`num_sources >= 4`) caught the
"splits aren't created" regression but would silently pass through a
"split assignment drops or duplicates ranges" regression that kept the
shape intact. Add `count() = 1000000, sum(a) = 499999500000` against
the known data baseline for both `ORDER BY a` and `ORDER BY a DESC` so
the test fails immediately under coverage holes or amplification.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
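
The expected `sum(a)` value is consistent with a table whose column `a` holds each integer from 0 to 999999 exactly once. That data layout is an assumption here; the commit message only calls it "the known data baseline". Under that assumption the checksum can be verified at compile time:

```cpp
#include <cassert>
#include <cstdint>

// Assumption: column `a` holds 0, 1, ..., n - 1 exactly once each.
// Then sum(a) = n * (n - 1) / 2.
constexpr std::uint64_t sumOfFirstN(std::uint64_t n)
{
    return n * (n - 1) / 2;
}

static_assert(sumOfFirstN(1'000'000) == 499'999'500'000ULL, "matches the sum(a) baseline");
```

Checking both `count()` and `sum(a)` is a cheap guard: a dropped range lowers both values and a duplicated range raises both.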
nickitat and others added 4 commits June 22, 2026 18:04
Two stale mentions left over from the 25.12 → 26.5 bump:

  - Module-level rationale: the parenthetical "(25.12 has PR=5 and is
    excluded by ...)" is rewritten to talk about the disconnect gate
    generically — the specific 25.12 detour shouldn't outlive its
    relevance to anyone reading the test fresh.

  - `test_split_topology_rolling_upgrade` docstring: "The 25.12 peer"
    → "The 26.5 peer" to match the actual `tag=`.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
…packet 14

- Feature table `VERSIONED_PARALLEL_REPLICAS_PROTOCOL`: current value bumped
  from `7` to `8`, with a paragraph describing what version `8` adds
  (`MergeTreeAllRangesAnnouncementResponse` and the
  initiator-replies-to-followers contract).

- `ServerHello.parallel_replicas_protocol_version` and
  `Addendum.parallel_replicas_protocol_version`: current value bumped
  `7` → `8` so the canonical-value column matches
  `DBMS_PARALLEL_REPLICAS_PROTOCOL_VERSION`.

- Client → Server packet table: add packet `14`
  `MergeTreeAllRangesAnnouncementResponse`, body `not specified` to match
  the convention used by sibling parallel-replicas packets, with a
  description that explains the version gate and that it's inter-server
  only.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Add a dedicated `MergeTreeAllRangesAnnouncementResponse` section under
"Message reference" describing:

  - When the packet flies (`parallel_replicas_protocol_version ≥ 8` AND
    the originating announcement's `mode` is non-`Default`; `Default`
    mode stays fire-and-forget).
  - The three top-level body fields (`version`, `parts`, `stream_id`)
    and how `version` falls back below the
    `DBMS_MIN_REVISION_WITH_VERSIONED_PARALLEL_REPLICAS_PROTOCOL` TCP
    revision.
  - The `RangesInDataPartsDescription` and `RangesInDataPartDescription`
    wire formats with their gates (`MIN_VERSION_WITH_PROJECTION` v5,
    `MIN_VERSION_WITH_MIN_MARKS_PER_TASK` v6).
  - The `MergeTreePartInfo` and `MarkRanges` byte layouts including the
    little-endian / VarUInt / boolean-text quirks.

Link the Client → Server packet-table row and the feature-table entry to
the new body section so the canonical spec covers everything needed to
implement or validate the v8 inter-server packet.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
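
The condition under which the response packet is sent, as described in the commit message above, might be expressed like this. The constant and function names are hypothetical; the value 8 and the Default versus non-Default distinction come from the text, and the enum lists only the modes mentioned in this PR.

```cpp
#include <cassert>
#include <cstdint>

// Value taken from the PR description (protocol version that adds the response packet).
constexpr std::uint64_t MIN_VERSION_WITH_ANNOUNCEMENT_RESPONSE = 8;

// Reduced stand-in for the coordination mode; only Default vs non-Default matters here.
enum class CoordinationMode { Default, WithOrder, ReverseOrder };

// True when the announcer should block and wait for the initiator's response.
constexpr bool expectsAnnouncementResponse(std::uint64_t peer_protocol_version, CoordinationMode mode)
{
    return peer_protocol_version >= MIN_VERSION_WITH_ANNOUNCEMENT_RESPONSE
        && mode != CoordinationMode::Default;
}
```

A peer on version 7, or any announcement in `Default` mode, stays fire-and-forget under this rule.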
nickitat and others added 4 commits June 22, 2026 18:42
…up packet direction

The response-flow narrative incorrectly said the follower issues
`MergeTreeReadTaskResponse` (client packet `10`) after the
announcement-response, which is the response side — i.e. the initiator's
reply, on the wrong endpoint. The follower actually sends
`MergeTreeReadTaskRequest` (server packet `16`, follower→initiator) and
the initiator replies with `MergeTreeReadTaskResponse` (client packet
`10`).

Correct the wording and spell out both packet roles so third-party
native clients don't implement the response side on the wrong endpoint.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
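
The packet roles spelled out in the commit message above can be summarised in a small table. The packet names and numbers are taken from this PR; the `PacketRole` struct and the helper are hypothetical and exist only to make the direction explicit.

```cpp
#include <cassert>
#include <string_view>

enum class Sender { Follower, Initiator };

struct PacketRole
{
    std::string_view name;
    int code;      // packet number within its own (server or client) packet table
    Sender sender; // which endpoint puts the packet on the wire
};

// Follower asks for work; the initiator answers.
constexpr PacketRole READ_TASK_REQUEST{"MergeTreeReadTaskRequest", 16, Sender::Follower};
constexpr PacketRole READ_TASK_RESPONSE{"MergeTreeReadTaskResponse", 10, Sender::Initiator};
// New in protocol version 8: the initiator's reply to a follower's announcement.
constexpr PacketRole ANNOUNCEMENT_RESPONSE{"MergeTreeAllRangesAnnouncementResponse", 14, Sender::Initiator};

// In this inter-server exchange the initiator acts as the client, so its packets are client packets.
constexpr bool isClientPacket(const PacketRole & packet)
{
    return packet.sender == Sender::Initiator;
}
```

Client and server packet numbers live in separate tables, so 10 and 14 on one side and 16 on the other do not share a numbering space.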
…egisteredParts

The two comments around `getRegisteredParts` / `stream_to_registered_parts`
suggested that `InOrderCoordinator` drops covered/covering parts during
normalization of a single announcement, making the captured working set a
subset of the announcement payload. That's wrong: within a single MergeTree
replica's announcement parts are non-overlapping by construction, and the
cover/covered branches in `doHandleInitialAllRangesAnnouncement` only ever
deduplicate across replicas (which, in the snapshot-pinned topology this
PR adds, is moot because the snapshot replica is the first announcer).

Rewrite both comments to describe what's actually happening: the working
set equals the first announcement one-to-one, and we capture it via
`getRegisteredParts` (rather than from the announcement directly) only to
keep the lookup independent of any future per-stream coordinator that may
post-process its input.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

devcrafter (Member) left a comment:

lgtm


MergeTreeReadPoolPtr pool;

/// Authoritative set of (parent_info, projection_name) reported by the coordinator for this

Review comment (Member):

Could we simplify the comment? AI makes it harder to perceive.

Under some CI configs the loop-insert pattern (one `INSERT` per node into
a `ReplicatedMergeTree`) ended up with more than 1M rows in `ts` because
replication timing left the table somewhere between 1M and 3M rows when
queries started. `sum()` and `LIMIT` queries survived that (parallel
replicas dedupes reads, so `sum` looks correct at 1x and `LIMIT 10` is
idempotent), but the new `count()` assertion caught the extra rows and
failed on amd_msan / amd_asan_ubsan / arm_binary variants with `count=20`
instead of `10` per group.

Insert exactly once from `split_topology_nodes[0]` and `SYSTEM SYNC
REPLICA` the other two, then `OPTIMIZE FINAL` everywhere, so every
replica sees the same 1M rows regardless of scheduler timing.

Fixes: 4x reports on
https://s3.amazonaws.com/clickhouse-test-reports/json.html?PR=101434&sha=489aca864d0395f291e59e15f93d7a36e7986338&name_0=PR

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
clickhouse-gh (Bot, Contributor) commented Jul 2, 2026
LLVM Coverage Report

| Metric    | Baseline | Current | Δ      |
|-----------|----------|---------|--------|
| Lines     | 85.50%   | 85.50%  | +0.00% |
| Functions | 92.60%   | 92.60%  | +0.00% |
| Branches  | 77.60%   | 77.70%  | +0.10% |

Changed C/C++ lines covered: 392/421 (93.11%)

nickitat added this pull request to the merge queue Jul 2, 2026
Merged via the queue into master with commit 7ad18bf Jul 2, 2026
513 of 520 checks passed
nickitat deleted the read_in_order branch July 2, 2026 11:56
robot-ch-test-poll added the pr-synced-to-cloud label (The PR is synced to the cloud repo) Jul 2, 2026
alexey-milovidov added a commit that referenced this pull request Jul 2, 2026
…o-read-setting

The branch was ~11.7k commits behind (last merge 2026-06-20) and red. The
diff vs. `master` is still the single randomizer line — no functional change:

    "parallel_replicas_min_number_of_rows_per_replica": lambda: random.randint(0, 1),

Reason to re-merge now: the parallel-replicas owners have landed several
directly relevant fixes since 2026-06-20, so refreshing CI on today's
`master` will narrow the remaining set of genuine product bugs this PR is
meant to surface:
- #108451 (`Fix NOT_FOUND_COLUMN_IN_BLOCK for virtual columns under parallel
  replicas`, closes #106561) — should clear the tracked
  `04098_asterisk_include_virtual_columns_mergetree` failure.
- #101434 (`Reimplement reading in order for parallel replicas`) — bears
  directly on the `max_rows_to_read`-not-honored class
  (`02155_read_in_order_max_rows_to_read`, `00945_bloom_filter_index`).
- #109003 (`Fix server abort on GROUPING SETS in a set operation with
  parallel replicas`) — a "Server died" class fix.
- Flaky-test fix for `04051_pk_analysis_stats`.

Conflicts were all in files the branch does not intentionally change (its
only intended change is the one `tests/clickhouse-test` line); they were
resolved by taking `master`'s version. No pinning/blacklisting of the
affected parallel-replicas tests — that would mask the very signal this PR
exists to produce.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>