Implement direct SELECT reading and kafka_commit_on_select for Kafka2 engine by alexey-milovidov · Pull Request #100276 · ClickHouse/ClickHouse · GitHub
Implement direct SELECT reading and kafka_commit_on_select for Kafka2 engine #100276

Merged
alexey-milovidov merged 66 commits into master from kafka2-direct-read
May 17, 2026

Conversation

@alexey-milovidov

@alexey-milovidov alexey-milovidov commented Mar 21, 2026

Member

Implement direct SELECT reading, kafka_commit_on_select support, and update documentation for the Kafka2 engine (with Keeper-based offset storage).

The Kafka2 engine previously threw NOT_IMPLEMENTED on direct SELECT queries. This adds:

  1. Direct SELECT reading via a new Kafka2Source class that acquires consumers, polls messages using the KeeperHandlingConsumer API, and returns data as chunks — similar to how KafkaSource works for the original Kafka engine. Direct reads are blocked when materialized views are attached, consistent with the original Kafka engine behavior.

  2. kafka_commit_on_select setting — when enabled, offsets are committed to Keeper after a direct SELECT completes. When disabled, the OffsetGuard rolls back so messages remain unconsumed.

  3. system.kafka_consumers visibility was already implemented (the system table already handled StorageKafka2), so the documentation was updated to remove that limitation.

Also migrated StorageKafka2::read from the old Pipe read(...) API to the modern void read(QueryPlan&, ...) API, consistent with StorageKafka.
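
The commit-or-rollback behaviour described above can be sketched with a small scope guard. This is only an illustration under stated assumptions, not the PR's code: `Partition`, `directSelect`, and this simplified `OffsetGuard` are hypothetical stand-ins that keep offsets in memory instead of Keeper.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical stand-in for one topic partition (not a ClickHouse class).
struct Partition
{
    std::vector<std::string> messages;
    size_t committed = 0; // offset that would be stored in Keeper
    size_t position = 0;  // in-memory read position
};

// Scope guard: unless commit() is called, the read position is rolled back
// to the last committed offset when the guard goes out of scope.
class OffsetGuard
{
public:
    explicit OffsetGuard(Partition & partition_) : partition(partition_) {}
    OffsetGuard(const OffsetGuard &) = delete;
    OffsetGuard & operator=(const OffsetGuard &) = delete;

    void commit()
    {
        assert(partition.position >= partition.committed);
        partition.committed = partition.position;
        committed = true;
    }

    ~OffsetGuard()
    {
        if (!committed)
            partition.position = partition.committed; // messages stay unconsumed
    }

private:
    Partition & partition;
    bool committed = false;
};

// Models one direct SELECT: read everything available, then commit or not.
std::vector<std::string> directSelect(Partition & partition, bool commit_on_select)
{
    OffsetGuard guard(partition);
    std::vector<std::string> rows;
    while (partition.position < partition.messages.size())
        rows.push_back(partition.messages[partition.position++]);
    if (commit_on_select)
        guard.commit();
    return rows;
}
```

With `commit_on_select = false`, two consecutive calls return the same messages; with `commit_on_select = true`, the second call returns nothing.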

Changelog category (leave one):

  • Improvement

Changelog entry (a user-readable short description of the changes that goes into CHANGELOG.md):

The Kafka2 engine (experimental, with Keeper-based offset storage) now supports direct SELECT queries and the kafka_commit_on_select setting.

Documentation entry for user-facing changes

  • Documentation is written (mandatory for new features)

Documentation updated in docs/en/engines/table-engines/integrations/kafka.md to remove the now-false limitations about direct reading and system.kafka_consumers visibility.

Version info

  • Merged into: 26.5.1.726

…a2 engine

The Kafka2 engine (with Keeper-based offset storage) previously threw
NOT_IMPLEMENTED on direct SELECT queries. This adds support for:

1. Direct SELECT reading via a new `Kafka2Source` class that acquires
   consumers, polls messages using the `KeeperHandlingConsumer` API,
   and returns data as chunks - similar to how `KafkaSource` works
   for the original Kafka engine.

2. `kafka_commit_on_select` setting support - when enabled, offsets
   are committed to Keeper after a direct SELECT read completes.
   When disabled, the `OffsetGuard` rolls back and messages remain
   unconsumed.

3. `system.kafka_consumers` visibility was already implemented
   (the system table already handled `StorageKafka2` via template
   lambda), so the documentation was updated to remove that
   limitation.

Also migrated `StorageKafka2::read` from the old `Pipe read(...)` API
to the modern `void read(QueryPlan&, ...)` API, consistent with
`StorageKafka`.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@clickhouse-gh

clickhouse-gh Bot commented Mar 21, 2026

Contributor

@clickhouse-gh clickhouse-gh Bot added the pr-improvement Pull request with some product improvements label Mar 21, 2026
Comment thread src/Storages/Kafka/Kafka2Source.cpp Outdated
}
catch (...)
{
    LOG_WARNING(log, "Failed to acquire consumer for direct read: {}", getCurrentExceptionMessage(true));

Member Author

Why don't we let the exception propagate?

Member Author

This has been addressed in commit faf50b8c191: acquireConsumer exceptions now propagate out of Kafka2Source::generateImpl to the caller.

@alexey-milovidov alexey-milovidov marked this pull request as draft March 21, 2026 12:12
alexey-milovidov and others added 2 commits March 21, 2026 13:14
Silently swallowing the exception and returning an empty chunk would
give the user no data and no error, which is confusing. Let the
exception propagate so the user gets a clear error message.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Now that Kafka2 supports direct SELECT reading, enable the direct read
code paths in the existing parametrized tests:

- `test_kafka_column_types`: tests ALIAS columns with direct read
- `test_kafka_producer_consumer_separate_settings`: test parameter only
- `test_kafka_unavailable`: tests direct read when broker is unavailable

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Comment thread src/Storages/Kafka/Kafka2Source.cpp Outdated
Comment thread src/Storages/Kafka/Kafka2Source.cpp Outdated
- Fix incorrect dead-letter metadata: `.offset` was assigned
  `currentPartition()` instead of `currentOffset()`, corrupting
  dead-letter queue entries.

- Fix broken `kafka_commit_on_select` for dead-letter reads: when all
  consumed messages went to the dead-letter queue (producing zero rows),
  the offset guard was discarded, preventing offset commits and causing
  the same malformed messages to be re-read indefinitely.

- Remove unused `KafkaDirectReads` and `LOGICAL_ERROR` declarations.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Comment thread src/Storages/Kafka/Kafka2Source.cpp Outdated
Comment thread src/Storages/Kafka/Kafka2Source.cpp Outdated
alexey-milovidov and others added 2 commits March 22, 2026 10:31
- Remove trailing quote in error context string for parse failures
- Add `checkTimeLimit` to `Kafka2Source` direct-read loop, matching
  `KafkaSource` behavior, so SELECT has bounded latency even when
  messages keep yielding zero rows (e.g. tombstones/empty payloads)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
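
The bounded-latency idea behind the time-limit check can be sketched as a poll loop with a deadline. This is a simplified illustration, not the actual `checkTimeLimit` implementation: `readFirstRow` and the `poll` callback are hypothetical, and an empty payload stands in for a message that yields zero rows.

```cpp
#include <cassert>
#include <chrono>
#include <functional>
#include <optional>
#include <string>

using Clock = std::chrono::steady_clock;

// `poll` is a hypothetical callback: it returns a payload, or std::nullopt
// if nothing arrived. Empty payloads model tombstones that yield zero rows.
std::optional<std::string> readFirstRow(
    const std::function<std::optional<std::string>()> & poll,
    std::chrono::milliseconds time_limit)
{
    assert(poll && "poll callback must be set");
    const auto deadline = Clock::now() + time_limit;
    while (Clock::now() < deadline)
    {
        auto message = poll();
        if (message && !message->empty())
            return message; // a message that produced a row
        // Zero rows so far: keep polling, but only until the deadline.
    }
    return std::nullopt; // time limit reached without producing a row
}
```

Without the deadline, a stream of tombstones would keep the loop running for as long as messages arrive.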
Comment thread tests/integration/test_storage_kafka/test_batch_fast.py Outdated
Comment thread src/Storages/Kafka/Kafka2Source.cpp Outdated
return {};
}

offset_guard.emplace(std::move(*maybe_guard));

Member

Doesn't the offset guard from the previous call get overwritten? If not, then because of max_block_size=1 each consumer will read only one row.

Member Author

Yes — with max_block_size=1 (forced in makePipe) and the is_finished flag set right before the single pollConsumer call, each Kafka2Source polls at most once per query, so the offset_guard is assigned at most once and never overwritten. This matches the existing intent (block size of 1 is needed for LIMIT to work).

Comment thread src/Storages/Kafka/Kafka2Source.cpp
Comment thread src/Storages/Kafka/StorageKafka2.cpp Outdated
Comment on lines +414 to +416
size_t num_views = DatabaseCatalog::instance().getDependentViews(table_id).size();
if (num_views > 0)
    throw Exception(ErrorCodes::QUERY_NOT_ALLOWED, "Cannot read from StorageKafka2 with attached materialized views");

Member

This is not enough here. Currently StorageKafka2 doesn't check whether a consumer is used from multiple places (because we didn't have direct reads, all threads were bound to a single consumer, so there was no need for this). It is possible to pass this check and then attach an MV before the direct read finishes. The same consumer would then be used from multiple threads simultaneously. Please either protect the consumers (and ensure they are polled fairly) or introduce some sort of locking here to prevent an MV from being attached while a direct read is in progress.

Member Author

Addressed: makePipe now acquires consumers_mutex and atomically rejects when active_mv_streamers > 0 and increments active_direct_readers; symmetrically, threadFunc rejects MV streaming when active_direct_readers > 0. The check is also re-done under getDependentViews so that attaching an MV after the initial check is correctly serialized. See commits 5aa88a5d17f and 6d181ed6623.
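
The symmetric admission check described in this reply can be sketched roughly as follows. The counter names mirror the ones mentioned above, but `ConsumerGate` and its methods are hypothetical and heavily simplified; in particular, the dependent-views lookup is modelled as a callback evaluated while the mutex is held.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <mutex>

// Hypothetical gate that keeps direct SELECTs and MV streaming apart.
class ConsumerGate
{
public:
    // Direct SELECT path: admitted only if no MV is attached and no MV
    // streamer is active. Both conditions are checked under the same lock
    // that registers the reader, so they are observed atomically.
    bool tryEnterDirectRead(size_t consumers, const std::function<bool()> & has_dependent_views)
    {
        std::lock_guard<std::mutex> lock(consumers_mutex);
        if (has_dependent_views() || active_mv_streamers > 0)
            return false;
        active_direct_readers += consumers;
        return true;
    }

    void leaveDirectRead(size_t consumers)
    {
        std::lock_guard<std::mutex> lock(consumers_mutex);
        assert(active_direct_readers >= consumers);
        active_direct_readers -= consumers;
    }

    // MV streaming path: deferred while any direct reader holds a consumer.
    bool tryEnterMVStreaming()
    {
        std::lock_guard<std::mutex> lock(consumers_mutex);
        if (active_direct_readers > 0)
            return false;
        ++active_mv_streamers;
        return true;
    }

    void leaveMVStreaming()
    {
        std::lock_guard<std::mutex> lock(consumers_mutex);
        assert(active_mv_streamers > 0);
        --active_mv_streamers;
    }

private:
    std::mutex consumers_mutex;
    size_t active_direct_readers = 0; // consumers in use by direct SELECTs
    size_t active_mv_streamers = 0;
};
```

Evaluating the views check inside the critical section is the design choice that matters here: a check done before taking the lock could be invalidated by an attach that lands in between.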

Comment thread src/Storages/Kafka/StorageKafka2.cpp Outdated

ProfileEvents::increment(ProfileEvents::KafkaDirectReads);

/// Always use all consumers at once, otherwise SELECT may not read messages from all partitions.

Member

The same issue as above can be problematic for multiple direct reads at the same time. I think our storages can be read from multiple queries at the same time, but this won't work here.

Member Author

Addressed: acquireConsumer now waits on a condition variable until the consumer becomes free (or a configurable timeout, kafka_consumer_acquire_timeout_ms, elapses). Two concurrent direct SELECTs on the same table no longer race on the same KeeperHandlingConsumer. See commits 90d77f8e161 and c1b9bc0e5a7.
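
A minimal sketch of waiting for a free consumer with a timeout, assuming a plain `std::condition_variable`. `ConsumerPool` and the exception types are hypothetical; the `timeout` argument plays the role of `kafka_consumer_acquire_timeout_ms`.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <stdexcept>
#include <vector>

// Hypothetical pool that tracks which consumer indices are in use.
class ConsumerPool
{
public:
    explicit ConsumerPool(size_t count) : in_use(count, false) {}

    // Blocks until consumer `index` is free, or throws once `timeout` elapses.
    void acquire(size_t index, std::chrono::milliseconds timeout)
    {
        std::unique_lock<std::mutex> lock(mutex);
        if (index >= in_use.size())
            throw std::out_of_range("Invalid consumer index");
        if (!cv.wait_for(lock, timeout, [&] { return !in_use[index]; }))
            throw std::runtime_error("Timed out waiting for a free consumer");
        in_use[index] = true;
    }

    void release(size_t index)
    {
        {
            std::lock_guard<std::mutex> lock(mutex);
            assert(index < in_use.size());
            in_use[index] = false;
        }
        cv.notify_all(); // wake readers waiting for this consumer
    }

private:
    std::mutex mutex;
    std::condition_variable cv;
    std::vector<bool> in_use;
};
```

A second reader asking for the same index either gets the consumer after the first one releases it or fails with a timeout, so two readers never hold the same consumer at once.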

Comment thread src/Storages/Kafka/Kafka2Source.cpp Outdated
The `Kafka2Source` methods interact with Keeper via
`KeeperHandlingConsumer`, but were missing the required
`Coordination::setCurrentComponent` guard. This caused a
LOGICAL_ERROR exception ("Current component is empty") that
crashed the server during direct reads, cascading into failures
across all Kafka integration tests.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Comment thread src/Storages/Kafka/StorageKafka2.cpp Outdated
Comment thread src/Storages/Kafka/StorageKafka2.cpp Outdated
…letter bug

- Add `mv_attached` atomic flag to `StorageKafka2` to prevent direct SELECTs
  from running concurrently with MV streaming, matching the `StorageKafka` (v1)
  pattern.

- Make `acquireConsumer` wait (via condition variable) until the consumer at
  the requested index is free, preventing concurrent direct SELECTs from
  using the same consumer simultaneously.

- Deduplicate `Kafka2Source::generateImpl`: instead of duplicating the entire
  poll/parse/virtual-column logic from `StorageKafka2::pollConsumer`, delegate
  to `pollConsumer` directly. This reduces `Kafka2Source.cpp` from ~350 lines
  to ~130 lines.

- Fix pre-existing dead-letter queue bug in `StorageKafka2::pollConsumer` where
  `.offset` was incorrectly set to `msg_info.currentPartition()` instead of
  `msg_info.currentOffset()`.

- Add `poll_max_block_size` parameter to `pollConsumer` so `Kafka2Source` can
  pass its own block size (1 for direct reads) without affecting MV streaming.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Comment thread src/Storages/Kafka/StorageKafka2.cpp Outdated
After setting `shutdown_called = true`, notify all threads waiting on the
condition variable so that any `acquireConsumer` calls blocked waiting for
a free consumer will wake up and throw `ABORTED` instead of hanging
indefinitely.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
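
The shutdown wake-up can be sketched like this. `ShutdownAwareSlot` and `AcquireResult` are hypothetical; the sketch returns an enum where the real code is described as throwing `ABORTED`.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>

enum class AcquireResult { Acquired, Aborted };

// Hypothetical single-consumer slot whose waiters are released on shutdown.
class ShutdownAwareSlot
{
public:
    AcquireResult acquire()
    {
        std::unique_lock<std::mutex> lock(mutex);
        // Wake up when the slot becomes free or when shutdown is requested.
        cv.wait(lock, [&] { return shutdown_called || !busy; });
        if (shutdown_called)
            return AcquireResult::Aborted;
        busy = true;
        return AcquireResult::Acquired;
    }

    void release()
    {
        {
            std::lock_guard<std::mutex> lock(mutex);
            assert(busy);
            busy = false;
        }
        cv.notify_all();
    }

    void shutdown()
    {
        {
            std::lock_guard<std::mutex> lock(mutex);
            shutdown_called = true;
        }
        // Without this notification, a blocked acquire() would never
        // re-evaluate its predicate and would hang.
        cv.notify_all();
    }

private:
    std::mutex mutex;
    std::condition_variable cv;
    bool busy = false;
    bool shutdown_called = false;
};
```

The predicate includes the shutdown flag, so both late callers and already blocked callers observe the abort rather than waiting for a consumer that will never be released.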
@alexey-milovidov alexey-milovidov marked this pull request as ready for review March 24, 2026 19:29
alexey-milovidov and others added 2 commits March 26, 2026 23:00
…fka2

Addresses unresolved review feedback about missing test coverage: the new
test verifies that with `kafka_commit_on_select=1` offsets are committed
after a direct SELECT (so a second SELECT returns nothing), and with
`kafka_commit_on_select=0` offsets are rolled back (so a second SELECT
re-reads the same messages).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@alexey-milovidov

Member Author

The only failing check is CH Inc sync, caused by Integration tests (amd_asan_ubsan, flaky) failing in the private sync PR https://github.com/ClickHouse/clickhouse-private/pull/53191 with:

AssertionError: Test [test_kafka_bad_messages/test.py] not found

Root cause: this PR touches tests/integration/test_kafka_bad_messages/test.py (the substr length adjustment in test_bad_messages_parsing_exception). In the private fork, test_kafka_bad_messages is on the SKIP_LIST in ci/jobs/scripts/integration_tests_configs.py, so get_optimal_test_batch strips it from the parallel/sequential module lists. The flaky-check code path passes no_strict=is_targeted_check to get_parallel_sequential_tests_to_run, so a SKIP_LIST'd changed test trips the strict assertion and kills the whole job — even though the private fork has an explicit empty-result handler ready to mark such a job SKIPPED.

Filed a one-line CI fix to extend no_strict to flaky checks: #104542. Once that lands, the sync re-runs and CH Inc sync here unblocks.

No code changes needed on this PR — review feedback from previous rounds is addressed and all other checks are green.

@antaljanosbenjamin

Member

There are a lot of changes; give me some time to process them. I will try to finish today or tomorrow at the latest.

Comment thread src/Storages/Kafka/Kafka2Source.h Outdated
Comment on lines +438 to +446
valid_consumer_indices.reserve(kafka_storage.consumers.size());
for (size_t i = 0; i < kafka_storage.consumers.size(); ++i)
    if (kafka_storage.consumers[i])
        valid_consumer_indices.push_back(i);

if (valid_consumer_indices.empty())
    throw Exception(ErrorCodes::QUERY_NOT_ALLOWED, "No Kafka consumers available for direct read");

kafka_storage.active_direct_readers.fetch_add(valid_consumer_indices.size());

Member

Maybe it makes more sense to ensure all consumers are created. As you changed startup to resize the vector of consumers, maybe we can check in acquireConsumer whether the consumer is created and create it if not. I think that would result in simpler logic. The only disadvantage is that acquiring a consumer might fail temporarily if ZooKeeper is not initialized yet. That should be fine in my opinion and shouldn't last long.

Member Author

Good idea. I'd prefer to leave this for a follow-up: the current pre-sized vector keeps stable indices and the failure mode is well-defined, and the lazy-create variant changes recovery semantics (each background reschedule would now retry consumer creation when Keeper / brokers were temporarily unavailable at startup). I want to land this PR with the smaller surface and revisit the simplification on its own.

Comment thread src/Storages/Kafka/StorageKafka2.h Outdated
Comment thread src/Storages/Kafka/Kafka2Source.h Outdated
Comment thread src/Storages/Kafka/Kafka2Source.h Outdated
Comment thread tests/integration/test_storage_kafka/test_batch_fast.py Outdated
Comment thread tests/integration/test_storage_kafka/test_batch_slow_1.py Outdated
…n up tests

- Remove unused `broken`, `stalled` fields and `isStalled` getter in `Kafka2Source` — only written, never read externally. Rollback semantics now solely rely on `OffsetGuard`.
- Clarify the `active_direct_readers` comment in `StorageKafka2.h` (it counts consumers in use by direct `SELECT`s, not the number of queries).
- Drop the redundant `do_direct_read` parametrize column in three integration tests (always `True`), and execute the corresponding `SELECT` unconditionally.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
huldarchen pushed a commit to huldarchen/ClickHouse that referenced this pull request May 12, 2026
The flaky integration test job collects changed test modules from the PR
diff and asks `get_parallel_sequential_tests_to_run` to match each one
against the parallel/sequential module lists. Without `no_strict`, a
changed test that has been filtered out (e.g. by the private fork's
`SKIP_LIST` in `ci/jobs/scripts/integration_tests_configs.py`) trips
`assert matched, f"Test [{test_arg}] not found"`, killing the whole job.

This is the same concern that already justifies `no_strict=True` for
targeted checks (a CIDB-recorded test may be deleted or renamed before
the job runs). Extend the relaxation to flaky checks so a SKIP_LIST'd
test is silently dropped — the private fork has an explicit empty-result
handler that turns the resulting empty test list into a `SKIPPED` job.

This unblocks `CH Inc sync` on PRs that modify any test in `SKIP_LIST`
(e.g. `test_kafka_bad_messages`), such as
ClickHouse#100276 — its sync PR
ClickHouse/clickhouse-private#53191 failed in
`Integration tests (amd_asan_ubsan, flaky)` with
`AssertionError: Test [test_kafka_bad_messages/test.py] not found`.
Comment thread src/Storages/Kafka/StorageKafka2.cpp
…rs_mutex`

Move the `getDependentViews` check inside the `consumers_mutex` critical
section in `ReadFromStorageKafka2::makePipe` so that "no MV attached" and
"no MV streamer active" are observed atomically together with the
registration of `active_direct_readers`.

Previously the dependent-views check ran outside the mutex, leaving a
narrow race: an `ATTACH MATERIALIZED VIEW` arriving between that check
and the mutex acquisition could leave `active_mv_streamers` at `0` (the
MV's first `threadFunc` iteration hasn't run yet), letting the direct
read be admitted while an MV was already attached. With both checks
under the mutex, that window closes — and after we register, any
subsequent attach will see `active_direct_readers > 0` from `threadFunc`
and defer streaming until the direct read completes.

Addresses review feedback on `src/Storages/Kafka/StorageKafka2.cpp:436`
in #100276.
Check `shutdown_called` before the consumer-index bounds check in
`StorageKafka2::acquireConsumer`. During shutdown, `cleanConsumers`
clears the `consumers` vector, so a late call from an already-built
direct-read source would otherwise trip the bounds check first and
surface a `LOGICAL_ERROR` ("Invalid consumer index") instead of the
expected `ABORTED` for a detached table.

This is a leak of an internal invariant error on what should be a
clean abort path.

Addresses review feedback on `src/Storages/Kafka/StorageKafka2.cpp:1300`
in #100276.
@alexey-milovidov

Member Author

Pushed two follow-up commits addressing the remaining open review feedback from clickhouse-gh:

  • 929cfc24b6a — Move the getDependentViews check inside consumers_mutex in ReadFromStorageKafka2::makePipe. The previous code checked dependent views before taking the mutex, leaving a narrow race: ATTACH MATERIALIZED VIEW arriving between the check and the mutex acquisition could leave active_mv_streamers at 0 (the MV's first threadFunc iteration hasn't run yet), letting a direct read be admitted while an MV was already attached. Both checks are now done atomically; once we register as a direct reader, any subsequent attach will see active_direct_readers > 0 in threadFunc and defer streaming.
  • 9b012757cbf — Check shutdown_called before the index bounds check in StorageKafka2::acquireConsumer. During shutdown, cleanConsumers clears consumers, so a late call from an already-built direct-read source would otherwise trip the bounds check first and surface a LOGICAL_ERROR instead of the expected ABORTED.

Remaining red checks on the previous CI run are unrelated to this PR:

  • Tests/01710_projection_additional_filters — known flaky test, see Test 01710_projection_additional_filters is flaky #104219.
  • Tests/02801_backup_native_copy — single timeout; the automatic re-run with the same randomized settings passed (transient).
  • AST fuzzer (arm_asan_ubsan): Bad cast in AggregateFunctionNullUnary::addBatchSinglePlace (countOrNull + Dynamic + LEFT JOIN); not exercised by Kafka2 changes.

@groeneai, please investigate the AST fuzzer failure: https://s3.amazonaws.com/clickhouse-test-reports/json.html?PR=100276&sha=9a0417eb9f7d37169c8c8c2e2c186a44cc427880&name_0=PR&name_1=AST%20fuzzer%20%28arm_asan_ubsan%29 — the failing query is SELECT countOrNull(t1.x), sum(isNull(t1.x)) FROM nt AS t1 LEFT JOIN nt__fuzz_8 USING (x) with a Dynamic-typed column on the right side. Please file an issue or fix in a separate PR.

@alexey-milovidov

Member Author

Merged latest master to refresh CI. The two previous failures were unrelated flaky tests, both confirmed via cross-PR failure history on play.clickhouse.com:

  • libFuzzer tests / select_parser_fuzzer — flaked on 24 other PRs on 2026-05-13 and 8 PRs on 2026-05-14. This is a SQL parser fuzzer, no Kafka involvement.
  • Stress test (arm_asan_ubsan, s3) / Hung check failed, possible deadlock found — the hung query is a CREATE TABLE ... ENGINE = MergeTree with random fuzz settings, no Kafka involvement; the same test has been flaking sporadically across many PRs.

@groeneai, please investigate these failures: #100276 and provide fixes in separate PRs if they are not already in progress.

@alexey-milovidov

Member Author

The remaining Stateless tests (amd_msan, WasmEdge, parallel, 1/2) failure is the known flaky test 04039_merge_tree_snapshot_teardown_race — a MergeTree projection-teardown race test, no Kafka involvement. The CI rerun on the same SHA passed:

--- Randomized settings diagnosis ---
Step 1: Re-running with the same randomized settings (budget: 60s)...
  Runs: 1, Failed: 0, Passed: 1, Other: 0
Result: All reruns passed. The failure is not reproducible (likely a transient issue).

Tracked at #104866.

@groeneai, please investigate the failure: https://s3.amazonaws.com/clickhouse-test-reports/json.html?PR=100276&sha=2aafdd0c08012b220b8622af49aa0657ac85c9ea&name_0=PR&name_1=Stateless%20tests%20%28amd_msan%2C%20WasmEdge%2C%20parallel%2C%201%2F2%29 and provide a fix in a separate PR if not already in progress.

@groeneai

Contributor

@alexey-milovidov, reporting back on both directives.

1. AST fuzzer (arm_asan_ubsan): Bad cast in AggregateFunctionNullUnary::addBatchSinglePlace (countOrNull + Dynamic + LEFT JOIN)

Reproduced locally on debug 26.5.1 (commit c1ccce21e7d).

Minimal deterministic reproducer:

CREATE TABLE nt (x Nullable(Int32)) ENGINE = MergeTree ORDER BY tuple() SETTINGS allow_nullable_key=1;
CREATE TABLE nt__fuzz_8 (x Dynamic(max_types = 19)) ENGINE = MergeTree ORDER BY tuple();
INSERT INTO nt VALUES (1), (2), (NULL), (3);
INSERT INTO nt__fuzz_8 SELECT 1::Int32 UNION ALL SELECT 2::Int32 UNION ALL SELECT 4::Int32;

SELECT countOrNull(t1.x), sum(isNull(t1.x))
FROM nt AS t1 LEFT JOIN nt__fuzz_8 AS t2 USING (x)
SETTINGS allow_dynamic_type_in_join_keys = 1;
-- Logical error: 'Bad cast from type DB::IColumn const* to DB::ColumnNullable const*'

The allow_dynamic_type_in_join_keys = 1 setting is required — without it the three guards added in PR #86358 reject the query with ILLEGAL_COLUMN: JOIN on keys with Dynamic type is not supported. The CI fuzzer was setting this escape hatch.

Root cause: planner/analyzer type inconsistency for JOIN USING with Dynamic.

| Source | Reported type of t1.x post-join |
| --- | --- |
| Analyzer (toTypeName(t1.x)) | Nullable(Int32) |
| Planner (post-Join physical column) | Dynamic(max_types=19) |

countOrNull(t1.x) is bound by the analyzer with arg type Nullable(Int32) → Null combinator wraps it → AggregateFunctionNullUnary<false, true>::addBatchSinglePlace runs assert_cast<const ColumnNullable *>(columns[0]). At execution, columns[0] is ColumnDynamic, so the cast aborts at src/AggregateFunctions/Combinators/AggregateFunctionNull.h:442.

Why the planner produces Dynamic for __table1.x: the cast_to_super lambda at src/Planner/PlannerJoinsLogical.cpp:223-228 registers

changed_types[arg->result_name] = &dag.addAlias(casted, arg->result_name);

i.e. it aliases the cast result back under the original column name. JoinStepLogical::addToNullableIfNeeded then uses changed_types to replace the __table1.x output node with the alias (same name, type Dynamic). For JOIN ON the same cast is added with an empty result_name (src/Planner/PlannerJoins.cpp:911), so it gets an auto-generated _CAST(...) name and the original column is preserved — verified: LEFT JOIN ... ON t1.x = t2.x with the same setting returns 3, 1 correctly.

Why simple types work but Dynamic exposes the bug: with Int8 → Int64 USING coercion the analyzer promotes t1.x to Int64 too — analyzer and planner agree. DataTypeDynamic::canBeInsideNullable() = false, so the analyzer keeps t1.x = Nullable(Int32) instead of promoting to Dynamic. Only Dynamic (and likely any canBeInsideNullable()=false type promoted by USING) creates the mismatch.

Scope: the bug is not OrNull-specific. Plain count(t1.x) also aborts with Bad cast from DB::ColumnDynamic to DB::ColumnNullable. Any aggregate on t1.x post JOIN USING with Dynamic aborts.

Fix direction. Narrow option preferred: in cast_to_super, skip the changed_types[arg->result_name] = alias insertion (and return the cast node itself) only when the common type satisfies !canBeInsideNullable() — i.e. exactly the gap PR #86358 left open behind the escape hatch. Keeps SQL-standard combined-column promotion for the existing simple-type coercions (Int8→Int64, etc.). Will follow up with a separate PR + regression test for countOrNull / count / sumOrNull / anyOrNull over LEFT|INNER JOIN ... USING(x) with Dynamic right-hand side. @Avogar — would appreciate your sanity check on whether the Dynamic-side t1.x should retain Nullable(Int32) post-USING (analyzer's current view) or be promoted to Dynamic (the planner's current view); the fix shape depends on that.
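
The failure mode analysed above, where code bound against one column type receives another at execution, can be illustrated with a toy checked downcast. This is only a sketch: the `IColumn`, `ColumnNullable`, and `ColumnDynamic` structs below are empty stand-ins named after the ClickHouse classes, and `checkedCast` is a hypothetical simplification of `assert_cast`.

```cpp
#include <cassert>
#include <stdexcept>
#include <string>
#include <typeinfo>

// Empty stand-ins for the column hierarchy (not the real ClickHouse classes).
struct IColumn { virtual ~IColumn() = default; };
struct ColumnNullable : IColumn {};
struct ColumnDynamic : IColumn {};

// Hypothetical checked downcast: throws when the runtime type does not
// match the type the caller assumed.
template <typename To>
const To & checkedCast(const IColumn & column)
{
    if (const auto * typed = dynamic_cast<const To *>(&column))
        return *typed;
    throw std::logic_error(
        std::string("Bad cast from ") + typeid(column).name() + " to " + typeid(To).name());
}

// The aggregate was bound at analysis time assuming a Nullable argument,
// so it downcasts unconditionally at execution time.
bool addBatchAssumingNullable(const IColumn & argument)
{
    checkedCast<ColumnNullable>(argument);
    return true;
}
```

Passing a `ColumnNullable` succeeds, while passing a `ColumnDynamic` throws, which corresponds to the analyzer and planner disagreeing about the type of `t1.x`.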


2. Stateless tests (amd_msan, WasmEdge, parallel): 04039_merge_tree_snapshot_teardown_race

Confirmed sighting of the WasmEdge MSan family already tracked at #104866. Root cause attributed by @Algunenano (x86v3 + __msan_chain_origin + vzeroupper); his patch is in flight. No separate action needed.

@alexey-milovidov

Member Author

Merged latest master to refresh CI.

Previous CI on 501cb7a012c had two failing checks:

@groeneai, please investigate the Build (arm_tidy) failure on the sync PR https://github.com/ClickHouse/clickhouse-private/pull/53191 (run https://github.com/ClickHouse/clickhouse-private/actions/runs/25965130599/job/76327380281): pull the actual clang-tidy / compile diagnostic from the private report and report back whether it points at code introduced by this PR (mostly src/Storages/Kafka/Kafka2Source.{cpp,h}, StorageKafka2.{cpp,h}, KafkaSource.cpp) or at unrelated master code. If unrelated, link the corresponding upstream fix; if related, provide the specific file/line and proposed patch.

@clickhouse-gh

clickhouse-gh Bot commented May 17, 2026

Contributor

LLVM Coverage Report

| Metric | Baseline | Current | Δ |
| --- | --- | --- | --- |
| Lines | 84.10% | 84.20% | +0.10% |
| Functions | 91.40% | 91.40% | +0.00% |
| Branches | 76.50% | 76.60% | +0.10% |

Changed lines: 88.62% (218/246) | lost baseline coverage: 1 line(s) · Uncovered code

Full report · Diff report

@alexey-milovidov alexey-milovidov merged commit 7bdb9ea into master May 17, 2026
166 of 167 checks passed
@alexey-milovidov alexey-milovidov deleted the kafka2-direct-read branch May 17, 2026 23:23
@robot-clickhouse-ci-1 robot-clickhouse-ci-1 added the pr-synced-to-cloud The PR is synced to the cloud repo label May 17, 2026
@clickgapai

Contributor


Labels

pr-improvement Pull request with some product improvements pr-synced-to-cloud The PR is synced to the cloud repo


5 participants