Fix row drop in parallel reads of Map primary key with bucketed serialization#104540
Fix row drop in parallel reads of Map primary key with bucketed serialization#104540groeneai wants to merge 7 commits into
Map primary key with bucketed serialization#104540Conversation
…ialization When a `Map` column is the primary key and the part uses `map_serialization_version_for_zero_level_parts = 'with_buckets'`, the primary key index and the data files end up storing the same logical row with different internal key orders: * The primary key index is written through `SerializationMap::serializeBinary`, which delegates to the basic nested `Array(Tuple(K, V))` serialization and preserves the in-memory positional order of keys. * The data files are written by `SerializationMap::serializeBinaryBulkWithMultipleStreams`, which hashes each key into a bucket and serializes each bucket independently. The read path (`deserializeBinaryBulkWithMultipleStreams` + `collectMapFromBuckets`) reassembles each row's keys in ascending bucket-index order, not insertion order. Under positional `ColumnMap::compareAt` these two representations are different values. `splitPartsWithRangesByPrimaryKey` uses the index for boundary computation but the splitter's downstream `FilterSortedStreamByRange` sees the bucket-order data, so rows that fall inside a layer's PK range under one representation can be dropped under the other. This is reproducible today via `merge_tree_read_split_ranges_into_intersecting_and_non_intersecting_injection_probability` (testing-only setting), and is also reachable from `FINAL` with `split_parts_ranges_into_intersecting_and_non_intersecting_final = 0`. Mark `Map` as not a safe primary-key type so the splitter falls into the single in-order merging-pipe path, which doesn't depend on this comparison. The fast path for non-`Map` primary keys is unchanged. Includes a stateless regression test covering both the `MergeTree`/non-`FINAL` injection path and the `ReplacingMergeTree` / `FINAL` path. Reported via flaky `01550_create_map_type` and confirmed during PR review of ClickHouse#104515 by @Algunenano and @alexey-milovidov.
|
cc @Algunenano @alexey-milovidov @CurtizJ @Avogar — engine fix for the parallel-reads-of-bucketed-Map-drop-rows issue we discussed during #104515. The fix is in The deeper issue — bucketed Regression test included; both directions verified via stash-revert; 50/50 iterations clean with full randomization. PR description has the full root-cause walkthrough. |
|
Workflow [PR], commit [ea2cc9c] Summary: ❌
AI ReviewSummaryThis PR fixes row drops when bucketed Final VerdictStatus: ✅ Approve |
@groeneai @alexey-milovidov it seems a more serious issue here. |
| @@ -0,0 +1,70 @@ | |||
| -- Regression test for parallel reads of `Map` primary key with `with_buckets` serialization. | |||
| -- | |||
There was a problem hiding this comment.
The test doesn't cover nested PK types — e.g. ORDER BY (id, m) where m is a Map, or ORDER BY tuple(m, ...). The code logic (recursive isSafePrimaryDataKeyType) does handle these cases, but they aren't explicitly exercised by the test.
It's a general issue with the We can try to rewrite the code of Map comparison so it doesn't depend on keys order, but this comparison will be slower. |
Per PR ClickHouse#104540 review feedback from @tiandiwonder: extend the regression test to also exercise the recursive `isSafePrimaryDataKeyType` check. Two new sections added, both using the same 14-row, 3-part, multi-bucket Map shape as the existing top-level-Map cases: 1. `ORDER BY (id, m)` on `MergeTree` and `ReplacingMergeTree` — composite primary key with `Map` as one column. All rows use the same `id` so parts intersect on `id` and the splitter has to compare `m` values at layer boundaries. Exercises the loop in `isSafePrimaryKey` that iterates over every PK column. 2. `ORDER BY c` with `c Tuple(Map(...), UInt32)` on `MergeTree` and `ReplacingMergeTree` — primary key whose type is itself a `Tuple` containing a `Map`. Exercises the recursive `Tuple` branch of `isSafePrimaryDataKeyType`, which must descend into the tuple's elements and mark the whole tuple as unsafe when any element is `Map`. For both shapes, both the injection (`merge_tree_read_split_ranges_into_intersecting_and_non_intersecting_injection_probability = 1`) and the `FINAL` (`split_parts_ranges_into_intersecting_and_non_intersecting_final = 0`) paths are covered. Verification matrix on local debug build: Without fix (`case TypeIndex::Map: return false;` removed from `PartsSplitter::isSafePrimaryDataKeyType`): - composite-inj groupArray: 13 (FAIL, one row dropped) - composite-final-split-0 count: 13 (FAIL) - composite-final-split-0 groupArray: 13 (FAIL) - tuple-inj groupArray: 13 (FAIL) - tuple-final-split-0 count: 13 (FAIL) - tuple-final-split-0 groupArray: 13 (FAIL) With fix restored: - all four new sections return 14 across `count` and `length(groupArray(...))`. `tests/clickhouse-test 04217_split_intersecting_bucketed_map_row_drop --test-runs 20` is 20/20 green with the current build.
|
Pushed Two new sections (same 14-row / 3-part / multi-bucket shape as the existing cases):
Verification matrix on local debug build:
@Avogar — thanks for the confirmation that @tiandiwonder — the recursive type-check coverage you asked for is in |
tiandiwonder
left a comment
There was a problem hiding this comment.
changlog is too long, suggest:
Fixed missing rows when reading a MergeTree table whose primary key is Map (or contains Map) and whose parts use bucketed Map serialization (map_serialization_version_for_zero_level_parts = 'with_buckets'). The problem affected parallel reads (visible via merge_tree_read_split_ranges_into_intersecting_and_non_intersecting_injection_probability) and FINAL queries with split_parts_ranges_into_intersecting_and_non_intersecting_final = 0
| /// so `FilterSortedStreamByRange` drops rows that would otherwise fall inside a layer's | ||
| /// PK range. Falling back to the single in-order merging pipe (the `!isSafePrimaryKey` | ||
| /// branch in `splitPartsWithRangesByPrimaryKey`) bypasses the layer split and the | ||
| /// intersecting/non-intersecting separation, both of which depend on this comparison. |
There was a problem hiding this comment.
Optional: the comment explains the with_buckets reorder very clearly, but doesn't say why we also disable the splitter for parts that use the default basic Map serialization (where index and data agree on insertion order). and also the comment is too long:
```
/// Map is unsafe as a splitter primary key: the PK index uses insertion-order
/// `serializeBinary` while `with_buckets` parts write data in bucket-index order,
/// and positional `ColumnMap::compareAt` then disagrees on equality, dropping rows.
/// Disabled unconditionally because the per-part serialization version is not
/// visible here; Map PKs are rare, so the cost is acceptable
There was a problem hiding this comment.
Pushed a38a518e5062 shrinking the comment to 5 lines: states the with_buckets reorder mechanism and the unconditional-disable rationale (per-part serialization version is not visible here; Map PKs are rare). Good catch on the basic-path coverage gap.
Per @tiandiwonder review on PR ClickHouse#104540: the original 13-line comment was verbose and did not state why we also disable the splitter for parts that use `basic` (default) Map serialization, where index and data agree on insertion order. Shortened to 5 lines covering: the `with_buckets` reorder mechanism, and the unconditional disable rationale (per-part serialization version is not visible here; `Map` PKs are rare).
|
@tiandiwonder good call on both reviews. Two updates pushed at
Verified clean build locally; only the comment text changed. CI should be unaffected. |
|
@groeneai please make change log entry short? |
| /// and positional `ColumnMap::compareAt` then disagrees on equality, dropping rows. | ||
| /// Disabled unconditionally because the per-part serialization version is not | ||
| /// visible here; `Map` PKs are rare, so the cost is acceptable. | ||
| case TypeIndex::Map: |
There was a problem hiding this comment.
empirically confirmed on a 4-part bucketed-Map shape: ORDER BY mapKeys(m) and ORDER BY m.keys both go 20→19 under injection=1. heads-up on the fix — a check on getRequiredColumnsWithTypes for Map won't catch them: optimize_functions_to_subcolumns (default-on at Core/Settings.cpp:4664) rewrites mapKeys(m) to m.keys (FunctionsMapMiscellaneous.cpp:515), and ORDER BY m.keys directly reads the Array(K) sub-column with no Map in the resolved expression. has to look at the original PK AST or sub-column origin.
The previous commit only checked the resolved PK expression type, so PKs that resolve to a non-Map type but read from a Map storage column under the hood still hit the same with_buckets index-vs-data mismatch. Cover the remaining shapes by walking the PK expression's source columns. Concretely: ORDER BY mapKeys(m) resolves to Array(K) and ORDER BY m.keys reads the Map sub-column, which also resolves to Array(K). isSafePrimaryDataKeyType on the resolved type accepts both as safe Arrays, so splitPartsWithRangesByPrimaryKey still ran the layer split and dropped one row from a 4-part bucketed-Map table (20 -> 19) when merge_tree_read_split_ranges_into_intersecting_and_non_intersecting_injection_probability=1. Add a new isSafePrimaryKey overload that takes ColumnsDescription and walks primary_key.expression->getRequiredColumnsWithTypes(): for each source column name, look it up via tryGetColumnOrSubcolumn (which resolves m.keys to a Map sub-column with getTypeInStorage()=Map(K, V)). If any source storage column is or contains a Map, the PK is unsafe and the splitter falls back to the single-pipe path. The existing single-arg isSafePrimaryKey is kept as a conservative fallback for callers without storage-column metadata. Both PartsSplitter callers (splitPartsWithRangesByPrimaryKey, findPKRangesForFinalAfterSkipIndex) and optimizeLazyFinal.cpp now thread metadata_snapshot->getColumns() through. Empirical confirmation by tiandiwonder on PR ClickHouse#104540 review (2026-06-05): ORDER BY mapKeys(m) and ORDER BY m.keys both go 20 -> 19 under injection=1. The naive getRequiredColumnsWithTypes-only check does not work because optimize_functions_to_subcolumns rewrites mapKeys(m) -> m.keys, which is a direct Array(K) sub-column read with no Map in the resolved expression. The sub-column origin lookup against ColumnsDescription handles both shapes. Test 04217 extended with 4-part shapes for ORDER BY mapKeys(m), ORDER BY m.keys, and ReplacingMergeTree FINAL with split_parts_ranges_into_intersecting_and_non_intersecting_final=0. Closes: ClickHouse#104540 Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
|
@tiandiwonder thanks for the empirical confirmation -- reproduced both shapes locally on a 4-part bucketed- You were right that a Pushed fix-up commit Also shortened the changelog entry to a single sentence per your earlier ask. Pre-PR validation gate
Session id: cron:clickhouse-ci-task-worker:20260605-112400 |
LLVM Coverage Report
Changed lines: Changed C/C++ lines covered by tests: 53/58 (91.38%) | Lost baseline coverage: none · Uncovered code |
| @@ -0,0 +1,249 @@ | |||
| -- Regression test for parallel reads of `Map` primary key with `with_buckets` serialization. | |||
There was a problem hiding this comment.
optional: add -- Tags: no-object-storage at the top — this test times out >180s on the asan+s3+keeper sync lane (5/33 reruns).
the with_buckets map serialization writes many small per-bucket files per part, so on object storage each is an S3 round-trip under ASan; it runs in ~0.5s on local storage and the splitter bug is storage-agnostic, so the tag loses no coverage.
There was a problem hiding this comment.
Done in 0743540 @tiandiwonder. Added -- Tags: no-object-storage with your S3-round-trip rationale; the splitter bug is storage-agnostic so local-only coverage is unchanged.
The with_buckets Map serialization writes many small per-bucket files per part, so on object storage every read is an S3 round-trip; under ASan the test times out (>180s, 5/33 reruns on the asan+s3+keeper sync lane). It runs in ~0.5s on local storage, and the PartsSplitter bug under test is storage-agnostic, so restricting to local storage loses no coverage. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
optimizeJoinByShards::apply calls splitIntersectingPartsRangesIntoLayers directly, bypassing the isSafePrimaryKey guard added for the splitPartsWithRangesByPrimaryKey callers. With query_plan_join_shard_by_pk_ranges = 1, a join whose common primary-key prefix is a Map (or Map-derived, e.g. mapKeys(m)/m.keys) on a bucketed-Map table (map_serialization_version_for_zero_level_parts = 'with_buckets') therefore still dropped rows: the PK index is built from the in-memory insertion-ordered Map while the data is read back in bucket order, so the layer border filters (FilterSortedStreamByRange) drop rows whose positional ColumnMap::compareAt disagrees. Guard apply() with the same isSafePrimaryKey predicate; if any source has an unsafe PK the sharding optimization is skipped (the join then reads through the normal path). Non-Map PKs are unaffected. Reproduced on the PR-head binary (which already fixes the non-join path): INNER JOIN on a Map PK returned 19/20 rows, mapKeys(m) PK 109/110, with query_plan_join_shard_by_pk_ranges = 1 and external join disabled; both return the full count after this change. Found by clickhouse-gh[bot] review. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
|
Follow-up commit
Session id: cron:clickhouse-worker-slot-1:20260615-135600 |
|
CI finished on
Neither is caused by this PR. |
@groeneai please fix the flaky tests. |
| for (const auto & source : data.sources) | ||
| { | ||
| const auto & metadata = source->getStorageMetadata(); | ||
| if (!isSafePrimaryKey(metadata->getPrimaryKey(), metadata->getColumns())) |
There was a problem hiding this comment.
optional: this uses isSafePrimaryKey but the file doesn't #include <Processors/QueryPlan/PartsSplitter.h> — it compiles via a transitive include.
adding the direct include guards against the include graph changing under it later.
|
@tiandiwonder thanks. Both flaky classes are being handled in separate PRs (not folded into this engine fix), and one is already done:
Neither is caused by this PR, so it should not block merging the Map-PK splitter fix (which is green on its own changes). I will pursue the FunctionsStress fixes independently. |
|
Let's close it in favour of #109178? |
|
Agreed, closing this in favor of #109178. #109178 fixes the root cause: the index stores One thing worth carrying over: #109178 doesn't seem to exercise the parallel split path that dropped rows here. Closing now. |

Changelog category (leave one):
Changelog entry (a user-readable short description of the changes that goes to CHANGELOG.md):
Fixed missing rows when reading a
MergeTreetable whose primary key is aMap, contains aMap, or is derived from aMap(mapKeys(m),m.keys, etc.) and whose parts use bucketedMapserialization.Documentation entry for user-facing changes
Context
This bug was discovered while investigating flaky
01550_create_map_type(#104515 was an earlier setting-pin attempt that was rejected by @Algunenano and @alexey-milovidov in favor of a real engine fix).Minimal repro
Root cause
For a 2-row part of
Map(String, Array(UInt8))with bucketed serialization (11 buckets), one row has the keys{k1, k2}:SerializationMap::serializeBinary-> basic nestedArray(Tuple(K, V))) writes the row as[("k1", [1,2,3]), ("k2", [4,5,6])]-- insertion order.SerializationMap::serializeBinaryBulkWithMultipleStreams) splits keys into per-bucket streams; on read,collectMapFromBucketsreassembles them in ascending bucket-index order. Withhash("k2") % 11 < hash("k1") % 11, the reassembled row is[("k2", [4,5,6]), ("k1", [1,2,3])]-- bucket order.ColumnMap::compareAtis positional on the nestedArray(Tuple(K, V)), so the two representations compare as different values.splitPartsWithRangesByPrimaryKeycomputes layer/border PK values from the index but the downstreamFilterSortedStreamByRangeevaluates the filter against actual data rows, so the row that should fall inside a layer's PK range is dropped.A simple bonus repro of the same underlying behavior (separate from the splitter, but the same root cause):
Fix
Two layers in
PartsSplitter::isSafePrimaryDataKeyType/isSafePrimaryKey:Mapas not a safe primary-key type. This also coversTuple(Map, ...)and similar nested cases via the existing recursive walk.Map(or contains aMap). This covers PKs likeORDER BY mapKeys(m)andORDER BY m.keys, where the resolved expression type isArray(K)-- safe by itself -- but the underlying values still come from awith_bucketsMapand so suffer the same index-vs-data mismatch. (Layer 2 was added in response to a regression confirmed by @tiandiwonder on a 4-part bucketed-Maptable:ORDER BY mapKeys(m)andORDER BY m.keysboth went 20 -> 19 under injection=1.)splitPartsWithRangesByPrimaryKeyalready has a fast path for unsafe primary keys that builds a single in-order merging pipe from all parts without doing the layer split or the intersecting / non-intersecting separation, both of which depend on the index-vs-data comparison being consistent.This is a targeted fix: only
Map-keyed andMap-derived reads lose the splitter optimization, and only when at least one source storage column is aMap(which is uncommon). Non-MapPKs andMapcolumns used outside the primary key are unaffected.What I did NOT fix here
The deeper issue -- that bucketed
Mapserialization is non-idempotent under positionalColumnMap::compareAtsemantics, which also breaks e.g.WHERE a = map('k1', 1, 'k2', 2, 'k3', 3)on a bucketed table -- is left for a separate change. It needs maintainer input on whether the right answer is canonical Map storage on write, an order-insensitiveMapcomparison, or extending the bucketed format to preserve original per-row key positions. This PR only fixes the splitter-level symptom; the broaderMapsemantics question is unchanged.Verification
04217_split_intersecting_bucketed_map_row_droppass with full randomization on debug build (excluding environment-only failures from missingparallel_replicascluster and timezone DB).01550_create_map_typepass with full randomization.git stash/ restore: without this commit the regression test fails withgroupArray=13/count=13on the originalMapPK shapes, and withgroupArray=19/count=20on the newmapKeys(m)andm.keys4-part shapes; with this commit all rows are returned.MapPK split path still exercised:merge_tree_read_split_ranges_into_intersecting_and_non_intersecting_injection_probability=1on a 200-row table withid UInt32PK still returns 200 rows.Mapas a non-PK column still works (read returns full row count).