Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,18 @@ jobs:
- name: Run tests
run: sudo -E $(which cargo) test
working-directory: crates/memtrack

benchmarks:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: moonrepo/setup-rust@v1
with:
bins: cargo-codspeed
- name: Build benchmarks
run: cargo codspeed build -p runner-shared
- name: Run benchmarks
uses: CodSpeedHQ/action@v4
with:
mode: instrumentation
run: cargo codspeed run -p runner-shared
111 changes: 109 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

93 changes: 69 additions & 24 deletions crates/memtrack/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@ use clap::Parser;
use ipc_channel::ipc::{self};
use log::{debug, info};
use memtrack::{MemtrackIpcMessage, Tracker, handle_ipc_message};
use runner_shared::artifacts::{ArtifactExt, MemtrackArtifact, MemtrackEvent};
use std::path::PathBuf;
use runner_shared::artifacts::{ArtifactExt, MemtrackArtifact, MemtrackEvent, MemtrackWriter};
use std::path::{Path, PathBuf};
use std::process::Command;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::channel;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

#[derive(Parser)]
#[command(name = "memtrack")]
Expand Down Expand Up @@ -51,10 +53,8 @@ fn main() -> Result<()> {
} => {
debug!("Starting memtrack for command: {command}");

let (root_pid, events, status) =
track_command(&command, ipc_server).context("Failed to track command")?;
let result = MemtrackArtifact { events };
result.save_with_pid_to(&out_dir, root_pid as libc::pid_t)?;
let status =
track_command(&command, ipc_server, &out_dir).context("Failed to track command")?;

std::process::exit(status.code().unwrap_or(1));
}
Expand All @@ -64,7 +64,8 @@ fn main() -> Result<()> {
fn track_command(
cmd_string: &str,
ipc_server_name: Option<String>,
) -> anyhow::Result<(u32, Vec<MemtrackEvent>, std::process::ExitStatus)> {
out_dir: &Path,
) -> anyhow::Result<std::process::ExitStatus> {
let tracker = Tracker::new()?;

let tracker_arc = Arc::new(Mutex::new(tracker));
Expand Down Expand Up @@ -95,37 +96,81 @@ fn track_command(
let event_rx = { tracker_arc.lock().unwrap().track(root_pid)? };
info!("Spawned child with pid {root_pid}");

// Spawn event processing thread
let process_events = Arc::new(AtomicBool::new(true));
let process_events_clone = process_events.clone();
let processing_thread = thread::spawn(move || {
let mut events = Vec::new();
loop {
if !process_events_clone.load(Ordering::Relaxed) {
break;
}
// Generate output file name and create file for streaming events
let file_name = MemtrackArtifact::file_name(Some(root_pid));
let out_file = std::fs::File::create(out_dir.join(file_name))?;

let Ok(event) = event_rx.try_recv() else {
let (write_tx, write_rx) = channel::<MemtrackEvent>();

// Stage A: Fast drain thread - This is required so that we immediately clear the ring buffer
// because it only has a limited size.
static DRAIN_EVENTS: AtomicBool = AtomicBool::new(true);
let write_tx_clone = write_tx.clone();
let drain_thread = thread::spawn(move || {
// Regular draining loop
while DRAIN_EVENTS.load(Ordering::Relaxed) {
let Ok(event) = event_rx.recv_timeout(Duration::from_millis(100)) else {
continue;
};
let _ = write_tx_clone.send(event.into());
}

events.push(event.into());
// Final aggressive drain - keep trying until truly empty
loop {
match event_rx.try_recv() {
Ok(event) => {
let _ = write_tx_clone.send(event.into());
}
Err(_) => {
// Sleep briefly and try once more to catch late arrivals
thread::sleep(Duration::from_millis(50));
if let Ok(event) = event_rx.try_recv() {
let _ = write_tx_clone.send(event.into());
} else {
break;
}
}
}
}
events
});

// Stage B: Writer thread - Immediately writes the events to disk
let writer_thread = thread::spawn(move || -> anyhow::Result<()> {
let mut writer = MemtrackWriter::new(out_file)?;

while let Ok(first) = write_rx.recv() {
writer.write_event(&first)?;

// Drain any backlog in a tight loop (batching)
while let Ok(ev) = write_rx.try_recv() {
writer.write_event(&ev)?;
}
}
writer.finish()?;

Ok(())
});

// Wait for the command to complete
let status = child.wait().context("Failed to wait for command")?;
info!("Command exited with status: {status}");

info!("Waiting for the event processing thread to finish");
process_events.store(false, Ordering::Relaxed);
let events = processing_thread
// Wait for drain thread to finish
info!("Waiting for the drain thread to finish");
DRAIN_EVENTS.store(false, Ordering::Relaxed);
drain_thread
.join()
.map_err(|_| anyhow::anyhow!("Failed to join drain thread"))?;

// Wait for writer thread to finish and propagate errors
info!("Waiting for the writer thread to finish");
drop(write_tx);
writer_thread
.join()
.map_err(|_| anyhow::anyhow!("Failed to join event thread"))?;
.map_err(|_| anyhow::anyhow!("Failed to join writer thread"))??;

// IPC thread will exit when channel closes
drop(ipc_handle);

Ok((root_pid as u32, events, status))
Ok(status)
}
9 changes: 9 additions & 0 deletions crates/runner-shared/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,12 @@ log = { workspace = true }
rmp = "0.8.14"
rmp-serde = "1.3.0"
libc = { workspace = true }
zstd = "0.13"

[dev-dependencies]
divan = { version = "4.2.0", package = "codspeed-divan-compat" }
rand = "0.8"

[[bench]]
name = "memtrack_writer"
harness = false
Loading
Loading