Skip to content

feat: support merge-engine=aggregation (basic mode)#340

Draft
TheR1sing3un wants to merge 9 commits into
apache:mainfrom
TheR1sing3un:feat/merge-engine-aggregation
Draft

feat: support merge-engine=aggregation (basic mode)#340
TheR1sing3un wants to merge 9 commits into
apache:mainfrom
TheR1sing3un:feat/merge-engine-aggregation

Conversation

@TheR1sing3un
Copy link
Copy Markdown
Member

Purpose

Add merge-engine=aggregation (basic mode) — Rust port of Java AggregateMergeFunction.

Brief change log

  • MergeEngine::Aggregation + AggregationConfig (basic-mode option whitelist; rejects ignore-retract, distinct, sequence-group, nested-key, count-limit, aggregation.remove-record-on-delete, ignore-delete).
  • 12 aggregators: sum / product / min / max / count / last_value / first_value / last_non_null_value / first_non_null_value / bool_and / bool_or / listagg.
  • AggregateMergeFunction wired through KeyValueFileReader. Sequence fields forced to last_value; PK columns copied through.
  • DELETE / UPDATE_BEFORE rejected at runtime. DECIMAL product deferred (BigDecimal-style scale rebasing TBD).

Tests

  • 53 unit tests under aggregator/* and sort_merge.
  • 9 DataFusion e2e tests in pk_tables.rs covering sum+listagg, default function, mixed aggregators, sequence-field forced last_value, DELETE rejection, missing-config error, unsupported-options rejection, all-NULL sum, and read-path routing.
  • cargo fmt, cargo clippy --features fulltext,vortex -- -D warnings, cargo test -p paimon --features fulltext,vortex --lib, cargo test -p paimon-datafusion --test pk_tables all green.

API and Format

No on-disk format change. All new types are pub(crate).

Documentation

Inline module docs reference the Java counterparts. No user-facing doc update yet.

Wire the aggregation merge engine through the option parsing and
write-side validation surface, mirroring the layout already used for
partial-update.  No read-side merge logic yet — KeyValueFileReader
returns Unsupported until a follow-up commit lands the aggregator
implementations.

- spec/core_options.rs: add MergeEngine::Aggregation, parser arm.
- spec/aggregation.rs: AggregationConfig with create-/runtime-mode
  validation; rejects ignore-retract, distinct, sequence-group,
  nested-key, count-limit, aggregation.remove-record-on-delete,
  ignore-delete keys in basic mode.
- spec/schema.rs: invoke AggregationConfig::validate_create_mode at
  Schema build.
- table/kv_file_writer.rs: AggregationConfig::validate_runtime_mode at
  writer construction; reject deletion-vectors; keep-all flush like
  partial-update; cover Aggregation in the dedup unreachable arm.
- table/bucket_assigner_cross.rs: reject Aggregation in
  CrossPartitionAssigner (matches partial-update treatment).
- table/kv_file_reader.rs: placeholder Unsupported arm for
  Aggregation; replaced by AggregateMergeFunction in a later commit.
… in cross-partition writes

Review of the prior commit surfaced two wire-up gaps:

- table_read.rs::to_arrow only dispatched Deduplicate and PartialUpdate
  to the KV read path, so an aggregation table would silently fall
  through to read_raw and return unmerged level-0 rows instead of the
  Unsupported error promised in kv_file_reader.rs.  Route Aggregation
  to read_pk, and force the KV branch (no level-zero fast path) since
  the aggregation engine cannot use the plain DataFileReader.

- table_write.rs only rejected dynamic cross-partition writes for
  partial-update, leaving aggregation to fail late from inside the
  CrossPartitionAssigner on the first crossing key.  Mirror the early
  Unsupported.

Add two regression tests:

- read_builder: a direct read against an aggregation PK table now
  surfaces Unsupported on first batch.
- table_write: cross-partition aggregation table is rejected at writer
  construction.
…egators

Introduces the per-field aggregator registry consumed by the upcoming
AggregateMergeFunction.  Each aggregator owns small per-PK-group state;
the merge function calls reset() at the start of a group, agg() per row
in user-sequence order, then result() to materialize a 1-row Arrow
array suitable for an interleave assembly.

Aggregators in this commit (12, aligned with Java basic mode):

- numeric.rs: sum, product, min, max (numeric + Decimal + comparable
  Date/Time/Timestamp/Char/VarChar for min/max) and count (BIGINT
  output for any input, counts non-NULL rows).
- value.rs: last_value, first_value, last_non_null_value,
  first_non_null_value, sharing a generic PickValueAgg that stores a
  zero-copy 1-row slice of the source array.
- bool_agg.rs: bool_and, bool_or for BOOLEAN columns.
- listagg.rs: listagg for Char/VarChar, with per-field
  fields.<col>.list-agg-delimiter (default ',').

Semantics aligned with Java FieldSumAgg / FieldMinAgg /
FieldLastNonNullValueAgg / etc.:

- NULL inputs are skipped for sum/product/min/max/bool_*/listagg/count
  and for the *_non_null_value variants; last_value / first_value
  preserve NULL.
