Skip to content

Commit de50d1a

Browse files
committed
[rust] extend kj & worker ffi to support "OK" worker.
1 parent ff32115 commit de50d1a

15 files changed

Lines changed: 516 additions & 167 deletions

File tree

build/deps/gen/deps.MODULE.bazel

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -136,10 +136,10 @@ bazel_dep(name = "tcmalloc", version = "0.0.0-20250927-12f2552")
136136
# workerd-cxx
137137
http.archive(
138138
name = "workerd-cxx",
139-
sha256 = "979f8ffc98f8a8577264167f4c7d07ec08ec790dd085110dadbebc476a1d1d7f",
140-
strip_prefix = "cloudflare-workerd-cxx-5b4e067",
139+
sha256 = "fbba1b102b2c4fe879b2f610d7e94ceda6beceac3d57a27196482ce3e9536b50",
140+
strip_prefix = "cloudflare-workerd-cxx-c677ef5",
141141
type = "tgz",
142-
url = "https://github.com/cloudflare/workerd-cxx/tarball/5b4e067b5180b6a791443ef16b0068b57bbc88bd",
142+
url = "https://github.com/cloudflare/workerd-cxx/tarball/c677ef53092a8425ce9f059074441fdb1b7c1ed3",
143143
)
144144
use_repo(http, "workerd-cxx")
145145

compile_flags.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@
7676
-isystembazel-bin/src/rust/worker/_virtual_includes/error.rs@cxx
7777
-isystembazel-bin/src/rust/worker/_virtual_includes/ffi.rs@cxx
7878
-isystembazel-bin/src/rust/worker/_virtual_includes/kill_switch.rs@cxx
79+
-isystembazel-bin/src/rust/worker/_virtual_includes/ok.rs@cxx
7980
-D_FORTIFY_SOURCE=1
8081
-D_LIBCPP_REMOVE_TRANSITIVE_INCLUDES
8182
-D_LIBCPP_NO_ABI_TAG

src/rust/kj/ffi.c++

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,9 @@ static_assert(alignof(kj::rust::HttpConnectSettings) == alignof(uint64_t),
1313
"HttpConnectSettings alignment mismatch");
1414

