Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 84 additions & 12 deletions datafusion/physical-plan/src/empty.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@ use std::any::Any;
use std::sync::Arc;

use crate::memory::MemoryStream;
use crate::{common, DisplayAs, PlanProperties, SendableRecordBatchStream, Statistics};
use crate::{
execution_plan::{Boundedness, EmissionType},
DisplayFormatType, ExecutionPlan, Partitioning,
};
use crate::{DisplayAs, PlanProperties, SendableRecordBatchStream, Statistics};

use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
Expand All @@ -34,6 +34,7 @@ use datafusion_execution::TaskContext;
use datafusion_physical_expr::EquivalenceProperties;

use crate::execution_plan::SchedulingType;
use datafusion_common::stats::Precision;
use log::trace;

/// Execution plan for empty relation with produce_one_row=false
Expand Down Expand Up @@ -97,8 +98,11 @@ impl DisplayAs for EmptyExec {
write!(f, "EmptyExec")
}
DisplayFormatType::TreeRender => {
// TODO: collect info
write!(f, "")
write!(
f,
"EmptyExec: schema={}, partitions={}",
self.schema, self.partitions
)
}
}
}
Expand Down Expand Up @@ -165,23 +169,33 @@ impl ExecutionPlan for EmptyExec {
);
}
}
// Build explicit stats: exact zero rows and bytes, with explicit known column stats
let mut stats = Statistics::default()
.with_num_rows(Precision::Exact(0))
.with_total_byte_size(Precision::Exact(0));

// Add explicit column stats for each field in schema
for _field in self.schema.fields() {
stats =
stats.add_column_statistics(datafusion_common::stats::ColumnStatistics {
null_count: Precision::Exact(0),
distinct_count: Precision::Exact(0),
min_value: Precision::<datafusion_common::ScalarValue>::Absent,
max_value: Precision::<datafusion_common::ScalarValue>::Absent,
sum_value: Precision::<datafusion_common::ScalarValue>::Absent,
});
}

let batch = self
.data()
.expect("Create empty RecordBatch should not fail");
Ok(common::compute_record_batch_statistics(
&[batch],
&self.schema,
None,
))
Ok(stats)
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::test;
use crate::with_new_children_if_necessary;
use crate::{common, test};
use arrow_schema::Schema;

#[tokio::test]
async fn empty() -> Result<()> {
Expand Down Expand Up @@ -229,4 +243,62 @@ mod tests {
assert!(empty.execute(20, task_ctx).is_err());
Ok(())
}

#[test]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can find the partition statistics in the dedicated file: https://github.com/apache/datafusion/blob/main/datafusion/core/tests/physical_optimizer/partition_statistics.rs.

It also contains the real exection to check the results

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah well, thank you & apologies for oversight.
Let me run integration tests and align column stats for values instead of Absent.

fn empty_partition_statistics_explicit_zero() -> Result<()> {
let schema: Arc<Schema> = Arc::new(Schema::empty());
let exec1: EmptyExec = EmptyExec::new(Arc::clone(&schema));
// default partition = 1

// global stats
let stats_all_1: Statistics = exec1.partition_statistics(None)?;
assert_eq!(stats_all_1.num_rows, Precision::Exact(0));
assert_eq!(stats_all_1.total_byte_size, Precision::Exact(0));
assert_eq!(stats_all_1.column_statistics.len(), schema.fields().len());
for col_stats in &stats_all_1.column_statistics {
assert_eq!(col_stats.null_count, Precision::Exact(0));
assert_eq!(col_stats.distinct_count, Precision::Exact(0));
assert_eq!(col_stats.min_value, Precision::Absent);
assert_eq!(col_stats.max_value, Precision::Absent);
assert_eq!(col_stats.sum_value, Precision::Absent);
}

// partition 0
let stats0_1: Statistics = exec1.partition_statistics(Some(0))?;
assert_eq!(stats0_1.num_rows, Precision::Exact(0));
assert_eq!(stats0_1.total_byte_size, Precision::Exact(0));
assert_eq!(stats0_1.column_statistics.len(), schema.fields().len());
for col_stats in &stats0_1.column_statistics {
assert_eq!(col_stats.null_count, Precision::Exact(0));
assert_eq!(col_stats.distinct_count, Precision::Exact(0));
assert_eq!(col_stats.min_value, Precision::Absent);
assert_eq!(col_stats.max_value, Precision::Absent);
assert_eq!(col_stats.sum_value, Precision::Absent);
}

// invalid partition for default
assert!(exec1.partition_statistics(Some(1)).is_err());

// Now with 2 partitions
let exec2: EmptyExec = EmptyExec::new(Arc::clone(&schema)).with_partitions(2);

// valid partitions 0 and 1
for part in 0..2 {
let stats = exec2.partition_statistics(Some(part))?;
assert_eq!(stats.num_rows, Precision::Exact(0));
assert_eq!(stats.total_byte_size, Precision::Exact(0));
assert_eq!(stats.column_statistics.len(), schema.fields().len());
for col_stats in &stats.column_statistics {
assert_eq!(col_stats.null_count, Precision::Exact(0));
assert_eq!(col_stats.distinct_count, Precision::Exact(0));
assert_eq!(col_stats.min_value, Precision::Absent);
assert_eq!(col_stats.max_value, Precision::Absent);
assert_eq!(col_stats.sum_value, Precision::Absent);
}
}

// invalid partition 2
assert!(exec2.partition_statistics(Some(2)).is_err());
Ok(())
}
}