`WindowTransform`: stream `lagInFrame` via `StreamingLagTransform` to… by DavidHe-2008 · Pull Request #105822 · ClickHouse/ClickHouse · GitHub

WindowTransform: stream lagInFrame via StreamingLagTransform to… #105822

Open
DavidHe-2008 wants to merge 16 commits into ClickHouse:master from DavidHe-2008:fix-window-lag-streaming

Conversation

@DavidHe-2008

@DavidHe-2008 DavidHe-2008 commented May 26, 2026


This is my first PR in the ClickHouse repository, so some things may be incorrect. Please bear with me and I'll fix any issues. The change touches several classes.

Motivation

This addresses issue #105466.
Queries of the form:

SELECT sum(prev_count) FROM (SELECT lagInFrame(Count) OVER (PARTITION BY MetricName, Attributes, ResourceAttributes ORDER BY TimeUnix) AS prev_count FROM ch_metrics_histogram_table)

on a MergeTree table ordered by (MetricName, TimeUnix) previously required a FinishSortingTransform that buffered all rows within each MetricName group before sort-completing them by Attributes and ResourceAttributes. For wide partitions this means O(rows per prefix group) peak memory and an O(N log N) sort pass, even though lagInFrame with offset 1 only needs the single previous row per partition key.

Approach
Introduces StreamingLagTransform, a new ISimpleTransform that replaces the FinishSortingTransform + WindowTransform pair when:

  1. All window functions are lagInFrame with offset 1 (one-, two-, or three-argument form; the offset argument must be a constant equal to 1).
  2. The SortingStep is in FinishSorting mode (storage ordering already covers the partition prefix).
  3. The prefix description is a strict prefix of the window PARTITION BY.

The transform keeps a small hash map (one Field per distinct suffix-partition key, keyed by SipHash128) and emits the stored previous value on each row. When the prefix key changes (e.g. MetricName changes), the map is cleared.
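A minimal sketch of the single-pass idea described above, not the PR's actual code. The `Row` struct, the `streamingLag` function, and the string-keyed `std::unordered_map` (standing in for the SipHash128-keyed map of `Field` values) are assumptions made for illustration; `std::nullopt` stands in for the default value that `lagInFrame` returns when no previous row exists.

```cpp
#include <cassert>
#include <cstdint>
#include <optional>
#include <string>
#include <unordered_map>
#include <vector>

// Hypothetical row layout: `prefix` is the storage-ordered part of PARTITION BY
// (e.g. MetricName), `suffix` is the rest of the partition key.
struct Row
{
    std::string prefix;
    std::string suffix;
    std::int64_t value;
};

// Computes lag-by-one per (prefix, suffix) partition in one pass.
// Assumes rows are grouped by prefix and, inside a prefix group, arrive in
// window ORDER BY order.
std::vector<std::optional<std::int64_t>> streamingLag(const std::vector<Row> & rows)
{
    std::vector<std::optional<std::int64_t>> result;
    result.reserve(rows.size());

    std::unordered_map<std::string, std::int64_t> previous_by_suffix;
    const std::string * current_prefix = nullptr;

    for (const Row & row : rows)
    {
        // A new prefix group starts: earlier partitions can never reappear,
        // so their state is dropped.
        if (!current_prefix || *current_prefix != row.prefix)
        {
            previous_by_suffix.clear();
            current_prefix = &row.prefix;
        }

        auto it = previous_by_suffix.find(row.suffix);
        if (it == previous_by_suffix.end())
        {
            result.push_back(std::nullopt); // first row of its partition
            previous_by_suffix.emplace(row.suffix, row.value);
        }
        else
        {
            result.push_back(it->second);   // emit the stored previous value
            it->second = row.value;         // then remember the current one
        }
    }
    return result;
}
```

Memory in this sketch is bounded by the number of distinct suffix keys inside one prefix group, which is the bound the PR description claims for the real transform.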

A new SortingStep::Type::MergeOnly replaces FinishSorting in the activated path: it merges K sorted input streams into one (by the full storage ORDER BY) without finish-sorting, so StreamingLagTransform receives rows in the correct per-partition window order.

The optimization is gated by query_plan_reuse_storage_ordering_for_window_functions (default off) and wired in as a bottom-up pass in optimizeTree after the existing read-in-order pass.

Changelog category (leave one):

  • Performance Improvement

Changelog entry:

Added a streaming execution path for lagInFrame(col, 1) window functions when data is already read in storage ORDER BY order. Instead of buffering an entire prefix group in FinishSortingTransform + WindowTransform, a new StreamingLagTransform processes rows in a single pass with a small per-partition hash map, reducing peak memory from O(rows per prefix group) to O(distinct partitions per prefix group). Enabled via query_plan_reuse_storage_ordering_for_window_functions = 1.

Performance
Before (master):

SELECT sum(prev_count)
FROM (
    SELECT lagInFrame(Count) OVER (PARTITION BY MetricName, Attributes, ResourceAttributes ORDER BY TimeUnix) AS prev_count
    FROM ch_metrics_histogram_table
)

1 row in set. Elapsed: 1.939 sec. Processed 3.00 million rows, 873.63 MB (1.55 million rows/s., 450.56 MB/s.)
Peak memory usage: 2.57 GiB.

After (this PR):

SELECT sum(prev_count)
FROM (
    SELECT lagInFrame(Count) OVER (PARTITION BY MetricName, Attributes, ResourceAttributes ORDER BY TimeUnix) AS prev_count
    FROM ch_metrics_histogram_table
)
SETTINGS query_plan_reuse_storage_ordering_for_window_functions = 1, max_threads = 4

1 row in set. Elapsed: 4.317 sec. Processed 3.00 million rows, 873.63 MB (694.99 thousand rows/s., 202.39 MB/s.)
Peak memory usage: 154.85 MiB.

Peak memory drops from 2.57 GiB to 154.85 MiB (a 17× reduction), while elapsed time rises from 1.939 sec to 4.317 sec. The streaming path trades read parallelism for memory: reads are serialized per prefix group so rows arrive pre-sorted, which is the right trade-off for workloads hitting OOM.

@CLAassistant

CLAassistant commented May 26, 2026


@PedroTadim PedroTadim added the "can be tested" label (allows running workflows for external contributors) May 26, 2026
@clickhouse-gh

clickhouse-gh Bot commented May 26, 2026

Contributor

Workflow [PR], commit [a065f28]

Summary:

job_name | test_name | status | info | comment
AST fuzzer (arm_asan_ubsan) | | FAIL | |
| Logical error: Block structure mismatch in A stream: different columns: (STID: 0993-27f0) | FAIL | cidb, issue |

AI Review

Summary

This PR adds a streaming lagInFrame(..., 1) execution path that replaces FinishSortingTransform + WindowTransform when storage ordering can be reused. The recent float guard fixes the plain Float64 / Nullable(Float64) partition-key case, but the guard is still incomplete for runtime-typed partition keys, so the optimized path can still return different results from WindowTransform.

Missing context / blind spots
  • ⚠️ I could not fetch the current Praktika CI JSON in this environment because .claude/tools/fetch_ci_report.js hit a GitHub token policy error, and .claude/tools/fetch_perf_report.py requires a local clickhouse binary in PATH.

Findings

❌ Blockers

  • [src/Processors/QueryPlan/Optimizations/optimizeReadInOrder.cpp:1997] contains_float only walks declared child types, so it misses runtime-typed containers such as Dynamic and the dynamic/shared part of JSON / Object. A query like lagInFrame(v) OVER (PARTITION BY d ORDER BY ts) on d Dynamic with +0.0 and -0.0 values still enables StreamingLagTransform. ColumnDynamic::compareAt compares the nested Float64 values and treats those keys as equal, while ColumnDynamic::updateHashWithValue hashes the variant name plus the raw float bits, so one logical partition is split into multiple state_map_ entries and lagInFrame results diverge from WindowTransform.
    Suggested fix: conservatively reject partition keys whose type hasDynamicStructure() (which also covers wrappers such as Array(Dynamic), Map(String, Dynamic), Tuple(Dynamic, ...), Nullable(Dynamic), LowCardinality(Dynamic), and JSON / Object with dynamic paths), or canonicalize float-bearing dynamic/object values before hashing.

Tests
  • ⚠️ Add a stateless regression that uses allow_suspicious_types_in_group_by = 1 and a Dynamic (or JSON) suffix partition key containing +0.0 / -0.0, and assert that EXPLAIN pipeline does not activate StreamingLag and that results match the non-optimized path.

Final Verdict
  • Status: ⚠️ Request changes
  • Minimum required actions: extend the float guard to cover runtime-typed partition keys (Dynamic / dynamic JSON / Object, including when wrapped inside other container types), and add a focused regression test for that path.

@PedroTadim

Member

I am not the best reviewer. Let's wait for the CI result

@PedroTadim

Member

Please update the PR description with the changelog entry type

@DavidHe-2008

Author

> Please update the PR description with the changelog entry type

Done.

@PedroTadim

Member

Merge with master to trigger another CI run please

@clickhouse-gh clickhouse-gh Bot added the "pr-performance" label (pull request with some performance improvements) May 26, 2026
@DavidHe-2008

DavidHe-2008 commented May 26, 2026 via email

Author

@DavidHe-2008

Author

@PedroTadim Hi Pedro

There is one issue left, the hash generation with updateHashWithValue, which is a method of IColumn. I created a feature for it:
#105941

This PR is meant to cut the memory used by lagInFrame. I would rather not pull other changes into it and make it too big.

Please check.

@PedroTadim

Member

> @PedroTadim Hi Pedro
>
> There is one issue left: about the hash generation with updateHashWithValue , which is a method of IColumn, I created a feature for it: #105941
>
> This PR is to cut the memory for lagInFrame. I don't want to get other things here and make it too big.
>
> Please check.

Seems good to me so far. Maybe a performance test could be added? Also, @CheSema could give a quick review?

@DavidHe-2008

Author

> > @PedroTadim Hi Pedro
> > There is one issue left: about the hash generation with updateHashWithValue , which is a method of IColumn, I created a feature for it: #105941
> > This PR is to cut the memory for lagInFrame. I don't want to get other things here and make it too big.
> > Please check.
>
> Seems good to me so far. Maybe a performance test could be added? Also, @CheSema could give a quick review?

@PedroTadim Hi Pedro

I added a performance test: tests/performance/lag_in_frame_streaming.xml.

@DavidHe-2008

Author

This MSan failure is unrelated to this PR. The stack trace goes through ColumnNullable::insert → DataTypeNullable::createColumnConst → IExecutableFunction::defaultImplementationForNulls → Aggregator → AggregatingTransform. None of these files are touched by this PR. The issue is a pre-existing uninitialized-value in the nullable column serialization path triggered by the AST fuzzer.

@DavidHe-2008

Author

@PedroTadim Hi Pedro

Should I follow the issue priorities when picking a new issue to work on?

Thank you.
David.

@PedroTadim

Member

> This MSan failure is unrelated to this PR. The stack trace goes through ColumnNullable::insert → DataTypeNullable::createColumnConst → IExecutableFunction::defaultImplementationForNulls → Aggregator → AggregatingTransform. None of these files are touched by this PR. The issue is a pre-existing uninitialized-value in the nullable column serialization path triggered by the AST fuzzer.

This is worth reporting an issue.

@PedroTadim

Member

> @PedroTadim Hi Pedro
>
> Do we follow the priorities of the issues to start working on an new issue?
>
> Thank you. David.

It would be better to get this PR merged before continuing. Let's wait for the review.

@DavidHe-2008

Author

> > This MSan failure is unrelated to this PR. The stack trace goes through ColumnNullable::insert → DataTypeNullable::createColumnConst → IExecutableFunction::defaultImplementationForNulls → Aggregator → AggregatingTransform. None of these files are touched by this PR. The issue is a pre-existing uninitialized-value in the nullable column serialization path triggered by the AST fuzzer.
>
> This is worth reporting an issue.

Ok, I'll report it.

@PedroTadim

Member

> > > This MSan failure is unrelated to this PR. The stack trace goes through ColumnNullable::insert → DataTypeNullable::createColumnConst → IExecutableFunction::defaultImplementationForNulls → Aggregator → AggregatingTransform. None of these files are touched by this PR. The issue is a pre-existing uninitialized-value in the nullable column serialization path triggered by the AST fuzzer.
> >
> > This is worth reporting an issue.
>
> Ok, I'll report it.

I can do it

@DavidHe-2008

Author

@PedroTadim Hi Pedro

Do you have a preference for what issue I should work on next?

@alexey-milovidov alexey-milovidov added the "memory" label (when memory usage is higher than expected) Jun 1, 2026
@alexey-milovidov

Member

Please edit the git history so that all commits are properly attributed to you. Currently there are a few commits from "root".

@DavidHe-2008 DavidHe-2008 force-pushed the fix-window-lag-streaming branch 2 times, most recently from 929a145 to d4cd82c Compare June 1, 2026 11:26
@DavidHe-2008

DavidHe-2008 commented Jun 1, 2026

Author

> Please edit the git history so that all commits are properly attributed to you. Currently there are a few commits from "root".
@alexey-milovidov

Hi Alexey

The 2 commits from "root" are corrected.

… cut peak memory

Introduces `StreamingLagTransform`, a single-pass streaming replacement for the
`FinishSortingTransform + WindowTransform` pipeline when all window functions are
`lagInFrame` with offset 1 and the table is read in storage order.

Instead of buffering and finish-sorting every row within a prefix-key group,
the transform keeps a small hash map — one entry per distinct suffix-partition key
— and emits the previous value on each row in O(1).  Peak memory is
O(distinct partitions within one prefix group) rather than O(all rows).

The optimization is gated by `query_plan_reuse_storage_ordering_for_window_functions`
(default off) and activates when:
  1. All window functions are `lagInFrame` with offset 1 (one-, two-, or
     three-argument forms; the offset argument must be a constant equal to 1).
  2. The `SortingStep` is in `FinishSorting` mode (storage ordering already
     provides the partition prefix).
  3. The prefix description is a strict prefix of the window `PARTITION BY`.

When activated, `SortingStep` is converted to `MergeOnly` (merge K streams into
one without finish-sorting) and `WindowStep` creates `StreamingLagTransform`
instead of `WindowTransform`.

`lagInFrame(col, 1)` returns the immediately preceding row only when that
row lies within the frame.  The streaming path assumed it always does, which
is wrong for frames like `ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING`
where the preceding row is outside the frame and the correct result is the
configured default for every row.

Add a frame check: only activate the optimization when the frame start is
`UNBOUNDED PRECEDING` (guaranteeing `frame_start == partition_start`) and
the frame end is not a fixed `N PRECEDING` offset (which would place
`frame_end` before the preceding row).  This covers the common default frame
(`RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW`) and explicit
`ROWS BETWEEN UNBOUNDED PRECEDING AND ...` variants, and falls back to
`WindowTransform` for all other frames.
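
The frame check described in this commit message can be sketched as a small predicate. This is an illustration under assumptions: the `Frame` struct below is a reduced, hypothetical model of a window frame, and `frameAllowsStreamingLag` is not a function from the PR.

```cpp
#include <cassert>

// Hypothetical, simplified frame model for illustration only.
enum class BoundaryType { Unbounded, Current, Offset };

struct Frame
{
    BoundaryType begin_type = BoundaryType::Unbounded;
    bool begin_preceding = true;
    BoundaryType end_type = BoundaryType::Current;
    bool end_preceding = false;
};

// Mirrors the rule stated above: the frame must start at UNBOUNDED PRECEDING
// and must not end at a fixed `N PRECEDING` offset.
bool frameAllowsStreamingLag(const Frame & frame)
{
    const bool starts_at_partition_start
        = frame.begin_type == BoundaryType::Unbounded && frame.begin_preceding;
    const bool ends_at_preceding_offset
        = frame.end_type == BoundaryType::Offset && frame.end_preceding;
    return starts_at_partition_start && !ends_at_preceding_offset;
}
```

A default-constructed `Frame` here models `UNBOUNDED PRECEDING AND CURRENT ROW`, which the predicate accepts; a frame starting at `CURRENT ROW` is rejected and would fall back to `WindowTransform`.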
…rage and column correspondence

Previously, `merge_prefix_size` was computed with `std::min(prefix + ORDER_BY, full_key_size)`,
silently truncating required `window ORDER BY` columns when the storage key was shorter.
This could produce a `MergeOnly` sort description that did not cover all ORDER BY columns,
corrupting per-partition ordering seen by `StreamingLagTransform`.

Additionally, the window `ORDER BY` columns were never verified against the actual storage
key columns at the corresponding positions; only sizes were checked. A table with storage
`ORDER BY (MetricName, TimeUnix)` queried with a window `ORDER BY NotTimeUnix` would pass the old size
check while mapping to the wrong storage key column.

The fix:
1. Reject the optimization if `prefix_description.size() + window_order_by.size() > full_key_size`.
2. For each window ORDER BY column, strip the plan-time table qualifier (e.g. `__table1.TimeUnix`
   → `TimeUnix`) and verify it matches the storage key column at that position.
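
A rough sketch of the two checks listed in this fix, using plain string vectors instead of ClickHouse's sort descriptions. The helper names `stripTableQualifier` and `orderByMatchesStorageKey` are hypothetical, and the qualifier-stripping rule (drop everything up to the first dot when the name starts with `__table`) is an assumption based on the `__table1.TimeUnix` example.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Drops a plan-time qualifier such as "__table1." from a column name.
// Names without that qualifier are returned unchanged.
std::string stripTableQualifier(const std::string & name)
{
    const auto dot = name.find('.');
    if (name.rfind("__table", 0) == 0 && dot != std::string::npos)
        return name.substr(dot + 1);
    return name;
}

// Check 1: prefix columns plus window ORDER BY columns must fit in the storage key.
// Check 2: each window ORDER BY column must equal the storage key column at its position.
bool orderByMatchesStorageKey(
    const std::vector<std::string> & storage_key,
    std::size_t prefix_size,
    const std::vector<std::string> & window_order_by)
{
    if (prefix_size + window_order_by.size() > storage_key.size())
        return false;

    for (std::size_t i = 0; i < window_order_by.size(); ++i)
        if (stripTableQualifier(window_order_by[i]) != storage_key[prefix_size + i])
            return false;

    return true;
}
```

With a storage key of `(MetricName, TimeUnix)` and a prefix of size 1, a window `ORDER BY __table1.TimeUnix` passes, while `ORDER BY __table1.NotTimeUnix` is rejected by the name comparison rather than slipping through a size-only check.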
…suffix partition hashing

The previous `MapColView` / `hashMapRow` fast path fed raw byte slices into `SipHash`
without including the array element count or per-string length prefixes that
`ColumnArray::updateHashWithValue` and `ColumnString::updateHashWithValue` always emit
via `hash.update(size)` before each payload.  Without those length fields, distinct map
values with the same total byte content (possible with binary strings containing `\0`)
could hash to the same `suffix_key`, causing unrelated partitions to share one
`state_map_` entry and produce incorrect `lagInFrame` results.

Drop the `MapColView` struct, `tryGetMapView`, and `hashMapRow` entirely.  All suffix
partition columns — including `Map` columns — now use the generic
`IColumn::updateHashWithValue` path, which is injective by construction.
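
To illustrate why the missing length fields matter, the sketch below builds the byte sequence that would be fed to a hash, once without and once with length prefixes. It is a generic illustration, not the removed `hashMapRow` code: the function names are hypothetical, and the collision shown uses ordinary strings rather than the `\0`-containing binary strings mentioned above.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <vector>

// Concatenates payload bytes only. Element boundaries are lost, so different
// inputs can yield identical bytes (and therefore identical hashes).
std::string encodeWithoutLengths(const std::vector<std::string> & values)
{
    std::string out;
    for (const auto & value : values)
        out += value;
    return out;
}

// Writes the element count first and each string's length before its payload,
// so the original list can be recovered unambiguously from the bytes.
std::string encodeWithLengths(const std::vector<std::string> & values)
{
    std::string out;
    auto append_size = [&out](std::uint64_t size)
    {
        out.append(reinterpret_cast<const char *>(&size), sizeof(size));
    };

    append_size(values.size());
    for (const auto & value : values)
    {
        append_size(value.size());
        out += value;
    }
    return out;
}
```

For example, `{"ab", "c"}` and `{"a", "bc"}` produce the same bytes without lengths but different bytes with them, so only the second encoding keeps the two partition keys apart before hashing.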
…ering, and collation for window ORDER BY columns

Previously only column names were compared against the storage key.  Three
additional attributes must match for the `MergeOnly` optimisation to be safe:

1. **Direction** — the physical direction the storage delivers for position j is
   `input_order->direction * (reverse_flags[j] ? -1 : 1)`.  If the window ORDER BY
   specifies a different direction the merged streams are in the wrong order and
   `StreamingLagTransform` produces incorrect results.

2. **Nulls ordering** — MergeTree always stores nullable/float columns with
   `nulls_direction = +1` (NULLS LAST for ASC, NULLS FIRST for DESC).  A window
   column with `nulls_direction == -1` is incompatible and must fall back to
   `WindowTransform`.

3. **Collation** — storage keys cannot carry collations, so any window ORDER BY
   column with a `collator` set is incompatible.

Tests added: direction mismatch (ASC storage + DESC window → no streaming), and
direction match (DESC storage + DESC window → streaming, with correctness check).
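
The three compatibility conditions above can be sketched as one predicate. `SortColumn` is a hypothetical, reduced stand-in for a sort-column description, and `windowColumnMatchesStorage` is not a function from the PR; the direction formula is the one quoted in item 1.

```cpp
#include <cassert>

// Hypothetical, reduced form of a sort-column description.
struct SortColumn
{
    int direction = 1;        // +1 for ASC, -1 for DESC
    int nulls_direction = 1;  // the commit message states storage always provides +1
    bool has_collator = false;
};

// `input_direction` is the read direction for the whole key (+1 or -1);
// `reversed_in_key` says whether this key position is stored reversed.
bool windowColumnMatchesStorage(const SortColumn & window_column, int input_direction, bool reversed_in_key)
{
    const int storage_direction = input_direction * (reversed_in_key ? -1 : 1);
    return window_column.direction == storage_direction  // 1. direction
        && window_column.nulls_direction == 1            // 2. nulls ordering
        && !window_column.has_collator;                  // 3. collation
}
```

Any column failing one of the three conditions would make the whole optimization fall back to `WindowTransform` in this model.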
@DavidHe-2008 DavidHe-2008 force-pushed the fix-window-lag-streaming branch from d4cd82c to 7bfdba4 Compare June 1, 2026 11:33
@DavidHe-2008

Author

@alexey-milovidov Hi Alexey, do you think this PR can be merged, or does it need more fixes?
There is one issue left, the hash generation with updateHashWithValue, which is a method of IColumn. I created a feature for it: #105941

This PR is to cut the memory for lagInFrame. I don't want to get other things here and make it too big.

@DavidHe-2008

Author

@PedroTadim Hi Pedro, can I start looking at another issue?

@PedroTadim

Member

> @PedroTadim Hi Pedro Can I start looking at another issue ?

It would be nicer to get this one finished first, but I can't find any reviewer :/ Everyone is too busy. I am not the best codebase expert to give a review.

alexey-milovidov and others added 2 commits July 3, 2026 04:14
`StreamingLagTransform` delimits partitions by a `SipHash` of the raw
bytes of the partition-key columns (`IColumn::updateHashWithValue`),
whereas `WindowTransform` delimits them with `compareAt`. The two
disagree for floating-point values: `compareAt` treats `-0.0` and `+0.0`
as equal and treats all `NaN` values as equal, while the raw-byte hash
does not. A float-bearing partition key would therefore be split into
several `state_map_` entries and produce `lagInFrame` results different
from the non-optimized `WindowTransform` path — even though the
optimization is gated behind
`query_plan_reuse_storage_ordering_for_window_functions`.

Gate `optimizeStreamingWindowFunctions` so the optimization is skipped
whenever any partition-key column (prefix or suffix) is, or recursively
contains, a floating-point type; such queries fall back to
`WindowTransform` and stay correct. Detection uses
`IDataType::forEachChild`, so `Nullable(Float64)`, `Array(Float64)`,
`Tuple(..., Float64)`, `Map(..., Float64)`, `LowCardinality(Float64)` and
arbitrary nestings are covered. Lifting this restriction requires the
value canonicalization tracked in
ClickHouse#105941.

Also drop a duplicate `#include <Columns/ColumnConst.h>` introduced by the
merge with master, and extend `03891_lag_in_frame_streaming` with
`Float64` and `Nullable(Float64)` partition-key cases asserting the
optimization does not activate and that results match the unoptimized
path.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
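
The mismatch between value comparison and raw-byte hashing described in this commit message can be reproduced with plain `double` values. The sketch below is illustrative only: `rawBits`, `fromBits`, and `canonicalBits` are hypothetical helpers, and the canonicalization shown (map `-0.0` to `+0.0`, map every NaN to one fixed quiet NaN pattern) is one possible approach, not necessarily what ClickHouse#105941 proposes.

```cpp
#include <cassert>
#include <cstdint>
#include <cstring>

// Returns the raw IEEE 754 bit pattern, i.e. what a byte-wise hash would see.
std::uint64_t rawBits(double value)
{
    std::uint64_t bits = 0;
    std::memcpy(&bits, &value, sizeof(bits));
    return bits;
}

// Builds a double from a bit pattern (used to create NaNs with chosen payloads).
double fromBits(std::uint64_t bits)
{
    double value = 0;
    std::memcpy(&value, &bits, sizeof(value));
    return value;
}

// Hypothetical canonicalization so that values that compare as one partition
// key also hash identically.
std::uint64_t canonicalBits(double value)
{
    if (value != value)                 // true only for NaN
        return 0x7FF8000000000000ULL;   // one fixed quiet NaN pattern
    if (value == 0.0)                   // true for both +0.0 and -0.0
        return 0;
    return rawBits(value);
}
```

Here `0.0 == -0.0` holds while `rawBits(0.0)` and `rawBits(-0.0)` differ in the sign bit, which is the kind of disagreement that would split one logical partition across several map entries.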
for (const auto & desc : prefix_description)
{
    const auto * entry = window_input.findByName(desc.column_name);
    if (!entry || contains_float(*entry->type))

Contributor


contains_float only walks declared child types via forEachChild, so it misses runtime-typed partition keys. Dynamic is the clearest case here: PARTITION BY d is legal when allow_suspicious_types_in_group_by = 1 (see 04259_window_partition_by_dynamic_variant_validation.sql), but contains_float(*entry->type) still returns false for d Dynamic, even when the current rows stored in d are +0.0, -0.0, or different NaN payloads.

At that point the July 3 fix is bypassed. ColumnDynamic::compareAt compares the nested Float64 values and treats those partition keys as equal, while ColumnDynamic::updateHashWithValue hashes the variant name plus the raw float bits, so StreamingLagTransform splits one logical partition into multiple state_map_ entries and returns different lagInFrame results than WindowTransform. The same hole exists for types with runtime dynamic structure wrapped inside other containers (Array(Dynamic), Map(String, Dynamic), the dynamic/shared part of JSON/Object, etc.).

Please either conservatively reject partition keys whose type hasDynamicStructure() here, or canonicalize float-bearing dynamic/object values before hashing.
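
The gap described in this review comment can be shown with a toy type tree. Everything below is a hypothetical model, not ClickHouse's `IDataType`: `TypeNode` only records declared child types, so a `Dynamic` node has no children even if its runtime values are floats, which is why a declared-type walk alone misses it.

```cpp
#include <cassert>
#include <vector>

// Toy model of declared types; `children` holds declared nested types only.
enum class Kind { Float64, String, Array, Nullable, Dynamic };

struct TypeNode
{
    Kind kind;
    std::vector<TypeNode> children;
};

// Walks declared child types, in the spirit of the forEachChild-based guard.
bool containsFloat(const TypeNode & type)
{
    if (type.kind == Kind::Float64)
        return true;
    for (const auto & child : type.children)
        if (containsFloat(child))
            return true;
    return false;
}

// Detects runtime-typed parts anywhere in the declared type.
bool hasDynamicStructure(const TypeNode & type)
{
    if (type.kind == Kind::Dynamic)
        return true;
    for (const auto & child : type.children)
        if (hasDynamicStructure(child))
            return true;
    return false;
}

// Conservative gate combining both checks, as the review suggests.
bool partitionKeyIsSafeToHash(const TypeNode & type)
{
    return !containsFloat(type) && !hasDynamicStructure(type);
}
```

In this model `containsFloat` returns false for a bare `Dynamic` node, so only the additional dynamic-structure check keeps such a key out of the streaming path.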

@clickhouse-gh

clickhouse-gh Bot commented Jul 3, 2026

Contributor

LLVM Coverage Report

Metric Baseline Current Δ
Lines 85.50% 85.50% +0.00%
Functions 92.70% 92.60% -0.10%
Branches 77.70% 77.70% +0.00%

Changed C/C++ lines covered: 175/239 (73.22%)

