From a1de4ada28d62ee77af59b73ea01ff4dad4fe7ba Mon Sep 17 00:00:00 2001 From: Heap-Hop Date: Sat, 25 Jan 2025 16:31:06 +0900 Subject: [PATCH 1/3] fix InputStream blocking_read spurious wake up --- crates/wasi-io/src/streams.rs | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/crates/wasi-io/src/streams.rs b/crates/wasi-io/src/streams.rs index 8f2392697c7e..bc9b11c4e40e 100644 --- a/crates/wasi-io/src/streams.rs +++ b/crates/wasi-io/src/streams.rs @@ -24,8 +24,15 @@ pub trait InputStream: Pollable { /// Similar to `read`, except that it blocks until at least one byte can be /// read. async fn blocking_read(&mut self, size: usize) -> StreamResult { - self.ready().await; - self.read(size) + loop { + // This `ready` call may return early due to `io::ErrorKind::WouldBlock`. + self.ready().await; + let data = self.read(size)?; + if data.is_empty() { + continue; + } + return Ok(data); + } } /// Same as the `read` method except that bytes are skipped. From 575052c785a12c46ebd5f00b20c1fb56f20d98e0 Mon Sep 17 00:00:00 2001 From: Heap-Hop Date: Tue, 28 Jan 2025 14:29:46 +0900 Subject: [PATCH 2/3] add MAX_BLOCKING_ATTEMPTS and also wrap for `write_ready` --- crates/wasi-io/src/streams.rs | 36 +++++++++++++++++++++++++++++------ 1 file changed, 30 insertions(+), 6 deletions(-) diff --git a/crates/wasi-io/src/streams.rs b/crates/wasi-io/src/streams.rs index bc9b11c4e40e..5364f8e54e5b 100644 --- a/crates/wasi-io/src/streams.rs +++ b/crates/wasi-io/src/streams.rs @@ -3,6 +3,15 @@ use alloc::boxed::Box; use anyhow::Result; use bytes::Bytes; +/// `Pollable::ready()` for `InputStream` and `OutputStream` may return +/// prematurely due to `io::ErrorKind::WouldBlock`. +/// +/// To ensure that `blocking_` functions return a valid non-empty result, +/// we use a loop with a maximum iteration limit. +/// +/// This constant defines the maximum number of loop attempts allowed. +const MAX_BLOCKING_ATTEMPTS: u8 = 10; + /// Host trait for implementing the `wasi:io/streams.input-stream` resource: A /// bytestream which can be read from. #[async_trait::async_trait] @@ -24,14 +33,18 @@ pub trait InputStream: Pollable { /// Similar to `read`, except that it blocks until at least one byte can be /// read. async fn blocking_read(&mut self, size: usize) -> StreamResult { + let mut i = 0; loop { - // This `ready` call may return early due to `io::ErrorKind::WouldBlock`. + // This `ready` call may return prematurely due to `io::ErrorKind::WouldBlock`. self.ready().await; let data = self.read(size)?; - if data.is_empty() { - continue; + if !data.is_empty() { + return Ok(data); } - return Ok(data); + if i >= MAX_BLOCKING_ATTEMPTS { + return Err(StreamError::trap("max blocking attempts exceeded")); + } + i += 1; } } @@ -246,8 +259,19 @@ pub trait OutputStream: Pollable { /// Simultaneously waits for this stream to be writable and then returns how /// much may be written or the last error that happened. async fn write_ready(&mut self) -> StreamResult { - self.ready().await; - self.check_write() + let mut i = 0; + loop { + // This `ready` call may return prematurely due to `io::ErrorKind::WouldBlock`. + self.ready().await; + let n = self.check_write()?; + if n > 0 { + return Ok(n); + } + if i >= MAX_BLOCKING_ATTEMPTS { + return Err(StreamError::trap("max blocking attempts exceeded")); + } + i += 1; + } } /// Cancel any asynchronous work and wait for it to wrap up. From a2e65720e1b5b489d0279a7033383a32baaba62d Mon Sep 17 00:00:00 2001 From: Heap-Hop Date: Mon, 3 Feb 2025 16:56:15 +0900 Subject: [PATCH 3/3] avoid loop when reading zero size --- crates/wasi-io/src/streams.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/crates/wasi-io/src/streams.rs b/crates/wasi-io/src/streams.rs index 5364f8e54e5b..402de016d581 100644 --- a/crates/wasi-io/src/streams.rs +++ b/crates/wasi-io/src/streams.rs @@ -33,6 +33,11 @@ pub trait InputStream: Pollable { /// Similar to `read`, except that it blocks until at least one byte can be /// read. async fn blocking_read(&mut self, size: usize) -> StreamResult { + if size == 0 { + self.ready().await; + return self.read(size); + } + let mut i = 0; loop { // This `ready` call may return prematurely due to `io::ErrorKind::WouldBlock`.