Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 0 additions & 8 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -91,14 +91,6 @@ unit_arg = "allow"
manual_repeat_n = "allow"
manual_div_ceil = "allow"

# Condition/Logic
collapsible_if = "allow"
collapsible_else_if = "allow"
if_same_then_else = "allow"
match_like_matches_macro = "allow"
explicit_auto_deref = "allow"
bool_assert_comparison = "allow"

# Naming/Structure/Remaining
upper_case_acronyms = "allow"
module_inception = "allow"
Expand Down
14 changes: 5 additions & 9 deletions native-engine/datafusion-ext-functions/src/spark_bround.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,16 +203,12 @@ fn round_half_even_f32(x: f32) -> f32 {
let sign = x.signum();
let ax = x.abs();
let f = ax.floor();
let diff = ax - f;

let rounded = if diff > 0.5 {
f + 1.0
} else if diff < 0.5 {
f
} else if ((f as i32) & 1) == 0 {
f
} else {
f + 1.0
let rounded = match ax - f {
diff if diff > 0.5 => f + 1.0,
diff if diff < 0.5 => f,
_ if (f as i32) & 1 == 0 => f,
_ => f + 1.0,
};

rounded.copysign(sign)
Expand Down
7 changes: 3 additions & 4 deletions native-engine/datafusion-ext-functions/src/spark_strings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,10 +167,9 @@ pub fn string_concat(args: &[ColumnarValue]) -> Result<ColumnarValue> {
} else {
// short avenue with only scalars
// returns null if args contains null
let is_not_null = args.iter().all(|arg| match arg {
ColumnarValue::Scalar(scalar) if scalar.is_null() => false,
_ => true,
});
let is_not_null = args
.iter()
.all(|arg| !matches!(arg, ColumnarValue::Scalar(scalar) if scalar.is_null()));
if !is_not_null {
return Ok(ColumnarValue::Scalar(ScalarValue::Utf8(None)));
}
Expand Down
17 changes: 9 additions & 8 deletions native-engine/datafusion-ext-plans/src/agg/agg_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,17 +106,18 @@ impl AggTable {
}

// trigger partial skipping if memory usage is too high
if self.agg_ctx.partial_skipping_skip_spill && self.mem_used_percent() > 0.8 {
if self.agg_ctx.supports_partial_skipping {
return df_execution_err!("AGG_TRIGGER_PARTIAL_SKIPPING");
}
if self.agg_ctx.partial_skipping_skip_spill
&& self.mem_used_percent() > 0.8
&& self.agg_ctx.supports_partial_skipping
{
return df_execution_err!("AGG_TRIGGER_PARTIAL_SKIPPING");
}

// check for partial skipping by cardinality ratio
if in_mem.num_records() >= self.agg_ctx.partial_skipping_min_rows {
if in_mem.check_trigger_partial_skipping() {
return df_execution_err!("AGG_TRIGGER_PARTIAL_SKIPPING");
}
if in_mem.num_records() >= self.agg_ctx.partial_skipping_min_rows
&& in_mem.check_trigger_partial_skipping()
{
return df_execution_err!("AGG_TRIGGER_PARTIAL_SKIPPING");
}

// update memory usage
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,12 +114,10 @@ impl CachedExprsEvaluator {
} else {
Arc::new(uda.filter(selected)?)
}
} else if let Some(previous_selected) = &previous_selected {
filter(&scatter(previous_selected, array)?, selected)?
} else {
if let Some(previous_selected) = &previous_selected {
filter(&scatter(previous_selected, array)?, selected)?
} else {
filter(&array, selected)?
}
filter(&array, selected)?
}
})));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,16 +189,15 @@ impl ExecutionContext {

// short path for not coalescable batches
let batch_num_rows = batch.batch().num_rows();
if self.staging_batches.is_empty() {
if batch_num_rows > batch_size() / 4 {
return Poll::Ready(Some(Ok(batch)));
}
if self.staging_batches.is_empty() && batch_num_rows > batch_size() / 4
{
return Poll::Ready(Some(Ok(batch)));
}
let batch_mem_size = batch.batch().get_batch_mem_size();
if self.staging_batches.is_empty() {
if batch_mem_size >= suggested_batch_mem_size() / 4 {
return Poll::Ready(Some(Ok(batch)));
}
if self.staging_batches.is_empty()
&& batch_mem_size >= suggested_batch_mem_size() / 4
{
return Poll::Ready(Some(Ok(batch)));
}

self.staging_rows += batch_num_rows;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -746,10 +746,10 @@ mod tests {
// Row 2: (null, "Charlie") - has null -> should be false (invalid)
// Row 3: (4, "David") - no nulls -> should be true (valid)
assert_eq!(null_buffer.len(), 4);
assert_eq!(null_buffer.is_valid(0), true); // No nulls
assert_eq!(null_buffer.is_valid(1), false); // Has null in name
assert_eq!(null_buffer.is_valid(2), false); // Has null in id
assert_eq!(null_buffer.is_valid(3), true); // No nulls
assert!(null_buffer.is_valid(0)); // No nulls
assert!(!null_buffer.is_valid(1)); // Has null in name
assert!(!null_buffer.is_valid(2)); // Has null in id
assert!(null_buffer.is_valid(3)); // No nulls
Ok(())
}

Expand Down Expand Up @@ -815,7 +815,7 @@ mod tests {
// All rows should be invalid (false) since they all contain nulls
assert_eq!(null_buffer.len(), 3);
for i in 0..3 {
assert_eq!(null_buffer.is_valid(i), false);
assert!(!null_buffer.is_valid(i));
}
Ok(())
}
Expand Down Expand Up @@ -856,7 +856,7 @@ mod tests {
// All rows should be valid (true) since none contain nulls
assert_eq!(null_buffer.len(), 3);
for i in 0..3 {
assert_eq!(null_buffer.is_valid(i), true);
assert!(null_buffer.is_valid(i));
}
Ok(())
}
Expand Down
8 changes: 3 additions & 5 deletions native-engine/datafusion-ext-plans/src/joins/bhj/semi_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,12 +232,10 @@ impl<const P: JoinerParams> Joiner for SemiJoiner<P> {
if let Some(&map_idx) = eqs.next() {
if P.probe_is_join_side {
probed_joined.set(row_idx, true);
} else {
if !map_joined[map_idx as usize] {
} else if !map_joined[map_idx as usize] {
map_joined.set(map_idx as usize, true);
for &map_idx in eqs {
map_joined.set(map_idx as usize, true);
for &map_idx in eqs {
map_joined.set(map_idx as usize, true);
}
}
// otherwise all map records with this key
// should
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ impl SingleShuffleRepartitioner {
impl ShuffleRepartitioner for SingleShuffleRepartitioner {
async fn insert_batch(&self, input: RecordBatch) -> Result<()> {
let mut output_data = self.output_data.lock().await;
let output_writer = self.get_output_writer(&mut *output_data)?;
let output_writer = self.get_output_writer(&mut output_data)?;
output_writer.write_batch(input.num_rows(), input.columns())?;
Ok(())
}
Expand Down
14 changes: 7 additions & 7 deletions native-engine/datafusion-ext-plans/src/sort_exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -781,7 +781,7 @@ async fn send_output_batch(
} else if let Ok(sender) = downcast_any!(sender, WrappedRecordBatchWithKeyRowsSender) {
let key_rows = Arc::new(key_collector.into_rows(
pruned_batch.num_rows(),
&*prune_sort_keys_from_batch.sort_row_converter.lock(),
&prune_sort_keys_from_batch.sort_row_converter.lock(),
)?);
let batch =
prune_sort_keys_from_batch.restore_from_existed_key_rows(pruned_batch, &key_rows)?;
Expand Down Expand Up @@ -844,11 +844,11 @@ impl<B: SortedBlock> SortedBlockCursor<B> {
"calling next_key() on finished sort spill cursor"
);

if self.cur_key_row_idx >= self.cur_batches.last().map(|b| b.num_rows()).unwrap_or(0) {
if !self.load_next_batch()? {
self.finished = true;
return Ok(());
}
if self.cur_key_row_idx >= self.cur_batches.last().map(|b| b.num_rows()).unwrap_or(0)
&& !self.load_next_batch()?
{
self.finished = true;
return Ok(());
}
self.input.next_key()?;
self.cur_key_row_idx += 1;
Expand Down Expand Up @@ -1181,7 +1181,7 @@ impl PruneSortKeysFromBatch {
key_collector: KC,
) -> Result<RecordBatch> {
let num_rows = pruned_batch.num_rows();
let key_rows = key_collector.into_rows(num_rows, &*self.sort_row_converter.lock())?;
let key_rows = key_collector.into_rows(num_rows, &self.sort_row_converter.lock())?;
self.restore_from_existed_key_rows(pruned_batch, &key_rows)
}

Expand Down
Loading