Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/wasi/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ url = { workspace = true }
once_cell = { workspace = true }

[dev-dependencies]
tokio = { workspace = true, features = ["time", "sync", "io-std", "io-util", "rt", "rt-multi-thread", "net", "macros"] }
tokio = { workspace = true, features = ["time", "sync", "io-std", "io-util", "rt", "rt-multi-thread", "net", "macros", "fs"] }
test-log = { workspace = true }
tracing-subscriber = { workspace = true }
test-programs-artifacts = { workspace = true }
Expand Down
16 changes: 16 additions & 0 deletions crates/wasi/src/ctx.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#[cfg(feature = "preview1")]
use crate::WasiP1Ctx;
use crate::{
clocks::{
host::{monotonic_clock, wall_clock},
Expand Down Expand Up @@ -127,6 +129,10 @@ impl WasiCtxBuilder {
self
}

pub fn inherit_env(&mut self) -> &mut Self {
self.envs(&std::env::vars().collect::<Vec<(String, String)>>())
}

pub fn args(&mut self, args: &[impl AsRef<str>]) -> &mut Self {
self.args.extend(args.iter().map(|a| a.as_ref().to_owned()));
self
Expand All @@ -137,6 +143,10 @@ impl WasiCtxBuilder {
self
}

pub fn inherit_args(&mut self) -> &mut Self {
self.args(&std::env::args().collect::<Vec<String>>())
}

pub fn preopened_dir(
&mut self,
dir: cap_std::fs::Dir,
Expand Down Expand Up @@ -273,6 +283,12 @@ impl WasiCtxBuilder {
allowed_network_uses,
}
}

#[cfg(feature = "preview1")]
pub fn build_p1(&mut self) -> WasiP1Ctx {
let wasi = self.build();
WasiP1Ctx::new(wasi)
}
}

pub trait WasiView: Send {
Expand Down
7 changes: 6 additions & 1 deletion crates/wasi/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ mod filesystem;
mod host;
mod ip_name_lookup;
mod network;
#[cfg(feature = "preview1")]
mod p1ctx;
pub mod pipe;
mod poll;
#[cfg(feature = "preview1")]
Expand All @@ -34,10 +36,13 @@ pub use self::ctx::{WasiCtx, WasiCtxBuilder, WasiView};
pub use self::error::{I32Exit, TrappableError};
pub use self::filesystem::{DirPerms, FilePerms, FsError, FsResult};
pub use self::network::{Network, SocketError, SocketResult};
#[cfg(feature = "preview1")]
pub use self::p1ctx::WasiP1Ctx;
pub use self::poll::{subscribe, ClosureFuture, MakeFuture, Pollable, PollableFuture, Subscribe};
pub use self::random::{thread_rng, Deterministic};
pub use self::stdio::{
stderr, stdin, stdout, IsATTY, Stderr, Stdin, StdinStream, Stdout, StdoutStream,
stderr, stdin, stdout, AsyncStdinStream, AsyncStdoutStream, IsATTY, Stderr, Stdin, StdinStream,
Stdout, StdoutStream,
};
pub use self::stream::{
HostInputStream, HostOutputStream, InputStream, OutputStream, StreamError, StreamResult,
Expand Down
37 changes: 37 additions & 0 deletions crates/wasi/src/p1ctx.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
use crate::preview1::{WasiPreview1Adapter, WasiPreview1View};
use crate::{WasiCtx, WasiView};
use wasmtime::component::ResourceTable;

pub struct WasiP1Ctx {
pub table: ResourceTable,
pub wasi: WasiCtx,
pub adapter: WasiPreview1Adapter,
}

impl WasiP1Ctx {
pub fn new(wasi: WasiCtx) -> Self {
Self {
table: ResourceTable::new(),
wasi,
adapter: WasiPreview1Adapter::new(),
}
}
}

impl WasiView for WasiP1Ctx {
fn table(&mut self) -> &mut ResourceTable {
&mut self.table
}
fn ctx(&mut self) -> &mut WasiCtx {
&mut self.wasi
}
}

impl WasiPreview1View for WasiP1Ctx {
fn adapter(&self) -> &WasiPreview1Adapter {
&self.adapter
}
fn adapter_mut(&mut self) -> &mut WasiPreview1Adapter {
&mut self.adapter
}
}
4 changes: 2 additions & 2 deletions crates/wasi/src/pipe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ pub struct MemoryInputPipe {
}

impl MemoryInputPipe {
pub fn new(bytes: Bytes) -> Self {
pub fn new(bytes: impl Into<Bytes>) -> Self {
Self {
buffer: Arc::new(Mutex::new(bytes)),
buffer: Arc::new(Mutex::new(bytes.into())),
}
}

Expand Down
10 changes: 6 additions & 4 deletions crates/wasi/src/preview0.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,18 @@ use crate::preview1::wasi_snapshot_preview1::WasiSnapshotPreview1 as Snapshot1;
use crate::preview1::WasiPreview1View;
use wiggle::{GuestError, GuestPtr};

pub fn add_to_linker_async<T: WasiPreview1View>(
pub fn add_to_linker_async<T: Send, W: WasiPreview1View>(
linker: &mut wasmtime::Linker<T>,
f: impl Fn(&mut T) -> &mut W + Copy + Send + Sync + 'static,
) -> anyhow::Result<()> {
wasi_unstable::add_to_linker(linker, |t| t)
wasi_unstable::add_to_linker(linker, f)
}

pub fn add_to_linker_sync<T: WasiPreview1View>(
pub fn add_to_linker_sync<T: Send, W: WasiPreview1View>(
linker: &mut wasmtime::Linker<T>,
f: impl Fn(&mut T) -> &mut W + Copy + Send + Sync + 'static,
) -> anyhow::Result<()> {
sync::add_wasi_unstable_to_linker(linker, |t| t)
sync::add_wasi_unstable_to_linker(linker, f)
}

wiggle::from_witx!({
Expand Down
13 changes: 7 additions & 6 deletions crates/wasi/src/preview1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -490,18 +490,19 @@ trait WasiPreview1ViewExt:

impl<T: WasiPreview1View + preopens::Host> WasiPreview1ViewExt for T {}

pub fn add_to_linker_async<T: WasiPreview1View>(
pub fn add_to_linker_async<T: Send, W: WasiPreview1View>(
linker: &mut wasmtime::Linker<T>,
f: impl Fn(&mut T) -> &mut W + Copy + Send + Sync + 'static,
) -> anyhow::Result<()> {
wasi_snapshot_preview1::add_to_linker(linker, |t| t)
crate::preview1::wasi_snapshot_preview1::add_to_linker(linker, f)
}

pub fn add_to_linker_sync<T: WasiPreview1View>(
pub fn add_to_linker_sync<T: Send, W: WasiPreview1View>(
linker: &mut wasmtime::Linker<T>,
f: impl Fn(&mut T) -> &mut W + Copy + Send + Sync + 'static,
) -> anyhow::Result<()> {
sync::add_wasi_snapshot_preview1_to_linker(linker, |t| t)
crate::preview1::sync::add_wasi_snapshot_preview1_to_linker(linker, f)
}

// Generate the wasi_snapshot_preview1::WasiSnapshotPreview1 trait,
// and the module types.
// None of the generated modules, traits, or types should be used externally
Expand All @@ -520,7 +521,7 @@ wiggle::from_witx!({
errors: { errno => trappable Error },
});

mod sync {
pub(crate) mod sync {
use anyhow::Result;
use std::future::Future;

Expand Down
186 changes: 186 additions & 0 deletions crates/wasi/src/stdio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,11 @@ use crate::bindings::io::streams;
use crate::pipe;
use crate::{HostInputStream, HostOutputStream, StreamError, StreamResult, Subscribe, WasiView};
use bytes::Bytes;
use std::future::Future;
use std::io::IsTerminal;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};
use wasmtime::component::Resource;

/// A trait used to represent the standard input to a guest program.
Expand Down Expand Up @@ -54,6 +58,52 @@ impl StdinStream for pipe::ClosedInputStream {
}
}

/// An impl of [`StdinStream`] built on top of [`crate::pipe::AsyncReadStream`].
pub struct AsyncStdinStream(Arc<Mutex<crate::pipe::AsyncReadStream>>);

impl AsyncStdinStream {
pub fn new(s: crate::pipe::AsyncReadStream) -> Self {
Self(Arc::new(Mutex::new(s)))
}
}

impl StdinStream for AsyncStdinStream {
fn stream(&self) -> Box<dyn HostInputStream> {
Box::new(Self(self.0.clone()))
}
fn isatty(&self) -> bool {
false
}
}

impl HostInputStream for AsyncStdinStream {
fn read(&mut self, size: usize) -> Result<bytes::Bytes, crate::StreamError> {
self.0.lock().unwrap().read(size)
}
fn skip(&mut self, size: usize) -> Result<usize, crate::StreamError> {
self.0.lock().unwrap().skip(size)
}
}

impl Subscribe for AsyncStdinStream {
fn ready<'a, 'b>(&'a mut self) -> Pin<Box<dyn Future<Output = ()> + Send + 'b>>
where
Self: 'b,
'a: 'b,
{
struct F(AsyncStdinStream);
impl Future for F {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let mut inner = self.0 .0.lock().unwrap();
let mut fut = inner.ready();
fut.as_mut().poll(cx)
}
}
Box::pin(F(Self(self.0.clone())))
}
}

mod worker_thread_stdin;
pub use self::worker_thread_stdin::{stdin, Stdin};

Expand Down Expand Up @@ -181,6 +231,72 @@ impl Subscribe for OutputStream {
async fn ready(&mut self) {}
}

/// A wrapper of [`crate::pipe::AsyncWriteStream`] that implements
/// [`StdoutStream`]. Note that the [`HostOutputStream`] impl for this is not
/// correct when used for interleaved async IO.
pub struct AsyncStdoutStream(Arc<Mutex<crate::pipe::AsyncWriteStream>>);

impl AsyncStdoutStream {
pub fn new(s: crate::pipe::AsyncWriteStream) -> Self {
Self(Arc::new(Mutex::new(s)))
}
}

impl StdoutStream for AsyncStdoutStream {
fn stream(&self) -> Box<dyn HostOutputStream> {
Box::new(Self(self.0.clone()))
}
fn isatty(&self) -> bool {
false
}
}

// This implementation is known to be bogus. All check-writes and writes are
// directed at the same underlying stream. The check-write/write protocol does
// require the size returned by a check-write to be accepted by write, even if
// other side-effects happen between those calls, and this implementation
// permits another view (created by StdoutStream::stream()) of the same
// underlying stream to accept a write which will invalidate a prior
// check-write of another view.
// Ultimately, the Std{in,out}Stream::stream() methods exist because many
// different places in a linked component (which may itself contain many
// modules) may need to access stdio without any coordination to keep those
// accesses all using pointing to the same resource. So, we allow many
// resources to be created. We have the reasonable expectation that programs
// won't attempt to interleave async IO from these disparate uses of stdio.
// If that expectation doesn't turn out to be true, and you find yourself at
// this comment to correct it: sorry about that.
impl HostOutputStream for AsyncStdoutStream {
fn check_write(&mut self) -> Result<usize, StreamError> {
self.0.lock().unwrap().check_write()
}
fn write(&mut self, bytes: Bytes) -> Result<(), StreamError> {
self.0.lock().unwrap().write(bytes)
}
fn flush(&mut self) -> Result<(), StreamError> {
self.0.lock().unwrap().flush()
}
}

impl Subscribe for AsyncStdoutStream {
fn ready<'a, 'b>(&'a mut self) -> Pin<Box<dyn Future<Output = ()> + Send + 'b>>
where
Self: 'b,
'a: 'b,
{
struct F(AsyncStdoutStream);
impl Future for F {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let mut inner = self.0 .0.lock().unwrap();
let mut fut = inner.ready();
fut.as_mut().poll(cx)
}
}
Box::pin(F(Self(self.0.clone())))
}
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum IsATTY {
Yes,
Expand Down Expand Up @@ -255,3 +371,73 @@ impl<T: WasiView> terminal_stderr::Host for T {
}
}
}

#[cfg(test)]
mod test {
#[test]
fn memory_stdin_stream() {
// A StdinStream has the property that there are multiple
// HostInputStreams created, using the stream() method which are each
// views on the same shared state underneath. Consuming input on one
// stream results in consuming that input on all streams.
//
// The simplest way to measure this is to check if the MemoryInputPipe
// impl of StdinStream follows this property.

let pipe = super::pipe::MemoryInputPipe::new(
"the quick brown fox jumped over the three lazy dogs",
);

use super::StdinStream;

let mut view1 = pipe.stream();
let mut view2 = pipe.stream();

let read1 = view1.read(10).expect("read first 10 bytes");
assert_eq!(read1, "the quick ".as_bytes(), "first 10 bytes");
let read2 = view2.read(10).expect("read second 10 bytes");
assert_eq!(read2, "brown fox ".as_bytes(), "second 10 bytes");
let read3 = view1.read(10).expect("read third 10 bytes");
assert_eq!(read3, "jumped ove".as_bytes(), "third 10 bytes");
let read4 = view2.read(10).expect("read fourth 10 bytes");
assert_eq!(read4, "r the thre".as_bytes(), "fourth 10 bytes");
}
#[tokio::test]
async fn async_stdin_stream() {
// A StdinStream has the property that there are multiple
// HostInputStreams created, using the stream() method which are each
// views on the same shared state underneath. Consuming input on one
// stream results in consuming that input on all streams.
//
// AsyncStdinStream is a slightly more complex impl of StdinStream
// than the MemoryInputPipe above. We can create an AsyncReadStream
// from a file on the disk, and an AsyncStdinStream from that common
// stream, then check that the same property holds as above.

let dir = tempfile::tempdir().unwrap();
let mut path = std::path::PathBuf::from(dir.path());
path.push("file");
std::fs::write(&path, "the quick brown fox jumped over the three lazy dogs").unwrap();

let file = tokio::fs::File::open(&path)
.await
.expect("open created file");
let stdin_stream = super::AsyncStdinStream::new(crate::pipe::AsyncReadStream::new(file));

use super::StdinStream;

let mut view1 = stdin_stream.stream();
let mut view2 = stdin_stream.stream();

view1.ready().await;

let read1 = view1.read(10).expect("read first 10 bytes");
assert_eq!(read1, "the quick ".as_bytes(), "first 10 bytes");
let read2 = view2.read(10).expect("read second 10 bytes");
assert_eq!(read2, "brown fox ".as_bytes(), "second 10 bytes");
let read3 = view1.read(10).expect("read third 10 bytes");
assert_eq!(read3, "jumped ove".as_bytes(), "third 10 bytes");
let read4 = view2.read(10).expect("read fourth 10 bytes");
assert_eq!(read4, "r the thre".as_bytes(), "fourth 10 bytes");
}
}
Loading