Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -963,6 +963,12 @@ config_namespace! {
/// So if you disable `enable_topk_dynamic_filter_pushdown`, then enable `enable_dynamic_filter_pushdown`, the `enable_topk_dynamic_filter_pushdown` will be overridden.
pub enable_dynamic_filter_pushdown: bool, default = true

/// When set to true, the physical planner will use the ExpressionAnalyzer
/// framework for expression-level statistics estimation (NDV, selectivity,
/// min/max, null fraction). When false, existing behavior without
/// expression-level statistics support is used.
pub enable_expression_analyzer: bool, default = false

/// When set to true, the optimizer will insert filters before a join between
/// a nullable and non-nullable column to filter out nulls on the nullable side. This
/// filter can add additional overhead when the file format does not fully support
Expand Down
38 changes: 38 additions & 0 deletions datafusion/core/src/execution/session_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ use datafusion_optimizer::{
Analyzer, AnalyzerRule, Optimizer, OptimizerConfig, OptimizerRule,
};
use datafusion_physical_expr::create_physical_expr;
use datafusion_physical_expr::expression_analyzer::ExpressionAnalyzerRegistry;
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
use datafusion_physical_optimizer::PhysicalOptimizerRule;
use datafusion_physical_optimizer::optimizer::PhysicalOptimizer;
Expand Down Expand Up @@ -184,6 +185,8 @@ pub struct SessionState {
///
/// It will be invoked on `CREATE FUNCTION` statements.
/// thus, changing dialect to PostgreSql is required
function_factory: Option<Arc<dyn FunctionFactory>>,
/// Registry for expression-level statistics analyzers (NDV, selectivity, etc.)
///
/// The physical planner attaches this registry to the filter and projection
/// operators it builds when the `enable_expression_analyzer` optimizer
/// option is set.
expression_analyzer_registry: Arc<ExpressionAnalyzerRegistry>,
cache_factory: Option<Arc<dyn CacheFactory>>,
/// Cache logical plans of prepared statements for later execution.
Expand All @@ -202,6 +205,10 @@ impl Debug for SessionState {
.field("runtime_env", &self.runtime_env)
.field("catalog_list", &self.catalog_list)
.field("serializer_registry", &self.serializer_registry)
.field(
"expression_analyzer_registry",
&self.expression_analyzer_registry,
)
.field("file_formats", &self.file_formats)
.field("execution_props", &self.execution_props)
.field("table_options", &self.table_options)
Expand Down Expand Up @@ -909,6 +916,11 @@ impl SessionState {
&self.serializer_registry
}

/// Return the [`ExpressionAnalyzerRegistry`] for expression-level statistics
pub fn expression_analyzer_registry(&self) -> &Arc<ExpressionAnalyzerRegistry> {
&self.expression_analyzer_registry
}

/// Return version of the cargo package that produced this query
pub fn version(&self) -> &str {
env!("CARGO_PKG_VERSION")
Expand Down Expand Up @@ -988,6 +1000,7 @@ pub struct SessionStateBuilder {
aggregate_functions: Option<Vec<Arc<AggregateUDF>>>,
window_functions: Option<Vec<Arc<WindowUDF>>>,
serializer_registry: Option<Arc<dyn SerializerRegistry>>,
expression_analyzer_registry: Option<Arc<ExpressionAnalyzerRegistry>>,
file_formats: Option<Vec<Arc<dyn FileFormatFactory>>>,
config: Option<SessionConfig>,
table_options: Option<TableOptions>,
Expand Down Expand Up @@ -1028,6 +1041,7 @@ impl SessionStateBuilder {
aggregate_functions: None,
window_functions: None,
serializer_registry: None,
expression_analyzer_registry: None,
file_formats: None,
table_options: None,
config: None,
Expand Down Expand Up @@ -1083,6 +1097,7 @@ impl SessionStateBuilder {
),
window_functions: Some(existing.window_functions.into_values().collect_vec()),
serializer_registry: Some(existing.serializer_registry),
expression_analyzer_registry: Some(existing.expression_analyzer_registry),
file_formats: Some(existing.file_formats.into_values().collect_vec()),
config: Some(new_config),
table_options: Some(existing.table_options),
Expand Down Expand Up @@ -1326,6 +1341,15 @@ impl SessionStateBuilder {
self
}

/// Configure the [`ExpressionAnalyzerRegistry`] used for expression-level
/// statistics, replacing any registry previously stored on this builder.
///
/// Consumes and returns the builder so calls can be chained.
pub fn with_expression_analyzer_registry(
    mut self,
    expression_analyzer_registry: Arc<ExpressionAnalyzerRegistry>,
) -> Self {
    let registry = Some(expression_analyzer_registry);
    self.expression_analyzer_registry = registry;
    self
}

/// Set the map of [`FileFormatFactory`]s
pub fn with_file_formats(
mut self,
Expand Down Expand Up @@ -1456,6 +1480,7 @@ impl SessionStateBuilder {
aggregate_functions,
window_functions,
serializer_registry,
expression_analyzer_registry,
file_formats,
table_options,
config,
Expand Down Expand Up @@ -1493,6 +1518,8 @@ impl SessionStateBuilder {
window_functions: HashMap::new(),
serializer_registry: serializer_registry
.unwrap_or_else(|| Arc::new(EmptySerializerRegistry)),
expression_analyzer_registry: expression_analyzer_registry
.unwrap_or_else(|| Arc::new(ExpressionAnalyzerRegistry::new())),
file_formats: HashMap::new(),
table_options: table_options.unwrap_or_else(|| {
TableOptions::default_from_session_config(config.options())
Expand Down Expand Up @@ -1675,6 +1702,13 @@ impl SessionStateBuilder {
&mut self.serializer_registry
}

/// Returns the current expression_analyzer_registry value
pub fn expression_analyzer_registry(
&mut self,
) -> &mut Option<Arc<ExpressionAnalyzerRegistry>> {
&mut self.expression_analyzer_registry
}

/// Returns the current file_formats value
pub fn file_formats(&mut self) -> &mut Option<Vec<Arc<dyn FileFormatFactory>>> {
&mut self.file_formats
Expand Down Expand Up @@ -1750,6 +1784,10 @@ impl Debug for SessionStateBuilder {
.field("runtime_env", &self.runtime_env)
.field("catalog_list", &self.catalog_list)
.field("serializer_registry", &self.serializer_registry)
.field(
"expression_analyzer_registry",
&self.expression_analyzer_registry,
)
.field("file_formats", &self.file_formats)
.field("execution_props", &self.execution_props)
.field("table_options", &self.table_options)
Expand Down
57 changes: 49 additions & 8 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1106,12 +1106,23 @@ impl DefaultPhysicalPlanner {
input_schema.as_arrow(),
)? {
PlanAsyncExpr::Sync(PlannedExprResult::Expr(runtime_expr)) => {
FilterExecBuilder::new(
let builder = FilterExecBuilder::new(
Arc::clone(&runtime_expr[0]),
physical_input,
)
.with_batch_size(session_state.config().batch_size())
.build()?
.with_batch_size(session_state.config().batch_size());
let builder = if session_state
.config_options()
.optimizer
.enable_expression_analyzer
{
builder.with_expression_analyzer_registry(Arc::clone(
session_state.expression_analyzer_registry(),
))
} else {
builder
};
builder.build()?
}
PlanAsyncExpr::Async(
async_map,
Expand All @@ -1121,7 +1132,7 @@ impl DefaultPhysicalPlanner {
async_map.async_exprs,
physical_input,
)?;
FilterExecBuilder::new(
let builder = FilterExecBuilder::new(
Arc::clone(&runtime_expr[0]),
Arc::new(async_exec),
)
Expand All @@ -1130,8 +1141,19 @@ impl DefaultPhysicalPlanner {
.apply_projection(Some(
(0..input.schema().fields().len()).collect::<Vec<_>>(),
))?
.with_batch_size(session_state.config().batch_size())
.build()?
.with_batch_size(session_state.config().batch_size());
let builder = if session_state
.config_options()
.optimizer
.enable_expression_analyzer
{
builder.with_expression_analyzer_registry(Arc::clone(
session_state.expression_analyzer_registry(),
))
} else {
builder
};
builder.build()?
}
_ => {
return internal_err!(
Expand Down Expand Up @@ -2892,7 +2914,17 @@ impl DefaultPhysicalPlanner {
.into_iter()
.map(|(expr, alias)| ProjectionExpr { expr, alias })
.collect();
Ok(Arc::new(ProjectionExec::try_new(proj_exprs, input_exec)?))
let mut proj_exec = ProjectionExec::try_new(proj_exprs, input_exec)?;
if session_state
.config_options()
.optimizer
.enable_expression_analyzer
{
proj_exec = proj_exec.with_expression_analyzer_registry(Arc::clone(
session_state.expression_analyzer_registry(),
));
}
Ok(Arc::new(proj_exec))
}
PlanAsyncExpr::Async(
async_map,
Expand All @@ -2904,8 +2936,17 @@ impl DefaultPhysicalPlanner {
.into_iter()
.map(|(expr, alias)| ProjectionExpr { expr, alias })
.collect();
let new_proj_exec =
let mut new_proj_exec =
ProjectionExec::try_new(proj_exprs, Arc::new(async_exec))?;
if session_state
.config_options()
.optimizer
.enable_expression_analyzer
{
new_proj_exec = new_proj_exec.with_expression_analyzer_registry(
Arc::clone(session_state.expression_analyzer_registry()),
);
}
Ok(Arc::new(new_proj_exec))
}
_ => internal_err!("Unexpected PlanAsyncExpressions variant"),
Expand Down
Loading
Loading