KAFKA-20703: IQv2 TimestampedKeyWithHeadersQuery for headers-aware key-value stores (KIP-1356) by Jess668 · Pull Request #22666 · apache/kafka · GitHub

KAFKA-20703: IQv2 TimestampedKeyWithHeadersQuery for headers-aware key-value stores (KIP-1356)#22666

Draft
Jess668 wants to merge 7 commits into apache:trunk from Jess668:kip-1356-timestamped-key-with-headers-query


Jess668 commented Jun 24, 2026

Contributor

This PR is the first increment of KIP-1356 — the key/value point query — for
TimestampedKeyValueStoreWithHeaders. The range/window/session header query types described in the
KIP will follow in subsequent PRs.

Depends on #22677. The ReadOnlyRecord change is reviewed in #22677; this PR will be rebased to drop it once #22677 merges.

Summary

  1. TimestampedKeyWithHeadersQuery<K, V> (new, @Evolving, org.apache.kafka.streams.query) —
    the headers-aware parallel of TimestampedKeyQuery, returning a ReadOnlyRecord that carries the
    key, value, timestamp, and headers.
  2. Native header store serves basic IQv2 queries. RocksDBTimestampedStoreWithHeaders no longer
    overrides query(...) to return UNKNOWN_QUERY_TYPE; it inherits RocksDBStore.query(), so the
    existing query types (KeyQuery, RangeQuery, …) behave identically on both build paths.
  3. Header store caching participates in IQv2 point queries. CachingKeyValueStoreWithHeaders no
    longer overrides query(...) to bypass the cache; it inherits CachingKeyValueStore.query(), so
    point queries consult the record cache (read-your-writes) and honor skipCache.
  4. skipCache is honored for the new query. MeteredTimestampedKeyValueStoreWithHeaders forwards
    isSkipCache() from TimestampedKeyWithHeadersQuery onto the raw KeyQuery it builds, so the
    caching layer can honor it. (On the metered stores, skipCache() is still a no-op for the
    existing KeyQuery/TimestampedKeyQuery. That pre-existing, cross-cutting bug is out of scope
    here and will be fixed under a follow-up JIRA.)
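
The caching behavior in items 3 and 4 can be pictured with a small model. This is only a sketch: RawKeyQuerySketch and CachingStoreSketch are made-up stand-ins, not the Kafka Streams classes, and it assumes the cache holds writes that have not been flushed yet and that a skip-cache query reads the underlying store only.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for a raw key query carrying a skip-cache flag. */
final class RawKeyQuerySketch {
    private final String key;
    private final boolean skipCache;

    private RawKeyQuerySketch(String key, boolean skipCache) {
        this.key = key;
        this.skipCache = skipCache;
    }

    static RawKeyQuerySketch withKey(String key) {
        return new RawKeyQuerySketch(key, false);
    }

    /** Returns a copy with the flag set; the original query is left unchanged. */
    RawKeyQuerySketch skipCache() {
        return new RawKeyQuerySketch(key, true);
    }

    String key() {
        return key;
    }

    boolean isSkipCache() {
        return skipCache;
    }
}

/** Hypothetical stand-in for a caching layer in front of a persistent store. */
final class CachingStoreSketch {
    private final Map<String, String> cache = new HashMap<>();      // written, not yet flushed
    private final Map<String, String> underlying = new HashMap<>(); // flushed state

    void put(String key, String value) {
        cache.put(key, value);
    }

    void flush() {
        underlying.putAll(cache);
        cache.clear();
    }

    String query(RawKeyQuerySketch query) {
        // Consult the cache first unless the caller asked to skip it.
        if (!query.isSkipCache() && cache.containsKey(query.key())) {
            return cache.get(query.key());
        }
        return underlying.get(query.key());
    }
}
```

In this model a default query sees a write before it is flushed (read-your-writes), while the same query with skipCache() sees it only after flush().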

The metered header store services TimestampedKeyWithHeadersQuery by forwarding a raw byte-level
KeyQuery to the wrapped store and deserializing the stored ValueTimestampHeaders into a Record
(a ReadOnlyRecord); an absent or tombstoned key yields a null result.
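
The flow described above (serialize the key, fetch raw bytes, decode them into a record, return null when nothing is stored) might look roughly like the following. This is a self-contained sketch under loud assumptions: RecordViewSketch and HeadersPointQuerySketch are hypothetical stand-ins, a plain map plays the wrapped store, and the byte layout (timestamp, header count, headers, value) is invented for illustration and is not the on-disk format of ValueTimestampHeaders.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical read-only view of a record: key, value, timestamp, headers. */
final class RecordViewSketch {
    final String key;
    final String value;
    final long timestamp;
    final Map<String, String> headers;

    RecordViewSketch(String key, String value, long timestamp, Map<String, String> headers) {
        this.key = key;
        this.value = value;
        this.timestamp = timestamp;
        this.headers = headers;
    }
}

/** Hypothetical typed layer over a byte-level store (layout invented for this sketch). */
final class HeadersPointQuerySketch {
    // Stand-in for the wrapped store: serialized key -> serialized timestamp/headers/value.
    private final Map<ByteBuffer, byte[]> rawStore = new HashMap<>();

    void put(String key, String value, long timestamp, Map<String, String> headers) {
        rawStore.put(serializeKey(key), encode(value, timestamp, headers));
    }

    /** Models a tombstone: afterwards the key reads as absent. */
    void delete(String key) {
        rawStore.remove(serializeKey(key));
    }

    /** Point query: serialize the key, look up the raw bytes, decode; null if nothing is stored. */
    RecordViewSketch query(String key) {
        byte[] raw = rawStore.get(serializeKey(key));
        return raw == null ? null : decode(key, raw);
    }

    private static ByteBuffer serializeKey(String key) {
        return ByteBuffer.wrap(key.getBytes(StandardCharsets.UTF_8));
    }

    private static byte[] encode(String value, long timestamp, Map<String, String> headers) {
        try (ByteArrayOutputStream bytes = new ByteArrayOutputStream();
             DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeLong(timestamp);
            out.writeInt(headers.size());
            for (Map.Entry<String, String> header : headers.entrySet()) {
                out.writeUTF(header.getKey());
                out.writeUTF(header.getValue());
            }
            out.writeUTF(value);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    private static RecordViewSketch decode(String key, byte[] raw) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(raw))) {
            long timestamp = in.readLong();
            int headerCount = in.readInt();
            Map<String, String> headers = new LinkedHashMap<>();
            for (int i = 0; i < headerCount; i++) {
                String name = in.readUTF();
                headers.put(name, in.readUTF());
            }
            return new RecordViewSketch(key, in.readUTF(), timestamp, headers);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The point of the sketch is that the lower layer only ever sees bytes; the headers survive because the typed layer decodes the whole stored payload instead of dropping everything but value and timestamp.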

Testing

  • MeteredTimestampedKeyValueStoreWithHeadersTest: skipCache is propagated to the forwarded
    KeyQuery for TimestampedKeyWithHeadersQuery.
  • RocksDBTimestampedStoreWithHeadersTest — the native store serves KeyQuery and RangeQuery
    (previously UNKNOWN_QUERY_TYPE); a genuinely unsupported query still returns UNKNOWN_QUERY_TYPE.
  • TimestampedKeyValueStoreBuilderWithHeadersTest — IQv2 query handling parameterized over build path
    (native / adapter / in-memory) × caching on/off, plus a native-vs-adapter result-parity test.
  • TimestampedKeyWithHeadersQueryIntegrationTest — end-to-end value/timestamp/headers round-trip,
    empty-headers, tombstone/absent, UNKNOWN_QUERY_TYPE against a non-headers store, and a
    caching-enabled cache-hit (read-your-writes) case.

Reviewers: Alieh Saeedi <asaeedi@confluent.io>

Jess668 added 5 commits June 24, 2026 16:07
Proof-of-concept for KIP-1356, which exposes record headers through
Interactive Queries (IQv2) for KIP-1271 headers-aware state stores.

This PoC covers only the first of all proposed query types,
TimestampedKeyWithHeadersQuery (parallel of TimestampedKeyQuery):

- New @Evolving query class
  org.apache.kafka.streams.query.TimestampedKeyWithHeadersQuery, returning
  ValueTimestampHeaders<V> instead of ValueAndTimestamp<V>.
- Handler in MeteredTimestampedKeyValueStoreWithHeaders that mirrors the
  existing runTimestampedKeyQuery (serialize key, forward a raw KeyQuery
  down the wrapper chain, deserialize the bytes) but keeps the full
  ValueTimestampHeaders instead of stripping the headers.
- Wired RocksDBTimestampedStoreWithHeaders.query() to serve raw KeyQuery
  via StoreQueryUtils.handleBasicQueries. KIP-1271 had hardwired this layer
  to UNKNOWN_QUERY_TYPE, which makes the forwarded query fail once the
  record cache is flushed (a cache miss falls through to this layer). The
  metered store still performs the header-aware deserialization; other
  query types remain unsupported.
- End-to-end integration test TimestampedKeyWithHeadersQueryIntegrationTest
  (caching disabled, so every query provably reaches the RocksDB layer)
  asserting value/timestamp/headers round-trip incl. empty-headers and
  tombstone cases, plus the unknown-query-type failure against a
  non-headers store.

The remaining three query types (TimestampedRangeWithHeadersQuery,
WindowKeyWithHeadersQuery, WindowRangeWithHeadersQuery) are out of scope
for this PoC.

Introduce org.apache.kafka.streams.processor.api.ReadOnlyRecord, a read-only view of a record's key/value/timestamp/headers, and make the existing PAPI Record implement it. This is the result type returned by the headers-aware IQv2 query types added by KIP-1356.

- Add the headers-aware point query TimestampedKeyWithHeadersQuery, the parallel of TimestampedKeyQuery, returning a ReadOnlyRecord<K, V> carrying key/value/timestamp/headers (rather than value-and-timestamp only).

- MeteredTimestampedKeyValueStoreWithHeaders handles it by forwarding a raw byte-level KeyQuery to the wrapped store and deserializing the stored ValueTimestampHeaders into a Record (a ReadOnlyRecord); an absent or tombstoned key yields a null result.

- Remove the query(...) override on RocksDBTimestampedStoreWithHeaders, which returned UNKNOWN_QUERY_TYPE for everything. The store now inherits RocksDBStore.query(), dispatching basic queries via StoreQueryUtils.handleBasicQueries. This lets the new query reach the persistent layer and restores build-path parity for the existing query types (KeyQuery, RangeQuery, ...), which previously succeeded against adapter-built header stores but failed against natively-built ones.

skipCache() on KeyQuery/TimestampedKeyQuery (and the new TimestampedKeyWithHeadersQuery) was a no-op: the metered query handlers rebuilt a fresh raw KeyQuery via KeyQuery.withKey(...) and dropped the flag, so it never reached CachingKeyValueStore -- the only layer that reads isSkipCache(). Propagate isSkipCache() onto the forwarded raw KeyQuery in MeteredKeyValueStore, MeteredTimestampedKeyValueStore, and MeteredTimestampedKeyValueStoreWithHeaders.

For the header store, also remove the CachingKeyValueStoreWithHeaders.query() override, which forwarded every query straight to the wrapped store and never consulted the cache. Inheriting CachingKeyValueStore.query() makes point queries consult the record cache (fixing read-your-writes for not-yet-flushed writes) and lets the propagated skipCache flag be honored; the cached value bytes are the serialized ValueTimestampHeaders, which the metered layer deserializes.

- ReadOnlyRecordTest: Record is assignable to ReadOnlyRecord and exposes the same key/value/timestamp/headers (non-null empty headers; null key/value tolerated).
- skipCache propagation: MeteredKeyValueStore, MeteredTimestampedKeyValueStore and MeteredTimestampedKeyValueStoreWithHeaders forward isSkipCache() onto the raw KeyQuery for KeyQuery/TimestampedKeyQuery/TimestampedKeyWithHeadersQuery.
- RocksDBTimestampedStoreWithHeadersTest: the native store now serves KeyQuery and RangeQuery (previously UNKNOWN_QUERY_TYPE); a genuinely unsupported query still returns UNKNOWN_QUERY_TYPE.
- TimestampedKeyValueStoreBuilderWithHeadersTest: consolidated IQv2 query tests, parameterized over build path (native/adapter/in-memory) x caching, plus native-vs-adapter result parity; dropped the duplicated per-store setup and the mock-cache helper.
- TimestampedKeyWithHeadersQueryIntegrationTest: fix the stale ValueTimestampHeaders result type to ReadOnlyRecord (it no longer compiled against the query type) and add a caching-enabled cache-hit (read-your-writes) case.

github-actions bot added the "triage" (PRs from the community) and "streams" labels on Jun 24, 2026

aliehsaeedii left a comment

Contributor

Thanks @Jess668. Could you please define ReadOnlyRecord and the consequent changes in another PR? Thanks. I left some comments.

Contributor

This propagates skipCache for every plain key-value store, not just header-aware ones — skipCache() was silently a no-op on KeyQuery here before, so this changes read-your-writes behavior PR-wide (same applies to the equivalent change in MeteredTimestampedKeyValueStore). It looks like a genuine fix, but it reaches beyond the "headers-aware stores" scope of KIP-1356.

Contributor Author

I now propagate skipCache only in the new runTimestampedKeyWithHeadersQuery handler (a single occurrence). I reverted the changes to MeteredKeyValueStore/MeteredTimestampedKeyValueStore and dropped the propagation from the header store's existing-query handlers.

// A null wrapper means the key is absent or tombstoned, which we surface as a null result.
final ReadOnlyRecord<K, V> record = valueTimestampHeaders == null
? null
: new Record<>(

Contributor

new Record<>(...) throws StreamsException if valueTimestampHeaders.timestamp() is negative, so a corrupted/unexpected stored timestamp surfaces as an exception propagating out of query() rather than as a failed QueryResult. Is a negative stored timestamp actually impossible here? If so a brief comment would help; if not, consider mapping it to a failed result instead.
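
The second option raised here, mapping a bad stored timestamp to a failed result instead of letting an exception escape, could be sketched as below. PointResultSketch and TimestampGuardSketch are hypothetical stand-ins written for this illustration; how the PR itself would express the failure is not settled in this thread.

```java
/** Hypothetical stand-in for a query result that is either a value or a failure message. */
final class PointResultSketch {
    private final String value;
    private final long timestamp;
    private final String failureMessage; // null on success

    private PointResultSketch(String value, long timestamp, String failureMessage) {
        this.value = value;
        this.timestamp = timestamp;
        this.failureMessage = failureMessage;
    }

    static PointResultSketch success(String value, long timestamp) {
        return new PointResultSketch(value, timestamp, null);
    }

    static PointResultSketch failed(String message) {
        return new PointResultSketch(null, -1L, message);
    }

    boolean isSuccess() {
        return failureMessage == null;
    }

    String value() {
        return value;
    }

    long timestamp() {
        return timestamp;
    }

    String failureMessage() {
        return failureMessage;
    }
}

/** Hypothetical guard applied before a record object is constructed. */
final class TimestampGuardSketch {
    private TimestampGuardSketch() { }

    static PointResultSketch toResult(String value, long storedTimestamp) {
        // Check the timestamp up front so a corrupt entry becomes a failed result
        // rather than an exception thrown from a constructor further down.
        if (storedTimestamp < 0) {
            return PointResultSketch.failed("negative stored timestamp: " + storedTimestamp);
        }
        return PointResultSketch.success(value, storedTimestamp);
    }
}
```

Checking before construction keeps the failure on the result path, so callers that already inspect the result for failures need no extra exception handling.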

// layer can honor it; the result bytes are the serialized ValueTimestampHeaders, which we
// deserialize below to recover value, timestamp, and headers.
KeyQuery<Bytes, byte[]> rawKeyQuery = KeyQuery.withKey(serializeKey(typedKeyQuery.key(), internalContext.headers()));
if (typedKeyQuery.isSkipCache()) {

Contributor

The if (typedKeyQuery.isSkipCache()) { rawKeyQuery = rawKeyQuery.skipCache(); } block is now repeated across ~5 handler methods (here and in MeteredKeyValueStore / MeteredTimestampedKeyValueStore). Consider extracting a small helper that builds the raw KeyQuery and applies skipCache, to keep the propagation in one place.
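
One possible shape for such a helper, as a sketch only: ForwardedKeyQuerySketch stands in for the raw byte-level KeyQuery, and both it and RawKeyQueryHelperSketch.rawKeyQuery(...) are hypothetical names, not code from this PR.

```java
/** Hypothetical stand-in for the raw byte-level key query forwarded down the store chain. */
final class ForwardedKeyQuerySketch {
    private final byte[] serializedKey;
    private final boolean skipCache;

    private ForwardedKeyQuerySketch(byte[] serializedKey, boolean skipCache) {
        this.serializedKey = serializedKey;
        this.skipCache = skipCache;
    }

    static ForwardedKeyQuerySketch withKey(byte[] serializedKey) {
        return new ForwardedKeyQuerySketch(serializedKey, false);
    }

    /** Returns a copy with the flag set; the original query is left unchanged. */
    ForwardedKeyQuerySketch skipCache() {
        return new ForwardedKeyQuerySketch(serializedKey, true);
    }

    byte[] serializedKey() {
        return serializedKey;
    }

    boolean isSkipCache() {
        return skipCache;
    }
}

/** Hypothetical shared helper: the one place where the flag is copied onto the raw query. */
final class RawKeyQueryHelperSketch {
    private RawKeyQueryHelperSketch() { }

    static ForwardedKeyQuerySketch rawKeyQuery(byte[] serializedKey, boolean skipCache) {
        ForwardedKeyQuerySketch raw = ForwardedKeyQuerySketch.withKey(serializedKey);
        return skipCache ? raw.skipCache() : raw;
    }
}
```

Each handler would then pass its typed query's isSkipCache() value into the helper instead of repeating the if block.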

Contributor Author

I'll handle the general fix in a separate PR after this one merges: honor skipCache for KeyQuery/TimestampedKeyQuery across the metered key-value stores, with the repeated propagation collapsed into a single shared helper. Will file a JIRA and link it here.

Contributor

forwardedRawKeyQuery(...) is copy-pasted verbatim into three test classes (MeteredKeyValueStoreTest, MeteredTimestampedKeyValueStoreTest, and here). Consider moving it to a shared test util to avoid the duplication.

Contributor Author

Came from the same cross-cutting skipCache change. Scoping it to the new query handler removed the other two copies, so forwardedRawKeyQuery(...) is now only here. I'll add the shared test util (with the production helper) in the follow-up PR that fixes KeyQuery/TimestampedKeyQuery skipCache across the metered stores.

Jess668 added 2 commits June 25, 2026 13:15
Per review: propagating skipCache for the existing KeyQuery/TimestampedKeyQuery on the plain metered stores (MeteredKeyValueStore/MeteredTimestampedKeyValueStore) is a pre-existing, cross-cutting bug fix outside KIP-1356's scope. Revert those changes (and their tests), and drop skipCache from the header store's existing-query handlers (runKeyQuery/runTimestampedKeyQuery) too. This PR now propagates skipCache only in the new runTimestampedKeyWithHeadersQuery handler. The general KeyQuery/TimestampedKeyQuery fix, with the repeated propagation collapsed into one shared helper, will follow in a separate JIRA after this merges.

Reference TimestampedKeyWithHeadersQuery with {@code} instead of {@link} so ReadOnlyRecord's javadoc does not depend on the query class being present (it can then stand alone, e.g. in a separate ReadOnlyRecord PR, without breaking :streams:javadoc).