Skip to content

Commit 53b0ffb

Browse files
adriangbclaude
andauthored
fix: validate inter-file ordering in eq_properties() (#20329)
## Summary Discovered this bug while working on #19724. TLDR: just because the files themselves are sorted doesn't mean the partition streams are sorted. - **`eq_properties()` in `FileScanConfig` blindly trusted `output_ordering`** (set from Parquet `sorting_columns` metadata) without verifying that files within a group are in the correct inter-file order - `EnforceSorting` then removed `SortExec` based on this unvalidated ordering, producing **wrong results** when filesystem order didn't match data order - Added `validated_output_ordering()` that filters orderings using `MinMaxStatistics::new_from_files()` + `is_sorted()` to verify inter-file sort order before reporting them to the optimizer ## Changes ### `datafusion/datasource/src/file_scan_config.rs` - Added `validated_output_ordering()` method on `FileScanConfig` that validates each output ordering against actual file group statistics - Changed `eq_properties()` to call `self.validated_output_ordering()` instead of `self.output_ordering.clone()` ### `datafusion/sqllogictest/test_files/sort_pushdown.slt` Added 8 new regression tests (Tests 4-11): | Test | Scenario | Key assertion | |------|----------|---------------| | **4** | Reversed filesystem order (inferred ordering) | SortExec retained — wrong inter-file order detected | | **5** | Overlapping file ranges (inferred ordering) | SortExec retained — overlapping ranges detected | | **6** | `WITH ORDER` + reversed filesystem order | SortExec retained despite explicit ordering | | **7** | Correctly ordered multi-file group (positive) | SortExec eliminated — validation passes | | **8** | DESC ordering with wrong inter-file DESC order | SortExec retained for DESC direction | | **9** | Multi-column sort key (overlapping vs non-overlapping) | Conservative rejection with overlapping stats; passes with clean boundaries | | **10** | Correctly ordered + `WITH ORDER` (positive) | SortExec eliminated — both ordering and stats agree | | **11** | Multiple partitions (one file per group) | `SortPreservingMergeExec` merges; no per-partition sort needed | ## Test plan - [x] `cargo test --test sqllogictests -- sort_pushdown` — all new + existing tests pass - [x] `cargo test -p datafusion-datasource` — 97 unit tests + 6 doc tests pass - [x] Existing Test 1 (single-file sort pushdown with `WITH ORDER`) still eliminates SortExec (no regression) 🤖 Generated with [Claude Code](https://claude.com/claude-code) --------- Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 98cc753 commit 53b0ffb

File tree

5 files changed

+660
-47
lines changed

5 files changed

+660
-47
lines changed

datafusion/core/tests/physical_optimizer/partition_statistics.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -826,7 +826,7 @@ mod test {
826826
let plan_string = get_plan_string(&aggregate_exec_partial).swap_remove(0);
827827
assert_snapshot!(
828828
plan_string,
829-
@"AggregateExec: mode=Partial, gby=[id@0 as id, 1 + id@0 as expr], aggr=[COUNT(c)], ordering_mode=Sorted"
829+
@"AggregateExec: mode=Partial, gby=[id@0 as id, 1 + id@0 as expr], aggr=[COUNT(c)]"
830830
);
831831

832832
let p0_statistics = aggregate_exec_partial.partition_statistics(Some(0))?;

datafusion/datasource/src/file_scan_config.rs

Lines changed: 118 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -736,7 +736,7 @@ impl DataSource for FileScanConfig {
736736
let schema = self.file_source.table_schema().table_schema();
737737
let mut eq_properties = EquivalenceProperties::new_with_orderings(
738738
Arc::clone(schema),
739-
self.output_ordering.clone(),
739+
self.validated_output_ordering(),
740740
)
741741
.with_constraints(self.constraints.clone());
742742

@@ -926,6 +926,40 @@ impl DataSource for FileScanConfig {
926926
}
927927

928928
impl FileScanConfig {
929+
/// Returns only the output orderings that are validated against actual
930+
/// file group statistics.
931+
///
932+
/// For example, individual files may be ordered by `col1 ASC`,
933+
/// but if we have files with these min/max statistics in a single partition / file group:
934+
///
935+
/// - file1: min(col1) = 10, max(col1) = 20
936+
/// - file2: min(col1) = 5, max(col1) = 15
937+
///
938+
/// Because reading file1 followed by file2 would produce out-of-order output (there is overlap
939+
/// in the ranges), we cannot retain `col1 ASC` as a valid output ordering.
940+
///
941+
/// Similarly this would not be a valid order (non-overlapping ranges but not ordered):
942+
///
943+
/// - file1: min(col1) = 20, max(col1) = 30
944+
/// - file2: min(col1) = 10, max(col1) = 15
945+
///
946+
/// On the other hand if we had:
947+
///
948+
/// - file1: min(col1) = 5, max(col1) = 15
949+
/// - file2: min(col1) = 16, max(col1) = 25
950+
///
951+
/// Then we know that reading file1 followed by file2 will produce ordered output,
952+
/// so `col1 ASC` would be retained.
953+
///
954+
/// Note that we are checking for ordering *within* *each* file group / partition,
955+
/// files in different partitions are read independently and do not affect each other's ordering.
956+
/// Merging of the multiple partition streams into a single ordered stream is handled
957+
/// upstream e.g. by `SortPreservingMergeExec`.
958+
fn validated_output_ordering(&self) -> Vec<LexOrdering> {
959+
let schema = self.file_source.table_schema().table_schema();
960+
validate_orderings(&self.output_ordering, schema, &self.file_groups, None)
961+
}
962+
929963
/// Get the file schema (schema of the files without partition columns)
930964
pub fn file_schema(&self) -> &SchemaRef {
931965
self.file_source.table_schema().file_schema()
@@ -1300,6 +1334,51 @@ fn ordered_column_indices_from_projection(
13001334
.collect::<Option<Vec<usize>>>()
13011335
}
13021336

1337+
/// Check whether a given ordering is valid for all file groups by verifying
1338+
/// that files within each group are sorted according to their min/max statistics.
1339+
///
1340+
/// For single-file (or empty) groups, the ordering is trivially valid.
1341+
/// For multi-file groups, we check that the min/max statistics for the sort
1342+
/// columns are in order and non-overlapping (or touching at boundaries).
1343+
///
1344+
/// `projection` maps projected column indices back to table-schema indices
1345+
/// when validating after projection; pass `None` when validating at
1346+
/// table-schema level.
1347+
fn is_ordering_valid_for_file_groups(
1348+
file_groups: &[FileGroup],
1349+
ordering: &LexOrdering,
1350+
schema: &SchemaRef,
1351+
projection: Option<&[usize]>,
1352+
) -> bool {
1353+
file_groups.iter().all(|group| {
1354+
if group.len() <= 1 {
1355+
return true; // single-file groups are trivially sorted
1356+
}
1357+
match MinMaxStatistics::new_from_files(ordering, schema, projection, group.iter())
1358+
{
1359+
Ok(stats) => stats.is_sorted(),
1360+
Err(_) => false, // can't prove sorted → reject
1361+
}
1362+
})
1363+
}
1364+
1365+
/// Filters orderings to retain only those valid for all file groups,
1366+
/// verified via min/max statistics.
1367+
fn validate_orderings(
1368+
orderings: &[LexOrdering],
1369+
schema: &SchemaRef,
1370+
file_groups: &[FileGroup],
1371+
projection: Option<&[usize]>,
1372+
) -> Vec<LexOrdering> {
1373+
orderings
1374+
.iter()
1375+
.filter(|ordering| {
1376+
is_ordering_valid_for_file_groups(file_groups, ordering, schema, projection)
1377+
})
1378+
.cloned()
1379+
.collect()
1380+
}
1381+
13031382
/// The various listing tables does not attempt to read all files
13041383
/// concurrently, instead they will read files in sequence within a
13051384
/// partition. This is an important property as it allows plans to
@@ -1366,52 +1445,47 @@ fn get_projected_output_ordering(
13661445
let projected_orderings =
13671446
project_orderings(&base_config.output_ordering, projected_schema);
13681447

1369-
let mut all_orderings = vec![];
1370-
for new_ordering in projected_orderings {
1371-
// Check if any file groups are not sorted
1372-
if base_config.file_groups.iter().any(|group| {
1373-
if group.len() <= 1 {
1374-
// File groups with <= 1 files are always sorted
1375-
return false;
1376-
}
1377-
1378-
let Some(indices) = base_config
1379-
.file_source
1380-
.projection()
1381-
.as_ref()
1382-
.map(|p| ordered_column_indices_from_projection(p))
1383-
else {
1384-
// Can't determine if ordered without a simple projection
1385-
return true;
1386-
};
1387-
1388-
let statistics = match MinMaxStatistics::new_from_files(
1389-
&new_ordering,
1448+
let indices = base_config
1449+
.file_source
1450+
.projection()
1451+
.as_ref()
1452+
.map(|p| ordered_column_indices_from_projection(p));
1453+
1454+
match indices {
1455+
Some(Some(indices)) => {
1456+
// Simple column projection — validate with statistics
1457+
validate_orderings(
1458+
&projected_orderings,
13901459
projected_schema,
1391-
indices.as_deref(),
1392-
group.iter(),
1393-
) {
1394-
Ok(statistics) => statistics,
1395-
Err(e) => {
1396-
log::trace!("Error fetching statistics for file group: {e}");
1397-
// we can't prove that it's ordered, so we have to reject it
1398-
return true;
1399-
}
1400-
};
1401-
1402-
!statistics.is_sorted()
1403-
}) {
1404-
debug!(
1405-
"Skipping specified output ordering {:?}. \
1406-
Some file groups couldn't be determined to be sorted: {:?}",
1407-
base_config.output_ordering[0], base_config.file_groups
1408-
);
1409-
continue;
1460+
&base_config.file_groups,
1461+
Some(indices.as_slice()),
1462+
)
1463+
}
1464+
None => {
1465+
// No projection — validate with statistics (no remapping needed)
1466+
validate_orderings(
1467+
&projected_orderings,
1468+
projected_schema,
1469+
&base_config.file_groups,
1470+
None,
1471+
)
1472+
}
1473+
Some(None) => {
1474+
// Complex projection (expressions, not simple columns) — can't
1475+
// determine column indices for statistics. Still valid if all
1476+
// file groups have at most one file.
1477+
if base_config.file_groups.iter().all(|g| g.len() <= 1) {
1478+
projected_orderings
1479+
} else {
1480+
debug!(
1481+
"Skipping specified output orderings. \
1482+
Some file groups couldn't be determined to be sorted: {:?}",
1483+
base_config.file_groups
1484+
);
1485+
vec![]
1486+
}
14101487
}
1411-
1412-
all_orderings.push(new_ordering);
14131488
}
1414-
all_orderings
14151489
}
14161490

14171491
/// Convert type to a type suitable for use as a `ListingTable`

datafusion/datasource/src/statistics.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -266,11 +266,12 @@ impl MinMaxStatistics {
266266
}
267267

268268
/// Check if the min/max statistics are in order and non-overlapping
269+
/// (or touching at boundaries)
269270
pub fn is_sorted(&self) -> bool {
270271
self.max_by_sort_order
271272
.iter()
272273
.zip(self.min_by_sort_order.iter().skip(1))
273-
.all(|(max, next_min)| max < next_min)
274+
.all(|(max, next_min)| max <= next_min)
274275
}
275276
}
276277

datafusion/sqllogictest/test_files/parquet_sorted_statistics.slt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -274,4 +274,4 @@ logical_plan
274274
02)--TableScan: test_table projection=[constant_col]
275275
physical_plan
276276
01)SortPreservingMergeExec: [constant_col@0 ASC NULLS LAST]
277-
02)--DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=A/0.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=B/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=C/2.parquet]]}, projection=[constant_col], file_type=parquet
277+
02)--DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=A/0.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=B/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=C/2.parquet]]}, projection=[constant_col], output_ordering=[constant_col@0 ASC NULLS LAST], file_type=parquet

0 commit comments

Comments
 (0)