Skip to content

Commit 952da0f

Browse files
Blaze Sort->MergeJoin reduces key row and column conversion (#1078)
1 parent f1c7502 commit 952da0f

15 files changed

Lines changed: 1938 additions & 470 deletions

.idea/vcs.xml

Lines changed: 3 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

native-engine/datafusion-ext-plans/src/broadcast_join_exec.rs

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -409,20 +409,19 @@ async fn execute_join_with_smj_fallback(
409409
};
410410

411411
// create sorted streams, build side is already sorted
412-
let probed = exec_ctx.stat_input(exec_ctx.execute(&probed_plan)?);
413412
let (left_exec, right_exec) = match broadcast_side {
414413
JoinSide::Left => (
415414
built_sorted,
416415
create_default_ascending_sort_exec(
417-
create_record_batch_stream_exec(probed, exec_ctx.partition_id())?,
416+
probed_plan,
418417
&join_params.right_keys,
419418
Some(exec_ctx.execution_plan_metrics().clone()),
420419
false, // do not record output metric
421420
),
422421
),
423422
JoinSide::Right => (
424423
create_default_ascending_sort_exec(
425-
create_record_batch_stream_exec(probed, exec_ctx.partition_id())?,
424+
probed_plan,
426425
&join_params.left_keys,
427426
Some(exec_ctx.execution_plan_metrics().clone()),
428427
false, // do not record output metric
@@ -432,13 +431,18 @@ async fn execute_join_with_smj_fallback(
432431
};
433432

434433
// run sort merge join
435-
let mut smj_join_params = join_params.clone();
436-
smj_join_params.sort_options = vec![SortOptions::default(); join_params.left_keys.len()];
437-
438-
let smj_exec = Arc::new(SortMergeJoinExec::try_new_with_join_params(
434+
let smj_exec = Arc::new(SortMergeJoinExec::try_new(
435+
join_params.output_schema,
439436
left_exec.clone(),
440437
right_exec.clone(),
441-
smj_join_params,
438+
join_params
439+
.left_keys
440+
.to_vec()
441+
.into_iter()
442+
.zip(join_params.right_keys.to_vec())
443+
.collect(),
444+
join_params.join_type,
445+
vec![SortOptions::default(); join_params.left_keys.len()],
442446
)?);
443447
let mut join_output = smj_exec.execute(exec_ctx.partition_id(), exec_ctx.task_ctx())?;
444448

native-engine/datafusion-ext-plans/src/common/column_pruning.rs

Lines changed: 48 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,10 @@ use datafusion::{
3030
physical_expr::{PhysicalExprRef, expressions::Column, utils::collect_columns},
3131
physical_plan::{ExecutionPlan, stream::RecordBatchStreamAdapter},
3232
};
33+
use datafusion_ext_commons::downcast_any;
3334
use futures::StreamExt;
3435
use itertools::Itertools;
36+
use parking_lot::Mutex;
3537

3638
pub trait ExecuteWithColumnPruning {
3739
fn execute_projected(
@@ -78,24 +80,55 @@ pub fn prune_columns(exprs: &[PhysicalExprRef]) -> Result<(Vec<PhysicalExprRef>,
7880
.collect();
7981
let mapped_exprs: Vec<PhysicalExprRef> = exprs
8082
.iter()
81-
.map(|expr| {
82-
expr.clone()
83-
.transform_down(&|node: PhysicalExprRef| {
84-
Ok(Transformed::yes(
85-
if let Some(column) = node.as_any().downcast_ref::<Column>() {
86-
let mapped_idx = required_columns_mapping[&column.index()];
87-
Arc::new(Column::new(column.name(), mapped_idx))
88-
} else {
89-
node
90-
},
91-
))
92-
})
93-
.map(|r| r.data)
94-
})
95-
.collect::<Result<_>>()?;
83+
.map(|expr| map_columns(expr, &required_columns_mapping))
84+
.collect();
9685

9786
Ok((
9887
mapped_exprs,
9988
required_columns.into_iter().map(|c| c.index()).collect(),
10089
))
10190
}
91+
92+
pub fn extend_projection_by_expr(
93+
projection: &mut Vec<usize>,
94+
expr: &PhysicalExprRef,
95+
) -> PhysicalExprRef {
96+
let projection = Arc::new(Mutex::new(projection));
97+
expr.clone()
98+
.transform_down(&|node: PhysicalExprRef| {
99+
Ok(Transformed::yes(
100+
if let Ok(column) = downcast_any!(node, Column) {
101+
let mut projection = projection.lock();
102+
if let Some(existed_idx) =
103+
projection.iter().position(|&idx| idx == column.index())
104+
{
105+
Arc::new(Column::new(column.name(), existed_idx)) as PhysicalExprRef
106+
} else {
107+
let new_idx = projection.len();
108+
projection.push(column.index());
109+
Arc::new(Column::new(column.name(), new_idx)) as PhysicalExprRef
110+
}
111+
} else {
112+
node
113+
},
114+
))
115+
})
116+
.map(|r| r.data)
117+
.unwrap()
118+
}
119+
120+
pub fn map_columns(expr: &PhysicalExprRef, mapping: &HashMap<usize, usize>) -> PhysicalExprRef {
121+
expr.clone()
122+
.transform_down(&|node: PhysicalExprRef| {
123+
Ok(Transformed::yes(
124+
if let Ok(column) = downcast_any!(node, Column) {
125+
let mapped_idx = mapping[&column.index()];
126+
Arc::new(Column::new(column.name(), mapped_idx))
127+
} else {
128+
node
129+
},
130+
))
131+
})
132+
.map(|r| r.data)
133+
.unwrap()
134+
}

0 commit comments

Comments (0)