From f50ab049c87d5ee9b3603eec3f9f3e7082b1079f Mon Sep 17 00:00:00 2001 From: Till Schneidereit Date: Thu, 24 Oct 2024 19:29:01 +0200 Subject: [PATCH 1/4] Cleanup: improve some functions dealing with `Request` `Response` object handles It was pretty hard to follow when a caller would expect to have a valid handle, or when a `nullptr` would be acceptable. Signed-off-by: Till Schneidereit --- builtins/web/fetch/fetch_event.cpp | 5 +- builtins/web/fetch/request-response.cpp | 63 ++++++++++++------------- builtins/web/fetch/request-response.h | 8 ++-- 3 files changed, 34 insertions(+), 42 deletions(-) diff --git a/builtins/web/fetch/fetch_event.cpp b/builtins/web/fetch/fetch_event.cpp index 6a4c4b3..a0696bd 100644 --- a/builtins/web/fetch/fetch_event.cpp +++ b/builtins/web/fetch/fetch_event.cpp @@ -85,8 +85,7 @@ bool FetchEvent::init_incoming_request(JSContext *cx, JS::HandleObject self, JS::RootedObject request( cx, &JS::GetReservedSlot(self, static_cast(Slots::Request)).toObject()); - MOZ_ASSERT(!Request::request_handle(request)); - + MOZ_ASSERT(!RequestOrResponse::maybe_handle(request)); JS::SetReservedSlot(request, static_cast(Request::Slots::Request), JS::PrivateValue(req)); @@ -175,7 +174,7 @@ bool start_response(JSContext *cx, JS::HandleObject response_obj) { host_api::HttpOutgoingResponse* response = host_api::HttpOutgoingResponse::make(status, std::move(headers)); - auto existing_handle = Response::response_handle(response_obj); + auto existing_handle = Response::maybe_response_handle(response_obj); if (existing_handle) { MOZ_ASSERT(existing_handle->is_incoming()); } else { diff --git a/builtins/web/fetch/request-response.cpp b/builtins/web/fetch/request-response.cpp index 187e9e3..0739935 100644 --- a/builtins/web/fetch/request-response.cpp +++ b/builtins/web/fetch/request-response.cpp @@ -156,24 +156,33 @@ struct ReadResult { } // namespace -host_api::HttpRequestResponseBase *RequestOrResponse::handle(JSObject *obj) { +host_api::HttpRequestResponseBase *RequestOrResponse::maybe_handle(JSObject *obj) { MOZ_ASSERT(is_instance(obj)); auto slot = JS::GetReservedSlot(obj, static_cast(Slots::RequestOrResponse)); return static_cast(slot.toPrivate()); } +host_api::HttpRequestResponseBase *RequestOrResponse::handle(JSObject *obj) { + auto handle = maybe_handle(obj); + MOZ_ASSERT(handle); + return handle; +} + bool RequestOrResponse::is_instance(JSObject *obj) { return Request::is_instance(obj) || Response::is_instance(obj); } bool RequestOrResponse::is_incoming(JSObject *obj) { - auto handle = RequestOrResponse::handle(obj); + auto handle = RequestOrResponse::maybe_handle(obj); return handle && handle->is_incoming(); } -host_api::HttpHeadersReadOnly *RequestOrResponse::headers_handle(JSObject *obj) { - MOZ_ASSERT(is_instance(obj)); - auto res = handle(obj)->headers(); +host_api::HttpHeadersReadOnly *RequestOrResponse::maybe_headers_handle(JSObject *obj) { + auto handle = maybe_handle(obj); + if (!handle) { + return nullptr; + } + auto res = handle->headers(); MOZ_ASSERT(!res.is_err(), "TODO: proper error handling"); return res.unwrap(); } @@ -185,19 +194,21 @@ bool RequestOrResponse::has_body(JSObject *obj) { host_api::HttpIncomingBody *RequestOrResponse::incoming_body_handle(JSObject *obj) { MOZ_ASSERT(is_incoming(obj)); - if (handle(obj)->is_request()) { - return reinterpret_cast(handle(obj))->body().unwrap(); + auto handle = RequestOrResponse::handle(obj); + if (handle->is_request()) { + return reinterpret_cast(handle)->body().unwrap(); } else { - return reinterpret_cast(handle(obj))->body().unwrap(); + return reinterpret_cast(handle)->body().unwrap(); } } host_api::HttpOutgoingBody *RequestOrResponse::outgoing_body_handle(JSObject *obj) { MOZ_ASSERT(!is_incoming(obj)); - if (handle(obj)->is_request()) { - return reinterpret_cast(handle(obj))->body().unwrap(); + auto handle = RequestOrResponse::handle(obj); + if (handle->is_request()) { + return reinterpret_cast(handle)->body().unwrap(); } else { - return reinterpret_cast(handle(obj))->body().unwrap(); + return reinterpret_cast(handle)->body().unwrap(); } } @@ -424,7 +435,7 @@ unique_ptr RequestOrResponse::headers_handle_clone(JSCont return Headers::handle_clone(cx, headers); } - auto handle = RequestOrResponse::handle(self); + auto handle = RequestOrResponse::maybe_handle(self); if (!handle) { return std::make_unique(); } @@ -501,7 +512,7 @@ JSObject *RequestOrResponse::headers(JSContext *cx, JS::HandleObject obj) { : Request::is_instance(obj) ? Headers::HeadersGuard::Request : Headers::HeadersGuard::Response; host_api::HttpHeadersReadOnly *handle; - if (is_incoming(obj) && (handle = headers_handle(obj))) { + if (is_incoming(obj) && (handle = maybe_headers_handle(obj))) { headers = Headers::create(cx, handle, guard); } else { headers = Headers::create(cx, guard); @@ -1122,23 +1133,6 @@ bool RequestOrResponse::body_get(JSContext *cx, JS::CallArgs args, JS::HandleObj return true; } -host_api::HttpRequest *Request::request_handle(JSObject *obj) { - auto base = RequestOrResponse::handle(obj); - return reinterpret_cast(base); -} - -host_api::HttpOutgoingRequest *Request::outgoing_handle(JSObject *obj) { - auto base = RequestOrResponse::handle(obj); - MOZ_ASSERT(base->is_outgoing()); - return reinterpret_cast(base); -} - -host_api::HttpIncomingRequest *Request::incoming_handle(JSObject *obj) { - auto base = RequestOrResponse::handle(obj); - MOZ_ASSERT(base->is_incoming()); - return reinterpret_cast(base); -} - JSObject *Request::response_promise(JSObject *obj) { MOZ_ASSERT(is_instance(obj)); return &JS::GetReservedSlot(obj, static_cast(Request::Slots::ResponsePromise)) @@ -1218,7 +1212,7 @@ bool Request::clone(JSContext *cx, unsigned argc, JS::Value *vp) { return false; } cloned_headers_val.set(ObjectValue(*cloned_headers)); - } else if (RequestOrResponse::handle(self)) { + } else if (RequestOrResponse::maybe_handle(self)) { auto handle = RequestOrResponse::headers_handle_clone(cx, self); JSObject *cloned_headers = Headers::create(cx, handle.release(), @@ -1754,9 +1748,10 @@ static_assert((int)Response::Slots::BodyUsed == (int)Request::Slots::BodyUsed); static_assert((int)Response::Slots::Headers == (int)Request::Slots::Headers); static_assert((int)Response::Slots::Response == (int)Request::Slots::Request); -host_api::HttpResponse *Response::response_handle(JSObject *obj) { - MOZ_ASSERT(is_instance(obj)); - return static_cast(RequestOrResponse::handle(obj)); +host_api::HttpResponse *Response::maybe_response_handle(JSObject *obj) { + auto base = RequestOrResponse::maybe_handle(obj); + MOZ_ASSERT_IF(base, base->is_response()); + return static_cast(base); } uint16_t Response::status(JSObject *obj) { diff --git a/builtins/web/fetch/request-response.h b/builtins/web/fetch/request-response.h index 15a5a52..17dc31e 100644 --- a/builtins/web/fetch/request-response.h +++ b/builtins/web/fetch/request-response.h @@ -32,7 +32,8 @@ class RequestOrResponse final { static bool is_instance(JSObject *obj); static bool is_incoming(JSObject *obj); static host_api::HttpRequestResponseBase *handle(JSObject *obj); - static host_api::HttpHeadersReadOnly *headers_handle(JSObject *obj); + static host_api::HttpRequestResponseBase *maybe_handle(JSObject *obj); + static host_api::HttpHeadersReadOnly *maybe_headers_handle(JSObject *obj); static bool has_body(JSObject *obj); static host_api::HttpIncomingBody *incoming_body_handle(JSObject *obj); static host_api::HttpOutgoingBody *outgoing_body_handle(JSObject *obj); @@ -142,9 +143,6 @@ class Request final : public BuiltinImpl { static JSObject *response_promise(JSObject *obj); static JSString *method(JS::HandleObject obj); - static host_api::HttpRequest *request_handle(JSObject *obj); - static host_api::HttpOutgoingRequest *outgoing_handle(JSObject *obj); - static host_api::HttpIncomingRequest *incoming_handle(JSObject *obj); static const JSFunctionSpec static_methods[]; static const JSPropertySpec static_properties[]; @@ -209,7 +207,7 @@ class Response final : public BuiltinImpl { static JSObject *init_slots(HandleObject response); static JSObject *create_incoming(JSContext *cx, host_api::HttpIncomingResponse *response); - static host_api::HttpResponse *response_handle(JSObject *obj); + static host_api::HttpResponse *maybe_response_handle(JSObject *obj); static uint16_t status(JSObject *obj); static JSString *status_message(JSObject *obj); static void set_status_message_from_code(JSContext *cx, JSObject *obj, uint16_t code); From a372708a29750079f7020dad772a0826659fcdb1 Mon Sep 17 00:00:00 2001 From: Till Schneidereit Date: Tue, 29 Oct 2024 12:52:48 +0100 Subject: [PATCH 2/4] Fix bugs in handling multiple incoming bodies piped through a `TransFormStream` to an outgoing body This was pretty gnarly to work through, but I think came out reasonably clean. The two key pieces are: - delaying the first read from an incoming stream that's been piped to a `TransformStream` until the latter is actually read from, and hence doesn't have backpressure applied anymore. This part guarantees that when the read would happen, we have the full pipeline in place and can hand things off to the host API instead of handling anything in the JS `fetch` API implementation. - properly integrating these two pieces, and only closing the incoming body's `ReadableStream` once the host API reports that it's been dealt with to completion. Signed-off-by: Till Schneidereit --- builtins/web/fetch/request-response.cpp | 115 +++++++++++++++--------- builtins/web/fetch/request-response.h | 3 +- host-apis/wasi-0.2.0/host_api.cpp | 9 +- tests/tests.cmake | 1 + 4 files changed, 79 insertions(+), 49 deletions(-) diff --git a/builtins/web/fetch/request-response.cpp b/builtins/web/fetch/request-response.cpp index 0739935..b697cae 100644 --- a/builtins/web/fetch/request-response.cpp +++ b/builtins/web/fetch/request-response.cpp @@ -481,13 +481,13 @@ bool finish_outgoing_body_streaming(JSContext *cx, HandleObject body_owner) { return true; } -bool RequestOrResponse::append_body(JSContext *cx, JS::HandleObject self, JS::HandleObject source) { +bool RequestOrResponse::append_body(JSContext *cx, JS::HandleObject self, JS::HandleObject source, + api::TaskCompletionCallback callback, HandleObject callback_receiver) { MOZ_ASSERT(!body_used(source)); - MOZ_ASSERT(!body_used(self)); MOZ_ASSERT(self != source); host_api::HttpIncomingBody *source_body = incoming_body_handle(source); host_api::HttpOutgoingBody *dest_body = outgoing_body_handle(self); - auto res = dest_body->append(ENGINE, source_body, finish_outgoing_body_streaming, self); + auto res = dest_body->append(ENGINE, source_body, callback, callback_receiver); if (auto *err = res.to_err()) { HANDLE_ERROR(cx, *err); return false; @@ -495,7 +495,9 @@ bool RequestOrResponse::append_body(JSContext *cx, JS::HandleObject self, JS::Ha mozilla::DebugOnly success = mark_body_used(cx, source); MOZ_ASSERT(success); - if (body_stream(source) != body_stream(self)) { + // append_body can be called multiple times, but we only want to mark the body as used the first + // time it happens. + if (body_stream(source) != body_stream(self) && !body_used(self)) { success = mark_body_used(cx, self); MOZ_ASSERT(success); } @@ -871,39 +873,82 @@ bool RequestOrResponse::bodyAll(JSContext *cx, JS::CallArgs args, JS::HandleObje return true; } -bool RequestOrResponse::body_source_pull_algorithm(JSContext *cx, CallArgs args, - HandleObject source, HandleObject body_owner, - HandleObject controller) { +/** + * Closes the ReadableStream representing a body after it's been appended to an outgoing body. + * + * The append operation is performed using the host API, not via the JS Streams API, so this + * explicit closing operation is needed to close the loop on the latter. + */ +bool close_appended_body(JSContext *cx, HandleObject body_owner) { + RootedObject body(cx, RequestOrResponse::body_stream(body_owner)); + return ReadableStreamClose(cx, body); +} + +bool do_body_source_pull(JSContext *cx, HandleObject source, HandleObject body_owner) { // If the stream has been piped to a TransformStream whose readable end was // then passed to a Request or Response as the body, we can just append the // entire source body to the destination using a single native hostcall, and // then close the source stream, instead of reading and writing it in - // individual chunks. Note that even in situations where multiple streams are - // piped to the same destination this is guaranteed to happen in the right - // order: ReadableStream#pipeTo locks the destination WritableStream until the + // individual chunks. + // Note that even in situations where multiple streams are piped to the same + // destination this is guaranteed to happen in the right order: + // ReadableStream#pipeTo locks the destination WritableStream until the // source ReadableStream is closed/canceled, so only one stream can ever be // piped in at the same time. RootedObject pipe_dest(cx, streams::NativeStreamSource::piped_to_transform_stream(source)); - if (pipe_dest) { - if (streams::TransformStream::readable_used_as_body(pipe_dest)) { - RootedObject dest_owner(cx, streams::TransformStream::owner(pipe_dest)); - MOZ_ASSERT(!JS_IsExceptionPending(cx)); - if (!append_body(cx, dest_owner, body_owner)) { - return false; - } + if (pipe_dest && streams::TransformStream::readable_used_as_body(pipe_dest)) { + MOZ_ASSERT(!streams::TransformStream::backpressure(pipe_dest)); + RootedObject dest_owner(cx, streams::TransformStream::owner(pipe_dest)); + MOZ_ASSERT(!JS_IsExceptionPending(cx)); + if (!RequestOrResponse::append_body(cx, dest_owner, body_owner, close_appended_body, body_owner)) { + return false; + } - MOZ_ASSERT(!JS_IsExceptionPending(cx)); - RootedObject stream(cx, streams::NativeStreamSource::stream(source)); - bool success = ReadableStreamClose(cx, stream); - MOZ_RELEASE_ASSERT(success); + MOZ_ASSERT(!JS_IsExceptionPending(cx)); + return true; + } - args.rval().setUndefined(); - MOZ_ASSERT(!JS_IsExceptionPending(cx)); - return true; + ENGINE->queue_async_task(new BodyFutureTask(source)); + return true; +} + +bool bp_change_then_handler(JSContext *cx, HandleObject source, HandleValue extra, CallArgs args) { + MOZ_ASSERT(extra.isObject()); + RootedObject body_owner(cx, &extra.toObject()); + MOZ_ASSERT(RequestOrResponse::is_instance(body_owner)); + + if (!do_body_source_pull(cx, source, body_owner)) { + return false; + } + + args.rval().setUndefined(); + return true; +} + +bool RequestOrResponse::body_source_pull_algorithm(JSContext *cx, CallArgs args, + HandleObject source, HandleObject body_owner, + HandleObject _controller) { + // If the stream has been piped to a TransformStream whose readable end has backpressure applied, + // we wait for the backpressure to be removed before actually reading from the body. + // That way, we can avoid reading any chunks from the body for now, and might be able to append + // it in full to an outgoing stream later. + RootedObject pipe_dest(cx, streams::NativeStreamSource::piped_to_transform_stream(source)); + if (pipe_dest && streams::TransformStream::backpressure(pipe_dest)) { + RootedObject bp_change_promise(cx, + streams::TransformStream::backpressureChangePromise(pipe_dest)); + JS::RootedObject then_handler(cx); + RootedValue extra(cx, ObjectValue(*body_owner)); + then_handler = create_internal_method(cx, source, extra); + if (!then_handler) { + return false; } + return AddPromiseReactionsIgnoringUnhandledRejection(cx, bp_change_promise, then_handler, + nullptr); } - ENGINE->queue_async_task(new BodyFutureTask(source)); + if (!do_body_source_pull(cx, source, body_owner)) { + return false; + } args.rval().setUndefined(); return true; @@ -1034,21 +1079,6 @@ bool RequestOrResponse::maybe_stream_body(JSContext *cx, JS::HandleObject body_o return api::throw_error(cx, FetchErrors::BodyStreamUnusable); } - // If the body stream is backed by an HTTP body handle, we can directly pipe - // that handle into the body we're about to send. - if (streams::NativeStreamSource::stream_is_body(cx, stream)) { - MOZ_ASSERT(!is_incoming(body_owner)); - // First, directly append the source's body to the target's and lock the stream. - JS::RootedObject stream_source(cx, streams::NativeStreamSource::get_stream_source(cx, stream)); - JS::RootedObject source_owner(cx, streams::NativeStreamSource::owner(stream_source)); - if (!append_body(cx, body_owner, source_owner)) { - return false; - } - - *requires_streaming = true; - return true; - } - JS::RootedObject reader( cx, JS::ReadableStreamGetReader(cx, stream, JS::ReadableStreamReaderMode::Default)); if (!reader) @@ -1703,7 +1733,10 @@ bool Request::initialize(JSContext *cx, JS::HandleObject request, JS::HandleValu // content can't have access to it. Instead of reifying it here to pass it // into a TransformStream, we just append the body on the host side and // mark it as used on the input Request. - RequestOrResponse::append_body(cx, request, input_request); + if (!RequestOrResponse::append_body(cx, request, input_request, + finish_outgoing_body_streaming,request)) { + return false; + } } else { inputBody = streams::TransformStream::create_rs_proxy(cx, inputBody); if (!inputBody) { diff --git a/builtins/web/fetch/request-response.h b/builtins/web/fetch/request-response.h index 17dc31e..b963ca2 100644 --- a/builtins/web/fetch/request-response.h +++ b/builtins/web/fetch/request-response.h @@ -67,7 +67,8 @@ class RequestOrResponse final { */ static JSObject *headers(JSContext *cx, JS::HandleObject obj); - static bool append_body(JSContext *cx, JS::HandleObject self, JS::HandleObject source); + static bool append_body(JSContext *cx, JS::HandleObject self, JS::HandleObject source, + api::TaskCompletionCallback callback, HandleObject callback_receiver); using ParseBodyCB = bool(JSContext *cx, JS::HandleObject self, JS::UniqueChars buf, size_t len); diff --git a/host-apis/wasi-0.2.0/host_api.cpp b/host-apis/wasi-0.2.0/host_api.cpp index a345887..9ef063d 100644 --- a/host-apis/wasi-0.2.0/host_api.cpp +++ b/host-apis/wasi-0.2.0/host_api.cpp @@ -638,13 +638,8 @@ class BodyAppendTask final : public api::AsyncTask { HandleObject callback_receiver) : incoming_body_(incoming_body), outgoing_body_(outgoing_body), cb_(completion_callback), cb_receiver_(callback_receiver), state_(State::BlockedOnBoth) { - auto res = incoming_body_->subscribe(); - MOZ_ASSERT(!res.is_err()); - incoming_pollable_ = res.unwrap(); - - res = outgoing_body_->subscribe(); - MOZ_ASSERT(!res.is_err()); - outgoing_pollable_ = res.unwrap(); + incoming_pollable_ = incoming_body_->subscribe().unwrap(); + outgoing_pollable_ = outgoing_body_->subscribe().unwrap(); } [[nodiscard]] bool run(api::Engine *engine) override { diff --git a/tests/tests.cmake b/tests/tests.cmake index 07f4648..025231b 100644 --- a/tests/tests.cmake +++ b/tests/tests.cmake @@ -38,6 +38,7 @@ test_e2e(syntax-err) test_e2e(tla-err) test_e2e(tla-runtime-resolve) test_e2e(tla) +test_e2e(stream-forwarding) test_integration(btoa) test_integration(crypto) From 9a4705da72896002b0966e89d3bfb7e96d41e8da Mon Sep 17 00:00:00 2001 From: Till Schneidereit Date: Tue, 29 Oct 2024 12:59:26 +0100 Subject: [PATCH 3/4] Handle writing chunks to outgoing bodies fully asynchronously Before, any chunk would be written synchronously, with write operations to the outgoing body in a loop. That both meant that large chunks would block all script execution until they're done, and that things wouldn't necessarily always work. To wit, the test added here doesn't pass without this change. It streams a sentence in a very convoluted way, sending nested requests for each word, which then respond by sending the word back, letter for letter. Without this patch, that part breaks after the second letter in a word has been sent back. Signed-off-by: Till Schneidereit --- builtins/web/fetch/request-response.cpp | 37 +++++----- host-apis/wasi-0.2.0/host_api.cpp | 74 ++++++++++++++----- include/host_api.h | 3 +- .../expect_serve_body.txt | 1 + .../multi-stream-forwarding.js | 58 +++++++++++++++ tests/tests.cmake | 1 + 6 files changed, 138 insertions(+), 36 deletions(-) create mode 100644 tests/e2e/multi-stream-forwarding/expect_serve_body.txt create mode 100644 tests/e2e/multi-stream-forwarding/multi-stream-forwarding.js diff --git a/builtins/web/fetch/request-response.cpp b/builtins/web/fetch/request-response.cpp index b697cae..ee30bdf 100644 --- a/builtins/web/fetch/request-response.cpp +++ b/builtins/web/fetch/request-response.cpp @@ -963,13 +963,24 @@ bool RequestOrResponse::body_source_cancel_algorithm(JSContext *cx, JS::CallArgs return true; } +bool write_all_finish_callback(JSContext *cx, HandleObject then_handler) { + // The reader is stored in the catch handler, which we need here as well. + // So we get that first, then the reader. + JS::RootedObject catch_handler(cx, &GetFunctionNativeReserved(then_handler, 1).toObject()); + JS::RootedObject reader(cx, &GetFunctionNativeReserved(catch_handler, 1).toObject()); + + // Read the next chunk. + JS::RootedObject promise(cx, ReadableStreamDefaultReaderRead(cx, reader)); + if (!promise) { + return false; + } + + return AddPromiseReactions(cx, promise, then_handler, catch_handler); +} + bool reader_for_outgoing_body_then_handler(JSContext *cx, JS::HandleObject body_owner, JS::HandleValue extra, JS::CallArgs args) { JS::RootedObject then_handler(cx, &args.callee()); - // The reader is stored in the catch handler, which we need here as well. - // So we get that first, then the reader. - JS::RootedObject catch_handler(cx, &extra.toObject()); - JS::RootedObject reader(cx, &js::GetFunctionNativeReserved(catch_handler, 1).toObject()); // We're guaranteed to work with a native ReadableStreamDefaultReader here, // which in turn is guaranteed to vend {done: bool, value: any} objects to @@ -1010,25 +1021,17 @@ bool reader_for_outgoing_body_then_handler(JSContext *cx, JS::HandleObject body_ bool is_shared; RootedObject buffer(cx, JS_GetArrayBufferViewBuffer(cx, array, &is_shared)); MOZ_ASSERT(!is_shared); - auto bytes = static_cast(StealArrayBufferContents(cx, buffer)); - // TODO: change this to write in chunks, respecting backpressure. + auto ptr = static_cast(StealArrayBufferContents(cx, buffer)); + host_api::HostBytes bytes(unique_ptr(ptr), length); auto body = RequestOrResponse::outgoing_body_handle(body_owner); - auto res = body->write_all(bytes, length); - js_free(bytes); - - // Needs to be outside the nogc block in case we need to create an exception. + auto res = body->write_all(ENGINE, std::move(bytes), + write_all_finish_callback, then_handler); if (auto *err = res.to_err()) { HANDLE_ERROR(cx, *err); return false; } - // Read the next chunk. - JS::RootedObject promise(cx, JS::ReadableStreamDefaultReaderRead(cx, reader)); - if (!promise) { - return false; - } - - return JS::AddPromiseReactions(cx, promise, then_handler, catch_handler); + return true; } bool reader_for_outgoing_body_catch_handler(JSContext *cx, JS::HandleObject body_owner, diff --git a/host-apis/wasi-0.2.0/host_api.cpp b/host-apis/wasi-0.2.0/host_api.cpp index 9ef063d..70e89b2 100644 --- a/host-apis/wasi-0.2.0/host_api.cpp +++ b/host-apis/wasi-0.2.0/host_api.cpp @@ -574,31 +574,69 @@ void HttpOutgoingBody::write(const uint8_t *bytes, size_t len) { MOZ_RELEASE_ASSERT(write_to_outgoing_body(borrow, bytes, len)); } -Result HttpOutgoingBody::write_all(const uint8_t *bytes, size_t len) { - if (!valid()) { - // TODO: proper error handling for all 154 error codes. - return Result::err(154); - } +class BodyWriteAllTask final : public api::AsyncTask { + HttpOutgoingBody *outgoing_body_; + PollableHandle outgoing_pollable_; - auto *state = static_cast(handle_state_.get()); - Borrow borrow(state->stream_handle_); + api::TaskCompletionCallback cb_; + Heap cb_receiver_; + HostBytes bytes_; + size_t offset_ = 0; - while (len > 0) { - auto capacity_res = capacity(); - if (capacity_res.is_err()) { - // TODO: proper error handling for all 154 error codes. - return Result::err(154); +public: + explicit BodyWriteAllTask(HttpOutgoingBody *outgoing_body, HostBytes bytes, + api::TaskCompletionCallback completion_callback, + HandleObject callback_receiver) + : outgoing_body_(outgoing_body), cb_(completion_callback), + cb_receiver_(callback_receiver), bytes_(std::move(bytes)) { + outgoing_pollable_ = outgoing_body_->subscribe().unwrap(); + } + + [[nodiscard]] bool run(api::Engine *engine) override { + auto res = outgoing_body_->capacity(); + if (res.is_err()) { + return false; } - auto capacity = capacity_res.unwrap(); - auto bytes_to_write = std::min(len, static_cast(capacity)); - if (!write_to_outgoing_body(borrow, bytes, len)) { - return Result::err(154); + uint64_t capacity = res.unwrap(); + MOZ_ASSERT(capacity >= 0); + auto bytes_to_write = std::min(bytes_.len - offset_, static_cast(capacity)); + outgoing_body_->write(bytes_.ptr.get() + offset_, bytes_to_write); + offset_ += bytes_to_write; + if (offset_ < bytes_.len) { + engine->queue_async_task(this); + } else { + bytes_.ptr.reset(); + RootedObject receiver(engine->cx(), cb_receiver_); + bool result = cb_(engine->cx(), receiver); + cb_ = nullptr; + cb_receiver_ = nullptr; + return result; } - bytes += bytes_to_write; - len -= bytes_to_write; + return true; } + [[nodiscard]] bool cancel(api::Engine *engine) override { + MOZ_ASSERT_UNREACHABLE("BodyWriteAllTask's semantics don't allow for cancellation"); + return true; + } + + [[nodiscard]] int32_t id() override { + return outgoing_pollable_; + } + + void trace(JSTracer *trc) override { + JS::TraceEdge(trc, &cb_receiver_, "BodyWriteAllTask completion callback receiver"); + } +}; + +Result HttpOutgoingBody::write_all(api::Engine *engine, HostBytes bytes, + api::TaskCompletionCallback callback, HandleObject cb_receiver) { + if (!valid()) { + // TODO: proper error handling for all 154 error codes. + return Result::err(154); + } + engine->queue_async_task(new BodyWriteAllTask(this, std::move(bytes), callback, cb_receiver)); return {}; } diff --git a/include/host_api.h b/include/host_api.h index e7ab8df..5b9c315 100644 --- a/include/host_api.h +++ b/include/host_api.h @@ -294,7 +294,8 @@ class HttpOutgoingBody final : public Pollable { /// The host doesn't necessarily write all bytes in any particular call to /// `write`, so to ensure all bytes are written, we call it in a loop. /// TODO: turn into an async task that writes chunks of the passed buffer until done. - Result write_all(const uint8_t *bytes, size_t len); + Result write_all(api::Engine *engine, HostBytes bytes, api::TaskCompletionCallback callback, + HandleObject cb_receiver); /// Append an HttpIncomingBody to this one. Result append(api::Engine *engine, HttpIncomingBody *other, diff --git a/tests/e2e/multi-stream-forwarding/expect_serve_body.txt b/tests/e2e/multi-stream-forwarding/expect_serve_body.txt new file mode 100644 index 0000000..80b8059 --- /dev/null +++ b/tests/e2e/multi-stream-forwarding/expect_serve_body.txt @@ -0,0 +1 @@ +This sentence will be streamed in chunks. diff --git a/tests/e2e/multi-stream-forwarding/multi-stream-forwarding.js b/tests/e2e/multi-stream-forwarding/multi-stream-forwarding.js new file mode 100644 index 0000000..67693bd --- /dev/null +++ b/tests/e2e/multi-stream-forwarding/multi-stream-forwarding.js @@ -0,0 +1,58 @@ +addEventListener('fetch', async (event) => { + try { + if (!event.request.url.includes('/nested')) { + event.respondWith(main(event)); + return; + } + + let encoder = new TextEncoder(); + let body = new TransformStream({ + start(controller) { + }, + transform(chunk, controller) { + controller.enqueue(encoder.encode(chunk)); + }, + flush(controller) { + } + }); + let writer = body.writable.getWriter(); + event.respondWith(new Response(body.readable)); + let word = new URL(event.request.url).searchParams.get('word'); + console.log(`streaming word: ${word}`); + for (let letter of word) { + console.log(`Writing letter ${letter}`); + await writer.write(letter); + } + if (word.endsWith(".")) { + await writer.write("\n"); + } + await writer.close(); + } catch (e) { + console.error(e); + } +}); +async function main(event) { + let fullBody = "This sentence will be streamed in chunks."; + let responses = []; + for (let word of fullBody.split(" ").join("+ ").split(" ")) { + responses.push((await fetch(`${event.request.url}/nested?word=${word}`)).body); + } + return new Response(concatStreams(responses)); +} + +function concatStreams(streams) { + let { readable, writable } = new TransformStream(); + async function iter() { + for (let stream of streams) { + try { + await stream.pipeTo(writable, {preventClose: true}); + } catch (e) { + console.error(`error during pipeline execution: ${e}`); + } + } + console.log("closing writable"); + await writable.close(); + } + iter(); + return readable; +} diff --git a/tests/tests.cmake b/tests/tests.cmake index 025231b..8649a91 100644 --- a/tests/tests.cmake +++ b/tests/tests.cmake @@ -39,6 +39,7 @@ test_e2e(tla-err) test_e2e(tla-runtime-resolve) test_e2e(tla) test_e2e(stream-forwarding) +test_e2e(multi-stream-forwarding) test_integration(btoa) test_integration(crypto) From 57957d2594875be8af387a8055a6f86ca837e3c9 Mon Sep 17 00:00:00 2001 From: Till Schneidereit Date: Sun, 3 Nov 2024 15:51:26 +0100 Subject: [PATCH 4/4] Write as many bytes as possible at a time in the `BodyWriteAllTask` Signed-off-by: Till Schneidereit --- host-apis/wasi-0.2.0/host_api.cpp | 44 +++++++++++++++++-------------- 1 file changed, 24 insertions(+), 20 deletions(-) diff --git a/host-apis/wasi-0.2.0/host_api.cpp b/host-apis/wasi-0.2.0/host_api.cpp index 70e89b2..6a7d2c5 100644 --- a/host-apis/wasi-0.2.0/host_api.cpp +++ b/host-apis/wasi-0.2.0/host_api.cpp @@ -593,27 +593,31 @@ class BodyWriteAllTask final : public api::AsyncTask { } [[nodiscard]] bool run(api::Engine *engine) override { - auto res = outgoing_body_->capacity(); - if (res.is_err()) { - return false; - } - uint64_t capacity = res.unwrap(); - MOZ_ASSERT(capacity >= 0); - auto bytes_to_write = std::min(bytes_.len - offset_, static_cast(capacity)); - outgoing_body_->write(bytes_.ptr.get() + offset_, bytes_to_write); - offset_ += bytes_to_write; - if (offset_ < bytes_.len) { - engine->queue_async_task(this); - } else { - bytes_.ptr.reset(); - RootedObject receiver(engine->cx(), cb_receiver_); - bool result = cb_(engine->cx(), receiver); - cb_ = nullptr; - cb_receiver_ = nullptr; - return result; - } + MOZ_ASSERT(offset_ < bytes_.len); + while (true) { + auto res = outgoing_body_->capacity(); + if (res.is_err()) { + return false; + } + uint64_t capacity = res.unwrap(); + if (capacity == 0) { + engine->queue_async_task(this); + return true; + } - return true; + auto bytes_to_write = std::min(bytes_.len - offset_, static_cast(capacity)); + outgoing_body_->write(bytes_.ptr.get() + offset_, bytes_to_write); + offset_ += bytes_to_write; + MOZ_ASSERT(offset_ <= bytes_.len); + if (offset_ == bytes_.len) { + bytes_.ptr.reset(); + RootedObject receiver(engine->cx(), cb_receiver_); + bool result = cb_(engine->cx(), receiver); + cb_ = nullptr; + cb_receiver_ = nullptr; + return result; + } + } } [[nodiscard]] bool cancel(api::Engine *engine) override {