Conversation

@zhuqi-lucas zhuqi-lucas commented Dec 22, 2025

Which issue does this PR close?

Rationale for this change

After #19064 was completed, I found it couldn't meet our internal project requirements:

  1. Redesign try_reverse_output using EquivalenceProperties

    • The previous implementation used a simple is_reverse method that could only handle basic reverse matching
    • Now leveraging EquivalenceProperties can handle more complex scenarios, including constant column elimination and monotonic functions
  2. Switch to ordering_satisfy method for ordering matching

    • This method internally:
      • Normalizes orderings (removes constant columns)
      • Checks monotonic functions (like date_trunc, CAST, CEIL)
      • Handles prefix matching
  3. Extend sort pushdown support to more operators

    • Added try_pushdown_sort implementation for ProjectionExec, FilterExec, CooperativeExec
    • These operators can now pass sort requests down to their children
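A toy sketch may help illustrate what the `EquivalenceProperties`-based check buys us. This is not the DataFusion API: `SortKey` and `satisfies_after_reverse` are hypothetical stand-ins for `EquivalenceProperties::ordering_satisfy`, but they model the same three steps the PR relies on (drop constant columns, reverse the file ordering, prefix-match the request):

```rust
// Toy model of the ordering check; the real implementation lives in
// EquivalenceProperties::ordering_satisfy. All names here are made up.

#[derive(Clone, Copy, PartialEq, Eq, Debug)]
struct SortKey {
    column: &'static str,
    descending: bool,
}

/// Drop columns known to be constant (e.g. fixed by `WHERE col = 'x'`),
/// reverse the file ordering, then check whether the requested ordering
/// is a prefix of the reversed one.
fn satisfies_after_reverse(
    file_ordering: &[SortKey],
    constants: &[&str],
    requested: &[SortKey],
) -> bool {
    // Normalize: remove constant columns from both sides.
    let normalize = |keys: &[SortKey]| -> Vec<SortKey> {
        keys.iter()
            .filter(|k| !constants.contains(&k.column))
            .copied()
            .collect()
    };
    let file = normalize(file_ordering);
    let req = normalize(requested);

    // Reverse the file ordering: flip every sort direction.
    let reversed: Vec<SortKey> = file
        .iter()
        .map(|k| SortKey { descending: !k.descending, ..*k })
        .collect();

    // Prefix match: the request must be a prefix of the reversed ordering.
    req.len() <= reversed.len() && reversed[..req.len()] == req[..]
}
```

With file ordering [timeframe ASC, period_end ASC] and timeframe made constant by a filter, the reversed ordering satisfies ORDER BY period_end DESC; without the constant, the leading column blocks the match.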

What changes are included in this PR?

Core Changes:

  1. ParquetSource::try_reverse_output (datasource-parquet/src/source.rs)

    • Added eq_properties parameter
    • Reverses all orderings in equivalence properties
    • Uses ordering_satisfy to check if reversed ordering satisfies the request
    • Removed file_ordering field and with_file_ordering_info method
  2. FileSource trait (datasource/src/file.rs)

    • Updated try_reverse_output signature with eq_properties parameter
    • Added detailed documentation explaining parameter usage and examples
  3. FileScanConfig::try_pushdown_sort (datasource/src/file_scan_config.rs)

    • Simplified logic to directly call file_source.try_reverse_output
    • No longer needs to pre-check ordering satisfaction or set file ordering info
  4. New operator support

    • FilterExec::try_pushdown_sort - Pushes sort below filters
    • ProjectionExec::try_pushdown_sort - Pushes sort below projections
    • CooperativeExec::try_pushdown_sort - Supports sort pushdown in cooperative execution
  5. Removed obsolete methods

    • Deleted LexOrdering::is_reverse - replaced by ordering_satisfy

Feature Enhancements:

Supported optimization scenarios:

  1. Constant column elimination (Test 7)
   -- File ordering: [timeframe ASC, period_end ASC]
   -- Query: WHERE timeframe = 'quarterly' ORDER BY period_end DESC
   -- Effect: After timeframe becomes constant, reverse scan is enabled
  2. Monotonic function support (Test 8)
   -- File ordering: [ts ASC]
   -- Query: ORDER BY date_trunc('month', ts) DESC
   -- Effect: date_trunc is monotonic, reverse scan satisfies the request
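A minimal sketch of why the monotonic-function case is sound. `trunc_to_month` below is a made-up stand-in for date_trunc('month', ts), not DataFusion code: applying a non-decreasing function to a descending sequence yields a non-increasing sequence, so a reverse scan of [ts ASC] data satisfies ORDER BY date_trunc(...) DESC. ABS over a mixed-sign range is not monotone, which is why Test 8's negative case must not claim the ordering:

```rust
// Hypothetical stand-in for date_trunc('month', ts): maps a day number
// to a month bucket. Non-decreasing, hence order-preserving.
fn trunc_to_month(day_number: u32) -> u32 {
    day_number / 30
}

// True when the sequence never increases (DESC with ties allowed).
fn is_non_increasing(xs: &[u32]) -> bool {
    xs.windows(2).all(|w| w[0] >= w[1])
}
```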

Are these changes tested?

Yes, comprehensive tests have been added:

  • Test 7 (237 lines): Constant column elimination scenarios

    • Single constant column filter
    • Multi-value IN clauses (don't trigger the optimization)
    • Literal constants in sort expressions
    • Non-leading column filters (edge cases)
  • Test 8 (355 lines): Monotonic function scenarios

    • date_trunc (date truncation)
    • CAST (type conversion)
    • CEIL (ceiling)
    • ABS (negative case - not monotonic over mixed positive/negative range)

All tests verify:

  • Presence of reverse_row_groups=true in physical plans
  • Correctness of query results

Are there any user-facing changes?

API Changes:

  • FileSource::try_reverse_output signature changed (added eq_properties parameter)
  • Removed FileSource::with_file_ordering_info method
  • Removed LexOrdering::is_reverse public method

User-visible improvements:

  • More queries can leverage reverse row group scanning for optimization
  • Especially queries with WHERE clauses that make certain columns constant
  • Queries using monotonic functions (like date functions, type conversions)

Note: This PR returns Inexact results because only row group order is reversed, not row order within row groups. Future enhancements could include:

  • File reordering based on statistics (returning Exact)
  • Partial sort pushdown for prefix matches
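A small sketch of why only Inexact can be claimed here. `reverse_row_groups` is a toy stand-in for the scan's reverse_row_groups=true behavior: it reverses the order in which row groups are emitted but leaves each group's internal (ascending) row order untouched, so the output is approximately, but not globally, descending:

```rust
// Emit row groups in reverse order, keeping each group's internal row
// order as-is. Toy model of reversed row-group scanning.
fn reverse_row_groups(row_groups: &[Vec<i64>]) -> Vec<i64> {
    row_groups.iter().rev().flatten().copied().collect()
}

// True when the sequence is globally non-increasing.
fn is_sorted_desc(xs: &[i64]) -> bool {
    xs.windows(2).all(|w| w[0] >= w[1])
}
```

Because the result is only approximately reversed, a downstream operator (e.g. a TopK sort) still has to re-order rows, but over much better-ordered input.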

Copilot AI review requested due to automatic review settings December 22, 2025 03:00
@github-actions github-actions bot added sqllogictest SQL Logic Tests (.slt) datasource Changes to the datasource crate physical-plan Changes to the physical-plan crate labels Dec 22, 2025
Copilot AI left a comment


Pull request overview

This PR redesigns the try_reverse_output method to use equivalence properties and the ordering_satisfy API, enabling more sophisticated sort pushdown optimizations. The key improvement is the ability to recognize when reversed file scanning can satisfy requested orderings through constant column elimination, monotonic function reasoning, and prefix matching.

Key Changes:

  • Modified FileSource::try_reverse_output to accept EquivalenceProperties parameter for advanced ordering analysis
  • Replaced simple is_reverse check with ordering_satisfy in ParquetSource to handle complex cases like constant columns from filters
  • Added forward-direction check in FileScanConfig to avoid unnecessary reverse optimization when ordering is already satisfied
  • Implemented try_pushdown_sort in FilterExec and ProjectionExec to propagate sort pushdown through the plan

Reviewed changes

Copilot reviewed 6 out of 6 changed files in this pull request and generated 1 comment.

Show a summary per file
File Description
datafusion/datasource/src/file.rs Updated FileSource trait to add eq_properties parameter to try_reverse_output, with comprehensive documentation of the new approach
datafusion/datasource-parquet/src/source.rs Reimplemented try_reverse_output to use ordering_satisfy with equivalence properties instead of simple is_reverse check, enabling constant column and monotonic function handling
datafusion/datasource/src/file_scan_config.rs Added forward-direction ordering check before attempting reverse optimization and enhanced comments explaining the delegation to FileSource
datafusion/physical-plan/src/filter.rs Implemented try_pushdown_sort to propagate sort pushdown requests through filter operators
datafusion/physical-plan/src/projection.rs Implemented try_pushdown_sort to propagate sort pushdown requests through projection operators
datafusion/sqllogictest/test_files/dynamic_filter_pushdown_config.slt Added comprehensive test suite (Test 7.1-7.10) covering constant column filtering, literal constants in sort expressions, and edge cases


@zhuqi-lucas zhuqi-lucas marked this pull request as draft December 22, 2025 08:43
@zhuqi-lucas zhuqi-lucas marked this pull request as ready for review December 23, 2025 06:47
zhuqi-lucas commented Dec 23, 2025

cc @adriangb @alamb, I submitted this PR to enhance sort pushdown because the initial implementation did not cover my internal project's cases. Can you help review? Thanks!

@adriangb adriangb left a comment

Looks very cool! I suggest splitting out the changes to FilterExec for further consideration and moving the tests to a different SLT file (order.slt? start an order_pushdown.slt?)

Ok(FilterPushdownPropagation::if_all(child_pushdown_result))
}

fn try_pushdown_sort(

This seems like a good improvement but is orthogonal to the change to use EqProperties (just noting)

})
}

fn try_pushdown_sort(

A big picture question on this one: is it beneficial to push down sort order past a filter? If it's "free" of course. But if there's runtime cost to re-ordering rows, it may be cheaper to filter then re-order (if the filter is very selective).

I think in practice, for Parquet, everything ends up happening in the scan itself and filters always get applied first, so this is inconsequential. But in theory it could hurt data sources that e.g. don't support filter pushdown and hence rely on a FilterExec.


Thanks @adriangb for the review. I agree, but I think the current scope is reasonable: I only added the following three basic cases, without any operator-specific sort handling; they simply delegate the sort pushdown to their child:

FilterExec::try_pushdown_sort
ProjectionExec::try_pushdown_sort
CooperativeExec::try_pushdown_sort

From my testing, including the new SLT tests here, these delegations are needed, and I believe they are the basic cases.

I also created an issue to track adding try_pushdown_sort implementations for more operators, which we can investigate further:

#19394

SET datafusion.optimizer.enable_sort_pushdown = true;


# Test 7: Sort pushdown with constant column filtering

Why in dynamic_filter_pushdown_config.slt?


Good point @adriangb. I added the sort pushdown cases here because this file already contains TopK pushdown tests; I can add a new test file.

@zhuqi-lucas

> Looks very cool! I suggest splitting out the changes to FilterExec for further consideration and moving the tests to a different SLT file (order.slt? start an order_pushdown.slt?)

Thank you @adriangb for the review. I addressed the comments by adding a new sort_pushdown.slt.

But I still kept the pass-through sort pushdown for:

FilterExec::try_pushdown_sort
ProjectionExec::try_pushdown_sort
CooperativeExec::try_pushdown_sort

They all simply delegate to the child's sort pushdown. From my testing, including the new SLT tests here, these delegations are needed, and I believe they are the basic cases.

I also created an issue to track adding try_pushdown_sort implementations for more operators, which we can investigate further:

#19394

What do you think?

@adriangb

I'm comfortable with ProjectionExec and CooperativeExec but I think FilterExec needs some more discussion since pushing ordering below filters might have a negative impact on performance for some queries / file sources.

@zhuqi-lucas

> I'm comfortable with ProjectionExec and CooperativeExec but I think FilterExec needs some more discussion since pushing ordering below filters might have a negative impact on performance for some queries / file sources.

@adriangb Happy holidays! Thank you for the feedback! I completely understand your concern about FilterExec.

If I understand your concern correctly, I've updated the implementation to address the performance issue you mentioned. Now FilterExec::try_pushdown_sort only pushes the sort down when the child node already has an output ordering:

fn try_pushdown_sort(
    &self,
    order: &[PhysicalSortExpr],
) -> Result<SortOrderPushdownResult<Arc<dyn ExecutionPlan>>> {
    // Only attempt pushdown when the child already produces an output
    // ordering; otherwise sorting after the filter (on the smaller,
    // filtered dataset) is likely cheaper.
    if self.input.output_ordering().is_some() {
        match self.input.try_pushdown_sort(order)? {
            SortOrderPushdownResult::Exact { inner } => {
                let new_exec = Arc::new(self.clone()).with_new_children(vec![inner])?;
                Ok(SortOrderPushdownResult::Exact { inner: new_exec })
            }
            SortOrderPushdownResult::Inexact { inner } => {
                let new_exec = Arc::new(self.clone()).with_new_children(vec![inner])?;
                Ok(SortOrderPushdownResult::Inexact { inner: new_exec })
            }
            SortOrderPushdownResult::Unsupported => {
                Ok(SortOrderPushdownResult::Unsupported)
            }
        }
    } else {
        Ok(SortOrderPushdownResult::Unsupported)
    }
}

This way:

  • When the data source is sorted, we push down so the optimization can leverage the existing ordering.
  • When the data source is unsorted, we don't push down, so sorting happens after filtering, on the smaller filtered dataset.

This should avoid the performance degradation you were concerned about while still enabling the optimization for sorted data sources. The new tests validate this with the WHERE timeframe = 'quarterly' cases, which require pushdown through FilterExec to utilize the sorted Parquet file's ordering.

What do you think about this approach?

@adriangb

> What do you think about this approach?

That's a super interesting and smart approach! It absolutely makes sense and nukes the problem for cases where the sort orders match (i.e. you basically know the sort will be eliminated). But I feel for those cases EnforceSorting optimizer should already handle. For other cases it's still not clear if it always makes sense. Consider a DataSource that does accept sort pushdown (at a cost e.g. buffering in memory) but doesn't accept filter pushdown. How do we know when it makes sense to push down the sort?

This issue of not pushing things down past filters seems to have come up for both projection and sort pushdown. In both cases we are in a situation of "always push it down into the DataSource if possible, otherwise it depends on costs and we don't have a cost based evaluator so hard to say". I do think it's worth asking ourselves the question: why are there some FilterExecs left? Could we re-order the optimizer rules so that the filters always get pushed down first? Do we need to implement filter pushdown for more operators? Maybe you could share some of the scenarios / queries where the FilterExec is preventing sort pushdown?

I still think our best bet would be to remove that one piece from this PR so that we can merge the rest; then we can continue to discuss the FilterExec case.

@zhuqi-lucas

zhuqi-lucas commented Dec 25, 2025

> I still think our best bet would be to remove that one piece from this PR so that we can merge the rest; then we can continue to discuss the FilterExec case.

Thank you @adriangb, I agree. I will remove the FilterExec change from this PR, and we can discuss and investigate it further.

I am also confused about why the FilterExec remains in the following new SLT test even though the filter is pushed down to the scan:
pruning_predicate=timeframe_null_count@2 != row_count@3 AND timeframe_min@0 <= quarterly AND quarterly <= timeframe_max@1, required_guarantees=[timeframe in (quarterly)]

The FilterExec is still not removed; I will investigate this as well.

# Test 2.1: Query with constant prefix filter and DESC on remaining column
# WHERE timeframe='quarterly' makes the first sort column constant
# ORDER BY period_end DESC should trigger reverse scan because:
# File ordering: [timeframe ASC, period_end ASC]
# After filtering timeframe='quarterly': effectively [period_end ASC]
# Request: [period_end DESC] -> exact reverse!
query TT
EXPLAIN SELECT * FROM timeseries_parquet
WHERE timeframe = 'quarterly'
ORDER BY period_end DESC
LIMIT 2;
----
logical_plan
01)Sort: timeseries_parquet.period_end DESC NULLS FIRST, fetch=2
02)--Filter: timeseries_parquet.timeframe = Utf8View("quarterly")
03)----TableScan: timeseries_parquet projection=[timeframe, period_end, value], partial_filters=[timeseries_parquet.timeframe = Utf8View("quarterly")]
physical_plan
01)SortPreservingMergeExec: [period_end@1 DESC], fetch=2
02)--SortExec: TopK(fetch=2), expr=[period_end@1 DESC], preserve_partitioning=[true]
03)----FilterExec: timeframe@0 = quarterly
04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
05)--------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/timeseries_sorted.parquet]]}, projection=[timeframe, period_end, value], file_type=parquet, predicate=timeframe@0 = quarterly AND DynamicFilter [ empty ], reverse_row_groups=true, pruning_predicate=timeframe_null_count@2 != row_count@3 AND timeframe_min@0 <= quarterly AND quarterly <= timeframe_max@1, required_guarantees=[timeframe in (quarterly)]

@adriangb

adriangb commented Dec 25, 2025

It seems like that case is just because filter pushdown is not enabled (i.e. #19477). So I think the behavior is correct in this case: we would not want to push the ordering down past the filter.



Development

Successfully merging this pull request may close these issues.

Redesign the try_reverse_output to support more cases
