From cddc636a56c8e778fae76b5b78cd715ccc3efd12 Mon Sep 17 00:00:00 2001 From: zhangli20 Date: Wed, 23 Apr 2025 01:35:24 +0800 Subject: [PATCH] fix imprecise ScalarValue memory size --- .../src/arrow/array_size.rs | 26 +++++++----- .../datafusion-ext-commons/src/lib.rs | 1 + .../src/scalar_value.rs | 41 +++++++++++++++++++ .../src/brickhouse/array_union.rs | 6 ++- .../datafusion-ext-plans/src/agg/acc.rs | 17 ++++---- .../src/agg/agg_hash_map.rs | 4 +- .../datafusion-ext-plans/src/agg/agg_table.rs | 2 +- .../datafusion-ext-plans/src/agg/collect.rs | 3 +- .../datafusion-ext-plans/src/agg/count.rs | 2 +- .../datafusion-ext-plans/src/agg/first.rs | 4 +- .../src/agg/first_ignores_null.rs | 4 +- .../datafusion-ext-plans/src/agg/maxmin.rs | 9 ++-- .../src/broadcast_join_build_hash_map_exec.rs | 4 +- .../src/common/execution_context.rs | 6 +-- .../src/ffi_reader_exec.rs | 4 +- .../src/ipc_reader_exec.rs | 9 ++-- .../src/joins/stream_cursor.rs | 6 +-- .../src/parquet_sink_exec.rs | 4 +- .../src/shuffle/buffered_data.rs | 6 +-- .../datafusion-ext-plans/src/shuffle/mod.rs | 4 +- .../src/shuffle/rss_sort_repartitioner.rs | 4 +- .../src/shuffle/sort_repartitioner.rs | 4 +- .../datafusion-ext-plans/src/sort_exec.rs | 12 +++--- 23 files changed, 121 insertions(+), 61 deletions(-) create mode 100644 native-engine/datafusion-ext-commons/src/scalar_value.rs diff --git a/native-engine/datafusion-ext-commons/src/arrow/array_size.rs b/native-engine/datafusion-ext-commons/src/arrow/array_size.rs index dd2186901..2496f447c 100644 --- a/native-engine/datafusion-ext-commons/src/arrow/array_size.rs +++ b/native-engine/datafusion-ext-commons/src/arrow/array_size.rs @@ -13,7 +13,8 @@ // limitations under the License. use arrow::{ - array::{Array, ArrayData}, + array::{Array, ArrayData, StructArray}, + buffer::Buffer, record_batch::RecordBatch, }; @@ -25,18 +26,21 @@ pub trait ArraySize { fn get_array_mem_size(&self) -> usize; } -impl ArraySize for dyn Array { +impl ArraySize for T { fn get_array_mem_size(&self) -> usize { get_array_data_mem_size(&self.to_data()) } } -impl ArraySize for RecordBatch { - fn get_array_mem_size(&self) -> usize { - self.columns() - .iter() - .map(|array| array.get_array_mem_size()) - .sum::() +pub trait BatchSize { + fn get_batch_mem_size(&self) -> usize; +} + +impl BatchSize for RecordBatch { + fn get_batch_mem_size(&self) -> usize { + let as_struct = StructArray::from(self.clone()); + let as_dyn_array: &dyn Array = &as_struct; + as_dyn_array.get_array_mem_size() } } @@ -44,8 +48,10 @@ fn get_array_data_mem_size(array_data: &ArrayData) -> usize { let mut mem_size = 0; for buffer in array_data.buffers() { - mem_size += buffer.len(); + mem_size += size_of::() + buffer.len().max(buffer.capacity()); } + + mem_size += size_of::>(); mem_size += array_data .nulls() .map(|nb| nb.buffer().len()) @@ -53,7 +59,7 @@ fn get_array_data_mem_size(array_data: &ArrayData) -> usize { // summing child data size for child in array_data.child_data() { - mem_size += get_array_data_mem_size(child); + mem_size += size_of::() + get_array_data_mem_size(child); } mem_size } diff --git a/native-engine/datafusion-ext-commons/src/lib.rs b/native-engine/datafusion-ext-commons/src/lib.rs index fbc41791e..2bda6c15a 100644 --- a/native-engine/datafusion-ext-commons/src/lib.rs +++ b/native-engine/datafusion-ext-commons/src/lib.rs @@ -28,6 +28,7 @@ pub mod arrow; pub mod hadoop_fs; pub mod hash; pub mod io; +pub mod scalar_value; pub mod spark_bit_array; pub mod spark_bloom_filter; pub mod spark_hash; diff --git a/native-engine/datafusion-ext-commons/src/scalar_value.rs b/native-engine/datafusion-ext-commons/src/scalar_value.rs new file mode 100644 index 000000000..a80b54473 --- /dev/null +++ b/native-engine/datafusion-ext-commons/src/scalar_value.rs @@ -0,0 +1,41 @@ +// Copyright 2022 The Blaze Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use arrow::array::{Array, UInt64Array}; +use datafusion::{common, common::ScalarValue}; + +use crate::arrow::array_size::ArraySize; + +pub fn compacted_scalar_value_from_array( + array: &dyn Array, + i: usize, +) -> common::Result { + if array.data_type().is_nested() { + // avoid using sliced nested array for imprecise memory usage + let taken = + arrow::compute::take(array, &UInt64Array::new_scalar(i as u64).into_inner(), None)?; + ScalarValue::try_from_array(&taken, 0) + } else { + ScalarValue::try_from_array(array, i) + } +} + +pub fn scalar_value_heap_mem_size(value: &ScalarValue) -> usize { + match value { + ScalarValue::List(list) => list.as_ref().get_array_mem_size(), + ScalarValue::Map(map) => map.get_array_mem_size(), + ScalarValue::Struct(struct_) => struct_.get_array_mem_size(), + _ => value.size() - size_of::(), + } +} diff --git a/native-engine/datafusion-ext-functions/src/brickhouse/array_union.rs b/native-engine/datafusion-ext-functions/src/brickhouse/array_union.rs index 9b11942dd..96eabfea3 100644 --- a/native-engine/datafusion-ext-functions/src/brickhouse/array_union.rs +++ b/native-engine/datafusion-ext-functions/src/brickhouse/array_union.rs @@ -23,7 +23,9 @@ use datafusion::{ common::{Result, ScalarValue}, logical_expr::ColumnarValue, }; -use datafusion_ext_commons::{df_execution_err, downcast_any}; +use datafusion_ext_commons::{ + df_execution_err, downcast_any, scalar_value::compacted_scalar_value_from_array, +}; use itertools::Itertools; /// Return a list of unique entries, for a given set of lists. @@ -120,7 +122,7 @@ fn update_set(set: &mut HashSet, array: &ListArray, row_idx: usize) if array.is_valid(row_idx) { let values = array.value(row_idx); for i in 0..values.len() { - let scalar = ScalarValue::try_from_array(&values, i)?; + let scalar = compacted_scalar_value_from_array(&values, i)?; set.insert(scalar); } } diff --git a/native-engine/datafusion-ext-plans/src/agg/acc.rs b/native-engine/datafusion-ext-plans/src/agg/acc.rs index ba7dd705d..59b8f4bd4 100644 --- a/native-engine/datafusion-ext-plans/src/agg/acc.rs +++ b/native-engine/datafusion-ext-plans/src/agg/acc.rs @@ -31,6 +31,7 @@ use datafusion::{ use datafusion_ext_commons::{ assume, io::{read_bytes_slice, read_len, read_scalar, write_len, write_scalar}, + scalar_value::scalar_value_heap_mem_size, unchecked, }; use smallvec::SmallVec; @@ -391,7 +392,7 @@ impl AccGenericColumn { AccGenericColumn::Scalar { items, .. } => { idx_for! { (idx in idx) => { - heap_mem_used += items[idx].size() - size_of::(); + heap_mem_used += scalar_value_heap_mem_size(&items[idx]); } } } @@ -465,7 +466,7 @@ impl AccColumn for AccGenericColumn { heap_mem_used, } => { for idx in len..items.len() { - *heap_mem_used -= items[idx].size() - size_of::(); + *heap_mem_used -= scalar_value_heap_mem_size(&items[idx]); } items.resize_with(len, || { ScalarValue::try_from(&*dt).expect("unsupported data type: {dt:?}") @@ -495,17 +496,19 @@ impl AccColumn for AccGenericColumn { fn mem_used(&self) -> usize { match self { - AccGenericColumn::Prim { raw, valids, .. } => raw.capacity() + valids.capacity() / 8, + AccGenericColumn::Prim { raw, valids, .. } => { + raw.capacity() * 2 + valids.capacity() * 2 / 8 + } AccGenericColumn::Bytes { items, heap_mem_used, .. - } => heap_mem_used + items.capacity() * size_of::>(), + } => heap_mem_used + items.capacity() * 2 * size_of::>(), AccGenericColumn::Scalar { items, heap_mem_used, .. - } => heap_mem_used + items.capacity() * size_of::(), + } => heap_mem_used + items.capacity() * 2 * size_of::(), } } @@ -608,7 +611,7 @@ impl AccColumn for AccGenericColumn { } => { for (idx, cursor) in cursors.iter_mut().enumerate() { items[idx] = read_scalar(cursor, dt, true)?; - *heap_mem_used += items[idx].size() - size_of::(); + *heap_mem_used += scalar_value_heap_mem_size(&items[idx]); } } } @@ -706,7 +709,7 @@ impl AccColumn for AccGenericColumn { } => { for i in 0..num_rows { items[i] = read_scalar(r, dt, true)?; - *heap_mem_used += items[i].size() - size_of::(); + *heap_mem_used += scalar_value_heap_mem_size(&items[i]); } } } diff --git a/native-engine/datafusion-ext-plans/src/agg/agg_hash_map.rs b/native-engine/datafusion-ext-plans/src/agg/agg_hash_map.rs index ebf566e1c..1eaf7e76c 100644 --- a/native-engine/datafusion-ext-plans/src/agg/agg_hash_map.rs +++ b/native-engine/datafusion-ext-plans/src/agg/agg_hash_map.rs @@ -57,8 +57,8 @@ impl Table { fn mem_size(&self) -> usize { size_of_val(self) - + self.map.capacity() * size_of::() - + self.keys.capacity() * size_of::() + + self.map.capacity() * 2 * size_of::() + + self.keys.capacity() * 2 * size_of::() + self.key_heap_mem_size } diff --git a/native-engine/datafusion-ext-plans/src/agg/agg_table.rs b/native-engine/datafusion-ext-plans/src/agg/agg_table.rs index c6f122eb7..7b64b989a 100644 --- a/native-engine/datafusion-ext-plans/src/agg/agg_table.rs +++ b/native-engine/datafusion-ext-plans/src/agg/agg_table.rs @@ -625,7 +625,7 @@ impl MergingData { mem_used += self.key_rows_mem_size; // key rows memory usage mem_used += self.acc_table.mem_size(); // acc table memory usage mem_used += // sorting indices memory usage - self.entries.capacity() * size_of::<(u32, u32, u32, u32)>(); + self.entries.capacity() * 2 * size_of::<(u32, u32, u32, u32)>(); mem_used += self.num_records() * size_of::(); // overheads mem_used } diff --git a/native-engine/datafusion-ext-plans/src/agg/collect.rs b/native-engine/datafusion-ext-plans/src/agg/collect.rs index 79359169d..e81ee14f5 100644 --- a/native-engine/datafusion-ext-plans/src/agg/collect.rs +++ b/native-engine/datafusion-ext-plans/src/agg/collect.rs @@ -29,6 +29,7 @@ use datafusion::{ use datafusion_ext_commons::{ df_execution_err, downcast_any, io::{read_bytes_slice, read_len, read_scalar, write_len, write_scalar}, + scalar_value::compacted_scalar_value_from_array, }; use hashbrown::raw::RawTable; use smallvec::SmallVec; @@ -126,7 +127,7 @@ impl Agg for AggGenericCollect { idx_for_zipped! { ((acc_idx, partial_arg_idx) in (acc_idx, partial_arg_idx)) => { - let scalar = ScalarValue::try_from_array(&partial_args[0], partial_arg_idx)?; + let scalar = compacted_scalar_value_from_array(&partial_args[0], partial_arg_idx)?; if !scalar.is_null() { accs.append_item(acc_idx, &scalar); } diff --git a/native-engine/datafusion-ext-plans/src/agg/count.rs b/native-engine/datafusion-ext-plans/src/agg/count.rs index b2431c4f6..5ab5bafb4 100644 --- a/native-engine/datafusion-ext-plans/src/agg/count.rs +++ b/native-engine/datafusion-ext-plans/src/agg/count.rs @@ -184,7 +184,7 @@ impl AccColumn for AccCountColumn { } fn mem_used(&self) -> usize { - self.values.capacity() * size_of::() + self.values.capacity() * 2 * size_of::() } fn freeze_to_rows(&self, idx: IdxSelection<'_>, array: &mut [Vec]) -> Result<()> { diff --git a/native-engine/datafusion-ext-plans/src/agg/first.rs b/native-engine/datafusion-ext-plans/src/agg/first.rs index c4ffeaced..0137933e9 100644 --- a/native-engine/datafusion-ext-plans/src/agg/first.rs +++ b/native-engine/datafusion-ext-plans/src/agg/first.rs @@ -24,7 +24,7 @@ use datafusion::{ common::{Result, ScalarValue}, physical_expr::PhysicalExpr, }; -use datafusion_ext_commons::downcast_any; +use datafusion_ext_commons::{downcast_any, scalar_value::compacted_scalar_value_from_array}; use crate::{ agg::{ @@ -136,7 +136,7 @@ impl Agg for AggFirst { ((acc_idx, partial_arg_idx) in (acc_idx, partial_arg_idx)) => { if accs.flags.prim_valid(acc_idx) { accs.flags.set_prim_valid(acc_idx, true); - accs.values.scalar_values_mut()[acc_idx] = ScalarValue::try_from_array(partial_arg, partial_arg_idx)?; + accs.values.scalar_values_mut()[acc_idx] = compacted_scalar_value_from_array(partial_arg, partial_arg_idx)?; } } } diff --git a/native-engine/datafusion-ext-plans/src/agg/first_ignores_null.rs b/native-engine/datafusion-ext-plans/src/agg/first_ignores_null.rs index dac05a872..ff178e115 100644 --- a/native-engine/datafusion-ext-plans/src/agg/first_ignores_null.rs +++ b/native-engine/datafusion-ext-plans/src/agg/first_ignores_null.rs @@ -23,7 +23,7 @@ use datafusion::{ common::{Result, ScalarValue}, physical_expr::PhysicalExpr, }; -use datafusion_ext_commons::downcast_any; +use datafusion_ext_commons::{downcast_any, scalar_value::compacted_scalar_value_from_array}; use crate::{ agg::{ @@ -124,7 +124,7 @@ impl Agg for AggFirstIgnoresNull { idx_for_zipped! { ((acc_idx, partial_arg_idx) in (acc_idx, partial_arg_idx)) => { if accs.scalar_values()[acc_idx].is_null() && partial_arg.is_valid(partial_arg_idx) { - accs.scalar_values_mut()[acc_idx] = ScalarValue::try_from_array(partial_arg, partial_arg_idx)?; + accs.scalar_values_mut()[acc_idx] = compacted_scalar_value_from_array(partial_arg, partial_arg_idx)?; } } } diff --git a/native-engine/datafusion-ext-plans/src/agg/maxmin.rs b/native-engine/datafusion-ext-plans/src/agg/maxmin.rs index b4873273c..ff86db83e 100644 --- a/native-engine/datafusion-ext-plans/src/agg/maxmin.rs +++ b/native-engine/datafusion-ext-plans/src/agg/maxmin.rs @@ -22,7 +22,7 @@ use std::{ use arrow::{array::*, datatypes::*}; use datafusion::{common::Result, physical_expr::PhysicalExpr, scalar::ScalarValue}; -use datafusion_ext_commons::downcast_any; +use datafusion_ext_commons::{downcast_any, scalar_value::compacted_scalar_value_from_array}; use paste::paste; use crate::{ @@ -173,8 +173,11 @@ impl Agg for AggMaxMin

