157 changes: 146 additions & 11 deletions .github/workflows/benchmark.yml
@@ -54,7 +54,7 @@ on:
required: true
type: choice
options:
- null
- "null"
- bigquery
default: "null"
bq_project_id:
@@ -88,6 +88,8 @@ env:
POSTGRES_DB: bench
POSTGRES_PORT: 5430
POSTGRES_HOST: localhost
BENCHMARK_SAMPLES: 3
BENCHMARK_WARMUP_SAMPLES: 1

permissions:
actions: read
@@ -130,7 +132,7 @@ jobs:
sudo apt-get update
sudo apt-get install -y "${missing_packages[@]}"
fi
go install github.com/pingcap/go-tpc@v1.0.12
go install github.com/pingcap/go-tpc/cmd/go-tpc@v1.0.12
echo "$(go env GOPATH)/bin" >> "$GITHUB_PATH"

- name: Start Postgres
@@ -164,6 +166,11 @@ jobs:
mkdir -p target/bench-results

tpcc_threads="${TPCC_THREADS_INPUT:-derived}"
host_memory="$(awk '/MemTotal/ { printf "%.2f GiB", $2 / 1024 / 1024 }' /proc/meminfo || true)"
if [[ -z "${host_memory}" ]]; then
host_memory="unavailable"
fi

cgroup_memory_limit="unavailable"
if [[ -r /sys/fs/cgroup/memory.max ]]; then
cgroup_memory_limit="$(cat /sys/fs/cgroup/memory.max)"
@@ -172,22 +179,25 @@ jobs:
fi

{
echo "## Benchmark environment"
echo "## 🖥️ Benchmark environment"
echo
echo "| Setting | Value |"
echo "| --- | --- |"
echo "| Runner | ${BENCHMARK_RUNNER_INPUT} |"
echo "| Runner name | ${RUNNER_NAME} |"
echo "| Runner OS | ${RUNNER_OS} |"
echo "| Runner arch | ${RUNNER_ARCH} |"
echo "| CPUs | $(nproc) |"
echo "| Host memory | $(awk '/MemTotal/ { printf \"%.2f GiB\", $2 / 1024 / 1024 }' /proc/meminfo) |"
echo "| Host memory | ${host_memory} |"
echo "| Cgroup memory limit | ${cgroup_memory_limit} |"
echo "| Warehouses | ${WAREHOUSES_INPUT} |"
echo "| TPC-C threads | ${tpcc_threads} |"
echo "| Measured samples | ${BENCHMARK_SAMPLES} |"
echo "| Warmup samples | ${BENCHMARK_WARMUP_SAMPLES} |"
echo "| Memory budget ratio | ${MEMORY_BUDGET_RATIO_INPUT} |"
echo "| Memory backpressure | ${ENABLE_MEMORY_BACKPRESSURE_INPUT} |"
echo "| Destination | ${DESTINATION_INPUT} |"
} > target/bench-results/benchmark-environment.md

cat target/bench-results/benchmark-environment.md >> "$GITHUB_STEP_SUMMARY"
} >> "$GITHUB_STEP_SUMMARY"

- name: Run benchmarks
env:
@@ -224,6 +234,8 @@ jobs:
--batch-max-fill-ms "$BATCH_MAX_FILL_MS_INPUT"
--memory-budget-ratio "$MEMORY_BUDGET_RATIO_INPUT"
--destination "$DESTINATION_INPUT"
--samples "$BENCHMARK_SAMPLES"
--warmup-samples "$BENCHMARK_WARMUP_SAMPLES"
--output-dir target/bench-results
)

@@ -259,6 +271,130 @@ jobs:

cargo xtask "${args[@]}"

- name: Summarize benchmark results
run: |
set -euo pipefail

python3 <<'PY' >> "$GITHUB_STEP_SUMMARY"
import json
from pathlib import Path

results_dir = Path("target/bench-results")

def load(name):
path = results_dir / name
if not path.exists():
return None
return json.loads(path.read_text())

def number(value, digits=2):
if value is None:
return "-"
if isinstance(value, int) or float(value).is_integer():
return f"{int(value):,}"
return f"{float(value):,.{digits}f}"

def seconds(ms):
if ms is None:
return "-"
return f"{float(ms) / 1000:,.2f} s"

def pct(value):
if value is None:
return "-"
return f"{float(value):,.2f}%"

def samples(report):
sample_count = report.get("sample_count")
warmup_count = report.get("warmup_sample_count", 0)
if sample_count is None:
return "1"
if warmup_count:
return f"{sample_count} + {warmup_count} warmup"
return str(sample_count)

def spread(report, key):
return pct((report.get("sample_summary") or {}).get(key, {}).get("spread_pct"))

def table(headers, rows):
print("| " + " | ".join(headers) + " |")
print("| " + " | ".join("---" for _ in headers) + " |")
for row in rows:
print("| " + " | ".join(str(cell) for cell in row) + " |")

table_copy = load("table_copy.json")
table_streaming = load("table_streaming.json")

print()
print("## 📊 Benchmark results")
print()

summary = []
if table_copy:
summary.append([
"📦 Table copy",
f"{number(table_copy.get('rows_per_second'))} rows/s",
f"{number(table_copy.get('estimated_mib_per_second'))} MiB/s",
seconds(table_copy.get("total_ms")),
spread(table_copy, "rows_per_second"),
])
if table_streaming:
summary.append([
"🔁 Table streaming",
f"{number(table_streaming.get('end_to_end_with_shutdown_events_per_second'))} events/s",
f"{number(table_streaming.get('end_to_end_with_shutdown_estimated_mib_per_second'))} MiB/s",
seconds(table_streaming.get("total_ms")),
spread(table_streaming, "end_to_end_with_shutdown_events_per_second"),
])

if summary:
table(["Benchmark", "Median throughput", "Data rate", "Total", "Sample spread"], summary)
print()
else:
print("No benchmark JSON reports were written.")
raise SystemExit(0)

if table_copy:
print("### 📦 Table copy")
table(
["Metric", "Value"],
[
["Samples", samples(table_copy)],
["Rows copied", number(table_copy.get("copied_rows"))],
["Rows/s", number(table_copy.get("rows_per_second"))],
["Rows/s spread", spread(table_copy, "rows_per_second")],
["Decoded MiB", number(table_copy.get("estimated_copied_mib"))],
["Decoded MiB/s", number(table_copy.get("estimated_mib_per_second"))],
["Decoded MiB/s spread", spread(table_copy, "estimated_mib_per_second")],
["Copy wait", seconds(table_copy.get("copy_wait_ms"))],
["Total", seconds(table_copy.get("total_ms"))],
],
)
print()

if table_streaming:
print("### 🔁 Table streaming")
stats = table_streaming.get("destination_stats", {})
table(
["Metric", "Value"],
[
["Samples", samples(table_streaming)],
["Workload", table_streaming.get("workload", "-")],
["Produced events", number(table_streaming.get("produced_events"))],
["Observed CDC events", number(table_streaming.get("observed_cdc_events"))],
["Events/s", number(table_streaming.get("end_to_end_with_shutdown_events_per_second"))],
["Events/s spread", spread(table_streaming, "end_to_end_with_shutdown_events_per_second")],
["Decoded MiB/s", number(table_streaming.get("end_to_end_with_shutdown_estimated_mib_per_second"))],
["Decoded MiB/s spread", spread(table_streaming, "end_to_end_with_shutdown_estimated_mib_per_second")],
["Inserts", number(stats.get("inserts"))],
["Updates", number(stats.get("updates"))],
["Deletes", number(stats.get("deletes"))],
["Total", seconds(table_streaming.get("total_ms"))],
],
)
print()
PY

- name: Compare benchmark results
env:
GITHUB_TOKEN: ${{ github.token }}
@@ -274,10 +410,9 @@ jobs:
exit "$status"

- name: Upload benchmark results
uses: actions/upload-artifact@ea165f8d65b6e75b540449e92b4886f43607fa02 # v4.6.2
if: always()
uses: actions/upload-artifact@043fb46d1a93c77aae656e7c1c64a875d1fc6a0a # v7.0.1
with:
name: benchmark-results
path: |
target/bench-results/*.json
target/bench-results/*.md
path: target/bench-results/*.json
if-no-files-found: error
7 changes: 6 additions & 1 deletion AGENTS.md
@@ -45,7 +45,12 @@
- Never create commits, push branches, open pull requests, or perform other git write actions unless the user explicitly instructs you to do so.
- Keep the workspace on the stable toolchain from `rust-toolchain.toml` for build, lint, and test commands; use the pinned nightly formatter only through `./scripts/fmt` and `./scripts/fmt-check`.
- Treat `Cargo.toml` workspace lints, `rustfmt.toml`, and compiler diagnostics as the source of truth for enforceable style and correctness rules. Prefer adding or tightening static checks over adding prose rules here.
- Always run the clippy lint command listed in commands section at the end to check everything compile properly.
- Run Clippy, builds, and tests intentionally when they are relevant: for example
after changing Rust code, when compiler/lint diagnostics indicate a problem,
when workflow assumptions changed, or when the user asks for verification. Do
not run expensive checks reflexively for unrelated documentation, YAML-only, or
similarly low-risk edits; in those cases, run the smallest relevant validation
instead and report what actually ran.

## Rust Style
- This section is only for project-specific judgment that is not already covered by rustfmt, rustc, or Clippy.
34 changes: 22 additions & 12 deletions etl-benchmarks/README.md
@@ -39,7 +39,7 @@ Local runs need:
Install the pinned `go-tpc` version used by CI:

```bash
go install github.com/pingcap/go-tpc@v1.0.12
go install github.com/pingcap/go-tpc/cmd/go-tpc@v1.0.12
```

Make sure `$(go env GOPATH)/bin` is on `PATH`.
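
One way to confirm the install worked is to check that the binary resolves on `PATH`. The sketch below is illustrative and not part of this repository; `find_tool` is a hypothetical helper built on Python's standard `shutil.which`.

```python
import shutil


def find_tool(name):
    """Return the path of an executable found on PATH, or None if it is missing."""
    return shutil.which(name)


# Hypothetical check: report whether go-tpc is reachable from this shell.
path = find_tool("go-tpc")
if path is None:
    print("go-tpc not found; add $(go env GOPATH)/bin to PATH")
else:
    print(f"go-tpc found at {path}")
```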
@@ -111,6 +111,8 @@ cargo xtask benchmark \
--force-prepare \
--warehouses 10 \
--streaming-duration-seconds 300 \
--samples 3 \
--warmup-samples 1 \
--batch-max-fill-ms 1000 \
--max-table-sync-workers 8 \
--max-copy-connections-per-table 4 \
@@ -120,6 +122,13 @@ cargo xtask benchmark \
Use `--force-prepare` when changing the warehouse count for an existing local
benchmark database. Without it, `xtask` reuses the existing TPC-C tables.

`--samples` repeats each selected benchmark and writes the median result to
`table_copy.json` and `table_streaming.json`. `--warmup-samples` runs extra
samples before the measured samples and discards them. Use at least three
samples for CI or other noisy hosts; the JSON reports include
`sample_summary` with min, median, max, and spread for each numeric top-level
metric.
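
As a rough illustration of what the summary describes, the sketch below drops warmup samples and reduces the measured ones to min, median, max, and spread. It is not the `xtask` implementation. The helper names are hypothetical, the `min`, `median`, and `max` keys are assumed, and the spread formula (range divided by median, as a percentage) is an assumption. Only the `spread_pct` key name comes from the workflow's summary script.

```python
from statistics import median


def measured_only(all_samples, warmup_samples):
    """Drop the leading warmup samples and keep the measured ones."""
    return all_samples[warmup_samples:]


def summarize_samples(values):
    """Summarize one metric across measured samples (illustrative only)."""
    if not values:
        raise ValueError("at least one measured sample is required")
    low, high = min(values), max(values)
    mid = median(values)
    # Assumed definition: sample range relative to the median, in percent.
    spread_pct = (high - low) / mid * 100 if mid else 0.0
    return {"min": low, "median": mid, "max": high, "spread_pct": spread_pct}


# Four runs of a hypothetical rows/s metric; the first one is the warmup.
rows_per_second = [90_000.0, 100_000.0, 110_000.0, 105_000.0]
summary = summarize_samples(measured_only(rows_per_second, warmup_samples=1))
print(summary)
```

With one warmup and three measured samples, the reported value is the middle measured sample, so a single slow or fast outlier moves the spread but not the median.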

`--tpcc-threads` is optional and applies to both `go-tpc tpcc prepare` and
`go-tpc tpcc run`. When omitted, `xtask` derives it as:

@@ -250,20 +259,21 @@ For BigQuery workflow runs, set the repository secret
`bq_project_id`, and an existing `bq_dataset_id`.

The workflow starts only source Postgres, installs pinned `go-tpc`, runs
`cargo xtask benchmark`, compares the new reports against the most recent
successful `benchmark-results` artifact on the same ref, and uploads
`target/bench-results/*.json` plus `target/bench-results/*.md`. It also writes a
benchmark environment note with the selected runner, CPU count, host memory,
cgroup memory limit, memory budget ratio, and memory backpressure setting.
`cargo xtask benchmark` with three measured samples plus one warmup sample,
compares the median reports against the most recent successful
`benchmark-results` artifact on the same ref, and uploads
`target/bench-results/*.json`. The GitHub step summary contains the benchmark
environment, formatted median results with sample spread, and the comparison
diff against the previous successful run when one is available.

If no previous successful run exists, the comparison writes a "no previous
benchmark artifact" summary and passes. If a comparable previous run exists, the
comparison fails the workflow when exact copy count metrics change or when
throughput and timing metrics regress beyond their per-metric thresholds.
TPC-C streaming event counts are informational because the transaction workload
is duration-based and naturally varies between runs. If benchmark configuration
differs, the comparison still prints the diff table but skips the regression
gate for that benchmark.
comparison fails the workflow when median throughput and timing metrics regress
beyond their per-metric thresholds. Row counts and TPC-C streaming event counts
are informational because the transaction workload is duration-based and
naturally varies between runs. If benchmark configuration differs, the
comparison still prints the diff table but skips the regression gate for that
benchmark.
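
The gate described above can be sketched as follows. This is a minimal illustration, not the repository's comparison code. The function names, the threshold values, and the choice of gated metrics are hypothetical. Only the metric names `rows_per_second` and `total_ms` appear in the reports.

```python
# Hypothetical per-metric thresholds: (higher_is_better, allowed regression in %).
THRESHOLDS = {
    "rows_per_second": (True, 10.0),
    "total_ms": (False, 15.0),
}


def regression_pct(previous, current, higher_is_better):
    """Return how much worse `current` is than `previous`, in percent."""
    if previous == 0:
        return 0.0
    change = (current - previous) / previous * 100
    return -change if higher_is_better else change


def find_regressions(previous, current, config_matches=True, thresholds=THRESHOLDS):
    """List gated metrics whose median regressed beyond their threshold."""
    if not config_matches:
        # Differing benchmark configuration: skip the gate entirely.
        return []
    failures = []
    for metric, (higher_is_better, limit_pct) in thresholds.items():
        if metric not in previous or metric not in current:
            continue
        worse_by = regression_pct(previous[metric], current[metric], higher_is_better)
        if worse_by > limit_pct:
            failures.append((metric, round(worse_by, 2)))
    return failures


previous = {"rows_per_second": 100_000, "total_ms": 1_000, "copied_rows": 500}
current = {"rows_per_second": 85_000, "total_ms": 1_100, "copied_rows": 520}
print(find_regressions(previous, current))
```

Here throughput dropped by 15%, which exceeds the assumed 10% limit, while the 10% increase in total time stays inside its assumed 15% limit. The row count changes but is not gated, matching its informational role.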

The comparison is also available locally when you have two result directories:
