diff --git a/crates/wasi-io/src/streams.rs b/crates/wasi-io/src/streams.rs index 8f2392697c7e..402de016d581 100644 --- a/crates/wasi-io/src/streams.rs +++ b/crates/wasi-io/src/streams.rs @@ -3,6 +3,15 @@ use alloc::boxed::Box; use anyhow::Result; use bytes::Bytes; +/// `Pollable::ready()` for `InputStream` and `OutputStream` may return +/// prematurely due to `io::ErrorKind::WouldBlock`. +/// +/// To ensure that `blocking_` functions return a valid non-empty result, +/// we use a loop with a maximum iteration limit. +/// +/// This constant defines the maximum number of loop attempts allowed. +const MAX_BLOCKING_ATTEMPTS: u8 = 10; + /// Host trait for implementing the `wasi:io/streams.input-stream` resource: A /// bytestream which can be read from. #[async_trait::async_trait] @@ -24,8 +33,24 @@ pub trait InputStream: Pollable { /// Similar to `read`, except that it blocks until at least one byte can be /// read. async fn blocking_read(&mut self, size: usize) -> StreamResult { - self.ready().await; - self.read(size) + if size == 0 { + self.ready().await; + return self.read(size); + } + + let mut i = 0; + loop { + // This `ready` call may return prematurely due to `io::ErrorKind::WouldBlock`. + self.ready().await; + let data = self.read(size)?; + if !data.is_empty() { + return Ok(data); + } + if i >= MAX_BLOCKING_ATTEMPTS { + return Err(StreamError::trap("max blocking attempts exceeded")); + } + i += 1; + } } /// Same as the `read` method except that bytes are skipped. @@ -239,8 +264,19 @@ pub trait OutputStream: Pollable { /// Simultaneously waits for this stream to be writable and then returns how /// much may be written or the last error that happened. async fn write_ready(&mut self) -> StreamResult { - self.ready().await; - self.check_write() + let mut i = 0; + loop { + // This `ready` call may return prematurely due to `io::ErrorKind::WouldBlock`. + self.ready().await; + let n = self.check_write()?; + if n > 0 { + return Ok(n); + } + if i >= MAX_BLOCKING_ATTEMPTS { + return Err(StreamError::trap("max blocking attempts exceeded")); + } + i += 1; + } } /// Cancel any asynchronous work and wait for it to wrap up.