Skip to content

Commit 37c4f68

Browse files
richoxzhangli20
and authored
remove bind() logic in from_proto.rs (#777)
fix broadcast join shuffle read check; fix partitioning matching in RSS shuffle writer; reduce agg output memory usage; change protobuf's default recursion limit to Int.MaxValue; fix SortAgg bug; fix SortMergeJoin bug; add conf: spark.blaze.partialAggSkipping.skipSpill; add conf: spark.blaze.tokio.worker.threads.per.cpu; use case-insensitive field matching in scan schema adaptor. Co-authored-by: zhangli20 <zhangli20@kuaishou.com>
1 parent 86bf34b commit 37c4f68

17 files changed

Lines changed: 270 additions & 175 deletions

File tree

native-engine/blaze-jni-bridge/src/conf.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,11 @@ define_conf!(BooleanConf, IGNORE_CORRUPTED_FILES);
3737
define_conf!(BooleanConf, PARTIAL_AGG_SKIPPING_ENABLE);
3838
define_conf!(DoubleConf, PARTIAL_AGG_SKIPPING_RATIO);
3939
define_conf!(IntConf, PARTIAL_AGG_SKIPPING_MIN_ROWS);
40+
define_conf!(BooleanConf, PARTIAL_AGG_SKIPPING_SKIP_SPILL);
4041
define_conf!(BooleanConf, PARQUET_ENABLE_PAGE_FILTERING);
4142
define_conf!(BooleanConf, PARQUET_ENABLE_BLOOM_FILTER);
4243
define_conf!(StringConf, SPARK_IO_COMPRESSION_CODEC);
44+
define_conf!(IntConf, TOKIO_WORKER_THREADS_PER_CPU);
4345
define_conf!(IntConf, SPARK_TASK_CPUS);
4446
define_conf!(StringConf, SPILL_COMPRESSION_CODEC);
4547
define_conf!(BooleanConf, SMJ_FALLBACK_ENABLE);

native-engine/blaze-serde/src/from_proto.rs

Lines changed: 20 additions & 102 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@ use datafusion::{
3333
object_store::ObjectStoreUrl,
3434
physical_plan::FileScanConfig,
3535
},
36-
error::DataFusionError,
3736
logical_expr::{ColumnarValue, Operator, ScalarUDF, Volatility},
3837
physical_expr::{
3938
expressions::{in_list, LikeExpr, SCAndExpr, SCOrExpr},
@@ -101,31 +100,6 @@ use crate::{
101100
Schema,
102101
};
103102

104-
fn bind(
105-
expr_in: Arc<dyn PhysicalExpr>,
106-
input_schema: &Arc<Schema>,
107-
) -> Result<Arc<dyn PhysicalExpr>, DataFusionError> {
108-
let expr = expr_in.as_any();
109-
110-
if let Some(expr) = expr.downcast_ref::<Column>() {
111-
if expr.name() == "__bound_reference__" {
112-
Ok(Arc::new(expr.clone()))
113-
} else {
114-
Ok(Arc::new(Column::new_with_schema(
115-
expr.name(),
116-
input_schema,
117-
)?))
118-
}
119-
} else {
120-
let new_children = expr_in
121-
.children()
122-
.iter()
123-
.map(|&child_expr| bind(child_expr.clone(), input_schema))
124-
.collect::<Result<Vec<_>, DataFusionError>>()?;
125-
Ok(expr_in.with_new_children(new_children)?)
126-
}
127-
}
128-
129103
impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
130104
type Error = PlanSerDeError;
131105

@@ -145,10 +119,7 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
145119
.zip(projection.expr_name.iter())
146120
.map(|(expr, name)| {
147121
Ok((
148-
bind(
149-
try_parse_physical_expr(expr, &input.schema())?,
150-
&input.schema(),
151-
)?,
122+
try_parse_physical_expr(expr, &input.schema())?,
152123
name.to_string(),
153124
))
154125
})
@@ -160,12 +131,7 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
160131
let predicates = filter
161132
.expr
162133
.iter()
163-
.map(|expr| {
164-
Ok(bind(
165-
try_parse_physical_expr(expr, &input.schema())?,
166-
&input.schema(),
167-
)?)
168-
})
134+
.map(|expr| try_parse_physical_expr(expr, &input.schema()))
169135
.collect::<Result<_, Self::Error>>()?;
170136
Ok(Arc::new(FilterExec::try_new(predicates, input)?))
171137
}
@@ -213,11 +179,9 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
213179
.map(|col| {
214180
let left_key =
215181
try_parse_physical_expr(&col.left.as_ref().unwrap(), &left.schema())?;
216-
let left_key_binded = bind(left_key, &left.schema())?;
217182
let right_key =
218183
try_parse_physical_expr(&col.right.as_ref().unwrap(), &right.schema())?;
219-
let right_key_binded = bind(right_key, &right.schema())?;
220-
Ok((left_key_binded, right_key_binded))
184+
Ok((left_key, right_key))
221185
})
222186
.collect::<Result<_, Self::Error>>()?;
223187

@@ -252,11 +216,9 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
252216
.map(|col| {
253217
let left_key =
254218
try_parse_physical_expr(&col.left.as_ref().unwrap(), &left.schema())?;
255-
let left_key_binded = bind(left_key, &left.schema())?;
256219
let right_key =
257220
try_parse_physical_expr(&col.right.as_ref().unwrap(), &right.schema())?;
258-
let right_key_binded = bind(right_key, &right.schema())?;
259-
Ok((left_key_binded, right_key_binded))
221+
Ok((left_key, right_key))
260222
})
261223
.collect::<Result<_, Self::Error>>()?;
262224

@@ -347,12 +309,7 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
347309
let keys = bhm
348310
.keys
349311
.iter()
350-
.map(|expr| {
351-
Ok(bind(
352-
try_parse_physical_expr(expr, &input.schema())?,
353-
&input.schema(),
354-
)?)
355-
})
312+
.map(|expr| try_parse_physical_expr(expr, &input.schema()))
356313
.collect::<Result<Vec<Arc<dyn PhysicalExpr>>, Self::Error>>()?;
357314
Ok(Arc::new(BroadcastJoinBuildHashMapExec::new(input, keys)))
358315
}
@@ -366,11 +323,9 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
366323
.map(|col| {
367324
let left_key =
368325
try_parse_physical_expr(&col.left.as_ref().unwrap(), &left.schema())?;
369-
let left_key_binded = bind(left_key, &left.schema())?;
370326
let right_key =
371327
try_parse_physical_expr(&col.right.as_ref().unwrap(), &right.schema())?;
372-
let right_key_binded = bind(right_key, &right.schema())?;
373-
Ok((left_key_binded, right_key_binded))
328+
Ok((left_key, right_key))
374329
})
375330
.collect::<Result<_, Self::Error>>()?;
376331