- Min/max skip NaN floats and refuse Boolean / non-comparable types.
- sum/product use checked arithmetic on integer types and surface
  DataInvalid on overflow rather than silently wrapping.
- count requires the column to be declared as BIGINT.
- Unknown aggregator names and unsupported (aggregator, type) pairs
  fail at construction with ConfigInvalid so misconfiguration is
  rejected at table creation rather than at read time.

The module is allow(dead_code) for this commit; the AggregateMergeFunction
that consumes it lands in the next commit.

41 unit tests cover happy path, NULL propagation, overflow, type
rejection, and reset semantics for every aggregator.
… with code

Self-review of the previous commit found two issues:

- ProductAgg accepted DECIMAL columns and used a naked i128 checked_mul.
  That silently multiplies the raw values without dividing by 10^scale,
  so DECIMAL(10,2) 1.00 * 1.00 produced raw=10000 (read back as 100.00)
  instead of 1.00.  Java FieldProductAgg goes through BigDecimal which
  handles scale rebasing; aligning that here is its own piece of work.
  For basic mode, reject DECIMAL at ProductAgg::new with ConfigInvalid
  and drop the dead Decimal128 variant from ProductState so a future
  reviewer cannot reintroduce the bug by adding back the arm.

- numeric.rs's module doc and CountAgg's struct doc both claimed count
  'accepts any input type and produces a BIGINT column', but
  CountAgg::new rejects every non-BIGINT column.  Rewrite the doc to
  match the actual contract: count requires the column to be declared
  as BIGINT, counts non-NULL inputs, and emits BIGINT output.

Adds test_product_rejects_decimal_until_scale_handling_lands to lock
in the new ProductAgg constructor behavior.
Adds AggregateMergeFunction in sort_merge.rs and connects it to the
KeyValueFileReader so that PK tables with merge-engine=aggregation now
return field-wise aggregated rows rather than the placeholder
Unsupported error introduced earlier on this branch.

- spec/aggregation.rs: re-exposes agg_function_for_field and
  default_agg_function getters that the merge function uses to pick a
  per-column aggregator from the user's options.
- table/aggregator/mod.rs: tightens the FieldAggregator trait to require
  Debug (so concrete aggregators can be stored in a Debug-derivable
  merge function), and drops the module-level #![allow(dead_code)] now
  that the aggregators are consumed.
- table/aggregator/value.rs: removes the indirect FieldAggregator impl
  on the inner PickValueAgg helper and inlines the four pick-style
  wrappers via a small macro.  Avoids a dead 'name' field and keeps the
  helper purely inherent.
- table/sort_merge.rs: introduces AggregateMergeFunction.  Each PK group
  drives an aggregator slot per output column (None for primary-key
  columns, which are copied from a representative row).  Rows are sorted
  by user-sequence then system sequence before being fed to each
  aggregator; sequence fields are forced to last_value to align with
  Java AggregateMergeFunction#createFieldAggregators.  Retract rows
  (DELETE / UPDATE_BEFORE) are rejected with Unsupported.  A defensive
  guard at the end of merge() reports the offending aggregator name
  when a NULL slips into a non-nullable column.
- table/kv_file_reader.rs: extends new_merge_function with the data the
  aggregator factory needs (output fields, primary keys, sequence
  fields) and constructs an AggregateMergeFunction for Aggregation
  splits.  Hoists primary_keys and sequence_fields out of self.config
  before the try_stream! body so the closure can borrow them.
- table/read_builder.rs: removes the Commit-1 placeholder test that
  asserted the read path returned Unsupported for Aggregation; the
  full path is now exercised by the new sort-merge tests.

Unit tests in sort_merge.rs cover sum + listagg over multiple streams,
DELETE rejection, missing aggregate-function configuration error,
sequence-field forced-last_value behavior, default aggregate-function
fallback, and rejection of unsupported options at construction time.
…egateMergeFunction

Self-review of the prior commit found three minor polish issues:

- The non-nullable NULL guard at the end of merge() used a single
  message format that called the source 'aggregator <primary-key>' for
  PK columns, which misleadingly directs the user at the aggregator
  config when the actual cause is a NULL primary-key value on the
  source row.  Split the branch so PK-NULL is reported as 'primary-key
  column ... contains NULL on a source row; declare the column nullable
  or fix the upstream data', and the aggregator branch keeps the
  pre-existing message naming the offending aggregator.
