Skip to content

Commit 1b30cc9

Browse files
author
zhangli20
committed
AggExec: implement columnar accumulator states.
refactor execution context
1 parent 15751e9 commit 1b30cc9

72 files changed

Lines changed: 4503 additions & 6382 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

.github/workflows/tpcds-reusable.yml

Lines changed: 3 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -120,23 +120,13 @@ jobs:
120120
- name: Install Blaze JAR
121121
run: cp blaze-engine-*${{ inputs.sparkver }}*.jar spark-bin-${{ inputs.sparkver }}/jars/
122122

123-
- name: Run with BHJ
123+
- name: Run
124124
run: |
125125
export RUST_LOG=ERROR
126126
export RUST_BACKTRACE=1
127127
SPARK_HOME=spark-bin-${{ inputs.sparkver }} dev/run-tpcds-test \
128128
--data-location dev/tpcds_1g \
129129
--conf spark.driver.memory=3g \
130-
--conf spark.driver.memoryOverhead=2048 \
131-
--query-filter ${{ matrix.query }}
132-
133-
- name: Run without BHJ
134-
run: |
135-
export RUST_LOG=ERROR
136-
export RUST_BACKTRACE=1
137-
SPARK_HOME=spark-bin-${{ inputs.sparkver }} dev/run-tpcds-test \
138-
--conf spark.sql.autoBroadcastJoinThreshold=-1 \
139-
--conf spark.driver.memory=3g \
140-
--conf spark.driver.memoryOverhead=2048 \
141-
--data-location dev/tpcds_1g \
130+
--conf spark.driver.memoryOverhead=2536 \
131+
--conf spark.sql.broadcastTimeout=900s \
142132
--query-filter ${{ matrix.query }}

Cargo.lock

Lines changed: 9 additions & 33 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

native-engine/blaze-serde/src/from_proto.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ use datafusion_ext_exprs::{
5959
string_ends_with::StringEndsWithExpr, string_starts_with::StringStartsWithExpr,
6060
};
6161
use datafusion_ext_plans::{
62-
agg::{create_agg, AggExecMode, AggExpr, AggFunction, AggMode, GroupingExpr},
62+
agg::{agg::create_agg, AggExecMode, AggExpr, AggFunction, AggMode, GroupingExpr},
6363
agg_exec::AggExec,
6464
broadcast_join_build_hash_map_exec::BroadcastJoinBuildHashMapExec,
6565
broadcast_join_exec::BroadcastJoinExec,
@@ -536,7 +536,6 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
536536
exec_mode,
537537
physical_groupings,
538538
physical_aggs,
539-
agg.initial_input_buffer_offset as usize,
540539
agg.supports_partial_skipping,
541540
input,
542541
)?))

native-engine/blaze/src/rt.rs

Lines changed: 20 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -31,24 +31,23 @@ use datafusion::{
3131
common::Result,
3232
error::DataFusionError,
3333
execution::context::TaskContext,
34-
physical_plan::{
35-
metrics::{BaselineMetrics, ExecutionPlanMetricsSet},
36-
ExecutionPlan,
37-
},
34+
physical_plan::{metrics::ExecutionPlanMetricsSet, ExecutionPlan},
35+
};
36+
use datafusion_ext_commons::df_execution_err;
37+
use datafusion_ext_plans::{
38+
common::{execution_context::ExecutionContext, output::TaskOutputter},
39+
parquet_sink_exec::ParquetSinkExec,
3840
};
39-
use datafusion_ext_commons::{df_execution_err, streams::coalesce_stream::CoalesceInput};
40-
use datafusion_ext_plans::{common::output::TaskOutputter, parquet_sink_exec::ParquetSinkExec};
4141
use futures::{FutureExt, StreamExt};
4242
use jni::objects::{GlobalRef, JObject};
4343
use tokio::runtime::Runtime;
4444

4545
use crate::{handle_unwinded_scope, metrics::update_spark_metric_node};
4646

