Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 9 additions & 5 deletions src/commands/serve.rs
Original file line number Diff line number Diff line change
Expand Up @@ -281,15 +281,19 @@ impl ServeCommand {

let instance = linker.instantiate_pre(&component)?;

// Tokio by default sets `SO_REUSEADDR` for listeners but that makes it
// a bit confusing if you run Wasmtime but forget to close a previous
// `serve` session. To avoid that we explicitly disable `SO_REUSEADDR`
// here.
let socket = match &self.addr {
SocketAddr::V4(_) => tokio::net::TcpSocket::new_v4()?,
SocketAddr::V6(_) => tokio::net::TcpSocket::new_v6()?,
};
socket.set_reuseaddr(false)?;
// Conditionally enable `SO_REUSEADDR` depending on the current
// platform. On Unix we want this to be able to rebind an address in
// the `TIME_WAIT` state which can happen then a server is killed with
// active TCP connections and then restarted. On Windows though if
// `SO_REUSEADDR` is specified then it enables multiple applications to
// bind the port at the same time which is not something we want. Hence
// this is conditionally set based on the platform (and deviates from
// Tokio's default from always-on).
socket.set_reuseaddr(!cfg!(windows))?;
socket.bind(self.addr)?;
let listener = socket.listen(100)?;

Expand Down
109 changes: 98 additions & 11 deletions tests/all/cli_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1461,11 +1461,15 @@ mod test_programs {
// Spawn `wasmtime serve` on port 0 which will randomly assign it a
// port.
let mut cmd = super::get_wasmtime_command()?;
cmd.arg("serve").arg("--addr=127.0.0.1:0").arg(wasm);
configure(&mut cmd);
Self::spawn(&mut cmd)
}

fn spawn(cmd: &mut Command) -> Result<WasmtimeServe> {
cmd.stdin(Stdio::null());
cmd.stdout(Stdio::piped());
cmd.stderr(Stdio::piped());
cmd.arg("serve").arg("--addr=127.0.0.1:0").arg(wasm);
configure(&mut cmd);
let mut child = cmd.spawn()?;

// Read the first line of stderr which will say which address it's
Expand Down Expand Up @@ -1526,15 +1530,7 @@ mod test_programs {

/// Send a request to this server and wait for the response.
async fn send_request(&self, req: http::Request<String>) -> Result<http::Response<String>> {
let tcp = TcpStream::connect(&self.addr)
.await
.context("failed to connect")?;
let tcp = wasmtime_wasi_http::io::TokioIo::new(tcp);
let (mut send, conn) = hyper::client::conn::http1::handshake(tcp)
.await
.context("failed http handshake")?;

let conn_task = tokio::task::spawn(conn);
let (mut send, conn_task) = self.start_requests().await?;

let response = send
.send_request(req)
Expand All @@ -1551,6 +1547,22 @@ mod test_programs {

Ok(http::Response::from_parts(parts, body))
}

async fn start_requests(
&self,
) -> Result<(
hyper::client::conn::http1::SendRequest<String>,
tokio::task::JoinHandle<hyper::Result<()>>,
)> {
let tcp = TcpStream::connect(&self.addr)
.await
.context("failed to connect")?;
let tcp = wasmtime_wasi_http::io::TokioIo::new(tcp);
let (send, conn) = hyper::client::conn::http1::handshake(tcp)
.await
.context("failed http handshake")?;
Ok((send, tokio::task::spawn(conn)))
}
}

// Don't leave child processes running by accident so kill the child process
Expand Down Expand Up @@ -1679,6 +1691,81 @@ mod test_programs {
}
Ok(())
}

#[tokio::test]
async fn cli_serve_only_one_process_allowed() -> Result<()> {
let wasm = CLI_SERVE_ECHO_ENV_COMPONENT;
let server = WasmtimeServe::new(wasm, |cmd| {
cmd.arg("-Scli");
})?;

let err = WasmtimeServe::spawn(
super::get_wasmtime_command()?
.arg("serve")
.arg("-Scli")
.arg(format!("--addr={}", server.addr))
.arg(wasm),
)
.err()
.expect("server spawn should have failed but it succeeded");
drop(server);

let err = format!("{err:?}");
println!("{err}");
assert!(err.contains("os error"));
Ok(())
}

// Technically this test is a little racy. This binds port 0 to acquire a
// random port, issues a single request to this port, but then kills this
// server while the request is still processing. The port is then rebound
// in the next process while it technically could be stolen by another
// process.
#[tokio::test]
async fn cli_serve_quick_rebind_allowed() -> Result<()> {
let wasm = CLI_SERVE_ECHO_ENV_COMPONENT;
let server = WasmtimeServe::new(wasm, |cmd| {
cmd.arg("-Scli");
})?;
let addr = server.addr;

// Start up a `send` and `conn_task` which represents a connection to
// this server.
let (mut send, conn_task) = server.start_requests().await?;
let _ = send
.send_request(
hyper::Request::builder()
.uri("http://localhost/")
.header("env", "FOO")
.body(String::new())
.context("failed to make request")?,
)
.await;

// ... once a response has been received (or at least the status
// code/headers) then kill the server. THis is done while `conn_task`
// and `send` are still alive so we're guaranteed that the other side
// got a request (we got a response) and our connection is still open.
//
// This forces the address/port into the `TIME_WAIT` state. The rebind
// below in the next process will fail if `SO_REUSEADDR` isn't set.
drop(server);
drop(send);
let _ = conn_task.await;

// If this is successfully bound then we'll create `WasmtimeServe`
// which reads off the first line of output to know which address was
// bound.
let _server2 = WasmtimeServe::spawn(
super::get_wasmtime_command()?
.arg("serve")
.arg("-Scli")
.arg(format!("--addr={addr}"))
.arg(wasm),
)?;

Ok(())
}
}

#[test]
Expand Down