Skip to content

Commit

Permalink
new event loop architecture
Browse files Browse the repository at this point in the history
  • Loading branch information
guybedford committed Apr 25, 2024
1 parent 12738d7 commit b31302f
Show file tree
Hide file tree
Showing 12 changed files with 100 additions and 96 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ Then tests can be run with `ctest` directly via:
CTEST_OUTPUT_ON_FAILURE=1 ctest --test-dir cmake-build-debug -j8
```

Alternative, the integration test server can be directly run with `wasmtime serve` via:
Alternatively, the integration test server can be directly run with `wasmtime serve` via:

```bash
wasmtime serve -S common cmake-build-debug/test-server.wasm
Expand Down
5 changes: 0 additions & 5 deletions builtins/web/fetch/fetch-api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,6 @@ class ResponseFutureTask final : public api::AsyncTask {
return true;
}

bool ready() override {
// TODO(TS): implement
return true;
}

void trace(JSTracer *trc) override { TraceEdge(trc, &request_, "Request for response future"); }
};

Expand Down
19 changes: 6 additions & 13 deletions builtins/web/fetch/fetch_event.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ void dec_pending_promise_count(JSObject *self) {
.toInt32();
MOZ_ASSERT(count > 0);
count--;
if (count == 0)
ENGINE->decr_event_loop_lifetime();
JS::SetReservedSlot(self, static_cast<uint32_t>(FetchEvent::Slots::PendingPromiseCount),
JS::Int32Value(count));
}
Expand Down Expand Up @@ -589,19 +591,10 @@ void exports_wasi_http_incoming_handler(exports_wasi_http_incoming_request reque

dispatch_fetch_event(fetch_event, &total_compute);

bool success = ENGINE->process_jobs();
if (success) {
while (FetchEvent::is_active(fetch_event) && ENGINE->has_pending_async_tasks()) {
if (!ENGINE->process_async_tasks()) {
success = false;
break;
}
if (!ENGINE->process_jobs()) {
success = false;
break;
}
}
}
// track the fetch event lifetime, which when decremented ends the event loop
ENGINE->incr_event_loop_lifetime();

bool success = ENGINE->run_event_loop();

if (JS_IsExceptionPending(ENGINE->cx())) {
ENGINE->dump_pending_exception("evaluating incoming request");
Expand Down
10 changes: 0 additions & 10 deletions builtins/web/fetch/request-response.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -122,11 +122,6 @@ class BodyFutureTask final : public api::AsyncTask {
return true;
}

bool ready() override {
// TODO(TS): implement
return true;
}

void trace(JSTracer *trc) override { TraceEdge(trc, &body_source_, "body source for future"); }
};

Expand Down Expand Up @@ -187,11 +182,6 @@ class ResponseFutureTask final : public api::AsyncTask {
return true;
}

bool ready() override {
// TODO(TS): implement
return true;
}

void trace(JSTracer *trc) override { TraceEdge(trc, &request_, "Request for response future"); }
};

Expand Down
3 changes: 0 additions & 3 deletions builtins/web/timers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ class TimerTask final : public api::AsyncTask {
}