4747
pub struct NativeExecutionRuntime {
48+
exec_ctx: Arc<ExecutionContext>,
4849
native_wrapper: GlobalRef,
4950
plan: Arc<dyn ExecutionPlan>,
50-
task_context: Arc<TaskContext>,
51-
partition: usize,
5251
batch_receiver: Receiver<Result<Option<RecordBatch>>>,
5352
rt: Runtime,
5453
}
@@ -61,21 +60,23 @@ impl NativeExecutionRuntime {
6160
context: Arc<TaskContext>,
6261
) -> Result<Self> {
6362
// execute plan to output stream
64-
let stream = plan.execute(partition, context.clone())?;
65-
let schema = stream.schema();
63+
let exec_ctx = ExecutionContext::new(
64+
context.clone(),
65+
partition,
66+
plan.schema(),
67+
&ExecutionPlanMetricsSet::new(),
68+
);
69+
let stream = exec_ctx.execute(&plan)?;
6670

6771
// coalesce
6872
let mut stream = if plan.as_any().downcast_ref::<ParquetSinkExec>().is_some() {
6973
stream // cannot coalesce parquet sink output
7074
} else {
71-
context.coalesce_with_default_batch_size(
72-
stream,
73-
&BaselineMetrics::new(&ExecutionPlanMetricsSet::new(), partition),
74-
)?
75+
exec_ctx.coalesce_with_default_batch_size(stream)
7576
};
7677

7778
// init ffi schema
78-
let ffi_schema = FFI_ArrowSchema::try_from(schema.as_ref())?;
79+
let ffi_schema = FFI_ArrowSchema::try_from(exec_ctx.output_schema().as_ref())?;
7980
jni_call!(BlazeCallNativeWrapper(native_wrapper.as_obj())
8081
.importSchema(&ffi_schema as *const FFI_ArrowSchema as i64) -> ()
8182
)?;
@@ -98,12 +99,11 @@ impl NativeExecutionRuntime {
9899

99100
let (batch_sender, batch_receiver) = std::sync::mpsc::sync_channel(1);
100101
let nrt = Self {
102+
exec_ctx,
101103
native_wrapper: native_wrapper.clone(),
102104
plan,
103-
partition,
104105
rt,
105106
batch_receiver,
106-
task_context: context,
107107
};
108108

109109
// spawn batch producer
@@ -188,7 +188,7 @@ impl NativeExecutionRuntime {
188188
}
189189
};
190190

191-
let partition = self.partition;
191+
let partition = self.exec_ctx.partition_id();
192192
match next_batch() {
193193
Ok(ret) => return ret,
194194
Err(err) => {
@@ -203,13 +203,13 @@ impl NativeExecutionRuntime {
203203
}
204204

205205
pub fn finalize(self) {
206-
let partition = self.partition;
206+
let partition = self.exec_ctx.partition_id();
207207

208208
log::info!("[partition={partition}] native execution finalizing");
209209
self.update_metrics().unwrap_or_default();
210210
drop(self.plan);
211211

212-
self.task_context.cancel_task(); // cancel all pending streams
212+
self.exec_ctx.task_ctx().cancel_task(); // cancel all pending streams
213213
self.rt.shutdown_background();
214214
log::info!("[partition={partition}] native execution finalized");
215215
}

native-engine/datafusion-ext-commons/Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ num = "0.4.2"
2525
once_cell = "1.20.2"
2626
paste = "1.0.15"
2727
radsort = "0.1.1"
28-
slimmer_box = "0.6.5"
2928
tempfile = "3"
3029
thrift = "0.17.0"
3130
tokio = "1.41"

native-engine/datafusion-ext-commons/src/bytes_arena.rs

Lines changed: 0 additions & 99 deletions
This file was deleted.

native-engine/datafusion-ext-commons/src/io/mod.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,22 @@ pub fn read_u8<R: Read>(input: &mut R) -> std::io::Result<u8> {
138138
Ok(buf[0])
139139
}
140140

141+
pub fn read_bytes_into_vec<R: Read>(
142+
input: &mut R,
143+
buf: &mut Vec<u8>,
144+
len: usize,
145+
) -> std::io::Result<()> {
146+
buf.reserve(len);
147+
unsafe {
148+
// safety: space has been reserved
149+
input.read_exact(std::slice::from_raw_parts_mut(
150+
buf.as_mut_ptr().add(buf.len()),
151+
len,
152+
))?;
153+
buf.set_len(buf.len() + len);
154+
}
155+
Ok(())
156+
}
141157
pub fn read_bytes_slice<R: Read>(input: &mut R, len: usize) -> std::io::Result<Box<[u8]>> {
142158
// safety - assume_init() is safe for [u8]
143159
let mut byte_slice = unsafe { Box::new_uninit_slice(len).assume_init() };

0 commit comments

Comments
 (0)