feat: add datafusion.execution.enable_file_stream_work_stealing config#23294
Conversation
Add an ignored regression test for apache#23293. FileStream sibling work-stealing seeds one shared work queue from every file group and relies on all output partitions being polled concurrently in one process. An executor that runs each partition as an isolated task polls only one partition, which then drains the whole queue and reads files belonging to other partitions, inflating scan output by the partition count. The test builds and drives only partition 0 and asserts it reads solely its own file. It fails on main by design, so it is #[ignore]d with its assertion intact as a caught regression to triage.
FileStream sibling work-stealing (WorkSource::Shared) seeds one shared work queue from every file group and lets whichever output partition goes idle first steal the next file or byte-range morsel. This assumes all output partitions of a scan are polled concurrently in one process. Executors that run each output partition as an isolated task in a separate process (Ballista, datafusion-distributed) never poll the sibling partitions, so the single polled partition drains the whole queue and reads files belonging to other partitions, inflating the scan output by the partition count. That is a correctness bug for those executors, and the existing escape hatches (preserve_order, partitioned_by_file_group) are plan-level flags, not a session config they can set centrally. Add datafusion.execution.enable_file_stream_work_stealing (default true), checked in FileScanConfig::create_sibling_state: when false it returns None so each partition falls back to reading only its own file group. This mirrors the enable_dynamic_filter_pushdown precedent and round-trips through datafusion-proto as a config value. Thread ConfigOptions into DataSource::create_sibling_state so the flag is read from the session config at execute time. Turn the #[ignore]'d reproduction test into a passing regression test that drives only partition 0 and checks both behaviors: with the default (stealing on) partition 0 also reads partition 1's file, and with the flag off it reads only its own. Closes apache#23293.
|
We should probably also backport htis to 54.1.0 |
…param Drop the redundant bool from the test's drive_partition0 helper (assert the shared-queue state directly via create_sibling_state) and document the config parameter on DataSource::create_sibling_state.
alamb
left a comment
There was a problem hiding this comment.
I think this looks good to me - thanks @andygrove
i have some suggestions on documetation and tests, but that could be done as a follow on PR
| /// Should DataFusion keep the columns used for partition_by in the output RecordBatches | ||
| pub keep_partition_by_columns: bool, default = false | ||
|
|
||
| /// When `true` (the default), sibling partition streams of a single file |
There was a problem hiding this comment.
I think this focues a lot on the internal details here. I would recommend describing this flag in terms of its user visible behavior. Maybe something like
/// When `true` (the default), DataFusion's built in FileStream
/// tries to dynamically rebalance data between partitions during query
/// execution.
///
/// Executors that depend on the plan time partition assignments, such
/// as Ballista should set this to `false` to avoid runtime reassignment.
pub enable_file_stream_work_stealing: bool, default = true|
Thank you for opening this pull request! Reviewer note: cargo-semver-checks reported the current version number is not SemVer-compatible with the changes in this pull request (compared against the base branch). Details |
|
Hmm I thought there was a API-level setting as well that can be used 🤔 |
You might be able set this flag: |
There is a PR open for that work - apache/datafusion-ballista#1911 |
- Reword the config docstring in user-visible terms (dynamic runtime rebalancing) rather than internal work-queue mechanics. - Note on FileScanConfig::file_groups that files may be reassigned across partitions at runtime when work stealing is enabled unless preserve_order or partitioned_by_file_group is set. - Replace the bespoke isolated-partition test with the file's standard morsel snapshot harness: FileStreamMorselTest gains with_enable_file_stream_work_stealing, and the test reuses two_partition_morsel_test to show that disabling the flag keeps each partition's files local. Regenerate configs.md and information_schema.slt for the reworded docstring.
… config (#23296) ## Which issue does this PR close? - Backport of #23294 to `branch-54`. - Closes #23293 for the 54 release line. ## Rationale for this change `FileStream` sibling work-stealing (`WorkSource::Shared`) seeds one shared work queue from every file group and lets whichever output partition goes idle first steal the next unopened file (or byte-range morsel). This assumes all output partitions of a scan are polled concurrently in one process. Executors that run each output partition as an isolated task in a separate process — Ballista and datafusion-distributed — never poll the sibling partitions. The single polled partition drains the whole shared queue and reads files belonging to other partitions, so every isolated task reads the entire input and the scan output is inflated by the partition count. This is a correctness bug for those executors, not just a performance one. The existing escape hatches (`preserve_order`, `partitioned_by_file_group`) are plan-level flags on `FileScanConfig`, not something a distributed executor can set centrally through the session config, and a plain repartitioned scan does not set `partitioned_by_file_group`. There is no session-level off switch, unlike `datafusion.optimizer.enable_dynamic_filter_pushdown`, which exists precisely so consumers that cannot support runtime cross-partition state can disable it. ## What changes are included in this PR? This is a backport of #23294. The changes are identical in intent; two conflicts were resolved for `branch-54`, which does not yet carry the `output_partitioning` field on `FileScanConfig` or the `enable_migration_aggregate` config that exist on `main`: - Add `datafusion.execution.enable_file_stream_work_stealing` (default `true`). When `false`, `FileScanConfig::create_sibling_state` returns `None`, so each partition falls back to `WorkSource::Local` and reads only its own file group. - Thread `&ConfigOptions` into `DataSource::create_sibling_state` so the flag is read from the session config at `execute` time. As a session config value it round-trips through `datafusion-proto` with no proto schema change. - Regenerate `configs.md` and add the setting to `information_schema.slt`. - Add the regression test `morsel_disabled_work_stealing_keeps_files_local` using the file's standard morsel snapshot harness. ## Are these changes tested? Yes. The `file_stream` test suite (including the new `morsel_disabled_work_stealing_keeps_files_local`) passes, and `cargo clippy` is clean. The existing sibling work-stealing tests continue to pass with the default. ## Are there any user-facing changes? A new session config, `datafusion.execution.enable_file_stream_work_stealing` (default `true`), so existing behavior is unchanged. `DataSource::create_sibling_state` gains a `&ConfigOptions` parameter (an API change for anyone implementing the `DataSource` trait directly).

Which issue does this PR close?
Rationale for this change
FileStreamsibling work-stealing (WorkSource::Shared, added in #21351 and extended by #23285) seeds one shared work queue from every file group and lets whichever output partition goes idle first steal the next unopened file (or byte-range morsel). This assumes all output partitions of a scan are polled concurrently in one process.Executors that run each output partition as an isolated task in a separate process — Ballista and datafusion-distributed — never poll the sibling partitions. The single polled partition drains the whole shared queue and reads files belonging to other partitions, so every isolated task reads the entire input and the scan output is inflated by the partition count. This is a correctness bug for those executors, not just a performance one.
The existing escape hatches (
preserve_order,partitioned_by_file_group) are plan-level flags onFileScanConfig, not something a distributed executor can set centrally through the session config, and a plain repartitioned scan does not setpartitioned_by_file_group. There is no session-level off switch, unlikedatafusion.optimizer.enable_dynamic_filter_pushdown, which exists precisely so consumers that cannot support runtime cross-partition state can disable it.What changes are included in this PR?
datafusion.execution.enable_file_stream_work_stealing(defaulttrue). Whenfalse,FileScanConfig::create_sibling_statereturnsNone, so each partition falls back toWorkSource::Localand reads only its own file group.&ConfigOptionsintoDataSource::create_sibling_stateso the flag is read from the session config atexecutetime. As a session config value it round-trips throughdatafusion-protowith no proto schema change.configs.mdand add the setting toinformation_schema.slt.#[ignore]d reproduction test into a passing regression test that drives only partition 0 (as an isolated task does) and asserts both behaviors: with the default (stealing on) partition 0 also reads partition 1's file, and with the flag off it reads only its own.Are these changes tested?
Yes.
isolated_partition_respects_work_stealing_configindatafusion/datasource/src/file_stream/mod.rscovers both the default (shared-queue) behavior and the flag-off behavior. The existing sibling work-stealing tests continue to pass with the default.information_schemasqllogictests pass with the new setting listed.Are there any user-facing changes?
A new session config,
datafusion.execution.enable_file_stream_work_stealing(defaulttrue), so existing behavior is unchanged.DataSource::create_sibling_stategains a&ConfigOptionsparameter (an API change for anyone implementing theDataSourcetrait directly).