1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

3 changes: 2 additions & 1 deletion stresstest/Cargo.toml
@@ -16,7 +16,8 @@ bytesize = { version = "2.0.1", features = ["serde"] }
futures = { workspace = true }
humantime-serde = { workspace = true }
indicatif = "0.18.0"
objectstore-client = { workspace = true }
bytes = { workspace = true }
objectstore-client = { workspace = true, features = ["multipart"] }
rand = { workspace = true, features = ["small_rng"] }
rand_distr = "0.5.1"
serde = { workspace = true }
96 changes: 96 additions & 0 deletions stresstest/README.md
@@ -0,0 +1,96 @@
# Objectstore Stresstest

A load generator for stress-testing Objectstore servers.

## Usage

```bash
cargo run -p stresstest -- -c config/example.yaml
```

## Configuration

The stresstest is configured via a YAML file. See `config/example.yaml` for a
full example.

### Top-level options

| Field | Required | Default | Description |
|------------|----------|---------|---------------------------------------------------------|
| `remote` | yes | — | URL of the Objectstore server (e.g. `http://localhost:18888`) |
| `duration` | yes | — | How long to run the test (e.g. `5s`, `10m`, `1h`) |
| `workloads`| yes | — | List of workload definitions (see below) |
| `cleanup` | no | `false` | Delete all created objects after the test finishes |

### Workload options

Each entry in `workloads` configures an independent, concurrent workload:

| Field | Required | Default | Description |
|-----------------|----------|------------|--------------------------------------------------------------------|
| `name` | yes | — | Identifier for this workload (used as the Objectstore usecase) |
| `concurrency` | no | CPU count | Maximum number of concurrent operations |
| `organizations` | no | `1` | Number of organizations to distribute load across |
| `mode` | no | `weighted` | Scheduling mode: `weighted`, `throughput`, or `batch` |
| `file_sizes` | yes | — | File size distribution (see below) |
| `actions` | no | 97/2/1 | Operation distribution (see below) |
| `multipart` | no | — | Multipart upload configuration (see below) |

### File sizes

Sets the parameters of the log-normal distribution used to generate object sizes:

| Field | Required | Description |
|-------|----------|------------------------------------------|
| `p50` | yes | Median file size (e.g. `50 KiB`) |
| `p99` | yes | 99th percentile file size (e.g. `200 KiB`) |
| `max` | no | Hard cap on generated file sizes |
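
One common way to fit a log-normal distribution to a median and a 99th percentile is sketched below. This is an assumption about how `p50` and `p99` could map to distribution parameters, not a description of what the stresstest does internally; `lognormal_params` and `Z_99` are hypothetical names.

```rust
/// Standard normal quantile for the 99th percentile (approximate constant).
const Z_99: f64 = 2.326_347_874;

/// Derives log-normal parameters (mu, sigma) from a median and a 99th
/// percentile, both in bytes. Hypothetical helper, for illustration only.
fn lognormal_params(p50: f64, p99: f64) -> (f64, f64) {
    // The median of a log-normal distribution is exp(mu).
    let mu = p50.ln();
    // The 99th percentile is exp(mu + Z_99 * sigma); solve for sigma.
    let sigma = (p99.ln() - mu) / Z_99;
    (mu, sigma)
}

fn main() {
    let kib = 1024.0;
    let (mu, sigma) = lognormal_params(50.0 * kib, 200.0 * kib);
    // Reconstruct the 99th percentile from the fitted parameters as a check.
    let p99 = (mu + Z_99 * sigma).exp();
    println!("mu = {mu:.4}, sigma = {sigma:.4}, p99 = {:.1} KiB", p99 / kib);
}
```

Under this fit, a wider gap between `p50` and `p99` gives a larger `sigma` and a heavier tail, which is where an optional `max` cap becomes useful.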

### Actions

Controls the mix of operations. Interpretation depends on the mode:

| Field | Default | Description |
|-----------|---------|------------------------------------|
| `writes` | `97` | Weight (or ops/s) for write operations |
| `reads` | `2` | Weight (or ops/s) for read operations |
| `deletes` | `1` | Weight (or ops/s) for delete operations |

- In **weighted** mode: values are relative weights for random selection.
- In **throughput** mode: values are target operations per second.
- In **batch** mode: `writes` is the number of puts per batch request; `reads`
and `deletes` must be `0`.
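
As a rough illustration of the weighted interpretation, the sketch below maps a random roll onto the three weights. The `Action` enum and `pick_action` function are hypothetical and only show the idea of weight-proportional selection; the actual selection code in the stresstest may differ.

```rust
#[derive(Debug, PartialEq, Clone, Copy)]
enum Action {
    Write,
    Read,
    Delete,
}

/// Maps a roll in `0..(writes + reads + deletes)` to an action, so that each
/// action is chosen in proportion to its weight. Returns `None` if the roll
/// is out of range. Hypothetical helper, for illustration only.
fn pick_action(writes: u64, reads: u64, deletes: u64, roll: u64) -> Option<Action> {
    if roll < writes {
        Some(Action::Write)
    } else if roll < writes + reads {
        Some(Action::Read)
    } else if roll < writes + reads + deletes {
        Some(Action::Delete)
    } else {
        None
    }
}

fn main() {
    // With the default 97/2/1 weights, rolls 0..=96 select a write,
    // 97..=98 a read, and 99 a delete.
    for roll in [0, 96, 97, 98, 99] {
        println!("{roll} -> {:?}", pick_action(97, 2, 1, roll));
    }
}
```

In practice the roll would come from a random number generator drawing uniformly from `0..total`.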

### Multipart

Enables multipart uploads for write operations (`actions.writes`) on objects
larger than 1 KiB; smaller objects are written with a regular single request.
Multipart works in `weighted` and `throughput` modes but is rejected in `batch` mode.

| Field | Required | Description |
|---------------|----------|----------------------------------------------------------|
| `concurrency` | yes | Max concurrent part uploads per object |
| `part_size` | yes | Size of each part (min `5 MiB`); last part may be smaller |

```yaml
multipart:
  concurrency: 4
  part_size: 10 MiB
```
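
To make the part arithmetic concrete, the sketch below splits an object into parts using a ceiling division on `part_size` and the 1 KiB threshold described above. The names `plan_parts`, `MULTIPART_THRESHOLD`, and `MIB` are hypothetical and exist only for this illustration.

```rust
const MIB: u64 = 1024 * 1024;
/// Objects of this size or smaller are written in a single request (1 KiB).
const MULTIPART_THRESHOLD: u64 = 1024;

/// Returns the size in bytes of each part for an object of `total_len`
/// bytes, or `None` if the object would not be uploaded as multipart.
/// Hypothetical helper, for illustration only.
fn plan_parts(total_len: u64, part_size: u64) -> Option<Vec<u64>> {
    if total_len <= MULTIPART_THRESHOLD || part_size == 0 {
        return None;
    }
    // Ceiling division: the last part holds whatever remains.
    let num_parts = total_len.div_ceil(part_size);
    let parts: Vec<u64> = (0..num_parts)
        .map(|i| part_size.min(total_len - i * part_size))
        .collect();
    Some(parts)
}

fn main() {
    // A 25 MiB object with 10 MiB parts: two full parts and a 5 MiB tail.
    println!("{:?}", plan_parts(25 * MIB, 10 * MIB));
    // Objects of 1 KiB or less are not split.
    println!("{:?}", plan_parts(512, 10 * MIB));
}
```

Note that an object only slightly larger than 1 KiB still becomes a multipart upload with a single, small part, since the minimum `part_size` constrains the configured value rather than the size of the last part.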

## Modes

### Weighted (default)

Issues operations with no rate limit, keeping up to `concurrency` operations
in flight. Each operation is randomly selected based on the `actions` weights.
Good for saturating the server.

### Throughput

Maintains a fixed operations-per-second rate derived from `actions` values.
Useful for simulating steady-state production traffic.
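
As a rough sketch of what a fixed rate implies, the example below converts per-second targets into an interval between operations. It assumes the overall rate is the sum of the three `actions` values, which is an assumption rather than something the source states; `op_interval` is a hypothetical name.

```rust
use std::time::Duration;

/// Returns the pause between consecutive operations needed to hit the
/// combined target rate, or `None` if no operations are configured.
/// Assumes the overall rate is the sum of the per-action rates.
/// Hypothetical helper, for illustration only.
fn op_interval(writes: u64, reads: u64, deletes: u64) -> Option<Duration> {
    let total = writes + reads + deletes;
    if total == 0 {
        return None;
    }
    // Integer division in nanoseconds keeps the result exact for round rates.
    Some(Duration::from_nanos(1_000_000_000 / total))
}

fn main() {
    // 100 writes per second and nothing else: one operation every 10 ms.
    println!("{:?}", op_interval(100, 0, 0));
    // 97 + 2 + 1 = 100 operations per second in total.
    println!("{:?}", op_interval(97, 2, 1));
}
```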

### Batch

Sends multiple writes in a single HTTP request using the batch ("many") API.
The `actions.writes` value sets the number of puts per batch request.
Reads and deletes are not currently supported in this mode, so `actions.reads`
and `actions.deletes` must be `0`.
13 changes: 13 additions & 0 deletions stresstest/config/example.yaml
@@ -40,3 +40,16 @@ workloads:
      writes: 20000
      reads: 0
      deletes: 0
  - name: large-uploads
    concurrency: 4
    organizations: 100
    file_sizes:
      p50: 20 MiB
      p99: 100 MiB
    multipart:
      concurrency: 4
      part_size: 10 MiB
    actions:
      writes: 100
      reads: 0
      deletes: 0
63 changes: 62 additions & 1 deletion stresstest/src/config.rs
@@ -1,8 +1,9 @@
use std::time::Duration;

use anyhow::bail;
use bytesize::ByteSize;
use serde::Deserialize;
use stresstest::workload::WorkloadMode;
use stresstest::workload::{self, WorkloadMode};

#[derive(Debug, Deserialize)]
pub struct Config {
@@ -29,6 +30,66 @@ pub struct Workload {
    pub file_sizes: FileSizes,
    #[serde(default)]
    pub actions: Actions,
    #[serde(default)]
    pub multipart: Option<MultipartConfig>,
}

#[derive(Debug, Deserialize)]
pub struct MultipartConfig {
    pub concurrency: usize,
    pub part_size: ByteSize,
}

impl Workload {
    pub fn validate(&self) -> anyhow::Result<()> {
        if matches!(self.mode, WorkloadMode::Batch) {
            if self.actions.reads > 0 || self.actions.deletes > 0 {
                bail!(
                    "workload '{}': batch mode only supports writes, but reads={} and deletes={} were configured",
                    self.name,
                    self.actions.reads,
                    self.actions.deletes
                );
            }
            if self.actions.writes == 0 {
                bail!(
                    "workload '{}': batch mode requires actions.writes > 0",
                    self.name
                );
            }
        }

        if let Some(mp) = &self.multipart {
            if matches!(self.mode, WorkloadMode::Batch) {
                bail!(
                    "workload '{}': multipart uploads are not supported in batch mode",
                    self.name
                );
            }
            if mp.part_size.0 < 5 * 1024 * 1024 {
                bail!(
                    "workload '{}': multipart part_size must be at least 5 MiB, got {}",
                    self.name,
                    mp.part_size
                );
            }
            if mp.concurrency == 0 {
                bail!(
                    "workload '{}': multipart concurrency must be at least 1",
                    self.name
                );
            }
        }

        Ok(())
    }

    pub fn multipart_config(&self) -> Option<workload::MultipartConfig> {
        self.multipart.as_ref().map(|mp| workload::MultipartConfig {
            concurrency: mp.concurrency,
            part_size: mp.part_size.0,
        })
    }
}

fn default_concurrency() -> usize {
59 changes: 57 additions & 2 deletions stresstest/src/http.rs
@@ -1,12 +1,13 @@
//! Contains a remote implementation using HTTP to interact with objectstore.

use anyhow::Context;
use futures::StreamExt;
use bytes::Bytes;
use futures::{StreamExt, TryStreamExt};
use objectstore_client::{Client, GetResponse, Session, Usecase};
use tokio::io::AsyncReadExt;
use tokio_util::io::{ReaderStream, StreamReader};

use crate::workload::Payload;
use crate::workload::{MultipartConfig, Payload};

/// A remote implementation using HTTP to interact with objectstore.
#[derive(Debug)]
@@ -43,6 +44,60 @@ impl HttpRemote {
            .context("error writing payload")
    }

    pub(crate) async fn write_multipart(
        &self,
        usecase: &Usecase,
        organization_id: u64,
        payload: Payload,
        config: &MultipartConfig,
    ) -> anyhow::Result<String> {
        let session = self.session(usecase, organization_id);
        let total_len = payload.len;
        let part_size = config.part_size;

        let upload = session
            .initiate_multipart_upload()
            .compression(None)
            .send()
            .await
            .context("error initiating multipart upload")?;

        let num_parts = total_len.div_ceil(part_size) as u32;

        // Lazily read chunks from the payload so that at most `concurrency`
        // part buffers are in memory at any time (bounded by buffer_unordered).
        let chunk_stream = futures::stream::unfold(
            (payload, 1u32),
            move |(mut payload, part_number)| async move {
                if part_number > num_parts {
                    return None;
                }
                let chunk_len = part_size.min(payload.len) as usize;
                let mut buf = vec![0u8; chunk_len];
                payload.read_exact(&mut buf).await.ok()?;
                Some(((part_number, Bytes::from(buf)), (payload, part_number + 1)))
            },
        );

        let mut parts: Vec<_> = chunk_stream
            .map(|(part_number, chunk)| {
                let upload = &upload;
                async move { upload.put(chunk, part_number, None).await }
            })
            .buffer_unordered(config.concurrency)
            .try_collect()
            .await
            .context("error uploading parts")?;

        parts.sort_by_key(|p| p.part_number);

        upload
            .complete(parts)
            .await
            .map(|key| key.to_string())
            .context("error completing multipart upload")
    }

pub(crate) async fn read(
&self,
usecase: &Usecase,
22 changes: 4 additions & 18 deletions stresstest/src/main.rs
@@ -17,12 +17,11 @@

use std::path::PathBuf;

use anyhow::{Context, bail};
use anyhow::Context;
use argh::FromArgs;
use stresstest::Workload;
use stresstest::http::HttpRemote;
use stresstest::stresstest::Stresstest;
use stresstest::workload::WorkloadMode;

use crate::config::Config;

@@ -53,22 +52,8 @@ async fn main() -> anyhow::Result<()> {
.cleanup(config.cleanup);

for w in config.workloads {
if matches!(w.mode, WorkloadMode::Batch) {
if w.actions.reads > 0 || w.actions.deletes > 0 {
bail!(
"workload '{}': batch mode only supports writes, but reads={} and deletes={} were configured",
w.name,
w.actions.reads,
w.actions.deletes
);
}
if w.actions.writes == 0 {
bail!(
"workload '{}': batch mode requires actions.writes > 0",
w.name
);
}
}
w.validate()?;
let multipart = w.multipart_config();

let workload = Workload::builder(w.name)
.concurrency(w.concurrency)
@@ -77,6 +62,7 @@ async fn main() -> anyhow::Result<()> {
.size_distribution(w.file_sizes.p50.0, w.file_sizes.p99.0)
.max_size(w.file_sizes.max.map(|b| b.0))
.action_weights(w.actions.writes, w.actions.reads, w.actions.deletes)
.multipart(multipart)
.build();

stresstest = stresstest.workload(workload);
12 changes: 11 additions & 1 deletion stresstest/src/stresstest.rs
@@ -251,6 +251,8 @@ async fn run_workload(
WorkloadMode::Throughput => 100,
};

let multipart_config = workload.multipart.clone().map(Arc::new);

let semaphore = Arc::new(Semaphore::new(concurrency));
let start = Instant::now();
let deadline = tokio::time::Instant::now() + duration;
@@ -281,6 +283,7 @@ async fn run_workload(
};


let multipart_config = multipart_config.clone();
let task = async move {
let start = Instant::now();
match action {
@@ -293,7 +296,14 @@ async fn run_workload(
usecase = usecase.with_expiration_policy(ExpirationPolicy::TimeToLive(ttl));
}

match remote.write(&usecase, organization_id, payload).await {
let use_multipart = multipart_config.is_some() && file_size > 1024;
let result = if use_multipart {
remote.write_multipart(&usecase, organization_id, payload, multipart_config.as_ref().unwrap()).await
} else {
remote.write(&usecase, organization_id, payload).await
};

match result {
Ok(object_key) => {
let external_id = (usecase, organization_id, object_key);
workload.lock().unwrap().push_file(internal_id, external_id);