@@ -448,10 +403,10 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
448403
.zip(agg.grouping_expr_name.iter())
449404
.map(|(expr, name)| {
450405
try_parse_physical_expr(expr, &input_schema).and_then(|expr| {
451-
Ok(bind(expr, &input_schema).map(|expr| GroupingExpr {
406+
Ok(GroupingExpr {
452407
expr,
453408
field_name: name.to_owned(),
454-
})?)
409+
})
455410
})
456411
})
457412
.collect::<Result<Vec<_>, _>>()?;
@@ -480,10 +435,7 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
480435
let agg_children_exprs = agg_node
481436
.children
482437
.iter()
483-
.map(|expr| {
484-
try_parse_physical_expr(expr, &input_schema)
485-
.and_then(|expr| Ok(bind(expr, &input_schema)?))
486-
})
438+
.map(|expr| try_parse_physical_expr(expr, &input_schema))
487439
.collect::<Result<Vec<_>, _>>()?;
488440

489441
Ok(AggExpr {
@@ -532,12 +484,7 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
532484
projection
533485
.expr
534486
.iter()
535-
.map(|expr| {
536-
Ok(bind(
537-
try_parse_physical_expr(expr, &input.schema())?,
538-
&input.schema(),
539-
)?)
540-
})
487+
.map(|expr| try_parse_physical_expr(expr, &input.schema()))
541488
.collect::<Result<Vec<_>, Self::Error>>()
542489
})
543490
.collect::<Result<Vec<_>, _>>()?;
@@ -565,12 +512,7 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
565512
let children = w
566513
.children
567514
.iter()
568-
.map(|expr| {
569-
Ok(bind(
570-
try_parse_physical_expr(expr, &input.schema())?,
571-
&input.schema(),
572-
)?)
573-
})
515+
.map(|expr| try_parse_physical_expr(expr, &input.schema()))
574516
.collect::<Result<Vec<_>, Self::Error>>()?;
575517

