Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/single-tick' into update-upstr…
Browse files Browse the repository at this point in the history
…eam-and-apply-fetch-rework

# Conflicts:
#	builtins/web/fetch/fetch_event.cpp
#	builtins/web/fetch/request-response.cpp
  • Loading branch information
noise64 committed Jun 5, 2024
2 parents e8870ef + 6a8ae9e commit 25bb72d
Show file tree
Hide file tree
Showing 7 changed files with 179 additions and 14 deletions.
83 changes: 82 additions & 1 deletion builtins/web/fetch/fetch_event.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ void inc_pending_promise_count(JSObject *self) {
auto count =
JS::GetReservedSlot(self, static_cast<uint32_t>(FetchEvent::Slots::PendingPromiseCount))
.toInt32();
if (count == 0) {
ENGINE->incr_event_loop_interest();
}
count++;
MOZ_ASSERT(count > 0);
JS::SetReservedSlot(self, static_cast<uint32_t>(FetchEvent::Slots::PendingPromiseCount),
Expand All @@ -43,8 +46,9 @@ void dec_pending_promise_count(JSObject *self) {
.toInt32();
MOZ_ASSERT(count > 0);
count--;
if (count == 0)
if (count == 0) {
ENGINE->decr_event_loop_interest();
}
JS::SetReservedSlot(self, static_cast<uint32_t>(FetchEvent::Slots::PendingPromiseCount),
JS::Int32Value(count));
}
Expand Down Expand Up @@ -204,6 +208,10 @@ bool start_response(JSContext *cx, JS::HandleObject response_obj, bool streaming
STREAMING_BODY = response->body().unwrap();
}

if (streaming) {
ENGINE->incr_event_loop_interest();
}

return send_response(response, FetchEvent::instance(),
streaming ? FetchEvent::State::responseStreaming
: FetchEvent::State::responseDone);
Expand Down Expand Up @@ -430,11 +438,13 @@ bool FetchEvent::is_dispatching(JSObject *self) {

void FetchEvent::start_dispatching(JSObject *self) {
MOZ_ASSERT(!is_dispatching(self));
ENGINE->incr_event_loop_interest();
JS::SetReservedSlot(self, static_cast<uint32_t>(Slots::Dispatch), JS::TrueValue());
}

void FetchEvent::stop_dispatching(JSObject *self) {
MOZ_ASSERT(is_dispatching(self));
ENGINE->decr_event_loop_interest();
JS::SetReservedSlot(self, static_cast<uint32_t>(Slots::Dispatch), JS::FalseValue());
}

Expand Down Expand Up @@ -638,3 +648,74 @@ bool install(api::Engine *engine) {
}

} // namespace builtins::web::fetch::fetch_event

// #define S_TO_NS(s) ((s) * 1000000000)
// static int64_t now_ns() {
// timespec now{};
// clock_gettime(CLOCK_MONOTONIC, &now);
// return S_TO_NS(now.tv_sec) + now.tv_nsec;
// }
using namespace builtins::web::fetch::fetch_event;
// TODO: change this to fully work in terms of host_api.
void exports_wasi_http_incoming_handler(exports_wasi_http_incoming_request request_handle,
exports_wasi_http_response_outparam response_out) {

// auto begin = now_ns();
// auto id1 = host_api::MonotonicClock::subscribe(begin + 1, true);
// auto id2 = host_api::MonotonicClock::subscribe(begin + 1000000*1000, true);
// bindings_borrow_pollable_t handles[2] = {bindings_borrow_pollable_t{id2},
// bindings_borrow_pollable_t{id1}}; auto list = bindings_list_borrow_pollable_t{handles, 2};
// bindings_list_u32_t res = {.ptr = nullptr,.len = 0};
// wasi_io_0_2_0_rc_2023_10_18_poll_poll_list(&list, &res);
// fprintf(stderr, "first ready after first poll: %d. diff: %lld\n", handles[res.ptr[0]].__handle,
// (now_ns() - begin) / 1000);
//
// wasi_io_0_2_0_rc_2023_10_18_poll_pollable_drop_own(bindings_own_pollable_t{id1});
//
// bindings_borrow_pollable_t handles2[1] = {bindings_borrow_pollable_t{id2}};
// list = bindings_list_borrow_pollable_t{handles2, 1};
// wasi_io_0_2_0_rc_2023_10_18_poll_poll_list(&list, &res);
// fprintf(stderr, "first ready after second poll: %d. diff: %lld\n",
// handles2[res.ptr[0]].__handle, (now_ns() - begin) / 1000);
//
// return;

RESPONSE_OUT = response_out.__handle;

auto *request = new host_api::HttpIncomingRequest(request_handle.__handle);
HandleObject fetch_event = FetchEvent::instance();
MOZ_ASSERT(FetchEvent::is_instance(fetch_event));
if (!FetchEvent::init_incoming_request(ENGINE->cx(), fetch_event, request)) {
ENGINE->dump_pending_exception("initialization of FetchEvent");
return;
}

double total_compute = 0;

dispatch_fetch_event(fetch_event, &total_compute);

bool success = ENGINE->run_event_loop();

if (JS_IsExceptionPending(ENGINE->cx())) {
ENGINE->dump_pending_exception("evaluating incoming request");
}

if (!success) {
fprintf(stderr, "Internal error.");
}

if (ENGINE->debug_logging_enabled() && ENGINE->has_pending_async_tasks()) {
fprintf(stderr, "Event loop terminated with async tasks pending. "
"Use FetchEvent#waitUntil to extend the component's "
"lifetime if needed.\n");
}

if (!FetchEvent::response_started(fetch_event)) {
FetchEvent::respondWithError(ENGINE->cx(), fetch_event);
return;
}

if (STREAMING_BODY && STREAMING_BODY->valid()) {
STREAMING_BODY->close();
}
}
30 changes: 26 additions & 4 deletions builtins/web/fetch/request-response.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -945,7 +945,28 @@ bool reader_for_outgoing_body_then_handler(JSContext *cx, JS::HandleObject body_
return false;

if (done_val.toBoolean()) {
return finish_outgoing_body_streaming(cx, body_owner);
// The only response we ever send is the one passed to
// `FetchEvent#respondWith` to send to the client. As such, we can be
// certain that if we have a response here, we can advance the FetchState to
// `responseDone`.
// TODO(TS): factor this out to remove dependency on fetch-event.h
if (Response::is_instance(body_owner)) {
ENGINE->decr_event_loop_interest();
fetch_event::FetchEvent::set_state(fetch_event::FetchEvent::instance(),
fetch_event::FetchEvent::State::responseDone);
}

auto res = body->close();
if (auto *err = res.to_err()) {
HANDLE_ERROR(cx, *err);
return false;
}

if (Request::is_instance(body_owner)) {
ENGINE->queue_async_task(new BodyFutureTask(body_owner));
}

return true;
}

JS::RootedValue val(cx);
Expand Down Expand Up @@ -1015,9 +1036,10 @@ bool reader_for_outgoing_body_catch_handler(JSContext *cx, JS::HandleObject body
// `responseDone` is the right state: `respondedWithError` is for when sending
// a response at all failed.)
// TODO(TS): investigate why this is disabled.
// if (Response::is_instance(body_owner)) {
// FetchEvent::set_state(FetchEvent::instance(), FetchEvent::State::responseDone);
// }
if (Response::is_instance(body_owner)) {
ENGINE->decr_event_loop_interest();
// FetchEvent::set_state(FetchEvent::instance(), FetchEvent::State::responseDone);
}
return true;
}

Expand Down
17 changes: 17 additions & 0 deletions host-apis/wasi-0.2.0-rc-2023-10-18/host_api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,23 @@ size_t api::AsyncTask::select(std::vector<api::AsyncTask *> *tasks) {
return ready_index;
}

std::optional<size_t> api::AsyncTask::ready(std::vector<api::AsyncTask *> *tasks) {
auto count = tasks->size();
vector<Borrow<Pollable>> handles;
for (const auto task : *tasks) {
handles.emplace_back(task->id());
}
auto list = list_borrow_pollable_t{
reinterpret_cast<HandleOps<Pollable>::borrow *>(handles.data()), count};
bindings_list_u32_t result{nullptr, 0};
wasi_io_0_2_0_rc_2023_10_18_poll_poll_list(&list, &result);
MOZ_ASSERT(result.len > 0);
const auto ready_index = result.ptr[0];
free(result.ptr);

return ready_index;
}

namespace host_api {

HostString::HostString(const char *c_str) {
Expand Down
14 changes: 14 additions & 0 deletions host-apis/wasi-0.2.0-rc-2023-12-05/host_api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,20 @@ size_t api::AsyncTask::select(std::vector<api::AsyncTask *> *tasks) {
return ready_index;
}

std::optional<size_t> api::AsyncTask::ready(std::vector<api::AsyncTask *> *tasks) {
auto count = tasks->size();
vector<Borrow<Pollable>> handles;
auto list = list_borrow_pollable_t{
reinterpret_cast<HandleOps<Pollable>::borrow *>(handles.data()), count};
bindings_list_u32_t result{nullptr, 0};
wasi_io_0_2_0_rc_2023_10_18_poll_poll_list(&list, &result);
MOZ_ASSERT(result.len > 0);
const auto ready_index = result.ptr[0];
free(result.ptr);

return ready_index;
}

namespace host_api {

HostString::HostString(const char *c_str) {
Expand Down
13 changes: 13 additions & 0 deletions host-apis/wasi-0.2.0/host_api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,19 @@ size_t api::AsyncTask::select(std::vector<AsyncTask *> *tasks) {
return ready_index;
}

std::optional<size_t> api::AsyncTask::ready(std::vector<api::AsyncTask *> *tasks) {
auto count = tasks->size();
vector<Borrow<Pollable>> handles;
for (size_t idx = 0; idx < count; ++idx) {
auto task = tasks->at(idx);
Borrow<Pollable> poll = (task->id());
if (wasi_io_0_2_0_poll_method_pollable_ready(poll)) {
return idx;
}
}
return std::nullopt;
}

namespace host_api {

HostString::HostString(const char *c_str) {
Expand Down
5 changes: 5 additions & 0 deletions include/extension-api.h
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,11 @@ class AsyncTask {
* Select for the next available ready task, providing the oldest ready first.
*/
static size_t select(std::vector<AsyncTask *> *handles);

/**
* Non-blocking check for a ready task, providing the oldest ready first, if any.
*/
static std::optional<size_t> ready(std::vector<AsyncTask *> *handles);
};

} // namespace api
Expand Down
31 changes: 22 additions & 9 deletions runtime/event_loop.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,32 +59,42 @@ bool EventLoop::run_event_loop(api::Engine *engine, double total_compute) {
queue.get().event_loop_running = true;
JSContext *cx = engine->cx();

while (true) {
do {
// Run a microtask checkpoint
js::RunJobs(cx);

if (JS_IsExceptionPending(cx)) {
exit_event_loop();
return false;
}
// if there is no interest in the event loop at all, just run one tick
if (interest_complete()) {
exit_event_loop();
return true;
}

const auto tasks = &queue.get().tasks;
size_t tasks_size = tasks->size();

if (tasks_size == 0) {
if (interest_complete()) {
break;
}
exit_event_loop();
MOZ_ASSERT(!interest_complete());
fprintf(stderr, "event loop error - both task and job queues are empty, but expected "
"operations did not resolve");
return false;
}

size_t task_idx;

// Select the next task to run according to event-loop semantics of oldest-first.
size_t task_idx = api::AsyncTask::select(tasks);
if (interest_complete()) {
// Perform a non-blocking select in the case of there being no event loop interest
// (we are thus only performing a "single tick", but must still progress work that is ready)
std::optional<size_t> maybe_task_idx = api::AsyncTask::ready(tasks);
if (!maybe_task_idx.has_value()) {
break;
}
task_idx = maybe_task_idx.value();
} else {
task_idx = api::AsyncTask::select(tasks);
}

auto task = tasks->at(task_idx);
bool success = task->run(engine);
Expand All @@ -93,7 +103,10 @@ bool EventLoop::run_event_loop(api::Engine *engine, double total_compute) {
exit_event_loop();
return false;
}
}
} while (!interest_complete());

exit_event_loop();
return true;
}

void EventLoop::init(JSContext *cx) { queue.init(cx); }
Expand Down

0 comments on commit 25bb72d

Please sign in to comment.