feat: support merge-engine=aggregation (basic mode) #340
Wire the aggregation merge engine through the option parsing and write-side validation surface, mirroring the layout already used for partial-update. No read-side merge logic yet; KeyValueFileReader returns Unsupported until a follow-up commit lands the aggregator implementations.

- spec/core_options.rs: add MergeEngine::Aggregation, parser arm.
- spec/aggregation.rs: AggregationConfig with create-/runtime-mode validation; rejects ignore-retract, distinct, sequence-group, nested-key, count-limit, aggregation.remove-record-on-delete, ignore-delete keys in basic mode.
- spec/schema.rs: invoke AggregationConfig::validate_create_mode at Schema build.
- table/kv_file_writer.rs: AggregationConfig::validate_runtime_mode at writer construction; reject deletion-vectors; keep-all flush like partial-update; cover Aggregation in the dedup unreachable arm.
- table/bucket_assigner_cross.rs: reject Aggregation in CrossPartitionAssigner (matches partial-update treatment).
- table/kv_file_reader.rs: placeholder Unsupported arm for Aggregation; replaced by AggregateMergeFunction in a later commit.
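The basic-mode rejection listed for spec/aggregation.rs can be illustrated with a small standalone check. This is a minimal sketch, not the PR's code: `validate_basic_mode`, `ConfigError`, and the two constant tables are hypothetical names, and the split between per-field keys (`fields.<col>.<suffix>`) and table-level keys is an assumption based on the key names quoted in the commit message.

```rust
use std::collections::HashMap;

/// Per-field suffixes rejected in basic mode (assumed to appear as `fields.<col>.<suffix>`).
const FIELD_SUFFIXES: &[&str] = &[
    "ignore-retract",
    "distinct",
    "sequence-group",
    "nested-key",
    "count-limit",
];

/// Table-level keys rejected in basic mode (assumed to appear verbatim).
const TABLE_KEYS: &[&str] = &["aggregation.remove-record-on-delete", "ignore-delete"];

/// Hypothetical error type; the real crate has its own error enum.
#[derive(Debug, PartialEq)]
pub enum ConfigError {
    Unsupported(String),
}

/// Returns an error naming the first unsupported key, or Ok if none is present.
pub fn validate_basic_mode(options: &HashMap<String, String>) -> Result<(), ConfigError> {
    // Sort keys so the reported key is deterministic across runs.
    let mut keys: Vec<&String> = options.keys().collect();
    keys.sort();
    for key in keys {
        let table_level = TABLE_KEYS.contains(&key.as_str());
        let field_level = key.starts_with("fields.")
            && FIELD_SUFFIXES
                .iter()
                .any(|suffix| key.ends_with(format!(".{}", suffix).as_str()));
        if table_level || field_level {
            return Err(ConfigError::Unsupported(key.clone()));
        }
    }
    Ok(())
}
```

Calling the same check at both schema build and writer construction would give the fail-fast behaviour the commit describes, without relying on the read path to notice a bad option.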
… in cross-partition writes

Review of the prior commit surfaced two wire-up gaps:
- table_read.rs::to_arrow only dispatched Deduplicate and PartialUpdate to the KV read path, so an aggregation table would silently fall through to read_raw and return unmerged level-0 rows instead of the Unsupported error promised in kv_file_reader.rs. Route Aggregation to read_pk, and force the KV branch (no level-zero fast path) since the aggregation engine cannot use the plain DataFileReader.
- table_write.rs only rejected dynamic cross-partition writes for partial-update, leaving aggregation to fail late from inside the CrossPartitionAssigner on the first crossing key. Mirror the early Unsupported.

Add two regression tests:
- read_builder: a direct read against an aggregation PK table now surfaces Unsupported on first batch.
- table_write: cross-partition aggregation table is rejected at writer construction.
…egators

Introduces the per-field aggregator registry consumed by the upcoming AggregateMergeFunction. Each aggregator owns small per-PK-group state; the merge function calls reset() at the start of a group, agg() per row in user-sequence order, then result() to materialize a 1-row Arrow array suitable for an interleave assembly.

Aggregators in this commit (12, aligned with Java basic mode):
- numeric.rs: sum, product, min, max (numeric + Decimal + comparable Date/Time/Timestamp/Char/VarChar for min/max) and count (BIGINT output for any input, counts non-NULL rows).
- value.rs: last_value, first_value, last_non_null_value, first_non_null_value, sharing a generic PickValueAgg that stores a zero-copy 1-row slice of the source array.
- bool_agg.rs: bool_and, bool_or for BOOLEAN columns.
- listagg.rs: listagg for Char/VarChar, with per-field fields.<col>.list-agg-delimiter (default ',').

Semantics aligned with Java FieldSumAgg / FieldMinAgg / FieldLastNonNullValueAgg / etc.:
- NULL inputs are skipped for sum/product/min/max/bool_*/listagg/count and for the *_non_null_value variants; last_value / first_value preserve NULL.
- Min/max skip NaN floats and refuse Boolean / non-comparable types.
- sum/product use checked arithmetic on integer types and surface DataInvalid on overflow rather than silently wrapping.
- count requires the column to be declared as BIGINT.
- Unknown aggregator names and unsupported (aggregator, type) pairs fail at construction with ConfigInvalid so misconfiguration is rejected at table creation rather than at read time.

The module is allow(dead_code) for this commit; the AggregateMergeFunction that consumes it lands in the next commit. 41 unit tests cover happy path, NULL propagation, overflow, type rejection, and reset semantics for every aggregator.
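The reset() / agg() / result() lifecycle and the checked-arithmetic rule above can be sketched without Arrow. This is a simplified stand-in under stated assumptions: the real trait works on Arrow arrays, while here each value is an `Option<i64>` with `None` playing the role of NULL, and `AggError`, `SumAgg`, and `aggregate_group` are hypothetical names.

```rust
/// Hypothetical error type standing in for the crate's DataInvalid error.
#[derive(Debug, PartialEq)]
pub enum AggError {
    DataInvalid(String),
}

/// Simplified per-field aggregator contract (scalar values instead of Arrow arrays).
pub trait FieldAggregator {
    /// Clears state at the start of a primary-key group.
    fn reset(&mut self);
    /// Folds one row's value into the state; `None` models NULL.
    fn agg(&mut self, value: Option<i64>) -> Result<(), AggError>;
    /// Returns the aggregated value for the group.
    fn result(&self) -> Option<i64>;
}

#[derive(Debug, Default)]
pub struct SumAgg {
    acc: Option<i64>,
}

impl FieldAggregator for SumAgg {
    fn reset(&mut self) {
        self.acc = None;
    }

    fn agg(&mut self, value: Option<i64>) -> Result<(), AggError> {
        // NULL inputs are skipped, so an all-NULL group keeps `acc == None`.
        if let Some(v) = value {
            let next = match self.acc {
                None => v,
                // checked_add reports overflow instead of wrapping silently.
                Some(a) => a
                    .checked_add(v)
                    .ok_or_else(|| AggError::DataInvalid("sum overflow".to_string()))?,
            };
            self.acc = Some(next);
        }
        Ok(())
    }

    fn result(&self) -> Option<i64> {
        self.acc
    }
}

/// Drives one group through the lifecycle: reset, agg per row, result.
pub fn aggregate_group(
    agg: &mut dyn FieldAggregator,
    rows: &[Option<i64>],
) -> Result<Option<i64>, AggError> {
    agg.reset();
    for &row in rows {
        agg.agg(row)?;
    }
    Ok(agg.result())
}
```

Because `reset()` is called at the start of every group, one aggregator instance can be reused across groups, which is what keeps the per-group state small.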
… with code

Self-review of the previous commit found two issues:
- ProductAgg accepted DECIMAL columns and used a naked i128 checked_mul. That silently multiplies the raw values without dividing by 10^scale, so DECIMAL(10,2) 1.00 * 1.00 produced raw=10000 (read back as 100.00) instead of 1.00. Java FieldProductAgg goes through BigDecimal which handles scale rebasing; aligning that here is its own piece of work. For basic mode, reject DECIMAL at ProductAgg::new with ConfigInvalid and drop the dead Decimal128 variant from ProductState so a future reviewer cannot reintroduce the bug by adding back the arm.
- numeric.rs's module doc and CountAgg's struct doc both claimed count 'accepts any input type and produces a BIGINT column', but CountAgg::new rejects every non-BIGINT column. Rewrite the doc to match the actual contract: count requires the column to be declared as BIGINT, counts non-NULL inputs, and emits BIGINT output.

Adds test_product_rejects_decimal_until_scale_handling_lands to lock in the new ProductAgg constructor behavior.
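The scale problem is easy to reproduce on raw integers. The sketch below is illustrative only: `naive_product` and `rescaled_product` are hypothetical helpers, and truncating division is an assumption, since the rounding behaviour of the Java BigDecimal path is not stated in this PR.

```rust
/// Multiplies two raw DECIMAL(p, s) values without rescaling.
/// This reproduces the bug described above: the result carries scale 2*s.
pub fn naive_product(a_raw: i128, b_raw: i128) -> Option<i128> {
    a_raw.checked_mul(b_raw)
}

/// Multiplies and then divides by 10^scale so the result keeps the column's scale.
/// Division truncates toward zero (an assumption, not the PR's chosen rounding mode).
pub fn rescaled_product(a_raw: i128, b_raw: i128, scale: u32) -> Option<i128> {
    let factor = 10i128.checked_pow(scale)?;
    a_raw.checked_mul(b_raw)?.checked_div(factor)
}
```

With DECIMAL(10,2), 1.00 is stored as raw 100. `naive_product(100, 100)` yields raw 10000, which reads back as 100.00 at scale 2, while `rescaled_product(100, 100, 2)` yields raw 100, i.e. 1.00.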
Adds AggregateMergeFunction in sort_merge.rs and connects it to the KeyValueFileReader so that PK tables with merge-engine=aggregation now return field-wise aggregated rows rather than the placeholder Unsupported error introduced earlier on this branch.

- spec/aggregation.rs: re-exposes agg_function_for_field and default_agg_function getters that the merge function uses to pick a per-column aggregator from the user's options.
- table/aggregator/mod.rs: tightens the FieldAggregator trait to require Debug (so concrete aggregators can be stored in a Debug-derivable merge function), and drops the module-level #![allow(dead_code)] now that the aggregators are consumed.
- table/aggregator/value.rs: removes the indirect FieldAggregator impl on the inner PickValueAgg helper and inlines the four pick-style wrappers via a small macro. Avoids a dead 'name' field and keeps the helper purely inherent.
- table/sort_merge.rs: introduces AggregateMergeFunction. Each PK group drives an aggregator slot per output column (None for primary-key columns, which are copied from a representative row). Rows are sorted by user-sequence then system sequence before being fed to each aggregator; sequence fields are forced to last_value to align with Java AggregateMergeFunction#createFieldAggregators. Retract rows (DELETE / UPDATE_BEFORE) are rejected with Unsupported. A defensive guard at the end of merge() reports the offending aggregator name when a NULL slips into a non-nullable column.
- table/kv_file_reader.rs: extends new_merge_function with the data the aggregator factory needs (output fields, primary keys, sequence fields) and constructs an AggregateMergeFunction for Aggregation splits. Hoists primary_keys and sequence_fields out of self.config before the try_stream! body so the closure can borrow them.
- table/read_builder.rs: removes the Commit-1 placeholder test that asserted the read path returned Unsupported for Aggregation; the full path is now exercised by the new sort-merge tests.

Unit tests in sort_merge.rs cover sum + listagg over multiple streams, DELETE rejection, missing aggregate-function configuration error, sequence-field forced-last_value behavior, default aggregate-function fallback, and rejection of unsupported options at construction time.
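The per-group flow described for sort_merge.rs (reject retract rows, sort by user sequence then system sequence, feed each row to the aggregators, force the sequence field to last_value) can be sketched on plain structs. This is a minimal illustration under stated assumptions: `Row`, `RowKind`, `MergeError`, and `merge_group` are hypothetical, a single `amount` column is summed, and the real implementation operates on Arrow batches.

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum RowKind {
    Insert,
    UpdateBefore,
    UpdateAfter,
    Delete,
}

#[derive(Debug, Clone)]
pub struct Row {
    pub kind: RowKind,
    pub user_seq: i64,       // value of the sequence.field column
    pub system_seq: i64,     // writer-assigned sequence number
    pub amount: Option<i64>, // value column aggregated with `sum`
}

#[derive(Debug, PartialEq)]
pub enum MergeError {
    Unsupported(String),
    DataInvalid(String),
}

/// Merges one primary-key group into (sequence field via last_value, sum of amount).
pub fn merge_group(mut rows: Vec<Row>) -> Result<(Option<i64>, Option<i64>), MergeError> {
    // Reject retract rows before any accumulation so no partial state leaks out.
    if let Some(bad) = rows
        .iter()
        .find(|r| matches!(r.kind, RowKind::Delete | RowKind::UpdateBefore))
    {
        return Err(MergeError::Unsupported(format!(
            "retract row {:?} is not supported",
            bad.kind
        )));
    }
    // Order by user sequence first, system sequence as the tie-breaker.
    rows.sort_by_key(|r| (r.user_seq, r.system_seq));

    let mut last_seq: Option<i64> = None;
    let mut sum: Option<i64> = None;
    for row in &rows {
        // The sequence field always behaves like last_value.
        last_seq = Some(row.user_seq);
        if let Some(v) = row.amount {
            sum = Some(match sum {
                None => v,
                Some(acc) => acc
                    .checked_add(v)
                    .ok_or_else(|| MergeError::DataInvalid("sum overflow".to_string()))?,
            });
        }
    }
    Ok((last_seq, sum))
}
```

Sorting matters for the order-sensitive aggregators: without it, the last_value of the sequence field would depend on the order in which files happened to be read.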
…egateMergeFunction

Self-review of the prior commit found three minor polish issues:
- The non-nullable NULL guard at the end of merge() used a single message format that called the source 'aggregator <primary-key>' for PK columns, which misleadingly directs the user at the aggregator config when the actual cause is a NULL primary-key value on the source row. Split the branch so PK-NULL is reported as 'primary-key column ... contains NULL on a source row; declare the column nullable or fix the upstream data', and the aggregator branch keeps the pre-existing message naming the offending aggregator.
- Removed the slot-count vs output-schema-field-count assertion. The slot count is fixed at AggregateMergeFunction::new from the same output_fields slice the caller uses to build output_schema, so the branch was unreachable noise (and PR apache#263's reviewer specifically pushed back on this kind of unreachable validation).
- Collapsed the duplicate AggregationConfig::new(table_options) construction in AggregateMergeFunction::new and reworded the doc to clarify that 'sequence_fields' is the constructor parameter populated by the reader from the sequence.field option.
Exercises the full DataFusion → ReadBuilder → TableRead → KeyValueFileReader → AggregateMergeFunction path on disk-backed PK tables. Nine scenarios:
- sum_and_listagg_fixed_bucket_e2e: numeric sum and string listagg across two commits with overlapping keys, verifying both delimiter and NULL skip semantics.
- default_function: fields.default-aggregate-function applies to any column without an explicit per-field aggregator, across three commits with NULL holes in different columns.
- mixed_aggregators: sum / max / bool_or / count combined on a single row.
- sequence_field_forced_last_value: sequence.field overrides any explicit per-field aggregate-function to last_value, matching Java AggregateMergeFunction#createFieldAggregators.
- rejects_delete: DELETE on an aggregation table surfaces an Unsupported error (either at planning or at scan time).
- requires_agg_function_per_field: a missing aggregate-function on a value column produces a clear ConfigInvalid mentioning the field name.
- rejects_unsupported_options_at_create: fields.<col>.ignore-retract is rejected at CREATE TABLE in basic mode.
- sum_all_null_emits_null_for_nullable_column: all-NULL group on a nullable sum column outputs NULL rather than an error.
- routing_uses_kv_path: a same-PK INSERT must collapse to one row, guarding the to_arrow → read_pk → read_kv routing introduced earlier on this branch from silently regressing to read_raw.
…mment
Self-review of the E2E test commit surfaced two issues:
- test_pk_aggregation_rejects_delete swallowed the planning-stage Err
branch with no assertion, so a future parser change that rejects
DELETE for an unrelated reason (syntax, dialect, etc.) would silently
turn the test into a no-op pass. Match on both Ok and Err and assert
the error message names the aggregation engine in either case. Also
rename the variable from 'err' to 'plan_result' so the type matches
the name.
- test_pk_aggregation_sum_all_null_emits_null_for_nullable_column's
doc comment described an entirely different test ('count', NOT NULL,
DataInvalid error). Replace with a one-line summary of the actual
contract: all-NULL sum on a nullable column emits NULL, not 0 or an
error.
CI runs clippy with -D warnings, which trips clippy::err_expect on the manual two-step pattern. Same semantics, single call.
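For context, clippy's err_expect lint flags `.err().expect(..)` on a `Result` and suggests `expect_err(..)`. The sketch below assumes the flagged test code had that shape; `plan_delete` is a hypothetical stand-in for the call under test, not a function from this PR.

```rust
/// Hypothetical fallible call standing in for the operation under test.
pub fn plan_delete(engine: &str) -> Result<(), String> {
    if engine == "aggregation" {
        Err(format!("merge-engine={} does not support DELETE", engine))
    } else {
        Ok(())
    }
}

/// Two-step form flagged by the lint: Result -> Option<E> -> expect.
#[allow(clippy::err_expect)]
pub fn error_via_two_steps() -> String {
    plan_delete("aggregation")
        .err()
        .expect("DELETE should be rejected")
}

/// Single-call form suggested by the lint; it panics with the same message on Ok.
pub fn error_via_single_call() -> String {
    plan_delete("aggregation").expect_err("DELETE should be rejected")
}
```

Both functions return the same error string, so the change is behaviour-preserving apart from the panic output, which additionally prints the `Ok` value when `expect_err` fails.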
JingsongLi left a comment
This is a substantial and well-structured PR. The aggregation engine implementation is clean and well-tested.
Highlights:
- Architecture: The `FieldAggregator` trait with per-field accumulator state is the right abstraction. The `Mutex`-wrapped aggregator vector in `AggregateMergeFunction` is a pragmatic choice given the existing `MergeFunction` trait signature (shared `&self`), and uncontended in practice.
- Correctness safeguards:
  - Overflow detection via `checked_add`/`checked_mul` for integer types rather than silent wrapping.
  - NaN handling in min/max mirrors Java behavior.
  - Non-nullable field NULL check post-merge catches configuration errors early.
  - Retract row rejection up-front prevents partial accumulation leaking into output.
- Option whitelist approach (basic mode): Explicitly rejecting unsupported options (`ignore-retract`, `distinct`, `sequence-group`, etc.) at both create-time and runtime is the right strategy: fail fast rather than silently ignore advanced features.
- Test coverage: 53 unit tests + 9 e2e tests covering sum, listagg, mixed aggregators, default function, sequence-field forcing, DELETE rejection, missing config errors, all-NULL groups, and read-path routing verification. Very thorough.
- The `pick_agg!` macro for `last_value`/`first_value`/`last_non_null_value`/`first_non_null_value` is elegant: it stores a 1-row Arrow slice rather than typed scalars, so it works for any Paimon type without per-type dispatching.
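The `Mutex` point can be made concrete with a small sketch. It assumes only what the review states, namely that the merge trait exposes a shared `&self` method while the aggregators need mutable state; the trait shape, `MaxState`, and `AggregateMerge` below are hypothetical simplifications using `Option<i64>` columns instead of Arrow arrays.

```rust
use std::sync::Mutex;

/// Hypothetical merge trait with a shared-reference method.
pub trait MergeFunction {
    /// `rows[i][c]` is column `c` of row `i`; returns one merged row.
    fn merge(&self, rows: &[Vec<Option<i64>>]) -> Vec<Option<i64>>;
}

/// Per-column accumulator for `max` that skips NULLs.
#[derive(Debug, Default)]
pub struct MaxState {
    best: Option<i64>,
}

/// Holds mutable aggregator state behind a Mutex so `merge(&self)` can update it.
#[derive(Debug)]
pub struct AggregateMerge {
    aggs: Mutex<Vec<MaxState>>,
}

impl AggregateMerge {
    pub fn new(columns: usize) -> Self {
        let aggs = (0..columns).map(|_| MaxState::default()).collect();
        Self { aggs: Mutex::new(aggs) }
    }
}

impl MergeFunction for AggregateMerge {
    fn merge(&self, rows: &[Vec<Option<i64>>]) -> Vec<Option<i64>> {
        // One lock per group; uncontended when each reader owns its merge function.
        let mut aggs = self.aggs.lock().expect("aggregator mutex poisoned");
        for agg in aggs.iter_mut() {
            agg.best = None; // reset at the start of the group
        }
        for row in rows {
            for (agg, value) in aggs.iter_mut().zip(row) {
                if let Some(v) = *value {
                    agg.best = Some(agg.best.map_or(v, |b| b.max(v)));
                }
            }
        }
        aggs.iter().map(|agg| agg.best).collect()
    }
}
```

Taking the lock once per group rather than once per row keeps the overhead small, which is consistent with the observation that the lock is uncontended in practice.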
Minor observations (non-blocking):
- The `CountAgg` semantics: it counts non-NULL inputs but the column holds `i64` placeholders. The Java reference also does this, so it's correct by design, but the behavior may surprise users who write `(1, 1)` expecting the count column to be the literal value rather than "one row was observed." A doc note in the system table docs could help.
- `DECIMAL` `product` rejection is well-documented with a clear error message pointing the user to alternatives.
Well done!
Purpose
Add `merge-engine=aggregation` (basic mode), a Rust port of Java `AggregateMergeFunction`.

Brief change log
- `MergeEngine::Aggregation` + `AggregationConfig` (basic-mode option whitelist; rejects `ignore-retract`, `distinct`, `sequence-group`, `nested-key`, `count-limit`, `aggregation.remove-record-on-delete`, `ignore-delete`).
- `sum` / `product` / `min` / `max` / `count` / `last_value` / `first_value` / `last_non_null_value` / `first_non_null_value` / `bool_and` / `bool_or` / `listagg`.
- `AggregateMergeFunction` wired through `KeyValueFileReader`. Sequence fields forced to `last_value`; PK columns copied through.
- `DELETE` / `UPDATE_BEFORE` rejected at runtime.
- `DECIMAL` `product` deferred (BigDecimal-style scale rebasing TBD).

Tests
- `aggregator/*` and `sort_merge`.
- `pk_tables.rs` covering sum+listagg, default function, mixed aggregators, sequence-field forced `last_value`, DELETE rejection, missing-config error, unsupported-options rejection, all-NULL sum, and read-path routing.
- `cargo fmt`, `cargo clippy --features fulltext,vortex -- -D warnings`, `cargo test -p paimon --features fulltext,vortex --lib`, `cargo test -p paimon-datafusion --test pk_tables` all green.

API and Format
No on-disk format change. All new types are `pub(crate)`.

Documentation
Inline module docs reference the Java counterparts. No user-facing doc update yet.