Fix ScatterExchangeStep crash on a multi-bucket source (make_distributed_plan over Merge of Distributed) by groeneai · Pull Request #108401 · ClickHouse/ClickHouse · GitHub
Skip to content

Fix ScatterExchangeStep crash on a multi-bucket source (make_distributed_plan over Merge of Distributed)#108401

Open
groeneai wants to merge 3 commits into
ClickHouse:masterfrom
groeneai:fix-scatter-multishard-merge-107946
Open

Fix ScatterExchangeStep crash on a multi-bucket source (make_distributed_plan over Merge of Distributed)#108401
groeneai wants to merge 3 commits into
ClickHouse:masterfrom
groeneai:fix-scatter-multishard-merge-107946

Conversation

@groeneai

@groeneai groeneai commented Jun 24, 2026

Copy link
Copy Markdown
Contributor

Closes: #107946

Changelog category (leave one):

  • Bug Fix (user-visible misbehavior in an official stable release)

Changelog entry (a user-readable short description of the changes that goes into CHANGELOG.md):

Fixed a LOGICAL_ERROR exception (ScatterExchangeStep should have one source shard, got 8) when aggregating over a Merge table whose underlying tables are Distributed, with the experimental setting make_distributed_plan = 1.

Description

ScatterExchangeStep::createSinkAndSourcePair rejected any source exposing more than one bucket with a LOGICAL_ERROR. That condition is reachable from SQL:

SET make_distributed_plan = 1;
SELECT count(_table) FROM m2 WHERE _table = 'd1' GROUP BY _table;
-- m2 = Merge over Distributed tables d1, d4

raised the exception Logical error: 'ScatterExchangeStep should have one source shard, got 8' (release builds throw, debug builds abort).

Root cause: StorageMerge::ReadFromMerge builds its children at a post-FetchColumns stage and converts each child plan to a distributed plan on its own (ReadFromMerge::buildPipeline -> QueryPlan::buildQueryPipeline -> QueryPlan::convertToDistributed). The per-child plan therefore carries a ScatterExchange above an already-bucketed distributed read, so by the time makeDistributedPlan reaches the scatter its source has several buckets (distributed_plan_default_reader_bucket_count, 8 by default) and the assertion fires.

Fix: ScatterExchange is documented as a special case of ShuffleExchange whose source bucket count is 1, and it already reuses ShuffleSend/ShuffleReceive as its sink/source. Neither step requires a single source bucket (ShuffleSend runs per source bucket, ShuffleReceive reads every source bucket for its destination bucket), so a multi-bucket source is handled correctly: the same pair repartitions all source buckets into the requested result buckets, i.e. behaves as a plain shuffle. This is exactly what optimizeExchanges already does when it rewrites a Scatter over a Gather into a ShuffleExchange. The assertion is dropped and the multi-bucket source flows through.

The reporter (qoega) suggested this localized direction (relax ScatterExchangeStep to behave as a shuffle when the source has more than one bucket).

The same exception signature (ScatterExchangeStep should have one source shard) was also produced by the AST fuzzer on several PR runs, e.g. https://s3.amazonaws.com/clickhouse-test-reports/json.html?PR=108129&sha=8aaab301293dd91ed33f6628d8e888802566c69a&name_0=PR&name_1=AST%20fuzzer%20%28arm_asan_ubsan%29 (AST fuzzer (arm_asan_ubsan)).

