diff --git a/native-engine/datafusion-ext-plans/src/agg_exec.rs b/native-engine/datafusion-ext-plans/src/agg_exec.rs index 2fa0f5838..2b4c1e029 100644 --- a/native-engine/datafusion-ext-plans/src/agg_exec.rs +++ b/native-engine/datafusion-ext-plans/src/agg_exec.rs @@ -188,6 +188,7 @@ async fn execute_agg_with_grouping_hash( metrics: ExecutionPlanMetricsSet, ) -> Result { // create tables + let baseline_metrics = BaselineMetrics::new(&metrics, partition_id); let tables = Arc::new(AggTable::new( partition_id, agg_ctx.clone(), @@ -211,6 +212,7 @@ async fn execute_agg_with_grouping_hash( .map_err(|err| err.context("agg: polling batches from input error"))? { // insert or update rows into in-mem table + let _timer = baseline_metrics.elapsed_compute().timer(); tables.process_input_batch(input_batch).await?; // stop aggregating if triggered partial skipping @@ -224,7 +226,10 @@ async fn execute_agg_with_grouping_hash( // merge all tables and output let output_schema = agg_ctx.output_schema.clone(); let output = context.output_with_sender("Agg", output_schema, |sender| async move { + sender.exclude_time(baseline_metrics.elapsed_compute()); + // output all aggregated records in table + let _timer = baseline_metrics.elapsed_compute().timer(); tables.output(sender.clone()).await?; // in partial skipping mode, there might be unconsumed records in input stream