feat: sort file groups by statistics during sort pushdown (Sort pushdown phase 2)#21182
feat: sort file groups by statistics during sort pushdown (Sort pushdown phase 2)#21182zhuqi-lucas wants to merge 6 commits intoapache:mainfrom
Conversation
There was a problem hiding this comment.
Pull request overview
Implements statistics-driven file group ordering as part of sort pushdown, enabling sort elimination when within-file ordering matches and files are non-overlapping, plus a best-effort stats-based reorder fallback when exact pushdown isn’t possible.
Changes:
- Add file-group reordering by min/max statistics (and non-overlap validation) to enable
SortExecelimination for exactly ordered, non-overlapping files. - Extend Parquet sort pushdown to return
Exactwhen Parquet ordering metadata satisfies the requested ordering. - Add/adjust SLT + Rust tests and a new benchmark to validate and measure the optimization.
Reviewed changes
Copilot reviewed 8 out of 9 changed files in this pull request and generated 6 comments.
Show a summary per file
| File | Description |
|---|---|
| datafusion/sqllogictest/test_files/sort_pushdown.slt | Adds SLT coverage for stats reorder, sort elimination, LIMIT behavior, multi-partition behavior, and inferred ordering from Parquet metadata. |
| datafusion/physical-optimizer/src/pushdown_sort.rs | Updates module docs to reflect new capabilities (Exact elimination + stats-based ordering). |
| datafusion/datasource/src/file_scan_config.rs | Implements core stats-based reordering, non-overlap validation, “exact” preservation logic, and cross-group redistribution. Adds unit tests. |
| datafusion/datasource-parquet/src/source.rs | Returns Exact when Parquet natural ordering satisfies the requested sort. |
| datafusion/core/tests/physical_optimizer/pushdown_sort.rs | Updates a prefix-match test to reflect Exact pushdown / sort elimination behavior. |
| benchmarks/src/sort_pushdown.rs | Adds a benchmark to measure sort elimination and LIMIT benefits on sorted, non-overlapping parquet files. |
| benchmarks/src/lib.rs | Registers the new sort_pushdown benchmark module. |
| benchmarks/src/bin/dfbench.rs | Exposes sort-pushdown as a new dfbench subcommand. |
| benchmarks/bench.sh | Adds bench.sh targets to run the new sort pushdown benchmarks. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
|
Very exciting! I hope I have wifi on the plane later today so I can review. |
adriangb
left a comment
There was a problem hiding this comment.
Just some comment for now. My flight ended up being a 4 hour delay on the tarmac debacle.
benchmarks/src/bin/dfbench.rs
Outdated
|
|
||
| use datafusion_benchmarks::{ | ||
| cancellation, clickbench, h2o, hj, imdb, nlj, smj, sort_tpch, tpcds, tpch, | ||
| cancellation, clickbench, h2o, hj, imdb, nlj, smj, sort_pushdown, sort_tpch, tpcds, |
There was a problem hiding this comment.
Adding sort_pushdown. To keep the PR smaller and so we can run comparison benchmarks, could you split the benchmarks out into their own PR?
There was a problem hiding this comment.
Thanks for the review! Good idea — will split the benchmarks into a follow-up PR to keep this one focused on the core optimization.
| inner: Arc::new(new_source) as Arc<dyn FileSource>, | ||
| }) | ||
|
|
||
| // TODO Phase 2: Add support for other optimizations: |
There was a problem hiding this comment.
I do think there's one more trick we could have up our sleeves: instead of only reversing row group orders we could pass the desired sort order into the opener and have it re-sort the row groups based on stats to try to match the scan's desired ordering. This might be especially effective once we have morselized scans since we could terminate after a single row group for TopK queries.
There was a problem hiding this comment.
Great idea! Row-group-level statistics reordering would be a natural extension of our file-level reordering but at finer granularity. Especially powerful with morselized scans where TopK could terminate after a single row group. Will track as a follow-up.
| ############################################################### | ||
| # Statistics-based file sorting and sort elimination tests | ||
| ############################################################### |
There was a problem hiding this comment.
We could also commit these alongside the benchmarks so we can then look at just the diff.
There was a problem hiding this comment.
Makes sense. The SLT changes to existing tests (updated EXPLAIN outputs showing reordered files) need to stay with the core PR since they validate the new behavior. I will split just the benchmark code into its own PR as suggested above.
| /// | ||
| /// # Sort Pushdown Architecture | ||
| /// | ||
| /// ```text |
There was a problem hiding this comment.
This diagram is amazing, thank you so much for the detailed docs!
There was a problem hiding this comment.
Thank you for taking the time to review! Really appreciate it.
| /// │ | ||
| /// └─► try_sort_file_groups_by_statistics() | ||
| /// (best-effort: reorder files by min/max stats) | ||
| /// └─► Inexact if reordered, Unsupported if already in order |
There was a problem hiding this comment.
Unsupported if already in order
I didn't understand this part
There was a problem hiding this comment.
Good catch — let me clarify. When FileSource returns Unsupported, we fall back to try_sort_file_groups_by_statistics() which reorders files by min/max stats. But if the files are already in the correct order (any_reordered = false), we return Unsupported rather than Inexact — because we did not actually change anything. Returning Inexact would make the optimizer think we optimized the plan, but it is identical to the original. Will improve the wording in the comment.
| // When there are multiple groups, redistribute files using consecutive | ||
| // assignment so that each group remains non-overlapping AND groups are | ||
| // ordered relative to each other. This enables: | ||
| // - No SortExec per partition (files in each group are sorted & non-overlapping) | ||
| // - SPM cheaply merges ordered streams (O(n) merge) | ||
| // - Parallel I/O across partitions | ||
| // | ||
| // Before (bin-packing may interleave): | ||
| // Group 0: [file_01(1-10), file_03(21-30)] ← gap, interleaved with group 1 | ||
| // Group 1: [file_02(11-20), file_04(31-40)] |
There was a problem hiding this comment.
Are there scenarios where ending up with lopsided partitions negates the benefits?
There was a problem hiding this comment.
In practice the impact is minimal: file count is typically much larger than partition count, so the imbalance is at most 1 extra file (e.g. 51 vs 50 files). Even with some imbalance, parallel I/O across partitions still beats single-partition sequential reads. For LIMIT queries it matters even less since the first partition hits the limit and stops early regardless of size.
There was a problem hiding this comment.
After further analysis, I am considering removing the redistribution logic entirely. The three benefits listed in the comment are not actually unique to redistribution:
- No SortExec per partition — true regardless of redistribution, as long as files within each group are non-overlapping
- SPM cheaply merges ordered streams — SPM is O(n) merge whether groups are interleaved or consecutive
- Parallel I/O across partitions — actually better with interleaved groups, since SPM alternates pulling from both partitions, keeping both I/O streams active
The only real difference is that consecutive assignment makes each partition's file reads more sequential (fewer open/close alternations). But interleaved groups give better I/O parallelism because both partitions are actively scanning simultaneously.
Given the marginal benefit vs added complexity (new function + tests), I think we should remove redistribute_files_across_groups_by_statistics and just keep the core optimization: per-partition sort elimination via statistics-based non-overlapping detection.
What do you think?
|
run benchmarks |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing feat/sort-file-groups-by-statistics (a79cbdf) to 7cbc6b4 (merge-base) diff using: clickbench_partitioned File an issue against this benchmark runner |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing feat/sort-file-groups-by-statistics (a79cbdf) to 7cbc6b4 (merge-base) diff using: tpcds File an issue against this benchmark runner |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing feat/sort-file-groups-by-statistics (a79cbdf) to 7cbc6b4 (merge-base) diff using: tpch File an issue against this benchmark runner |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usagetpch — base (merge-base)
tpch — branch
File an issue against this benchmark runner |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usagetpcds — base (merge-base)
tpcds — branch
File an issue against this benchmark runner |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usageclickbench_partitioned — base (merge-base)
clickbench_partitioned — branch
File an issue against this benchmark runner |
a79cbdf to
5e3eaac
Compare
|
Thanks for the review @adriangb! I've addressed all feedback:
|
I'd like to do the opposite: commit the benchmarks and SLT tests as a precursor PR, then rebase this branch so we can see just the diff in benchmarks / SLT. |
Good point, i agree. |
Sort files within each file group by min/max statistics during sort pushdown to better align with the requested ordering. When files are non-overlapping and within-file ordering is guaranteed (e.g. Parquet with sorting_columns metadata), the SortExec is completely eliminated. Key changes: - ParquetSource::try_pushdown_sort returns Exact when natural ordering satisfies the request, enabling sort elimination - FileScanConfig sorts files within groups by statistics and verifies non-overlapping property to determine Exact vs Inexact - Multi-group files are redistributed consecutively to preserve both sort elimination and I/O parallelism across partitions - Statistics-based file reordering as fallback when FileSource returns Unsupported (benefits TopK via better dynamic filter pruning) - New sort_pushdown benchmark for measuring sort elimination speedup Closes apache#17348 Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
… improve docs - Remove dead stats computation in reverse_file_groups branch (reverse path is always Inexact, so all_non_overlapping is unused) - Add reverse prefix matching documentation to pushdown_sort module Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…ting Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…ring Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
The existing doc comment explains that multi-file partitions break output ordering. Add a note about the exception: when sort pushdown verifies files are non-overlapping via statistics, output_ordering is preserved and SortExec can be eliminated. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
… comment - Remove benchmark code (sort_pushdown.rs, bench.sh, dfbench.rs changes) to be submitted as a separate follow-up PR per reviewer request - Clarify "Unsupported if already in order" in architecture diagram: explain that Unsupported means no change was made to the plan Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
5e3eaac to
f9de9be
Compare
Which issue does this PR close?
Closes #17348
Closes #19329
Rationale for this change
This PR implements the core optimization described in the EPIC Sort pushdown / partially sorted scans: using file-level min/max statistics to optimize scan order and eliminate unnecessary sort operators.
Currently, when a query has
ORDER BY, DataFusion always inserts aSortExeceven when the data is already sorted across files. This PR enables:sorting_columnsmetadata (noWITH ORDERneeded)What changes are included in this PR?
Architecture
Three Optimization Paths
Path 1: Sort Elimination (Exact) — removes SortExec entirely
When the file source's natural ordering satisfies the query (e.g., Parquet files with
sorting_columnsmetadata), and files within each group are non-overlapping, theSortExecis completely eliminated.Path 2: Reverse Scan (Inexact) — existing optimization, enhanced
When the requested order is the reverse of the natural ordering,
reverse_row_groups=trueis set. SortExec stays but benefits from approximate ordering.Path 3: Statistics-Based File Reordering — new fallback
When the FileSource returns
Unsupported, files are reordered by their min/max statistics to approximate the requested order. This benefits TopK queries via better dynamic filter pruning.Multi-Partition Design
For multiple execution partitions, the optimization works per-partition:
When bin-packing interleaves file ranges across groups, files are redistributed using consecutive assignment to ensure groups are ordered relative to each other:
Automatic Ordering Inference
DataFusion already infers ordering from Parquet
sorting_columnsmetadata (viaordering_from_parquet_metadata). With this PR, the inferred ordering flows through sort pushdown automatically — users don't needWITH ORDERfor sorted Parquet files.Files Changed
datasource-parquet/src/source.rsExactwhen natural ordering satisfies requestdatasource/src/file_scan_config.rsphysical-optimizer/src/pushdown_sort.rscore/tests/physical_optimizer/pushdown_sort.rssqllogictest/test_files/sort_pushdown.sltbenchmarks/src/sort_pushdown.rsbenchmarks/{lib,bin/dfbench,bench}.{rs,sh}Benchmark Results
300k rows, 8 non-overlapping sorted parquet files, single partition:
ORDER BY col ASC(full scan)ORDER BY col ASC LIMIT 100ORDER BY col ASC(wide,SELECT *)ORDER BY col ASC LIMIT 100(wide)LIMIT queries benefit most (67-74%) because sort elimination + limit pushdown means only the first few rows are read.
Tests
Unit Tests (12 new)
SLT Integration Tests (5 new groups)
target_partitions=2→ SPM + per-partition sort eliminationIntegration Tests
Test plan
cargo test -p datafusion-datasource(111 tests pass)cargo test -p datafusion-datasource-parquet(96 tests pass)cargo test -p datafusion-physical-optimizer(27 tests pass)cargo test -p datafusion --test core_integration(919 tests pass)cargo test -p datafusionall tests (1997+ pass)cargo clippy— 0 warnings🤖 Generated with Claude Code