{ _ => { idx_for_zipped! { ((acc_idx, partial_arg_idx) in (acc_idx, partial_arg_idx)) => { - let partial_arg_scalar = ScalarValue::try_from_array(&partial_args[0], partial_arg_idx)?; - if !partial_arg_scalar.is_null() { + if partial_args[0].is_valid(partial_arg_idx) { + let partial_arg_scalar = compacted_scalar_value_from_array( + &partial_args[0], + partial_arg_idx, + )?; let acc_scalar = &mut accs.scalar_values_mut()[acc_idx]; if !acc_scalar.is_null() { if partial_arg_scalar.partial_cmp(acc_scalar) == Some(P::ORD) { diff --git a/native-engine/datafusion-ext-plans/src/broadcast_join_build_hash_map_exec.rs b/native-engine/datafusion-ext-plans/src/broadcast_join_build_hash_map_exec.rs index 2a4a70e5c..3f08a3f8e 100644 --- a/native-engine/datafusion-ext-plans/src/broadcast_join_build_hash_map_exec.rs +++ b/native-engine/datafusion-ext-plans/src/broadcast_join_build_hash_map_exec.rs @@ -38,7 +38,7 @@ use datafusion::{ PlanProperties, }, }; -use datafusion_ext_commons::arrow::coalesce::coalesce_batches_unchecked; +use datafusion_ext_commons::arrow::{array_size::BatchSize, coalesce::coalesce_batches_unchecked}; use futures::StreamExt; use once_cell::sync::OnceCell; @@ -168,7 +168,7 @@ pub fn execute_build_hash_map( staging_batches.push(batch.clone()); if smj_fallback_enabled { staging_num_rows += batch.num_rows(); - stating_mem_size += batch.get_array_memory_size(); + stating_mem_size += batch.get_batch_mem_size(); // fallback if staging data is too large if staging_num_rows > smj_fallback_rows_threshold diff --git a/native-engine/datafusion-ext-plans/src/common/execution_context.rs b/native-engine/datafusion-ext-plans/src/common/execution_context.rs index 9abc290b4..ccb607632 100644 --- a/native-engine/datafusion-ext-plans/src/common/execution_context.rs +++ b/native-engine/datafusion-ext-plans/src/common/execution_context.rs @@ -33,7 +33,7 @@ use datafusion::{ }, }; use datafusion_ext_commons::{ - arrow::{array_size::ArraySize, coalesce::coalesce_batches_unchecked}, + arrow::{array_size::BatchSize, coalesce::coalesce_batches_unchecked}, batch_size, df_execution_err, suggested_batch_mem_size, }; use futures::{Stream, StreamExt}; @@ -176,7 +176,7 @@ impl ExecutionContext { let num_rows = batch.num_rows(); if num_rows > 0 { self.staging_rows += batch.num_rows(); - self.staging_batches_mem_size += batch.get_array_mem_size(); + self.staging_batches_mem_size += batch.get_batch_mem_size(); self.staging_batches.push(batch); if self.should_flush() { let coalesced = self.coalesce()?; @@ -381,7 +381,7 @@ impl InputBatchStatistics { } pub fn record_input_batch(&self, input_batch: &RecordBatch) { - let mem_size = input_batch.get_array_mem_size(); + let mem_size = input_batch.get_batch_mem_size(); let num_rows = input_batch.num_rows(); self.input_batch_count.add(1); self.input_batch_mem_size.add(mem_size); diff --git a/native-engine/datafusion-ext-plans/src/ffi_reader_exec.rs b/native-engine/datafusion-ext-plans/src/ffi_reader_exec.rs index 57128502d..4fcea636f 100644 --- a/native-engine/datafusion-ext-plans/src/ffi_reader_exec.rs +++ b/native-engine/datafusion-ext-plans/src/ffi_reader_exec.rs @@ -35,7 +35,7 @@ use datafusion::{ PlanProperties, SendableRecordBatchStream, Statistics, }, }; -use datafusion_ext_commons::arrow::array_size::ArraySize; +use datafusion_ext_commons::arrow::array_size::BatchSize; use jni::objects::GlobalRef; use once_cell::sync::OnceCell; @@ -183,7 +183,7 @@ fn read_ffi( struct_array.columns().to_vec(), &RecordBatchOptions::new().with_row_count(Some(struct_array.len())), )?; - size_counter.add(batch.get_array_mem_size()); + size_counter.add(batch.get_batch_mem_size()); exec_ctx_cloned .baseline_metrics() .record_output(batch.num_rows()); diff --git a/native-engine/datafusion-ext-plans/src/ipc_reader_exec.rs b/native-engine/datafusion-ext-plans/src/ipc_reader_exec.rs index 97765d538..d8cf8c8dc 100644 --- a/native-engine/datafusion-ext-plans/src/ipc_reader_exec.rs +++ b/native-engine/datafusion-ext-plans/src/ipc_reader_exec.rs @@ -45,7 +45,10 @@ use datafusion::{ }, }; use datafusion_ext_commons::{ - arrow::{array_size::ArraySize, coalesce::coalesce_arrays_unchecked}, + arrow::{ + array_size::{ArraySize, BatchSize}, + coalesce::coalesce_arrays_unchecked, + }, batch_size, df_execution_err, suggested_batch_mem_size, }; use jni::objects::{GlobalRef, JObject}; @@ -228,7 +231,7 @@ fn read_ipc( )?; staging_num_rows.store(0, SeqCst); staging_mem_size.store(0, SeqCst); - size_counter.add(batch.get_array_mem_size()); + size_counter.add(batch.get_batch_mem_size()); exec_ctx.baseline_metrics().record_output(batch.num_rows()); sender.send(batch).await; } @@ -246,7 +249,7 @@ fn read_ipc( coalesced_cols, &RecordBatchOptions::new().with_row_count(Some(cur_staging_num_rows)), )?; - size_counter.add(batch.get_array_mem_size()); + size_counter.add(batch.get_batch_mem_size()); exec_ctx.baseline_metrics().record_output(batch.num_rows()); sender.send(batch).await; } diff --git a/native-engine/datafusion-ext-plans/src/joins/stream_cursor.rs b/native-engine/datafusion-ext-plans/src/joins/stream_cursor.rs index 3ffe500df..9f414ac53 100644 --- a/native-engine/datafusion-ext-plans/src/joins/stream_cursor.rs +++ b/native-engine/datafusion-ext-plans/src/joins/stream_cursor.rs @@ -27,7 +27,7 @@ use datafusion::{ physical_plan::metrics::Time, }; use datafusion_ext_commons::{ - arrow::{array_size::ArraySize, selection::take_batch}, + arrow::{array_size::BatchSize, selection::take_batch}, unlikely, }; use futures::{Future, StreamExt}; @@ -157,7 +157,7 @@ impl StreamCursor { &RecordBatchOptions::new().with_row_count(Some(batch.num_rows())), )?; - self.mem_size += projected_batch.get_array_mem_size(); + self.mem_size += projected_batch.get_batch_mem_size(); self.mem_size += key_has_nulls .as_ref() .map(|nb| nb.buffer().len()) @@ -171,7 +171,7 @@ impl StreamCursor { // fill out-dated batches with null batches while unlikely!(self.num_null_batches < self.min_reserved_idx.0) { let i = self.num_null_batches; - self.mem_size -= self.projected_batches[i].get_array_mem_size(); + self.mem_size -= self.projected_batches[i].get_batch_mem_size(); self.mem_size -= self.key_has_nulls[i] .as_ref() .map(|nb| nb.buffer().len()) diff --git a/native-engine/datafusion-ext-plans/src/parquet_sink_exec.rs b/native-engine/datafusion-ext-plans/src/parquet_sink_exec.rs index bddf5e73d..8f75ea0a9 100644 --- a/native-engine/datafusion-ext-plans/src/parquet_sink_exec.rs +++ b/native-engine/datafusion-ext-plans/src/parquet_sink_exec.rs @@ -39,7 +39,7 @@ use datafusion::{ }, }; use datafusion_ext_commons::{ - arrow::{array_size::ArraySize, cast::cast}, + arrow::{array_size::BatchSize, cast::cast}, df_execution_err, hadoop_fs::{FsDataOutputWrapper, FsProvider}, }; @@ -269,7 +269,7 @@ fn execute_parquet_sink( } // compute sub batch size - let batch_mem_size = batch.get_array_mem_size(); + let batch_mem_size = batch.get_batch_mem_size(); let num_sub_batches = (batch_mem_size / 1048576).max(1); let num_sub_batch_rows = (batch.num_rows() / num_sub_batches).max(16); diff --git a/native-engine/datafusion-ext-plans/src/shuffle/buffered_data.rs b/native-engine/datafusion-ext-plans/src/shuffle/buffered_data.rs index f25b22a65..b1ba48ab4 100644 --- a/native-engine/datafusion-ext-plans/src/shuffle/buffered_data.rs +++ b/native-engine/datafusion-ext-plans/src/shuffle/buffered_data.rs @@ -22,7 +22,7 @@ use datafusion::{common::Result, physical_plan::metrics::Time}; use datafusion_ext_commons::{ algorithm::rdx_sort::radix_sort_by_key, arrow::{ - array_size::ArraySize, + array_size::BatchSize, selection::{create_batch_interleaver, BatchInterleaver}, }, compute_suggested_batch_size_for_output, df_execution_err, @@ -88,7 +88,7 @@ impl BufferedData { // first add to staging, mem used is doubled for later sorting self.num_rows += batch.num_rows(); self.staging_num_rows += batch.num_rows(); - self.staging_mem_used += batch.get_array_mem_size() * 2; + self.staging_mem_used += batch.get_batch_mem_size() * 2; self.staging_batches.push(batch); let suggested_batch_size = @@ -111,7 +111,7 @@ impl BufferedData { self.staging_num_rows = 0; self.staging_mem_used = 0; - self.sorted_mem_used += sorted_batch.get_array_mem_size() + offsets.len() * 4; + self.sorted_mem_used += sorted_batch.get_batch_mem_size() + offsets.len() * 4; self.sorted_batches.push(sorted_batch); self.sorted_offsets.push(offsets); Ok(()) diff --git a/native-engine/datafusion-ext-plans/src/shuffle/mod.rs b/native-engine/datafusion-ext-plans/src/shuffle/mod.rs index 31ed7503c..46264943f 100644 --- a/native-engine/datafusion-ext-plans/src/shuffle/mod.rs +++ b/native-engine/datafusion-ext-plans/src/shuffle/mod.rs @@ -34,7 +34,7 @@ use datafusion::{ physical_expr::{PhysicalExpr, PhysicalSortExpr}, physical_plan::SendableRecordBatchStream, }; -use datafusion_ext_commons::{arrow::array_size::ArraySize, spark_hash::create_murmur3_hashes}; +use datafusion_ext_commons::{arrow::array_size::BatchSize, spark_hash::create_murmur3_hashes}; use futures::StreamExt; use parking_lot::Mutex as SyncMutex; @@ -72,7 +72,7 @@ impl dyn ShuffleRepartitioner { while let Some(batch) = coalesced.next().await.transpose()? { let _timer = exec_ctx.baseline_metrics().elapsed_compute().timer(); let batch_num_rows = batch.num_rows(); - let batch_mem_size = batch.get_array_mem_size(); + let batch_mem_size = batch.get_batch_mem_size(); if batches_num_rows.load(SeqCst) == 0 { log::info!( "start shuffle writing, first batch num_rows={}, mem_size={}", diff --git a/native-engine/datafusion-ext-plans/src/shuffle/rss_sort_repartitioner.rs b/native-engine/datafusion-ext-plans/src/shuffle/rss_sort_repartitioner.rs index bb17b692d..5c8924e96 100644 --- a/native-engine/datafusion-ext-plans/src/shuffle/rss_sort_repartitioner.rs +++ b/native-engine/datafusion-ext-plans/src/shuffle/rss_sort_repartitioner.rs @@ -17,7 +17,7 @@ use std::sync::Weak; use arrow::record_batch::RecordBatch; use async_trait::async_trait; use datafusion::{common::Result, physical_plan::metrics::Time}; -use datafusion_ext_commons::arrow::array_size::ArraySize; +use datafusion_ext_commons::arrow::array_size::BatchSize; use futures::lock::Mutex; use jni::objects::GlobalRef; @@ -89,7 +89,7 @@ impl Drop for RssSortShuffleRepartitioner { impl ShuffleRepartitioner for RssSortShuffleRepartitioner { async fn insert_batch(&self, input: RecordBatch) -> Result<()> { // update memory usage before adding to buffered data - let mem_used = self.data.lock().await.mem_used() + input.get_array_mem_size() * 2; + let mem_used = self.data.lock().await.mem_used() + input.get_batch_mem_size() * 2; self.update_mem_used(mem_used).await?; // add batch to buffered data diff --git a/native-engine/datafusion-ext-plans/src/shuffle/sort_repartitioner.rs b/native-engine/datafusion-ext-plans/src/shuffle/sort_repartitioner.rs index 41f022115..c41354548 100644 --- a/native-engine/datafusion-ext-plans/src/shuffle/sort_repartitioner.rs +++ b/native-engine/datafusion-ext-plans/src/shuffle/sort_repartitioner.rs @@ -25,7 +25,7 @@ use datafusion::{ common::{DataFusionError, Result}, physical_plan::metrics::Time, }; -use datafusion_ext_commons::{arrow::array_size::ArraySize, df_execution_err}; +use datafusion_ext_commons::{arrow::array_size::BatchSize, df_execution_err}; use futures::lock::Mutex; use crate::{ @@ -122,7 +122,7 @@ impl Drop for SortShuffleRepartitioner { impl ShuffleRepartitioner for SortShuffleRepartitioner { async fn insert_batch(&self, input: RecordBatch) -> Result<()> { // update memory usage before adding to buffered data - let mem_used = self.data.lock().await.mem_used() + input.get_array_mem_size() * 2; + let mem_used = self.data.lock().await.mem_used() + input.get_batch_mem_size() * 2; self.update_mem_used(mem_used).await?; // add batch to buffered data diff --git a/native-engine/datafusion-ext-plans/src/sort_exec.rs b/native-engine/datafusion-ext-plans/src/sort_exec.rs index fc873aae0..8bf071274 100644 --- a/native-engine/datafusion-ext-plans/src/sort_exec.rs +++ b/native-engine/datafusion-ext-plans/src/sort_exec.rs @@ -49,7 +49,7 @@ use datafusion::{ use datafusion_ext_commons::{ algorithm::loser_tree::{ComparableForLoserTree, LoserTree}, arrow::{ - array_size::ArraySize, + array_size::BatchSize, selection::{create_batch_interleaver, take_batch, BatchInterleaver}, }, compute_suggested_batch_size_for_kway_merge, @@ -348,7 +348,7 @@ impl BufferedData { create_zero_column_batch(num_rows) }; - self.sorted_batches_mem_used += sorted_batch.get_array_mem_size(); + self.sorted_batches_mem_used += sorted_batch.get_batch_mem_size(); self.sorted_key_stores_mem_used += sorted_key_store.len(); self.sorted_key_stores.push(sorted_key_store.into()); @@ -577,10 +577,10 @@ impl ExternalSorter { } self.num_total_rows.fetch_add(batch.num_rows(), SeqCst); self.mem_total_size - .fetch_add(batch.get_array_mem_size(), SeqCst); + .fetch_add(batch.get_batch_mem_size(), SeqCst); // update memory usage before adding to data - let mem_used = self.data.lock().await.mem_used() + batch.get_array_mem_size() * 2; + let mem_used = self.data.lock().await.mem_used() + batch.get_batch_mem_size() * 2; self.update_mem_used(mem_used).await?; // add batch to data @@ -752,7 +752,7 @@ impl<'a> SpillCursor<'a> { cols, &RecordBatchOptions::new().with_row_count(Some(num_rows)), )?; - self.cur_mem_used += batch.get_array_mem_size(); + self.cur_mem_used += batch.get_batch_mem_size(); self.cur_batch_num_rows = batch.num_rows(); self.cur_loaded_num_rows = 0; self.cur_batches.push(batch); @@ -779,7 +779,7 @@ impl<'a> SpillCursor<'a> { fn clear_finished_batches(&mut self) { if self.cur_batch_idx > 0 { for batch in self.cur_batches.drain(..self.cur_batch_idx) { - self.cur_mem_used -= batch.get_array_mem_size(); + self.cur_mem_used -= batch.get_batch_mem_size(); } self.cur_batch_idx = 0; }