Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 16 additions & 10 deletions native-engine/datafusion-ext-commons/src/arrow/array_size.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@
// limitations under the License.

use arrow::{
array::{Array, ArrayData},
array::{Array, ArrayData, StructArray},
buffer::Buffer,
record_batch::RecordBatch,
};

Expand All @@ -25,35 +26,40 @@ pub trait ArraySize {
fn get_array_mem_size(&self) -> usize;
}

impl ArraySize for dyn Array {
impl<T: ?Sized + Array> ArraySize for T {
fn get_array_mem_size(&self) -> usize {
get_array_data_mem_size(&self.to_data())
}
}

impl ArraySize for RecordBatch {
fn get_array_mem_size(&self) -> usize {
self.columns()
.iter()
.map(|array| array.get_array_mem_size())
.sum::<usize>()
pub trait BatchSize {
fn get_batch_mem_size(&self) -> usize;
}

impl BatchSize for RecordBatch {
fn get_batch_mem_size(&self) -> usize {
let as_struct = StructArray::from(self.clone());
let as_dyn_array: &dyn Array = &as_struct;
as_dyn_array.get_array_mem_size()
}
}

fn get_array_data_mem_size(array_data: &ArrayData) -> usize {
let mut mem_size = 0;

for buffer in array_data.buffers() {
mem_size += buffer.len();
mem_size += size_of::<Buffer>() + buffer.len().max(buffer.capacity());
}

mem_size += size_of::<Option<Buffer>>();
mem_size += array_data
.nulls()
.map(|nb| nb.buffer().len())
.unwrap_or_default();

// summing child data size
for child in array_data.child_data() {
mem_size += get_array_data_mem_size(child);
mem_size += size_of::<ArrayData>() + get_array_data_mem_size(child);
}
mem_size
}
1 change: 1 addition & 0 deletions native-engine/datafusion-ext-commons/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ pub mod arrow;
pub mod hadoop_fs;
pub mod hash;
pub mod io;
pub mod scalar_value;
pub mod spark_bit_array;
pub mod spark_bloom_filter;
pub mod spark_hash;
Expand Down
41 changes: 41 additions & 0 deletions native-engine/datafusion-ext-commons/src/scalar_value.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// Copyright 2022 The Blaze Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use arrow::array::{Array, UInt64Array};
use datafusion::{common, common::ScalarValue};

use crate::arrow::array_size::ArraySize;

/// Builds a `ScalarValue` from row `i` of `array`.
///
/// For nested data types the selected row is first materialized via
/// `arrow::compute::take`, so the resulting scalar does not retain the
/// buffers of a sliced parent array (which would make memory accounting
/// imprecise). Flat types are extracted directly.
pub fn compacted_scalar_value_from_array(
    array: &dyn Array,
    i: usize,
) -> common::Result<ScalarValue> {
    if !array.data_type().is_nested() {
        // non-nested values carry no shared child buffers — extract in place
        return ScalarValue::try_from_array(array, i);
    }
    // copy the single row out of the (possibly sliced) nested array
    let row_index = UInt64Array::new_scalar(i as u64).into_inner();
    let compacted = arrow::compute::take(array, &row_index, None)?;
    ScalarValue::try_from_array(&compacted, 0)
}

/// Returns the number of heap-allocated bytes owned by `value`,
/// excluding the inline `size_of::<ScalarValue>()` footprint itself.
///
/// Nested variants (list/map/struct) are measured through their backing
/// arrow arrays; every other variant falls back to DataFusion's own
/// `ScalarValue::size` minus the inline struct size.
pub fn scalar_value_heap_mem_size(value: &ScalarValue) -> usize {
    match value {
        ScalarValue::List(list) => list.as_ref().get_array_mem_size(),
        ScalarValue::Map(map) => map.get_array_mem_size(),
        ScalarValue::Struct(struct_) => struct_.get_array_mem_size(),
        other => other.size() - size_of::<ScalarValue>(),
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ use datafusion::{
common::{Result, ScalarValue},
logical_expr::ColumnarValue,
};
use datafusion_ext_commons::{df_execution_err, downcast_any};
use datafusion_ext_commons::{
df_execution_err, downcast_any, scalar_value::compacted_scalar_value_from_array,
};
use itertools::Itertools;

/// Return a list of unique entries, for a given set of lists.
Expand Down Expand Up @@ -120,7 +122,7 @@ fn update_set(set: &mut HashSet<ScalarValue>, array: &ListArray, row_idx: usize)
if array.is_valid(row_idx) {
let values = array.value(row_idx);
for i in 0..values.len() {
let scalar = ScalarValue::try_from_array(&values, i)?;
let scalar = compacted_scalar_value_from_array(&values, i)?;
set.insert(scalar);
}
}
Expand Down
17 changes: 10 additions & 7 deletions native-engine/datafusion-ext-plans/src/agg/acc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use datafusion::{
use datafusion_ext_commons::{
assume,
io::{read_bytes_slice, read_len, read_scalar, write_len, write_scalar},
scalar_value::scalar_value_heap_mem_size,
unchecked,
};
use smallvec::SmallVec;
Expand Down Expand Up @@ -391,7 +392,7 @@ impl AccGenericColumn {
AccGenericColumn::Scalar { items, .. } => {
idx_for! {
(idx in idx) => {
heap_mem_used += items[idx].size() - size_of::<ScalarValue>();
heap_mem_used += scalar_value_heap_mem_size(&items[idx]);
}
}
}
Expand Down Expand Up @@ -465,7 +466,7 @@ impl AccColumn for AccGenericColumn {
heap_mem_used,
} => {
for idx in len..items.len() {
*heap_mem_used -= items[idx].size() - size_of::<ScalarValue>();
*heap_mem_used -= scalar_value_heap_mem_size(&items[idx]);
}
items.resize_with(len, || {
ScalarValue::try_from(&*dt).expect("unsupported data type: {dt:?}")
Expand Down Expand Up @@ -495,17 +496,19 @@ impl AccColumn for AccGenericColumn {

fn mem_used(&self) -> usize {
match self {
AccGenericColumn::Prim { raw, valids, .. } => raw.capacity() + valids.capacity() / 8,
AccGenericColumn::Prim { raw, valids, .. } => {
raw.capacity() * 2 + valids.capacity() * 2 / 8
}
AccGenericColumn::Bytes {
items,
heap_mem_used,
..
} => heap_mem_used + items.capacity() * size_of::<Option<AccBytes>>(),
} => heap_mem_used + items.capacity() * 2 * size_of::<Option<AccBytes>>(),
AccGenericColumn::Scalar {
items,
heap_mem_used,
..
} => heap_mem_used + items.capacity() * size_of::<ScalarValue>(),
} => heap_mem_used + items.capacity() * 2 * size_of::<ScalarValue>(),
}
}

Expand Down Expand Up @@ -608,7 +611,7 @@ impl AccColumn for AccGenericColumn {
} => {
for (idx, cursor) in cursors.iter_mut().enumerate() {
items[idx] = read_scalar(cursor, dt, true)?;
*heap_mem_used += items[idx].size() - size_of::<ScalarValue>();
*heap_mem_used += scalar_value_heap_mem_size(&items[idx]);
}
}
}
Expand Down Expand Up @@ -706,7 +709,7 @@ impl AccColumn for AccGenericColumn {
} => {
for i in 0..num_rows {
items[i] = read_scalar(r, dt, true)?;
*heap_mem_used += items[i].size() - size_of::<ScalarValue>();
*heap_mem_used += scalar_value_heap_mem_size(&items[i]);
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions native-engine/datafusion-ext-plans/src/agg/agg_hash_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@ impl Table {

fn mem_size(&self) -> usize {
size_of_val(self)
+ self.map.capacity() * size_of::<MapValueGroup>()
+ self.keys.capacity() * size_of::<OwnedKey>()
+ self.map.capacity() * 2 * size_of::<MapValueGroup>()
+ self.keys.capacity() * 2 * size_of::<OwnedKey>()
+ self.key_heap_mem_size
}

Expand Down
2 changes: 1 addition & 1 deletion native-engine/datafusion-ext-plans/src/agg/agg_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -625,7 +625,7 @@ impl MergingData {
mem_used += self.key_rows_mem_size; // key rows memory usage
mem_used += self.acc_table.mem_size(); // acc table memory usage
mem_used += // sorting indices memory usage
self.entries.capacity() * size_of::<(u32, u32, u32, u32)>();
self.entries.capacity() * 2 * size_of::<(u32, u32, u32, u32)>();
mem_used += self.num_records() * size_of::<u64>(); // overheads
mem_used
}
Expand Down
3 changes: 2 additions & 1 deletion native-engine/datafusion-ext-plans/src/agg/collect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use datafusion::{
use datafusion_ext_commons::{
df_execution_err, downcast_any,
io::{read_bytes_slice, read_len, read_scalar, write_len, write_scalar},
scalar_value::compacted_scalar_value_from_array,
};
use hashbrown::raw::RawTable;
use smallvec::SmallVec;
Expand Down Expand Up @@ -126,7 +127,7 @@ impl<C: AccCollectionColumn> Agg for AggGenericCollect<C> {

idx_for_zipped! {
((acc_idx, partial_arg_idx) in (acc_idx, partial_arg_idx)) => {
let scalar = ScalarValue::try_from_array(&partial_args[0], partial_arg_idx)?;
let scalar = compacted_scalar_value_from_array(&partial_args[0], partial_arg_idx)?;
if !scalar.is_null() {
accs.append_item(acc_idx, &scalar);
}
Expand Down
2 changes: 1 addition & 1 deletion native-engine/datafusion-ext-plans/src/agg/count.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ impl AccColumn for AccCountColumn {
}

fn mem_used(&self) -> usize {
self.values.capacity() * size_of::<i64>()
self.values.capacity() * 2 * size_of::<i64>()
}

fn freeze_to_rows(&self, idx: IdxSelection<'_>, array: &mut [Vec<u8>]) -> Result<()> {
Expand Down
4 changes: 2 additions & 2 deletions native-engine/datafusion-ext-plans/src/agg/first.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use datafusion::{
common::{Result, ScalarValue},
physical_expr::PhysicalExpr,
};
use datafusion_ext_commons::downcast_any;
use datafusion_ext_commons::{downcast_any, scalar_value::compacted_scalar_value_from_array};

use crate::{
agg::{
Expand Down Expand Up @@ -136,7 +136,7 @@ impl Agg for AggFirst {
((acc_idx, partial_arg_idx) in (acc_idx, partial_arg_idx)) => {
if accs.flags.prim_valid(acc_idx) {
accs.flags.set_prim_valid(acc_idx, true);
accs.values.scalar_values_mut()[acc_idx] = ScalarValue::try_from_array(partial_arg, partial_arg_idx)?;
accs.values.scalar_values_mut()[acc_idx] = compacted_scalar_value_from_array(partial_arg, partial_arg_idx)?;
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use datafusion::{
common::{Result, ScalarValue},
physical_expr::PhysicalExpr,
};
use datafusion_ext_commons::downcast_any;
use datafusion_ext_commons::{downcast_any, scalar_value::compacted_scalar_value_from_array};

use crate::{
agg::{
Expand Down Expand Up @@ -124,7 +124,7 @@ impl Agg for AggFirstIgnoresNull {
idx_for_zipped! {
((acc_idx, partial_arg_idx) in (acc_idx, partial_arg_idx)) => {
if accs.scalar_values()[acc_idx].is_null() && partial_arg.is_valid(partial_arg_idx) {
accs.scalar_values_mut()[acc_idx] = ScalarValue::try_from_array(partial_arg, partial_arg_idx)?;
accs.scalar_values_mut()[acc_idx] = compacted_scalar_value_from_array(partial_arg, partial_arg_idx)?;
}
}
}
Expand Down
9 changes: 6 additions & 3 deletions native-engine/datafusion-ext-plans/src/agg/maxmin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use std::{

use arrow::{array::*, datatypes::*};
use datafusion::{common::Result, physical_expr::PhysicalExpr, scalar::ScalarValue};
use datafusion_ext_commons::downcast_any;
use datafusion_ext_commons::{downcast_any, scalar_value::compacted_scalar_value_from_array};
use paste::paste;

use crate::{
Expand Down Expand Up @@ -173,8 +173,11 @@ impl<P: AggMaxMinParams> Agg for AggMaxMin<P> {
_ => {
idx_for_zipped! {
((acc_idx, partial_arg_idx) in (acc_idx, partial_arg_idx)) => {
let partial_arg_scalar = ScalarValue::try_from_array(&partial_args[0], partial_arg_idx)?;
if !partial_arg_scalar.is_null() {
if partial_args[0].is_valid(partial_arg_idx) {
let partial_arg_scalar = compacted_scalar_value_from_array(
&partial_args[0],
partial_arg_idx,
)?;
let acc_scalar = &mut accs.scalar_values_mut()[acc_idx];
if !acc_scalar.is_null() {
if partial_arg_scalar.partial_cmp(acc_scalar) == Some(P::ORD) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ use datafusion::{
PlanProperties,
},
};
use datafusion_ext_commons::arrow::coalesce::coalesce_batches_unchecked;
use datafusion_ext_commons::arrow::{array_size::BatchSize, coalesce::coalesce_batches_unchecked};
use futures::StreamExt;
use once_cell::sync::OnceCell;

Expand Down Expand Up @@ -168,7 +168,7 @@ pub fn execute_build_hash_map(
staging_batches.push(batch.clone());
if smj_fallback_enabled {
staging_num_rows += batch.num_rows();
stating_mem_size += batch.get_array_memory_size();
stating_mem_size += batch.get_batch_mem_size();

// fallback if staging data is too large
if staging_num_rows > smj_fallback_rows_threshold
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use datafusion::{
},
};
use datafusion_ext_commons::{
arrow::{array_size::ArraySize, coalesce::coalesce_batches_unchecked},
arrow::{array_size::BatchSize, coalesce::coalesce_batches_unchecked},
batch_size, df_execution_err, suggested_batch_mem_size,
};
use futures::{Stream, StreamExt};
Expand Down Expand Up @@ -176,7 +176,7 @@ impl ExecutionContext {
let num_rows = batch.num_rows();
if num_rows > 0 {
self.staging_rows += batch.num_rows();
self.staging_batches_mem_size += batch.get_array_mem_size();
self.staging_batches_mem_size += batch.get_batch_mem_size();
self.staging_batches.push(batch);
if self.should_flush() {
let coalesced = self.coalesce()?;
Expand Down Expand Up @@ -381,7 +381,7 @@ impl InputBatchStatistics {
}

pub fn record_input_batch(&self, input_batch: &RecordBatch) {
let mem_size = input_batch.get_array_mem_size();
let mem_size = input_batch.get_batch_mem_size();
let num_rows = input_batch.num_rows();
self.input_batch_count.add(1);
self.input_batch_mem_size.add(mem_size);
Expand Down
4 changes: 2 additions & 2 deletions native-engine/datafusion-ext-plans/src/ffi_reader_exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use datafusion::{
PlanProperties, SendableRecordBatchStream, Statistics,
},
};
use datafusion_ext_commons::arrow::array_size::ArraySize;
use datafusion_ext_commons::arrow::array_size::BatchSize;
use jni::objects::GlobalRef;
use once_cell::sync::OnceCell;

Expand Down Expand Up @@ -183,7 +183,7 @@ fn read_ffi(
struct_array.columns().to_vec(),
&RecordBatchOptions::new().with_row_count(Some(struct_array.len())),
)?;
size_counter.add(batch.get_array_mem_size());
size_counter.add(batch.get_batch_mem_size());
exec_ctx_cloned
.baseline_metrics()
.record_output(batch.num_rows());
Expand Down
9 changes: 6 additions & 3 deletions native-engine/datafusion-ext-plans/src/ipc_reader_exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,10 @@ use datafusion::{
},
};
use datafusion_ext_commons::{
arrow::{array_size::ArraySize, coalesce::coalesce_arrays_unchecked},
arrow::{
array_size::{ArraySize, BatchSize},
coalesce::coalesce_arrays_unchecked,
},
batch_size, df_execution_err, suggested_batch_mem_size,
};
use jni::objects::{GlobalRef, JObject};
Expand Down Expand Up @@ -228,7 +231,7 @@ fn read_ipc(
)?;
staging_num_rows.store(0, SeqCst);
staging_mem_size.store(0, SeqCst);
size_counter.add(batch.get_array_mem_size());
size_counter.add(batch.get_batch_mem_size());
exec_ctx.baseline_metrics().record_output(batch.num_rows());
sender.send(batch).await;
}
Expand All @@ -246,7 +249,7 @@ fn read_ipc(
coalesced_cols,
&RecordBatchOptions::new().with_row_count(Some(cur_staging_num_rows)),
)?;
size_counter.add(batch.get_array_mem_size());
size_counter.add(batch.get_batch_mem_size());
exec_ctx.baseline_metrics().record_output(batch.num_rows());
sender.send(batch).await;
}
Expand Down
Loading
Loading