Auto spilling join #97813
| throw Exception(ErrorCodes::LOGICAL_ERROR, "QueryPipeline is already completed"); | ||
| } | ||
|
|
||
| static void checkSource(const ProcessorPtr & source, bool can_have_totals) |
These functions were simply unused, so I removed them.
alexey-milovidov left a comment
Do not merge until investigating and fixing the bug in transactions:
Logical error: 'txn->getState() != MergeTreeTransaction::COMMITTED' (STID: 2508-2b69)
I fixed the issue that you introduced with server side fuzzing. Now please unblock the PR.
This way we still do the conversion on multiple threads while also having all allocated memory in a single place and not shared in two join algorithms (concurrent hash and grace hash).
|
|
| M(JoinNonJoinedTransformBlockCount, "Number of blocks emitted by NonJoinedBlocksTransform.", ValueType::Number) \ | ||
| M(JoinNonJoinedTransformRowCount, "Number of non-joined rows emitted by NonJoinedBlocksTransform.", ValueType::Number) \ | ||
| M(JoinDelayedJoinedTransformBlockCount, "Number of blocks emitted by DelayedJoinedBlocksWorkerTransform.", ValueType::Number) \ | ||
| M(JoinDelayedJoinedTransformRowCount, "Number of rows emitted by DelayedJoinedBlocksWorkerTransform.", ValueType::Number) \ | ||
| M(JoinSpilledToDisk, "Number of times a hash join was switched to GraceHashJoin due to memory limit.", ValueType::Number) \ |
I am not 100% happy about these names and metrics, but this is the best I could come up with.
Would it be better to be more explicit about the emitter of the metric, i.e. the SpillingHashJoin? If yes, an alternative could be to use SpillingHashJoinSwitchedToGraceJoin instead of JoinSpilledToDisk. Is this what you are after?
| /// The decision should be done at latest in onBuildPhaseFinish, after that the returned value should not change. | ||
| /// This is important for SpillingHashJoin, which can change algorithms runtime, and parallel non-joined blocks | ||
| /// processing depends on the algorithm used. | ||
| virtual bool canProcessNonJoinedBlocksInParallel() const { return supportParallelNonJoinedBlocksProcessing(); } |
I am also not happy about the naming here: the difference between support and can is not clear.
The first one is supposed to mean whether the corresponding processors (NonJoinedBlocksTransform) should be included in the pipeline at all.
The second one is supposed to mean whether those processors can actually be used during execution. The decision is made in onBuildPhaseFinish at the latest.
What about adding the term Now or RightNow to make it more clear that it's a runtime decision that may change: canProcessNonJoinedBlocksInParallelNow()
Or differentiating it from supportParallelNonJoinedBlocksProcessing() by emphasizing that it may request enabling/disabling the NonJoinedBlocksTransform at runtime: isParallelNonJoinedProcessingEnabled()
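To make the distinction under discussion concrete, here is a minimal sketch of the two-level contract. It is not the PR's code: JoinSketch and SpillingJoinSketch are hypothetical stand-ins for IJoin and SpillingHashJoin, and the assumption that the answer flips to false after switching to grace hash join is made only for illustration.

```cpp
// Hypothetical, simplified stand-in for IJoin; not the real ClickHouse interface.
struct JoinSketch
{
    virtual ~JoinSketch() = default;

    // Plan-time question: should NonJoinedBlocksTransform be wired into the pipeline at all?
    virtual bool supportParallelNonJoinedBlocksProcessing() const { return false; }

    // Run-time question: may those processors actually be used during execution?
    // The answer must no longer change once onBuildPhaseFinish() has been called.
    virtual bool canProcessNonJoinedBlocksInParallel() const
    {
        return supportParallelNonJoinedBlocksProcessing();
    }

    virtual void onBuildPhaseFinish() {}
};

// Hypothetical stand-in for SpillingHashJoin.
struct SpillingJoinSketch final : JoinSketch
{
    bool switched_to_grace = false;
    bool build_finished = false;

    // The processors are always added to the pipeline...
    bool supportParallelNonJoinedBlocksProcessing() const override { return true; }

    // ...but are only usable while the join stays in memory (assumption of this sketch).
    bool canProcessNonJoinedBlocksInParallel() const override { return !switched_to_grace; }

    // Switching is ignored after the build phase, so the run-time answer stays stable.
    void switchToGrace()
    {
        if (!build_finished)
            switched_to_grace = true;
    }

    void onBuildPhaseFinish() override { build_finished = true; }
};
```

Whichever name is chosen, the point of the sketch is that the first method is fixed when the pipeline is built, while the second may change until the build phase ends.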
| SET max_threads = 6; | ||
| SET max_bytes_before_external_join = 0; | ||
|
|
||
| SELECT * FROM ( |
This change removes the GLOBAL ANY INNER JOIN ... LIMIT 0 assertion from this test and effectively runs GLOBAL ANY LEFT JOIN twice. That weakens coverage for empty-right-table semantics in sharded execution (LEFT and INNER exercise different code paths/invariants).
Please keep an INNER case in this test (or add a separate deterministic test) so we do not regress coverage while adapting it for spilling.
| } | ||
|
|
||
| /// Notify the join that the query plan requires left-side read-in-order preservation. | ||
| /// SpillingHashJoin overrides this to forbid switching to GraceHashJoin at runtime. |
The new IJoin::keepLeftPipelineInOrder contract says SpillingHashJoin "overrides this to forbid switching to GraceHashJoin at runtime", but SpillingHashJoin currently does not override keepLeftPipelineInOrder.
This creates a mismatch between API contract and implementation and can mislead future call sites that rely on this hook to preserve in-order semantics.
Please either:
- implement SpillingHashJoin::keepLeftPipelineInOrder with the intended behavior, or
- adjust the base-interface comment to reflect current behavior.
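For the first option, a rough sketch of what such an override could look like. The real signature of keepLeftPipelineInOrder is not shown in this thread, so the void return type, the class names and the trySwitchToGrace helper are all assumptions made for illustration.

```cpp
// Hypothetical, simplified stand-in for the IJoin hook; the real signature may differ.
struct JoinInterfaceSketch
{
    virtual ~JoinInterfaceSketch() = default;

    // Default: the join does not react to the read-in-order requirement.
    virtual void keepLeftPipelineInOrder() {}
};

// Hypothetical stand-in for SpillingHashJoin with the override in place.
class SpillingJoinOrderSketch final : public JoinInterfaceSketch
{
public:
    // Once the plan asks for in-order reading on the left side, never switch to grace hash join.
    void keepLeftPipelineInOrder() override { switching_forbidden = true; }

    // Called when the memory threshold is hit; returns whether the switch happened.
    bool trySwitchToGrace()
    {
        if (switching_forbidden)
            return false;
        switched = true;
        return true;
    }

    bool isSwitched() const { return switched; }

private:
    bool switching_forbidden = false;
    bool switched = false;
};
```

With an override along these lines, the base-interface comment and the implementation would agree.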
| DECLARE(UInt64, archive_adaptive_buffer_max_size_bytes, 8 * DBMS_DEFAULT_BUFFER_SIZE, R"( | ||
| Limits the maximum size of the adaptive buffer used when writing to archive files (for example, tar archives)", 0) \ | ||
| DECLARE(UInt64, max_bytes_before_external_join, 0, R"( | ||
| If set to a non-zero value and `join_algorithm` is `hash`, `parallel_hash`, `default`, or `auto`, the hash join will automatically be converted to grace hash join to enable spilling to disk when the right-side data exceeds this many bytes. When set to 0 (default), automatic spilling is disabled. It prevents read in order through join optimization. |
The setting description sentence "It prevents read in order through join optimization" is a bit unclear/awkward.
Could we rephrase to explicitly name the optimization, e.g. "Enabling this setting disables the read_in_order_through_join optimization"? This will make the user-facing behavior easier to understand.
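For readers of the setting description, the threshold semantics can be sketched as follows. This is an illustrative model only: SpillThresholdSketch and its members are hypothetical, and the byte accounting in the PR is certainly more involved than a running sum.

```cpp
#include <cstdint>

// Hypothetical model of the "switch when the right side exceeds N bytes" rule.
class SpillThresholdSketch
{
public:
    explicit SpillThresholdSketch(std::uint64_t max_bytes)
        : max_bytes_before_external_join(max_bytes)
    {
    }

    // Accounts for one right-side block.
    // Returns true only for the block that pushes the right side over the threshold.
    bool addRightBlock(std::uint64_t block_bytes)
    {
        right_side_bytes += block_bytes;

        // 0 disables automatic spilling; a join that already switched does not switch again.
        if (max_bytes_before_external_join == 0 || switched_to_grace)
            return false;

        if (right_side_bytes > max_bytes_before_external_join)
        {
            switched_to_grace = true;
            ++spill_count; // loosely analogous to the JoinSpilledToDisk profile event
            return true;
        }
        return false;
    }

    bool isSwitched() const { return switched_to_grace; }
    std::uint64_t spillCount() const { return spill_count; }

private:
    std::uint64_t max_bytes_before_external_join;
    std::uint64_t right_side_bytes = 0;
    std::uint64_t spill_count = 0;
    bool switched_to_grace = false;
};
```

The sketch follows the wording "exceeds this many bytes" by using a strict comparison, and treats 0 as "disabled", as the description states.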
| std::string getName() const override; | ||
| const TableJoin & getTableJoin() const override { return *table_join; } | ||
|
|
||
| bool addBlockToJoin(const Block & block, bool check_limits) override; |
SpillingHashJoin does not override IJoin::addBlockToJoin(const Block &, size_t num_rows, bool), so FillingRightJoinSideTransform calls the base overload and num_rows is dropped.
This regresses correctness for right blocks with zero columns but non-zero rows (the exact PREWHERE/CROSS JOIN case handled in HashJoin::addBlockToJoin(const Block &, size_t, bool)). In the in-memory path, SpillingHashJoin::addBlockToJoin(const Block &, bool) forwards to HashJoin::addBlockToJoin(const Block &, bool), which uses Block::rows() and treats such blocks as 0 rows.
Please override the num_rows overload in SpillingHashJoin and propagate num_rows to the wrapped join (HashJoin/ConcurrentHashJoin) so row counts stay correct when the right block has no columns.
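The effect can be reproduced in a small standalone sketch. All types here are hypothetical simplifications (BlockSketch, BaseJoinSketch and the two wrappers are not ClickHouse classes); only the shape of the two addBlockToJoin overloads is taken from the comment above.

```cpp
#include <cstddef>
#include <vector>

// Hypothetical block: the row count is derived from the first column, so zero columns means zero rows.
struct BlockSketch
{
    std::vector<std::vector<int>> columns;
    std::size_t rows() const { return columns.empty() ? 0 : columns.front().size(); }
};

struct BaseJoinSketch
{
    virtual ~BaseJoinSketch() = default;
    virtual bool addBlockToJoin(const BlockSketch & block, bool check_limits) = 0;

    // Base overload: num_rows is dropped unless a subclass overrides this.
    virtual bool addBlockToJoin(const BlockSketch & block, std::size_t /*num_rows*/, bool check_limits)
    {
        return addBlockToJoin(block, check_limits);
    }
};

// Stand-in for the wrapped in-memory join, which handles both overloads.
struct InMemoryJoinSketch final : BaseJoinSketch
{
    std::size_t total_rows = 0;

    bool addBlockToJoin(const BlockSketch & block, bool) override
    {
        total_rows += block.rows();
        return true;
    }

    bool addBlockToJoin(const BlockSketch &, std::size_t num_rows, bool) override
    {
        total_rows += num_rows;
        return true;
    }
};

// Wrapper as described in the comment: only the two-argument overload is forwarded.
struct WrapperWithoutOverride final : BaseJoinSketch
{
    using BaseJoinSketch::addBlockToJoin;
    InMemoryJoinSketch inner;

    bool addBlockToJoin(const BlockSketch & block, bool check_limits) override
    {
        return inner.addBlockToJoin(block, check_limits);
    }
};

// Wrapper with the requested fix: num_rows is propagated to the wrapped join.
struct WrapperWithOverride final : BaseJoinSketch
{
    InMemoryJoinSketch inner;

    bool addBlockToJoin(const BlockSketch & block, bool check_limits) override
    {
        return inner.addBlockToJoin(block, check_limits);
    }

    bool addBlockToJoin(const BlockSketch & block, std::size_t num_rows, bool check_limits) override
    {
        return inner.addBlockToJoin(block, num_rows, check_limits);
    }
};
```

Calling the three-argument overload through a BaseJoinSketch reference with a column-less block shows the difference: the first wrapper records no rows, the second records num_rows.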
| /// Because hasDelayedBlocks returns true, the read-in-order-through-join optimisation | ||
| /// in optimizeReadInOrder.cpp will NOT propagate through SpillingHashJoin (same as | ||
| /// GraceHashJoin), since spilling may reorder rows. | ||
| class SpillingHashJoin final : public IJoin |
Enabling max_bytes_before_external_join wraps hash joins in SpillingHashJoin, but this class does not override isCloneSupported / clone.
That makes join-clone-dependent optimizations silently stop applying (e.g. optimizeJoinLegacy and tryConvertOuterJoinToInnerJoinLegacy both bail out when isCloneSupported is false), even in the common case where the query stays in-memory and never spills.
Please either forward clone support to the active in-memory implementation (and document behavior after switching), or explicitly gate/disable this optimization loss to avoid surprising performance regressions when users only enable auto-spill protection.
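A possible shape for the forwarding option, as a sketch only. The clone signature is deliberately simplified (the real IJoin::clone takes arguments that are not shown in this thread), and every class name below is hypothetical.

```cpp
#include <memory>

// Hypothetical, simplified stand-in for the clone part of IJoin.
struct CloneableJoinSketch
{
    virtual ~CloneableJoinSketch() = default;
    virtual bool isCloneSupported() const { return false; }
    virtual std::shared_ptr<CloneableJoinSketch> clone() const { return nullptr; }
};

// Stand-in for the in-memory hash join, which supports cloning.
struct InMemoryHashJoinSketch final : CloneableJoinSketch
{
    bool isCloneSupported() const override { return true; }

    std::shared_ptr<CloneableJoinSketch> clone() const override
    {
        return std::make_shared<InMemoryHashJoinSketch>(*this);
    }
};

// Stand-in for SpillingHashJoin forwarding clone support to the active implementation.
struct SpillingWrapperSketch final : CloneableJoinSketch
{
    std::shared_ptr<CloneableJoinSketch> in_memory = std::make_shared<InMemoryHashJoinSketch>();
    bool switched_to_grace = false;

    // Cloning is offered only while the join is still in memory (assumption of this sketch).
    bool isCloneSupported() const override
    {
        return !switched_to_grace && in_memory->isCloneSupported();
    }

    std::shared_ptr<CloneableJoinSketch> clone() const override
    {
        if (!isCloneSupported())
            return nullptr;

        auto copy = std::make_shared<SpillingWrapperSketch>();
        copy->in_memory = in_memory->clone();
        return copy;
    }
};
```

Refusing to clone after the switch is one way to "document behavior after switching"; the optimizations that check isCloneSupported would then keep applying in the common in-memory case.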
LLVM Coverage Report
Changed lines: 81.10% (382/471)
Cherry pick #97813 to 26.4: Auto spilling join
Backport #97813 to 26.4: Auto spilling join

Resolves #83677
Changelog category (leave one):
Changelog entry (a user-readable short description of the changes that goes into CHANGELOG.md):
Add automatic spilling to hash and parallel hash joins by converting them to grace hash join when the memory limit is reached. This behavior is controlled by max_bytes_before_external_join.
Documentation entry for user-facing changes
Note
Medium Risk
Introduces a new join implementation that can switch from in-memory hash joins to disk-spilling GraceHashJoin at runtime, affecting core query execution and concurrency paths. Risk is mitigated by being opt-in via max_bytes_before_external_join plus extensive pipeline/test coverage, but regressions could impact join correctness/performance and read-in-order queries.
Overview
Adds opt-in automatic spilling for hash/parallel_hash/default/auto joins via new setting max_bytes_before_external_join; when the right side exceeds the threshold, the join converts to GraceHashJoin to spill to disk (new SpillingHashJoin).
Updates join selection (analyzer + planner) to instantiate SpillingHashJoin, extends IJoin with keepLeftPipelineInOrder() and canProcessNonJoinedBlocksInParallel(), and adjusts pipeline wiring so delayed-join blocks and parallel non-joined streams can coexist safely.
Improves correctness/observability around spilling by enabling per-slot block extraction for ConcurrentHashJoin, filtering released blocks to avoid duplication, adding new join profile events (non-joined/delayed block counts + spill count), fixing a couple dictionary exception messages, and adding/adjusting stateless tests (including a new 03915_spilling_hash_join suite and a read-in-order negative case).
Written by Cursor Bugbot for commit db5f199.
Version info
26.5.1.140, 26.4.1.1125