576518
let window_func = match w.func_type() {
@@ -623,12 +565,7 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
623565
let partition_specs = window
624566
.partition_spec
625567
.iter()
626-
.map(|expr| {
627-
Ok(bind(
628-
try_parse_physical_expr(expr, &input.schema())?,
629-
&input.schema(),
630-
)?)
631-
})
568+
.map(|expr| try_parse_physical_expr(expr, &input.schema()))
632569
.collect::<Result<Vec<_>, Self::Error>>()?;
633570

634571
let order_specs = window
@@ -637,8 +574,7 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
637574
.map(|expr| {
638575
let expr = expr.expr_type.as_ref().ok_or_else(|| {
639576
proto_error(format!(
640-
"physical_plan::from_proto() Unexpected expr {:?}",
641-
self
577+
"physical_plan::from_proto() Unexpected expr {self:?}",
642578
))
643579
})?;
644580
if let protobuf::physical_expr_node::ExprType::Sort(sort_expr) = expr {
@@ -653,10 +589,7 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
653589
})?
654590
.as_ref();
655591
Ok(PhysicalSortExpr {
656-
expr: bind(
657-
try_parse_physical_expr(expr, &input.schema())?,
658-
&input.schema(),
659-
)?,
592+
expr: try_parse_physical_expr(expr, &input.schema())?,
660593
options: SortOptions {
661594
descending: !sort_expr.asc,
662595
nulls_first: sort_expr.nulls_first,
@@ -688,12 +621,7 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
688621

689622
let children = pb_generator_children
690623
.iter()
691-
.map(|expr| {
692-
Ok::<_, PlanSerDeError>(bind(
693-
try_parse_physical_expr(expr, &input_schema)?,
694-
&input_schema,
695-
)?)
696-
})
624+
.map(|expr| try_parse_physical_expr(expr, &input_schema))
697625
.collect::<Result<Vec<_>, _>>()?;
698626

699627
let generator = match pb_generate_func {
@@ -856,10 +784,7 @@ fn try_parse_physical_expr(
856784

857785
let pexpr: Arc<dyn PhysicalExpr> =
858786
match expr_type {
859-
ExprType::Column(c) => {
860-
let pcol: Column = c.into();
861-
Arc::new(pcol)
862-
}
787+
ExprType::Column(c) => Arc::new(Column::new(&c.name, input_schema.index_of(&c.name)?)),
863788
ExprType::Literal(scalar) => Arc::new(Literal::new(convert_required!(scalar.value)?)),
864789
ExprType::BoundReference(bound_reference) => {
865790
let pcol: Column = bound_reference.into();
@@ -894,11 +819,10 @@ fn try_parse_physical_expr(
894819
try_parse_physical_expr_box_required(&e.expr, input_schema)?,
895820
)),
896821
ExprType::InList(e) => {
897-
let expr = try_parse_physical_expr_box_required(&e.expr, input_schema)
898-
.and_then(|expr| Ok(bind(expr, input_schema)?))?; // materialize expr.data_type
822+
let expr = try_parse_physical_expr_box_required(&e.expr, input_schema)?;
899823
let dt = expr.data_type(input_schema)?;
900824
in_list(
901-
bind(expr, input_schema)?,
825+
expr,
902826
e.list
903827
.iter()
904828
.map(|x| {
@@ -1111,10 +1035,7 @@ fn try_parse_physical_sort_expr(
11111035
})?
11121036
.as_ref();
11131037
Ok(PhysicalSortExpr {
1114-
expr: bind(
1115-
try_parse_physical_expr(expr, &input.schema())?,
1116-
&input.schema(),
1117-
)?,
1038+
expr: try_parse_physical_expr(expr, &input.schema())?,
11181039
options: SortOptions {
11191040
descending: !sort_expr.asc,
11201041
nulls_first: sort_expr.nulls_first,
@@ -1150,10 +1071,7 @@ pub fn parse_protobuf_partitioning(
11501071
let expr = hash_part
11511072
.hash_expr
11521073
.iter()
1153-
.map(|e| {
1154-
try_parse_physical_expr(e, &input.schema())
1155-
.and_then(|e| Ok(bind(e, &input.schema())?))
1156-
})
1074+
.map(|e| try_parse_physical_expr(e, &input.schema()))
11571075
.collect::<Result<Vec<Arc<dyn PhysicalExpr>>, _>>()?;
11581076
Ok(Some(Partitioning::HashPartitioning(
11591077
expr,

native-engine/blaze/src/rt.rs

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ use arrow::{
2424
record_batch::RecordBatch,
2525
};
2626
use blaze_jni_bridge::{
27-
conf::{IntConf, SPARK_TASK_CPUS},
27+
conf::{IntConf, SPARK_TASK_CPUS, TOKIO_WORKER_THREADS_PER_CPU},
2828
is_task_running,
2929
jni_bridge::JavaClasses,
3030
jni_call, jni_call_static, jni_convert_byte_array, jni_exception_check, jni_exception_occurred,
@@ -95,13 +95,19 @@ impl NativeExecutionRuntime {
9595
&ExecutionPlanMetricsSet::new(),
9696
);
9797

98+
let num_worker_threads = {
99+
let worker_threads_per_cpu = TOKIO_WORKER_THREADS_PER_CPU.value().unwrap_or(0);
100+
let spark_task_cpus = SPARK_TASK_CPUS.value().unwrap_or(0);
101+
worker_threads_per_cpu * spark_task_cpus
102+
};
103+
98104
// create tokio runtime
99105
// propagate classloader and task context to spawned children threads
100106
let spark_task_context = jni_call_static!(JniBridge.getTaskContext() -> JObject)?;
101107
let spark_task_context_global = jni_new_global_ref!(spark_task_context.as_obj())?;
102-
let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
108+
let mut tokio_runtime_builder = tokio::runtime::Builder::new_multi_thread();
109+
tokio_runtime_builder
103110
.thread_name(format!("blaze-native-stage-{stage_id}-part-{partition_id}"))
104-
.worker_threads(SPARK_TASK_CPUS.value().unwrap_or(1) as usize)
105111
.on_thread_start(move || {
106112
let classloader = JavaClasses::get().classloader;
107113
let _ = jni_call_static!(
@@ -112,8 +118,11 @@ impl NativeExecutionRuntime {
112118
);
113119
THREAD_STAGE_ID.set(stage_id);
114120
THREAD_PARTITION_ID.set(partition_id);
115-
})
116-
.build()?;
121+
});
122+
if num_worker_threads > 0 {
123+
tokio_runtime_builder.worker_threads(num_worker_threads as usize);
124+
}
125+
let tokio_runtime = tokio_runtime_builder.build()?;
117126

118127
// spawn batch producer
119128
let (batch_sender, batch_receiver) = std::sync::mpsc::sync_channel(1);

native-engine/datafusion-ext-plans/src/agg/agg_ctx.rs

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ use arrow::{
2525
};
2626
use blaze_jni_bridge::{
2727
conf,
28-
conf::{DoubleConf, IntConf},
28+
conf::{BooleanConf, DoubleConf, IntConf},
2929
};
3030
use datafusion::{
3131
common::{cast::as_binary_array, Result},
@@ -60,6 +60,7 @@ pub struct AggContext {
6060
pub supports_partial_skipping: bool,
6161
pub partial_skipping_ratio: f64,
6262
pub partial_skipping_min_rows: usize,
63+
pub partial_skipping_skip_spill: bool,
6364
pub is_expand_agg: bool,
6465
pub agg_expr_evaluator: CachedExprsEvaluator,
6566
}
@@ -162,14 +163,18 @@ impl AggContext {
162163
agg_expr_evaluator_output_schema,
163164
)?;
164165

165-
let (partial_skipping_ratio, partial_skipping_min_rows) = if supports_partial_skipping {
166-
(
167-
conf::PARTIAL_AGG_SKIPPING_RATIO.value().unwrap_or(0.999),
168-
conf::PARTIAL_AGG_SKIPPING_MIN_ROWS.value().unwrap_or(20000) as usize,
169-
)
170-
} else {
171-
Default::default()
172-
};
166+
let (partial_skipping_ratio, partial_skipping_min_rows, partial_skipping_skip_spill) =
167+
if supports_partial_skipping {
168+
(
169+
conf::PARTIAL_AGG_SKIPPING_RATIO.value().unwrap_or(0.999),
170+
conf::PARTIAL_AGG_SKIPPING_MIN_ROWS.value().unwrap_or(20000) as usize,
171+
conf::PARTIAL_AGG_SKIPPING_SKIP_SPILL
172+
.value()
173+
.unwrap_or(false),
174+
)
175+
} else {
176+
Default::default()
177+
};
173178

174179
Ok(Self {
175180
exec_mode,
@@ -186,6 +191,7 @@ impl AggContext {
186191
supports_partial_skipping,
187192
partial_skipping_ratio,
188193
partial_skipping_min_rows,
194+
partial_skipping_skip_spill,
189195
is_expand_agg,
190196
})
191197
}

0 commit comments

Comments (0)