[SPARK-57415][SQL] Parquet vectorized reader performance improvements (umbrella) · Issue #56011 · apache/spark · GitHub

Description

@iemejia

JIRA: https://issues.apache.org/jira/browse/SPARK-57415

Overview

This is an umbrella issue tracking a series of performance improvements to the Parquet vectorized reader in Spark SQL. The changes target allocation reduction, bulk-read optimizations, and JIT-friendly code patterns across multiple encoding paths.

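All PRs are independent and can be reviewed and merged in any order. Together, the reader changes (PRs 1-7) yield throughput gains of up to 8.2x for Parquet reads, depending on the encoding and data shape, with no user-facing behavioral changes. A few paths are roughly neutral (readBinary in PRs 6 and 7 ranges from 0.84x to 1.22x). PR 8 speeds up the benchmark workflow itself rather than the reader.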

Summary

| # | JIRA | PR | Status | Focus | Key speedup |
|---|------|----|--------|-------|-------------|
| 1 | SPARK-56892 | #55919 | Merged | DELTA_BINARY_PACKED bulk reads + widening | up to 8.2x |
| 2 | SPARK-56893 | #55920 | Merged | Dictionary decode hasNull fast path | 1.21-1.62x |
| 3 | SPARK-56894 | #55921 | In Review | BYTE_STREAM_SPLIT vectorized reader + widening + FLBA | 1.5-4.5x |
| 4 | SPARK-56895 | #55922 | Merged | RLE PACKED batch ByteBuffer slice | 1.4-2.4x |
| 5 | SPARK-56896 | #55923 | Merged | Timestamp/date updater bulk reads | 2.1-3.3x |
| 6 | SPARK-56897 | #55924 | Open | DELTA_BYTE_ARRAY allocation reduction | 1.35-1.74x |
| 7 | SPARK-56907 | #55932 | Open | DELTA_LENGTH_BYTE_ARRAY allocation reduction | 1.1-1.69x |
| 8 | SPARK-57420 | #56479 | Merged | Benchmark workflow: skip TPC-DS gen + early CPU check | ~5-10 min saved per run |

Pull Requests

1. DELTA_BINARY_PACKED bulk read optimization

PR: #55919 (SPARK-56892)

Replaces per-element lambda dispatch in readIntegers/readLongs with bulk paths that compute prefix sums in-place and write via putInts/putLongs. Also eliminates 3 allocations per value in readUnsignedLongs by replacing BigInteger(Long.toUnsignedString(v)) with a zero-allocation byte[] loop encoder. Adds readIntegersAsLongs and readIntegersAsDoubles widening overrides that skip the int narrowing step entirely.

Benchmarks on AMD EPYC 7763 (JDK 17/21/25):

| Type | Speedup |
|------|---------|
| INT32 reads | 1.1-1.6x |
| INT32 skip | 1.3-1.8x |
| INT64 reads | 1.8-3.7x |
| INT64 skip | 2.3-4.0x |
| readIntegersAsLongs (INT32 -> Long) | 2.4-2.7x |
| readIntegersAsDoubles (INT32 -> Double) | 2.1-2.4x |
| readUnsignedLongs | 7.3-8.2x |
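A minimal sketch of the two ideas in this PR, using hypothetical names (DeltaBinaryPackedSketch, prefixSumInPlace, encodeUnsignedLong) rather than Spark's actual code. It assumes the unpacked deltas of a miniblock are already in a long[] and that each value is the previous value plus minDelta plus the unpacked delta. The unsigned-long encoder produces the same bytes as new BigInteger(Long.toUnsignedString(v)).toByteArray() without creating the intermediate String and BigInteger.

```java
// Hypothetical sketch, not Spark's VectorizedDeltaBinaryPackedReader.
final class DeltaBinaryPackedSketch {
  // Turns unpacked deltas into absolute values in place:
  // value[i] = value[i-1] + minDelta + delta[i].
  // Java long arithmetic wraps on overflow.
  // Returns the last value so the next miniblock can continue from it.
  static long prefixSumInPlace(long[] buf, int n, long previous, long minDelta) {
    long acc = previous;
    for (int i = 0; i < n; i++) {
      acc += minDelta + buf[i];
      buf[i] = acc;
    }
    return acc;
  }

  // Writes the unsigned value of v as a minimal big-endian two's-complement
  // byte sequence into the caller's scratch array (at least 9 bytes) and
  // returns the number of bytes used.
  static int encodeUnsignedLong(long v, byte[] scratch) {
    // Significant bits of the unsigned value plus one sign bit (always 0).
    int bits = 64 - Long.numberOfLeadingZeros(v) + 1;
    int len = (bits + 7) / 8;  // 1..9 bytes
    for (int i = len - 1; i >= 0; i--) {
      scratch[i] = (byte) v;  // lowest remaining byte goes last (big-endian)
      v >>>= 8;
    }
    return len;
  }
}
```

The bulk path then only needs one putLongs-style copy of the filled buffer into the column vector. For unsigned longs, the final per-row byte array is presumably still copied out, but the String and BigInteger objects per value are gone.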

2. Dictionary decoding hasNull fast path + per-class updater overrides

PR: #55920 (SPARK-56893)

Adds a hasNull() fast path that skips per-element null checks when the column has no nulls (the common case). Per-class decodeDictionaryIds overrides give the C2 JIT compiler monomorphic call sites, enabling full inlining of the type-specific decode expressions.

Benchmarks on AMD EPYC 9V74 (baseline vs optimized on same CPU):

| Scenario | JDK 17 | JDK 21 | JDK 25 |
|----------|--------|--------|--------|
| No nulls | 1.21-1.22x | 1.56-1.62x | 1.24-1.25x |
| 10% nulls | ~1.0x | 1.24-1.29x | ~1.0x |
| 50% nulls | ~1.0x | 1.25-1.26x | ~1.0x |

JDK 21 benefits at every null fraction (1.24-1.62x), which is attributed to monomorphic devirtualization. JDK 17 and 25 benefit mainly on the no-nulls fast path.
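A minimal sketch of both ideas, using hypothetical classes (DictDecodeSketch, IntDictDecode, DoubleDictDecode) rather than Spark's actual updater types. Each concrete class carries its own copy of the decode loop, so the lookup inside it is only ever compiled for one type, and the hasNull flag lets the no-null case skip the per-element check entirely.

```java
// Hypothetical stand-ins for per-type updaters; names and signatures are illustrative.
abstract class DictDecodeSketch {
  // Decodes n dictionary ids into the output column.
  abstract void decodeDictionaryIds(int n, int[] ids, boolean[] isNull, boolean hasNull);
}

final class IntDictDecode extends DictDecodeSketch {
  private final int[] dict;
  private final int[] out;

  IntDictDecode(int[] dict, int[] out) {
    this.dict = dict;
    this.out = out;
  }

  @Override
  void decodeDictionaryIds(int n, int[] ids, boolean[] isNull, boolean hasNull) {
    if (!hasNull) {
      // Fast path: no nulls in this batch, so no per-element check.
      for (int i = 0; i < n; i++) out[i] = dict[ids[i]];
      return;
    }
    for (int i = 0; i < n; i++) {
      if (!isNull[i]) out[i] = dict[ids[i]];  // null slots are left untouched
    }
  }
}

final class DoubleDictDecode extends DictDecodeSketch {
  private final double[] dict;
  private final double[] out;

  DoubleDictDecode(double[] dict, double[] out) {
    this.dict = dict;
    this.out = out;
  }

  @Override
  void decodeDictionaryIds(int n, int[] ids, boolean[] isNull, boolean hasNull) {
    if (!hasNull) {
      for (int i = 0; i < n; i++) out[i] = dict[ids[i]];
      return;
    }
    for (int i = 0; i < n; i++) {
      if (!isNull[i]) out[i] = dict[ids[i]];
    }
  }
}
```

The design this avoids is a single loop in a shared base class that calls an abstract per-element method: once several subclasses run through that loop, its call site becomes megamorphic and the JIT can no longer inline the per-type work.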


3. Vectorized BYTE_STREAM_SPLIT reader

PR: #55921 (SPARK-56894)

Adds a new VectorizedByteStreamSplitValuesReader that decodes BSS-encoded pages (FLOAT, DOUBLE, INT32, INT64, FIXED_LEN_BYTE_ARRAY) using batch byte-gathering instead of falling back to parquet-mr per-value reads. Includes widening overrides, FLBA batch allocation reduction, and FixedLenByteArrayUpdater routing through the batch path.

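| Type | JDK 17 | JDK 21 | JDK 25 |
|------|--------|--------|--------|
| INT32 | 4.5x | 3.8x | 4.2x |
| INT64 | 2.8x | 2.0x | 1.8x |
| FLOAT | 4.3x | 3.6x | 4.3x |
| DOUBLE | 2.7x | 2.0x | 1.8x |
| readIntegersAsLongs | 3.8x | 3.0x | 3.5x |
| readFloatsAsDoubles | 3.8x | 3.2x | 3.9x |
| FLBA(12) readBinary | 1.7x | 1.6x | 1.5x |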
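A minimal sketch of batch byte-gathering for 4-byte values, using a hypothetical class rather than the new VectorizedByteStreamSplitValuesReader. It assumes the BYTE_STREAM_SPLIT layout in which byte k of every little-endian value is stored in stream k and the streams sit back to back, so byte k of value i is at offset k * n + i. The decoder walks each stream once instead of reassembling one value at a time.

```java
// Hypothetical sketch, not Spark's reader; handles 4-byte INT32/FLOAT values only.
final class ByteStreamSplitSketch {
  // Encoder used here only to build test pages: byte k of value i -> page[k * n + i].
  static byte[] encodeInts(int[] values) {
    int n = values.length;
    byte[] page = new byte[4 * n];
    for (int i = 0; i < n; i++) {
      for (int k = 0; k < 4; k++) {
        page[k * n + i] = (byte) (values[i] >>> (8 * k));
      }
    }
    return page;
  }

  // Batch decode: one sequential pass over each of the 4 streams,
  // OR-ing byte k of every value into place.
  static int[] decodeInts(byte[] page, int n) {
    int[] out = new int[n];
    for (int k = 0; k < 4; k++) {
      int base = k * n;
      int shift = 8 * k;
      for (int i = 0; i < n; i++) {
        out[i] |= (page[base + i] & 0xFF) << shift;
      }
    }
    return out;
  }

  // FLOAT shares the INT32 byte layout; only the final reinterpretation differs.
  static float[] decodeFloats(byte[] page, int n) {
    int[] bits = decodeInts(page, n);
    float[] out = new float[n];
    for (int i = 0; i < n; i++) out[i] = Float.intBitsToFloat(bits[i]);
    return out;
  }
}
```

For example, the values 0x04030201 and 0x08070605 encode to the page bytes 1, 5, 2, 6, 3, 7, 4, 8. The same loop structure extends to 8-byte and fixed-length values by changing the stream count.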

4. Batch ByteBuffer slice in RLE PACKED decode

PR: #55922 (SPARK-56895)

Replaces per-group in.slice(bitWidth) (one ByteBuffer allocation per 8 values) with a single bulk slice for the entire PACKED run. Eliminates ~128K short-lived ByteBuffer allocations per 1M-value page.

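| bitWidth | Speedup (readIntegers) | Speedup (skipIntegers) |
|----------|------------------------|------------------------|
| 4 | 2.0x | 2.1x |
| 8 | 2.0x | 2.4x |
| 12 | 1.6x | 1.6x |
| 20 | 1.4x | 1.4x |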
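A minimal sketch of decoding a whole PACKED run from a single slice, using plain java.nio.ByteBuffer in place of Parquet's ByteBufferInputStream; the class and method names are hypothetical. It assumes LSB-first bit packing in groups of 8 values, so each group occupies exactly bitWidth bytes and the run takes numGroups * bitWidth bytes. One slice() call replaces the numGroups per-group slices.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch, not Spark's VectorizedRleValuesReader.
final class RlePackedRunSketch {
  // Unpacks numGroups * 8 values of bitWidth bits (1..32) into out and
  // advances in past the run.
  static void unpackRun(ByteBuffer in, int numGroups, int bitWidth, int[] out) {
    int bytes = numGroups * bitWidth;  // 8 values * bitWidth bits = bitWidth bytes per group
    ByteBuffer run = in.slice();       // one view over the whole run
    run.limit(bytes);
    in.position(in.position() + bytes);

    int n = numGroups * 8;
    for (int i = 0; i < n; i++) {
      long bitPos = (long) i * bitWidth;
      int v = 0;
      for (int b = 0; b < bitWidth; b++) {
        long p = bitPos + b;
        // Read bit (p % 8) of byte (p / 8); bits are packed least significant first.
        int bit = (run.get((int) (p >>> 3)) >>> (int) (p & 7)) & 1;
        v |= bit << b;
      }
      out[i] = v;
    }
  }
}
```

The bit-at-a-time loop keeps the sketch short; a real decoder would use width-specialized unpackers. The point here is only that the slicing cost is paid once per run instead of once per group of 8 values.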

5. Bulk read paths for timestamp/date Parquet vector updaters

PR: #55923 (SPARK-56896)

Replaces per-element readValue loops with a two-pass approach (one bulk read of the raw values, then an in-place conversion) for four updaters (LongAsMicrosUpdater, LongAsNanosUpdater, LongAsMicrosRebaseUpdater, DateToTimestampNTZWithRebaseUpdater). This avoids per-element virtual dispatch through VectorizedValuesReader. Note: DateToTimestampNTZUpdater (CORRECTED mode) was already optimized via SPARK-56804.

| Updater | Speedup (JDK 17 / 21 / 25) |
|---------|----------------------------|
| LongAsMicrosUpdater | 2.1x / 2.9x / 3.3x |
| LongAsNanosUpdater | new benchmark; ~2.6x in local runs |
| LongAsMicrosRebaseUpdater | new benchmark; ~2.1x in local runs |
| DateToTimestampNTZWithRebaseUpdater | new benchmark; ~2.0x in local runs |
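A minimal sketch of the two-pass pattern for the two non-rebasing updaters, with hypothetical names. A long[] and System.arraycopy stand in for the single bulk readLongs call into the column vector. The conversions are assumptions: overflow-checked multiplication for milliseconds to microseconds, and floor division for nanoseconds to microseconds. The rebasing variants are not shown.

```java
// Hypothetical sketch; arrays stand in for the values reader and the column vector.
final class TimestampBulkSketch {
  // Pass 1: one bulk copy of the raw INT64 millis into the destination.
  // Pass 2: convert in place, with no per-element call through a reader interface.
  static void readMillisAsMicros(long[] raw, int srcOff, long[] dst, int dstOff, int n) {
    System.arraycopy(raw, srcOff, dst, dstOff, n);
    for (int i = dstOff; i < dstOff + n; i++) {
      dst[i] = Math.multiplyExact(dst[i], 1000L);  // throws ArithmeticException on overflow
    }
  }

  // Same shape for nanos; floorDiv rounds toward negative infinity for pre-epoch values.
  static void readNanosAsMicros(long[] raw, int srcOff, long[] dst, int dstOff, int n) {
    System.arraycopy(raw, srcOff, dst, dstOff, n);
    for (int i = dstOff; i < dstOff + n; i++) {
      dst[i] = Math.floorDiv(dst[i], 1000L);
    }
  }
}
```

The second pass is a tight loop over a primitive array with a single arithmetic operation per element, which is the kind of code the JIT can optimize well. The original per-element loop interleaved a virtual reader call with each conversion.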

6. Reduce per-value allocations in DELTA_BYTE_ARRAY decoder

PR: #55924 (SPARK-56897)

Replaces ByteBuffer-based state tracking with a reusable byte[] buffer, eliminating 2 ByteBuffer allocations per decoded value (~8K objects per 4096-value page). Also rewrites skipBinary to avoid column vector reset/swap overhead.

skipBinary (primary improvement):

| Case | JDK 17 | JDK 21 | JDK 25 |
|------|--------|--------|--------|
| no overlap | 1.35x | 1.39x | 1.49x |
| half overlap | 1.62x | 1.62x | 1.72x |
| full overlap | 1.61x | 1.64x | 1.74x |

readBinary (roughly neutral, 0.92x to 1.22x):

| Case | JDK 17 | JDK 21 | JDK 25 |
|------|--------|--------|--------|
| no overlap | 0.92x | 0.97x | 1.06x |
| half overlap | 1.07x | 1.11x | 1.22x |
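A minimal sketch of keeping DELTA_BYTE_ARRAY state in one reusable byte[], with hypothetical names. It assumes each value is encoded as a prefix length (bytes shared with the previous value) plus a suffix, so the shared prefix is already in the buffer and only the suffix has to be copied.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical helper, not Spark's VectorizedDeltaByteArrayReader.
final class DeltaByteArrayStateSketch {
  private byte[] buf = new byte[16];
  private int len = 0;

  // Applies one (prefixLen, suffix) pair and returns the new value's length.
  int next(int prefixLen, byte[] suffix, int off, int sLen) {
    if (prefixLen > len) {
      throw new IllegalStateException("prefix longer than previous value");
    }
    int newLen = prefixLen + sLen;
    if (newLen > buf.length) {
      // Grow geometrically; Arrays.copyOf keeps the shared prefix in place.
      buf = Arrays.copyOf(buf, Math.max(newLen, buf.length * 2));
    }
    System.arraycopy(suffix, off, buf, prefixLen, sLen);
    len = newLen;
    return len;
  }

  byte[] buffer() { return buf; }  // valid bytes are buf[0 .. length())

  int length() { return len; }

  // Convenience for inspection only; a reader would copy buffer() into the column vector.
  String current() { return new String(buf, 0, len, StandardCharsets.UTF_8); }
}
```

For example, "apple", "apply", "apt" arrive as (0, "apple"), (4, "y"), (2, "t"). Skipping still has to call next() for each value, because later prefixes depend on it, but it never copies into a column vector. This is why skipBinary can avoid the reset/swap overhead.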

7. Reduce per-value allocation in DELTA_LENGTH_BYTE_ARRAY decoder

PR: #55932 (SPARK-56907)

Replaces the per-value in.slice(length) with a single bulk slice for the entire batch, and the per-value skip loop with a single bulk skip.

| Case | JDK 17 | JDK 21 | JDK 25 |
|------|--------|--------|--------|
| readBinary, payloadLen=8 | 1.14x | 1.09x | 1.18x |
| readBinary, payloadLen=32 | 1.10x | 0.84x | 1.17x |
| skipBinary, payloadLen=8 | 1.42x | 1.69x | 1.39x |
| skipBinary, payloadLen=32 | 1.40x | 1.02x | 1.34x |
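A minimal sketch of the batch slice and bulk skip, with hypothetical names and plain java.nio.ByteBuffer in place of Parquet's input stream. It assumes the per-value lengths have already been decoded into an int[] and that the payloads follow back to back. One slice covers the batch, and offsets[i]..offsets[i + 1] delimit value i within it.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch, not Spark's VectorizedDeltaLengthByteArrayReader.
final class DeltaLengthBatchSketch {
  // Computes offsets (length n + 1) for values start..start+n-1, returns one
  // slice covering all of their payloads, and advances in past them.
  static ByteBuffer readBatch(ByteBuffer in, int[] lengths, int start, int n, int[] offsets) {
    int total = 0;
    for (int i = 0; i < n; i++) {
      offsets[i] = total;
      total += lengths[start + i];
    }
    offsets[n] = total;
    ByteBuffer batch = in.slice();  // single view instead of n per-value slices
    batch.limit(total);
    in.position(in.position() + total);
    return batch;
  }

  // Bulk skip: sum the lengths and move the position once.
  static void skip(ByteBuffer in, int[] lengths, int start, int n) {
    int total = 0;
    for (int i = 0; i < n; i++) total += lengths[start + i];
    in.position(in.position() + total);
  }
}
```

The skip path never touches payload bytes at all, which is consistent with skipBinary showing the larger gains in the table.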

8. Benchmark workflow: skip TPC-DS generation + early CPU check

PR: #56479 (SPARK-57420)

Two improvements to the GitHub Actions (GHA) benchmark workflow used to produce the results in this umbrella:

  1. Skip TPC-DS data generation for non-TPCDS benchmarks. Changes contains(inputs.class, '*') to inputs.class == '*' so wildcard patterns like *VectorizedDeltaReaderBenchmark no longer trigger the expensive TPC-DS generation job (~5-10 min saved per run).

  2. Add an early CPU model check step that runs immediately after checkout, before compilation. It prints the CPU model as a ::notice:: annotation for live visibility in the Actions UI and optionally fails fast if the runner CPU does not match the expected-cpu input parameter (saving the 20-30 min otherwise wasted on runs on the wrong hardware).


Common Themes

  • Allocation reduction: Replace per-value ByteBuffer.slice() / ByteBuffer.wrap() with bulk reads into reusable buffers
  • Bulk vectorized reads: Replace per-element virtual dispatch with single batch calls backed by System.arraycopy
  • JIT-friendly patterns: Per-class method overrides for monomorphic call sites; avoiding megamorphic profile pollution from shared helpers

Benchmarking

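Unless noted otherwise, all benchmarks were run via the GHA workflow on AMD EPYC 7763 with OpenJDK 17/21/25, with baseline and PR on the same CPU model. The dictionary-decoding results (PR 2) were measured on AMD EPYC 9V74, and the new updater benchmarks in PR 5 come from local runs. Results are committed to each PR branch.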
