diff --git a/.github/workflows/benchmark.yml b/.github/workflows/benchmark.yml index 494365674..73868b362 100644 --- a/.github/workflows/benchmark.yml +++ b/.github/workflows/benchmark.yml @@ -54,7 +54,7 @@ on: required: true type: choice options: - - null + - "null" - bigquery default: "null" bq_project_id: @@ -88,6 +88,8 @@ env: POSTGRES_DB: bench POSTGRES_PORT: 5430 POSTGRES_HOST: localhost + BENCHMARK_SAMPLES: 3 + BENCHMARK_WARMUP_SAMPLES: 1 permissions: actions: read @@ -130,7 +132,7 @@ jobs: sudo apt-get update sudo apt-get install -y "${missing_packages[@]}" fi - go install github.com/pingcap/go-tpc@v1.0.12 + go install github.com/pingcap/go-tpc/cmd/go-tpc@v1.0.12 echo "$(go env GOPATH)/bin" >> "$GITHUB_PATH" - name: Start Postgres @@ -164,6 +166,11 @@ jobs: mkdir -p target/bench-results tpcc_threads="${TPCC_THREADS_INPUT:-derived}" + host_memory="$(awk '/MemTotal/ { printf "%.2f GiB", $2 / 1024 / 1024 }' /proc/meminfo || true)" + if [[ -z "${host_memory}" ]]; then + host_memory="unavailable" + fi + cgroup_memory_limit="unavailable" if [[ -r /sys/fs/cgroup/memory.max ]]; then cgroup_memory_limit="$(cat /sys/fs/cgroup/memory.max)" @@ -172,22 +179,25 @@ jobs: fi { - echo "## Benchmark environment" + echo "## šŸ–„ļø Benchmark environment" echo echo "| Setting | Value |" echo "| --- | --- |" echo "| Runner | ${BENCHMARK_RUNNER_INPUT} |" + echo "| Runner name | ${RUNNER_NAME} |" + echo "| Runner OS | ${RUNNER_OS} |" + echo "| Runner arch | ${RUNNER_ARCH} |" echo "| CPUs | $(nproc) |" - echo "| Host memory | $(awk '/MemTotal/ { printf \"%.2f GiB\", $2 / 1024 / 1024 }' /proc/meminfo) |" + echo "| Host memory | ${host_memory} |" echo "| Cgroup memory limit | ${cgroup_memory_limit} |" echo "| Warehouses | ${WAREHOUSES_INPUT} |" echo "| TPC-C threads | ${tpcc_threads} |" + echo "| Measured samples | ${BENCHMARK_SAMPLES} |" + echo "| Warmup samples | ${BENCHMARK_WARMUP_SAMPLES} |" echo "| Memory budget ratio | ${MEMORY_BUDGET_RATIO_INPUT} |" echo "| Memory backpressure | ${ENABLE_MEMORY_BACKPRESSURE_INPUT} |" echo "| Destination | ${DESTINATION_INPUT} |" - } > target/bench-results/benchmark-environment.md - - cat target/bench-results/benchmark-environment.md >> "$GITHUB_STEP_SUMMARY" + } >> "$GITHUB_STEP_SUMMARY" - name: Run benchmarks env: @@ -224,6 +234,8 @@ jobs: --batch-max-fill-ms "$BATCH_MAX_FILL_MS_INPUT" --memory-budget-ratio "$MEMORY_BUDGET_RATIO_INPUT" --destination "$DESTINATION_INPUT" + --samples "$BENCHMARK_SAMPLES" + --warmup-samples "$BENCHMARK_WARMUP_SAMPLES" --output-dir target/bench-results ) @@ -259,6 +271,130 @@ jobs: cargo xtask "${args[@]}" + - name: Summarize benchmark results + run: | + set -euo pipefail + + python3 <<'PY' >> "$GITHUB_STEP_SUMMARY" + import json + from pathlib import Path + + results_dir = Path("target/bench-results") + + def load(name): + path = results_dir / name + if not path.exists(): + return None + return json.loads(path.read_text()) + + def number(value, digits=2): + if value is None: + return "-" + if isinstance(value, int) or float(value).is_integer(): + return f"{int(value):,}" + return f"{float(value):,.{digits}f}" + + def seconds(ms): + if ms is None: + return "-" + return f"{float(ms) / 1000:,.2f} s" + + def pct(value): + if value is None: + return "-" + return f"{float(value):,.2f}%" + + def samples(report): + sample_count = report.get("sample_count") + warmup_count = report.get("warmup_sample_count", 0) + if sample_count is None: + return "1" + if warmup_count: + return f"{sample_count} + {warmup_count} warmup" + return str(sample_count) + + def spread(report, key): + return pct((report.get("sample_summary") or {}).get(key, {}).get("spread_pct")) + + def table(headers, rows): + print("| " + " | ".join(headers) + " |") + print("| " + " | ".join("---" for _ in headers) + " |") + for row in rows: + print("| " + " | ".join(str(cell) for cell in row) + " |") + + table_copy = load("table_copy.json") + table_streaming = load("table_streaming.json") + + print() + print("## šŸ“Š Benchmark results") + print() + + summary = [] + if table_copy: + summary.append([ + "šŸ“¦ Table copy", + f"{number(table_copy.get('rows_per_second'))} rows/s", + f"{number(table_copy.get('estimated_mib_per_second'))} MiB/s", + seconds(table_copy.get("total_ms")), + spread(table_copy, "rows_per_second"), + ]) + if table_streaming: + summary.append([ + "šŸ” Table streaming", + f"{number(table_streaming.get('end_to_end_with_shutdown_events_per_second'))} events/s", + f"{number(table_streaming.get('end_to_end_with_shutdown_estimated_mib_per_second'))} MiB/s", + seconds(table_streaming.get("total_ms")), + spread(table_streaming, "end_to_end_with_shutdown_events_per_second"), + ]) + + if summary: + table(["Benchmark", "Median throughput", "Data rate", "Total", "Sample spread"], summary) + print() + else: + print("No benchmark JSON reports were written.") + raise SystemExit(0) + + if table_copy: + print("### šŸ“¦ Table copy") + table( + ["Metric", "Value"], + [ + ["Samples", samples(table_copy)], + ["Rows copied", number(table_copy.get("copied_rows"))], + ["Rows/s", number(table_copy.get("rows_per_second"))], + ["Rows/s spread", spread(table_copy, "rows_per_second")], + ["Decoded MiB", number(table_copy.get("estimated_copied_mib"))], + ["Decoded MiB/s", number(table_copy.get("estimated_mib_per_second"))], + ["Decoded MiB/s spread", spread(table_copy, "estimated_mib_per_second")], + ["Copy wait", seconds(table_copy.get("copy_wait_ms"))], + ["Total", seconds(table_copy.get("total_ms"))], + ], + ) + print() + + if table_streaming: + print("### šŸ” Table streaming") + stats = table_streaming.get("destination_stats", {}) + table( + ["Metric", "Value"], + [ + ["Samples", samples(table_streaming)], + ["Workload", table_streaming.get("workload", "-")], + ["Produced events", number(table_streaming.get("produced_events"))], + ["Observed CDC events", number(table_streaming.get("observed_cdc_events"))], + ["Events/s", number(table_streaming.get("end_to_end_with_shutdown_events_per_second"))], + ["Events/s spread", spread(table_streaming, "end_to_end_with_shutdown_events_per_second")], + ["Decoded MiB/s", number(table_streaming.get("end_to_end_with_shutdown_estimated_mib_per_second"))], + ["Decoded MiB/s spread", spread(table_streaming, "end_to_end_with_shutdown_estimated_mib_per_second")], + ["Inserts", number(stats.get("inserts"))], + ["Updates", number(stats.get("updates"))], + ["Deletes", number(stats.get("deletes"))], + ["Total", seconds(table_streaming.get("total_ms"))], + ], + ) + print() + PY + - name: Compare benchmark results env: GITHUB_TOKEN: ${{ github.token }} @@ -274,10 +410,9 @@ jobs: exit "$status" - name: Upload benchmark results - uses: actions/upload-artifact@ea165f8d65b6e75b540449e92b4886f43607fa02 # v4.6.2 + if: always() + uses: actions/upload-artifact@043fb46d1a93c77aae656e7c1c64a875d1fc6a0a # v7.0.1 with: name: benchmark-results - path: | - target/bench-results/*.json - target/bench-results/*.md + path: target/bench-results/*.json if-no-files-found: error diff --git a/AGENTS.md b/AGENTS.md index c162d9037..010e7c5b1 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -45,7 +45,12 @@ - Never create commits, push branches, open pull requests, or perform other git write actions unless the user explicitly instructs you to do so. - Keep the workspace on the stable toolchain from `rust-toolchain.toml` for build, lint, and test commands; use the pinned nightly formatter only through `./scripts/fmt` and `./scripts/fmt-check`. - Treat `Cargo.toml` workspace lints, `rustfmt.toml`, and compiler diagnostics as the source of truth for enforceable style and correctness rules. Prefer adding or tightening static checks over adding prose rules here. -- Always run the clippy lint command listed in commands section at the end to check everything compile properly. +- Run Clippy, builds, and tests intentionally when they are relevant: for example + after changing Rust code, when compiler/lint diagnostics indicate a problem, + when workflow assumptions changed, or when the user asks for verification. Do + not run expensive checks reflexively for unrelated documentation, YAML-only, or + similarly low-risk edits; in those cases, run the smallest relevant validation + instead and report what actually ran. ## Rust Style - This section is only for project-specific judgment that is not already covered by rustfmt, rustc, or Clippy. diff --git a/etl-benchmarks/README.md b/etl-benchmarks/README.md index a495c0035..c9786e934 100644 --- a/etl-benchmarks/README.md +++ b/etl-benchmarks/README.md @@ -39,7 +39,7 @@ Local runs need: Install the pinned `go-tpc` version used by CI: ```bash -go install github.com/pingcap/go-tpc@v1.0.12 +go install github.com/pingcap/go-tpc/cmd/go-tpc@v1.0.12 ``` Make sure `$(go env GOPATH)/bin` is on `PATH`. @@ -111,6 +111,8 @@ cargo xtask benchmark \ --force-prepare \ --warehouses 10 \ --streaming-duration-seconds 300 \ + --samples 3 \ + --warmup-samples 1 \ --batch-max-fill-ms 1000 \ --max-table-sync-workers 8 \ --max-copy-connections-per-table 4 \ @@ -120,6 +122,13 @@ cargo xtask benchmark \ Use `--force-prepare` when changing the warehouse count for an existing local benchmark database. Without it, `xtask` reuses the existing TPC-C tables. +`--samples` repeats each selected benchmark and writes the median result to +`table_copy.json` and `table_streaming.json`. `--warmup-samples` runs extra +samples before the measured samples and discards them. Use at least three +samples for CI or other noisy hosts; the JSON reports include +`sample_summary` with min, median, max, and spread for each numeric top-level +metric. + `--tpcc-threads` is optional and applies to both `go-tpc tpcc prepare` and `go-tpc tpcc run`. When omitted, `xtask` derives it as: @@ -250,20 +259,21 @@ For BigQuery workflow runs, set the repository secret `bq_project_id`, and an existing `bq_dataset_id`. The workflow starts only source Postgres, installs pinned `go-tpc`, runs -`cargo xtask benchmark`, compares the new reports against the most recent -successful `benchmark-results` artifact on the same ref, and uploads -`target/bench-results/*.json` plus `target/bench-results/*.md`. It also writes a -benchmark environment note with the selected runner, CPU count, host memory, -cgroup memory limit, memory budget ratio, and memory backpressure setting. +`cargo xtask benchmark` with three measured samples plus one warmup sample, +compares the median reports against the most recent successful +`benchmark-results` artifact on the same ref, and uploads +`target/bench-results/*.json`. The GitHub step summary contains the benchmark +environment, formatted median results with sample spread, and the comparison +diff against the previous successful run when one is available. If no previous successful run exists, the comparison writes a "no previous benchmark artifact" summary and passes. If a comparable previous run exists, the -comparison fails the workflow when exact copy count metrics change or when -throughput and timing metrics regress beyond their per-metric thresholds. -TPC-C streaming event counts are informational because the transaction workload -is duration-based and naturally varies between runs. If benchmark configuration -differs, the comparison still prints the diff table but skips the regression -gate for that benchmark. +comparison fails the workflow when median throughput and timing metrics regress +beyond their per-metric thresholds. Row counts and TPC-C streaming event counts +are informational because the transaction workload is duration-based and +naturally varies between runs. If benchmark configuration differs, the +comparison still prints the diff table but skips the regression gate for that +benchmark. The comparison is also available locally when you have two result directories: diff --git a/xtask/src/commands/benchmark.rs b/xtask/src/commands/benchmark.rs index 1783fa396..4dc70219d 100644 --- a/xtask/src/commands/benchmark.rs +++ b/xtask/src/commands/benchmark.rs @@ -8,7 +8,7 @@ use std::{ use anyhow::{Context, Result, bail}; use clap::{Args, ValueEnum}; -use serde_json::Value; +use serde_json::{Map, Number, Value}; const DEFAULT_DB_HOST: &str = "localhost"; const DEFAULT_DB_PORT: u16 = 5430; @@ -99,6 +99,13 @@ pub(crate) struct BenchmarkArgs { /// default. #[arg(long, default_value_t = false)] enable_memory_backpressure: bool, + /// Number of measured samples to run for each selected benchmark. + #[arg(long, default_value_t = 1)] + samples: u16, + /// Number of warmup samples to run before measured samples. Warmups are + /// discarded from the aggregate report. + #[arg(long, default_value_t = 0)] + warmup_samples: u16, /// Directory where JSON benchmark reports are written. #[arg(long, default_value = "target/bench-results")] output_dir: PathBuf, @@ -126,18 +133,14 @@ impl BenchmarkArgs { let tpcc_threads = self.tpcc_threads.unwrap_or_else(|| recommended_threads(self.warehouses)); let pipeline_id_base = benchmark_pipeline_id_base()?; - let table_copy_pipeline_id = pipeline_id_base; - let table_streaming_pipeline_id = - pipeline_id_base.checked_add(1).context("benchmark pipeline id overflow")?; - let table_copy_publication_name = format!("bench_pub_{table_copy_pipeline_id}"); - let table_streaming_publication_name = - format!("bench_streaming_pub_{table_streaming_pipeline_id}"); print_configuration(&[ ("Database", format!("{}@{}:{}", self.database, self.host, self.port)), ("Warehouses", self.warehouses.to_string()), ("TPC-C threads", tpcc_threads.to_string()), ("Streaming workload", "tpcc".to_owned()), + ("Measured samples", self.samples.to_string()), + ("Warmup samples", self.warmup_samples.to_string()), ("Pipeline id base", pipeline_id_base.to_string()), ("Destination", self.destination.as_arg().to_owned()), ("Memory backpressure", memory_backpressure_label(self.enable_memory_backpressure)), @@ -169,7 +172,6 @@ impl BenchmarkArgs { let selected_tables = self.validated_tpcc_tables()?; let table_ids = self.fetch_table_ids(&selected_tables)?; let expected_row_count = self.fetch_expected_row_count(&selected_tables)?; - self.create_publication(&table_copy_publication_name, &selected_tables)?; print_phase( "Table copy", &format!( @@ -179,11 +181,11 @@ impl BenchmarkArgs { ), ); - table_copy_report = Some(self.run_table_copy( - &table_copy_publication_name, + table_copy_report = Some(self.run_table_copy_samples( + &selected_tables, &table_ids, expected_row_count, - table_copy_pipeline_id, + pipeline_id_base, self.output_dir.join("table_copy.json"), )?); print_done("Table copy", "finished"); @@ -193,10 +195,9 @@ impl BenchmarkArgs { if !self.skip_table_streaming { print_phase("Table streaming", "preparing benchmark"); - table_streaming_report = Some(self.run_table_streaming( - &table_streaming_publication_name, + table_streaming_report = Some(self.run_table_streaming_samples( tpcc_threads, - table_streaming_pipeline_id, + pipeline_id_base, self.output_dir.join("table_streaming.json"), )?); print_done("Table streaming", "finished"); @@ -218,6 +219,10 @@ impl BenchmarkArgs { bail!("at least one benchmark must run"); } + if self.samples == 0 { + bail!("--samples must be greater than 0"); + } + if self.streaming_drain_quiet_ms == 0 { bail!("--streaming-drain-quiet-ms must be greater than 0"); } @@ -284,7 +289,7 @@ impl BenchmarkArgs { &format!("loading {} warehouses with {threads} threads", self.warehouses), ); - let status = Command::new("go-tpc") + let output = Command::new("go-tpc") .args(["tpcc", "--warehouses", &self.warehouses.to_string(), "prepare"]) .args(["-d", "postgres"]) .args(["-U", &self.username]) @@ -295,10 +300,20 @@ impl BenchmarkArgs { .args(["--conn-params", "sslmode=disable"]) .args(["-T", &threads.to_string()]) .arg("--no-check") - .status() + .output() .context("failed to run go-tpc")?; - if !status.success() { + if !output.status.success() { + let stdout = String::from_utf8_lossy(&output.stdout); + let stderr = String::from_utf8_lossy(&output.stderr); + if !stdout.trim().is_empty() { + eprintln!("go-tpc prepare stdout:"); + eprint!("{stdout}"); + } + if !stderr.trim().is_empty() { + eprintln!("go-tpc prepare stderr:"); + eprint!("{stderr}"); + } bail!("go-tpc prepare failed"); } @@ -405,6 +420,62 @@ impl BenchmarkArgs { .context("failed to create benchmark publication") } + fn drop_publication(&self, publication_name: &str) -> Result<()> { + let publication_name = quote_identifier(publication_name)?; + self.psql_status(&self.database, &format!("drop publication if exists {publication_name}")) + .context("failed to drop benchmark publication") + } + + fn run_table_copy_samples( + &self, + selected_tables: &[String], + table_ids: &str, + expected_row_count: u64, + pipeline_id_base: u64, + report_path: PathBuf, + ) -> Result { + let mut measured_reports = Vec::with_capacity(usize::from(self.samples)); + let total_samples = self.total_sample_count(); + for sample_idx in 0..total_samples { + let pipeline_id = sample_pipeline_id(pipeline_id_base, 0, sample_idx)?; + let publication_name = format!("bench_pub_{pipeline_id}"); + let sample_report_path = + self.sample_report_path("table_copy", sample_idx.saturating_add(1)); + let sample_label = self.sample_label(sample_idx); + + print_phase("Table copy", &format!("{sample_label} creating publication")); + self.create_publication(&publication_name, selected_tables)?; + print_phase("Table copy", &format!("{sample_label} running")); + let report = self.run_table_copy( + &publication_name, + table_ids, + expected_row_count, + pipeline_id, + sample_report_path.clone(), + )?; + self.drop_publication(&publication_name)?; + remove_sample_report(&sample_report_path)?; + print_sample_result("table_copy", &sample_label, &report); + + if self.is_warmup_sample(sample_idx) { + print_skip("Table copy", &format!("{sample_label} discarded as warmup")); + } else { + measured_reports.push(report); + } + } + + let aggregate = aggregate_reports( + "table_copy", + usize::from(self.samples), + usize::from(self.warmup_samples), + &measured_reports, + )?; + write_json_report(&aggregate, &report_path)?; + print_benchmark_report("table_copy", &aggregate); + print_done("Report", &format!("wrote {}", report_path.display())); + Ok(aggregate) + } + fn run_table_copy( &self, publication_name: &str, @@ -433,11 +504,59 @@ impl BenchmarkArgs { run_benchmark_binary("table_copy", self.destination, &args, &report_path) } + fn run_table_streaming_samples( + &self, + tpcc_threads: u16, + pipeline_id_base: u64, + report_path: PathBuf, + ) -> Result { + let selected_tables = self.validated_tpcc_tables()?; + let mut measured_reports = Vec::with_capacity(usize::from(self.samples)); + let total_samples = self.total_sample_count(); + for sample_idx in 0..total_samples { + let pipeline_id = sample_pipeline_id(pipeline_id_base, 1, sample_idx)?; + let publication_name = format!("bench_streaming_pub_{pipeline_id}"); + let sample_report_path = + self.sample_report_path("table_streaming", sample_idx.saturating_add(1)); + let sample_label = self.sample_label(sample_idx); + + print_phase("Table streaming", &format!("{sample_label} preparing")); + let report = self.run_table_streaming( + &publication_name, + tpcc_threads, + pipeline_id, + &selected_tables, + sample_report_path.clone(), + )?; + self.drop_publication(&publication_name)?; + remove_sample_report(&sample_report_path)?; + print_sample_result("table_streaming", &sample_label, &report); + + if self.is_warmup_sample(sample_idx) { + print_skip("Table streaming", &format!("{sample_label} discarded as warmup")); + } else { + measured_reports.push(report); + } + } + + let aggregate = aggregate_reports( + "table_streaming", + usize::from(self.samples), + usize::from(self.warmup_samples), + &measured_reports, + )?; + write_json_report(&aggregate, &report_path)?; + print_benchmark_report("table_streaming", &aggregate); + print_done("Report", &format!("wrote {}", report_path.display())); + Ok(aggregate) + } + fn run_table_streaming( &self, publication_name: &str, tpcc_threads: u16, pipeline_id: u64, + selected_tables: &[String], report_path: PathBuf, ) -> Result { let mut args = vec!["--log-target".to_owned(), "terminal".to_owned(), "run".to_owned()]; @@ -459,9 +578,8 @@ impl BenchmarkArgs { self.streaming_drain_quiet_ms.to_string(), ]); - let selected_tables = self.validated_tpcc_tables()?; - let table_ids = self.fetch_table_ids(&selected_tables)?; - self.create_publication(publication_name, &selected_tables)?; + let table_ids = self.fetch_table_ids(selected_tables)?; + self.create_publication(publication_name, selected_tables)?; let duration_seconds = self.streaming_duration_seconds.unwrap_or(DEFAULT_STREAMING_DURATION_SECONDS); print_phase( @@ -481,6 +599,27 @@ impl BenchmarkArgs { run_benchmark_binary("table_streaming", self.destination, &args, &report_path) } + fn total_sample_count(&self) -> usize { + usize::from(self.samples) + usize::from(self.warmup_samples) + } + + fn is_warmup_sample(&self, sample_idx: usize) -> bool { + sample_idx < usize::from(self.warmup_samples) + } + + fn sample_label(&self, sample_idx: usize) -> String { + let warmups = usize::from(self.warmup_samples); + if sample_idx < warmups { + format!("warmup {}/{}", sample_idx + 1, warmups) + } else { + format!("sample {}/{}", sample_idx + 1 - warmups, self.samples) + } + } + + fn sample_report_path(&self, benchmark: &str, sample_number: usize) -> PathBuf { + self.output_dir.join(format!(".{benchmark}_sample_{sample_number}.json")) + } + fn push_common_benchmark_args(&self, args: &mut Vec) { args.extend([ "--host".to_owned(), @@ -631,6 +770,206 @@ fn check_command(command: &str) -> Result<()> { bail!("required command '{command}' was not found") } +fn sample_pipeline_id(base: u64, benchmark_offset: u64, sample_idx: usize) -> Result { + let sample_offset = u64::try_from(sample_idx) + .context("benchmark sample index does not fit in pipeline id")? + .checked_mul(2) + .context("benchmark sample pipeline id overflow")?; + base.checked_add(sample_offset) + .and_then(|id| id.checked_add(benchmark_offset)) + .context("benchmark pipeline id overflow") +} + +fn remove_sample_report(path: &Path) -> Result<()> { + match fs::remove_file(path) { + Ok(()) => Ok(()), + Err(error) if error.kind() == io::ErrorKind::NotFound => Ok(()), + Err(error) => Err(error).with_context(|| format!("failed to remove {}", path.display())), + } +} + +fn write_json_report(report: &Value, path: &Path) -> Result<()> { + if let Some(parent) = path.parent() + && !parent.as_os_str().is_empty() + { + fs::create_dir_all(parent) + .with_context(|| format!("failed to create report directory {}", parent.display()))?; + } + + let json = serde_json::to_string_pretty(report)?; + fs::write(path, format!("{json}\n")) + .with_context(|| format!("failed to write benchmark report to {}", path.display())) +} + +fn aggregate_reports( + benchmark: &str, + sample_count: usize, + warmup_sample_count: usize, + reports: &[Value], +) -> Result { + if reports.is_empty() { + bail!("cannot aggregate benchmark reports without measured samples"); + } + + let primary_metric = primary_metric_key(benchmark); + let base_report = primary_metric + .and_then(|key| median_report_by_metric(reports, key)) + .unwrap_or(&reports[reports.len() / 2]); + let mut aggregate = aggregate_value(base_report, reports); + let Some(object) = aggregate.as_object_mut() else { + bail!("benchmark report is not a JSON object"); + }; + + object.insert("sample_count".to_owned(), Value::Number(Number::from(sample_count as u64))); + object.insert( + "warmup_sample_count".to_owned(), + Value::Number(Number::from(warmup_sample_count as u64)), + ); + object.insert("aggregation".to_owned(), Value::String("median".to_owned())); + object.insert("sample_summary".to_owned(), sample_summary(reports)); + Ok(aggregate) +} + +fn aggregate_value(base: &Value, reports: &[Value]) -> Value { + match base { + Value::Object(base_object) => { + let mut object = Map::new(); + for (key, value) in base_object { + let values = reports + .iter() + .filter_map(|report| report.as_object()?.get(key)) + .collect::>(); + if values.len() == reports.len() { + object.insert(key.clone(), aggregate_field(value, &values)); + } else { + object.insert(key.clone(), value.clone()); + } + } + Value::Object(object) + } + value => value.clone(), + } +} + +fn aggregate_field(base: &Value, values: &[&Value]) -> Value { + if let Some(numbers) = numeric_values(values) { + return json_number(median(numbers)); + } + + if matches!(base, Value::Object(_)) && values.iter().all(|value| value.is_object()) { + let owned_values = values.iter().map(|value| (*value).clone()).collect::>(); + return aggregate_value(base, &owned_values); + } + + base.clone() +} + +fn sample_summary(reports: &[Value]) -> Value { + let mut summary = Map::new(); + let Some(base_object) = reports.first().and_then(Value::as_object) else { + return Value::Object(summary); + }; + + for key in base_object.keys() { + let values = + reports.iter().filter_map(|report| report.as_object()?.get(key)).collect::>(); + let Some(numbers) = numeric_values(&values) else { + continue; + }; + if numbers.is_empty() { + continue; + } + + let min = numbers.iter().copied().fold(f64::INFINITY, f64::min); + let max = numbers.iter().copied().fold(f64::NEG_INFINITY, f64::max); + let median = median(numbers); + let spread_pct = if min > 0.0 { ((max - min) / min) * 100.0 } else { 0.0 }; + summary.insert( + key.clone(), + Value::Object(Map::from_iter([ + ("min".to_owned(), json_number(min)), + ("median".to_owned(), json_number(median)), + ("max".to_owned(), json_number(max)), + ("spread_pct".to_owned(), json_number(spread_pct)), + ])), + ); + } + + Value::Object(summary) +} + +fn numeric_values(values: &[&Value]) -> Option> { + values.iter().map(|value| value.as_f64()).collect() +} + +fn median(mut values: Vec) -> f64 { + values.sort_by(f64::total_cmp); + let mid = values.len() / 2; + if values.len().is_multiple_of(2) { (values[mid - 1] + values[mid]) / 2.0 } else { values[mid] } +} + +fn median_report_by_metric<'a>(reports: &'a [Value], key: &str) -> Option<&'a Value> { + let mut indexed_values = reports + .iter() + .enumerate() + .filter_map(|(idx, report)| Some((idx, report.get(key)?.as_f64()?))) + .collect::>(); + if indexed_values.len() != reports.len() { + return None; + } + + indexed_values.sort_by(|(_, left), (_, right)| left.total_cmp(right)); + Some(&reports[indexed_values[indexed_values.len() / 2].0]) +} + +fn json_number(value: f64) -> Value { + if value.is_finite() && value.fract() == 0.0 && value >= 0.0 && value <= u64::MAX as f64 { + Value::Number(Number::from(value as u64)) + } else { + Value::Number(Number::from_f64(value).unwrap_or_else(|| Number::from(0))) + } +} + +fn primary_metric_key(benchmark: &str) -> Option<&'static str> { + match benchmark { + "table_copy" => Some("rows_per_second"), + "table_streaming" => Some("end_to_end_with_shutdown_events_per_second"), + _ => None, + } +} + +fn print_sample_result(benchmark: &str, sample_label: &str, report: &Value) { + match benchmark { + "table_copy" => { + let throughput = report_number(report, "rows_per_second").map_or_else( + || "-".to_owned(), + |value| format!("{} rows/s", format_decimal(value, 2)), + ); + let total = report_number(report, "total_ms") + .map_or_else(|| "-".to_owned(), format_milliseconds); + print_done("Table copy", &format!("{sample_label} {throughput}, total {total}")); + } + "table_streaming" => { + let throughput = report_number(report, "end_to_end_with_shutdown_events_per_second") + .map_or_else( + || "-".to_owned(), + |value| format!("{} events/s", format_decimal(value, 2)), + ); + let produced = report_number(report, "produced_events").map_or_else( + || "-".to_owned(), + |value| format!("{} events", format_integer(value as u64)), + ); + let total = report_number(report, "total_ms") + .map_or_else(|| "-".to_owned(), format_milliseconds); + print_done( + "Table streaming", + &format!("{sample_label} {throughput}, {produced}, total {total}"), + ); + } + _ => {} + } +} + fn print_configuration(rows: &[(&str, String)]) { print_stderr_table("Benchmark configuration", "Setting", "Value", rows); eprintln!(); @@ -761,8 +1100,6 @@ fn run_benchmark_binary( let report = read_report(report_path) .with_context(|| format!("benchmark {binary_name} did not write a valid report"))?; - print_benchmark_report(binary_name, &report); - print_done("Report", &format!("wrote {}", report_path.display())); Ok(report) } @@ -987,3 +1324,63 @@ fn quote_identifier(identifier: &str) -> Result { fn quote_literal(value: &str) -> String { format!("'{}'", value.replace('\'', "''")) } + +#[cfg(test)] +mod tests { + use serde_json::json; + + use crate::commands::benchmark::aggregate_reports; + + #[test] + fn aggregate_reports_uses_medians_and_records_spread() { + let reports = vec![ + json!({ + "benchmark": "table_streaming", + "destination": "null", + "end_to_end_with_shutdown_events_per_second": 100.0, + "end_to_end_with_shutdown_estimated_mib_per_second": 10.0, + "total_ms": 1_000, + "destination_stats": { + "inserts": 10, + "updates": 20 + } + }), + json!({ + "benchmark": "table_streaming", + "destination": "null", + "end_to_end_with_shutdown_events_per_second": 80.0, + "end_to_end_with_shutdown_estimated_mib_per_second": 8.0, + "total_ms": 1_200, + "destination_stats": { + "inserts": 8, + "updates": 16 + } + }), + json!({ + "benchmark": "table_streaming", + "destination": "null", + "end_to_end_with_shutdown_events_per_second": 120.0, + "end_to_end_with_shutdown_estimated_mib_per_second": 12.0, + "total_ms": 900, + "destination_stats": { + "inserts": 12, + "updates": 24 + } + }), + ]; + + let aggregate = aggregate_reports("table_streaming", 3, 1, &reports).unwrap(); + + assert_eq!(aggregate["aggregation"], "median"); + assert_eq!(aggregate["sample_count"], 3); + assert_eq!(aggregate["warmup_sample_count"], 1); + assert_eq!(aggregate["end_to_end_with_shutdown_events_per_second"], 100); + assert_eq!(aggregate["end_to_end_with_shutdown_estimated_mib_per_second"], 10); + assert_eq!(aggregate["total_ms"], 1000); + assert_eq!(aggregate["destination_stats"]["inserts"], 10); + assert_eq!( + aggregate["sample_summary"]["end_to_end_with_shutdown_events_per_second"]["spread_pct"], + 50 + ); + } +} diff --git a/xtask/src/commands/benchmark_compare.rs b/xtask/src/commands/benchmark_compare.rs index 751de172a..291e2b30d 100644 --- a/xtask/src/commands/benchmark_compare.rs +++ b/xtask/src/commands/benchmark_compare.rs @@ -20,7 +20,7 @@ const GITHUB_ACCEPT: &str = "application/vnd.github+json"; const GITHUB_USER_AGENT: &str = "supabase-etl-benchmark-compare"; const TABLE_COPY_METRICS: &[Metric] = &[ - Metric { key: "copied_rows", label: "Rows copied", policy: RegressionPolicy::Exact }, + Metric { key: "copied_rows", label: "Rows copied", policy: RegressionPolicy::Informational }, Metric { key: "estimated_copied_mib", label: "Est. decoded MiB", @@ -78,7 +78,6 @@ const CONFIG_KEYS: &[&str] = &[ "destination", "workload", "table_count", - "expected_row_count", "duration_seconds", "tpcc_warehouses", "tpcc_threads", @@ -89,6 +88,9 @@ const CONFIG_KEYS: &[&str] = &[ "memory_backpressure_enabled", "max_table_sync_workers", "max_copy_connections_per_table", + "sample_count", + "warmup_sample_count", + "aggregation", ]; type Reports = BTreeMap; @@ -125,7 +127,6 @@ struct Metric { #[derive(Clone, Copy)] enum RegressionPolicy { - Exact, HigherIsBetter { max_drop_pct: f64 }, LowerIsBetter { max_increase_pct: f64 }, Informational, @@ -142,6 +143,7 @@ struct WorkflowRun { run_number: u64, html_url: String, conclusion: Option, + head_sha: Option, } #[derive(Deserialize)] @@ -159,6 +161,7 @@ struct Artifact { struct PreviousSource { reports: Reports, run: Option, + code_compare: Option, } struct Comparison { @@ -166,11 +169,18 @@ struct Comparison { failures: Vec, } +struct CodeCompare { + previous_sha: String, + current_sha: String, + url: String, +} + struct GitHubContext { token: String, repo: String, current_run_id: u64, ref_name: String, + current_sha: Option, } struct GitHubClient { @@ -191,9 +201,12 @@ impl BenchmarkCompareArgs { let output = self.output.clone().unwrap_or_else(|| self.current_dir.join("benchmark-comparison.md")); let comparison = match self.previous_source().await? { - Some(previous) => { - compare_reports(&previous.reports, ¤t_reports, previous.run.as_ref()) - } + Some(previous) => compare_reports( + &previous.reports, + ¤t_reports, + previous.run.as_ref(), + previous.code_compare.as_ref(), + ), None => Comparison { markdown: render_missing_previous(&self), failures: Vec::new() }, }; @@ -222,7 +235,7 @@ impl BenchmarkCompareArgs { let reports = load_reports(previous_dir).with_context(|| { format!("failed to load reports from {}", previous_dir.display()) })?; - return Ok(Some(PreviousSource { reports, run: None })); + return Ok(Some(PreviousSource { reports, run: None, code_compare: None })); } let Some(context) = self.github_context() else { @@ -234,12 +247,13 @@ impl BenchmarkCompareArgs { }; let client = GitHubClient::new(context.token.clone(), context.repo.clone())?; - let Some((reports, run)) = self.load_github_previous_reports(&client, &context).await? + let Some((reports, run, code_compare)) = + self.load_github_previous_reports(&client, &context).await? else { return Ok(None); }; - Ok(Some(PreviousSource { reports, run: Some(run) })) + Ok(Some(PreviousSource { reports, run: Some(run), code_compare })) } fn github_context(&self) -> Option { @@ -247,14 +261,15 @@ impl BenchmarkCompareArgs { let repo = env::var("GITHUB_REPOSITORY").ok()?; let current_run_id = env::var("GITHUB_RUN_ID").ok()?.parse().ok()?; let ref_name = self.ref_name.clone().or_else(|| env::var("GITHUB_REF_NAME").ok())?; - Some(GitHubContext { token, repo, current_run_id, ref_name }) + let current_sha = env::var("GITHUB_SHA").ok(); + Some(GitHubContext { token, repo, current_run_id, ref_name, current_sha }) } async fn load_github_previous_reports( &self, client: &GitHubClient, context: &GitHubContext, - ) -> Result> { + ) -> Result)>> { let runs = client.workflow_runs(&self.workflow, &context.ref_name).await?; for run in runs { if run.id == context.current_run_id || run.conclusion.as_deref() != Some("success") { @@ -278,7 +293,8 @@ impl BenchmarkCompareArgs { self.artifact_name, run.run_number ) })?; - return Ok(Some((reports, run))); + let code_compare = code_compare(&context.repo, &run, context.current_sha.as_deref()); + return Ok(Some((reports, run, code_compare))); } eprintln!( @@ -419,7 +435,7 @@ fn load_reports(dir: &Path) -> Result { fn render_missing_previous(args: &BenchmarkCompareArgs) -> String { format!( - "## Benchmark Comparison\n\nNo previous benchmark artifact was available. In GitHub \ + "## šŸ“ˆ Benchmark comparison\n\nā„¹ļø No previous benchmark artifact was available. In GitHub \ Actions, this compares against the most recent successful `{}` artifact from `{}` on the \ same ref. Locally, pass `--previous-dir`.", args.artifact_name, args.workflow @@ -430,14 +446,25 @@ fn compare_reports( previous_reports: &Reports, current_reports: &Reports, previous_run: Option<&WorkflowRun>, + code_compare: Option<&CodeCompare>, ) -> Comparison { - let mut lines = vec!["## Benchmark Comparison".to_owned(), String::new()]; + let mut lines = vec!["## šŸ“ˆ Benchmark comparison".to_owned(), String::new()]; let mut failures = Vec::new(); if let Some(previous_run) = previous_run { lines.push(format!( - "Compared with previous successful run [#{}]({}).", - previous_run.run_number, previous_run.html_url + "Compared with previous successful run [#{}]({}) at `{}`.", + previous_run.run_number, + previous_run.html_url, + previous_run.head_sha.as_deref().map_or("unknown commit", short_sha) )); + if let Some(code_compare) = code_compare { + lines.push(format!( + "Code diff: [`{}...{}`]({}).", + short_sha(&code_compare.previous_sha), + short_sha(&code_compare.current_sha), + code_compare.url + )); + } } else { lines.push("Compared with local previous benchmark results.".to_owned()); } @@ -456,17 +483,17 @@ fn compare_reports( for benchmark in common_benchmarks { let previous = previous_reports.get(&benchmark).expect("benchmark key exists"); let current = current_reports.get(&benchmark).expect("benchmark key exists"); - lines.push(format!("### {benchmark}")); + lines.push(format!("### {}", benchmark_title(&benchmark))); let changes = config_changes(previous, current); if !changes.is_empty() { lines.push(String::new()); - lines.push(format!("> Benchmark configuration differs: {}.", changes.join(", "))); + lines.push(format!("> āš ļø Benchmark configuration differs: {}.", changes.join(", "))); } lines.push(String::new()); - lines.push("| Metric | Previous | Current | Change |".to_owned()); - lines.push("| --- | ---: | ---: | ---: |".to_owned()); + lines.push("| Metric | Previous | Current | Change | Result |".to_owned()); + lines.push("| --- | ---: | ---: | ---: | --- |".to_owned()); for metric in metrics_for(&benchmark) { let Some(previous_value) = previous.get(metric.key) else { continue; @@ -476,11 +503,12 @@ fn compare_reports( }; lines.push(format!( - "| {} | {} | {} | {} |", + "| {} | {} | {} | {} | {} |", metric.label, format_value(previous_value), format_value(current_value), - format_delta(previous_value, current_value) + format_delta(previous_value, current_value), + format_change_result(metric.policy, previous_value, current_value) )); } @@ -492,17 +520,17 @@ fn compare_reports( lines.push(String::new()); if changes.is_empty() { if benchmark_failures.is_empty() { - lines.push("Regression gate: passed.".to_owned()); + lines.push("āœ… Regression gate: passed.".to_owned()); } else { - lines.push("Regression gate: failed.".to_owned()); + lines.push("🚨 Regression gate: failed.".to_owned()); for failure in &benchmark_failures { - lines.push(format!("- {failure}")); + lines.push(format!("- 🚨 {failure}")); } failures.extend(benchmark_failures); } } else { lines.push( - "Regression gate: skipped because benchmark configuration differs.".to_owned(), + "āš ļø Regression gate: skipped because benchmark configuration differs.".to_owned(), ); } lines.push(String::new()); @@ -519,10 +547,10 @@ fn compare_reports( .cloned() .collect::>(); if !previous_only.is_empty() { - lines.push(format!("Previous-only reports: {}.", previous_only.join(", "))); + lines.push(format!("ā„¹ļø Previous-only reports: {}.", previous_only.join(", "))); } if !current_only.is_empty() { - lines.push(format!("Current-only reports: {}.", current_only.join(", "))); + lines.push(format!("ā„¹ļø Current-only reports: {}.", current_only.join(", "))); } Comparison { markdown: lines.join("\n"), failures } @@ -536,6 +564,36 @@ fn metrics_for(benchmark: &str) -> &'static [Metric] { } } +fn benchmark_title(benchmark: &str) -> String { + match benchmark { + "table_copy" => "šŸ“¦ Table copy".to_owned(), + "table_streaming" => "šŸ” Table streaming".to_owned(), + benchmark => benchmark.to_owned(), + } +} + +fn code_compare( + repo: &str, + previous_run: &WorkflowRun, + current_sha: Option<&str>, +) -> Option { + let previous_sha = previous_run.head_sha.as_deref()?.trim(); + let current_sha = current_sha?.trim(); + if previous_sha.is_empty() || current_sha.is_empty() { + return None; + } + + Some(CodeCompare { + previous_sha: previous_sha.to_owned(), + current_sha: current_sha.to_owned(), + url: format!("https://github.com/{repo}/compare/{previous_sha}...{current_sha}"), + }) +} + +fn short_sha(sha: &str) -> &str { + sha.get(..7).unwrap_or(sha) +} + fn config_changes(previous: &Value, current: &Value) -> Vec { CONFIG_KEYS .iter() @@ -575,14 +633,6 @@ fn regression_failure( current: &Value, ) -> Option { match metric.policy { - RegressionPolicy::Exact => (previous != current).then(|| { - format!( - "{benchmark} {} changed from {} to {}", - metric.label, - format_value(previous), - format_value(current) - ) - }), RegressionPolicy::HigherIsBetter { max_drop_pct } => { let previous = numeric_value(previous)?; let current = numeric_value(current)?; @@ -642,6 +692,64 @@ fn format_delta(previous: &Value, current: &Value) -> String { } } +fn format_change_result( + policy: RegressionPolicy, + previous: &Value, + current: &Value, +) -> &'static str { + match policy { + RegressionPolicy::Informational => "ā„¹ļø info", + RegressionPolicy::HigherIsBetter { max_drop_pct } => { + let Some(previous) = numeric_value(previous) else { + return ""; + }; + let Some(current) = numeric_value(current) else { + return ""; + }; + if current > previous { + "āœ… better" + } else if current < previous { + if previous <= 0.0 { + "āš ļø worse" + } else { + let drop_pct = ((previous - current) / previous) * 100.0; + if drop_pct > max_drop_pct { + "🚨 regression" + } else { + "āš ļø within threshold" + } + } + } else { + "āž– same" + } + } + RegressionPolicy::LowerIsBetter { max_increase_pct } => { + let Some(previous) = numeric_value(previous) else { + return ""; + }; + let Some(current) = numeric_value(current) else { + return ""; + }; + if current < previous { + "āœ… better" + } else if current > previous { + if previous <= 0.0 { + "āš ļø worse" + } else { + let increase_pct = ((current - previous) / previous) * 100.0; + if increase_pct > max_increase_pct { + "🚨 regression" + } else { + "āš ļø within threshold" + } + } + } else { + "āž– same" + } + } + } +} + fn format_value(value: &Value) -> String { match value { Value::Null => "null".to_owned(), @@ -710,7 +818,7 @@ mod tests { use serde_json::json; use crate::commands::benchmark_compare::{ - Reports, compare_reports, config_changes, regression_failures, + CodeCompare, Reports, WorkflowRun, compare_reports, config_changes, regression_failures, }; #[test] @@ -737,7 +845,7 @@ mod tests { } #[test] - fn table_copy_regression_gate_flags_exact_and_threshold_failures() { + fn table_copy_regression_gate_flags_threshold_failures() { let previous = json!({ "copied_rows": 1_000, "rows_per_second": 1_000.0, @@ -755,10 +863,9 @@ mod tests { let failures = regression_failures("table_copy", &previous, ¤t); - assert_eq!(failures.len(), 3); - assert!(failures[0].contains("Rows copied changed")); - assert!(failures[1].contains("Rows/s dropped by 20.00%")); - assert!(failures[2].contains("Copy wait ms increased by 25.00%")); + assert_eq!(failures.len(), 2); + assert!(failures[0].contains("Rows/s dropped by 20.00%")); + assert!(failures[1].contains("Copy wait ms increased by 25.00%")); } #[test] @@ -783,6 +890,102 @@ mod tests { assert!(failures.is_empty()); } + #[test] + fn compare_reports_labels_changes_by_metric_direction() { + let mut previous = Reports::new(); + previous.insert( + "table_copy".to_owned(), + json!({ + "benchmark": "table_copy", + "destination": "null", + "table_count": 8, + "rows_per_second": 1_000.0, + "estimated_mib_per_second": 100.0, + "copy_wait_ms": 1_000.0, + "total_ms": 1_000.0 + }), + ); + + let mut current = Reports::new(); + current.insert( + "table_copy".to_owned(), + json!({ + "benchmark": "table_copy", + "destination": "null", + "table_count": 8, + "rows_per_second": 1_100.0, + "estimated_mib_per_second": 90.0, + "copy_wait_ms": 900.0, + "total_ms": 1_100.0 + }), + ); + + let comparison = compare_reports(&previous, ¤t, None, None); + + assert!( + comparison.markdown.contains("| Rows/s | 1000 | 1100 | +100 (+10.00%) | āœ… better |") + ); + assert!( + comparison.markdown.contains( + "| Est. decoded MiB/s | 100 | 90 | -10 (-10.00%) | āš ļø within threshold |" + ) + ); + assert!( + comparison + .markdown + .contains("| Copy wait ms | 1000 | 900 | -100 (-10.00%) | āœ… better |") + ); + assert!( + comparison + .markdown + .contains("| Total ms | 1000 | 1100 | +100 (+10.00%) | āš ļø within threshold |") + ); + } + + #[test] + fn compare_reports_labels_threshold_breaking_changes_as_regressions() { + let mut previous = Reports::new(); + previous.insert( + "table_streaming".to_owned(), + json!({ + "benchmark": "table_streaming", + "destination": "null", + "table_count": 8, + "duration_seconds": 60, + "end_to_end_with_shutdown_events_per_second": 1_000.0, + "end_to_end_with_shutdown_estimated_mib_per_second": 100.0, + "total_ms": 1_000.0 + }), + ); + + let mut current = Reports::new(); + current.insert( + "table_streaming".to_owned(), + json!({ + "benchmark": "table_streaming", + "destination": "null", + "table_count": 8, + "duration_seconds": 60, + "end_to_end_with_shutdown_events_per_second": 800.0, + "end_to_end_with_shutdown_estimated_mib_per_second": 80.0, + "total_ms": 1_250.0 + }), + ); + + let comparison = compare_reports(&previous, ¤t, None, None); + + assert!( + comparison + .markdown + .contains("| Events/s | 1000 | 800 | -200 (-20.00%) | 🚨 regression |") + ); + assert!( + comparison + .markdown + .contains("| Total ms | 1000 | 1250 | +250 (+25.00%) | 🚨 regression |") + ); + } + #[test] fn compare_reports_skips_regression_gate_when_config_differs() { let mut previous = Reports::new(); @@ -811,7 +1014,7 @@ mod tests { }), ); - let comparison = compare_reports(&previous, ¤t, None); + let comparison = compare_reports(&previous, ¤t, None, None); assert!(comparison.failures.is_empty()); assert!( @@ -820,4 +1023,54 @@ mod tests { .contains("Regression gate: skipped because benchmark configuration differs.") ); } + + #[test] + fn compare_reports_links_previous_run_and_commit_diff() { + let mut previous = Reports::new(); + previous.insert( + "table_copy".to_owned(), + json!({ + "benchmark": "table_copy", + "destination": "null", + "table_count": 8, + "rows_per_second": 1_000.0 + }), + ); + + let mut current = Reports::new(); + current.insert( + "table_copy".to_owned(), + json!({ + "benchmark": "table_copy", + "destination": "null", + "table_count": 8, + "rows_per_second": 1_100.0 + }), + ); + + let previous_run = WorkflowRun { + id: 1, + run_number: 42, + html_url: "https://github.com/supabase/etl/actions/runs/42".to_owned(), + conclusion: Some("success".to_owned()), + head_sha: Some("abc123456789".to_owned()), + }; + let code_compare = CodeCompare { + previous_sha: "abc123456789".to_owned(), + current_sha: "def567890123".to_owned(), + url: "https://github.com/supabase/etl/compare/abc123456789...def567890123".to_owned(), + }; + + let comparison = + compare_reports(&previous, ¤t, Some(&previous_run), Some(&code_compare)); + + assert!( + comparison.markdown.contains( + "Compared with previous successful run [#42](https://github.com/supabase/etl/actions/runs/42) at `abc1234`." + ) + ); + assert!(comparison.markdown.contains( + "Code diff: [`abc1234...def5678`](https://github.com/supabase/etl/compare/abc123456789...def567890123)." + )); + } }