WindowTransform: stream lagInFrame via StreamingLagTransform to…#105822
DavidHe-2008 wants to merge 16 commits into
|
Workflow [PR], commit [a065f28] Summary: ❌
AI Review
Summary: This PR adds a streaming…
Missing context / blind spots
Findings: ❌ Blockers
Tests
Final Verdict
|
|
I am not the best reviewer. Let's wait for the CI result |
|
Please update the PR description with the changelog entry type |
Done. |
|
Merge with master to trigger another CI run please |
|
Hi Pedro,
I pushed a commit, which should trigger another CI run.
Thanks,
David
|
|
@PedroTadim Hi Pedro, one issue is left: hash generation with updateHashWithValue, which is a method of IColumn. I created a feature for it: This PR is only meant to cut the memory used by lagInFrame, and I don't want to pull other changes in here and make it too big. Please check. |
Seems good to me so far. Maybe a performance test could be added? Also, @CheSema could give a quick review? |
@PedroTadim Hi Pedro, a performance test is added: tests/performance/lag_in_frame_streaming.xml. |
|
This MSan failure is unrelated to this PR. The stack trace goes through ColumnNullable::insert → DataTypeNullable::createColumnConst → IExecutableFunction::defaultImplementationForNulls → Aggregator → AggregatingTransform. None of these files are touched by this PR. The issue is a pre-existing uninitialized-value in the nullable column serialization path triggered by the AST fuzzer. |
|
@PedroTadim Hi Pedro, do we follow the priorities of the issues when picking a new issue to work on? Thank you. |
This is worth reporting an issue. |
It would be better to get this PR merged before continuing. Let's wait for the review. |
Ok, I'll report it. |
I can do it |
|
@PedroTadim Hi Pedro Do you have a preference for what issue I should work on next? |
|
Please edit the git history so that all commits are properly attributed to you. Currently there are a few commits from "root". |
929a145 to
d4cd82c
Compare
Hi Alexey The 2 commits from "root" are corrected. |
… cut peak memory
Introduces `StreamingLagTransform`, a single-pass streaming replacement for the
`FinishSortingTransform + WindowTransform` pipeline when all window functions are
`lagInFrame` with offset 1 and the table is read in storage order.
Instead of buffering and finish-sorting every row within a prefix-key group,
the transform keeps a small hash map — one entry per distinct suffix-partition key
— and emits the previous value on each row in O(1). Peak memory is
O(distinct partitions within one prefix group) rather than O(all rows).
The optimization is gated by `query_plan_reuse_storage_ordering_for_window_functions`
(default off) and activates when:
1. All window functions are `lagInFrame` with offset 1 (one-, two-, or
three-argument forms; the offset argument must be a constant equal to 1).
2. The `SortingStep` is in `FinishSorting` mode (storage ordering already
provides the partition prefix).
3. The prefix description is a strict prefix of the window `PARTITION BY`.
When activated, `SortingStep` is converted to `MergeOnly` (merge K streams into
one without finish-sorting) and `WindowStep` creates `StreamingLagTransform`
instead of `WindowTransform`.
`lagInFrame(col, 1)` returns the immediately preceding row only when that row lies within the frame. The streaming path assumed it always does, which is wrong for frames like `ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING` where the preceding row is outside the frame and the correct result is the configured default for every row. Add a frame check: only activate the optimization when the frame start is `UNBOUNDED PRECEDING` (guaranteeing `frame_start == partition_start`) and the frame end is not a fixed `N PRECEDING` offset (which would place `frame_end` before the preceding row). This covers the common default frame (`RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW`) and explicit `ROWS BETWEEN UNBOUNDED PRECEDING AND ...` variants, and falls back to `WindowTransform` for all other frames.
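A minimal sketch of the frame check described in this commit, assuming a simplified frame model. `BoundKind`, `Frame` and `frameAllowsStreamingLag` are hypothetical names for illustration and are not the ClickHouse `WindowFrame` API.

```cpp
#include <cassert>

// Hypothetical, simplified frame model (not the ClickHouse WindowFrame type).
enum class BoundKind
{
    UnboundedPreceding,
    OffsetPreceding,   // N PRECEDING
    CurrentRow,
    OffsetFollowing,   // N FOLLOWING
    UnboundedFollowing
};

struct Frame
{
    BoundKind begin;
    BoundKind end;
};

// Returns true when the row immediately before the current row is guaranteed
// to lie inside the frame whenever such a row exists in the partition.
inline bool frameAllowsStreamingLag(const Frame & frame)
{
    // The frame must start at the partition start.
    if (frame.begin != BoundKind::UnboundedPreceding)
        return false;

    // A fixed "N PRECEDING" frame end could place the end before the preceding row.
    if (frame.end == BoundKind::OffsetPreceding)
        return false;

    return true;
}
```

Under this model the default frame (`UNBOUNDED PRECEDING` to `CURRENT ROW`) passes, while `CURRENT ROW` to `UNBOUNDED FOLLOWING` is rejected and would fall back to `WindowTransform`.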
…rage and column correspondence

Previously, `merge_prefix_size` was computed with `std::min(prefix + ORDER_BY, full_key_size)`, silently truncating required `window ORDER BY` columns when the storage key was shorter. This could produce a `MergeOnly` sort description that did not cover all ORDER BY columns, corrupting per-partition ordering seen by `StreamingLagTransform`.

Additionally, the window `ORDER BY` columns were never verified against the actual storage key columns at the corresponding positions; only sizes were checked. A query with `ORDER BY (MetricName, TimeUnix)` and `window ORDER BY NotTimeUnix` would pass the old size check while mapping to the wrong storage key column.

The fix:
1. Reject the optimization if `prefix_description.size() + window_order_by.size() > full_key_size`.
2. For each window ORDER BY column, strip the plan-time table qualifier (e.g. `__table1.TimeUnix` → `TimeUnix`) and verify it matches the storage key column at that position.
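The two checks in this fix can be sketched as follows. This is an illustration only: `stripTableQualifier` and `windowOrderMatchesStorageKey` are hypothetical helpers, and the assumption that a qualifier always has the shape `__table<digits>.` is taken from the single example in the commit message.

```cpp
#include <cassert>
#include <cctype>
#include <cstddef>
#include <string>
#include <string_view>
#include <vector>

// Strips a leading "__table<digits>." qualifier; returns the name unchanged otherwise.
std::string_view stripTableQualifier(std::string_view name)
{
    constexpr std::string_view marker = "__table";
    if (name.substr(0, marker.size()) != marker)
        return name;

    size_t pos = marker.size();
    const size_t digits_begin = pos;
    while (pos < name.size() && std::isdigit(static_cast<unsigned char>(name[pos])))
        ++pos;

    // Require at least one digit followed by a dot.
    if (pos == digits_begin || pos >= name.size() || name[pos] != '.')
        return name;

    return name.substr(pos + 1);
}

// Check 1: prefix + window ORDER BY must fit inside the storage key.
// Check 2: every window ORDER BY column must name the storage key column
//          at the corresponding position after the prefix.
bool windowOrderMatchesStorageKey(
    size_t prefix_size,
    const std::vector<std::string> & window_order_by,
    const std::vector<std::string> & storage_key)
{
    if (prefix_size + window_order_by.size() > storage_key.size())
        return false;

    for (size_t i = 0; i < window_order_by.size(); ++i)
        if (stripTableQualifier(window_order_by[i]) != storage_key[prefix_size + i])
            return false;

    return true;
}
```

With storage key `(MetricName, TimeUnix)` and a prefix of size 1, `__table1.TimeUnix` is accepted and `__table1.NotTimeUnix` is rejected, which is the case the old size-only check let through.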
…suffix partition hashing

The previous `MapColView` / `hashMapRow` fast path fed raw byte slices into `SipHash` without including the array element count or per-string length prefixes that `ColumnArray::updateHashWithValue` and `ColumnString::updateHashWithValue` always emit via `hash.update(size)` before each payload. Without those length fields, distinct map values with the same total byte content (possible with binary strings containing `\0`) could hash to the same `suffix_key`, causing unrelated partitions to share one `state_map_` entry and produce incorrect `lagInFrame` results.

Drop the `MapColView` struct, `tryGetMapView`, and `hashMapRow` entirely. All suffix partition columns, including `Map` columns, now use the generic `IColumn::updateHashWithValue` path, which is injective by construction.
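To illustrate why the length fields matter, here is a small sketch that builds the byte stream that would be fed to the hash, rather than hashing it. `encodeRaw` and `encodeWithLengths` are hypothetical helpers; they only mimic the idea of writing a size before each payload and are not the ClickHouse implementation.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <vector>

// Concatenates payload bytes only, as the removed fast path effectively did.
std::string encodeRaw(const std::vector<std::string> & values)
{
    std::string out;
    for (const auto & value : values)
        out += value;
    return out;
}

// Writes the element count first, then each string preceded by its length.
std::string encodeWithLengths(const std::vector<std::string> & values)
{
    std::string out;
    auto append_size = [&out](uint64_t size)
    {
        out.append(reinterpret_cast<const char *>(&size), sizeof(size));
    };

    append_size(values.size());
    for (const auto & value : values)
    {
        append_size(value.size());
        out += value;
    }
    return out;
}
```

The arrays `["ab", "c"]` and `["a", "bc"]` produce the same raw stream (`abc`), so any hash of that stream collides; the length-prefixed streams differ at the first length field.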
…ering, and collation for window ORDER BY columns

Previously only column names were compared against the storage key. Three additional attributes must match for the `MergeOnly` optimisation to be safe:
1. **Direction**: the physical direction the storage delivers for position j is `input_order->direction * (reverse_flags[j] ? -1 : 1)`. If the window ORDER BY specifies a different direction the merged streams are in the wrong order and `StreamingLagTransform` produces incorrect results.
2. **Nulls ordering**: MergeTree always stores nullable/float columns with `nulls_direction = +1` (NULLS LAST for ASC, NULLS FIRST for DESC). A window column with `nulls_direction == -1` is incompatible and must fall back to `WindowTransform`.
3. **Collation**: storage keys cannot carry collations, so any window ORDER BY column with a `collator` set is incompatible.

Tests added: direction mismatch (ASC storage + DESC window → no streaming), and direction match (DESC storage + DESC window → streaming, with correctness check).
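A rough sketch of the three per-column checks listed in this commit, using a simplified stand-in for the sort column description. `WindowSortColumn` and `isCompatibleWithStorage` are hypothetical names; only the direction formula and the `nulls_direction == -1` rule come from the commit message.

```cpp
#include <cassert>

// Hypothetical, simplified view of one window ORDER BY column.
struct WindowSortColumn
{
    int direction;        // +1 for ASC, -1 for DESC
    int nulls_direction;  // +1 or -1, as in the commit message
    bool has_collator;    // true when a collation is attached
};

// Returns true when the storage order at this key position can be reused as-is.
inline bool isCompatibleWithStorage(
    const WindowSortColumn & column,
    int input_order_direction,   // direction in which the storage is read
    bool reverse_flag)           // per-position reverse flag of the storage key
{
    // 1. Direction: must equal the physical direction delivered by the storage.
    const int physical_direction = input_order_direction * (reverse_flag ? -1 : 1);
    if (column.direction != physical_direction)
        return false;

    // 2. Nulls ordering: nulls_direction == -1 is treated as incompatible.
    if (column.nulls_direction == -1)
        return false;

    // 3. Collation: storage keys cannot carry collations.
    if (column.has_collator)
        return false;

    return true;
}
```

Any column that fails one of the checks would make the whole query fall back to `WindowTransform`.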
… NaN), add float test
…sort_description`
…`streams_fan_out` is true
…ier, not last dot segment
…olumn type before storing
…vertFieldToType` for default value cast
d4cd82c to
7bfdba4
Compare
|
@alexey-milovidov Hi Alexey, when do you think this PR can be merged, or does it need more fixes? This PR is only meant to cut the memory used by lagInFrame, and I don't want to pull other changes in here and make it too big. |
|
@PedroTadim Hi Pedro Can I start looking at another issue ? |
It would be nicer to get this one finished first, but I can't find any reviewer :/ Everyone is too busy. I am not the best codebase expert to give a review. |
`StreamingLagTransform` delimits partitions by a `SipHash` of the raw bytes of the partition-key columns (`IColumn::updateHashWithValue`), whereas `WindowTransform` delimits them with `compareAt`. The two disagree for floating-point values: `compareAt` treats `-0.0` and `+0.0` as equal and treats all `NaN` values as equal, while the raw-byte hash does not. A float-bearing partition key would therefore be split into several `state_map_` entries and produce `lagInFrame` results different from the non-optimized `WindowTransform` path, even though the optimization is gated behind `query_plan_reuse_storage_ordering_for_window_functions`.

Gate `optimizeStreamingWindowFunctions` so the optimization is skipped whenever any partition-key column (prefix or suffix) is, or recursively contains, a floating-point type; such queries fall back to `WindowTransform` and stay correct. Detection uses `IDataType::forEachChild`, so `Nullable(Float64)`, `Array(Float64)`, `Tuple(..., Float64)`, `Map(..., Float64)`, `LowCardinality(Float64)` and arbitrary nestings are covered. Lifting this restriction requires the value canonicalization tracked in ClickHouse#105941.

Also drop a duplicate `#include <Columns/ColumnConst.h>` introduced by the merge with master, and extend `03891_lag_in_frame_streaming` with `Float64` and `Nullable(Float64)` partition-key cases asserting the optimization does not activate and that results match the unoptimized path.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
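The disagreement between value comparison and raw-byte hashing can be reproduced with plain `double` values. The sketch below is standalone C++ and does not use ClickHouse code; `canonicalBits` is only a guess at what a canonicalization step might look like and is not taken from ClickHouse#105941.

```cpp
#include <cassert>
#include <cmath>
#include <cstdint>
#include <cstring>
#include <limits>

// Raw IEEE 754 bit pattern of a double: what a byte-wise hash would see.
inline uint64_t rawBits(double value)
{
    uint64_t bits;
    std::memcpy(&bits, &value, sizeof(bits));
    return bits;
}

// Builds a double from a bit pattern, used to create NaNs with distinct payloads.
inline double fromBits(uint64_t bits)
{
    double value;
    std::memcpy(&value, &bits, sizeof(value));
    return value;
}

// Hypothetical canonicalization: one bit pattern for both zeros and one for every NaN.
inline uint64_t canonicalBits(double value)
{
    if (std::isnan(value))
        return rawBits(std::numeric_limits<double>::quiet_NaN());
    if (value == 0.0)
        return rawBits(0.0);
    return rawBits(value);
}
```

Here `0.0 == -0.0` holds while `rawBits(0.0)` and `rawBits(-0.0)` differ in the sign bit, which is exactly the case where a byte-hash key would split one logical partition in two.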
for (const auto & desc : prefix_description)
{
    const auto * entry = window_input.findByName(desc.column_name);
    if (!entry || contains_float(*entry->type))
contains_float only walks declared child types via forEachChild, so it misses runtime-typed partition keys. Dynamic is the clearest case here: PARTITION BY d is legal when allow_suspicious_types_in_group_by = 1 (see 04259_window_partition_by_dynamic_variant_validation.sql), but contains_float(*entry->type) still returns false for d Dynamic, even when the current rows stored in d are +0.0, -0.0, or different NaN payloads.
At that point the July 3 fix is bypassed. ColumnDynamic::compareAt compares the nested Float64 values and treats those partition keys as equal, while ColumnDynamic::updateHashWithValue hashes the variant name plus the raw float bits, so StreamingLagTransform splits one logical partition into multiple state_map_ entries and returns different lagInFrame results than WindowTransform. The same hole exists for types with runtime dynamic structure wrapped inside other containers (Array(Dynamic), Map(String, Dynamic), the dynamic/shared part of JSON/Object, etc.).
Please either conservatively reject partition keys whose type hasDynamicStructure() here, or canonicalize float-bearing dynamic/object values before hashing.
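The conservative option could look roughly like the sketch below, written against a toy type tree rather than `IDataType`. `ToyType` and `unsafeForRawByteHashing` are hypothetical; the `has_dynamic_structure` flag stands in for whatever `hasDynamicStructure()` reports on the real type.

```cpp
#include <cassert>
#include <vector>

// Toy stand-in for a data type: a node with optional nested child types.
struct ToyType
{
    bool is_float = false;               // e.g. Float32 / Float64
    bool has_dynamic_structure = false;  // e.g. Dynamic, or the dynamic part of JSON
    std::vector<ToyType> children;       // declared nested types (Array, Map, Tuple, ...)
};

// Returns true when a partition key of this type must not use the streaming path:
// either a float is declared somewhere in the type, or part of the structure is
// only known at runtime and may therefore hide floats.
inline bool unsafeForRawByteHashing(const ToyType & type)
{
    if (type.is_float || type.has_dynamic_structure)
        return true;

    for (const auto & child : type.children)
        if (unsafeForRawByteHashing(child))
            return true;

    return false;
}
```

Rejecting on the dynamic flag is deliberately pessimistic: it also disables streaming for dynamic columns that never hold floats, in exchange for not having to inspect runtime values.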
LLVM Coverage Report: changed C/C++ lines covered: 175/239 (73.22%) · Uncovered code |

This is the first time I'm creating a PR in the ClickHouse repository, so some things may be incorrect. Please bear with me and I'll fix any issues. The change involves several classes.
Motivation
This is for issue: #105466
Queries of the form:
SELECT sum(prev_count) FROM (SELECT lagInFrame(Count) OVER (PARTITION BY MetricName, Attributes, ResourceAttributes ORDER BY TimeUnix) AS prev_count FROM ch_metrics_histogram_table)
on a MergeTree table ordered by (MetricName, TimeUnix) previously required a FinishSortingTransform that buffered all rows within each MetricName group before sort-completing them by Attributes and ResourceAttributes. For wide partitions this means O(rows per prefix group) peak memory and an O(N log N) sort pass, even though lagInFrame with offset 1 only needs the single previous row per partition key.
Approach
Introduces StreamingLagTransform, a new ISimpleTransform that replaces the FinishSortingTransform + WindowTransform pair when:
1. All window functions are lagInFrame with offset 1 (one-, two-, or three-argument form; the offset argument must be a constant equal to 1).
2. The SortingStep is in FinishSorting mode (storage ordering already covers the partition prefix).
3. The prefix description is a strict prefix of the window PARTITION BY.
The transform keeps a small hash map — one Field per distinct suffix-partition key, keyed by SipHash128 — and emits the stored previous value on each row. When the prefix key changes (e.g. MetricName changes), the map is cleared.
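The per-row logic can be sketched in a few lines, assuming C++17. This is an illustration, not the PR's code: `Row` and `StreamingLagSketch` are hypothetical, keys are plain strings instead of a SipHash128 of the partition columns, and values are `int64_t` instead of `Field`.

```cpp
#include <cassert>
#include <cstdint>
#include <optional>
#include <string>
#include <unordered_map>

// Hypothetical row: prefix key (covered by storage order), suffix key
// (remaining PARTITION BY columns) and the lagInFrame argument.
struct Row
{
    std::string prefix_key;
    std::string suffix_key;
    int64_t value;
};

class StreamingLagSketch
{
public:
    explicit StreamingLagSketch(int64_t default_value) : default_value_(default_value) {}

    // Returns the lagInFrame(value, 1) result for this row in O(1) expected time.
    int64_t consume(const Row & row)
    {
        // A new prefix group starts: no partition of the old group can reappear.
        if (!current_prefix_ || *current_prefix_ != row.prefix_key)
        {
            state_map_.clear();
            current_prefix_ = row.prefix_key;
        }

        auto [it, inserted] = state_map_.try_emplace(row.suffix_key, row.value);
        if (inserted)
            return default_value_;  // first row of this partition

        const int64_t previous = it->second;
        it->second = row.value;     // remember the current row for the next one
        return previous;
    }

private:
    int64_t default_value_;
    std::optional<std::string> current_prefix_;
    std::unordered_map<std::string, int64_t> state_map_;
};
```

The map never holds more than one entry per distinct suffix key of the current prefix group, which is where the O(distinct partitions within one prefix group) memory bound comes from.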
A new SortingStep::Type::MergeOnly replaces FinishSorting in the activated path: it merges K sorted input streams into one (by the full storage ORDER BY) without finish-sorting, so StreamingLagTransform receives rows in the correct per-partition window order.
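For intuition, merging K already-sorted streams without re-sorting can be sketched with a min-heap of cursors. This is a generic k-way merge under simplifying assumptions (in-memory vectors, a two-column key); `Key` and `mergeSortedStreams` are hypothetical and this is not how the ClickHouse merging transforms are implemented.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <queue>
#include <string>
#include <utility>
#include <vector>

// Hypothetical storage key: (MetricName, TimeUnix).
using Key = std::pair<std::string, int64_t>;

// Merges streams that are each sorted by Key into one sorted sequence.
std::vector<Key> mergeSortedStreams(const std::vector<std::vector<Key>> & streams)
{
    struct Cursor
    {
        size_t stream;
        size_t pos;
    };

    // Orders the heap so that the cursor pointing at the smallest key is on top.
    auto greater = [&streams](const Cursor & lhs, const Cursor & rhs)
    {
        return streams[lhs.stream][lhs.pos] > streams[rhs.stream][rhs.pos];
    };
    std::priority_queue<Cursor, std::vector<Cursor>, decltype(greater)> heap(greater);

    for (size_t i = 0; i < streams.size(); ++i)
        if (!streams[i].empty())
            heap.push(Cursor{i, 0});

    std::vector<Key> merged;
    while (!heap.empty())
    {
        const Cursor cursor = heap.top();
        heap.pop();
        merged.push_back(streams[cursor.stream][cursor.pos]);
        if (cursor.pos + 1 < streams[cursor.stream].size())
            heap.push(Cursor{cursor.stream, cursor.pos + 1});
    }
    return merged;
}
```

Only one cursor per stream is kept in the heap, so the merge step itself needs no buffering of whole prefix groups, unlike a finish-sort.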
The optimization is gated by query_plan_reuse_storage_ordering_for_window_functions (default off) and wired in as a bottom-up pass in optimizeTree after the existing read-in-order pass.
Changelog category (leave one):
Changelog entry:
Added a streaming execution path for lagInFrame(col, 1) window functions when data is already read in storage ORDER BY order. Instead of buffering an entire prefix group in FinishSortingTransform + WindowTransform, a new StreamingLagTransform processes rows in a single pass with a small per-partition hash map, reducing peak memory from O(rows per prefix group) to O(distinct partitions per prefix group). Enabled via query_plan_reuse_storage_ordering_for_window_functions = 1.
Performance
Before (master):
SELECT sum(prev_count)
FROM (
SELECT lagInFrame(Count) OVER (PARTITION BY MetricName, Attributes, ResourceAttributes ORDER BY TimeUnix) AS prev_count
FROM ch_metrics_histogram_table
)
1 row in set. Elapsed: 1.939 sec. Processed 3.00 million rows, 873.63 MB (1.55 million rows/s., 450.56 MB/s.)
Peak memory usage: 2.57 GiB.
After (this PR):
SELECT sum(prev_count)
FROM (
SELECT lagInFrame(Count) OVER (PARTITION BY MetricName, Attributes, ResourceAttributes ORDER BY TimeUnix) AS prev_count
FROM ch_metrics_histogram_table
)
SETTINGS query_plan_reuse_storage_ordering_for_window_functions = 1, max_threads = 4
1 row in set. Elapsed: 4.317 sec. Processed 3.00 million rows, 873.63 MB (694.99 thousand rows/s., 202.39 MB/s.)
Peak memory usage: 154.85 MiB.
Peak memory: 2.57 GiB → 154.85 MiB (about 17× reduction), while elapsed time rises from 1.939 sec to 4.317 sec (about 2.2× slower). The streaming path trades read parallelism for memory: reads are serialized per prefix group so rows arrive pre-sorted, which is the right trade-off for workloads that would otherwise hit OOM.