1515
namespace kj::rust {
16+
17+
// This stays out-of-line because HttpConnectSettings is defined in the generated cxx bridge
18+
// header, and ffi.h cannot include that header without creating an include cycle.
1619
kj::Promise<void> connect(HttpService& service,
1720
::rust::Slice<const kj::byte> host,
1821
const HttpHeaders& headers,

src/rust/kj/ffi.h

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,27 @@ using AsyncInputStream = kj::AsyncInputStream;
1919
using AsyncOutputStream = kj::AsyncOutputStream;
2020
using AsyncIoStream = kj::AsyncIoStream;
2121

22+
inline kj::Promise<void> async_output_stream_write(
23+
AsyncOutputStream& stream, ::rust::Slice<const kj::byte> buffer) {
24+
return stream.write(kj::from<kj_rs::Rust>(buffer));
25+
}
26+
27+
inline kj::Promise<void> async_output_stream_when_write_disconnected(AsyncOutputStream& stream) {
28+
return stream.whenWriteDisconnected();
29+
}
30+
2231
// --- kj::HttpHeaders ffi
2332

2433
using BuiltinIndicesEnum = kj::HttpHeaders::BuiltinIndicesEnum;
34+
using HttpHeaderTable = kj::HttpHeaderTable;
2535
using HttpHeaders = kj::HttpHeaders;
2636
using HttpHeaderId = kj::HttpHeaderId;
2737

38+
inline kj::Own<kj::HttpHeaders> new_http_headers(const HttpHeaderTable& table) {
39+
// There is no C++ stack frame to hold the new instance, so we heap allocate it for Rust.
40+
return kj::heap<kj::HttpHeaders>(table);
41+
}
42+
2843
inline kj::Own<kj::HttpHeaders> clone_shallow(const HttpHeaders& headers) {
2944
// there is no c++ stack frame to hold the new instance,
3045
// so sadly we have to heap allocate it.
@@ -96,6 +111,29 @@ using HttpService = kj::HttpService;
96111
using HttpServiceResponse = kj::HttpService::Response;
97112
using TlsStarterCallback = kj::TlsStarterCallback;
98113

114+
inline kj::Own<AsyncOutputStream> response_send(HttpServiceResponse& response,
115+
uint32_t statusCode,
116+
::rust::Str statusText,
117+
const HttpHeaders& headers,
118+
kj::Maybe<uint64_t> expectedBodySize) {
119+
return response.send(statusCode, kj::str(statusText), headers, expectedBodySize);
120+
}
121+
122+
inline void connect_response_accept(ConnectResponse& response,
123+
uint32_t statusCode,
124+
::rust::Str statusText,
125+
const HttpHeaders& headers) {
126+
response.accept(statusCode, kj::str(statusText), headers);
127+
}
128+
129+
inline kj::Own<AsyncOutputStream> connect_response_reject(ConnectResponse& response,
130+
uint32_t statusCode,
131+
::rust::Str statusText,
132+
const HttpHeaders& headers,
133+
kj::Maybe<uint64_t> expectedBodySize) {
134+
return response.reject(statusCode, kj::str(statusText), headers, expectedBodySize);
135+
}
136+
99137
inline kj::Promise<void> request(HttpService& service,
100138
HttpMethod method,
101139
::rust::Slice<const kj::byte> url,

src/rust/kj/http.rs

Lines changed: 158 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use kj_rs::KjOwn;
66
use static_assertions::assert_eq_align;
77
use static_assertions::assert_eq_size;
88

9-
use crate::OwnOrRef;
9+
use crate::OwnOrMut;
1010
use crate::Result;
1111
use crate::io::AsyncInputStream;
1212
use crate::io::AsyncIoStream;
@@ -95,7 +95,9 @@ pub mod ffi {
9595

9696
unsafe extern "C++" {
9797
type BuiltinIndicesEnum;
98+
type HttpHeaderTable;
9899
type HttpHeaders;
100+
fn new_http_headers(table: &HttpHeaderTable) -> KjOwn<HttpHeaders>;
99101
fn clone_shallow(this_: &HttpHeaders) -> KjOwn<HttpHeaders>;
100102
fn set_header(this_: Pin<&mut HttpHeaders>, id: BuiltinIndicesEnum, value: &str);
101103
unsafe fn get_header<'a>(
@@ -121,12 +123,36 @@ pub mod ffi {
121123
}
122124

123125
unsafe extern "C++" {
124-
type AsyncInputStream = crate::io::AsyncInputStream;
125-
type AsyncIoStream = crate::io::AsyncIoStream;
126+
type AsyncInputStream = crate::io::ffi::AsyncInputStream;
127+
type AsyncIoStream = crate::io::ffi::AsyncIoStream;
128+
type AsyncOutputStream = crate::io::ffi::AsyncOutputStream;
126129
type ConnectResponse;
127130
type HttpServiceResponse;
128131
type HttpService;
129132

133+
fn response_send(
134+
this_: Pin<&mut HttpServiceResponse>,
135+
status_code: u32,
136+
status_text: &str,
137+
headers: &HttpHeaders,
138+
expected_body_size: KjMaybe<u64>,
139+
) -> Result<KjOwn<AsyncOutputStream>>;
140+
141+
fn connect_response_accept(
142+
this_: Pin<&mut ConnectResponse>,
143+
status_code: u32,
144+
status_text: &str,
145+
headers: &HttpHeaders,
146+
) -> Result<()>;
147+
148+
fn connect_response_reject(
149+
this_: Pin<&mut ConnectResponse>,
150+
status_code: u32,
151+
status_text: &str,
152+
headers: &HttpHeaders,
153+
expected_body_size: KjMaybe<u64>,
154+
) -> Result<KjOwn<AsyncOutputStream>>;
155+
130156
/// Corresponds to `kj::HttpService::request`.
131157
async fn request(
132158
this_: Pin<&mut HttpService>,
@@ -178,6 +204,7 @@ assert_eq_size!(ffi::HttpConnectSettings, [u8; 16]);
178204
assert_eq_align!(ffi::HttpConnectSettings, u64);
179205

180206
pub type HeaderId = ffi::BuiltinIndicesEnum;
207+
pub type HttpHeaderTable = ffi::HttpHeaderTable;
181208
pub type CustomHttpHeader = ffi::HttpHeaderId;
182209
// TODO(tewaro) soon: replace by enum HeaderId
183210

@@ -219,6 +246,7 @@ impl<'a> CustomHttpHeaderId<'a> {
219246
}
220247

221248
/// Non-owning constant reference to `kj::HttpHeaders`
249+
#[derive(Clone, Copy)]
222250
pub struct HttpHeadersRef<'a>(&'a ffi::HttpHeaders);
223251

224252
impl HttpHeadersRef<'_> {
@@ -259,6 +287,14 @@ pub struct HttpHeaders<'a> {
259287
}
260288

261289
impl<'a> HttpHeaders<'a> {
290+
#[must_use]
291+
pub fn new(table: &'a HttpHeaderTable) -> Self {
292+
Self {
293+
own: ffi::new_http_headers(table),
294+
_marker: PhantomData,
295+
}
296+
}
297+
262298
pub fn set(&mut self, id: HeaderId, value: &str) {
263299
ffi::set_header(self.own.as_mut(), id, value);
264300
}
@@ -268,11 +304,96 @@ impl<'a> HttpHeaders<'a> {
268304
}
269305
}
270306

307+
impl<'a, 'b> From<&'b HttpHeaders<'a>> for HttpHeadersRef<'b> {
308+
fn from(value: &'b HttpHeaders<'a>) -> Self {
309+
value.as_ref()
310+
}
311+
}
312+
271313
pub type HttpMethod = ffi::HttpMethod;
272-
pub type HttpServiceResponse = ffi::HttpServiceResponse;
273-
pub type ConnectResponse = ffi::ConnectResponse;
274314
pub type HttpConnectSettings<'a> = ffi::HttpConnectSettings<'a>;
275315

316+
/// Non-owning mutable reference to `kj::HttpService::Response`.
317+
pub struct HttpServiceResponse<'a>(Pin<&'a mut ffi::HttpServiceResponse>);
318+
319+
impl<'a> HttpServiceResponse<'a> {
320+
/// Send response metadata and obtain the writable response body stream.
321+
pub fn send<'h>(
322+
self,
323+
status_code: u32,
324+
status_text: &str,
325+
headers: impl Into<HttpHeadersRef<'h>>,
326+
expected_body_size: Option<u64>,
327+
) -> Result<crate::io::AsyncOutputStream<'a>> {
328+
Ok(ffi::response_send(
329+
self.0,
330+
status_code,
331+
status_text,
332+
headers.into().0,
333+
expected_body_size.into(),
334+
)?
335+
.into())
336+
}
337+
338+
pub(crate) fn into_ffi(self) -> Pin<&'a mut ffi::HttpServiceResponse> {
339+
self.0
340+
}
341+
}
342+
343+
impl<'a> From<Pin<&'a mut ffi::HttpServiceResponse>> for HttpServiceResponse<'a> {
344+
fn from(value: Pin<&'a mut ffi::HttpServiceResponse>) -> Self {
345+
Self(value)
346+
}
347+
}
348+
349+
/// Non-owning mutable reference to `kj::HttpService::ConnectResponse`.
350+
pub struct ConnectResponse<'a>(Pin<&'a mut ffi::ConnectResponse>);
351+
352+
impl<'a> ConnectResponse<'a> {
353+
/// Accept the CONNECT request without a response body.
354+
pub fn accept<'h>(
355+
self,
356+
status_code: u32,
357+
status_text: &str,
358+
headers: impl Into<HttpHeadersRef<'h>>,
359+
) -> Result<()> {
360+
Ok(ffi::connect_response_accept(
361+
self.0,
362+
status_code,
363+
status_text,
364+
headers.into().0,
365+
)?)
366+
}
367+
368+
/// Reject the CONNECT request and obtain the writable rejection body stream.
369+
pub fn reject<'h>(
370+
self,
371+
status_code: u32,
372+
status_text: &str,
373+
headers: impl Into<HttpHeadersRef<'h>>,
374+
expected_body_size: Option<u64>,
375+
) -> Result<crate::io::AsyncOutputStream<'a>> {
376+
Ok(ffi::connect_response_reject(
377+
self.0,
378+
status_code,
379+
status_text,
380+
headers.into().0,
381+
expected_body_size.into(),
382+
)?
383+
.into())
384+
}
385+
386+
pub(crate) fn into_ffi(self) -> Pin<&'a mut ffi::ConnectResponse> {
387+
self.0
388+
}
389+
}
390+
391+
impl<'a> From<Pin<&'a mut ffi::ConnectResponse>> for ConnectResponse<'a> {
392+
fn from(value: Pin<&'a mut ffi::ConnectResponse>) -> Self {
393+
Self(value)
394+
}
395+
}
396+
276397
#[async_trait::async_trait(?Send)]
277398
pub trait HttpService {
278399
/// Make an HTTP request.
@@ -282,7 +403,7 @@ pub trait HttpService {
282403
url: &'a [u8],
283404
headers: HttpHeadersRef<'a>,
284405
request_body: Pin<&'a mut AsyncInputStream>,
285-
response: Pin<&'a mut HttpServiceResponse>,
406+
response: HttpServiceResponse<'a>,
286407
) -> Result<()>;
287408

288409
/// Make a CONNECT request
@@ -297,7 +418,7 @@ pub trait HttpService {
297418
host: &'a [u8],
298419
headers: HttpHeadersRef<'a>,
299420
connection: Pin<&'a mut AsyncIoStream>,
300-
response: Pin<&'a mut ConnectResponse>,
421+
response: ConnectResponse<'a>,
301422
settings: HttpConnectSettings<'a>,
302423
) -> Result<()>;
303424

@@ -311,7 +432,7 @@ pub trait HttpService {
311432
}
312433
}
313434

