KAFKA-20703: IQv2 TimestampedKeyWithHeadersQuery for headers-aware key-value stores (KIP-1356)#22666
Jess668 wants to merge 7 commits into
Proof-of-concept for KIP-1356, which exposes record headers through Interactive Queries (IQv2) for KIP-1271 headers-aware state stores. This PoC covers only the first of the proposed query types, TimestampedKeyWithHeadersQuery (the parallel of TimestampedKeyQuery):
- New @Evolving query class org.apache.kafka.streams.query.TimestampedKeyWithHeadersQuery, returning ValueTimestampHeaders<V> instead of ValueAndTimestamp<V>.
- Handler in MeteredTimestampedKeyValueStoreWithHeaders that mirrors the existing runTimestampedKeyQuery (serialize key, forward a raw KeyQuery down the wrapper chain, deserialize the bytes) but keeps the full ValueTimestampHeaders instead of stripping the headers.
- Wired RocksDBTimestampedStoreWithHeaders.query() to serve raw KeyQuery via StoreQueryUtils.handleBasicQueries. KIP-1271 had hardwired this layer to UNKNOWN_QUERY_TYPE, which makes the forwarded query fail once the record cache is flushed (a cache miss falls through to this layer). The metered store still performs the header-aware deserialization; other query types remain unsupported.
- End-to-end integration test TimestampedKeyWithHeadersQueryIntegrationTest (caching disabled, so every query provably reaches the RocksDB layer) asserting the value/timestamp/headers round-trip, including the empty-headers and tombstone cases, plus the unknown-query-type failure against a non-headers store.
The remaining three query types (TimestampedRangeWithHeadersQuery, WindowKeyWithHeadersQuery, WindowRangeWithHeadersQuery) are out of scope for this PoC.
Introduce org.apache.kafka.streams.processor.api.ReadOnlyRecord, a read-only view of a record's key/value/timestamp/headers, and make the existing PAPI Record implement it. This is the result type returned by the headers-aware IQv2 query types added by KIP-1356.
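As a rough illustration of the read-only view described above, here is a minimal, dependency-free sketch. The nested `ReadOnlyRecord` and `Record` types are simplified stand-ins, not the real classes in org.apache.kafka.streams.processor.api: headers are modeled as a String map (an assumption), and the negative-timestamp check throws a JDK exception where the real Record throws StreamsException.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Simplified stand-ins for illustration only; not the Kafka classes.
final class ReadOnlyRecordSketch {

    /** Read-only view: accessors only, no mutators. */
    interface ReadOnlyRecord<K, V> {
        K key();
        V value();
        long timestamp();
        Map<String, String> headers(); // assumption: headers modeled as a String map
    }

    /** Simplified record that implements the read-only view. */
    static final class Record<K, V> implements ReadOnlyRecord<K, V> {
        private final K key;
        private final V value;
        private final long timestamp;
        private final Map<String, String> headers;

        Record(K key, V value, long timestamp, Map<String, String> headers) {
            if (timestamp < 0) {
                // The real Record throws StreamsException; this sketch uses a JDK exception.
                throw new IllegalArgumentException("negative timestamp: " + timestamp);
            }
            this.key = key;     // null tolerated
            this.value = value; // null tolerated
            this.timestamp = timestamp;
            // Null headers become a non-null empty map; others are defensively copied.
            this.headers = headers == null
                ? Collections.<String, String>emptyMap()
                : Collections.unmodifiableMap(new LinkedHashMap<>(headers));
        }

        @Override public K key() { return key; }
        @Override public V value() { return value; }
        @Override public long timestamp() { return timestamp; }
        @Override public Map<String, String> headers() { return headers; }
    }

    public static void main(String[] args) {
        // A Record is assignable to the read-only view.
        ReadOnlyRecord<String, String> view = new Record<>("user-1", "login", 42L, null);
        System.out.println(view.key() + " " + view.value() + " " + view.timestamp() + " " + view.headers());
    }
}
```

Returning the interface type rather than the concrete record keeps query callers from depending on any mutating methods of the record class.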
- Add the headers-aware point query TimestampedKeyWithHeadersQuery, the parallel of TimestampedKeyQuery, returning a ReadOnlyRecord<K, V> carrying key/value/timestamp/headers (rather than value-and-timestamp only).
- MeteredTimestampedKeyValueStoreWithHeaders handles it by forwarding a raw byte-level KeyQuery to the wrapped store and deserializing the stored ValueTimestampHeaders into a Record (a ReadOnlyRecord); an absent or tombstoned key yields a null result.
- Remove the query(...) override on RocksDBTimestampedStoreWithHeaders, which returned UNKNOWN_QUERY_TYPE for everything. The store now inherits RocksDBStore.query(), dispatching basic queries via StoreQueryUtils.handleBasicQueries. This lets the new query reach the persistent layer and restores build-path parity for the existing query types (KeyQuery, RangeQuery, ...), which previously succeeded against adapter-built header stores but failed against natively-built ones.
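The handler flow in this commit (serialize the key, look it up at the byte level, deserialize value, timestamp, and headers together, and return null for an absent key) can be sketched with a toy model. Everything here is an assumption made for illustration: the class and method names, the single string header, and the newline-separated byte encoding, which is not the real ValueTimestampHeaders storage format.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Toy model only: names and the byte encoding are assumptions, not Kafka's.
final class HeadersPointQuerySketch {

    /** Stand-in for the ReadOnlyRecord result (key, value, timestamp, headers). */
    static final class Result {
        final String key;
        final String value;
        final long timestamp;
        final String header;

        Result(String key, String value, long timestamp, String header) {
            this.key = key;
            this.value = value;
            this.timestamp = timestamp;
            this.header = header;
        }
    }

    /** Stand-in for the wrapped byte store that answers raw key lookups. */
    private final Map<ByteBuffer, byte[]> rawStore = new HashMap<>();

    static byte[] serializeKey(String key) {
        return key.getBytes(StandardCharsets.UTF_8);
    }

    /** Toy encoding: timestamp, header, and value separated by newlines. */
    static byte[] serializeValue(String value, long timestamp, String header) {
        return (timestamp + "\n" + header + "\n" + value).getBytes(StandardCharsets.UTF_8);
    }

    void put(String key, String value, long timestamp, String header) {
        rawStore.put(ByteBuffer.wrap(serializeKey(key)), serializeValue(value, timestamp, header));
    }

    void delete(String key) {
        rawStore.remove(ByteBuffer.wrap(serializeKey(key)));
    }

    /** Serialize the key, do the raw lookup, then deserialize all three parts together. */
    Result query(String key) {
        byte[] raw = rawStore.get(ByteBuffer.wrap(serializeKey(key)));
        if (raw == null) {
            return null; // absent or deleted key surfaces as a null result
        }
        String[] parts = new String(raw, StandardCharsets.UTF_8).split("\n", 3);
        return new Result(key, parts[2], Long.parseLong(parts[0]), parts[1]);
    }

    public static void main(String[] args) {
        HeadersPointQuerySketch store = new HeadersPointQuerySketch();
        store.put("k", "v", 42L, "trace=abc");
        Result r = store.query("k");
        System.out.println(r.value + " @" + r.timestamp + " [" + r.header + "]");
        store.delete("k");
        System.out.println(store.query("k")); // prints "null"
    }
}
```

The point of the sketch is only that the typed layer owns both serialization directions, while the wrapped store sees nothing but bytes.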
skipCache() on KeyQuery/TimestampedKeyQuery (and the new TimestampedKeyWithHeadersQuery) was a no-op: the metered query handlers rebuilt a fresh raw KeyQuery via KeyQuery.withKey(...) and dropped the flag, so it never reached CachingKeyValueStore -- the only layer that reads isSkipCache(). Propagate isSkipCache() onto the forwarded raw KeyQuery in MeteredKeyValueStore, MeteredTimestampedKeyValueStore, and MeteredTimestampedKeyValueStoreWithHeaders. For the header store, also remove the CachingKeyValueStoreWithHeaders.query() override, which forwarded every query straight to the wrapped store and never consulted the cache. Inheriting CachingKeyValueStore.query() makes point queries consult the record cache (fixing read-your-writes for not-yet-flushed writes) and lets the propagated skipCache flag be honored; the cached value bytes are the serialized ValueTimestampHeaders, which the metered layer deserializes.
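To make the no-op concrete, here is a small self-contained model of the behavior described in this commit message. The `RawKeyQuery` and `CachingLayer` classes and both forwarding helpers are hypothetical stand-ins, not the Kafka classes; the sketch only assumes that the caching layer is the one place that reads the flag and that a rebuilt query starts with the flag unset.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the skipCache propagation issue; not the Kafka classes.
final class SkipCacheSketch {

    /** Stand-in for a raw key query carrying the skipCache flag. */
    static final class RawKeyQuery {
        final String key;
        final boolean skipCache;

        private RawKeyQuery(String key, boolean skipCache) {
            this.key = key;
            this.skipCache = skipCache;
        }

        static RawKeyQuery withKey(String key) {
            return new RawKeyQuery(key, false); // a fresh query never skips the cache
        }

        RawKeyQuery skipCache() {
            return new RawKeyQuery(key, true);
        }
    }

    /** Stand-in for the caching layer: the only place that reads the flag. */
    static final class CachingLayer {
        final Map<String, String> cache = new HashMap<>();     // not-yet-flushed writes
        final Map<String, String> persisted = new HashMap<>(); // flushed state

        String query(RawKeyQuery query) {
            if (!query.skipCache && cache.containsKey(query.key)) {
                return cache.get(query.key);
            }
            return persisted.get(query.key);
        }
    }

    /** Old behavior: rebuilding the query silently drops the caller's flag. */
    static RawKeyQuery forwardDroppingFlag(RawKeyQuery typed) {
        return RawKeyQuery.withKey(typed.key);
    }

    /** Fixed behavior: copy the flag onto the rebuilt query. */
    static RawKeyQuery forwardPropagatingFlag(RawKeyQuery typed) {
        RawKeyQuery raw = RawKeyQuery.withKey(typed.key);
        return typed.skipCache ? raw.skipCache() : raw;
    }

    public static void main(String[] args) {
        CachingLayer layer = new CachingLayer();
        layer.persisted.put("k", "flushed");
        layer.cache.put("k", "unflushed");

        RawKeyQuery caller = RawKeyQuery.withKey("k").skipCache();
        System.out.println(layer.query(forwardDroppingFlag(caller)));    // prints "unflushed"
        System.out.println(layer.query(forwardPropagatingFlag(caller))); // prints "flushed"
    }
}
```

In this model the caller asked to skip the cache in both calls; only the second forwarding variant actually reaches the persisted value.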
- ReadOnlyRecordTest: Record is assignable to ReadOnlyRecord and exposes the same key/value/timestamp/headers (non-null empty headers; null key/value tolerated).
- skipCache propagation: MeteredKeyValueStore, MeteredTimestampedKeyValueStore and MeteredTimestampedKeyValueStoreWithHeaders forward isSkipCache() onto the raw KeyQuery for KeyQuery/TimestampedKeyQuery/TimestampedKeyWithHeadersQuery.
- RocksDBTimestampedStoreWithHeadersTest: the native store now serves KeyQuery and RangeQuery (previously UNKNOWN_QUERY_TYPE); a genuinely unsupported query still returns UNKNOWN_QUERY_TYPE.
- TimestampedKeyValueStoreBuilderWithHeadersTest: consolidated IQv2 query tests, parameterized over build path (native/adapter/in-memory) x caching, plus native-vs-adapter result parity; dropped the duplicated per-store setup and the mock-cache helper.
- TimestampedKeyWithHeadersQueryIntegrationTest: fix the stale ValueTimestampHeaders result type to ReadOnlyRecord (it no longer compiled against the query type) and add a caching-enabled cache-hit (read-your-writes) case.
aliehsaeedii
left a comment
Thanks @Jess668. Could you please define ReadOnlyRecord and the consequent changes in another PR? Thanks. I left some comments.
This propagates skipCache for every plain key-value store, not just header-aware ones — skipCache() was silently a no-op on KeyQuery here before, so this changes read-your-writes behavior PR-wide (same applies to the equivalent change in MeteredTimestampedKeyValueStore). It looks like a genuine fix, but it reaches beyond the "headers-aware stores" scope of KIP-1356.
I now propagate skipCache only in the new runTimestampedKeyWithHeadersQuery handler (a single occurrence), and have reverted the changes to MeteredKeyValueStore/MeteredTimestampedKeyValueStore (and dropped it from the header store's existing-query handlers).
    // A null wrapper means the key is absent or tombstoned, which we surface as a null result.
    final ReadOnlyRecord<K, V> record = valueTimestampHeaders == null
        ? null
        : new Record<>(
new Record<>(...) throws StreamsException if valueTimestampHeaders.timestamp() is negative, so a corrupted/unexpected stored timestamp surfaces as an exception propagating out of query() rather than as a failed QueryResult. Is a negative stored timestamp actually impossible here? If so a brief comment would help; if not, consider mapping it to a failed result instead.
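One way to picture the second option (mapping a bad stored timestamp to a failed result instead of letting the exception escape) is the sketch below. The `Outcome` and `TimestampedValue` types and the `toOutcome` helper are hypothetical stand-ins for QueryResult and Record, and the JDK exception stands in for StreamsException; this is not a proposal for the exact production code.

```java
// Hypothetical stand-ins for illustration; not Kafka's QueryResult or Record.
final class FailedResultSketch {

    /** Either a successful value (possibly null) or a failure message. */
    static final class Outcome<T> {
        final T value;
        final String failure;

        private Outcome(T value, String failure) {
            this.value = value;
            this.failure = failure;
        }

        static <T> Outcome<T> success(T value) {
            return new Outcome<>(value, null);
        }

        static <T> Outcome<T> failure(String message) {
            return new Outcome<>(null, message);
        }

        boolean isFailure() {
            return failure != null;
        }
    }

    /** Stand-in for a record whose constructor rejects negative timestamps. */
    static final class TimestampedValue {
        final String value;
        final long timestamp;

        TimestampedValue(String value, long timestamp) {
            if (timestamp < 0) {
                throw new IllegalArgumentException("negative timestamp: " + timestamp);
            }
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    /** Converts a stored entry into an outcome without letting the exception escape. */
    static Outcome<TimestampedValue> toOutcome(String storedValue, long storedTimestamp) {
        if (storedValue == null) {
            return Outcome.success(null); // absent or tombstoned key
        }
        try {
            return Outcome.success(new TimestampedValue(storedValue, storedTimestamp));
        } catch (IllegalArgumentException e) {
            return Outcome.failure("invalid stored timestamp: " + e.getMessage());
        }
    }

    public static void main(String[] args) {
        System.out.println(toOutcome("v", 42L).isFailure()); // prints "false"
        System.out.println(toOutcome("v", -5L).failure);
    }
}
```

Whether this is preferable to a short comment depends on whether a negative stored timestamp can actually occur in this store, which is the open question in the review comment.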
    // layer can honor it; the result bytes are the serialized ValueTimestampHeaders, which we
    // deserialize below to recover value, timestamp, and headers.
    KeyQuery<Bytes, byte[]> rawKeyQuery = KeyQuery.withKey(serializeKey(typedKeyQuery.key(), internalContext.headers()));
    if (typedKeyQuery.isSkipCache()) {
The if (typedKeyQuery.isSkipCache()) { rawKeyQuery = rawKeyQuery.skipCache(); } block is now repeated across ~5 handler methods (here and in MeteredKeyValueStore / MeteredTimestampedKeyValueStore). Consider extracting a small helper that builds the raw KeyQuery and applies skipCache, to keep the propagation in one place.
I'll handle the general fix in a separate PR after this one merges: honor skipCache for KeyQuery/TimestampedKeyQuery across the metered key-value stores, with the repeated propagation collapsed into a single shared helper. Will file a JIRA and link it here.
forwardedRawKeyQuery(...) is copy-pasted verbatim into three test classes (MeteredKeyValueStoreTest, MeteredTimestampedKeyValueStoreTest, and here). Consider moving it to a shared test util to avoid the duplication.
Came from the same cross-cutting skipCache change. Scoping it to the new query handler removed the other two copies, so forwardedRawKeyQuery(...) is now only here. I'll add the shared test util (with the production helper) in the follow-up PR that fixes KeyQuery/TimestampedKeyQuery skipCache across the metered stores.
Per review: propagating skipCache for the existing KeyQuery/TimestampedKeyQuery on the plain metered stores (MeteredKeyValueStore/MeteredTimestampedKeyValueStore) is a pre-existing, cross-cutting bug fix outside KIP-1356's scope. Revert those changes (and their tests), and drop skipCache from the header store's existing-query handlers (runKeyQuery/runTimestampedKeyQuery) too. This PR now propagates skipCache only in the new runTimestampedKeyWithHeadersQuery handler. The general KeyQuery/TimestampedKeyQuery fix, with the repeated propagation collapsed into one shared helper, will follow in a separate JIRA after this merges.

This PR is the first increment of KIP-1356: the key/value point query for TimestampedKeyValueStoreWithHeaders. The range/window/session header query types described in the KIP will follow in subsequent PRs.
Depends on #22677. The ReadOnlyRecord change is reviewed in #22677; this PR will be rebased to drop it once #22677 merges.
Summary
- TimestampedKeyWithHeadersQuery<K, V> (new, @Evolving, org.apache.kafka.streams.query): the headers-aware parallel of TimestampedKeyQuery, returning a ReadOnlyRecord that carries the key, value, timestamp, and headers.
- RocksDBTimestampedStoreWithHeaders no longer overrides query(...) to return UNKNOWN_QUERY_TYPE; it inherits RocksDBStore.query(), so the existing query types (KeyQuery, RangeQuery, …) behave identically on both build paths.
- CachingKeyValueStoreWithHeaders no longer overrides query(...) to bypass the cache; it inherits CachingKeyValueStore.query(), so point queries consult the record cache (read-your-writes) and honor skipCache.
- skipCache is honored for the new query. MeteredTimestampedKeyValueStoreWithHeaders forwards isSkipCache() from TimestampedKeyWithHeadersQuery onto the raw KeyQuery it builds, so the caching layer can honor it. (Making skipCache() work for the existing KeyQuery/TimestampedKeyQuery on the metered stores is a fix for a pre-existing, cross-cutting no-op; it is out of scope here and will be addressed in a follow-up JIRA.)
- The metered header store services TimestampedKeyWithHeadersQuery by forwarding a raw byte-level KeyQuery to the wrapped store and deserializing the stored ValueTimestampHeaders into a Record (a ReadOnlyRecord); an absent or tombstoned key yields a null result.
Testing
- MeteredTimestampedKeyValueStoreWithHeadersTest: skipCache is propagated to the forwarded KeyQuery for TimestampedKeyWithHeadersQuery.
- RocksDBTimestampedStoreWithHeadersTest: the native store serves KeyQuery and RangeQuery (previously UNKNOWN_QUERY_TYPE); a genuinely unsupported query still returns UNKNOWN_QUERY_TYPE.
- TimestampedKeyValueStoreBuilderWithHeadersTest: IQv2 query handling parameterized over build path (native / adapter / in-memory) × caching on/off, plus a native-vs-adapter result-parity test.
- TimestampedKeyWithHeadersQueryIntegrationTest: end-to-end value/timestamp/headers round-trip, empty-headers, tombstone/absent, UNKNOWN_QUERY_TYPE against a non-headers store, and a caching-enabled cache-hit (read-your-writes) case.
Reviewers: Alieh Saeedi <asaeedi@confluent.io>