- Removed the slot-count vs output-schema-field-count assertion.  The
  slot count is fixed at AggregateMergeFunction::new from the same
  output_fields slice the caller uses to build output_schema, so the
  branch was unreachable noise (and PR apache#263's reviewer specifically
  pushed back on this kind of unreachable validation).
- Collapsed the duplicate AggregationConfig::new(table_options)
  construction in AggregateMergeFunction::new and reworded the doc to
  clarify that 'sequence_fields' is the constructor parameter populated
  by the reader from the sequence.field option.
Exercises the full DataFusion → ReadBuilder → TableRead → KeyValueFileReader
→ AggregateMergeFunction path on disk-backed PK tables.  Nine scenarios:

- sum_and_listagg_fixed_bucket_e2e: numeric sum and string listagg across
  two commits with overlapping keys, verifying both delimiter and NULL skip
  semantics.
- default_function: fields.default-aggregate-function applies to any column
  without an explicit per-field aggregator, across three commits with
  NULL holes in different columns.
- mixed_aggregators: sum / max / bool_or / count combined on a single row.
- sequence_field_forced_last_value: sequence.field overrides any explicit
  per-field aggregate-function to last_value, matching Java
  AggregateMergeFunction#createFieldAggregators.
- rejects_delete: DELETE on an aggregation table surfaces an Unsupported
  error (either at planning or at scan time).
- requires_agg_function_per_field: a missing aggregate-function on a value
  column produces a clear ConfigInvalid mentioning the field name.
- rejects_unsupported_options_at_create: fields.<col>.ignore-retract is
  rejected at CREATE TABLE in basic mode.
- sum_all_null_emits_null_for_nullable_column: all-NULL group on a
  nullable sum column outputs NULL rather than an error.
- routing_uses_kv_path: a same-PK INSERT must collapse to one row,
  guarding the to_arrow → read_pk → read_kv routing introduced earlier on
  this branch from silently regressing to read_raw.
…mment

Self-review of the E2E test commit surfaced two issues:

- test_pk_aggregation_rejects_delete swallowed the planning-stage Err
  branch with no assertion, so a future parser change that rejects
  DELETE for an unrelated reason (syntax, dialect, etc.) would silently
  turn the test into a no-op pass.  Match on both Ok and Err and assert
  the error message names the aggregation engine in either case.  Also
  rename the variable from 'err' to 'plan_result' so the type matches
  the name.
- test_pk_aggregation_sum_all_null_emits_null_for_nullable_column's
  doc comment described an entirely different test ('count', NOT NULL,
  DataInvalid error).  Replace with a one-line summary of the actual
  contract: all-NULL sum on a nullable column emits NULL, not 0 or an
  error.
CI runs clippy with -D warnings, which trips clippy::err-expect on the
manual two-step pattern.  Same semantics, single call.
Copy link
Copy Markdown
Contributor

@JingsongLi JingsongLi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a substantial and well-structured PR. The aggregation engine implementation is clean and well-tested.

Highlights:

  1. Architecture: The FieldAggregator trait with per-field accumulator state is the right abstraction. The Mutex-wrapped aggregator vector in AggregateMergeFunction is a pragmatic choice given the existing MergeFunction trait signature (shared &self), and uncontended in practice.

  2. Correctness safeguards:

    • Overflow detection via checked_add/checked_mul for integer types rather than silent wrapping.
    • NaN handling in min/max mirrors Java behavior.
    • Non-nullable field NULL check post-merge catches configuration errors early.
    • Retract row rejection up-front prevents partial accumulation leaking into output.
  3. Option whitelist approach (basic mode): Explicitly rejecting unsupported options (ignore-retract, distinct, sequence-group, etc.) at both create-time and runtime is the right strategy — fail fast rather than silently ignore advanced features.

  4. Test coverage: 53 unit tests + 9 e2e tests covering sum, listagg, mixed aggregators, default function, sequence-field forcing, DELETE rejection, missing config errors, all-NULL groups, and read-path routing verification. Very thorough.

  5. The pick_agg! macro for last_value/first_value/last_non_null_value/first_non_null_value is elegant — stores a 1-row Arrow slice rather than typed scalars, so it works for any Paimon type without per-type dispatching.

Minor observations (non-blocking):

  • The CountAgg semantics: it counts non-NULL inputs but the column holds i64 placeholders. The Java reference also does this, so it's correct by design, but the behavior may surprise users who write (1, 1) expecting the count column to be the literal value rather than "one row was observed." A doc note in the system table docs could help.
  • DECIMAL product rejection is well-documented with a clear error message pointing the user to alternatives.

Well done!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants