Add selective replication for ReplicatedMergeTree by zoomxi · Pull Request #102244 · ClickHouse/ClickHouse · GitHub

Add selective replication for ReplicatedMergeTree #102244

Open
zoomxi wants to merge 2 commits into ClickHouse:master from zoomxi:selective_replication

Conversation

@zoomxi

@zoomxi zoomxi commented Apr 9, 2026

Contributor

By default, every replica in a ReplicatedMergeTree shard stores a full copy of all data. This PR introduces selective replication: a new replication_factor table setting that controls how many replicas store each partition's data. This reduces storage costs and write amplification while maintaining read availability through automatic partition-to-replica assignment, query routing, and background rebalancing.

This implementation is inspired by #58132.

Changelog category (leave one):

  • New Feature

Changelog entry (a user-readable short description of the changes that goes into CHANGELOG.md):

Add selective replication for ReplicatedMergeTree, allowing each partition to be stored on only replication_factor replicas instead of all replicas in a shard. Partitions are automatically rebalanced in the background. Closes #45766.

Documentation entry for user-facing changes

  • [*] Documentation is written (mandatory for new features)

@azat azat added the "can be tested" label (Allows running workflows for external contributors) Apr 9, 2026
@clickhouse-gh

clickhouse-gh Bot commented Apr 9, 2026

Contributor

@clickhouse-gh clickhouse-gh Bot added the "pr-feature" label (Pull request with new product feature) Apr 9, 2026
Comment thread src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp Outdated
@zoomxi zoomxi force-pushed the selective_replication branch from e6432f0 to 76ef782 Compare April 10, 2026 09:53
Comment thread src/Storages/StorageReplicatedMergeTree.cpp
@zoomxi zoomxi force-pushed the selective_replication branch 2 times, most recently from 481fe9a to 989b2dd Compare April 13, 2026 07:51
Comment thread src/Storages/MergeTree/PartitionMigrationCoordinator.cpp Outdated
Comment thread src/Storages/MergeTree/PartitionMigrationCoordinator.cpp Outdated
@zoomxi zoomxi force-pushed the selective_replication branch from 989b2dd to d140478 Compare April 13, 2026 08:50
Comment thread src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp Outdated
@zoomxi zoomxi force-pushed the selective_replication branch from d140478 to 24aaa89 Compare April 13, 2026 09:56
Comment thread src/Storages/MergeTree/KeeperReplicaAssignment.cpp Outdated
@zoomxi zoomxi force-pushed the selective_replication branch from 24aaa89 to f7c9889 Compare April 13, 2026 11:29
Comment thread docs/en/operations/system-tables/selective_assignments.md Outdated
Comment thread docs/en/operations/system-tables/selective_migrations.md Outdated
@zoomxi zoomxi force-pushed the selective_replication branch from f7c9889 to 07edaf9 Compare April 14, 2026 13:02
Comment thread src/Storages/MergeTree/PartitionMigrationCoordinator.cpp Outdated
Comment thread docs/en/operations/system-tables/replication_queue.md Outdated
@zoomxi zoomxi force-pushed the selective_replication branch from 07edaf9 to eb3f0d9 Compare April 15, 2026 03:44

String shared_log_path = fs::path(storage.getZooKeeperPath()) / "log/log-";

for (const auto & part_name : meta.source_parts_snapshot)

Contributor


startClone defines SelectiveReplication::GET_PART_BATCH_SIZE, but still issues one zk->create RPC per part in a tight loop. For large partitions this can generate thousands of sequential Keeper writes and significantly delay migration or trigger Keeper timeouts under load.

Please batch log entry creation in chunks (for example, GET_PART_BATCH_SIZE) using tryMulti and retry partial batch failures. That keeps migration throughput predictable and reduces Keeper pressure.
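
The batching requested above could look roughly like the following sketch. It is not code from the PR: `SubmitBatch` is a hypothetical stand-in for a Keeper multi-request wrapper (something built on tryMulti), and `createLogEntriesBatched`, `BatchStats`, and `max_attempts` are illustrative names. It only shows the chunk-and-retry shape, with a failed chunk retried as a whole.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <functional>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical stand-in for one Keeper multi-request:
// returns true only if every create in the batch was committed.
using SubmitBatch = std::function<bool(const std::vector<std::string> &)>;

struct BatchStats
{
    size_t batches = 0;   // batches committed
    size_t attempts = 0;  // multi-requests issued, including retries
};

// Submits part names in chunks of at most batch_size instead of one RPC per part.
// A failed chunk is retried as a whole, up to max_attempts times.
BatchStats createLogEntriesBatched(
    const std::vector<std::string> & part_names,
    size_t batch_size,
    size_t max_attempts,
    const SubmitBatch & submit)
{
    assert(batch_size > 0 && max_attempts > 0);

    BatchStats stats;
    for (size_t begin = 0; begin < part_names.size(); begin += batch_size)
    {
        const size_t end = std::min(begin + batch_size, part_names.size());
        const std::vector<std::string> batch(part_names.begin() + begin, part_names.begin() + end);

        bool committed = false;
        for (size_t attempt = 0; attempt < max_attempts && !committed; ++attempt)
        {
            ++stats.attempts;
            committed = submit(batch);
        }
        if (!committed)
            throw std::runtime_error("batch starting at part " + part_names[begin] + " was not committed");
        ++stats.batches;
    }
    return stats;
}
```

With a batch size of GET_PART_BATCH_SIZE, a partition with thousands of parts would need one Keeper round trip per chunk rather than one per part. Retrying only the failed operations inside a chunk, as the comment suggests, would need the per-operation results that a real multi-request returns.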

@clickhouse-gh

clickhouse-gh Bot commented Apr 15, 2026

Contributor

LLVM Coverage Report

| Metric | Baseline | Current | Δ |
|---|---|---|---|
| Lines | 84.00% | 84.10% | +0.10% |
| Functions | 90.90% | 90.90% | +0.00% |
| Branches | 76.50% | 76.60% | +0.10% |

Changed lines: 74.48% (1643/2206) | lost baseline coverage: 58 line(s) · Uncovered code

Full report · Diff report

@zoomxi zoomxi force-pushed the selective_replication branch from eb3f0d9 to dcdfb2a Compare April 15, 2026 14:12
@clickhouse-gh clickhouse-gh Bot added the "manual approve" label (Manual approve required to run CI) Apr 15, 2026
@zoomxi zoomxi force-pushed the selective_replication branch 3 times, most recently from ad3885f to 914956b Compare April 21, 2026 12:07
@zoomxi

zoomxi commented Apr 21, 2026

Contributor Author

1. Keeper Data Structure

Under each table's ZK path ({zk_path}), a /selective subtree stores all metadata for selective replication:

{zk_path}/selective/
  config                        — JSON: {"replication_factor": N}
  counts                        — "format version: 1\nr1:10,r2:8,..." (per-replica partition counts)
  assignments/
    {partition_id}              — "format version: 1\nr1,r2" or "r1,r2:cloning" during migration
  rebalance_lock                — Ephemeral node holding auto-rebalance distributed lock
  migrations/
    {uuid}                      — JSON: {state, partition_id, source_replica, target_replica, coordinator, ...}
      clone_complete            — Empty node, created by target replica to signal clone is done

Key points:

  • /selective/config is written once at CREATE TABLE and verified by subsequent replicas on join.
  • /selective/assignments/{pid} uses a versioned text format with a format version: 1 header. The :cloning suffix marks a replica that is still pulling data during migration; that replica is excluded from read/write routing until SWITCH removes the suffix.
  • /selective/counts is a denormalized cache of per-replica partition counts. It is updated atomically with assignments in a single ZK multi-op, so the least-loaded-first allocator never needs a full scan.
  • /selective/migrations/{uuid} stores migration metadata as JSON. State transitions follow CLONE → SWITCH → DONE (or FAILED on rollback) and are CAS-protected.
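
To make the assignment format concrete, here is a minimal parsing sketch. It is not the PR's parser: `AssignedReplica`, `parseAssignment`, and `routableReplicas` are illustrative names, and it assumes the format version: 1 header line is always present, which the description above does not state explicitly.

```cpp
#include <cassert>
#include <cstddef>
#include <stdexcept>
#include <string>
#include <vector>

struct AssignedReplica
{
    std::string name;
    bool cloning;  // true while the replica is still pulling data
};

// Parses an assignment node value such as "format version: 1\nr1,r2:cloning".
// Throws std::invalid_argument if the header is missing or a replica name is empty.
// Assumption: the header line is always present.
std::vector<AssignedReplica> parseAssignment(const std::string & value)
{
    static const std::string header = "format version: 1\n";
    static const std::string cloning_suffix = ":cloning";

    if (value.compare(0, header.size(), header) != 0)
        throw std::invalid_argument("missing format version header");

    std::vector<AssignedReplica> replicas;
    size_t pos = header.size();
    while (pos <= value.size())
    {
        size_t comma = value.find(',', pos);
        if (comma == std::string::npos)
            comma = value.size();
        std::string token = value.substr(pos, comma - pos);

        bool cloning = false;
        if (token.size() >= cloning_suffix.size()
            && token.compare(token.size() - cloning_suffix.size(), cloning_suffix.size(), cloning_suffix) == 0)
        {
            cloning = true;
            token.erase(token.size() - cloning_suffix.size());
        }
        if (token.empty())
            throw std::invalid_argument("empty replica name");

        replicas.push_back(AssignedReplica{token, cloning});
        pos = comma + 1;
    }
    return replicas;
}

// Replicas eligible for read/write routing: every entry without the :cloning suffix.
std::vector<std::string> routableReplicas(const std::vector<AssignedReplica> & replicas)
{
    std::vector<std::string> names;
    for (const auto & replica : replicas)
        if (!replica.cloning)
            names.push_back(replica.name);
    return names;
}
```

For the value "format version: 1\nr1,r2:cloning", this yields r1 as routable and r2 as a clone target that routing should ignore.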

@zoomxi

zoomxi commented Apr 21, 2026

Contributor Author

2. Read Path

When replication_factor > 0, SELECT queries are routed so each partition is read only from an assigned replica:

SELECT * FROM table
  │
  ├─ createStorageSnapshot()
  │    └─ getAssignments(zk, {})  →  60s TTL cache or ZK refresh
  │    └─ snapshot.selective_assignment_map[pid] = assigned_replicas
  │
  ├─ getQueryProcessingStage()
  │    ├─ depth == 0 && has remote partitions → WithMergeableState
  │    └─ depth == 0 && all local              → default (Complete)
  │
  ├─ read()  [processed_stage == WithMergeableState && depth == 0]
  │    └─ readWithSelectiveRouting()
  │         ├─ buildPartitionRoutingMap():
  │         │    ├─ local partition  → read locally + _partition_id IN (...) filter
  │         │    ├─ remote partition → hash-select one assigned replica → remote sub-query
  │         │    └─ not in map       → fallback to local read
  │         ├─ redistributeFailedReplicas() on unreachable replicas
  │         └─ UnionStep merge all shard plans
  │
  └─ Sub-queries (depth > 0): readLocalImpl() only, no re-routing

Depth guard: Sub-queries arrive with distributed_depth > 0 and skip selective routing, reading only their locally-filtered partitions. MAX_FORWARDING_DEPTH = 3 prevents infinite forwarding loops.

Cache consistency: Because assignments are cached with a 60-second TTL, a recently migrated partition may still be routed to the old replica until the cache entry expires. This is an acceptable trade-off for avoiding per-query ZK lookups.
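
The three routing cases in the diagram (local, remote, not in map) can be sketched as follows. This is an illustration under assumptions, not the PR's buildPartitionRoutingMap(): `AssignmentMap`, `chooseReadReplica`, and `buildRoutingPlan` are hypothetical names, std::hash stands in for whatever hash the implementation uses, and the map is assumed to already exclude :cloning replicas.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <functional>
#include <map>
#include <string>
#include <vector>

// partition_id -> replicas that hold the partition (":cloning" entries already removed).
using AssignmentMap = std::map<std::string, std::vector<std::string>>;

// Picks the replica a partition should be read from.
std::string chooseReadReplica(
    const AssignmentMap & assignments,
    const std::string & partition_id,
    const std::string & local_replica)
{
    const auto it = assignments.find(partition_id);
    if (it == assignments.end() || it->second.empty())
        return local_replica;  // not in map: fall back to a local read

    const std::vector<std::string> & replicas = it->second;
    if (std::find(replicas.begin(), replicas.end(), local_replica) != replicas.end())
        return local_replica;  // local partition

    // Remote partition: hash-select one assigned replica.
    const size_t index = std::hash<std::string>{}(partition_id) % replicas.size();
    return replicas[index];
}

// Groups partitions by the replica that should serve them, so that each
// remote replica receives a single sub-query covering all of its partitions.
std::map<std::string, std::vector<std::string>> buildRoutingPlan(
    const AssignmentMap & assignments,
    const std::vector<std::string> & partition_ids,
    const std::string & local_replica)
{
    std::map<std::string, std::vector<std::string>> plan;
    for (const auto & partition_id : partition_ids)
        plan[chooseReadReplica(assignments, partition_id, local_replica)].push_back(partition_id);
    return plan;
}
```

Hashing the partition ID keeps the choice stable across queries for a given partition while spreading different partitions over their assigned replicas.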

@zoomxi

zoomxi commented Apr 21, 2026

Contributor Author

3. Write Path

INSERT queries are forwarded to assigned replicas with CAS-protected assignment verification:

INSERT (distributed_depth == 0)
  │
  ├─ allocatePartitions()
  │    └─ least-loaded-first: prefer replicas with fewer partitions
  │    └─ tie-break: hash(partition_id) rotation
  │    └─ ZK multi-op: create assignment + update counts (atomic)
  │
  ├─ buildForwardingPlan()
  │    ├─ local blocks  (assigned to self)  → stay for local write
  │    └─ remote blocks (assigned to other) → forward with depth++
  │
  ├─ commitPart()  [for each local block]
  │    ├─ getAssignmentCASVersion()  →  cache-first, ZK on miss
  │    ├─ CheckRequest(assignment_path, cas_version) in multi-op
  │    ├─ Success → part committed
  │    └─ ZBADVERSION:
  │         ├─ still assigned    → rollback temp part, retry with new version
  │         └─ no longer assigned → AssignmentChangedException → Phase 2
  │
  └─ Phase 2: re-forward (assignment_failures)
       ├─ depth >= MAX_FORWARDING_DEPTH (3) → throw error
       ├─ re-allocatePartitions()
       └─ buildForwardingPlan + executeForwarding (re-forward)
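
The least-loaded-first choice with a hash-based tie-break might be sketched like this. The function name `allocateReplicas`, the use of std::hash, and the exact rotation rule are assumptions for illustration; the PR's allocatePartitions() may differ, and the real allocation also writes the assignment and counts in one ZK multi-op.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <functional>
#include <map>
#include <string>
#include <vector>

// Chooses up to replication_factor replicas for a new partition.
// counts: replica name -> number of partitions currently assigned to it.
std::vector<std::string> allocateReplicas(
    const std::map<std::string, size_t> & counts,
    const std::string & partition_id,
    size_t replication_factor)
{
    assert(replication_factor > 0);
    const size_t n = counts.size();
    if (n == 0)
        return {};

    struct Candidate
    {
        std::string name;
        size_t count;
        size_t rank;  // position after rotating the replica list by hash(partition_id)
    };

    const size_t offset = std::hash<std::string>{}(partition_id) % n;

    std::vector<Candidate> candidates;
    size_t index = 0;
    for (const auto & entry : counts)
    {
        candidates.push_back(Candidate{entry.first, entry.second, (index + n - offset) % n});
        ++index;
    }

    // Fewer partitions first; equal counts are ordered by the rotated rank.
    std::sort(candidates.begin(), candidates.end(), [](const Candidate & a, const Candidate & b)
    {
        return a.count != b.count ? a.count < b.count : a.rank < b.rank;
    });

    std::vector<std::string> chosen;
    for (size_t i = 0; i < std::min(replication_factor, n); ++i)
        chosen.push_back(candidates[i].name);
    return chosen;
}
```

With counts r1:10, r2:8, r3:9 and a replication factor of 2, the sketch picks r2 and r3. When all counts are equal, the rotation makes different partition IDs start from different replicas instead of always favoring the first name.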

Queue filtering: shouldSkipForSelectiveReplication() skips log entries (GET_PART/MERGE/MUTATE) for partitions not assigned to the local replica.
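
A simplified view of that filter, as a sketch only: `LogEntryType`, `AssignedReplicas`, and `shouldSkipEntry` are illustrative names rather than ClickHouse types. Two behaviors here are assumptions: a replica listed with :cloning still counts as assigned (it has to pull parts through the replication queue during CLONE), and entries for partitions with no assignment record are not filtered.

```cpp
#include <cassert>
#include <map>
#include <set>
#include <string>

enum class LogEntryType { GET_PART, MERGE_PARTS, MUTATE_PART, OTHER };

// partition_id -> every replica named in the assignment, including ":cloning" targets.
using AssignedReplicas = std::map<std::string, std::set<std::string>>;

// Returns true if the local replica should ignore this log entry.
bool shouldSkipEntry(
    LogEntryType type,
    const std::string & partition_id,
    const std::string & local_replica,
    const AssignedReplicas & assignments)
{
    const bool is_part_entry = type == LogEntryType::GET_PART
        || type == LogEntryType::MERGE_PARTS
        || type == LogEntryType::MUTATE_PART;
    if (!is_part_entry)
        return false;  // only part-level entries are filtered

    const auto it = assignments.find(partition_id);
    if (it == assignments.end())
        return false;  // assumption: partitions without an assignment are not filtered

    return it->second.count(local_replica) == 0;
}
```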

@zoomxi

zoomxi commented Apr 21, 2026

Copy link
Copy Markdown
Contributor Author

4. Rebalance & Migration

Migration State Machine

CLONE  ──→  SWITCH  ──→  DONE
  │                      ↑
  └──→ FAILED ──────────┘  (rollback)

CLONE phase:
  ├─ Coordinator records source_parts_snapshot in migration metadata
  ├─ Target appears as "target:cloning" in assignment (excluded from read/write routing)
  ├─ Coordinator writes GET_PART entries to shared replication log
  ├─ Target pulls data via normal replication queue
  └─ Target creates clone_complete signal node when all parts are present

SWITCH phase:
  ├─ Coordinator atomically updates assignment (target:cloning → target, remove source)
  ├─ Atomically updates counts (source -1, target +1) in same multi-op
  └─ CAS-protected against concurrent modifications

DONE / FAILED:
  ├─ Migration ZK subtree cleaned up
  └─ On failure: :cloning entry removed from assignment
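
The effect of the SWITCH step on the assignment and the counts can be illustrated in memory as below. This is a sketch with hypothetical names (`AssignedReplica`, `applySwitch`); in the PR both updates are described as one CAS-protected ZK multi-op, which this example does not model.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <map>
#include <string>
#include <vector>

struct AssignedReplica
{
    std::string name;
    bool cloning;
};

// Applies SWITCH: "target:cloning" becomes "target", the source is removed,
// and the counts move by one (source -1, target +1).
// Returns false and changes nothing if the expected state is not found.
bool applySwitch(
    std::vector<AssignedReplica> & assignment,
    std::map<std::string, size_t> & counts,
    const std::string & source,
    const std::string & target)
{
    auto is_source = [&](const AssignedReplica & r) { return r.name == source && !r.cloning; };
    auto is_target = [&](const AssignedReplica & r) { return r.name == target && r.cloning; };

    const auto source_it = std::find_if(assignment.begin(), assignment.end(), is_source);
    const auto target_it = std::find_if(assignment.begin(), assignment.end(), is_target);
    const auto count_it = counts.find(source);
    if (source_it == assignment.end() || target_it == assignment.end()
        || count_it == counts.end() || count_it->second == 0)
        return false;

    target_it->cloning = false;   // promote the target before erase invalidates iterators
    assignment.erase(source_it);  // remove the source replica
    --count_it->second;
    ++counts[target];
    return true;
}
```

Starting from r1,r2,r3:cloning with counts r1:10, r2:8, r3:4, switching from r1 to r3 leaves r2,r3 and counts r1:9, r2:8, r3:5. A second identical call returns false because r1 is no longer assigned, which mirrors how a stale CAS version would reject a repeated SWITCH.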

@zoomxi

zoomxi commented Apr 21, 2026

Contributor Author

@zoomxi zoomxi force-pushed the selective_replication branch from 8ac59c8 to f070806 Compare May 6, 2026 11:46
`aspell-dict.txt` was deleted in master along with the `aspell` and
`codespell` style checks. Removing the file here so the PR can merge
cleanly without a modify/delete conflict.

Labels

can be tested: Allows running workflows for external contributors
manual approve: Manual approve required to run CI
pr-feature: Pull request with new product feature

Development

Successfully merging this pull request may close these issues.

Trivial Support For Resharding (RFC)

3 participants