diff --git a/Cargo.lock b/Cargo.lock index 3d3df4a0..7003b006 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4095,6 +4095,7 @@ version = "0.1.0" dependencies = [ "anyhow", "argh", + "bytes", "bytesize", "futures", "humantime-serde", diff --git a/stresstest/Cargo.toml b/stresstest/Cargo.toml index ed091022..7e3107f0 100644 --- a/stresstest/Cargo.toml +++ b/stresstest/Cargo.toml @@ -16,7 +16,8 @@ bytesize = { version = "2.0.1", features = ["serde"] } futures = { workspace = true } humantime-serde = { workspace = true } indicatif = "0.18.0" -objectstore-client = { workspace = true } +bytes = { workspace = true } +objectstore-client = { workspace = true, features = ["multipart"] } rand = { workspace = true, features = ["small_rng"] } rand_distr = "0.5.1" serde = { workspace = true } diff --git a/stresstest/README.md b/stresstest/README.md new file mode 100644 index 00000000..3e71a0ba --- /dev/null +++ b/stresstest/README.md @@ -0,0 +1,96 @@ +# Objectstore Stresstest + +A load generator for stress-testing Objectstore servers. + +## Usage + +```bash +cargo run -p stresstest -- -c config/example.yaml +``` + +## Configuration + +The stresstest is configured via a YAML file. See `config/example.yaml` for a +full example. + +### Top-level options + +| Field | Required | Default | Description | +|------------|----------|---------|---------------------------------------------------------| +| `remote` | yes | — | URL of the Objectstore server (e.g. `http://localhost:18888`) | +| `duration` | yes | — | How long to run the test (e.g. 
`5s`, `10m`, `1h`) | +| `workloads`| yes | — | List of workload definitions (see below) | +| `cleanup` | no | `false` | Delete all created objects after the test finishes | + +### Workload options + +Each entry in `workloads` configures an independent, concurrent workload: + +| Field | Required | Default | Description | +|-----------------|----------|------------|--------------------------------------------------------------------| +| `name` | yes | — | Identifier for this workload (used as the Objectstore usecase) | +| `concurrency` | no | CPU count | Maximum number of concurrent operations | +| `organizations` | no | `1` | Number of organizations to distribute load across | +| `mode` | no | `weighted` | Scheduling mode: `weighted`, `throughput`, or `batch` | +| `file_sizes` | yes | — | File size distribution (see below) | +| `actions` | no | 97/2/1 | Operation distribution (see below) | +| `multipart` | no | — | Multipart upload configuration (see below) | + +### File sizes + +Controls the LogNormal distribution used to generate object sizes: + +| Field | Required | Description | +|-------|----------|------------------------------------------| +| `p50` | yes | Median file size (e.g. `50 KiB`) | +| `p99` | yes | 99th percentile file size (e.g. `200 KiB`) | +| `max` | no | Hard cap on generated file sizes | + +### Actions + +Controls the mix of operations. Interpretation depends on the mode: + +| Field | Default | Description | +|-----------|---------|------------------------------------| +| `writes` | `97` | Weight (or ops/s) for write operations | +| `reads` | `2` | Weight (or ops/s) for read operations | +| `deletes` | `1` | Weight (or ops/s) for delete operations | + +- In **weighted** mode: values are relative weights for random selection. +- In **throughput** mode: values are target operations per second. +- In **batch** mode: `writes` is the number of puts per batch request; `reads` + and `deletes` must be `0`. 
+
+### Multipart
+
+When set, write operations (`actions.writes`) for objects larger than 1 KiB use multipart uploads; smaller objects use a regular write.
+Not supported in `batch` mode; can be combined with the `weighted` and `throughput` modes.
+
+| Field         | Required | Description                                               |
+|---------------|----------|-----------------------------------------------------------|
+| `concurrency` | yes      | Max concurrent part uploads per object                    |
+| `part_size`   | yes      | Size of each part (min `5 MiB`); last part may be smaller |
+
+```yaml
+multipart:
+  concurrency: 4
+  part_size: 10 MiB
+```
+
+## Modes
+
+### Weighted (default)
+
+Runs at maximum speed with fixed concurrency. Each operation is randomly
+selected based on the `actions` weights. Good for saturating the server.
+
+### Throughput
+
+Maintains a fixed operations-per-second rate derived from `actions` values.
+Useful for simulating steady-state production traffic.
+
+### Batch
+
+Sends multiple writes in a single HTTP request using the batch ("many") API.
+The `actions.writes` value determines how many puts per batch request.
+Note that reads and deletes are currently not supported in this mode.
diff --git a/stresstest/config/example.yaml b/stresstest/config/example.yaml
index 403ba383..63143a22 100644
--- a/stresstest/config/example.yaml
+++ b/stresstest/config/example.yaml
@@ -40,3 +40,16 @@ workloads:
       writes: 20000
       reads: 0
       deletes: 0
