From b31302fd64342f6d5b74f12e307820e1523bd0ab Mon Sep 17 00:00:00 2001 From: Guy Bedford Date: Thu, 25 Apr 2024 14:18:33 -0700 Subject: [PATCH] new event loop architecture --- README.md | 2 +- builtins/web/fetch/fetch-api.cpp | 5 -- builtins/web/fetch/fetch_event.cpp | 19 ++----- builtins/web/fetch/request-response.cpp | 10 ---- builtins/web/timers.cpp | 3 - .../wasi-0.2.0-rc-2023-10-18/host_api.cpp | 7 +-- .../wasi-0.2.0-rc-2023-12-05/host_api.cpp | 7 +-- host-apis/wasi-0.2.0/host_api.cpp | 12 +--- include/extension-api.h | 42 ++++++++++---- runtime/engine.cpp | 25 +++----- runtime/event_loop.cpp | 57 +++++++++++++------ runtime/event_loop.h | 7 ++- 12 files changed, 100 insertions(+), 96 deletions(-) diff --git a/README.md b/README.md index 6efce0d..b67ce2c 100644 --- a/README.md +++ b/README.md @@ -54,7 +54,7 @@ Then tests can be run with `ctest` directly via: CTEST_OUTPUT_ON_FAILURE=1 ctest --test-dir cmake-build-debug -j8 ``` -Alternative, the integration test server can be directly run with `wasmtime serve` via: +Alternatively, the integration test server can be directly run with `wasmtime serve` via: ```bash wasmtime serve -S common cmake-build-debug/test-server.wasm diff --git a/builtins/web/fetch/fetch-api.cpp b/builtins/web/fetch/fetch-api.cpp index e5eb067..9f45119 100644 --- a/builtins/web/fetch/fetch-api.cpp +++ b/builtins/web/fetch/fetch-api.cpp @@ -62,11 +62,6 @@ class ResponseFutureTask final : public api::AsyncTask { return true; } - bool ready() override { - // TODO(TS): implement - return true; - } - void trace(JSTracer *trc) override { TraceEdge(trc, &request_, "Request for response future"); } }; diff --git a/builtins/web/fetch/fetch_event.cpp b/builtins/web/fetch/fetch_event.cpp index 8ba4b53..3bc0cc0 100644 --- a/builtins/web/fetch/fetch_event.cpp +++ b/builtins/web/fetch/fetch_event.cpp @@ -41,6 +41,8 @@ void dec_pending_promise_count(JSObject *self) { .toInt32(); MOZ_ASSERT(count > 0); count--; + if (count == 0) + ENGINE->decr_event_loop_lifetime(); JS::SetReservedSlot(self, static_cast(FetchEvent::Slots::PendingPromiseCount), JS::Int32Value(count)); } @@ -589,19 +591,10 @@ void exports_wasi_http_incoming_handler(exports_wasi_http_incoming_request reque dispatch_fetch_event(fetch_event, &total_compute); - bool success = ENGINE->process_jobs(); - if (success) { - while (FetchEvent::is_active(fetch_event) && ENGINE->has_pending_async_tasks()) { - if (!ENGINE->process_async_tasks()) { - success = false; - break; - } - if (!ENGINE->process_jobs()) { - success = false; - break; - } - } - } + // track the fetch event lifetime, which when decremented ends the event loop + ENGINE->incr_event_loop_lifetime(); + + bool success = ENGINE->run_event_loop(); if (JS_IsExceptionPending(ENGINE->cx())) { ENGINE->dump_pending_exception("evaluating incoming request"); diff --git a/builtins/web/fetch/request-response.cpp b/builtins/web/fetch/request-response.cpp index 765e2b9..258a61c 100644 --- a/builtins/web/fetch/request-response.cpp +++ b/builtins/web/fetch/request-response.cpp @@ -122,11 +122,6 @@ class BodyFutureTask final : public api::AsyncTask { return true; } - bool ready() override { - // TODO(TS): implement - return true; - } - void trace(JSTracer *trc) override { TraceEdge(trc, &body_source_, "body source for future"); } }; @@ -187,11 +182,6 @@ class ResponseFutureTask final : public api::AsyncTask { return true; } - bool ready() override { - // TODO(TS): implement - return true; - } - void trace(JSTracer *trc) override { TraceEdge(trc, &request_, "Request for response future"); } }; diff --git a/builtins/web/timers.cpp b/builtins/web/timers.cpp index 7646d1c..340c852 100644 --- a/builtins/web/timers.cpp +++ b/builtins/web/timers.cpp @@ -37,7 +37,6 @@ class TimerTask final : public api::AsyncTask { } [[nodiscard]] bool run(api::Engine *engine) override { - MOZ_ASSERT(ready()); JSContext *cx = engine->cx(); const RootedObject callback(cx, callback_); @@ -69,8 +68,6 @@ class TimerTask final : public api::AsyncTask { return true; } - bool ready() override { return host_api::MonotonicClock::now() >= this->deadline_; } - void trace(JSTracer *trc) override { TraceEdge(trc, &callback_, "Timer callback"); for (auto &arg : arguments_) { diff --git a/host-apis/wasi-0.2.0-rc-2023-10-18/host_api.cpp b/host-apis/wasi-0.2.0-rc-2023-10-18/host_api.cpp index bccc090..a22896b 100644 --- a/host-apis/wasi-0.2.0-rc-2023-10-18/host_api.cpp +++ b/host-apis/wasi-0.2.0-rc-2023-10-18/host_api.cpp @@ -95,7 +95,7 @@ template <> struct HandleOps { } // namespace -size_t api::AsyncTask::select(std::vector *tasks) { +std::vector api::AsyncTask::poll(std::vector *tasks) { auto count = tasks->size(); vector> handles; for (const auto task : *tasks) { @@ -106,10 +106,7 @@ size_t api::AsyncTask::select(std::vector *tasks) { bindings_list_u32_t result{nullptr, 0}; wasi_io_0_2_0_rc_2023_10_18_poll_poll_list(&list, &result); MOZ_ASSERT(result.len > 0); - const auto ready_index = result.ptr[0]; - free(result.ptr); - - return ready_index; + return std::vector(result.ptr, result.ptr + result.len); } namespace host_api { diff --git a/host-apis/wasi-0.2.0-rc-2023-12-05/host_api.cpp b/host-apis/wasi-0.2.0-rc-2023-12-05/host_api.cpp index f074864..c05ae93 100644 --- a/host-apis/wasi-0.2.0-rc-2023-12-05/host_api.cpp +++ b/host-apis/wasi-0.2.0-rc-2023-12-05/host_api.cpp @@ -99,7 +99,7 @@ template <> struct HandleOps { } // namespace -size_t api::AsyncTask::select(std::vector *tasks) { +std::vector api::AsyncTask::poll(std::vector *tasks) { auto count = tasks->size(); vector> handles; for (const auto task : *tasks) { @@ -110,10 +110,7 @@ size_t api::AsyncTask::select(std::vector *tasks) { bindings_list_u32_t result{nullptr, 0}; wasi_io_0_2_0_rc_2023_11_10_poll_poll(&list, &result); MOZ_ASSERT(result.len > 0); - const auto ready_index = result.ptr[0]; - free(result.ptr); - - return ready_index; + return std::vector(result.ptr, result.ptr + result.len); } namespace host_api { diff --git a/host-apis/wasi-0.2.0/host_api.cpp b/host-apis/wasi-0.2.0/host_api.cpp index d1a990c..c2ce916 100644 --- a/host-apis/wasi-0.2.0/host_api.cpp +++ b/host-apis/wasi-0.2.0/host_api.cpp @@ -96,7 +96,7 @@ template <> struct HandleOps { } // namespace -size_t api::AsyncTask::select(std::vector *tasks) { +std::vector api::AsyncTask::poll(std::vector *tasks) { auto count = tasks->size(); vector> handles; for (const auto task : *tasks) { @@ -107,10 +107,7 @@ size_t api::AsyncTask::select(std::vector *tasks) { wasi_io_0_2_0_poll_list_u32_t result{nullptr, 0}; wasi_io_0_2_0_poll_poll(&list, &result); MOZ_ASSERT(result.len > 0); - const auto ready_index = result.ptr[0]; - free(result.ptr); - - return ready_index; + return std::vector(result.ptr, result.ptr + result.len); } namespace host_api { @@ -558,11 +555,6 @@ class BodyAppendTask final : public api::AsyncTask { return true; } - bool ready() override { - // TODO(TS): properly implement. This won't ever return `true` right now - return state_ == State::Ready; - } - [[nodiscard]] int32_t id() override { if (state_ == State::BlockedOnBoth || state_ == State::BlockedOnIncoming) { return incoming_pollable_; diff --git a/include/extension-api.h b/include/extension-api.h index 498f5ef..83b481e 100644 --- a/include/extension-api.h +++ b/include/extension-api.h @@ -32,6 +32,12 @@ namespace api { class AsyncTask; class Engine { + +typedef int Lifetime; + +#define LIFETIME_NONE -1; +#define LIFETIME_ALL -2; + public: Engine(); JSContext *cx(); @@ -62,8 +68,22 @@ class Engine { void enable_module_mode(bool enable); bool eval_toplevel(const char *path, MutableHandleValue result); - bool process_jobs(); - bool process_async_tasks(); + /** + * Run the JS task queue and wait on pending tasks until there + * are no outstanding lifetimes to wait on. + */ + bool run_event_loop(); + + /** + * Add an event loop lifetime to track + */ + void incr_event_loop_lifetime(); + + /** + * Remove an event loop lifetime to track + * The last decrementer marks the event loop as complete to finish + */ + void decr_event_loop_lifetime(); /** * Get the JS value associated with the top-level script execution - @@ -94,7 +114,6 @@ class AsyncTask { virtual bool run(Engine *engine) = 0; virtual bool cancel(Engine *engine) = 0; - virtual bool ready() = 0; [[nodiscard]] virtual PollableHandle id() { MOZ_ASSERT(handle_ != INVALID_POLLABLE_HANDLE); @@ -103,13 +122,16 @@ class AsyncTask { virtual void trace(JSTracer *trc) = 0; - /// Returns the first ready `AsyncTask`. - /// - /// TODO: as an optimization, return a vector containing the ready head of the queue. - /// Note that that works iff the very first entry in the queue is ready, and then only - /// for the dense head of the queue, without gaps. This is because during processing - /// of the ready tasks, other tasks might become ready that should be processed first. - static size_t select(std::vector *handles); + /** + * Poll for completion on the given async tasks + * A list of ready task indices is returned + */ + static std::vector poll(std::vector *handles); + + /** + * Returns whether or not the given task is ready + */ + static bool ready(AsyncTask *task); }; } // namespace api diff --git a/runtime/engine.cpp b/runtime/engine.cpp index f052b42..c75d4f2 100644 --- a/runtime/engine.cpp +++ b/runtime/engine.cpp @@ -363,18 +363,7 @@ bool api::Engine::eval_toplevel(const char *path, MutableHandleValue result) { } SCRIPT_VALUE.init(cx, ns); - - if (!this->process_jobs()) { - return false; - } - while (this->has_pending_async_tasks()) { - if (!this->process_async_tasks()) { - return false; - } - if (!this->process_jobs()) { - return false; - } - } + this->run_event_loop(); // TLA rejections during pre-initialization are treated as top-level exceptions. // TLA may remain unresolved, in which case it will continue tasks at runtime. @@ -425,12 +414,16 @@ bool api::Engine::eval_toplevel(const char *path, MutableHandleValue result) { return true; } -bool api::Engine::process_jobs() { - return core::EventLoop::process_jobs(this, 0); +bool api::Engine::run_event_loop() { + return core::EventLoop::run_event_loop(this, 0); +} + +void api::Engine::incr_event_loop_lifetime() { + return core::EventLoop::incr_event_loop_lifetime(); } -bool api::Engine::process_async_tasks() { - return core::EventLoop::process_async_tasks(this, 0); +void api::Engine::decr_event_loop_lifetime() { + return core::EventLoop::decr_event_loop_lifetime(); } bool api::Engine::dump_value(JS::Value val, FILE *fp) { return ::dump_value(CONTEXT, val, fp); } diff --git a/runtime/event_loop.cpp b/runtime/event_loop.cpp index 1cbb9c0..a4f43ad 100644 --- a/runtime/event_loop.cpp +++ b/runtime/event_loop.cpp @@ -9,6 +9,7 @@ struct TaskQueue { std::vector tasks = {}; + int lifetime_cnt = 0; void trace(JSTracer *trc) const { for (const auto task : tasks) { @@ -38,28 +39,52 @@ bool EventLoop::cancel_async_task(api::Engine *engine, const int32_t id) { bool EventLoop::has_pending_async_tasks() { return !queue.get().tasks.empty(); } -// TODO: implement compute limit -bool EventLoop::process_jobs(api::Engine *engine, double total_compute) { - JSContext *cx = engine->cx(); - while (js::HasJobsPending(cx)) { - js::RunJobs(cx); - if (JS_IsExceptionPending(cx)) - return false; - } - return true; +void EventLoop::incr_event_loop_lifetime() { + queue.get().lifetime_cnt++; +} + +void EventLoop::decr_event_loop_lifetime() { + MOZ_ASSERT(queue.get().lifetime_cnt > 0); + queue.get().lifetime_cnt--; +} + +bool lifetime_complete() { + return queue.get().lifetime_cnt == 0; } -// TODO: implement timeout limit -bool EventLoop::process_async_tasks(api::Engine *engine, double timeout) { - if (has_pending_async_tasks()) { +bool EventLoop::run_event_loop(api::Engine *engine, double total_compute) { + JSContext *cx = engine->cx(); + + while (true) { + while (js::HasJobsPending(cx)) { + js::RunJobs(cx); + if (JS_IsExceptionPending(cx)) + return false; + if (lifetime_complete()) + return true; + } + auto tasks = &queue.get().tasks; - const auto index = api::AsyncTask::select(tasks); - auto task = tasks->at(index); - if (!task->run(engine)) { + + // Unresolved lifetime error - + // if there are no async tasks, and the lifetime was not complete + // then we cannot complete the lifetime + if (tasks->size() == 0) { return false; } - tasks->erase(tasks->begin() + index); + + const auto ready = api::AsyncTask::poll(tasks); + for (auto index : ready) { + auto task = tasks->at(index); + if (!task->run(engine)) { + return false; + } + tasks->erase(tasks->begin() + index); + if (lifetime_complete()) + return true; + } } + return true; } diff --git a/runtime/event_loop.h b/runtime/event_loop.h index 98799ca..6fd9f83 100644 --- a/runtime/event_loop.h +++ b/runtime/event_loop.h @@ -25,9 +25,12 @@ class EventLoop { static bool has_pending_async_tasks(); /** - * Run the micro-tasks / pending Promise reactions + * Run the event loop until all lifetimes are complete */ - static bool process_jobs(api::Engine *engine, double total_compute); + static bool run_event_loop(api::Engine *engine, double total_compute); + + static void incr_event_loop_lifetime(); + static void decr_event_loop_lifetime(); /** * Select on the next async tasks