Fix ScatterExchangeStep crash on a multi-bucket source (make_distributed_plan over Merge of Distributed)#108401
Conversation
ScatterExchangeStep::createSinkAndSourcePair rejected any source with more than one bucket with a LOGICAL_ERROR. This is reachable from SQL: aggregating over a Merge table whose children are Distributed tables crashed the server with 'ScatterExchangeStep should have one source shard, got 8' when make_distributed_plan=1 (issue ClickHouse#107946). StorageMerge::ReadFromMerge builds its children at a post-FetchColumns stage and converts each child plan to a distributed plan on its own (ReadFromMerge::buildPipeline -> QueryPlan::buildQueryPipeline -> convertToDistributed). The per-child plan therefore carries a ScatterExchange above an already-bucketed distributed read, so the scatter sees several source buckets (distributed_plan_default_reader_bucket_count, 8 by default) and the assertion aborts the server. Scatter is documented as a special case of Shuffle whose source bucket count is 1, and it already reuses ShuffleSend/ShuffleReceive as its sink/source. Neither step requires a single source bucket: ShuffleSend runs per source bucket and ShuffleReceive reads every source bucket for its destination bucket. So when the source happens to expose several buckets, the same pair correctly repartitions all source buckets into the requested result buckets, i.e. behaves as a plain shuffle - which is exactly what optimizeExchanges already does when it rewrites a Scatter over a Gather into a ShuffleExchange. Drop the assertion and let the multi-bucket source flow through. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
|
cc @davenger @KochetovNicolai — could you review? |
|
Workflow [PR], commit [789c2ed] Summary: ❌
AI ReviewSummary
Final Verdict
|
…on test The CI test profile (tests/config/users.d/limits.yaml) sets max_rows_to_group_by to 10G, and make_distributed_plan rejects aggregation with a non-zero limit (Code 344, SUPPORT_IS_DISABLED). That fired before the query could reach the ScatterExchangeStep path, so the Fast test for this regression failed without exercising the fix. clickhouse-test also randomizes prefer_localhost_replica. With it set to 0 the Distributed engine serializes the per-child plan to the localhost replica, where the experimental BlocksMarshalling step is not deserializable (Code 47, UNKNOWN_IDENTIFIER). That path is unrelated to the crash, which happens earlier while building the per-child plan. Pin both settings so the query deterministically reaches the multi-bucket ScatterExchangeStep. EXPLAIN distributed=1 confirms the per-child plan still carries a ShuffleExchange over a bucketed ReadFromMergeTree (the step that threw 'should have one source shard, got 8' before the fix), so crash-path coverage is preserved. 20/20 runs pass under full CI randomization. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
|
Fixed the Fast test failure on the new regression test ( Two CI-environment settings masked the crash path:
Session id: cron:clickhouse-worker-slot-4:20260624-181800 |
Pin distributed_plan_max_rows_to_broadcast = 0 and distributed_plan_default_reader_bucket_count = 3 so the distributed read is neither broadcast nor single-bucket, which guarantees the per-child scatter sees a multi-bucket source and the removed guard is actually exercised (verified: the test fails with the pre-fix binary and passes with the fix). Add a non-empty assertion through the same Merge over Distributed topology, filtering on the real underlying table names that _table exposes, so the test proves rows survive the multi-bucket scatter, not only that the no-row reproducer no longer throws. Reword 'crashed the server' / 'survive' / 'abort' to 'raised the LOGICAL_ERROR exception' / 'throw', since release builds throw an exception rather than crash. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
|
Updated the regression test and exception wording per the two review comments. Pre-PR validation gate (click to expand)
Session id: cron:clickhouse-worker-slot-2:20260624-231100 |
LLVM Coverage Report
Changed lines: Changed C/C++ lines covered by tests: 2/2 (100.00%) | Lost baseline coverage: none · Uncovered code |
CI finish ledger - 789c2ede1116Every failure below has an owner: a fixing PR (ours or external), or a full-effort fix task whose fixing-PR link will be posted here when it opens. Only No PR-caused failure on this head. (The earlier Session id: cron:our-pr-ci-monitor:20260625-030000 |

Closes: #107946
Changelog category (leave one):
Changelog entry (a user-readable short description of the changes that goes into CHANGELOG.md):
Fixed a
LOGICAL_ERRORexception (ScatterExchangeStep should have one source shard, got 8) when aggregating over aMergetable whose underlying tables areDistributed, with the experimental settingmake_distributed_plan = 1.Description
ScatterExchangeStep::createSinkAndSourcePairrejected any source exposing more than one bucket with aLOGICAL_ERROR. That condition is reachable from SQL:raised the exception
Logical error: 'ScatterExchangeStep should have one source shard, got 8'(release builds throw, debug builds abort).Root cause:
StorageMerge::ReadFromMergebuilds its children at a post-FetchColumnsstage and converts each child plan to a distributed plan on its own (ReadFromMerge::buildPipeline->QueryPlan::buildQueryPipeline->QueryPlan::convertToDistributed). The per-child plan therefore carries aScatterExchangeabove an already-bucketed distributed read, so by the timemakeDistributedPlanreaches the scatter its source has several buckets (distributed_plan_default_reader_bucket_count, 8 by default) and the assertion fires.Fix:
ScatterExchangeis documented as a special case ofShuffleExchangewhose source bucket count is 1, and it already reusesShuffleSend/ShuffleReceiveas its sink/source. Neither step requires a single source bucket (ShuffleSendruns per source bucket,ShuffleReceivereads every source bucket for its destination bucket), so a multi-bucket source is handled correctly: the same pair repartitions all source buckets into the requested result buckets, i.e. behaves as a plain shuffle. This is exactly whatoptimizeExchangesalready does when it rewrites aScatterover aGatherinto aShuffleExchange. The assertion is dropped and the multi-bucket source flows through.The reporter (qoega) suggested this localized direction (relax
ScatterExchangeStepto behave as a shuffle when the source has more than one bucket).The same exception signature (
ScatterExchangeStep should have one source shard) was also produced by the AST fuzzer on several PR runs, e.g. https://s3.amazonaws.com/clickhouse-test-reports/json.html?PR=108129&sha=8aaab301293dd91ed33f6628d8e888802566c69a&name_0=PR&name_1=AST%20fuzzer%20%28arm_asan_ubsan%29 (AST fuzzer (arm_asan_ubsan)).