+  - name: large-uploads
+    concurrency: 4
+    organizations: 100
+    file_sizes:
+      p50: 20 MiB
+      p99: 100 MiB
+    multipart:
+      concurrency: 4
+      part_size: 10 MiB
+    actions:
+      writes: 100
+      reads: 0
+      deletes: 0
diff --git a/stresstest/src/config.rs b/stresstest/src/config.rs
index 50c758cf..6c261003 100644
--- a/stresstest/src/config.rs
+++ b/stresstest/src/config.rs
@@ -1,8 +1,9 @@
 use std::time::Duration;
 
+use anyhow::bail;
 use bytesize::ByteSize;
 use serde::Deserialize;
-use stresstest::workload::WorkloadMode;
+use stresstest::workload::{self, WorkloadMode};
 
 #[derive(Debug, Deserialize)]
 pub struct Config {
@@ -29,6 +30,79 @@ pub struct Workload {
     pub file_sizes: FileSizes,
     #[serde(default)]
     pub actions: Actions,
+    /// Optional multipart upload settings; `None` when omitted from the config.
+    #[serde(default)]
+    pub multipart: Option<MultipartConfig>,
+}
+
+/// Multipart upload settings as deserialized from the YAML configuration.
+#[derive(Debug, Deserialize)]
+pub struct MultipartConfig {
+    /// Maximum number of concurrent part uploads per object.
+    pub concurrency: usize,
+    /// Size of each part; the last part of an object may be smaller.
+    pub part_size: ByteSize,
+}
+
+impl Workload {
+    /// Checks that the workload's settings are mutually consistent.
+    ///
+    /// # Errors
+    ///
+    /// Fails if batch mode is combined with reads, deletes, zero writes, or
+    /// multipart uploads, if `multipart.part_size` is below 5 MiB, or if
+    /// `multipart.concurrency` is zero.
+    pub fn validate(&self) -> anyhow::Result<()> {
+        if matches!(self.mode, WorkloadMode::Batch) {
+            if self.actions.reads > 0 || self.actions.deletes > 0 {
+                bail!(
+                    "workload '{}': batch mode only supports writes, but reads={} and deletes={} were configured",
+                    self.name,
+                    self.actions.reads,
+                    self.actions.deletes
+                );
+            }
+            if self.actions.writes == 0 {
+                bail!(
+                    "workload '{}': batch mode requires actions.writes > 0",
+                    self.name
+                );
+            }
+        }
+
+        if let Some(mp) = &self.multipart {
+            if matches!(self.mode, WorkloadMode::Batch) {
+                bail!(
+                    "workload '{}': multipart uploads are not supported in batch mode",
+                    self.name
+                );
+            }
+            if mp.part_size.0 < 5 * 1024 * 1024 {
+                bail!(
+                    "workload '{}': multipart part_size must be at least 5 MiB, got {}",
+                    self.name,
+                    mp.part_size
+                );
+            }
+            if mp.concurrency == 0 {
+                bail!(
+                    "workload '{}': multipart concurrency must be at least 1",
+                    self.name
+                );
+            }
+        }
+
+        Ok(())
+    }
+
+    /// Converts the deserialized multipart settings into the library's
+    /// [`workload::MultipartConfig`], or `None` if multipart is not configured.
+    pub fn multipart_config(&self) -> Option<workload::MultipartConfig> {
+        self.multipart.as_ref().map(|mp| workload::MultipartConfig {
+            concurrency: mp.concurrency,
+            part_size: mp.part_size.0,
+        })
+    }
 }
 
 fn default_concurrency() -> usize {
diff --git a/stresstest/src/http.rs b/stresstest/src/http.rs
index fc3741ca..65854761 100644
--- a/stresstest/src/http.rs
+++ b/stresstest/src/http.rs
@@ -1,12 +1,13 @@
 //! Contains a remote implementation using HTTP to interact with objectstore.
 
 use anyhow::Context;
-use futures::StreamExt;
+use bytes::Bytes;
+use futures::{StreamExt, TryStreamExt};
 use objectstore_client::{Client, GetResponse, Session, Usecase};
 use tokio::io::AsyncReadExt;
 use tokio_util::io::{ReaderStream, StreamReader};
 
-use crate::workload::Payload;
+use crate::workload::{MultipartConfig, Payload};
 
 /// A remote implementation using HTTP to interact with objectstore.
 #[derive(Debug)]
@@ -43,6 +44,85 @@ impl HttpRemote {
             .context("error writing payload")
     }
 
+    /// Uploads `payload` as a multipart upload and returns the resulting object key.
+    ///
+    /// The payload is read in parts of up to `config.part_size` bytes, and at most
+    /// `config.concurrency` parts are uploaded at the same time.
+    ///
+    /// # Errors
+    ///
+    /// Fails if `config.part_size` is zero, if the part count does not fit into a `u32`,
+    /// if reading the payload fails, or if any request of the multipart upload fails.
+    pub(crate) async fn write_multipart(
+        &self,
+        usecase: &Usecase,
+        organization_id: u64,
+        payload: Payload,
+        config: &MultipartConfig,
+    ) -> anyhow::Result<String> {
+        let part_size = config.part_size;
+        // `div_ceil` panics on a zero divisor; report it as a regular error instead.
+        anyhow::ensure!(part_size > 0, "multipart part_size must be greater than zero");
+
+        // Part numbers are `u32`; fail instead of silently truncating the part count.
+        let num_parts = u32::try_from(payload.len.div_ceil(part_size))
+            .context("object needs more parts than a multipart upload can address")?;
+
+        let session = self.session(usecase, organization_id);
+        let upload = session
+            .initiate_multipart_upload()
+            .compression(None)
+            .send()
+            .await
+            .context("error initiating multipart upload")?;
+
+        // Lazily read chunks from the payload so that at most `concurrency`
+        // part buffers are in memory at any time (bounded by try_buffer_unordered).
+        // A failed read is yielded as an error rather than ending the stream, so a
+        // short payload can never be completed as an upload with missing parts.
+        let chunk_stream = futures::stream::try_unfold(
+            (payload, 1u32),
+            move |(mut payload, part_number)| async move {
+                if part_number > num_parts {
+                    return Ok::<_, anyhow::Error>(None);
+                }
+                // NOTE(review): relies on `payload.len` being the number of bytes
+                // still unread, so that the last part is sized correctly - confirm.
+                let chunk_len = part_size.min(payload.len) as usize;
+                let mut buf = vec![0u8; chunk_len];
+                payload
+                    .read_exact(&mut buf)
+                    .await
+                    .context("error reading payload chunk")?;
+                Ok(Some(((part_number, Bytes::from(buf)), (payload, part_number + 1))))
+            },
+        );
+
+        let mut parts: Vec<_> = chunk_stream
+            .map_ok(|(part_number, chunk)| {
+                let upload = &upload;
+                async move {
+                    upload
+                        .put(chunk, part_number, None)
+                        .await
+                        .context("error uploading parts")
+                }
+            })
+            // A limit of zero would never poll the chunk stream and stall forever.
+            .try_buffer_unordered(config.concurrency.max(1))
+            .try_collect()
+            .await?;
+
+        // Parts complete in arbitrary order; restore part-number order first.
+        parts.sort_by_key(|p| p.part_number);
+
+        upload
+            .complete(parts)
+            .await
+            .map(|key| key.to_string())
+            .context("error completing multipart upload")
+    }
+
     pub(crate) async fn read(
         &self,
         usecase: &Usecase,
diff --git a/stresstest/src/main.rs b/stresstest/src/main.rs
index 483c2519..b02fde01 100644
--- a/stresstest/src/main.rs
+++ b/stresstest/src/main.rs
@@ -17,12 +17,11 @@
 
 use std::path::PathBuf;
 
-use anyhow::{Context, bail};
+use anyhow::Context;
 use argh::FromArgs;
 use stresstest::Workload;
 use stresstest::http::HttpRemote;
 use stresstest::stresstest::Stresstest;
-use stresstest::workload::WorkloadMode;
 
 use crate::config::Config;
 
@@ -53,22 +52,8 @@ async fn main() -> anyhow::Result<()> {
         .cleanup(config.cleanup);
 
     for w in config.workloads {
-        if matches!(w.mode, WorkloadMode::Batch) {
-            if w.actions.reads > 0 || w.actions.deletes > 0 {
-                bail!(
-                    "workload '{}': batch mode only supports writes, but reads={} and deletes={} were configured",
-                    w.name,
-                    w.actions.reads,
-                    w.actions.deletes
-                );
-            }
-            if w.actions.writes == 0 {
-                bail!(
-                    "workload '{}': batch mode requires actions.writes > 0",
-                    w.name
-                );
-            }
-        }
+        w.validate()?;
+        let multipart = w.multipart_config();
 
         let workload = Workload::builder(w.name)
             .concurrency(w.concurrency)
@@ -77,6 +62,7 @@ async fn main() -> anyhow::Result<()> {
             .size_distribution(w.file_sizes.p50.0, w.file_sizes.p99.0)
             .max_size(w.file_sizes.max.map(|b| b.0))
             .action_weights(w.actions.writes, w.actions.reads, w.actions.deletes)
+            .multipart(multipart)
             .build();
 
         stresstest = stresstest.workload(workload);
diff --git a/stresstest/src/stresstest.rs b/stresstest/src/stresstest.rs
index 088cd0fe..76661d10 100644
--- a/stresstest/src/stresstest.rs
+++ b/stresstest/src/stresstest.rs
@@ -251,6 +251,8 @@ async fn run_workload(
         WorkloadMode::Throughput => 100,
     };
 
+    let multipart_config = workload.multipart.clone().map(Arc::new);
+
     let semaphore = Arc::new(Semaphore::new(concurrency));
     let start = Instant::now();
     let deadline = tokio::time::Instant::now() + duration;
@@ -281,6 +283,7 @@ async fn run_workload(
 
             };
 
+            let multipart_config = multipart_config.clone();
             let task = async move {
                 let start = Instant::now();
                 match action {
@@ -293,7 +296,18 @@ async fn run_workload(
                         usecase = usecase.with_expiration_policy(ExpirationPolicy::TimeToLive(ttl));
                     }
 
-                    match remote.write(&usecase, organization_id, payload).await {
+                    // Only objects above 1 KiB go through the multipart API; smaller ones
+                    // (and workloads without multipart settings) use a regular write.
+                    let result = match multipart_config.as_deref() {
+                        Some(config) if file_size > 1024 => {
+                            remote
+                                .write_multipart(&usecase, organization_id, payload, config)
+                                .await
+                        }
+                        _ => remote.write(&usecase, organization_id, payload).await,
+                    };
+
+                    match result {
                         Ok(object_key) => {
                             let external_id = (usecase, organization_id, object_key);
                             workload.lock().unwrap().push_file(internal_id, external_id);
diff --git a/stresstest/src/workload.rs b/stresstest/src/workload.rs
index 581ff3ed..4fa515ef 100644
--- a/stresstest/src/workload.rs
+++ b/stresstest/src/workload.rs
@@ -13,8 +13,17 @@ use rand_distr::{Distribution, LogNormal, Zipf};
 use serde::Deserialize;
 use tokio::io::{AsyncRead, ReadBuf};
 
+/// Configuration for multipart uploads within a workload.
+#[derive(Debug, Clone)]
+pub struct MultipartConfig {
+    /// Maximum number of concurrent part uploads.
+    pub concurrency: usize,
+    /// Size of each part in bytes (except the last part, which may be smaller).
+    pub part_size: u64,
+}
+
 /// Defines how the workload schedules its operations.
-#[derive(Debug, Default, Deserialize)]
+#[derive(Debug, Default, Clone, Copy, Deserialize)]
 #[serde(rename_all = "snake_case")]
 pub enum WorkloadMode {
     /// The workload runs with fixed concurrency as fast as possible.
@@ -51,6 +60,8 @@ pub struct WorkloadBuilder {
     write_weight: usize,
     read_weight: usize,
     delete_weight: usize,
+
+    multipart: Option<MultipartConfig>,
 }
 
 impl WorkloadBuilder {
@@ -93,6 +104,12 @@ impl WorkloadBuilder {
         self
     }
 
+    /// Sets the multipart upload configuration.
+    pub fn multipart(mut self, config: Option<MultipartConfig>) -> Self {
+        self.multipart = config;
+        self
+    }
+
     /// Creates the workload instance.
     pub fn build(self) -> Workload {
         let rng = SmallRng::seed_from_u64(self.seed);
@@ -112,6 +129,7 @@ impl WorkloadBuilder {
             concurrency: self.concurrency,
             organizations: self.organizations,
             mode: self.mode,
+            multipart: self.multipart,
 
             rng,
             size_distribution,
@@ -144,6 +162,8 @@ pub struct Workload {
     pub(crate) organizations: u64,
-    /// The target throughput for the workload, in bytes per second. Overrides concurrency.
+    /// How the workload schedules its operations.
     pub(crate) mode: WorkloadMode,
+    /// Optional multipart upload configuration.
+    pub(crate) multipart: Option<MultipartConfig>,
 
     /// The RNG driving all our distributions.
     rng: SmallRng,
@@ -178,6 +198,8 @@ impl Workload {
             write_weight: 33,
             read_weight: 33,
             delete_weight: 33,
+
+            multipart: None,
         }
     }
 