314-
pub struct CxxHttpService<'a>(OwnOrRef<'a, ffi::HttpService>);
435+
pub struct CxxHttpService<'a>(OwnOrMut<'a, ffi::HttpService>);
315436

316437
#[async_trait::async_trait(?Send)]
317438
impl HttpService for CxxHttpService<'_> {
@@ -321,11 +442,18 @@ impl HttpService for CxxHttpService<'_> {
321442
url: &'a [u8],
322443
headers: HttpHeadersRef<'a>,
323444
request_body: Pin<&'a mut AsyncInputStream>,
324-
response: Pin<&'a mut HttpServiceResponse>,
445+
response: HttpServiceResponse<'a>,
325446
) -> Result<()> {
326-
// SAFETY: self.0 is a valid owned-or-borrowed HttpService.
327-
let service = unsafe { self.0.as_mut() };
328-
ffi::request(service, method, url, headers.0, request_body, response).await?;
447+
let service = self.0.as_mut();
448+
ffi::request(
449+
service,
450+
method,
451+
url,
452+
headers.0,
453+
request_body,
454+
response.into_ffi(),
455+
)
456+
.await?;
329457
Ok(())
330458
}
331459

@@ -334,18 +462,24 @@ impl HttpService for CxxHttpService<'_> {
334462
host: &'a [u8],
335463
headers: HttpHeadersRef<'a>,
336464
connection: Pin<&'a mut AsyncIoStream>,
337-
response: Pin<&'a mut ConnectResponse>,
465+
response: ConnectResponse<'a>,
338466
settings: HttpConnectSettings<'a>,
339467
) -> ::core::pin::Pin<Box<dyn ::core::future::Future<Output = Result<()>> + 'b>>
340468
where
341469
'a: 'b,
342470
Self: 'b,
343471
{
344-
// SAFETY: self.0 is a valid owned-or-borrowed HttpService.
345-
let service = unsafe { self.0.as_mut() };
472+
let service = self.0.as_mut();
346473
Box::pin(
347-
ffi::connect(service, host, headers.0, connection, response, settings)
348-
.map_err(Into::into),
474+
ffi::connect(
475+
service,
476+
host,
477+
headers.0,
478+
connection,
479+
response.into_ffi(),
480+
settings,
481+
)
482+
.map_err(Into::into),
349483
)
350484
}
351485
}
@@ -365,8 +499,9 @@ impl DynHttpService {
365499
url: &'a [u8],
366500
headers: &'a ffi::HttpHeaders,
367501
request_body: Pin<&'a mut AsyncInputStream>,
368-
response: Pin<&'a mut HttpServiceResponse>,
502+
response: Pin<&'a mut ffi::HttpServiceResponse>,
369503
) -> Result<()> {
504+
let response = HttpServiceResponse::from(response);
370505
self.0
371506
.request(method, url, HttpHeadersRef(headers), request_body, response)
372507
.await?;
@@ -378,15 +513,12 @@ impl DynHttpService {
378513
host: &'a [u8],
379514
headers: &'a ffi::HttpHeaders,
380515
connection: Pin<&'a mut AsyncIoStream>,
381-
response: Pin<&'a mut ConnectResponse>,
516+
response: Pin<&'a mut ffi::ConnectResponse>,
382517
settings: HttpConnectSettings<'a>,
383518
) -> impl Future<Output = Result<()>> {
384-
self.0.connect(
385-
host,
386-
HttpHeadersRef(headers),
387-
connection,
388-
response,
389-
settings,
390-
)
519+
let headers = HttpHeadersRef(headers);
520+
let response = ConnectResponse::from(response);
521+
self.0
522+
.connect(host, headers, connection, response, settings)
391523
}
392524
}

0 commit comments

Comments
 (0)