Skip to content

Commit a4bbb71

Browse files
committed
Wrap immutable plan parts into Arc
Closes #19852.

Improve the performance of query planning and plan state reset by making node clones cheap.

- Store projection as `Option<Arc<[usize]>>` instead of `Option<Vec<usize>>` in `FilterExec`, `HashJoinExec`, `NestedLoopJoinExec`.
- Store exprs as `Arc<[ProjectionExpr]>` instead of `Vec<ProjectionExpr>` in `ProjectionExprs`.
- Store `Arc`-wrapped aggregation, filter, and group-by expressions within `AggregateExec`.
1 parent b80bf2c commit a4bbb71

File tree

22 files changed

+509
-290
lines changed

22 files changed

+509
-290
lines changed

datafusion/common/src/stats.rs

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -391,8 +391,13 @@ impl Statistics {
391391
/// For example, if we had statistics for columns `{"a", "b", "c"}`,
392392
/// projecting to `vec![2, 1]` would return statistics for columns `{"c",
393393
/// "b"}`.
394-
pub fn project(mut self, projection: Option<&Vec<usize>>) -> Self {
395-
let Some(projection) = projection else {
394+
pub fn project(self, projection: Option<&impl AsRef<[usize]>>) -> Self {
395+
let projection = projection.map(AsRef::as_ref);
396+
self.project_impl(projection)
397+
}
398+
399+
fn project_impl(mut self, projection: Option<&[usize]>) -> Self {
400+
let Some(projection) = projection.map(AsRef::as_ref) else {
396401
return self;
397402
};
398403

@@ -410,7 +415,7 @@ impl Statistics {
410415
.map(Slot::Present)
411416
.collect();
412417

413-
for idx in projection {
418+
for idx in projection.iter() {
414419
let next_idx = self.column_statistics.len();
415420
let slot = std::mem::replace(
416421
columns.get_mut(*idx).expect("projection out of bounds"),
@@ -1066,7 +1071,7 @@ mod tests {
10661071

10671072
#[test]
10681073
fn test_project_none() {
1069-
let projection = None;
1074+
let projection: Option<Vec<usize>> = None;
10701075
let stats = make_stats(vec![10, 20, 30]).project(projection.as_ref());
10711076
assert_eq!(stats, make_stats(vec![10, 20, 30]));
10721077
}

datafusion/common/src/utils/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,10 +70,10 @@ use std::thread::available_parallelism;
7070
/// ```
7171
pub fn project_schema(
7272
schema: &SchemaRef,
73-
projection: Option<&Vec<usize>>,
73+
projection: Option<&impl AsRef<[usize]>>,
7474
) -> Result<SchemaRef> {
7575
let schema = match projection {
76-
Some(columns) => Arc::new(schema.project(columns)?),
76+
Some(columns) => Arc::new(schema.project(columns.as_ref())?),
7777
None => Arc::clone(schema),
7878
};
7979
Ok(schema)

datafusion/core/src/physical_planner.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1007,7 +1007,7 @@ impl DefaultPhysicalPlanner {
10071007
// project the output columns excluding the async functions
10081008
// The async functions are always appended to the end of the schema.
10091009
.apply_projection(Some(
1010-
(0..input.schema().fields().len()).collect(),
1010+
(0..input.schema().fields().len()).collect::<Vec<_>>(),
10111011
))?
10121012
.with_batch_size(session_state.config().batch_size())
10131013
.build()?

datafusion/core/tests/physical_optimizer/join_selection.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -762,7 +762,7 @@ async fn test_hash_join_swap_on_joins_with_projections(
762762
"ProjectionExec won't be added above if HashJoinExec contains embedded projection",
763763
);
764764

765-
assert_eq!(swapped_join.projection, Some(vec![0_usize]));
765+
assert_eq!(swapped_join.projection.as_deref().unwrap(), &[0_usize]);
766766
assert_eq!(swapped.schema().fields.len(), 1);
767767
assert_eq!(swapped.schema().fields[0].name(), "small_col");
768768
Ok(())

datafusion/physical-expr/src/projection.rs

Lines changed: 72 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,8 @@ use arrow::datatypes::{Field, Schema, SchemaRef};
2929
use datafusion_common::stats::{ColumnStatistics, Precision};
3030
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
3131
use datafusion_common::{
32-
Result, ScalarValue, assert_or_internal_err, internal_datafusion_err, plan_err,
32+
Result, ScalarValue, Statistics, assert_or_internal_err, internal_datafusion_err,
33+
plan_err,
3334
};
3435

3536
use datafusion_physical_expr_common::metrics::ExecutionPlanMetricsSet;
@@ -125,7 +126,8 @@ impl From<ProjectionExpr> for (Arc<dyn PhysicalExpr>, String) {
125126
/// indices.
126127
#[derive(Debug, Clone, PartialEq, Eq)]
127128
pub struct ProjectionExprs {
128-
exprs: Vec<ProjectionExpr>,
129+
/// [`Arc`] used for a cheap clone, which improves physical plan optimization performance.
130+
exprs: Arc<[ProjectionExpr]>,
129131
}
130132

131133
impl std::fmt::Display for ProjectionExprs {
@@ -137,22 +139,24 @@ impl std::fmt::Display for ProjectionExprs {
137139

138140
impl From<Vec<ProjectionExpr>> for ProjectionExprs {
139141
fn from(value: Vec<ProjectionExpr>) -> Self {
140-
Self { exprs: value }
142+
Self {
143+
exprs: value.into(),
144+
}
141145
}
142146
}
143147

144148
impl From<&[ProjectionExpr]> for ProjectionExprs {
145149
fn from(value: &[ProjectionExpr]) -> Self {
146150
Self {
147-
exprs: value.to_vec(),
151+
exprs: value.iter().cloned().collect(),
148152
}
149153
}
150154
}
151155

152156
impl FromIterator<ProjectionExpr> for ProjectionExprs {
153157
fn from_iter<T: IntoIterator<Item = ProjectionExpr>>(exprs: T) -> Self {
154158
Self {
155-
exprs: exprs.into_iter().collect::<Vec<_>>(),
159+
exprs: exprs.into_iter().collect(),
156160
}
157161
}
158162
}
@@ -164,12 +168,17 @@ impl AsRef<[ProjectionExpr]> for ProjectionExprs {
164168
}
165169

166170
impl ProjectionExprs {
167-
pub fn new<I>(exprs: I) -> Self
168-
where
169-
I: IntoIterator<Item = ProjectionExpr>,
170-
{
171+
/// Make a new [`ProjectionExprs`] from an iterator of expressions.
172+
pub fn new(exprs: impl IntoIterator<Item = ProjectionExpr>) -> Self {
173+
Self {
174+
exprs: exprs.into_iter().collect(),
175+
}
176+
}
177+
178+
/// Make a new [`ProjectionExprs`] from expressions.
179+
pub fn from_expressions(exprs: impl Into<Arc<[ProjectionExpr]>>) -> Self {
171180
Self {
172-
exprs: exprs.into_iter().collect::<Vec<_>>(),
181+
exprs: exprs.into(),
173182
}
174183
}
175184

@@ -285,13 +294,14 @@ impl ProjectionExprs {
285294
{
286295
let exprs = self
287296
.exprs
288-
.into_iter()
297+
.iter()
298+
.cloned()
289299
.map(|mut proj| {
290300
proj.expr = f(proj.expr)?;
291301
Ok(proj)
292302
})
293-
.collect::<Result<Vec<_>>>()?;
294-
Ok(Self::new(exprs))
303+
.collect::<Result<Arc<_>>>()?;
304+
Ok(Self::from_expressions(exprs))
295305
}
296306

297307
/// Apply another projection on top of this projection, returning the combined projection.
@@ -361,7 +371,7 @@ impl ProjectionExprs {
361371
/// applied on top of this projection.
362372
pub fn try_merge(&self, other: &ProjectionExprs) -> Result<ProjectionExprs> {
363373
let mut new_exprs = Vec::with_capacity(other.exprs.len());
364-
for proj_expr in &other.exprs {
374+
for proj_expr in other.exprs.iter() {
365375
let new_expr = update_expr(&proj_expr.expr, &self.exprs, true)?
366376
.ok_or_else(|| {
367377
internal_datafusion_err!(
@@ -602,12 +612,12 @@ impl ProjectionExprs {
602612
/// ```
603613
pub fn project_statistics(
604614
&self,
605-
mut stats: datafusion_common::Statistics,
615+
mut stats: Statistics,
606616
output_schema: &Schema,
607-
) -> Result<datafusion_common::Statistics> {
617+
) -> Result<Statistics> {
608618
let mut column_statistics = vec![];
609619

610-
for proj_expr in &self.exprs {
620+
for proj_expr in self.exprs.iter() {
611621
let expr = &proj_expr.expr;
612622
let col_stats = if let Some(col) = expr.as_any().downcast_ref::<Column>() {
613623
std::mem::take(&mut stats.column_statistics[col.index()])
@@ -754,13 +764,52 @@ impl Projector {
754764
}
755765
}
756766

757-
impl IntoIterator for ProjectionExprs {
758-
type Item = ProjectionExpr;
759-
type IntoIter = std::vec::IntoIter<ProjectionExpr>;
767+
/// Describes an immutable, reference-counted projection.
768+
///
769+
/// This structure represents projecting a set of columns by index.
770+
/// [`Arc`] is used to make it cheap to clone.
771+
pub type ProjectionRef = Arc<[usize]>;
760772

761-
fn into_iter(self) -> Self::IntoIter {
762-
self.exprs.into_iter()
763-
}
773+
/// Combine two projections.
774+
///
775+
/// If `p1` is [`None`], the result is [`None`], regardless of `p2`.
776+
/// Otherwise, if `p2` is not [`None`], every index in `p1` is used to look up
777+
/// an entry of `p2` (`result[k] = p2[p1[k]]`). If `p2` is [`None`], `p1` is returned unchanged.
778+
///
779+
/// # Example
780+
///
781+
/// If `p1` is [0, 2] and `p2` is [0, 2, 3], each index of `p1` selects an entry of `p2`,
782+
/// so the resulting projection will be [p2[0], p2[2]] = [0, 3].
783+
///
784+
/// # Error
785+
///
786+
/// Returns an internal error if `p1` contains an index that is greater than or equal to the length of `p2`.
787+
///
788+
pub fn combine_projections(
789+
p1: Option<&ProjectionRef>,
790+
p2: Option<&ProjectionRef>,
791+
) -> Result<Option<ProjectionRef>> {
792+
let Some(p1) = p1 else {
793+
return Ok(None);
794+
};
795+
let Some(p2) = p2 else {
796+
return Ok(Some(Arc::clone(p1)));
797+
};
798+
799+
Ok(Some(
800+
p1.iter()
801+
.map(|i| {
802+
let idx = *i;
803+
assert_or_internal_err!(
804+
idx < p2.len(),
805+
"unable to apply projection: index {} is greater than new projection len {}",
806+
idx,
807+
p2.len(),
808+
);
809+
Ok(p2[*i])
810+
})
811+
.collect::<Result<Arc<[usize]>>>()?,
812+
))
764813
}
765814

766815
/// The function operates in two modes:

datafusion/physical-optimizer/src/enforce_distribution.rs

Lines changed: 24 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ use datafusion_physical_plan::aggregates::{
4949
use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
5050
use datafusion_physical_plan::execution_plan::EmissionType;
5151
use datafusion_physical_plan::joins::{
52-
CrossJoinExec, HashJoinExec, PartitionMode, SortMergeJoinExec,
52+
CrossJoinExec, HashJoinExec, HashJoinExecBuilder, PartitionMode, SortMergeJoinExec,
5353
};
5454
use datafusion_physical_plan::projection::{ProjectionExec, ProjectionExpr};
5555
use datafusion_physical_plan::repartition::RepartitionExec;
@@ -305,18 +305,19 @@ pub fn adjust_input_keys_ordering(
305305
Vec<(PhysicalExprRef, PhysicalExprRef)>,
306306
Vec<SortOptions>,
307307
)| {
308-
HashJoinExec::try_new(
308+
HashJoinExecBuilder::new(
309309
Arc::clone(left),
310310
Arc::clone(right),
311311
new_conditions.0,
312-
filter.clone(),
313-
join_type,
314-
// TODO: although projection is not used in the join here, because projection pushdown is after enforce_distribution. Maybe we need to handle it later. Same as filter.
315-
projection.clone(),
316-
PartitionMode::Partitioned,
317-
*null_equality,
318-
*null_aware,
312+
*join_type,
319313
)
314+
.with_filter(filter.clone())
315+
// TODO: although projection is not used in the join here, because projection pushdown is after enforce_distribution. Maybe we need to handle it later. Same as filter.
316+
.with_projection_ref(projection.clone())
317+
.with_partition_mode(PartitionMode::Partitioned)
318+
.with_null_equality(*null_equality)
319+
.with_null_aware(*null_aware)
320+
.build()
320321
.map(|e| Arc::new(e) as _)
321322
};
322323
return reorder_partitioned_join_keys(
@@ -638,17 +639,20 @@ pub fn reorder_join_keys_to_inputs(
638639
right_keys,
639640
} = join_keys;
640641
let new_join_on = new_join_conditions(&left_keys, &right_keys);
641-
return Ok(Arc::new(HashJoinExec::try_new(
642-
Arc::clone(left),
643-
Arc::clone(right),
644-
new_join_on,
645-
filter.clone(),
646-
join_type,
647-
projection.clone(),
648-
PartitionMode::Partitioned,
649-
*null_equality,
650-
*null_aware,
651-
)?));
642+
return Ok(Arc::new(
643+
HashJoinExecBuilder::new(
644+
Arc::clone(left),
645+
Arc::clone(right),
646+
new_join_on,
647+
*join_type,
648+
)
649+
.with_filter(filter.clone())
650+
.with_projection_ref(projection.clone())
651+
.with_partition_mode(PartitionMode::Partitioned)
652+
.with_null_equality(*null_equality)
653+
.with_null_aware(*null_aware)
654+
.build()?,
655+
));
652656
}
653657
}
654658
} else if let Some(SortMergeJoinExec {

datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -723,7 +723,7 @@ fn handle_hash_join(
723723
.collect();
724724

725725
let column_indices = build_join_column_index(plan);
726-
let projected_indices: Vec<_> = if let Some(projection) = &plan.projection {
726+
let projected_indices: Vec<_> = if let Some(projection) = plan.projection.as_ref() {
727727
projection.iter().map(|&i| &column_indices[i]).collect()
728728
} else {
729729
column_indices.iter().collect()

datafusion/physical-optimizer/src/join_selection.rs

Lines changed: 16 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ use datafusion_physical_expr::expressions::Column;
3434
use datafusion_physical_plan::execution_plan::EmissionType;
3535
use datafusion_physical_plan::joins::utils::ColumnIndex;
3636
use datafusion_physical_plan::joins::{
37-
CrossJoinExec, HashJoinExec, NestedLoopJoinExec, PartitionMode,
37+
CrossJoinExec, HashJoinExec, HashJoinExecBuilder, NestedLoopJoinExec, PartitionMode,
3838
StreamJoinPartitionMode, SymmetricHashJoinExec,
3939
};
4040
use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties};
@@ -191,30 +191,18 @@ pub(crate) fn try_collect_left(
191191
{
192192
Ok(Some(hash_join.swap_inputs(PartitionMode::CollectLeft)?))
193193
} else {
194-
Ok(Some(Arc::new(HashJoinExec::try_new(
195-
Arc::clone(left),
196-
Arc::clone(right),
197-
hash_join.on().to_vec(),
198-
hash_join.filter().cloned(),
199-
hash_join.join_type(),
200-
hash_join.projection.clone(),
201-
PartitionMode::CollectLeft,
202-
hash_join.null_equality(),
203-
hash_join.null_aware,
204-
)?)))
194+
Ok(Some(Arc::new(
195+
HashJoinExecBuilder::from(hash_join)
196+
.with_partition_mode(PartitionMode::CollectLeft)
197+
.build()?,
198+
)))
205199
}
206200
}
207-
(true, false) => Ok(Some(Arc::new(HashJoinExec::try_new(
208-
Arc::clone(left),
209-
Arc::clone(right),
210-
hash_join.on().to_vec(),
211-
hash_join.filter().cloned(),
212-
hash_join.join_type(),
213-
hash_join.projection.clone(),
214-
PartitionMode::CollectLeft,
215-
hash_join.null_equality(),
216-
hash_join.null_aware,
217-
)?))),
201+
(true, false) => Ok(Some(Arc::new(
202+
HashJoinExecBuilder::from(hash_join)
203+
.with_partition_mode(PartitionMode::CollectLeft)
204+
.build()?,
205+
))),
218206
(false, true) => {
219207
// Don't swap null-aware anti joins as they have specific side requirements
220208
if hash_join.join_type().supports_swap() && !hash_join.null_aware {
@@ -254,17 +242,11 @@ pub(crate) fn partitioned_hash_join(
254242
PartitionMode::Partitioned
255243
};
256244

257-
Ok(Arc::new(HashJoinExec::try_new(
258-
Arc::clone(left),
259-
Arc::clone(right),
260-
hash_join.on().to_vec(),
261-
hash_join.filter().cloned(),
262-
hash_join.join_type(),
263-
hash_join.projection.clone(),
264-
partition_mode,
265-
hash_join.null_equality(),
266-
hash_join.null_aware,
267-
)?))
245+
Ok(Arc::new(
246+
HashJoinExecBuilder::from(hash_join)
247+
.with_partition_mode(partition_mode)
248+
.build()?,
249+
))
268250
}
269251
}
270252

datafusion/physical-optimizer/src/projection_pushdown.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ fn try_push_down_join_filter(
135135
);
136136

137137
let new_lhs_length = lhs_rewrite.data.0.schema().fields.len();
138-
let projections = match projections {
138+
let projections = match projections.as_ref() {
139139
None => match join.join_type() {
140140
JoinType::Inner | JoinType::Left | JoinType::Right | JoinType::Full => {
141141
// Build projections that ignore the newly projected columns.

0 commit comments

Comments
 (0)