Skip to content

Commit f4cd5b0

Browse files
committed
Add required_indices bench comparing old vs new
1 parent ba87d0c commit f4cd5b0

1 file changed

Lines changed: 234 additions & 0 deletions

File tree

Lines changed: 234 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,234 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use arrow::datatypes::{DataType, Field, Schema};
19+
use criterion::{Criterion, criterion_group, criterion_main};
20+
use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion};
21+
use datafusion_common::{Column, DFSchemaRef, Result, TableReference, ToDFSchema};
22+
use datafusion_expr::expr::Exists;
23+
use datafusion_expr::logical_plan::{LogicalPlan, LogicalPlanBuilder, Subquery};
24+
use datafusion_expr::{Expr, col, lit};
25+
use std::hint::black_box;
26+
use std::sync::Arc;
27+
28+
fn collect_old(plan: &LogicalPlan, schema: &DFSchemaRef) -> Result<Vec<usize>> {
29+
let mut indices = Vec::new();
30+
plan.apply_expressions(|expr| {
31+
let mut cols: std::collections::HashSet<&Column> = expr.column_refs();
32+
outer_columns(expr, &mut cols);
33+
for col in cols.iter() {
34+
if let Some(idx) = schema.maybe_index_of_column(col) {
35+
indices.push(idx);
36+
}
37+
}
38+
Ok(TreeNodeRecursion::Continue)
39+
})?;
40+
indices.sort_unstable();
41+
indices.dedup();
42+
Ok(indices)
43+
}
44+
45+
fn collect_new(plan: &LogicalPlan, schema: &DFSchemaRef) -> Result<Vec<usize>> {
46+
let mut indices = Vec::new();
47+
plan.apply_expressions(|expr| {
48+
collect_expr_indices(&mut indices, schema, expr);
49+
Ok(TreeNodeRecursion::Continue)
50+
})?;
51+
indices.sort_unstable();
52+
indices.dedup();
53+
Ok(indices)
54+
}
55+
56+
fn bench_required_indices(c: &mut Criterion) {
57+
let (wide_plan, wide_schema) = wide_projection_plan(200).unwrap();
58+
let (outer_plan, outer_schema) = outer_ref_plan().unwrap();
59+
60+
let mut group = c.benchmark_group("required_indices");
61+
62+
group.bench_function("new_wide", |b| {
63+
b.iter(|| black_box(collect_new(&wide_plan, &wide_schema).unwrap()))
64+
});
65+
group.bench_function("old_wide", |b| {
66+
b.iter(|| black_box(collect_old(&wide_plan, &wide_schema).unwrap()))
67+
});
68+
group.bench_function("new_outer_ref", |b| {
69+
b.iter(|| black_box(collect_new(&outer_plan, &outer_schema).unwrap()))
70+
});
71+
group.bench_function("old_outer_ref", |b| {
72+
b.iter(|| black_box(collect_old(&outer_plan, &outer_schema).unwrap()))
73+
});
74+
75+
group.finish();
76+
}
77+
78+
fn wide_projection_plan(num_exprs: usize) -> Result<(LogicalPlan, DFSchemaRef)> {
79+
let fields: Vec<Field> = (0..num_exprs)
80+
.map(|i| Field::new(format!("c{i}"), DataType::Int32, false))
81+
.collect();
82+
let schema = Schema::new(fields);
83+
let df_schema = Arc::new(schema.to_dfschema()?);
84+
85+
let exprs: Vec<Expr> = (0..num_exprs)
86+
.map(|i| col(format!("c{i}")) + lit(i as i32))
87+
.collect();
88+
89+
let plan = LogicalPlanBuilder::empty(false).project(exprs)?.build()?;
90+
91+
Ok((plan, df_schema))
92+
}
93+
94+
fn outer_ref_plan() -> Result<(LogicalPlan, DFSchemaRef)> {
95+
// Base schema with a handful of columns
96+
let fields: Vec<Field> = (0..10)
97+
.map(|i| Field::new(format!("c{i}"), DataType::Int32, false))
98+
.collect();
99+
let schema = Schema::new(fields);
100+
let df_schema = Arc::new(schema.to_dfschema()?);
101+
102+
let outer_col = Column::new(None::<TableReference>, "c0");
103+
104+
// Subquery that references the outer column
105+
let subquery_plan = LogicalPlanBuilder::empty(false)
106+
.filter(Expr::Column(outer_col.clone()).eq(lit(1)))?
107+
.build()?;
108+
109+
let exists_expr = Expr::Exists(Exists {
110+
subquery: Subquery {
111+
subquery: Arc::new(subquery_plan),
112+
outer_ref_columns: vec![Expr::Column(outer_col.clone())],
113+
spans: Default::default(),
114+
},
115+
negated: false,
116+
});
117+
118+
let plan = LogicalPlanBuilder::empty(false)
119+
.project(vec![Expr::Column(outer_col.clone())])?
120+
.filter(exists_expr)?
121+
.build()?;
122+
123+
Ok((plan, df_schema))
124+
}
125+
126+
fn collect_expr_indices(indices: &mut Vec<usize>, schema: &DFSchemaRef, expr: &Expr) {
127+
expr.apply(|expr| {
128+
match expr {
129+
Expr::Column(col) | Expr::OuterReferenceColumn(_, col) => {
130+
push_column_index(indices, schema, col);
131+
}
132+
Expr::ScalarSubquery(subquery) => {
133+
collect_outer_ref_exprs(indices, schema, &subquery.outer_ref_columns);
134+
}
135+
Expr::Exists(exists) => {
136+
collect_outer_ref_exprs(
137+
indices,
138+
schema,
139+
&exists.subquery.outer_ref_columns,
140+
);
141+
}
142+
Expr::InSubquery(insubquery) => {
143+
collect_outer_ref_exprs(
144+
indices,
145+
schema,
146+
&insubquery.subquery.outer_ref_columns,
147+
);
148+
}
149+
_ => {}
150+
}
151+
Ok(TreeNodeRecursion::Continue)
152+
})
153+
.expect("traversal should not fail");
154+
}
155+
156+
fn collect_outer_ref_exprs(
157+
indices: &mut Vec<usize>,
158+
schema: &DFSchemaRef,
159+
exprs: &[Expr],
160+
) {
161+
exprs.iter().for_each(|outer_expr| {
162+
outer_expr
163+
.apply(|expr| {
164+
match expr {
165+
Expr::Column(col) | Expr::OuterReferenceColumn(_, col) => {
166+
push_column_index(indices, schema, col);
167+
}
168+
Expr::ScalarSubquery(subquery) => collect_outer_ref_exprs(
169+
indices,
170+
schema,
171+
&subquery.outer_ref_columns,
172+
),
173+
Expr::Exists(exists) => collect_outer_ref_exprs(
174+
indices,
175+
schema,
176+
&exists.subquery.outer_ref_columns,
177+
),
178+
Expr::InSubquery(insubquery) => collect_outer_ref_exprs(
179+
indices,
180+
schema,
181+
&insubquery.subquery.outer_ref_columns,
182+
),
183+
_ => {}
184+
}
185+
Ok(TreeNodeRecursion::Continue)
186+
})
187+
.expect("outer reference traversal should not fail");
188+
});
189+
}
190+
191+
fn push_column_index(indices: &mut Vec<usize>, schema: &DFSchemaRef, col: &Column) {
192+
if let Some(idx) = schema.maybe_index_of_column(col) {
193+
indices.push(idx);
194+
}
195+
}
196+
197+
// Minimal copy of the old helper to collect outer ref columns for the "old" path
198+
fn outer_columns<'a>(
199+
expr: &'a Expr,
200+
columns: &mut std::collections::HashSet<&'a Column>,
201+
) {
202+
expr.apply(|expr| {
203+
match expr {
204+
Expr::OuterReferenceColumn(_, col) => {
205+
columns.insert(col);
206+
}
207+
Expr::ScalarSubquery(subquery) => {
208+
outer_columns_helper_multi(&subquery.outer_ref_columns, columns);
209+
}
210+
Expr::Exists(exists) => {
211+
outer_columns_helper_multi(&exists.subquery.outer_ref_columns, columns);
212+
}
213+
Expr::InSubquery(insubquery) => {
214+
outer_columns_helper_multi(
215+
&insubquery.subquery.outer_ref_columns,
216+
columns,
217+
);
218+
}
219+
_ => {}
220+
};
221+
Ok(TreeNodeRecursion::Continue)
222+
})
223+
.unwrap();
224+
}
225+
226+
fn outer_columns_helper_multi<'a, 'b>(
227+
exprs: impl IntoIterator<Item = &'a Expr>,
228+
columns: &'b mut std::collections::HashSet<&'a Column>,
229+
) {
230+
exprs.into_iter().for_each(|e| outer_columns(e, columns));
231+
}
232+
233+
criterion_group!(benches, bench_required_indices);
234+
criterion_main!(benches);

0 commit comments

Comments
 (0)