Implement direct SELECT reading and kafka_commit_on_select for Kafka2 engine#100276
Conversation
…a2 engine The Kafka2 engine (with Keeper-based offset storage) previously threw NOT_IMPLEMENTED on direct SELECT queries. This adds support for: 1. Direct SELECT reading via a new `Kafka2Source` class that acquires consumers, polls messages using the `KeeperHandlingConsumer` API, and returns data as chunks - similar to how `KafkaSource` works for the original Kafka engine. 2. `kafka_commit_on_select` setting support - when enabled, offsets are committed to Keeper after a direct SELECT read completes. When disabled, the `OffsetGuard` rolls back and messages remain unconsumed. 3. `system.kafka_consumers` visibility was already implemented (the system table already handled `StorageKafka2` via template lambda), so the documentation was updated to remove that limitation. Also migrated `StorageKafka2::read` from the old `Pipe read(...)` API to the modern `void read(QueryPlan&, ...)` API, consistent with `StorageKafka`. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
| } | ||
| catch (...) | ||
| { | ||
| LOG_WARNING(log, "Failed to acquire consumer for direct read: {}", getCurrentExceptionMessage(true)); |
There was a problem hiding this comment.
Why don't we let the exception propagate?
There was a problem hiding this comment.
This has been addressed in commit faf50b8c191 — acquireConsumer exceptions now propagate out of Kafka2Source::generateImpl to the caller.
Silently swallowing the exception and returning an empty chunk would give the user no data and no error, which is confusing. Let the exception propagate so the user gets a clear error message. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Now that Kafka2 supports direct SELECT reading, enable the direct read code paths in the existing parametrized tests: - `test_kafka_column_types`: tests ALIAS columns with direct read - `test_kafka_producer_consumer_separate_settings`: test parameter only - `test_kafka_unavailable`: tests direct read when broker is unavailable Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- Fix incorrect dead-letter metadata: `.offset` was assigned `currentPartition()` instead of `currentOffset()`, corrupting dead-letter queue entries. - Fix broken `kafka_commit_on_select` for dead-letter reads: when all consumed messages went to the dead-letter queue (producing zero rows), the offset guard was discarded, preventing offset commits and causing the same malformed messages to be re-read indefinitely. - Remove unused `KafkaDirectReads` and `LOGICAL_ERROR` declarations. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- Remove trailing quote in error context string for parse failures - Add `checkTimeLimit` to `Kafka2Source` direct-read loop, matching `KafkaSource` behavior, so SELECT has bounded latency even when messages keep yielding zero rows (e.g. tombstones/empty payloads) Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
| return {}; | ||
| } | ||
|
|
||
| offset_guard.emplace(std::move(*maybe_guard)); |
There was a problem hiding this comment.
Isn't the offset guard from the previous call gets overridden? If not, that because of max_block_size=1 each consumer will read only 1 rows.
There was a problem hiding this comment.
Yes — with max_block_size=1 (forced in makePipe) and the is_finished flag set right before the single pollConsumer call, each Kafka2Source polls at most once per query, so the offset_guard is assigned at most once and never overwritten. This matches the existing intent (block size of 1 is needed for LIMIT to work).
| size_t num_views = DatabaseCatalog::instance().getDependentViews(table_id).size(); | ||
| if (num_views > 0) | ||
| throw Exception(ErrorCodes::QUERY_NOT_ALLOWED, "Cannot read from StorageKafka2 with attached materialized views"); |
There was a problem hiding this comment.
This is not enough here. Currently StorageKafka2 doesn't do any checks on whether a consumer is used from multiple places (because we didn't have direct read, all threads are bound to a single consumer => there was no need for this). It is possible that we pass this check, then attach an MV before this direct read finishes. Then the same consumer will be use from multiple threads simultaneously. Please either protect the consumers (and ensure their fair poll) or introduce some sort of locking here to prevent an MV being attached while direct read is in progress.
There was a problem hiding this comment.
Addressed: makePipe now acquires consumers_mutex and atomically rejects when active_mv_streamers > 0 and increments active_direct_readers; symmetrically, threadFunc rejects MV streaming when active_direct_readers > 0. The check is also re-done under getDependentViews so that attaching an MV after the initial check is correctly serialized. See commits 5aa88a5d17f and 6d181ed6623.
|
|
||
| ProfileEvents::increment(ProfileEvents::KafkaDirectReads); | ||
|
|
||
| /// Always use all consumers at once, otherwise SELECT may not read messages from all partitions. |
There was a problem hiding this comment.
The same issue as above can be problematic for multiple direct reads at the same time. I think our storages can be read from multiple queries at the same time, but this won't work here.
There was a problem hiding this comment.
Addressed: acquireConsumer now waits on a condition variable until the consumer becomes free (or a configurable timeout, kafka_consumer_acquire_timeout_ms, elapses). Two concurrent direct SELECTs on the same table no longer race on the same KeeperHandlingConsumer. See commits 90d77f8e161 and c1b9bc0e5a7.
The `Kafka2Source` methods interact with Keeper via
`KeeperHandlingConsumer`, but were missing the required
`Coordination::setCurrentComponent` guard. This caused a
LOGICAL_ERROR exception ("Current component is empty") that
crashed the server during direct reads, cascading into failures
across all Kafka integration tests.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…letter bug - Add `mv_attached` atomic flag to `StorageKafka2` to prevent direct SELECTs from running concurrently with MV streaming, matching the `StorageKafka` (v1) pattern. - Make `acquireConsumer` wait (via condition variable) until the consumer at the requested index is free, preventing concurrent direct SELECTs from using the same consumer simultaneously. - Deduplicate `Kafka2Source::generateImpl`: instead of duplicating the entire poll/parse/virtual-column logic from `StorageKafka2::pollConsumer`, delegate to `pollConsumer` directly. This reduces `Kafka2Source.cpp` from ~350 lines to ~130 lines. - Fix pre-existing dead-letter queue bug in `StorageKafka2::pollConsumer` where `.offset` was incorrectly set to `msg_info.currentPartition()` instead of `msg_info.currentOffset()`. - Add `poll_max_block_size` parameter to `pollConsumer` so `Kafka2Source` can pass its own block size (1 for direct reads) without affecting MV streaming. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
After setting `shutdown_called = true`, notify all threads waiting on the condition variable so that any `acquireConsumer` calls blocked waiting for a free consumer will wake up and throw `ABORTED` instead of hanging indefinitely. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…fka2 Addresses unresolved review feedback about missing test coverage: the new test verifies that with `kafka_commit_on_select=1` offsets are committed after a direct SELECT (so a second SELECT returns nothing), and with `kafka_commit_on_select=0` offsets are rolled back (so a second SELECT re-reads the same messages). Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
|
The only failing check is Root cause: this PR touches Filed a one-line CI fix to extend No code changes needed on this PR — review feedback from previous rounds is addressed and all other checks are green. |
|
There is a lot of changes, give me some time to process them. I will try to finish it today or latest tomorrow. |
| valid_consumer_indices.reserve(kafka_storage.consumers.size()); | ||
| for (size_t i = 0; i < kafka_storage.consumers.size(); ++i) | ||
| if (kafka_storage.consumers[i]) | ||
| valid_consumer_indices.push_back(i); | ||
|
|
||
| if (valid_consumer_indices.empty()) | ||
| throw Exception(ErrorCodes::QUERY_NOT_ALLOWED, "No Kafka consumers available for direct read"); | ||
|
|
||
| kafka_storage.active_direct_readers.fetch_add(valid_consumer_indices.size()); |
There was a problem hiding this comment.
Maybe it makes more sense to ensure all consumers are created. As you changed startup to resize the vector of consumers, maybe we can check in acquireConsumer if the consumer is created and create it if not. I think that would result in a simpler logic. The only disadvatage is acquiring a consumer might fail temporarily if zookeeper is not initialized yet. That should be fine in my opinion and shouldn't last long.
There was a problem hiding this comment.
Good idea. I'd prefer to leave this for a follow-up: the current pre-sized vector keeps stable indices and the failure mode is well-defined, and the lazy-create variant changes recovery semantics (each background reschedule would now retry consumer creation when Keeper / brokers were temporarily unavailable at startup). I want to land this PR with the smaller surface and revisit the simplification on its own.
…n up tests - Remove unused `broken`, `stalled` fields and `isStalled` getter in `Kafka2Source` — only written, never read externally. Rollback semantics now solely rely on `OffsetGuard`. - Clarify the `active_direct_readers` comment in `StorageKafka2.h` (it counts consumers in use by direct `SELECT`s, not the number of queries). - Drop the redundant `do_direct_read` parametrize column in three integration tests (always `True`), and execute the corresponding `SELECT` unconditionally. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The flaky integration test job collects changed test modules from the PR
diff and asks `get_parallel_sequential_tests_to_run` to match each one
against the parallel/sequential module lists. Without `no_strict`, a
changed test that has been filtered out (e.g. by the private fork's
`SKIP_LIST` in `ci/jobs/scripts/integration_tests_configs.py`) trips
`assert matched, f"Test [{test_arg}] not found"`, killing the whole job.
This is the same concern that already justifies `no_strict=True` for
targeted checks (a CIDB-recorded test may be deleted or renamed before
the job runs). Extend the relaxation to flaky checks so a SKIP_LIST'd
test is silently dropped — the private fork has an explicit empty-result
handler that turns the resulting empty test list into a `SKIPPED` job.
This unblocks `CH Inc sync` on PRs that modify any test in `SKIP_LIST`
(e.g. `test_kafka_bad_messages`), such as
ClickHouse#100276 — its sync PR
ClickHouse/clickhouse-private#53191 failed in
`Integration tests (amd_asan_ubsan, flaky)` with
`AssertionError: Test [test_kafka_bad_messages/test.py] not found`.
…rs_mutex` Move the `getDependentViews` check inside the `consumers_mutex` critical section in `ReadFromStorageKafka2::makePipe` so that "no MV attached" and "no MV streamer active" are observed atomically together with the registration of `active_direct_readers`. Previously the dependent-views check ran outside the mutex, leaving a narrow race: an `ATTACH MATERIALIZED VIEW` arriving between that check and the mutex acquisition could leave `active_mv_streamers` at `0` (the MV's first `threadFunc` iteration hasn't run yet), letting the direct read be admitted while an MV was already attached. With both checks under the mutex, that window closes — and after we register, any subsequent attach will see `active_direct_readers > 0` from `threadFunc` and defer streaming until the direct read completes. Addresses review feedback on `src/Storages/Kafka/StorageKafka2.cpp:436` in #100276.
Check `shutdown_called` before the consumer-index bounds check in
`StorageKafka2::acquireConsumer`. During shutdown, `cleanConsumers`
clears the `consumers` vector, so a late call from an already-built
direct-read source would otherwise trip the bounds check first and
surface a `LOGICAL_ERROR` ("Invalid consumer index") instead of the
expected `ABORTED` for a detached table.
This is a leak of an internal invariant error on what should be a
clean abort path.
Addresses review feedback on `src/Storages/Kafka/StorageKafka2.cpp:1300`
in #100276.
|
Pushed two follow-up commits addressing the remaining open review feedback from
Remaining red checks on the previous CI run are unrelated to this PR:
@groeneai, please investigate the AST fuzzer failure: https://s3.amazonaws.com/clickhouse-test-reports/json.html?PR=100276&sha=9a0417eb9f7d37169c8c8c2e2c186a44cc427880&name_0=PR&name_1=AST%20fuzzer%20%28arm_asan_ubsan%29 — the failing query is |
|
Merged latest
@groeneai, please investigate these failures: #100276 and provide fixes in separate PRs if they are not already in progress. |
|
The remaining Tracked at #104866. @groeneai, please investigate the failure: https://s3.amazonaws.com/clickhouse-test-reports/json.html?PR=100276&sha=2aafdd0c08012b220b8622af49aa0657ac85c9ea&name_0=PR&name_1=Stateless%20tests%20%28amd_msan%2C%20WasmEdge%2C%20parallel%2C%201%2F2%29 and provide a fix in a separate PR if not already in progress. |
|
@alexey-milovidov, reporting back on both directives. 1.
|
| Source | Reported type of t1.x post-join |
|---|---|
Analyzer (toTypeName(t1.x)) |
Nullable(Int32) |
| Planner (post-Join physical column) | Dynamic(max_types=19) |
countOrNull(t1.x) is bound by the analyzer with arg type Nullable(Int32) → Null combinator wraps it → AggregateFunctionNullUnary<false, true>::addBatchSinglePlace runs assert_cast<const ColumnNullable *>(columns[0]). At execution columns[0] is ColumnDynamic, so the cast aborts at src/AggregateFunctions/Combinators/AggregateFunctionNull.h:442.
Why the planner produces Dynamic for __table1.x: the cast_to_super lambda at src/Planner/PlannerJoinsLogical.cpp:223-228 registers
changed_types[arg->result_name] = &dag.addAlias(casted, arg->result_name);
i.e. it aliases the cast result back under the original column name. JoinStepLogical::addToNullableIfNeeded then uses changed_types to replace the __table1.x output node with the alias (same name, type Dynamic). For JOIN ON the same cast is added with an empty result_name (src/Planner/PlannerJoins.cpp:911), so it gets an auto-generated _CAST(...) name and the original column is preserved — verified: LEFT JOIN ... ON t1.x = t2.x with the same setting returns 3, 1 correctly.
Why simple types work but Dynamic exposes the bug: with Int8 → Int64 USING coercion the analyzer promotes t1.x to Int64 too — analyzer and planner agree. DataTypeDynamic::canBeInsideNullable() = false, so the analyzer keeps t1.x = Nullable(Int32) instead of promoting to Dynamic. Only Dynamic (and likely any canBeInsideNullable()=false type promoted by USING) creates the mismatch.
Scope: the bug is not OrNull-specific. Plain count(t1.x) also aborts with Bad cast from DB::ColumnDynamic to DB::ColumnNullable. Any aggregate on t1.x post JOIN USING with Dynamic aborts.
Fix direction. Narrow option preferred: in cast_to_super, skip the changed_types[arg->result_name] = alias insertion (and return the cast node itself) only when the common type satisfies !canBeInsideNullable() — i.e. exactly the gap PR #86358 left open behind the escape hatch. Keeps SQL-standard combined-column promotion for the existing simple-type coercions (Int8→Int64, etc.). Will follow up with a separate PR + regression test for countOrNull / count / sumOrNull / anyOrNull over LEFT|INNER JOIN ... USING(x) with Dynamic right-hand side. @Avogar — would appreciate your sanity check on whether the Dynamic-side t1.x should retain Nullable(Int32) post-USING (analyzer's current view) or be promoted to Dynamic (the planner's current view); the fix shape depends on that.
2. Stateless tests (amd_msan, WasmEdge, parallel) — 04039_merge_tree_snapshot_teardown_race
Confirmed sighting of the WasmEdge MSan family already tracked at #104866. Root cause attributed by @Algunenano (x86v3 + __msan_chain_origin + vzeroupper); his patch is in flight. No separate action needed.
|
Merged latest Previous CI on
@groeneai, please investigate the |
LLVM Coverage Report
Changed lines: 88.62% (218/246) | lost baseline coverage: 1 line(s) · Uncovered code |

Implement direct SELECT reading,
kafka_commit_on_selectsupport, and update documentation for the Kafka2 engine (with Keeper-based offset storage).The Kafka2 engine previously threw
NOT_IMPLEMENTEDon direct SELECT queries. This adds:Direct SELECT reading via a new
Kafka2Sourceclass that acquires consumers, polls messages using theKeeperHandlingConsumerAPI, and returns data as chunks — similar to howKafkaSourceworks for the original Kafka engine. Direct reads are blocked when materialized views are attached, consistent with the original Kafka engine behavior.kafka_commit_on_selectsetting — when enabled, offsets are committed to Keeper after a direct SELECT completes. When disabled, theOffsetGuardrolls back so messages remain unconsumed.system.kafka_consumersvisibility was already implemented (the system table already handledStorageKafka2), so the documentation was updated to remove that limitation.Also migrated
StorageKafka2::readfrom the oldPipe read(...)API to the modernvoid read(QueryPlan&, ...)API, consistent withStorageKafka.Changelog category (leave one):
Changelog entry (a user-readable short description of the changes that goes into CHANGELOG.md):
The Kafka2 engine (experimental, with Keeper-based offset storage) now supports direct
SELECTqueries and thekafka_commit_on_selectsetting.Documentation entry for user-facing changes
Documentation updated in
docs/en/engines/table-engines/integrations/kafka.mdto remove the now-false limitations about direct reading andsystem.kafka_consumersvisibility.Version info
26.5.1.726