Skip to content
Open
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

## Perf

### 2026-04-07

- Replace synchronous disk I/O with async operations in snap sync [#6113](https://github.com/lambdaclass/ethrex/pull/6113)

### 2026-03-26

- Eliminate stack-frame spill in Stack::push for zero-upper-limb values [#6390](https://github.com/lambdaclass/ethrex/pull/6390)
Expand Down
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/networking/p2p/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ crossbeam.workspace = true

[dev-dependencies]
hex-literal.workspace = true
tempfile = "3"
criterion = { version = "0.5", features = ["html_reports"] }

[lib]
Expand Down
136 changes: 136 additions & 0 deletions crates/networking/p2p/snap/async_fs.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
//! Async file system utilities for snap sync
//!
//! Provides async wrappers around file system operations to avoid blocking the
//! tokio runtime during disk I/O. `tokio::fs` is used for simple operations
//! (internally delegates to `spawn_blocking`), while explicit `spawn_blocking`
//! is used for `read_dir` to avoid async stream complexity.

use std::path::{Path, PathBuf};

use super::error::SnapError;

/// Ensures that `path` exists as a directory, creating it (and any missing
/// parents) when necessary.
///
/// Succeeds without touching the file system if the directory already exists,
/// mirroring `create_dir_all` semantics. On failure the underlying I/O error
/// kind is preserved in [`SnapError::FileSystem`] together with the path.
pub async fn ensure_dir_exists(path: &Path) -> Result<(), SnapError> {
    match tokio::fs::create_dir_all(path).await {
        Ok(()) => Ok(()),
        Err(io_err) => Err(SnapError::FileSystem {
            operation: "create directory",
            path: path.to_path_buf(),
            kind: io_err.kind(),
        }),
    }
}

/// Reads the paths of all entries in a directory, sorted by `PathBuf` ordering.
///
/// NOTE(review): `std::fs::read_dir` yields every entry — subdirectories as
/// well as regular files — and no filtering is applied here, so the result may
/// contain directory paths; callers that need files only must filter.
///
/// Uses `spawn_blocking` with sync `read_dir` since we always need all paths
/// upfront (for `ingest_external_file` or batch iteration), which avoids the
/// extra complexity of tokio's async `ReadDirStream`.
pub async fn read_dir_paths(dir: &Path) -> Result<Vec<PathBuf>, SnapError> {
    // Owned copy so the path can move into the 'static blocking closure.
    let dir = dir.to_path_buf();
    tokio::task::spawn_blocking(move || {
        let mut paths: Vec<PathBuf> = std::fs::read_dir(&dir)
            // Failure to open the directory itself (e.g. missing, no permission).
            .map_err(|e| SnapError::FileSystem {
                operation: "read directory",
                path: dir.clone(),
                kind: e.kind(),
            })?
            .map(|entry| {
                // Each entry read can fail independently; first error aborts
                // the collect below.
                entry.map(|e| e.path()).map_err(|e| SnapError::FileSystem {
                    operation: "read directory entry",
                    path: dir.clone(),
                    kind: e.kind(),
                })
            })
            .collect::<Result<Vec<_>, _>>()?;
        // Sort for a deterministic order regardless of OS directory iteration.
        paths.sort();
        Ok(paths)
    })
    // Outer `?` converts a task JoinError (panic/cancellation) into SnapError —
    // presumably via a From<JoinError> impl on SnapError; confirm in error.rs.
    .await?
}

/// Reads the entire contents of the file at `path` asynchronously.
///
/// Returns the raw bytes on success; on failure the I/O error kind and the
/// offending path are captured in [`SnapError::FileSystem`].
pub async fn read_file(path: &Path) -> Result<Vec<u8>, SnapError> {
    match tokio::fs::read(path).await {
        Ok(bytes) => Ok(bytes),
        Err(io_err) => Err(SnapError::FileSystem {
            operation: "read file",
            path: path.to_path_buf(),
            kind: io_err.kind(),
        }),
    }
}

/// Removes a directory and all its contents asynchronously.
pub async fn remove_dir_all(path: &Path) -> Result<(), SnapError> {
tokio::fs::remove_dir_all(path)
.await
.map_err(|e| SnapError::FileSystem {
operation: "remove directory",
path: path.to_path_buf(),
kind: e.kind(),
})
}

#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    // Creating a directory that does not yet exist should succeed and leave
    // the directory on disk.
    #[tokio::test]
    async fn test_ensure_dir_exists_creates_new() {
        let temp = tempdir().unwrap();
        let new_dir = temp.path().join("new_dir");

        assert!(!new_dir.exists());
        ensure_dir_exists(&new_dir).await.unwrap();
        assert!(new_dir.exists());
    }

    // Calling ensure_dir_exists on an already-existing directory must not
    // error (create_dir_all semantics).
    #[tokio::test]
    async fn test_ensure_dir_exists_idempotent() {
        let temp = tempdir().unwrap();
        let existing_dir = temp.path().join("existing");
        std::fs::create_dir(&existing_dir).unwrap();

        ensure_dir_exists(&existing_dir).await.unwrap();
        assert!(existing_dir.exists());
    }

    // Entries are written out of order on purpose; the function must return
    // them sorted (a, b, c) regardless of OS directory iteration order.
    #[tokio::test]
    async fn test_read_dir_paths() {
        let temp = tempdir().unwrap();

        std::fs::write(temp.path().join("b.txt"), b"b").unwrap();
        std::fs::write(temp.path().join("a.txt"), b"a").unwrap();
        std::fs::write(temp.path().join("c.txt"), b"c").unwrap();

        let paths = read_dir_paths(temp.path()).await.unwrap();

        assert_eq!(paths.len(), 3);
        assert!(paths[0].ends_with("a.txt"));
        assert!(paths[1].ends_with("b.txt"));
        assert!(paths[2].ends_with("c.txt"));
    }

    // Round-trip: bytes written synchronously must be read back unchanged.
    #[tokio::test]
    async fn test_read_file() {
        let temp = tempdir().unwrap();
        let file_path = temp.path().join("test.bin");

        let data = b"hello world";
        std::fs::write(&file_path, data).unwrap();

        let read_data = read_file(&file_path).await.unwrap();
        assert_eq!(read_data, data);
    }

    // Removal must take the directory AND its contents (non-empty dir).
    #[tokio::test]
    async fn test_remove_dir_all() {
        let temp = tempdir().unwrap();
        let dir = temp.path().join("to_remove");
        std::fs::create_dir(&dir).unwrap();
        std::fs::write(dir.join("file.txt"), b"data").unwrap();

        assert!(dir.exists());
        remove_dir_all(&dir).await.unwrap();
        assert!(!dir.exists());
    }
}
98 changes: 38 additions & 60 deletions crates/networking/p2p/snap/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use crate::{
GetStorageRanges, GetTrieNodes, StorageRanges, TrieNodes,
},
},
snap::{constants::*, encodable_to_proof, error::SnapError},
snap::{async_fs, constants::*, encodable_to_proof, error::SnapError},
sync::{AccountStorageRoots, SnapBlockSyncState, block_is_stale, update_pivot},
utils::{
AccountsWithStorage, dump_accounts_to_file, dump_storages_to_file,
Expand Down Expand Up @@ -159,13 +159,7 @@ pub async fn request_account_range(
.zip(current_account_states)
.collect::<Vec<(H256, AccountState)>>();

if !std::fs::exists(account_state_snapshots_dir).map_err(|_| {
SnapError::SnapshotDir("State snapshots directory does not exist".to_string())
})? {
std::fs::create_dir_all(account_state_snapshots_dir).map_err(|_| {
SnapError::SnapshotDir("Failed to create state snapshots directory".to_string())
})?;
}
async_fs::ensure_dir_exists(account_state_snapshots_dir).await?;

let account_state_snapshots_dir_cloned = account_state_snapshots_dir.to_path_buf();
write_set.spawn(async move {
Expand All @@ -192,8 +186,6 @@ pub async fn request_account_range(
}

if let Ok((accounts, peer_id, chunk_start_end)) = task_receiver.try_recv() {
// Release the reservation we made before spawning the task.
peers.peer_table.dec_requests(peer_id)?;
if let Some((chunk_start, chunk_end)) = chunk_start_end {
if chunk_start <= chunk_end {
tasks_queue_not_started.push_back((chunk_start, chunk_end));
Expand Down Expand Up @@ -261,15 +253,12 @@ pub async fn request_account_range(
.expect("Should be able to update pivot")
}

// Reserve a request slot before spawning so get_best_peer sees
// this peer as busy immediately, preventing spawn floods.
// Workers call outgoing_request directly (not make_request) to
// avoid a double increment. Released via dec_requests on try_recv.
peers.peer_table.inc_requests(peer_id)?;
let peer_table = peers.peer_table.clone();

tokio::spawn(request_account_range_worker(
peer_id,
connection,
peer_table,
chunk_start,
chunk_end,
pivot_header.state_root,
Expand All @@ -294,13 +283,7 @@ pub async fn request_account_range(
.zip(current_account_states)
.collect::<Vec<(H256, AccountState)>>();

if !std::fs::exists(account_state_snapshots_dir).map_err(|_| {
SnapError::SnapshotDir("State snapshots directory does not exist".to_string())
})? {
std::fs::create_dir_all(account_state_snapshots_dir).map_err(|_| {
SnapError::SnapshotDir("Failed to create state snapshots directory".to_string())
})?;
}
async_fs::ensure_dir_exists(account_state_snapshots_dir).await?;

let path = get_account_state_snapshot_file(account_state_snapshots_dir, chunk_file);
dump_accounts_to_file(&path, account_state_chunk)
Expand Down Expand Up @@ -394,8 +377,6 @@ pub async fn request_bytecodes(
remaining_start,
remaining_end,
} = result;
// Release the reservation we made before spawning the task.
peers.peer_table.dec_requests(peer_id)?;

debug!(
"Downloaded {} bytecodes from peer {peer_id} (current count: {downloaded_count})",
Expand Down Expand Up @@ -455,8 +436,7 @@ pub async fn request_bytecodes(
.copied()
.collect();

// Reserve a request slot before spawning (see account range comment).
peers.peer_table.inc_requests(peer_id)?;
let peer_table = peers.peer_table.clone();

tokio::spawn(async move {
let empty_task_result = TaskResult {
Expand All @@ -475,10 +455,14 @@ pub async fn request_bytecodes(
hashes: hashes_to_request.clone(),
bytes: MAX_RESPONSE_BYTES,
});
// The caller already holds a request reservation for this peer,
// so call outgoing_request directly to avoid a double increment.
if let Ok(RLPxMessage::ByteCodes(ByteCodes { id: _, codes })) = connection
.outgoing_request(request, PEER_REPLY_TIMEOUT)
if let Ok(RLPxMessage::ByteCodes(ByteCodes { id: _, codes })) =
PeerHandler::make_request(
&peer_table,
peer_id,
&mut connection,
request,
PEER_REPLY_TIMEOUT,
)
.await
{
if codes.is_empty() {
Expand Down Expand Up @@ -616,15 +600,8 @@ pub async fn request_storage_ranges(
let current_account_storages = std::mem::take(&mut current_account_storages);
let snapshot = current_account_storages.into_values().collect::<Vec<_>>();

if !std::fs::exists(account_storages_snapshots_dir).map_err(|_| {
SnapError::SnapshotDir("Storage snapshots directory does not exist".to_string())
})? {
std::fs::create_dir_all(account_storages_snapshots_dir).map_err(|_| {
SnapError::SnapshotDir(
"Failed to create storage snapshots directory".to_string(),
)
})?;
}
async_fs::ensure_dir_exists(account_storages_snapshots_dir).await?;

let account_storages_snapshots_dir_cloned =
account_storages_snapshots_dir.to_path_buf();
if !disk_joinset.is_empty() {
Expand Down Expand Up @@ -657,8 +634,6 @@ pub async fn request_storage_ranges(
remaining_end,
remaining_hash_range: (hash_start, hash_end),
} = result;
// Release the reservation we made before spawning the task.
peers.peer_table.dec_requests(peer_id)?;
completed_tasks += 1;

for (_, accounts) in accounts_by_root_hash[start_index..remaining_start].iter() {
Expand Down Expand Up @@ -994,13 +969,13 @@ pub async fn request_storage_ranges(
chunk_storage_roots.first().unwrap_or(&H256::zero()),
);
}
// Reserve a request slot before spawning (see account range comment).
peers.peer_table.inc_requests(peer_id)?;
let peer_table = peers.peer_table.clone();

tokio::spawn(request_storage_ranges_worker(
task,
peer_id,
connection,
peer_table,
pivot_header.state_root,
chunk_account_hashes,
chunk_storage_roots,
Expand All @@ -1011,13 +986,8 @@ pub async fn request_storage_ranges(
{
let snapshot = current_account_storages.into_values().collect::<Vec<_>>();

if !std::fs::exists(account_storages_snapshots_dir).map_err(|_| {
SnapError::SnapshotDir("Storage snapshots directory does not exist".to_string())
})? {
std::fs::create_dir_all(account_storages_snapshots_dir).map_err(|_| {
SnapError::SnapshotDir("Failed to create storage snapshots directory".to_string())
})?;
}
async_fs::ensure_dir_exists(account_storages_snapshots_dir).await?;

let path = get_account_storages_snapshot_file(account_storages_snapshots_dir, chunk_index);
dump_storages_to_file(&path, snapshot).map_err(|_| {
SnapError::SnapshotDir(format!(
Expand Down Expand Up @@ -1153,6 +1123,7 @@ pub async fn request_storage_trienodes(
async fn request_account_range_worker(
peer_id: H256,
mut connection: PeerConnection,
peer_table: PeerTable,
chunk_start: H256,
chunk_end: H256,
state_root: H256,
Expand All @@ -1171,11 +1142,14 @@ async fn request_account_range_worker(
id: _,
accounts,
proof,
// The caller already holds a request reservation for this peer,
// so call outgoing_request directly to avoid a double increment.
})) = connection
.outgoing_request(request, PEER_REPLY_TIMEOUT)
.await
})) = PeerHandler::make_request(
&peer_table,
peer_id,
&mut connection,
request,
PEER_REPLY_TIMEOUT,
)
.await
{
if accounts.is_empty() {
tx.send((Vec::new(), peer_id, Some((chunk_start, chunk_end))))
Expand Down Expand Up @@ -1251,6 +1225,7 @@ async fn request_storage_ranges_worker(
task: StorageTask,
peer_id: H256,
mut connection: PeerConnection,
peer_table: PeerTable,
state_root: H256,
chunk_account_hashes: Vec<H256>,
chunk_storage_roots: Vec<H256>,
Expand Down Expand Up @@ -1281,11 +1256,14 @@ async fn request_storage_ranges_worker(
id: _,
slots,
proof,
// The caller already holds a request reservation for this peer,
// so call outgoing_request directly to avoid a double increment.
})) = connection
.outgoing_request(request, PEER_REPLY_TIMEOUT)
.await
})) = PeerHandler::make_request(
&peer_table,
peer_id,
&mut connection,
request,
PEER_REPLY_TIMEOUT,
)
.await
else {
tracing::debug!("Failed to get storage range");
tx.send(empty_task_result).await.ok();
Expand Down
2 changes: 1 addition & 1 deletion crates/networking/p2p/snap/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::rlpx::error::PeerConnectionError;
use ethrex_rlp::error::RLPDecodeError;
use ethrex_storage::error::StoreError;
use ethrex_trie::TrieError;
use spawned_concurrency::error::ActorError;
use spawned_concurrency::ActorError;
use std::io::ErrorKind;
use std::path::PathBuf;
use thiserror::Error;
Expand Down
1 change: 1 addition & 0 deletions crates/networking/p2p/snap/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
//! - `constants`: Protocol constants and configuration values
//! - `error`: Unified error types for snap protocol operations

pub mod async_fs;
pub mod client;
pub mod constants;
pub mod error;
Expand Down
Loading
Loading