ScatterExchangeStep::createSinkAndSourcePair rejected any source with more
than one bucket with a LOGICAL_ERROR. This is reachable from SQL: aggregating
over a Merge table whose children are Distributed tables crashed the server
with 'ScatterExchangeStep should have one source shard, got 8' when
make_distributed_plan=1 (issue ClickHouse#107946).

StorageMerge::ReadFromMerge builds its children at a post-FetchColumns stage
and converts each child plan to a distributed plan on its own
(ReadFromMerge::buildPipeline -> QueryPlan::buildQueryPipeline ->
convertToDistributed). The per-child plan therefore carries a ScatterExchange
above an already-bucketed distributed read, so the scatter sees several source
buckets (distributed_plan_default_reader_bucket_count, 8 by default) and the
assertion aborts the server.

Scatter is documented as a special case of Shuffle whose source bucket count is
1, and it already reuses ShuffleSend/ShuffleReceive as its sink/source. Neither
step requires a single source bucket: ShuffleSend runs per source bucket and
ShuffleReceive reads every source bucket for its destination bucket. So when the
source happens to expose several buckets, the same pair correctly repartitions
all source buckets into the requested result buckets, i.e. behaves as a plain
shuffle - which is exactly what optimizeExchanges already does when it rewrites a
Scatter over a Gather into a ShuffleExchange. Drop the assertion and let the
multi-bucket source flow through.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@groeneai

Copy link
Copy Markdown
Contributor Author

@groeneai

Copy link
Copy Markdown
Contributor Author

cc @davenger @KochetovNicolai — could you review? make_distributed_plan over a Merge of Distributed tables distributes each child plan on its own, so a ScatterExchange ends up over an already-bucketed distributed read and its source has 8 buckets, tripping the single-source LOGICAL_ERROR. Since Scatter is a special case of Shuffle and already builds ShuffleSend/ShuffleReceive (both fine with N source buckets), this drops the assertion and lets the multi-bucket source flow through as a shuffle. @qoega confirmed the OSS repro and suggested this direction.

@clickhouse-gh

clickhouse-gh Bot commented Jun 24, 2026

Copy link
Copy Markdown
Contributor

Workflow [PR], commit [789c2ed]

Summary:

job_name test_name status info comment
AST fuzzer (amd_debug, targeted) FAIL
Logical error: Block structure mismatch in A stream: different columns: (STID: 0993-27f0) FAIL cidb, issue
Stress test (arm_msan) FAIL
Hung check failed, possible deadlock found FAIL cidb, issue

AI Review

Summary
  • The PR removes the over-strict ScatterExchangeStep single-source-bucket assertion and lets the existing ShuffleSend / ShuffleReceive pair repartition multi-bucket inputs. The regression test now pins the distributed-plan settings needed to force the multi-source path and checks non-empty results through the same Merge over Distributed topology. I found no remaining correctness issues.
Final Verdict
  • Status: ✅ Approve

@clickhouse-gh clickhouse-gh Bot added the pr-bugfix Pull request with bugfix, not backported by default label Jun 24, 2026
Comment thread tests/queries/0_stateless/04367_distributed_plan_merge_scatter_multishard.sql Outdated
…on test

The CI test profile (tests/config/users.d/limits.yaml) sets max_rows_to_group_by
to 10G, and make_distributed_plan rejects aggregation with a non-zero limit
(Code 344, SUPPORT_IS_DISABLED). That fired before the query could reach the
ScatterExchangeStep path, so the Fast test for this regression failed without
exercising the fix.

clickhouse-test also randomizes prefer_localhost_replica. With it set to 0 the
Distributed engine serializes the per-child plan to the localhost replica, where
the experimental BlocksMarshalling step is not deserializable (Code 47,
UNKNOWN_IDENTIFIER). That path is unrelated to the crash, which happens earlier
while building the per-child plan.

Pin both settings so the query deterministically reaches the multi-bucket
ScatterExchangeStep. EXPLAIN distributed=1 confirms the per-child plan still
carries a ShuffleExchange over a bucketed ReadFromMergeTree (the step that threw
'should have one source shard, got 8' before the fix), so crash-path coverage is
preserved. 20/20 runs pass under full CI randomization.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@groeneai

Copy link
Copy Markdown
Contributor Author

Fixed the Fast test failure on the new regression test (04367) at fea96be.

Two CI-environment settings masked the crash path:

  • The test profile (tests/config/users.d/limits.yaml) sets max_rows_to_group_by=10G, and make_distributed_plan rejects aggregation with a non-zero limit (Code 344). Pinned max_rows_to_group_by=0.
  • clickhouse-test randomizes prefer_localhost_replica; with 0 the Distributed engine serializes the per-child plan to the localhost replica where the experimental BlocksMarshalling step is not deserializable (Code 47), unrelated to this crash. Pinned prefer_localhost_replica=1.

EXPLAIN distributed=1 confirms the per-child plan still carries a ShuffleExchange over a bucketed ReadFromMergeTree (the step that threw 'should have one source shard, got 8' before the fix), so crash-path coverage is preserved. 20/20 runs pass under full randomization.

Session id: cron:clickhouse-worker-slot-4:20260624-181800

Pin distributed_plan_max_rows_to_broadcast = 0 and
distributed_plan_default_reader_bucket_count = 3 so the distributed read is
neither broadcast nor single-bucket, which guarantees the per-child scatter sees
a multi-bucket source and the removed guard is actually exercised (verified: the
test fails with the pre-fix binary and passes with the fix). Add a non-empty
assertion through the same Merge over Distributed topology, filtering on the real
underlying table names that _table exposes, so the test proves rows survive the
multi-bucket scatter, not only that the no-row reproducer no longer throws.

Reword 'crashed the server' / 'survive' / 'abort' to 'raised the LOGICAL_ERROR
exception' / 'throw', since release builds throw an exception rather than crash.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@groeneai

Copy link
Copy Markdown
Contributor Author

Updated the regression test and exception wording per the two review comments.

Pre-PR validation gate (click to expand)
# Question Answer
a Deterministic repro? Yes. SELECT count(_table) FROM m107946 WHERE _table = 'base107946_1' GROUP BY _table over a Merge of Distributed tables with make_distributed_plan = 1, distributed_plan_max_rows_to_broadcast = 0, distributed_plan_default_reader_bucket_count = 3 raises the exception deterministically on the pre-fix binary.
b Root cause explained? StorageMerge::ReadFromMerge converts each child plan to a distributed plan on its own, so the per-child plan carries a ScatterExchange above an already-bucketed distributed read. makeDistributedPlan reaches the scatter with N (>1) source buckets, and ScatterExchangeStep::createSinkAndSourcePair rejected source_shards.size() != 1 with a LOGICAL_ERROR.
c Fix matches root cause? Yes. The assertion is dropped because the step already builds ShuffleSend/ShuffleReceive, which handle a multi-bucket source (repartition all source buckets into the result buckets, i.e. behave as a plain shuffle). Same rewrite optimizeExchanges already performs for Scatter-over-Gather. Not a band-aid.
d Test intent preserved / new tests added? New regression test 04367_distributed_plan_merge_scatter_multishard. It now pins the distributed-read settings so the multi-bucket scatter path is forced, and adds a non-empty assertion through the same Merge-over-Distributed topology (counts must match the per-table row counts), so it proves rows survive the multi-bucket scatter rather than only that the no-row reproducer no longer throws.
e Both directions demonstrated? Yes. Pre-fix binary (build 0d3580d4): test FAILS, server dies with Logical error: 'ScatterExchangeStep should have one source shard, got 8'. Fixed binary (build cffaad64): 20/20 OK with full randomization.
f Fix is general across code paths? Yes. The change is in the shared ScatterExchangeStep, so every planner entry that builds a scatter over a multi-bucket source (OSS ReadFromMerge recursion and the cloud worker leaf path) is covered. BroadcastExchangeStep's analogous guard is intentionally left intact because BroadcastReceiveStep genuinely requires a single source (multi-source broadcast is unimplemented).
g Fix generalizes across inputs (params/datatypes/wrappers)? Yes. The fix is independent of column type and of the bucket count: ShuffleSend/ShuffleReceive already use getResultBucketCount() and the actual source_shards list, so any source bucket count >= 1 and any key type repartitions correctly. The test exercises a non-default bucket count (3) rather than relying on the default 8.
h Backward compatible? (maintainer-approved exception only) Yes. No setting default, on-disk/wire format, or validation change. The fix only removes an over-strict internal assertion on an experimental code path (make_distributed_plan), turning a hard error into correct execution.
i Invariants and contracts preserved? Yes. ShuffleSendStep/ShuffleReceiveStep are invoked exactly as they are for a real ShuffleExchange: ShuffleSend partitions each source bucket by hash(key_names) into getResultBucketCount() buckets; ShuffleReceive reads every entry of source_shards for its destination. The removed precondition (source_shards.size() == 1) was the only thing that distinguished a scatter from a shuffle at this layer, and getSourceBucketCount() (hardcoded 1) is not consulted on this path. Verified the non-distributed result oracle matches (100000 / 50000).

Session id: cron:clickhouse-worker-slot-2:20260624-231100

@clickhouse-gh

clickhouse-gh Bot commented Jun 25, 2026

Copy link
Copy Markdown
Contributor

LLVM Coverage Report

Metric Baseline Current Δ
Lines 85.30% 85.30% +0.00%
Functions 92.60% 92.60% +0.00%
Branches 77.50% 77.50% +0.00%

Changed lines: Changed C/C++ lines covered by tests: 2/2 (100.00%) | Lost baseline coverage: none · Uncovered code

Full report · Diff report

@groeneai

Copy link
Copy Markdown
Contributor Author

CI finish ledger - 789c2ede1116

Every failure below has an owner: a fixing PR (ours or external), or a full-effort fix task whose fixing-PR link will be posted here when it opens. Only CH Inc sync is exempt.

Check / test Reason Owner / fixing PR
AST fuzzer (amd_debug, targeted) / Block structure mismatch in UnionStep stream: different columns (STID 0993-27f0) trunk fuzzer LOGICAL_ERROR on WITH RECURSIVE ... UNION ALL Nullable(Nothing) Const-vs-full mismatch; 9 master hits / 20+ unrelated PRs in 30d; unrelated to this PR's ScatterExchangeStep change #107719 (ours, open)
Stress test (arm_msan) / Hung check failed, possible deadlock found chronic trunk shutdown-deadlock family (338 master hits/30d) #101680 / #105905 (ours, open)
CH Inc sync - CH Inc sync (private, not actionable)

No PR-caused failure on this head. (The earlier Fast test 04367 and STID 4661-51c1 failures were on prior commits a273cb6/fea96be8 and are not present on the covered head 789c2ed - Fast test is SUCCESS here.)

Session id: cron:our-pr-ci-monitor:20260625-030000

@davenger davenger self-assigned this Jun 26, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

can be tested Allows running workflows for external contributors pr-bugfix Pull request with bugfix, not backported by default

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Logical error: 'ScatterExchangeStep should have one source shard, got 8' with make_distributed_plan=1 + Merge table

3 participants