Fix pipeline-stuck deadlock in BufferedShardByHashTransform #106251
groeneai wants to merge 10 commits
`BufferedShardByHashTransform` had two prepare() bugs that deadlocked the
pipeline when a downstream consumer activated its inputs sequentially
(e.g. `ConcatProcessor` from `narrowPipe` on a `UNION ALL` with
`max_streams_for_union_step` < pipeline streams).
Scenario: data hashes to only a subset of shards (skewed keys, low
cardinality), so some output ports keep empty queues while sibling
ports fill up. A `Concat` downstream pulls from the empty-queue
branch first; the queued shards stay back-pressured because `Concat`
never advances to them.
Bug 1 (input-finished path): when input is exhausted and a shard's
queue is empty, `prepare()` never finished that output port. The
downstream waited forever for a finish signal, and the queued chunks
on other shards never drained because `Concat` never moved on.
Bug 2 (input-not-finished, queue-at-capacity path): the soft memory
cap (`MAX_QUEUE_LENGTH`) returned `PortFull` as soon as any queue hit
the limit, even when a sibling port had an empty queue and was
actively asking for data (`canPush() == true`). The active sibling
could never receive data, so `Concat` never advanced and the cap'd
queues never drained.
Fix:
1. Finish empty-queue output ports eagerly in the first pass when
`input.isFinished() && !has_pending_input_chunk`. Downstream of
the empty shards then sees `Finished` and proceeds.
2. Bypass the cap-induced `PortFull` when at least one empty port has
`canPush() == true`. The deadlock priority outweighs the soft
memory bound; a brief overshoot lets the asking path receive data
while preserving forward progress.
3. Track `has_pushable_empty_port` and require `has_pushable_queued`
OR `has_pushable_empty` to make any forward-progress claim.
Closes: ClickHouse#106237
Related: ClickHouse#104233
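
The two `prepare()` changes described in this commit message can be sketched as a small standalone decision function. This is a simplified model, not the actual ClickHouse code: `ShardPort`, `Decision` and `decide` are hypothetical names, pushing of queued chunks is left out, and `MAX_QUEUE_LENGTH = 10` is inferred from the `2 * MAX_QUEUE_LENGTH = 20` figure quoted later in the thread.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical stand-in for one output port of the transform.
struct ShardPort
{
    std::size_t queued_chunks = 0; // chunks buffered for this shard
    bool can_push = false;         // downstream is currently asking on this port
    bool finished = false;         // port has been finished
};

enum class Decision { NeedInput, PortFull, Finished };

// Assumption: value inferred from the thread, not read from the source tree.
constexpr std::size_t MAX_QUEUE_LENGTH = 10;

Decision decide(std::vector<ShardPort> & ports, bool input_finished, bool has_pending_input_chunk)
{
    assert(!ports.empty());

    bool any_queue_at_capacity = false;
    bool has_pushable_empty_port = false;
    bool all_finished = true;

    for (auto & port : ports)
    {
        if (port.finished)
            continue;

        // Fix 1: an empty queue can never receive data once input is exhausted,
        // so finish the port and let a sequential consumer move on.
        if (port.queued_chunks == 0 && input_finished && !has_pending_input_chunk)
        {
            port.finished = true;
            continue;
        }

        all_finished = false;
        if (port.queued_chunks >= MAX_QUEUE_LENGTH)
            any_queue_at_capacity = true;
        if (port.queued_chunks == 0 && port.can_push)
            has_pushable_empty_port = true;
    }

    if (all_finished)
        return Decision::Finished;
    if (input_finished)
        return Decision::PortFull; // remaining queued chunks wait for downstream demand

    // Fix 2: the soft cap only blocks when no empty port is asking for data.
    if (any_queue_at_capacity && !has_pushable_empty_port)
        return Decision::PortFull;

    return Decision::NeedInput;
}
```

With two ports where one queue is at the cap and the empty sibling is asking, this model returns `NeedInput` instead of `PortFull`, which is the forward-progress case the fix targets.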
|
cc @PedroTadim @nihalzp @KochetovNicolai — could you review this? It fixes the pipeline-stuck deadlock you flagged on #106237: |
|
Workflow [PR], commit [7f06694] Summary: ❌
|
    /// When such a port exists, the deadlock with sequential consumers takes priority
    /// over the soft memory bound: we let queues briefly overshoot to feed the asking
    /// path. Once input finishes the first pass above will finalize the empty ports.
    if (any_queue_at_capacity && !has_pushable_empty_port)
This cap bypass is not bounded. If the demanded empty output belongs to a shard that never receives rows, has_pushable_empty_port stays true, so this check never returns PortFull even after a sibling queue reaches MAX_QUEUE_LENGTH. A concrete trace is all rows hashing to shard 1 while ConcatProcessor is currently asking shard 0: each prepare call skips the cap, pulls another input chunk, and generateOutputChunks appends it to shard 1 until upstream finishes. For a large skewed query this can buffer the whole input and hit memory limits instead of respecting the existing queue cap.
Please keep the forward-progress escape bounded, or change the pipeline/state machine so the cap is not disabled until the pulled chunk actually feeds the demanded empty port.
Bounded the bypass via a hard per-shard limit (MAX_QUEUE_HARD_LIMIT = 2 * MAX_QUEUE_LENGTH = 20) in 874c4f0. The new check is unconditional - an asking-empty port no longer bypasses it - so worst-case per-shard buffering is MAX_QUEUE_HARD_LIMIT chunks and total MAX_QUEUE_HARD_LIMIT * num_shards. The soft cap and its forward-progress bypass for the normal case are unchanged. When the hard cap fires the pipeline waits, and the eager-finish on input_finished (from the parent commit) unblocks it as soon as upstream depletes.
Added regression test 04306_sharded_aggregation_pipeline_skew_memory_bound for the exact scenario you described - a single distinct key value forces all rows to one shard while max_streams_for_union_step = 1 puts a ConcatProcessor downstream.
Follow-up to the soft-cap bypass in the parent commit: the bypass was unbounded under pathological hash-key skew. If all rows hash to a single shard while a sibling output port is empty and downstream is asking on it, `has_pushable_empty_port` stays true and the bypass never returns `PortFull`, so each `prepare` keeps pulling another input chunk and `generateOutputChunks` keeps appending to the one receiving shard until upstream depletes. For a large skewed query that meant buffering the whole input on one shard.

Add a hard per-shard limit `MAX_QUEUE_HARD_LIMIT = 2 * MAX_QUEUE_LENGTH` that is unconditional - even an asking-empty port does not bypass it. Worst-case per-shard buffering is now `MAX_QUEUE_HARD_LIMIT` chunks; worst-case total `MAX_QUEUE_HARD_LIMIT * num_shards`.

The soft cap and its forward-progress bypass (which fixes the original `ConcatProcessor` deadlock) are unchanged for the normal case; only pathological skew trips the new hard limit. When it fires the pipeline waits and the eager-finish on `input_finished` (also added in the parent commit) unblocks it as soon as upstream depletes.

New regression test `04306_sharded_aggregation_pipeline_skew_memory_bound` exercises the bot-described scenario: a single distinct key value forces all rows to one shard, and `max_streams_for_union_step = 1` puts a `ConcatProcessor` downstream.

Bot review thread: ClickHouse#106251 (comment)
|
Pushed 874c4f0 addressing the inline bot review:
Both tests pass 60/60 locally with randomized settings. Session: cron:clickhouse-ci-task-worker:20260601-190000 |
|
@groeneai tests failed, check |
…rowth

Per bot review thread ClickHouse#106251 (comment) the hard cap added in 874c4f0 can reintroduce the same `Pipeline stuck` deadlock it tried to prevent: when shard X is at the hard cap with its downstream not asking, shard Y is empty and asking, and input is not finished, `prepare` returns `PortFull` without `input.setNeeded`. Upstream is back-pressured forever while the asking shard never gets data because all rows hash to X.

The fundamental tradeoff: under sequential downstream consumption (e.g. `ConcatProcessor`) plus pathological hash skew, bounded memory and forward progress cannot both hold without a spill-to-disk fallback. Pick forward progress here: revert to the soft cap with bypass-on-demand from 2067a25 and document the consequence. Under skew the receiving shard's queue grows until upstream depletes, then the eager-finish on empty ports unblocks the pipeline; memory is bounded by `max_memory_usage`. A proper bound would require spilling overflow chunks; that is deferred to a follow-up.

The 04306 regression test was specific to the hard cap and is removed.
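
The tradeoff in this commit message can be illustrated with a toy simulation. It is only a sketch under stated assumptions: two shards, every chunk hashes to shard 1, the sequential consumer asks shard 0 until that port is finished, and `simulateSkew` and `SkewResult` are hypothetical names. The constants follow the values quoted in the thread (`MAX_QUEUE_HARD_LIMIT = 2 * MAX_QUEUE_LENGTH = 20`).

```cpp
#include <cstddef>

constexpr std::size_t MAX_QUEUE_LENGTH = 10;
constexpr std::size_t MAX_QUEUE_HARD_LIMIT = 2 * MAX_QUEUE_LENGTH;

struct SkewResult
{
    std::size_t peak_queue; // largest queue length reached on the receiving shard
    bool stuck;             // true if no processor can make progress
};

// Toy model: shard 0 stays empty and asking, so the soft cap is always bypassed.
// Only the optional hard limit can stop the transform from pulling input.
SkewResult simulateSkew(std::size_t input_chunks, bool enforce_hard_limit)
{
    std::size_t shard1_queue = 0;

    for (std::size_t pulled = 0; pulled < input_chunks; ++pulled)
    {
        if (enforce_hard_limit && shard1_queue >= MAX_QUEUE_HARD_LIMIT)
        {
            // The transform reports PortFull without requesting input. The consumer
            // still waits on shard 0, which never receives data: the pipeline is stuck.
            return {shard1_queue, true};
        }
        ++shard1_queue; // the pulled chunk lands on shard 1
    }

    // Input is exhausted: shard 0 is finished eagerly, the consumer advances
    // to shard 1 and drains its queue.
    return {shard1_queue, false};
}
```

In this model `simulateSkew(100, false)` finishes with a peak queue of 100 chunks, while `simulateSkew(100, true)` stops at 20 chunks and reports a stuck pipeline, matching the "forward progress or bounded memory, not both" conclusion above.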
|
@PedroTadim Investigated. CI failures:
Reverted the hard cap (commit 874c4f0) in 9645838 per the second bot CR. The PR is now the parent fix (commit 2067a25): eager-finish empty-queue outputs when input is exhausted, soft-cap bypass on demand. CI re-running.
    INSERT INTO test_106237 SELECT 1 AS a, number AS b FROM numbers(100000);
    INSERT INTO test_106237 SELECT 2 AS a, number AS b FROM numbers(100000);

    SELECT a, max(s)
Bugfix validation shows this regression test is not proving the promised fix yet. The job ran 04305_sharded_aggregation_pipeline_stuck_106237 against master 55150d773f60812a7e2fc8bd5410f110040f0549 / server 26.6.1.316; the test reported [ OK ] 0.16 sec, then the job failed with Failed to reproduce the bug (report: https://s3.amazonaws.com/clickhouse-test-reports/json.html?PR=106251&sha=96458389a2a30ef4ea85a7f200d15ef928ec0d7d&name_0=PR&name_1=Bugfix%20validation%20%28functional%20tests%29).
For a Bug Fix, the regression test needs to fail on the old code and pass with this change, otherwise the fix is only code-review plausible. Please make this test deterministic for the stuck state, for example by reducing the dependence on scheduler timing or adding a focused processor-level test that drives the empty-output/queued-sibling state, or provide equivalent reproducible evidence.
Done in 5b66cf7. Removed the timing-dependent functional test 04305 and added a deterministic processor-level gtest gtest_buffered_shard_by_hash_transform (your preferred option).
The gtest builds the exact stuck state: SourceFromChunks with every row routed to the last shard -> BufferedShardByHashTransform (2 shards) -> ConcatProcessor -> draining sink. The key is chosen to map to the last shard via the same WeakHash32 + Lemire mapping the transform uses, so the first shard's output stays empty while the last shard's queue fills. Concat consumes inputs sequentially and blocks on the empty first input; the transform must finish that empty output once the input is exhausted so Concat advances.
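
A minimal sketch of how a test could pick a key that routes to the last shard. The mapping is Lemire's multiply-and-shift range reduction, which the comment above names; the hash itself is passed in as a parameter because this sketch does not reproduce `WeakHash32`. `mapToShard` and `findKeyForLastShard` are hypothetical helper names, not functions from the gtest.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <optional>

// Lemire's range reduction: maps a 32-bit hash into [0, num_shards)
// with a multiply and a shift instead of a modulo.
inline std::uint32_t mapToShard(std::uint32_t hash, std::uint32_t num_shards)
{
    assert(num_shards > 0);
    return static_cast<std::uint32_t>((static_cast<std::uint64_t>(hash) * num_shards) >> 32);
}

// Scans keys 0, 1, 2, ... and returns the first one whose hash maps to the last shard.
// `hash` is a placeholder for whatever hash the transform applies to the key column.
std::optional<std::uint64_t> findKeyForLastShard(
    const std::function<std::uint32_t(std::uint64_t)> & hash,
    std::uint32_t num_shards,
    std::uint64_t search_limit)
{
    assert(num_shards > 0);
    for (std::uint64_t key = 0; key < search_limit; ++key)
        if (mapToShard(hash(key), num_shards) == num_shards - 1)
            return key;
    return std::nullopt;
}
```

With two shards this mapping sends hashes below `0x80000000` to shard 0 and the rest to shard 1, so a source that emits only the returned key keeps the first shard's output empty.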
Verified both directions in a local ASan build:
- Without the fix (pre-fix `prepare`): `PipelineExecutor` aborts with `Pipeline stuck` (the "Logical error: Pipeline stuck still happens" #106237 signature; the dumped graph shows `BufferedShardByHashTransform` `PortFull` and `Concat` `NeedData` on the empty input).
- With the fix: passes, 10/10 runs.
Since there is no longer a changed functional test, the Bugfix validation gate no longer has a flaky regression test to run.
The deterministic proof that closed this thread is no longer present in the current code. The only remaining regression is tests/queries/0_stateless/04342_sharded_aggregation_pipeline_stuck.sql:21-35, and that SQL-only form was later reported in the PR discussion as passing on all master Bugfix-validation binaries with Failed to reproduce the bug. So the original concern is back: after the gtest was removed, the PR again lacks a fail-on-base regression for the Bug Fix contract.
On the current HEAD (master merged in today) both Bugfix validation (functional tests) jobs are green, so this no longer blocks the PR.
You are right that the base-binary behavior is unchanged: 04342 does not reproduce the deadlock on the master base build. The job log records Bug does not reproduce on this arch - bugfix validation N/A (Failed: 0, Passed: 1). The stuck state is scheduler- and routing-timing dependent, and master's ColumnsScatter/CRC32C routing rewrite changed the skew distribution, so an SQL-only test cannot be made to deterministically fail on the base binary here. That is why the harness now records N/A rather than the earlier Failed to reproduce the bug.
The deterministic fail-on-base proof was the processor-level gtest (counting sink, both directions verified). It was removed at the assignee's explicit request on 2026-06-16 ("Remove gtest and only keep the stateless sql test"), so I have not re-added it. 04342 stays as a regression guard: it passes with the fix (40/40 in the flaky check on this HEAD). Reinstating the gtest alongside it is the assignee's call.
`decodeDataType` for `BinaryTypeIndex::Interval` cast a `UInt8` byte from
the input buffer directly to `IntervalKind::Kind` without validating that
the value belonged to the enum (valid range `0x00`..`0x0A`). Anything in
`0x0B`..`0xFF` produced a `DataTypeInterval` whose internal `kind` was
out of range, after which `IntervalKind::toString` returned
`magic_enum::enum_name` on an unknown value. On libcxx that returns
`std::string_view{nullptr, 0}`, and `SerializationInterval::getHash` then
fed that null pointer into `SipHash::update`, where the unrolled loop
computed `data + 8` on a null pointer (UB per `[expr.add]/4`). UBSan
flagged it via `-fsanitize=pointer-overflow` (STID 3285-3bba).
The primary fix is at the deserialization site: reject any byte above
`IntervalKind::Kind::Year` with `INCORRECT_DATA`. A regression test
`04307_03285_3bba_interval_binary_encoding_invalid_kind` feeds three
malformed bytes (`0x0b`, `0x7f`, `0xff`) through
`RowBinaryWithNamesAndTypes` with
`input_format_binary_decode_types_in_binary_format` and asserts each is
rejected.
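
The validation at the deserialization site can be sketched as follows. This is a standalone illustration, not the ClickHouse implementation: `Kind` is a hypothetical mirror of `IntervalKind::Kind` that only spells out the upper bound stated above (`Year = 0x0A`), and `std::runtime_error` stands in for the `INCORRECT_DATA` exception.

```cpp
#include <cstdint>
#include <stdexcept>

// Hypothetical mirror of the enum: only the largest valid value is listed,
// following the valid range 0x00..0x0A given in the commit message.
enum class Kind : std::uint8_t
{
    Year = 0x0A,
};

// Rejects any byte outside the enum's range before it is cast,
// so no out-of-range Kind ever reaches later code such as toString or getHash.
Kind decodeIntervalKind(std::uint8_t raw)
{
    if (raw > static_cast<std::uint8_t>(Kind::Year))
        throw std::runtime_error("INCORRECT_DATA: invalid interval kind byte");
    return static_cast<Kind>(raw);
}
```

Checking the raw byte before the cast keeps the invalid value from ever becoming a `DataTypeInterval`, which is why the commit treats it as the primary fix and the `SipHash::update` guard as defense-in-depth.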
The previous `SipHash::update(nullptr, 0)` short-circuit is kept as
defense-in-depth. The accompanying comment is shortened, and the
explanatory blocks in `gtest_sip_hash.cpp` plus the `ASSERT_EQ` on
`std::string_view` internals are removed per review feedback.
CI report:
https://s3.amazonaws.com/clickhouse-test-reports/json.html?PR=106251&sha=874c4f0fc6fa05c7726121221bd18e4185eda64e&name_0=PR&name_1=AST%20fuzzer%20%28arm_asan_ubsan%29
Related: ClickHouse#106251
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
…l gtest

The functional test 04305_sharded_aggregation_pipeline_stuck_106237 depended on scheduler timing to hit the empty-output/queued-sibling state, so it passed on master HEAD and the Bugfix validation gate reported "Failed to reproduce the bug".

Remove it and add gtest_buffered_shard_by_hash_transform instead. The gtest drives the deadlock directly: SourceFromChunks (all rows routed to the last shard) -> BufferedShardByHashTransform -> ConcatProcessor -> sink. Concat consumes its inputs sequentially and waits on the empty first input; the transform must finish that empty output once the input is exhausted so Concat advances to the last shard's queued data. Without the fix the empty output is never finished and PipelineExecutor reports "Pipeline stuck"; with the fix the pipeline drains. The test is deterministic: the key is chosen to route to the last shard via the same WeakHash32 + Lemire mapping the transform uses.

Closes: ClickHouse#106237

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
|
The failures of They were introduced by #104690 ("Add #104690 was merged in violation of the ClickHouse team rules: its own CI already showed these two tests failing (10 times between May 12 and June 1) before it was merged. Please update your branch to pick up the revert; the tests should pass again. |
Push more than MAX_QUEUE_LENGTH skewed chunks so the loaded shard's queue overshoots the soft cap while the sibling port is empty and asking. This covers the `any_queue_at_capacity && has_pushable_empty_port` bypass branch in addition to the empty-output finish path.

Replace the discarding sink with a counting sink and assert every row reached it, proving the queued chunks drained through `Concat` rather than being dropped.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
|
Thanks @alexey-milovidov. The branch already picked up the revert: the merge in c4152b5 includes #106365 (3930f8a, the revert of #104690), so the Also pushed f8c10c4 addressing the two review-bot notes on the gtest: it now drives 12 skewed chunks (> the soft cap) to exercise the cap-bypass path, and uses a counting sink asserting all 768 rows drain through |
CI ledger — covered SHA:
|
| Check | Test | Disposition |
|---|---|---|
| Stateless tests (amd_tsan, parallel, 1/2) | 00175_obfuscator_schema_inference | flaky chronic trunk, task 2026-06-02-ci-00175obfuscatorschemainference-ab |
| Unit tests (asan_ubsan) | (check-level) | infra/chronic UBSan UntrackedMemoryCounter, task 2026-06-03-ci-p1-chronic-ubsan-in-untrackedmemoryc |
No PR-caused failures. Awaiting human review.
Session: cron:our-pr-ci-monitor:20260604-213000
nihalzp left a comment
I do not like that it can lead to unbounded memory usage and that MAX_QUEUE_LENGTH is not enforced at all. Also, add the reproducing query as a stateless test.
|
@nihalzp Thanks. Memory bound: agreed the soft cap is best-effort today. Under hash skew (all rows to one shard while the sibling port is asking) it is bypassed, so memory is bounded by Which direction do you prefer? I lean (a) if you want it enforced in-PR, since it is the only way to satisfy both for a sequential consumer. Stateless test: will add the reproducing query back. One caveat: when it last ran under the |
|
Remove gtest and only keep the stateless sql test. |
Per reviewer request, drop the gtest and keep only the SQL stateless test.

The earlier concern was that the SQL form passed on master HEAD and tripped Bugfix validation's "Failed to reproduce the bug". That was a different test variant; the reproducer below (enable_sharding_aggregator = 1 + UNION ALL + max_streams_for_union_step = 1, three low-cardinality keys) deadlocks deterministically on master HEAD and passes with the fix. Verified locally: 3/3 "Pipeline stuck" aborts without the fix, 5/5 OK with it.

no-random-merge-tree-settings is set because the deadlock is independent of the on-disk layout (verified across index_granularity 1/8192 and wide/compact parts); the tag only avoids unrelated CREATE TABLE BAD_ARGUMENTS failures from randomized settings such as part_minmax_index_columns.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
|
@nihalzp Done in e29d6cc: removed the gtest, added stateless test The SQL reproducer is deterministic after all (the earlier "Failed to reproduce the bug" was a different, timing-dependent variant). On master HEAD the query in the PR description aborts with On the memory bound: still your call. I did not re-add a hard cap because the review bot showed it re-introduces the same stuck pipeline under skew (the capped shard is not the one |
Pre-PR validation gate
Session id: cron:clickhouse-worker-slot-0:20260616-125600 |
The regression test pinned max_threads=5. That reliably reproduced the
Pipeline-stuck deadlock on the branch's older merge-base, but master has
since rewritten BufferedShardByHashTransform's shard routing (WeakHash32 +
Lemire fastrange -> computeHashInto/CRC32C + mapToRange, the ColumnsScatter
work). Under the new routing, keys 0/1/2 distribute differently and the
deadlock is only ~10% likely at max_threads=5, so Bugfix validation ran the
test once per master binary and saw it pass ("Failed to reproduce the bug").
Measured the deadlock rate against the exact master binaries on the merged
tree (current master routing): 0/12 at max_threads<=3, ~1/10 at 5, 11/12 at
8, 24/24 at 16, 12/12 at 32. More threads widen the sharded fan-out, which
guarantees the sequentially-activated ConcatProcessor demands an empty shard
under the skewed keys - the exact stuck state. Pinning max_threads=16 makes
the repro deterministic again.
Validated on the merged tree (master routing + this fix):
- WITHOUT the prepare() fix: clickhouse-test FAILs on the first run
(Pipeline stuck, server aborts).
- WITH the fix: 10/10 OK with no randomization, 30/30 OK with base settings
randomization.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Pre-PR validation gate
Session id: cron:clickhouse-worker-slot-1:20260616-143700 |
|
@nihalzp Pushed I merged current master into the branch and remeasured against the exact master binary: 0/12 deadlocks at On |
|
@nihalzp The gtest is removed and only the stateless SQL test One blocker needs your call. Two ways to green it, both keep your SQL test:
I lean to the first. Which do you want? Separately, still your call on the |
…-buffered-shard-by-hash-pipeline-stuck
LLVM Coverage Report
Changed lines: Changed C/C++ lines covered: 38/41 (92.68%) · Uncovered code |
CI finish ledger: 7f06694
CI is complete on this head (only the private
Session id: cron:our-pr-ci-monitor:20260702-220000 |
CI finish ledger — 7f06694Every failure below has an owner. Only No PR-caused failure on this SHA. The only real leaf failure (Stress arm_msan hung check) is owned by the merged hung-check fixing PRs #108212 / #105905; the freshly-merged-master branch (7f06694) already picks both up. Bugfix-validation "FAIL" rows are the required master-HEAD reproduction proving the regression test works. Session id: cron:our-pr-ci-monitor:20260703-013000 |

`BufferedShardByHashTransform.prepare()` could deadlock when a downstream `ConcatProcessor` (e.g. from `narrowPipe` on a `UNION ALL` with `max_streams_for_union_step` lower than the pipeline width) activated its inputs sequentially.

Two bugs in `prepare`:
- When input was exhausted and a shard's queue was empty, `prepare` never finished that output port. The downstream waited forever for a finish signal, and the queued chunks on the other shards never drained because `Concat` never moved on.
- When any queue hit the `MAX_QUEUE_LENGTH` soft cap, `prepare` returned `PortFull` even if a sibling port had an empty queue and `canPush() == true`. The asking sibling could never receive data, so `Concat` never advanced.

Fix: finish empty-queue ports eagerly once input is exhausted (and no chunk is pending), and bypass the cap-induced `PortFull` when at least one empty output port has `canPush() == true`.

Reproducer (deadlocks reliably on the first run on master; passes with the fix):

Closes #106237. Related to #104233 (which introduced `BufferedShardByHashTransform`).

Changelog category (leave one):

Changelog entry (a user-readable short description of the changes that goes into CHANGELOG.md):

Fix `Logical error: 'Pipeline stuck'` in queries that use `enable_sharding_aggregator = 1` together with a `UNION ALL` and `max_streams_for_union_step` smaller than the pipeline width. `BufferedShardByHashTransform` now finishes empty-queue output ports as soon as its input is exhausted, and keeps pulling input when an empty port has downstream demand even if a sibling queue hit the back-pressure cap.

Documentation entry for user-facing changes