Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 95 additions & 13 deletions datafusion/expr/src/higher_order_function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use crate::expr::{
use crate::type_coercion::functions::value_fields_with_higher_order_udf;
use crate::udf_eq::UdfEq;
use crate::{ColumnarValue, Documentation, Expr, ExprSchemable};
use arrow::array::{ArrayRef, RecordBatch};
use arrow::array::{ArrayRef, RecordBatch, RecordBatchOptions};
use arrow::datatypes::{DataType, FieldRef, Schema};
use arrow_schema::SchemaRef;
use datafusion_common::config::ConfigOptions;
Expand Down Expand Up @@ -239,6 +239,15 @@ pub struct LambdaArgument {
/// For example, for `array_transform([2], v -> -v)`,
/// this will be `vec![Field::new("v", DataType::Int32, true)]`
params: Vec<FieldRef>,
/// Indices into `params` of the parameters that are actually referenced
/// by `body` (taking nested-lambda shadowing into account).
///
/// `None` means "no information, assume every declared parameter is used"
/// — that is the backwards-compatible behavior of [`Self::new`]. When set,
/// [`Self::evaluate`] skips evaluating and pushing the closures for the
/// parameters not listed here, so unused declared parameters do not shift
/// the columns the body's compressed indices expect.
used_param_indices: Option<Vec<usize>>,
/// The body of the lambda
///
/// For example, for `array_transform([2], v -> -v)`,
Expand All @@ -257,26 +266,64 @@ pub struct LambdaArgument {
}

impl LambdaArgument {
/// Creates a [`LambdaArgument`] in which every declared parameter is
/// assumed to be referenced by `body`.
///
/// This is a thin wrapper over [`Self::new_with_used_params`] that passes
/// `None` for the used-parameter set, so no parameter is filtered out and
/// each entry of `params` keeps its own column in the lambda's schema.
/// This preserves the pre-existing (backwards-compatible) behavior.
///
/// Callers that know which parameters the body actually references should
/// call [`Self::new_with_used_params`] directly; otherwise the merged batch
/// still carries a column for every unused parameter.
///
/// # Arguments
///
/// * `params` - the lambda's declared parameters, in declaration order
/// * `body` - the physical expression forming the lambda body
/// * `captures` - optional batch of captured columns; when present, its
///   fields precede the parameter fields in the lambda's schema
pub fn new(
    params: Vec<FieldRef>,
    body: Arc<dyn PhysicalExpr>,
    captures: Option<RecordBatch>,
) -> Self {
    // `None` means "no usage information": keep all declared parameters.
    let no_usage_info = None;
    Self::new_with_used_params(params, body, captures, no_usage_info)
}

/// Build a [`LambdaArgument`] knowing which subset of `params` (by name)
/// the lambda body actually references.
///
/// When `used_params` is `Some(set)`, [`Self::evaluate`] only evaluates
/// and pushes the closures whose corresponding parameter name is in
/// `set`, in the original declaration order of `params`. Unused declared
/// parameters leave no slot in the merged batch, so the body's compressed
/// column indices line up directly. When `used_params` is `None`,
/// behavior is identical to [`Self::new`].
pub fn new_with_used_params(
params: Vec<FieldRef>,
body: Arc<dyn PhysicalExpr>,
captures: Option<RecordBatch>,
used_params: Option<HashSet<String>>,
) -> Self {
let used_param_indices = used_params.map(|set| {
params
.iter()
.enumerate()
.filter(|(_, f)| set.contains(f.name()))
.map(|(i, _)| i)
.collect::<Vec<_>>()
});

let effective_params: Vec<FieldRef> = match &used_param_indices {
Some(indices) => indices.iter().map(|i| Arc::clone(&params[*i])).collect(),
None => params.clone(),
};

let fields = match &captures {
Some(batch) => batch
.schema_ref()
.fields()
.iter()
.cloned()
.chain(params.clone())
.chain(effective_params.iter().cloned())
.collect(),
None => params.clone(),
None => effective_params,
};

let schema = Arc::new(Schema::new(fields));

Self {
params,
used_param_indices,
body,
schema,
captures,
Expand Down Expand Up @@ -344,6 +391,7 @@ impl LambdaArgument {
spread_captures.as_ref(),
Arc::clone(&self.schema),
&self.params,
self.used_param_indices.as_deref(),
args,
)?;

Expand All @@ -355,6 +403,7 @@ fn merge_captures_with_variables(
captures: Option<&RecordBatch>,
schema: SchemaRef,
params: &[FieldRef],
used_param_indices: Option<&[usize]>,
variables: &[&dyn Fn() -> Result<ArrayRef>],
) -> Result<RecordBatch> {
if variables.len() < params.len() {
Expand All @@ -365,23 +414,56 @@ fn merge_captures_with_variables(
);
}

let push_param_arrays = |columns: &mut Vec<ArrayRef>| -> Result<()> {
match used_param_indices {
Some(indices) => {
for &i in indices {
columns.push(variables[i]()?);
}
}
None => {
for arg in &variables[..params.len()] {
columns.push(arg()?);
}
}
}
Ok(())
};

let columns = match captures {
Some(captures) => {
let mut columns = captures.columns().to_vec();

for arg in &variables[..params.len()] {
columns.push(arg()?);
}

push_param_arrays(&mut columns)?;
columns
}
None => {
let mut columns = Vec::with_capacity(
used_param_indices
.map(<[usize]>::len)
.unwrap_or(params.len()),
);
push_param_arrays(&mut columns)?;
columns
}
None => variables
.iter()
.take(params.len())
.map(|arg| arg())
.collect::<Result<_>>()?,
};

if columns.is_empty() {
// Constant lambda body with no captures and no used parameters. We
// still need a row count for the merged batch, so evaluate one
// variable just to derive it. This is essentially free in the common
// case (the variables already exist as closures over arrays the
// caller computed up front).
let row_count = match variables.first() {
Some(first) => first()?.len(),
None => 0,
};
return Ok(RecordBatch::try_new_with_options(
schema,
vec![],
&RecordBatchOptions::new().with_row_count(Some(row_count)),
)?);
}

Ok(RecordBatch::try_new(schema, columns)?)
}

Expand Down
Loading