[[nodiscard]] bool run(api::Engine *engine) override {
MOZ_ASSERT(ready());
JSContext *cx = engine->cx();

const RootedObject callback(cx, callback_);
Expand Down Expand Up @@ -69,8 +68,6 @@ class TimerTask final : public api::AsyncTask {
return true;
}

bool ready() override { return host_api::MonotonicClock::now() >= this->deadline_; }

void trace(JSTracer *trc) override {
TraceEdge(trc, &callback_, "Timer callback");
for (auto &arg : arguments_) {
Expand Down
7 changes: 2 additions & 5 deletions host-apis/wasi-0.2.0-rc-2023-10-18/host_api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ template <> struct HandleOps<Pollable> {

} // namespace

size_t api::AsyncTask::select(std::vector<api::AsyncTask *> *tasks) {
std::vector<size_t> api::AsyncTask::poll(std::vector<api::AsyncTask *> *tasks) {
auto count = tasks->size();
vector<Borrow<Pollable>> handles;
for (const auto task : *tasks) {
Expand All @@ -106,10 +106,7 @@ size_t api::AsyncTask::select(std::vector<api::AsyncTask *> *tasks) {
bindings_list_u32_t result{nullptr, 0};
wasi_io_0_2_0_rc_2023_10_18_poll_poll_list(&list, &result);
MOZ_ASSERT(result.len > 0);
const auto ready_index = result.ptr[0];
free(result.ptr);

return ready_index;
return std::vector<size_t>(result.ptr, result.ptr + result.len);
}

namespace host_api {
Expand Down
7 changes: 2 additions & 5 deletions host-apis/wasi-0.2.0-rc-2023-12-05/host_api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ template <> struct HandleOps<Pollable> {

} // namespace

size_t api::AsyncTask::select(std::vector<api::AsyncTask *> *tasks) {
std::vector<size_t> api::AsyncTask::poll(std::vector<api::AsyncTask *> *tasks) {
auto count = tasks->size();
vector<Borrow<Pollable>> handles;
for (const auto task : *tasks) {
Expand All @@ -110,10 +110,7 @@ size_t api::AsyncTask::select(std::vector<api::AsyncTask *> *tasks) {
bindings_list_u32_t result{nullptr, 0};
wasi_io_0_2_0_rc_2023_11_10_poll_poll(&list, &result);
MOZ_ASSERT(result.len > 0);
const auto ready_index = result.ptr[0];
free(result.ptr);

return ready_index;
return std::vector<size_t>(result.ptr, result.ptr + result.len);
}

namespace host_api {
Expand Down
12 changes: 2 additions & 10 deletions host-apis/wasi-0.2.0/host_api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ template <> struct HandleOps<Pollable> {

} // namespace

size_t api::AsyncTask::select(std::vector<api::AsyncTask *> *tasks) {
std::vector<size_t> api::AsyncTask::poll(std::vector<api::AsyncTask *> *tasks) {
auto count = tasks->size();
vector<Borrow<Pollable>> handles;
for (const auto task : *tasks) {
Expand All @@ -107,10 +107,7 @@ size_t api::AsyncTask::select(std::vector<api::AsyncTask *> *tasks) {
wasi_io_0_2_0_poll_list_u32_t result{nullptr, 0};
wasi_io_0_2_0_poll_poll(&list, &result);
MOZ_ASSERT(result.len > 0);
const auto ready_index = result.ptr[0];
free(result.ptr);

return ready_index;
return std::vector<size_t>(result.ptr, result.ptr + result.len);
}

namespace host_api {
Expand Down Expand Up @@ -558,11 +555,6 @@ class BodyAppendTask final : public api::AsyncTask {
return true;
}

bool ready() override {
// TODO(TS): properly implement. This won't ever return `true` right now
return state_ == State::Ready;
}

[[nodiscard]] int32_t id() override {
if (state_ == State::BlockedOnBoth || state_ == State::BlockedOnIncoming) {
return incoming_pollable_;
Expand Down
42 changes: 32 additions & 10 deletions include/extension-api.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,12 @@ namespace api {
class AsyncTask;

class Engine {

typedef int Lifetime;

#define LIFETIME_NONE -1;
#define LIFETIME_ALL -2;

public:
Engine();
JSContext *cx();
Expand Down Expand Up @@ -62,8 +68,22 @@ class Engine {
void enable_module_mode(bool enable);
bool eval_toplevel(const char *path, MutableHandleValue result);

bool process_jobs();
bool process_async_tasks();
/**
* Run the JS task queue and wait on pending tasks until there
* are no outstanding lifetimes to wait on.
*/
bool run_event_loop();

/**
* Add an event loop lifetime to track
*/
void incr_event_loop_lifetime();

/**
* Remove an event loop lifetime to track
* The last decrementer marks the event loop as complete to finish
*/
void decr_event_loop_lifetime();

/**
* Get the JS value associated with the top-level script execution -
Expand Down Expand Up @@ -94,7 +114,6 @@ class AsyncTask {

virtual bool run(Engine *engine) = 0;
virtual bool cancel(Engine *engine) = 0;
virtual bool ready() = 0;

[[nodiscard]] virtual PollableHandle id() {
MOZ_ASSERT(handle_ != INVALID_POLLABLE_HANDLE);
Expand All @@ -103,13 +122,16 @@ class AsyncTask {

virtual void trace(JSTracer *trc) = 0;

/// Returns the first ready `AsyncTask`.
///
/// TODO: as an optimization, return a vector containing the ready head of the queue.
/// Note that that works iff the very first entry in the queue is ready, and then only
/// for the dense head of the queue, without gaps. This is because during processing
/// of the ready tasks, other tasks might become ready that should be processed first.
static size_t select(std::vector<AsyncTask *> *handles);
/**
* Poll for completion on the given async tasks
* A list of ready task indices is returned
*/
static std::vector<size_t> poll(std::vector<AsyncTask *> *handles);

/**
* Returns whether or not the given task is ready
*/
static bool ready(AsyncTask *task);
};

} // namespace api
Expand Down
25 changes: 9 additions & 16 deletions runtime/engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -363,18 +363,7 @@ bool api::Engine::eval_toplevel(const char *path, MutableHandleValue result) {
}

SCRIPT_VALUE.init(cx, ns);

if (!this->process_jobs()) {
return false;
}
while (this->has_pending_async_tasks()) {
if (!this->process_async_tasks()) {
return false;
}
if (!this->process_jobs()) {
return false;
}
}
this->run_event_loop();

// TLA rejections during pre-initialization are treated as top-level exceptions.
// TLA may remain unresolved, in which case it will continue tasks at runtime.
Expand Down Expand Up @@ -425,12 +414,16 @@ bool api::Engine::eval_toplevel(const char *path, MutableHandleValue result) {
return true;
}

bool api::Engine::process_jobs() {
return core::EventLoop::process_jobs(this, 0);
bool api::Engine::run_event_loop() {
return core::EventLoop::run_event_loop(this, 0);
}

void api::Engine::incr_event_loop_lifetime() {
return core::EventLoop::incr_event_loop_lifetime();
}

bool api::Engine::process_async_tasks() {
return core::EventLoop::process_async_tasks(this, 0);
void api::Engine::decr_event_loop_lifetime() {
return core::EventLoop::decr_event_loop_lifetime();
}

bool api::Engine::dump_value(JS::Value val, FILE *fp) { return ::dump_value(CONTEXT, val, fp); }
Expand Down
57 changes: 41 additions & 16 deletions runtime/event_loop.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

struct TaskQueue {
std::vector<api::AsyncTask *> tasks = {};
int lifetime_cnt = 0;

void trace(JSTracer *trc) const {
for (const auto task : tasks) {
Expand Down Expand Up @@ -38,28 +39,52 @@ bool EventLoop::cancel_async_task(api::Engine *engine, const int32_t id) {

bool EventLoop::has_pending_async_tasks() { return !queue.get().tasks.empty(); }

// TODO: implement compute limit
bool EventLoop::process_jobs(api::Engine *engine, double total_compute) {
JSContext *cx = engine->cx();
while (js::HasJobsPending(cx)) {
js::RunJobs(cx);
if (JS_IsExceptionPending(cx))
return false;
}
return true;
void EventLoop::incr_event_loop_lifetime() {
queue.get().lifetime_cnt++;
}

void EventLoop::decr_event_loop_lifetime() {
MOZ_ASSERT(queue.get().lifetime_cnt > 0);
queue.get().lifetime_cnt--;
}

bool lifetime_complete() {
return queue.get().lifetime_cnt == 0;
}

// TODO: implement timeout limit
bool EventLoop::process_async_tasks(api::Engine *engine, double timeout) {
if (has_pending_async_tasks()) {
bool EventLoop::run_event_loop(api::Engine *engine, double total_compute) {
JSContext *cx = engine->cx();

while (true) {
while (js::HasJobsPending(cx)) {
js::RunJobs(cx);
if (JS_IsExceptionPending(cx))
return false;
if (lifetime_complete())
return true;
}

auto tasks = &queue.get().tasks;
const auto index = api::AsyncTask::select(tasks);
auto task = tasks->at(index);
if (!task->run(engine)) {

// Unresolved lifetime error -
// if there are no async tasks, and the lifetime was not complete
// then we cannot complete the lifetime
if (tasks->size() == 0) {
return false;
}
tasks->erase(tasks->begin() + index);

const auto ready = api::AsyncTask::poll(tasks);
for (auto index : ready) {
auto task = tasks->at(index);
if (!task->run(engine)) {
return false;
}
tasks->erase(tasks->begin() + index);
if (lifetime_complete())
return true;
}
}

return true;
}

Expand Down
7 changes: 5 additions & 2 deletions runtime/event_loop.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,12 @@ class EventLoop {
static bool has_pending_async_tasks();

/**
* Run the micro-tasks / pending Promise reactions
* Run the event loop until all lifetimes are complete
*/
static bool process_jobs(api::Engine *engine, double total_compute);
static bool run_event_loop(api::Engine *engine, double total_compute);

static void incr_event_loop_lifetime();
static void decr_event_loop_lifetime();

/**
* Select on the next async tasks
Expand Down

0 comments on commit b31302f

Please sign in to comment.