diff --git a/datafusion/optimizer/Cargo.toml b/datafusion/optimizer/Cargo.toml index 15d3261ca5132..1cbac023a4ec2 100644 --- a/datafusion/optimizer/Cargo.toml +++ b/datafusion/optimizer/Cargo.toml @@ -71,3 +71,7 @@ insta = { workspace = true } [[bench]] name = "projection_unnecessary" harness = false + +[[bench]] +name = "required_indices" +harness = false diff --git a/datafusion/optimizer/benches/required_indices.rs b/datafusion/optimizer/benches/required_indices.rs new file mode 100644 index 0000000000000..8ad20b0fef55d --- /dev/null +++ b/datafusion/optimizer/benches/required_indices.rs @@ -0,0 +1,247 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Microbench comparing old vs new required index collection paths. + +use std::hint::black_box; +use std::sync::Arc; + +use arrow::datatypes::{DataType, Field, Schema}; +use criterion::{Criterion, criterion_group, criterion_main}; +use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion}; +use datafusion_common::{Column, DFSchemaRef, Result, TableReference, ToDFSchema}; +use datafusion_expr::expr::Exists; +use datafusion_expr::logical_plan::{LogicalPlan, LogicalPlanBuilder, Subquery}; +use datafusion_expr::{Expr, col, lit}; + +fn collect_old(plan: &LogicalPlan, schema: &DFSchemaRef) -> Result> { + let mut indices = Vec::new(); + plan.apply_expressions(|expr| { + let mut cols: std::collections::HashSet<&Column> = expr.column_refs(); + outer_columns(expr, &mut cols); + for col in cols.iter() { + if let Some(idx) = schema.maybe_index_of_column(col) { + indices.push(idx); + } + } + Ok(TreeNodeRecursion::Continue) + })?; + indices.sort_unstable(); + indices.dedup(); + Ok(indices) +} + +fn collect_new(plan: &LogicalPlan, schema: &DFSchemaRef) -> Result> { + let mut indices = Vec::new(); + plan.apply_expressions(|expr| { + collect_expr_indices(&mut indices, schema, expr); + Ok(TreeNodeRecursion::Continue) + })?; + indices.sort_unstable(); + indices.dedup(); + Ok(indices) +} + +fn bench_required_indices(c: &mut Criterion) { + let (wide_plan, wide_schema) = wide_projection_plan(200).unwrap(); + let (outer_plan, outer_schema) = outer_ref_plan().unwrap(); + + let mut group = c.benchmark_group("required_indices"); + + group.bench_function("new_wide", |b| { + b.iter(|| black_box(collect_new(&wide_plan, &wide_schema).unwrap())) + }); + group.bench_function("old_wide", |b| { + b.iter(|| black_box(collect_old(&wide_plan, &wide_schema).unwrap())) + }); + group.bench_function("new_outer_ref", |b| { + b.iter(|| black_box(collect_new(&outer_plan, &outer_schema).unwrap())) + }); + group.bench_function("old_outer_ref", |b| { + b.iter(|| black_box(collect_old(&outer_plan, &outer_schema).unwrap())) + }); + + group.finish(); +} + +fn wide_projection_plan(num_exprs: usize) -> Result<(LogicalPlan, DFSchemaRef)> { + let fields: Vec = (0..num_exprs) + .map(|i| Field::new(format!("col{i}"), DataType::Int32, false)) + .collect(); + let schema = Schema::new(fields); + let df_schema = Arc::new(schema.to_dfschema()?); + + let exprs: Vec = (0..num_exprs) + .map(|i| col(format!("col{i}")) + lit(i as i32)) + .collect(); + + let base = LogicalPlan::EmptyRelation(datafusion_expr::EmptyRelation { + produce_one_row: true, + schema: Arc::clone(&df_schema), + }); + + let plan = LogicalPlanBuilder::from(base).project(exprs)?.build()?; + + Ok((plan, df_schema)) +} + +fn outer_ref_plan() -> Result<(LogicalPlan, DFSchemaRef)> { + let fields: Vec = (0..10) + .map(|i| Field::new(format!("col{i}"), DataType::Int32, false)) + .collect(); + let schema = Schema::new(fields); + let df_schema = Arc::new(schema.to_dfschema()?); + + let outer_col = Column::new(None::, "col0"); + + let subquery_input = LogicalPlan::EmptyRelation(datafusion_expr::EmptyRelation { + produce_one_row: true, + schema: Arc::clone(&df_schema), + }); + let subquery_plan = LogicalPlanBuilder::from(subquery_input) + .filter(Expr::Column(outer_col.clone()).eq(lit(1)))? + .build()?; + + let exists_expr = Expr::Exists(Exists { + subquery: Subquery { + subquery: Arc::new(subquery_plan), + outer_ref_columns: vec![Expr::Column(outer_col.clone())], + spans: Default::default(), + }, + negated: false, + }); + + let base = LogicalPlan::EmptyRelation(datafusion_expr::EmptyRelation { + produce_one_row: true, + schema: Arc::clone(&df_schema), + }); + let plan = LogicalPlanBuilder::from(base) + .project(vec![Expr::Column(outer_col.clone())])? + .filter(exists_expr)? + .build()?; + + Ok((plan, df_schema)) +} + +fn collect_expr_indices(indices: &mut Vec, schema: &DFSchemaRef, expr: &Expr) { + expr.apply(|expr| { + match expr { + Expr::Column(col) | Expr::OuterReferenceColumn(_, col) => { + push_column_index(indices, schema, col); + } + Expr::ScalarSubquery(subquery) => { + collect_outer_ref_exprs(indices, schema, &subquery.outer_ref_columns); + } + Expr::Exists(exists) => { + collect_outer_ref_exprs( + indices, + schema, + &exists.subquery.outer_ref_columns, + ); + } + Expr::InSubquery(insubquery) => { + collect_outer_ref_exprs( + indices, + schema, + &insubquery.subquery.outer_ref_columns, + ); + } + _ => {} + } + Ok(TreeNodeRecursion::Continue) + }) + .expect("traversal should not fail"); +} + +fn collect_outer_ref_exprs( + indices: &mut Vec, + schema: &DFSchemaRef, + exprs: &[Expr], +) { + exprs.iter().for_each(|outer_expr| { + outer_expr + .apply(|expr| { + match expr { + Expr::Column(col) | Expr::OuterReferenceColumn(_, col) => { + push_column_index(indices, schema, col); + } + Expr::ScalarSubquery(subquery) => collect_outer_ref_exprs( + indices, + schema, + &subquery.outer_ref_columns, + ), + Expr::Exists(exists) => collect_outer_ref_exprs( + indices, + schema, + &exists.subquery.outer_ref_columns, + ), + Expr::InSubquery(insubquery) => collect_outer_ref_exprs( + indices, + schema, + &insubquery.subquery.outer_ref_columns, + ), + _ => {} + } + Ok(TreeNodeRecursion::Continue) + }) + .expect("outer reference traversal should not fail"); + }); +} + +fn push_column_index(indices: &mut Vec, schema: &DFSchemaRef, col: &Column) { + if let Some(idx) = schema.maybe_index_of_column(col) { + indices.push(idx); + } +} + +fn outer_columns<'a>( + expr: &'a Expr, + columns: &mut std::collections::HashSet<&'a Column>, +) { + expr.apply(|expr| { + match expr { + Expr::OuterReferenceColumn(_, col) => { + columns.insert(col); + } + Expr::ScalarSubquery(subquery) => { + outer_columns_helper_multi(&subquery.outer_ref_columns, columns); + } + Expr::Exists(exists) => { + outer_columns_helper_multi(&exists.subquery.outer_ref_columns, columns); + } + Expr::InSubquery(insubquery) => { + outer_columns_helper_multi( + &insubquery.subquery.outer_ref_columns, + columns, + ); + } + _ => {} + }; + Ok(TreeNodeRecursion::Continue) + }) + .expect("outer reference traversal should not fail"); +} + +fn outer_columns_helper_multi<'a, 'b>( + exprs: impl IntoIterator, + columns: &'b mut std::collections::HashSet<&'a Column>, +) { + exprs.into_iter().for_each(|e| outer_columns(e, columns)); +} + +criterion_group!(benches, bench_required_indices); +criterion_main!(benches); diff --git a/datafusion/optimizer/src/optimize_projections/mod.rs b/datafusion/optimizer/src/optimize_projections/mod.rs index 548eadffa242e..3c4d6782ae264 100644 --- a/datafusion/optimizer/src/optimize_projections/mod.rs +++ b/datafusion/optimizer/src/optimize_projections/mod.rs @@ -21,7 +21,6 @@ mod required_indices; use crate::optimizer::ApplyOrder; use crate::{OptimizerConfig, OptimizerRule}; -use std::collections::HashSet; use std::sync::Arc; use datafusion_common::{ @@ -675,56 +674,6 @@ fn rewrite_expr(expr: Expr, input: &Projection) -> Result> { }) } -/// Accumulates outer-referenced columns by the -/// given expression, `expr`. -/// -/// # Parameters -/// -/// * `expr` - The expression to analyze for outer-referenced columns. -/// * `columns` - A mutable reference to a `HashSet` where detected -/// columns are collected. -fn outer_columns<'a>(expr: &'a Expr, columns: &mut HashSet<&'a Column>) { - // inspect_expr_pre doesn't handle subquery references, so find them explicitly - expr.apply(|expr| { - match expr { - Expr::OuterReferenceColumn(_, col) => { - columns.insert(col); - } - Expr::ScalarSubquery(subquery) => { - outer_columns_helper_multi(&subquery.outer_ref_columns, columns); - } - Expr::Exists(exists) => { - outer_columns_helper_multi(&exists.subquery.outer_ref_columns, columns); - } - Expr::InSubquery(insubquery) => { - outer_columns_helper_multi( - &insubquery.subquery.outer_ref_columns, - columns, - ); - } - _ => {} - }; - Ok(TreeNodeRecursion::Continue) - }) - // unwrap: closure above never returns Err, so can not be Err here - .unwrap(); -} - -/// A recursive subroutine that accumulates outer-referenced columns by the -/// given expressions (`exprs`). -/// -/// # Parameters -/// -/// * `exprs` - The expressions to analyze for outer-referenced columns. -/// * `columns` - A mutable reference to a `HashSet` where detected -/// columns are collected. -fn outer_columns_helper_multi<'a, 'b>( - exprs: impl IntoIterator, - columns: &'b mut HashSet<&'a Column>, -) { - exprs.into_iter().for_each(|e| outer_columns(e, columns)); -} - /// Splits requirement indices for a join into left and right children based on /// the join type. /// diff --git a/datafusion/optimizer/src/optimize_projections/required_indices.rs b/datafusion/optimizer/src/optimize_projections/required_indices.rs index c1e0885c9b5f2..3449da4b1761b 100644 --- a/datafusion/optimizer/src/optimize_projections/required_indices.rs +++ b/datafusion/optimizer/src/optimize_projections/required_indices.rs @@ -17,8 +17,7 @@ //! [`RequiredIndices`] helper for OptimizeProjection -use crate::optimize_projections::outer_columns; -use datafusion_common::tree_node::TreeNodeRecursion; +use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion}; use datafusion_common::{Column, DFSchemaRef, Result}; use datafusion_expr::{Expr, LogicalPlan}; @@ -112,20 +111,36 @@ impl RequiredIndices { /// * `input_schema`: The input schema to analyze for index requirements. /// * `expr`: An expression for which we want to find necessary field indices. fn add_expr(&mut self, input_schema: &DFSchemaRef, expr: &Expr) { - // TODO could remove these clones (and visit the expression directly) - let mut cols = expr.column_refs(); - // Get outer-referenced (subquery) columns: - outer_columns(expr, &mut cols); - self.indices.reserve(cols.len()); - for col in cols { - if let Some(idx) = input_schema.maybe_index_of_column(col) { - self.indices.push(idx); + expr.apply(|expr| { + match expr { + Expr::Column(col) | Expr::OuterReferenceColumn(_, col) => { + push_column_index(&mut self.indices, input_schema, col); + } + Expr::ScalarSubquery(subquery) => collect_outer_ref_exprs( + &mut self.indices, + input_schema, + &subquery.outer_ref_columns, + ), + Expr::Exists(exists) => collect_outer_ref_exprs( + &mut self.indices, + input_schema, + &exists.subquery.outer_ref_columns, + ), + Expr::InSubquery(insubquery) => collect_outer_ref_exprs( + &mut self.indices, + input_schema, + &insubquery.subquery.outer_ref_columns, + ), + _ => {} } - } + Ok(TreeNodeRecursion::Continue) + }) + // traversal above is infallible + .expect("traversal should not fail"); } /// Adds the indices of the fields referred to by the given expressions - /// `within the given schema. + /// within the given schema. /// /// # Parameters /// @@ -224,3 +239,45 @@ impl RequiredIndices { self } } + +fn collect_outer_ref_exprs( + indices: &mut Vec, + input_schema: &DFSchemaRef, + exprs: &[Expr], +) { + exprs.iter().for_each(|outer_expr| { + outer_expr + .apply(|expr| { + match expr { + Expr::Column(col) | Expr::OuterReferenceColumn(_, col) => { + push_column_index(indices, input_schema, col); + } + Expr::ScalarSubquery(subquery) => collect_outer_ref_exprs( + indices, + input_schema, + &subquery.outer_ref_columns, + ), + Expr::Exists(exists) => collect_outer_ref_exprs( + indices, + input_schema, + &exists.subquery.outer_ref_columns, + ), + Expr::InSubquery(insubquery) => collect_outer_ref_exprs( + indices, + input_schema, + &insubquery.subquery.outer_ref_columns, + ), + _ => {} + } + Ok(TreeNodeRecursion::Continue) + }) + // traversal above is infallible + .expect("outer reference traversal should not fail"); + }); +} + +fn push_column_index(indices: &mut Vec, input_schema: &DFSchemaRef, col: &Column) { + if let Some(idx) = input_schema.maybe_index_of_column(col) { + indices.push(idx); + } +}