@@ -52,6 +52,7 @@ use datafusion_physical_plan::aggregates::{
5252 AggregateExec , AggregateMode , PhysicalGroupBy ,
5353} ;
5454use datafusion_physical_plan:: coalesce_batches:: CoalesceBatchesExec ;
55+ use datafusion_physical_plan:: coalesce_partitions:: CoalescePartitionsExec ;
5556use datafusion_physical_plan:: execution_plan:: ExecutionPlan ;
5657use datafusion_physical_plan:: expressions:: col;
5758use datafusion_physical_plan:: filter:: FilterExec ;
@@ -3471,3 +3472,47 @@ fn optimize_away_unnecessary_repartition2() -> Result<()> {
34713472
34723473 Ok ( ( ) )
34733474}
3475+
3476+ #[ test]
3477+ fn test_replace_order_preserving_variants_with_fetch ( ) -> Result < ( ) > {
3478+ // Create a base plan
3479+ let parquet_exec = parquet_exec ( ) ;
3480+
3481+ let sort_expr = PhysicalSortExpr {
3482+ expr : Arc :: new ( Column :: new ( "id" , 0 ) ) ,
3483+ options : SortOptions :: default ( ) ,
3484+ } ;
3485+
3486+ let ordering = LexOrdering :: new ( vec ! [ sort_expr] ) ;
3487+
3488+ // Create a SortPreservingMergeExec with fetch=5
3489+ let spm_exec = Arc :: new (
3490+ SortPreservingMergeExec :: new ( ordering, parquet_exec. clone ( ) ) . with_fetch ( Some ( 5 ) ) ,
3491+ ) ;
3492+
3493+ // Create distribution context
3494+ let dist_context = DistributionContext :: new (
3495+ spm_exec,
3496+ true ,
3497+ vec ! [ DistributionContext :: new( parquet_exec, false , vec![ ] ) ] ,
3498+ ) ;
3499+
3500+ // Apply the function
3501+ let result = replace_order_preserving_variants ( dist_context) ?;
3502+
3503+ // Verify the plan was transformed to CoalescePartitionsExec
3504+ result
3505+ . plan
3506+ . as_any ( )
3507+ . downcast_ref :: < CoalescePartitionsExec > ( )
3508+ . expect ( "Expected CoalescePartitionsExec" ) ;
3509+
3510+ // Verify fetch was preserved
3511+ assert_eq ! (
3512+ result. plan. fetch( ) ,
3513+ Some ( 5 ) ,
3514+ "Fetch value was not preserved after transformation"
3515+ ) ;
3516+
3517+ Ok ( ( ) )
3518+ }
0 commit comments