diff --git a/crates/test-programs/src/bin/preview2_tcp_streams.rs b/crates/test-programs/src/bin/preview2_tcp_streams.rs new file mode 100644 index 000000000000..a9a01b5ec692 --- /dev/null +++ b/crates/test-programs/src/bin/preview2_tcp_streams.rs @@ -0,0 +1,48 @@ +use std::time::Duration; + +use test_programs::wasi::io::streams::StreamError; +use test_programs::wasi::sockets::network::{IpAddress, IpAddressFamily, IpSocketAddress, Network}; +use test_programs::wasi::sockets::tcp::{ShutdownType, TcpSocket}; + +/// InputStream::read should return `StreamError::Closed` after the connection has been shut down by the server. +fn test_tcp_read_from_closed_input_stream(net: &Network, family: IpAddressFamily) { + // Set up server & client sockets: + let bind_address = IpSocketAddress::new(IpAddress::new_loopback(family), 0); + let listener = TcpSocket::new(family).unwrap(); + listener.blocking_bind(&net, bind_address).unwrap(); + listener.blocking_listen().unwrap(); + let bound_address = listener.local_address().unwrap(); + let client = TcpSocket::new(family).unwrap(); + let (connected_input, connected_output) = client.blocking_connect(net, bound_address).unwrap(); + let (accepted, accepted_input, accepted_output) = listener.blocking_accept().unwrap(); + + // Shut down the connection from the server side and give the kernel a bit + // of time to propagate the shutdown signal from the server socket to the + // client socket. + accepted.shutdown(ShutdownType::Both).unwrap(); + drop(accepted_input); + drop(accepted_output); + drop(accepted); + std::thread::sleep(Duration::from_millis(50)); + + // And now the actual test: + + // The input stream should immediately signal StreamError::Closed. + // Notably, it should _not_ return an empty list (the wasi-io equivalent of EWOULDBLOCK) + // See: https://github.com/bytecodealliance/wasmtime/pull/8968 + assert!(matches!(connected_input.read(10), Err(StreamError::Closed))); // If this randomly fails, try tweaking the timeout above. + + // Stream should still be closed, even when requesting 0 bytes: + assert!(matches!(connected_input.read(0), Err(StreamError::Closed))); + + drop(connected_input); + drop(connected_output); + drop(client); +} + +fn main() { + let net = Network::default(); + + test_tcp_read_from_closed_input_stream(&net, IpAddressFamily::Ipv4); + test_tcp_read_from_closed_input_stream(&net, IpAddressFamily::Ipv6); +} diff --git a/crates/wasi/src/tcp.rs b/crates/wasi/src/tcp.rs index e385874d9396..ff020525b36f 100644 --- a/crates/wasi/src/tcp.rs +++ b/crates/wasi/src/tcp.rs @@ -696,7 +696,7 @@ impl HostInputStream for TcpReadStream { // A 0-byte read indicates that the stream has closed. Ok(0) => { self.closed = true; - 0 + return Err(StreamError::Closed); } Ok(n) => n, diff --git a/crates/wasi/tests/all/async_.rs b/crates/wasi/tests/all/async_.rs index 84073e31d865..7d25423726b2 100644 --- a/crates/wasi/tests/all/async_.rs +++ b/crates/wasi/tests/all/async_.rs @@ -324,6 +324,10 @@ async fn preview2_tcp_states() { run(PREVIEW2_TCP_STATES_COMPONENT, false).await.unwrap() } #[test_log::test(tokio::test(flavor = "multi_thread"))] +async fn preview2_tcp_streams() { + run(PREVIEW2_TCP_STREAMS_COMPONENT, false).await.unwrap() +} +#[test_log::test(tokio::test(flavor = "multi_thread"))] async fn preview2_tcp_bind() { run(PREVIEW2_TCP_BIND_COMPONENT, false).await.unwrap() } diff --git a/crates/wasi/tests/all/sync.rs b/crates/wasi/tests/all/sync.rs index aa02d3be7761..5629d5d3ffd8 100644 --- a/crates/wasi/tests/all/sync.rs +++ b/crates/wasi/tests/all/sync.rs @@ -270,6 +270,10 @@ fn preview2_tcp_states() { run(PREVIEW2_TCP_STATES_COMPONENT, false).unwrap() } #[test_log::test] +fn preview2_tcp_streams() { + run(PREVIEW2_TCP_STREAMS_COMPONENT, false).unwrap() +} +#[test_log::test] fn preview2_tcp_bind() { run(PREVIEW2_TCP_BIND_COMPONENT, false).unwrap() }