feat(parquet): row-group morselization for sibling FileStream stealing by Dandandan · Pull Request #21766 · apache/datafusion · GitHub
Skip to content
Closed
597 changes: 561 additions & 36 deletions datafusion/datasource-parquet/src/opener.rs

Large diffs are not rendered by default.

53 changes: 41 additions & 12 deletions datafusion/datasource-parquet/src/row_filter.rs
23 changes: 12 additions & 11 deletions datafusion/datasource-parquet/src/row_group_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,17 @@ use parquet::data_type::Decimal;
use parquet::schema::types::SchemaDescriptor;
use parquet::{bloom_filter::Sbbf, file::metadata::RowGroupMetaData};

/// Starting byte offset of a row group in its parquet file.
///
/// Uses the first column's dictionary page offset when present, otherwise its
/// data page offset — intentionally *not* the metadata location, per
/// <https://github.com/apache/datafusion/issues/5995>.
pub fn row_group_start_offset(metadata: &RowGroupMetaData) -> i64 {
let col = metadata.column(0);
col.dictionary_page_offset()
.unwrap_or_else(|| col.data_page_offset())
}

/// Reduces the [`ParquetAccessPlan`] based on row group level metadata.
///
/// This struct implements the various types of pruning that are applied to a
Expand Down Expand Up @@ -224,17 +235,7 @@ impl RowGroupAccessPlanFilter {
if !self.access_plan.should_scan(idx) {
continue;
}

// Skip the row group if the first dictionary/data page are not
// within the range.
//
// note don't use the location of metadata
// <https://github.com/apache/datafusion/issues/5995>
let col = metadata.column(0);
let offset = col
.dictionary_page_offset()
.unwrap_or_else(|| col.data_page_offset());
if !range.contains(offset) {
if !range.contains(row_group_start_offset(metadata)) {
self.access_plan.skip(idx);
}
}
Expand Down
9 changes: 8 additions & 1 deletion datafusion/datasource-parquet/src/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use datafusion_common::config::ConfigOptions;
#[cfg(feature = "parquet_encryption")]
use datafusion_common::config::EncryptionFactoryOptions;
use datafusion_datasource::as_file_source;
use datafusion_datasource::file_stream::FileOpener;
use datafusion_datasource::file_stream::{FileOpener, SharedWorkSource};
use datafusion_datasource::morsel::Morselizer;

use arrow::datatypes::TimeUnit;
Expand Down Expand Up @@ -526,6 +526,7 @@ impl FileSource for ParquetSource {
object_store: Arc<dyn ObjectStore>,
base_config: &FileScanConfig,
partition: usize,
shared_work_source: Option<SharedWorkSource>,
) -> datafusion_common::Result<Box<dyn Morselizer>> {
let expr_adapter_factory = base_config
.expr_adapter_factory
Expand Down Expand Up @@ -553,6 +554,10 @@ impl FileSource for ParquetSource {
.as_ref()
.map(|time_unit| parse_coerce_int96_string(time_unit.as_str()).unwrap());

let output_schema = Arc::new(
self.projection
.project_schema(self.table_schema.table_schema())?,
);
Ok(Box::new(ParquetMorselizer {
partition_index: partition,
projection: self.projection.clone(),
Expand Down Expand Up @@ -580,6 +585,8 @@ impl FileSource for ParquetSource {
encryption_factory: self.get_encryption_factory_with_config(),
max_predicate_cache_size: self.max_predicate_cache_size(),
reverse_row_groups: self.reverse_row_groups,
shared_work_source,
output_schema,
}))
}

Expand Down
9 changes: 8 additions & 1 deletion datafusion/datasource/src/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use std::sync::Arc;

use crate::file_groups::FileGroupPartitioner;
use crate::file_scan_config::FileScanConfig;
use crate::file_stream::FileOpener;
use crate::file_stream::{FileOpener, SharedWorkSource};
use crate::morsel::{FileOpenerMorselizer, Morselizer};
#[expect(deprecated)]
use crate::schema_adapter::SchemaAdapterFactory;
Expand Down Expand Up @@ -82,11 +82,18 @@ pub trait FileSource: Any + Send + Sync {
///
/// It is preferred to implement the [`Morselizer`] API directly by
/// implementing this method.
///
/// `shared_work_source`, when `Some`, is the queue of unopened files
/// shared across sibling streams. File sources that can sub-divide a
/// single file into smaller stealable work units (e.g. parquet row-group
/// splitting) may push donated chunks onto it; sources that cannot simply
/// ignore the parameter.
fn create_morselizer(
&self,
object_store: Arc<dyn ObjectStore>,
base_config: &FileScanConfig,
partition: usize,
_shared_work_source: Option<SharedWorkSource>,
) -> Result<Box<dyn Morselizer>> {
let opener = self.create_file_opener(object_store, base_config, partition)?;
Ok(Box::new(FileOpenerMorselizer::new(opener)))
Expand Down
9 changes: 7 additions & 2 deletions datafusion/datasource/src/file_scan_config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -597,8 +597,6 @@ impl DataSource for FileScanConfig {

let source = self.file_source.with_batch_size(batch_size);

let morselizer = source.create_morselizer(object_store, self, partition)?;

// Extract the shared work source from the sibling state if it exists.
// This allows multiple sibling streams to steal work from a single
// shared queue of unopened files.
Expand All @@ -607,6 +605,13 @@ impl DataSource for FileScanConfig {
.and_then(|state| state.downcast_ref::<SharedWorkSource>())
.cloned();

let morselizer = source.create_morselizer(
object_store,
self,
partition,
shared_work_source.clone(),
)?;

let stream = FileStreamBuilder::new(self)
.with_partition(partition)
.with_shared_work_source(shared_work_source)
Expand Down
1 change: 1 addition & 0 deletions datafusion/datasource/src/file_stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ use self::scan_state::{ScanAndReturn, ScanState};

pub use builder::FileStreamBuilder;
pub use metrics::{FileStreamMetrics, StartableTime};
pub use work_source::SharedWorkSource;

/// A stream that iterates record batch by record batch, file over file.
pub struct FileStream {
Expand Down
35 changes: 31 additions & 4 deletions datafusion/datasource/src/file_stream/scan_state.rs
Loading
Loading