From 45f710baf8a837eba94e4dae0f95ccb2eb5539fb Mon Sep 17 00:00:00 2001 From: Vitthal Mirji Date: Mon, 28 Jul 2025 12:47:23 +0530 Subject: [PATCH 1/4] init: Implementing partition_statistics for EmptyExec --- datafusion/physical-plan/src/empty.rs | 63 ++++++++++++++++++++++----- 1 file changed, 53 insertions(+), 10 deletions(-) diff --git a/datafusion/physical-plan/src/empty.rs b/datafusion/physical-plan/src/empty.rs index 40b4ec61dc102..6638f32bb8524 100644 --- a/datafusion/physical-plan/src/empty.rs +++ b/datafusion/physical-plan/src/empty.rs @@ -21,11 +21,11 @@ use std::any::Any; use std::sync::Arc; use crate::memory::MemoryStream; -use crate::{common, DisplayAs, PlanProperties, SendableRecordBatchStream, Statistics}; use crate::{ execution_plan::{Boundedness, EmissionType}, DisplayFormatType, ExecutionPlan, Partitioning, }; +use crate::{DisplayAs, PlanProperties, SendableRecordBatchStream, Statistics}; use arrow::datatypes::SchemaRef; use arrow::record_batch::RecordBatch; @@ -34,6 +34,7 @@ use datafusion_execution::TaskContext; use datafusion_physical_expr::EquivalenceProperties; use crate::execution_plan::SchedulingType; +use datafusion_common::stats::Precision; use log::trace; /// Execution plan for empty relation with produce_one_row=false @@ -165,23 +166,28 @@ impl ExecutionPlan for EmptyExec { ); } } + // Build explicit stats: exact zero rows and bytes, with unknown columns + let mut stats = Statistics::default() + .with_num_rows(Precision::Exact(0)) + .with_total_byte_size(Precision::Exact(0)); + + // Add unknown column stats for each field in schema + for _field in self.schema.fields() { + stats = stats.add_column_statistics( + datafusion_common::stats::ColumnStatistics::new_unknown(), + ); + } - let batch = self - .data() - .expect("Create empty RecordBatch should not fail"); - Ok(common::compute_record_batch_statistics( - &[batch], - &self.schema, - None, - )) + Ok(stats) } } #[cfg(test)] mod tests { use super::*; - use crate::test; use crate::with_new_children_if_necessary; + use crate::{common, test}; + use arrow_schema::Schema; #[tokio::test] async fn empty() -> Result<()> { @@ -229,4 +235,41 @@ mod tests { assert!(empty.execute(20, task_ctx).is_err()); Ok(()) } + + #[test] + fn empty_partition_statistics_explicit_zero() -> Result<()> { + let schema: Arc = Arc::new(Schema::empty()); + let exec1: EmptyExec = EmptyExec::new(schema.clone()); + // default partition = 1 + + // global stats + let stats_all_1: Statistics = exec1.partition_statistics(None)?; + assert_eq!(stats_all_1.num_rows, Precision::Exact(0)); + assert_eq!(stats_all_1.total_byte_size, Precision::Exact(0)); + assert_eq!(stats_all_1.column_statistics.len(), schema.fields().len()); + + // partition 0 + let stats0_1: Statistics = exec1.partition_statistics(Some(0))?; + assert_eq!(stats0_1.num_rows, Precision::Exact(0)); + assert_eq!(stats0_1.total_byte_size, Precision::Exact(0)); + assert_eq!(stats0_1.column_statistics.len(), schema.fields().len()); + + // invalid partition for default + assert!(exec1.partition_statistics(Some(1)).is_err()); + + // Now with 2 partitions + let exec2: EmptyExec = EmptyExec::new(schema.clone()).with_partitions(2); + + // valid partitions 0 and 1 + for part in 0..2 { + let stats = exec2.partition_statistics(Some(part))?; + assert_eq!(stats.num_rows, Precision::Exact(0)); + assert_eq!(stats.total_byte_size, Precision::Exact(0)); + assert_eq!(stats.column_statistics.len(), schema.fields().len()); + } + + // invalid partition 2 + assert!(exec2.partition_statistics(Some(2)).is_err()); + Ok(()) + } } From 45c6a0c660d7534ab75ca4cdb2e8de8b393a2b93 Mon Sep 17 00:00:00 2001 From: Vitthal Mirji Date: Mon, 28 Jul 2025 13:14:57 +0530 Subject: [PATCH 2/4] Added a `TreeRender` branch to `DisplayAs` to provide informative display of the operator: `"EmptyExec: partitions=X, fields=Y"` --- datafusion/physical-plan/src/empty.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/datafusion/physical-plan/src/empty.rs b/datafusion/physical-plan/src/empty.rs index 6638f32bb8524..4f7f4e22b23a5 100644 --- a/datafusion/physical-plan/src/empty.rs +++ b/datafusion/physical-plan/src/empty.rs @@ -98,8 +98,7 @@ impl DisplayAs for EmptyExec { write!(f, "EmptyExec") } DisplayFormatType::TreeRender => { - // TODO: collect info - write!(f, "") + write!(f, "EmptyExec: schema={}, partitions={}", self.schema, self.partitions) } } } From 42409f0909ffab8a321e3bc65cca02a49e372904 Mon Sep 17 00:00:00 2001 From: Vitthal Mirji Date: Mon, 28 Jul 2025 13:17:35 +0530 Subject: [PATCH 3/4] Added a `TreeRender` branch to `DisplayAs` to provide informative display of the operator: `"EmptyExec: partitions=X, fields=Y"` --- datafusion/physical-plan/src/empty.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/datafusion/physical-plan/src/empty.rs b/datafusion/physical-plan/src/empty.rs index 4f7f4e22b23a5..5287876aa2692 100644 --- a/datafusion/physical-plan/src/empty.rs +++ b/datafusion/physical-plan/src/empty.rs @@ -98,7 +98,11 @@ impl DisplayAs for EmptyExec { write!(f, "EmptyExec") } DisplayFormatType::TreeRender => { - write!(f, "EmptyExec: schema={}, partitions={}", self.schema, self.partitions) + write!( + f, + "EmptyExec: schema={}, partitions={}", + self.schema, self.partitions + ) } } } From 855a115d0cec1671646fd4aca81a28549b9c24c2 Mon Sep 17 00:00:00 2001 From: Vitthal Mirji Date: Mon, 28 Jul 2025 14:18:31 +0530 Subject: [PATCH 4/4] lint making it clear cloning the Arc, not the inner data --- datafusion/physical-plan/src/empty.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/physical-plan/src/empty.rs b/datafusion/physical-plan/src/empty.rs index 5287876aa2692..ae9957343b267 100644 --- a/datafusion/physical-plan/src/empty.rs +++ b/datafusion/physical-plan/src/empty.rs @@ -242,7 +242,7 @@ mod tests { #[test] fn empty_partition_statistics_explicit_zero() -> Result<()> { let schema: Arc = Arc::new(Schema::empty()); - let exec1: EmptyExec = EmptyExec::new(schema.clone()); + let exec1: EmptyExec = EmptyExec::new(Arc::clone(&schema)); // default partition = 1 // global stats @@ -261,7 +261,7 @@ mod tests { assert!(exec1.partition_statistics(Some(1)).is_err()); // Now with 2 partitions - let exec2: EmptyExec = EmptyExec::new(schema.clone()).with_partitions(2); + let exec2: EmptyExec = EmptyExec::new(Arc::clone(&schema)).with_partitions(2); // valid partitions 0 and 1 for part in 0..2 {