From db48ea0731610f42dd94cda3cb7fde4384788789 Mon Sep 17 00:00:00 2001 From: Thibault Charbonnier Date: Thu, 7 Sep 2023 19:53:18 -0700 Subject: [PATCH] refactor(proxy-wasm) improve pwexec resurrection and instance lifecycle The main goal of this overhaul is to simplify `on_context_create`, make it fully re-entrant *and* properly handle instance recycling at the same time. The way to do so, in my opinion, was to move `pwexec` creation where `rexec` already was. In other words, always lookup the context id in the instance rbtree, and if not found, create it. This means that surrounding code also needed big overhauls. It also removes the reference counting poor man's GC of the older implementation. The code became really ugly by then so I took the time to also review this module's code structure instead of making a *very* ugly commit. This new ngx_proxy_wasm.c file should be much easier to read and follow now. One change I do not fully like is moving the `next_id` to a global counter, but we do not have a "global proxy-wasm conf" object yet. I also started thinking about pre-allocating a number of `pwexecs` (like `worker_connections`) and use free/busy queue that all filter chains can dip into to get a context id + context memory zone. Perhaps for a later time. --- src/common/proxy_wasm/ngx_proxy_wasm.c | 1633 +++++++++-------- src/common/proxy_wasm/ngx_proxy_wasm.h | 32 +- src/common/proxy_wasm/ngx_proxy_wasm_util.c | 28 +- src/http/ngx_http_wasm_module.c | 7 +- src/http/ngx_http_wasm_util.c | 2 +- src/http/proxy_wasm/ngx_http_proxy_wasm.h | 2 +- .../proxy_wasm/ngx_http_proxy_wasm_dispatch.c | 2 +- src/wasm/vm/ngx_wavm.c | 6 +- .../007-on_http_instance_isolation.t | 7 +- .../hfuncs/114-proxy_set_http_request_body.t | 3 +- .../hfuncs/119-proxy_properties_get_ngx.t | 6 - .../hfuncs/120-proxy_properties_get_host.t | 13 +- .../hfuncs/122-proxy_properties_set_host.t | 11 +- .../hfuncs/123-proxy_properties_set_ngx.t | 6 - 14 files changed, 928 insertions(+), 830 deletions(-) diff --git a/src/common/proxy_wasm/ngx_proxy_wasm.c b/src/common/proxy_wasm/ngx_proxy_wasm.c index 05b035172..df22e9bef 100644 --- a/src/common/proxy_wasm/ngx_proxy_wasm.c +++ b/src/common/proxy_wasm/ngx_proxy_wasm.c @@ -10,115 +10,55 @@ #endif -static ngx_int_t ngx_proxy_wasm_init_abi(ngx_proxy_wasm_filter_t *filter); -static ngx_int_t ngx_proxy_wasm_start_filter(ngx_proxy_wasm_filter_t *filter); -static void ngx_proxy_wasm_instance_destroy(ngx_proxy_wasm_instance_t *ictx); -static ngx_proxy_wasm_err_e ngx_proxy_wasm_on_start( - ngx_proxy_wasm_instance_t *ictx, ngx_proxy_wasm_filter_t *filter, - unsigned start); +static ngx_proxy_wasm_err_e ngx_proxy_wasm_create_context( + ngx_proxy_wasm_filter_t *filter, ngx_proxy_wasm_ctx_t *pwctx, + ngx_uint_t id, ngx_proxy_wasm_exec_t *in, ngx_proxy_wasm_exec_t **out); static void ngx_proxy_wasm_on_log(ngx_proxy_wasm_exec_t *pwexec); static void ngx_proxy_wasm_on_done(ngx_proxy_wasm_exec_t *pwexec); static ngx_int_t ngx_proxy_wasm_on_tick(ngx_proxy_wasm_exec_t *pwexec); +static ngx_proxy_wasm_filter_t *ngx_proxy_wasm_lookup_filter(ngx_uint_t id); +static ngx_proxy_wasm_exec_t *ngx_proxy_wasm_lookup_root_ctx( + ngx_proxy_wasm_instance_t *ictx, ngx_uint_t id); +static ngx_proxy_wasm_exec_t *ngx_proxy_wasm_lookup_ctx( + ngx_proxy_wasm_instance_t *ictx, ngx_uint_t id); +static ngx_int_t ngx_proxy_wasm_filter_init_abi( + ngx_proxy_wasm_filter_t *filter); +static ngx_int_t ngx_proxy_wasm_filter_start(ngx_proxy_wasm_filter_t *filter); +static void ngx_proxy_wasm_instance_update( + ngx_proxy_wasm_instance_t *ictx, ngx_proxy_wasm_exec_t *pwexec); +static void ngx_proxy_wasm_instance_invalidate(ngx_proxy_wasm_instance_t *ictx); +static void ngx_proxy_wasm_instance_destroy(ngx_proxy_wasm_instance_t *ictx); +static void ngx_proxy_wasm_store_destroy(ngx_proxy_wasm_store_t *store); +static void ngx_proxy_wasm_store_sweep(ngx_proxy_wasm_store_t *store); +#if 0 +static void ngx_proxy_wasm_store_schedule_sweep_handler(ngx_event_t *ev); +static void ngx_proxy_wasm_store_schedule_sweep(ngx_proxy_wasm_store_t *store); +#endif +static ngx_uint_t next_id = 1; static ngx_rbtree_t ngx_proxy_wasm_filters_rbtree; static ngx_rbtree_node_t ngx_proxy_wasm_filters_sentinel; -static ngx_proxy_wasm_filter_t * -ngx_proxy_wasm_filter_lookup(ngx_uint_t id) -{ - ngx_rbtree_t *rbtree; - ngx_rbtree_node_t *node, *sentinel; - ngx_proxy_wasm_filter_t *filter; - - rbtree = &ngx_proxy_wasm_filters_rbtree; - node = rbtree->root; - sentinel = rbtree->sentinel; - - while (node != sentinel) { - - if (id != node->key) { - node = (id < node->key) ? node->left : node->right; - continue; - } - - filter = ngx_rbtree_data(node, ngx_proxy_wasm_filter_t, node); - - return filter; - } - - return NULL; -} - - -static ngx_proxy_wasm_exec_t * -ngx_proxy_wasm_root_lookup(ngx_proxy_wasm_instance_t *ictx, ngx_uint_t id) -{ - ngx_rbtree_t *rbtree; - ngx_rbtree_node_t *node, *sentinel; - ngx_proxy_wasm_exec_t *pwexec; - - rbtree = &ictx->root_ctxs; - node = rbtree->root; - sentinel = rbtree->sentinel; - - while (node != sentinel) { - - if (id != node->key) { - node = (id < node->key) ? node->left : node->right; - continue; - } - - pwexec = ngx_rbtree_data(node, ngx_proxy_wasm_exec_t, node); - - return pwexec; - } - - return NULL; -} - - -static ngx_proxy_wasm_exec_t * -ngx_proxy_wasm_ctx_lookup(ngx_proxy_wasm_instance_t *ictx, ngx_uint_t id) -{ - ngx_rbtree_t *rbtree; - ngx_rbtree_node_t *node, *sentinel; - ngx_proxy_wasm_exec_t *pwexec; - - rbtree = &ictx->tree_ctxs; - node = rbtree->root; - sentinel = rbtree->sentinel; - - while (node != sentinel) { - - if (id != node->key) { - node = (id < node->key) ? node->left : node->right; - continue; - } - - pwexec = ngx_rbtree_data(node, ngx_proxy_wasm_exec_t, node); - - return pwexec; - } - - return NULL; -} +/* context - root */ void -ngx_proxy_wasm_init(ngx_conf_t *cf) +ngx_proxy_wasm_init(ngx_conf_t *cf, ngx_proxy_wasm_store_t *gstore) { ngx_rbtree_init(&ngx_proxy_wasm_filters_rbtree, &ngx_proxy_wasm_filters_sentinel, ngx_rbtree_insert_value); ngx_proxy_wasm_properties_init(cf); + + ngx_proxy_wasm_store_init(gstore, cf->pool); } void -ngx_proxy_wasm_exit() +ngx_proxy_wasm_exit(ngx_proxy_wasm_store_t *gstore) { ngx_rbtree_node_t **root_node, **sentinel, *node; ngx_proxy_wasm_filter_t *filter; @@ -132,17 +72,15 @@ ngx_proxy_wasm_exit() ngx_rbtree_delete(&ngx_proxy_wasm_filters_rbtree, node); - ngx_log_debug1(NGX_LOG_DEBUG_WASM, filter->log, 0, - "proxy_wasm releasing \"%V\" filter instance", - filter->name); - ngx_proxy_wasm_store_destroy(filter->store); } + + ngx_proxy_wasm_store_destroy(gstore); } -ngx_uint_t -ngx_proxy_wasm_id(ngx_str_t *name, ngx_str_t *config, uintptr_t data) +static ngx_uint_t +get_filter_id(ngx_str_t *name, ngx_str_t *config, uintptr_t data) { u_char buf[NGX_INT64_LEN]; uint32_t hash; @@ -178,12 +116,12 @@ ngx_proxy_wasm_load(ngx_proxy_wasm_filter_t *filter, ngx_log_t *log) filter->log = log; filter->name = &filter->module->name; - filter->id = ngx_proxy_wasm_id(filter->name, &filter->config, filter->data); + filter->id = get_filter_id(filter->name, &filter->config, filter->data); filter->node.key = filter->id; ngx_rbtree_insert(&ngx_proxy_wasm_filters_rbtree, &filter->node); - if (ngx_proxy_wasm_init_abi(filter) != NGX_OK) { + if (ngx_proxy_wasm_filter_init_abi(filter) != NGX_OK) { return NGX_ERROR; } @@ -217,7 +155,7 @@ ngx_proxy_wasm_start(ngx_cycle_t *cycle) { filter = ngx_rbtree_data(node, ngx_proxy_wasm_filter_t, node); - rc = ngx_proxy_wasm_start_filter(filter); + rc = ngx_proxy_wasm_filter_start(filter); if (rc != NGX_OK) { ngx_proxy_wasm_log_error(NGX_LOG_EMERG, filter->log, filter->ecode, "failed initializing \"%V\" filter", @@ -232,6 +170,9 @@ ngx_proxy_wasm_start(ngx_cycle_t *cycle) } +/* context - stream */ + + ngx_proxy_wasm_ctx_t * ngx_proxy_wasm_ctx_alloc(ngx_pool_t *pool) { @@ -260,15 +201,11 @@ ngx_proxy_wasm_ctx_t * ngx_proxy_wasm_ctx(ngx_uint_t *filter_ids, size_t nfilters, ngx_uint_t isolation, ngx_proxy_wasm_subsystem_t *subsys, void *data) { - size_t i; - ngx_uint_t id; - ngx_pool_t *pool; - ngx_log_t *log; - ngx_proxy_wasm_ctx_t *pwctx; - ngx_proxy_wasm_exec_t *pwexec; - ngx_proxy_wasm_filter_t *filter; - ngx_proxy_wasm_instance_t *ictx; - ngx_proxy_wasm_store_t *pwstore = NULL; + size_t i; + ngx_uint_t id; + ngx_proxy_wasm_ctx_t *pwctx; + ngx_proxy_wasm_filter_t *filter; + ngx_proxy_wasm_exec_t *pwexec = NULL; pwctx = subsys->get_context(data); if (pwctx == NULL) { @@ -292,113 +229,107 @@ ngx_proxy_wasm_ctx(ngx_uint_t *filter_ids, size_t nfilters, for (i = 0; i < nfilters; i++) { id = filter_ids[i]; - filter = ngx_proxy_wasm_filter_lookup(id); + filter = ngx_proxy_wasm_lookup_filter(id); if (filter == NULL) { /* TODO: log error */ ngx_wasm_assert(0); return NULL; } - pool = pwctx->pool; - log = pwctx->log; - - switch (pwctx->isolation) { - case NGX_PROXY_WASM_ISOLATION_NONE: - pwstore = filter->store; - break; - case NGX_PROXY_WASM_ISOLATION_STREAM: - pwstore = &pwctx->store; - break; - case NGX_PROXY_WASM_ISOLATION_FILTER: - break; - default: - ngx_proxy_wasm_log_error(NGX_LOG_WASM_NYI, pwctx->log, 0, - "NYI - instance isolation: %d", - pwctx->isolation); - return NULL; - } - - pwexec = ngx_array_push(&pwctx->pwexecs); + (void) ngx_proxy_wasm_create_context(filter, pwctx, + next_id++, NULL, &pwexec); if (pwexec == NULL) { return NULL; } - ngx_memzero(pwexec, sizeof(ngx_proxy_wasm_exec_t)); + } /* for () */ - pwexec->index = i; - pwexec->pool = pool; - pwexec->filter = filter; - pwexec->parent = pwctx; - pwexec->root_id = filter->id; + pwctx->ready = 1; - pwexec->log = ngx_pcalloc(pwexec->pool, sizeof(ngx_log_t)); - if (pwexec->log == NULL) { - return NULL; - } + } /* !ready */ - pwexec->log->file = log->file; - pwexec->log->next = log->next; - pwexec->log->writer = log->writer; - pwexec->log->wdata = log->wdata; - pwexec->log->log_level = log->log_level; - pwexec->log->connection = log->connection; - pwexec->log->handler = ngx_proxy_wasm_log_error_handler; - pwexec->log->data = &pwexec->log_ctx; + return pwctx; +} - pwexec->log_ctx.pwexec = pwexec; - pwexec->log_ctx.orig_log = log; - ictx = ngx_proxy_wasm_get_instance(filter, pwstore, pwexec, log); - if (ictx == NULL) { - return NULL; - } +static void +destroy_pwexec(ngx_proxy_wasm_exec_t *pwexec) +{ + if (pwexec->ev) { + ngx_del_timer(pwexec->ev); + ngx_free(pwexec->ev); + + pwexec->ev = NULL; + } - pwexec->ictx = ictx; + if (pwexec->log) { + if (pwexec->log_ctx.log_prefix.data) { + ngx_pfree(pwexec->pool, pwexec->log_ctx.log_prefix.data); + } - ngx_wasm_assert(pwexec->index + 1 == pwctx->pwexecs.nelts); + ngx_pfree(pwexec->pool, pwexec->log); - } /* for () */ + pwexec->log = NULL; + } - pwctx->ready = 1; + if (pwexec->parent) { + if (pwexec->log_ctx.log_prefix.data) { + ngx_pfree(pwexec->pool, pwexec->log_ctx.log_prefix.data); - } /* !ready */ + pwexec->log_ctx.log_ctx.data = NULL; + } - return pwctx; + if (pwexec->root_id == NGX_PROXY_WASM_ROOT_CTX_ID) { + ngx_pfree(pwexec->pool, pwexec->parent); + + pwexec->parent = NULL; + + } else if (pwexec->parent->isolation + == NGX_PROXY_WASM_ISOLATION_FILTER + && pwexec->store) + { + /* store specifically allocated for the pwexec */ + ngx_pfree(pwexec->parent->pool, pwexec->store); + + pwexec->store = NULL; + } + } } void ngx_proxy_wasm_ctx_destroy(ngx_proxy_wasm_ctx_t *pwctx) { - size_t i; - ngx_proxy_wasm_exec_t *pwexec, *pwexecs; - ngx_proxy_wasm_instance_t *ictx; + size_t i; + ngx_proxy_wasm_exec_t *pwexec, *pwexecs; pwexecs = (ngx_proxy_wasm_exec_t *) pwctx->pwexecs.elts; for (i = 0; i < pwctx->pwexecs.nelts; i++) { pwexec = &pwexecs[i]; - ictx = pwexec->ictx; ngx_wasm_assert(pwexec->root_id != NGX_PROXY_WASM_ROOT_CTX_ID); - ngx_proxy_wasm_log_error(NGX_LOG_DEBUG, pwexec->log, 0, + ngx_proxy_wasm_log_error(NGX_LOG_DEBUG, pwctx->log, 0, "\"%V\" filter freeing context #%d " "(%l/%l)", pwexec->filter->name, pwexec->id, pwexec->index + 1, pwctx->nfilters); - if (pwexec->log) { - if (pwexec->log_ctx.log_prefix.data) { - ngx_pfree(pwexec->pool, pwexec->log_ctx.log_prefix.data); - } + if (pwctx->isolation == NGX_PROXY_WASM_ISOLATION_NONE) { + /* sweep if an instance has trapped */ + ngx_proxy_wasm_store_sweep(pwexec->ictx->store); + } - ngx_pfree(pwexec->pool, pwexec->log); + if (pwexec->node.key) { + ngx_rbtree_delete(&pwexec->ictx->tree_ctxs, &pwexec->node); } - ngx_proxy_wasm_release_instance(ictx, 0); + destroy_pwexec(pwexec); } + ngx_array_destroy(&pwctx->pwexecs); + ngx_proxy_wasm_store_destroy(&pwctx->store); #if 0 @@ -454,7 +385,7 @@ ngx_proxy_wasm_ctx_destroy(ngx_proxy_wasm_ctx_t *pwctx) static ngx_int_t -ngx_proxy_wasm_action2rc(ngx_proxy_wasm_ctx_t *pwctx, +action2rc(ngx_proxy_wasm_ctx_t *pwctx, ngx_proxy_wasm_exec_t *pwexec) { ngx_int_t rc = NGX_ERROR; @@ -516,6 +447,7 @@ ngx_proxy_wasm_action2rc(ngx_proxy_wasm_ctx_t *pwctx, /* determine current action rc */ switch (action) { + case NGX_PROXY_WASM_ACTION_DONE: ngx_log_debug0(NGX_LOG_DEBUG_WASM, pwctx->log, 0, "proxy_wasm action " "\"DONE\" -> \"CONTINUE\" to resume later phases"); @@ -533,11 +465,10 @@ ngx_proxy_wasm_action2rc(ngx_proxy_wasm_ctx_t *pwctx, case NGX_HTTP_REWRITE_PHASE: case NGX_HTTP_ACCESS_PHASE: case NGX_HTTP_CONTENT_PHASE: - ngx_log_debug7(NGX_LOG_DEBUG_WASM, pwctx->log, 0, + ngx_log_debug6(NGX_LOG_DEBUG_WASM, pwctx->log, 0, "proxy_wasm pausing in \"%V\" phase " - "(root: %d, filter: %l/%l, step: %d, action: %d, " + "(filter: %l/%l, step: %d, action: %d, " "pwctx: %p)", &pwctx->phase->name, - pwexec->id == NGX_PROXY_WASM_ROOT_CTX_ID, pwexec->index + 1, pwctx->nfilters, pwctx->step, action, pwctx); goto yield; @@ -595,31 +526,142 @@ ngx_proxy_wasm_action2rc(ngx_proxy_wasm_ctx_t *pwctx, } +ngx_int_t +ngx_proxy_wasm_resume(ngx_proxy_wasm_ctx_t *pwctx, + ngx_wasm_phase_t *phase, ngx_proxy_wasm_step_e step) +{ + size_t i; + ngx_int_t rc = NGX_OK; + ngx_proxy_wasm_exec_t *pwexec, *pwexecs; + + dd("enter"); + + switch (step) { + case NGX_PROXY_WASM_STEP_TICK: + case NGX_PROXY_WASM_STEP_DONE: + case NGX_PROXY_WASM_STEP_RESP_BODY: + case NGX_PROXY_WASM_STEP_RESP_HEADERS: + case NGX_PROXY_WASM_STEP_DISPATCH_RESPONSE: + break; + default: + if (step <= pwctx->last_step) { + dd("step %d already completed, exit", step); + ngx_wasm_assert(rc == NGX_OK); + goto ret; + } + } + + pwctx->step = step; + + /* resume filters chain */ + + pwexecs = (ngx_proxy_wasm_exec_t *) pwctx->pwexecs.elts; + + dd("pwctx->exec_index: %ld, nelts: %ld", + pwctx->exec_index, pwctx->pwexecs.nelts); + + for (i = pwctx->exec_index; i < pwctx->pwexecs.nelts; i++) { + dd("exec_index: %ld", i); + + pwexec = &pwexecs[i]; + + ngx_wasm_assert(pwexec->root_id != NGX_PROXY_WASM_ROOT_CTX_ID); + + /* check for yielded state */ + + rc = action2rc(pwctx, pwexec); + if (rc != NGX_OK) { + goto ret; + } + + /* run step */ + + pwexec->ecode = ngx_proxy_wasm_run_step(pwexec, step); + dd("pwexec->ecode: %d, pwctx->action: %d (pwctx: %p)", + pwexec->ecode, pwctx->action, pwctx); + if (pwexec->ecode != NGX_PROXY_WASM_ERR_NONE) { + rc = pwexec->filter->subsystem->ecode(pwexec->ecode); + goto ret; + } + + /* check for yield/done */ + + rc = action2rc(pwctx, pwexec); + if (rc != NGX_OK) { + if (rc == NGX_AGAIN + && pwctx->exec_index + 1 <= pwctx->nfilters) + { + dd("yield: resume on next filter " + "(idx: %ld -> %ld, nelts: %ld)", + pwctx->exec_index, pwctx->exec_index + 1, + pwctx->pwexecs.nelts); + + pwctx->exec_index++; + } + + goto ret; + } + + pwctx->exec_index++; + + dd("-------- next filter --------"); + } + + ngx_wasm_assert(rc == NGX_OK); + + /* next step */ + + pwctx->last_step = pwctx->step; + pwctx->exec_index = 0; + +ret: + + if (step == NGX_PROXY_WASM_STEP_DONE) { + ngx_proxy_wasm_ctx_destroy(pwctx); + } + + dd("exit rc: %ld", rc); + + return rc; +} + + ngx_proxy_wasm_err_e ngx_proxy_wasm_run_step(ngx_proxy_wasm_exec_t *pwexec, - ngx_proxy_wasm_instance_t *ictx, ngx_proxy_wasm_step_e step) + ngx_proxy_wasm_step_e step) { ngx_int_t rc; + ngx_proxy_wasm_err_e ecode; ngx_proxy_wasm_action_e action = NGX_PROXY_WASM_ACTION_CONTINUE; ngx_proxy_wasm_ctx_t *pwctx = pwexec->parent; ngx_proxy_wasm_filter_t *filter = pwexec->filter; - ngx_wavm_instance_t *instance = ictx->instance; #if (NGX_DEBUG) ngx_proxy_wasm_action_e old_action = pwctx->action; #endif - ngx_wasm_assert(ictx->module == filter->module); ngx_wasm_assert(pwctx->phase); - ictx->pwexec = pwexec; /* set instance current pwexec */ - pwexec->ictx = ictx; /* link pwexec to instance */ + dd("enter (pwexec: %p, ictx: %p, trapped: %d)", + pwexec, pwexec->ictx, pwexec->ictx->instance->trapped); - /* - * update instance log - * (instance->data = ictx already set by instance_new) - */ - ngx_wavm_instance_set_data(instance, ictx, - pwexec ? pwexec->log : filter->log); +#if 0 + /* possible optimization, commented for linter */ + if (pwexec->ictx->instance->trapped) { +#endif + ecode = ngx_proxy_wasm_create_context(filter, pwctx, pwexec->id, + pwexec, NULL); + if (ecode != NGX_PROXY_WASM_ERR_NONE) { + ngx_wasm_assert(0); + return ecode; + } +#if 0 + } +#endif + + ngx_wasm_assert(!pwexec->ictx->instance->trapped); + ngx_wasm_assert(pwexec->ictx->module == filter->module); + + ngx_proxy_wasm_instance_update(pwexec->ictx, pwexec); if (pwexec->root_id == NGX_PROXY_WASM_ROOT_CTX_ID) { ngx_proxy_wasm_log_error(NGX_LOG_DEBUG, pwexec->log, 0, @@ -659,7 +701,9 @@ ngx_proxy_wasm_run_step(ngx_proxy_wasm_exec_t *pwexec, rc = filter->subsystem->resume(pwexec, step, &action); break; case NGX_PROXY_WASM_STEP_TICK: + pwexec->in_tick = 1; rc = ngx_proxy_wasm_on_tick(pwexec); + pwexec->in_tick = 0; break; default: ngx_proxy_wasm_log_error(NGX_LOG_WASM_NYI, pwctx->log, 0, @@ -668,8 +712,8 @@ ngx_proxy_wasm_run_step(ngx_proxy_wasm_exec_t *pwexec, break; } - dd("rc: %ld, old_action: %d, pwctx->action: %d, ret action: %d", - rc, old_action, pwctx->action, action); + dd("rc: %ld, old_action: %d, pwctx->action: %d, ret action: %d, ictx: %p", + rc, old_action, pwctx->action, action, pwexec->ictx); /* pwctx->action writes in host calls overwrite action return value */ @@ -701,181 +745,565 @@ ngx_proxy_wasm_run_step(ngx_proxy_wasm_exec_t *pwexec, done: - ictx->pwexec = NULL; - return pwexec->ecode; } -ngx_int_t -ngx_proxy_wasm_resume(ngx_proxy_wasm_ctx_t *pwctx, - ngx_wasm_phase_t *phase, ngx_proxy_wasm_step_e step) -{ - size_t i; - ngx_int_t rc = NGX_OK; - ngx_proxy_wasm_filter_t *filter; - ngx_proxy_wasm_instance_t *ictx; - ngx_proxy_wasm_exec_t *pwexec, *pwexecs; - - dd("enter"); +/* host handlers */ - switch (step) { - case NGX_PROXY_WASM_STEP_TICK: - case NGX_PROXY_WASM_STEP_DONE: - case NGX_PROXY_WASM_STEP_RESP_BODY: - case NGX_PROXY_WASM_STEP_RESP_HEADERS: - case NGX_PROXY_WASM_STEP_DISPATCH_RESPONSE: - break; - default: - if (step <= pwctx->last_step) { - dd("step %d already completed, exit", step); - ngx_wasm_assert(rc == NGX_OK); - goto ret; - } - } - pwctx->step = step; +ngx_wavm_ptr_t +ngx_proxy_wasm_alloc(ngx_proxy_wasm_exec_t *pwexec, size_t size) +{ + ngx_wavm_ptr_t p; + ngx_int_t rc; + wasm_val_vec_t *rets; + ngx_proxy_wasm_filter_t *filter = pwexec->filter; + ngx_wavm_instance_t *instance = ngx_proxy_wasm_pwexec2instance(pwexec); - /* resume filters chain */ + rc = ngx_wavm_instance_call_funcref(instance, + filter->proxy_on_memory_allocate, + &rets, size); + if (rc != NGX_OK) { + ngx_proxy_wasm_log_error(NGX_LOG_CRIT, pwexec->log, 0, + "proxy_wasm_alloc(%uz) failed", size); + return 0; + } - pwexecs = (ngx_proxy_wasm_exec_t *) pwctx->pwexecs.elts; + p = rets->data[0].of.i32; - for (i = pwctx->exec_index; i < pwctx->pwexecs.nelts; i++) { - dd("exec_index: %ld", i); + ngx_log_debug3(NGX_LOG_DEBUG_WASM, pwexec->log, 0, + "proxy_wasm_alloc: %uz:%uz:%uz", + ngx_wavm_memory_data_size(instance->memory), p, size); - pwexec = &pwexecs[i]; - filter = pwexec->filter; - ictx = pwexec->ictx; + return p; +} - ngx_wasm_assert(pwexec->root_id != NGX_PROXY_WASM_ROOT_CTX_ID); - /* check for trap */ +static ngx_proxy_wasm_instance_t * +get_instance(ngx_proxy_wasm_filter_t *filter, + ngx_proxy_wasm_store_t *store, ngx_log_t *log) +{ + ngx_queue_t *q; + ngx_wavm_module_t *module = filter->module; + ngx_proxy_wasm_instance_t *ictx; + + dd("get instance in store: %p", store); - if (ictx->instance->trapped && !pwexec->ecode) { - pwexec->ecode = NGX_PROXY_WASM_ERR_INSTANCE_TRAPPED; + for (q = ngx_queue_head(&store->busy); + q != ngx_queue_sentinel(&store->busy); + q = ngx_queue_next(q)) + { + ictx = ngx_queue_data(q, ngx_proxy_wasm_instance_t, q); + + if (ictx->instance->trapped) { + q = ngx_queue_next(&ictx->q); + ngx_proxy_wasm_log_error(NGX_LOG_DEBUG, log, 0, + "\"%V\" filter invalidating trapped " + "instance (ictx: %p, store: %p)", + filter->name, ictx, store); + ngx_proxy_wasm_instance_invalidate(ictx); + continue; } - /* check for yielded state */ + if (ictx->module == module) { + dd("reuse busy instance"); + goto reuse; + } + } - rc = ngx_proxy_wasm_action2rc(pwctx, pwexec); - if (rc != NGX_OK) { - goto ret; + for (q = ngx_queue_head(&store->free); + q != ngx_queue_sentinel(&store->free); + q = ngx_queue_next(q)) + { + ictx = ngx_queue_data(q, ngx_proxy_wasm_instance_t, q); + + ngx_wasm_assert(!ictx->instance->trapped); + + if (ictx->module == module) { + dd("reuse free instance, going to busy"); + ngx_queue_remove(&ictx->q); + ngx_queue_insert_tail(&store->busy, &ictx->q); + goto reuse; } + } - /* run step */ + dd("create instance in store: %p", store); - pwexec->ecode = ngx_proxy_wasm_run_step(pwexec, ictx, step); - dd("pwexec->ecode: %d, pwctx->action: %d (pwctx: %p)", - pwexec->ecode, pwctx->action, pwctx); - if (pwexec->ecode != NGX_PROXY_WASM_ERR_NONE) { - rc = filter->subsystem->ecode(pwexec->ecode); - goto ret; + ictx = ngx_pcalloc(store->pool, sizeof(ngx_proxy_wasm_instance_t)); + if (ictx == NULL) { + goto error; + } + + ictx->pool = store->pool; + ictx->log = log; + ictx->store = store; + ictx->module = module; + + ngx_rbtree_init(&ictx->root_ctxs, &ictx->sentinel_root_ctxs, + ngx_rbtree_insert_value); + + ngx_rbtree_init(&ictx->tree_ctxs, &ictx->sentinel_ctxs, + ngx_rbtree_insert_value); + + ictx->instance = ngx_wavm_instance_create(ictx->module, ictx->pool, + ictx->log, ictx); + if (ictx->instance == NULL) { + ngx_pfree(store->pool, ictx); + goto error; + } + + ngx_proxy_wasm_log_error(NGX_LOG_DEBUG, log, 0, + "\"%V\" filter new instance (ictx: %p, store: %p)", + filter->name, ictx, store); + + ngx_queue_insert_tail(&store->busy, &ictx->q); + + goto done; + +reuse: + + ngx_proxy_wasm_log_error(NGX_LOG_DEBUG, log, 0, + "\"%V\" filter reusing instance " + "(ictx: %p, store: %p)", + filter->name, ictx, store); + +done: + + return ictx; + +error: + + return NULL; +} + + +static ngx_proxy_wasm_err_e +ngx_proxy_wasm_create_context(ngx_proxy_wasm_filter_t *filter, + ngx_proxy_wasm_ctx_t *pwctx, ngx_uint_t id, ngx_proxy_wasm_exec_t *in, + ngx_proxy_wasm_exec_t **out) +{ + ngx_int_t rc; + ngx_log_t *log; + wasm_val_vec_t *rets; + ngx_proxy_wasm_instance_t *ictx; + ngx_proxy_wasm_store_t *store; + ngx_proxy_wasm_exec_t *rexec = NULL, *pwexec = NULL; + ngx_proxy_wasm_err_e ecode = NGX_PROXY_WASM_ERR_UNKNOWN; + + dd("enter (filter: \"%.*s\", id: %ld)", + (int) filter->name->len, filter->name->data, id); + + if (pwctx == NULL) { + /* root context */ + ngx_wasm_assert(id == 0); + log = filter->log; + store = filter->store; + + } else { + /* filter context */ + log = pwctx->log; + + switch (pwctx->isolation) { + case NGX_PROXY_WASM_ISOLATION_NONE: + store = filter->store; + break; + case NGX_PROXY_WASM_ISOLATION_STREAM: + store = &pwctx->store; + break; + case NGX_PROXY_WASM_ISOLATION_FILTER: + store = ngx_palloc(pwctx->pool, sizeof(ngx_proxy_wasm_store_t)); + if (store == NULL) { + goto error; + } + + ngx_proxy_wasm_store_init(store, pwctx->pool); + break; + default: + ngx_proxy_wasm_log_error(NGX_LOG_WASM_NYI, pwctx->log, 0, + "NYI - instance isolation: %d", + pwctx->isolation); + goto error; } + } - /* check for yield/done */ + /* get instance */ + + if (in && in->ictx && !in->ictx->instance->trapped) { + ictx = in->ictx; + + } else { + ictx = get_instance(filter, store, log); + if (ictx == NULL) { + goto error; + } + } + + /* create root context */ + + rexec = ngx_proxy_wasm_lookup_root_ctx(ictx, filter->id); + dd("rexec for id %ld: %p", filter->id, rexec); + if (rexec == NULL) { + rexec = ngx_pcalloc(ictx->pool, sizeof(ngx_proxy_wasm_exec_t)); + if (rexec == NULL) { + ecode = NGX_PROXY_WASM_ERR_START_FAILED; + goto error; + } + + rexec->root_id = NGX_PROXY_WASM_ROOT_CTX_ID; + rexec->id = filter->id; + rexec->pool = ictx->pool; + rexec->log = filter->log; + rexec->filter = filter; + rexec->ictx = ictx; + + log = filter->log; + + rexec->log = ngx_pcalloc(rexec->pool, sizeof(ngx_log_t)); + if (rexec->log == NULL) { + ecode = NGX_PROXY_WASM_ERR_START_FAILED; + goto error; + } - rc = ngx_proxy_wasm_action2rc(pwctx, pwexec); + rexec->log->file = log->file; + rexec->log->next = log->next; + rexec->log->writer = log->writer; + rexec->log->wdata = log->wdata; + rexec->log->log_level = log->log_level; + rexec->log->handler = ngx_proxy_wasm_log_error_handler; + rexec->log->data = &rexec->log_ctx; + + rexec->log_ctx.pwexec = rexec; + rexec->log_ctx.orig_log = log; + + rexec->parent = ngx_pcalloc(rexec->pool, sizeof(ngx_proxy_wasm_ctx_t)); + if (rexec->parent == NULL) { + ecode = NGX_PROXY_WASM_ERR_START_FAILED; + goto error; + } + + rexec->parent->id = NGX_PROXY_WASM_ROOT_CTX_ID; + rexec->parent->pool = rexec->pool; + rexec->parent->log = rexec->log; + rexec->parent->isolation = NGX_PROXY_WASM_ISOLATION_STREAM; + + rexec->node.key = rexec->id; + ngx_rbtree_insert(&ictx->root_ctxs, &rexec->node); + } + + /* start root context */ + + if (!rexec->started) { + dd("start root exec ctx (rexec: %p, root_id: %ld, id: %ld, ictx: %p)", + rexec, rexec->root_id, rexec->id, ictx); + + ngx_proxy_wasm_instance_update(ictx, rexec); + + rc = ngx_wavm_instance_call_funcref(ictx->instance, + filter->proxy_on_context_create, + NULL, + rexec->id, rexec->root_id); if (rc != NGX_OK) { - if (rc == NGX_AGAIN - && pwctx->exec_index + 1 <= pwctx->nfilters) - { - dd("yield: resume on next filter " - "(idx: %ld -> %ld, nelts: %ld)", - pwctx->exec_index, pwctx->exec_index + 1, - pwctx->pwexecs.nelts); + ecode = NGX_PROXY_WASM_ERR_START_FAILED; + goto error; + } - pwctx->exec_index++; + if (id == NGX_PROXY_WASM_ROOT_CTX_ID) { + rc = ngx_wavm_instance_call_funcref(ictx->instance, + filter->proxy_on_vm_start, + &rets, + rexec->id, rexec->root_id); + if (rc != NGX_OK || !rets->data[0].of.i32) { + ecode = NGX_PROXY_WASM_ERR_VM_START_FAILED; + goto error; } - - goto ret; } - pwctx->exec_index++; + rc = ngx_wavm_instance_call_funcref(ictx->instance, + filter->proxy_on_plugin_start, + &rets, + rexec->id, filter->config.len); + if (rc != NGX_OK || !rets->data[0].of.i32) { + ecode = NGX_PROXY_WASM_ERR_CONFIGURE_FAILED; + goto error; + } - dd("-------- next filter --------"); + rexec->started = 1; } - ngx_wasm_assert(rc == NGX_OK); + /* start filter context */ - /* next step */ + if (id) { + pwexec = ngx_proxy_wasm_lookup_ctx(ictx, id); - pwctx->last_step = pwctx->step; - pwctx->exec_index = 0; + dd("pwexec for id %ld: %p", id, pwexec); -ret: + if (pwexec == NULL) { - if (step == NGX_PROXY_WASM_STEP_DONE) { - ngx_proxy_wasm_ctx_destroy(pwctx); + if (in == NULL) { + pwexec = ngx_array_push(&pwctx->pwexecs); + if (pwexec == NULL) { + ecode = NGX_PROXY_WASM_ERR_START_FAILED; + goto error; + } + + ngx_memzero(pwexec, sizeof(ngx_proxy_wasm_exec_t)); + + pwexec->id = id; + pwexec->root_id = filter->id; + pwexec->index = pwctx->pwexecs.nelts - 1; + pwexec->pool = ictx->store->pool; + pwexec->filter = filter; + pwexec->parent = pwctx; + pwexec->ictx = ictx; + pwexec->store = ictx->store; + + log = pwctx->log; + + pwexec->log = ngx_pcalloc(pwexec->pool, sizeof(ngx_log_t)); + if (pwexec->log == NULL) { + ecode = NGX_PROXY_WASM_ERR_START_FAILED; + goto error; + } + + pwexec->log->file = log->file; + pwexec->log->next = log->next; + pwexec->log->writer = log->writer; + pwexec->log->wdata = log->wdata; + pwexec->log->log_level = log->log_level; + pwexec->log->connection = log->connection; + pwexec->log->handler = ngx_proxy_wasm_log_error_handler; + pwexec->log->data = &pwexec->log_ctx; + + pwexec->log_ctx.pwexec = pwexec; + pwexec->log_ctx.orig_log = log; + + } else { + if (in->ictx != ictx) { + dd("replace pwexec instance"); + + in->ictx = ictx; + in->started = 0; + } + + pwexec = in; + } + } +#if (NGX_DEBUG) + else { + ngx_wasm_assert(pwexec->id == id); + dd("pwexec #%ld found in instance %p", pwexec->id, ictx); + } +#endif + + if (!pwexec->started) { + dd("start exec ctx (pwexec: %p, id: %ld, ictx: %p)", + pwexec, id, ictx); + + ngx_proxy_wasm_instance_update(ictx, pwexec); + + rc = ngx_wavm_instance_call_funcref(ictx->instance, + filter->proxy_on_context_create, + NULL, + id, filter->id); + if (rc != NGX_OK) { + ecode = NGX_PROXY_WASM_ERR_START_FAILED; + goto error; + } + + pwexec->node.key = pwexec->id; + ngx_rbtree_insert(&ictx->tree_ctxs, &pwexec->node); + + pwexec->started = 1; + } } - dd("exit rc: %ld", rc); + if (out) { + *out = pwexec; + } - return rc; + return NGX_PROXY_WASM_ERR_NONE; + +error: + + if (ecode != NGX_PROXY_WASM_ERR_NONE) { + if (pwexec) { + pwexec->ecode = ecode; + + } else { + filter->ecode = ecode; + } + } + + return ecode; } -ngx_wavm_ptr_t -ngx_proxy_wasm_alloc(ngx_proxy_wasm_exec_t *pwexec, size_t size) +static void +ngx_proxy_wasm_on_log(ngx_proxy_wasm_exec_t *pwexec) { - ngx_wavm_ptr_t p; - ngx_int_t rc; - wasm_val_vec_t *rets; ngx_proxy_wasm_filter_t *filter = pwexec->filter; ngx_wavm_instance_t *instance = ngx_proxy_wasm_pwexec2instance(pwexec); - rc = ngx_wavm_instance_call_funcref(instance, - filter->proxy_on_memory_allocate, - &rets, size); - if (rc != NGX_OK) { - ngx_proxy_wasm_log_error(NGX_LOG_CRIT, pwexec->log, 0, - "proxy_wasm_alloc(%uz) failed", size); - return 0; + if (filter->abi_version < NGX_PROXY_WASM_VNEXT) { + /* 0.1.0 - 0.2.1 */ + (void) ngx_wavm_instance_call_funcref(instance, filter->proxy_on_done, + NULL, pwexec->id); } - p = rets->data[0].of.i32; + (void) ngx_wavm_instance_call_funcref(instance, filter->proxy_on_log, + NULL, pwexec->id); +} - ngx_log_debug3(NGX_LOG_DEBUG_WASM, pwexec->log, 0, - "proxy_wasm_alloc: %uz:%uz:%uz", - ngx_wavm_memory_data_size(instance->memory), p, size); - return p; +static void +ngx_proxy_wasm_on_done(ngx_proxy_wasm_exec_t *pwexec) +{ + ngx_wavm_instance_t *instance; + ngx_proxy_wasm_filter_t *filter = pwexec->filter; +#ifdef NGX_WASM_HTTP + ngx_http_proxy_wasm_dispatch_t *call; +#endif + + instance = ngx_proxy_wasm_pwexec2instance(pwexec); + + ngx_proxy_wasm_log_error(NGX_LOG_DEBUG, pwexec->log, 0, + "filter %l/%l finalizing context", + pwexec->index + 1, pwexec->parent->nfilters); + +#ifdef NGX_WASM_HTTP + call = pwexec->call; + if (call) { + ngx_log_debug3(NGX_LOG_DEBUG_WASM, pwexec->log, 0, + "proxy_wasm \"%V\" filter (%l/%l) " + "cancelling HTTP dispatch", + pwexec->filter->name, pwexec->index + 1, + pwexec->parent->nfilters); + + ngx_http_proxy_wasm_dispatch_destroy(call); + + pwexec->call = NULL; + } +#endif + + (void) ngx_wavm_instance_call_funcref(instance, + filter->proxy_on_context_finalize, + NULL, pwexec->id); + + if (pwexec->node.key) { + ngx_rbtree_delete(&pwexec->ictx->tree_ctxs, &pwexec->node); + } } -void -ngx_proxy_wasm_store_destroy(ngx_proxy_wasm_store_t *store) +static ngx_int_t +ngx_proxy_wasm_on_tick(ngx_proxy_wasm_exec_t *pwexec) { - ngx_queue_t *q; - ngx_proxy_wasm_instance_t *ictx; + ngx_int_t rc; + wasm_val_vec_t args; + ngx_proxy_wasm_filter_t *filter = pwexec->filter; - dd("enter"); + ngx_wasm_assert(pwexec->root_id == NGX_PROXY_WASM_ROOT_CTX_ID); - while (!ngx_queue_empty(&store->busy)) { - dd("remove busy"); - q = ngx_queue_head(&store->busy); - ictx = ngx_queue_data(q, ngx_proxy_wasm_instance_t, q); + wasm_val_vec_new_uninitialized(&args, 1); + ngx_wasm_vec_set_i32(&args, 0, pwexec->id); - ngx_queue_remove(&ictx->q); - ngx_queue_insert_tail(&store->free, &ictx->q); + rc = ngx_wavm_instance_call_funcref_vec(pwexec->ictx->instance, + filter->proxy_on_timer_ready, + NULL, &args); + + wasm_val_vec_delete(&args); + + return rc; +} + + +/* utils */ + + +static ngx_proxy_wasm_filter_t * +ngx_proxy_wasm_lookup_filter(ngx_uint_t id) +{ + ngx_rbtree_t *rbtree; + ngx_rbtree_node_t *node, *sentinel; + ngx_proxy_wasm_filter_t *filter; + + rbtree = &ngx_proxy_wasm_filters_rbtree; + node = rbtree->root; + sentinel = rbtree->sentinel; + + while (node != sentinel) { + + if (id != node->key) { + node = (id < node->key) ? node->left : node->right; + continue; + } + + filter = ngx_rbtree_data(node, ngx_proxy_wasm_filter_t, node); + + return filter; } - while (!ngx_queue_empty(&store->free)) { - q = ngx_queue_head(&store->free); - ictx = ngx_queue_data(q, ngx_proxy_wasm_instance_t, q); + return NULL; +} - dd("remove free"); - ngx_proxy_wasm_release_instance(ictx, 1); + +static ngx_proxy_wasm_exec_t * +ngx_proxy_wasm_lookup_root_ctx(ngx_proxy_wasm_instance_t *ictx, ngx_uint_t id) +{ + ngx_rbtree_t *rbtree; + ngx_rbtree_node_t *node, *sentinel; + ngx_proxy_wasm_exec_t *pwexec; + + rbtree = &ictx->root_ctxs; + node = rbtree->root; + sentinel = rbtree->sentinel; + + while (node != sentinel) { + + if (id != node->key) { + node = (id < node->key) ? node->left : node->right; + continue; + } + + pwexec = ngx_rbtree_data(node, ngx_proxy_wasm_exec_t, node); + + return pwexec; } - dd("exit"); + return NULL; } -/* static */ +static ngx_proxy_wasm_exec_t * +ngx_proxy_wasm_lookup_ctx(ngx_proxy_wasm_instance_t *ictx, ngx_uint_t id) +{ + ngx_rbtree_t *rbtree; + ngx_rbtree_node_t *node, *sentinel; + ngx_proxy_wasm_exec_t *pwexec; + + rbtree = &ictx->tree_ctxs; + node = rbtree->root; + sentinel = rbtree->sentinel; + + while (node != sentinel) { + + if (id != node->key) { + node = (id < node->key) ? node->left : node->right; + continue; + } + + pwexec = ngx_rbtree_data(node, ngx_proxy_wasm_exec_t, node); + + return pwexec; + } + + return NULL; +} -static ngx_inline ngx_wavm_funcref_t * -ngx_proxy_wasm_lookup_func(ngx_proxy_wasm_filter_t *filter, const char *n) +static ngx_wavm_funcref_t * +get_func(ngx_proxy_wasm_filter_t *filter, const char *n) { ngx_str_t name; @@ -887,7 +1315,7 @@ ngx_proxy_wasm_lookup_func(ngx_proxy_wasm_filter_t *filter, const char *n) static ngx_proxy_wasm_abi_version_e -ngx_proxy_wasm_abi_version(ngx_proxy_wasm_filter_t *filter) +get_abi_version(ngx_proxy_wasm_filter_t *filter) { size_t i; ngx_wavm_module_t *module = filter->module; @@ -930,14 +1358,14 @@ ngx_proxy_wasm_abi_version(ngx_proxy_wasm_filter_t *filter) static ngx_int_t -ngx_proxy_wasm_init_abi(ngx_proxy_wasm_filter_t *filter) +ngx_proxy_wasm_filter_init_abi(ngx_proxy_wasm_filter_t *filter) { ngx_log_debug4(NGX_LOG_DEBUG_WASM, filter->log, 0, "proxy_wasm initializing \"%V\" filter " "(config size: %d, filter: %p, filter->id: %ui)", filter->name, filter->config.len, filter, filter->id); - filter->abi_version = ngx_proxy_wasm_abi_version(filter); + filter->abi_version = get_abi_version(filter); switch (filter->abi_version) { case NGX_PROXY_WASM_0_1_0: @@ -961,12 +1389,11 @@ ngx_proxy_wasm_init_abi(ngx_proxy_wasm_filter_t *filter) /* malloc */ - filter->proxy_on_memory_allocate = - ngx_proxy_wasm_lookup_func(filter, "malloc"); + filter->proxy_on_memory_allocate = get_func(filter, "malloc"); if (filter->proxy_on_memory_allocate == NULL) { filter->proxy_on_memory_allocate = - ngx_proxy_wasm_lookup_func(filter, "proxy_on_memory_allocate"); + get_func(filter, "proxy_on_memory_allocate"); if (filter->proxy_on_memory_allocate == NULL) { filter->ecode = NGX_PROXY_WASM_ERR_BAD_MODULE_INTERFACE; @@ -981,408 +1408,252 @@ ngx_proxy_wasm_init_abi(ngx_proxy_wasm_filter_t *filter) /* context */ filter->proxy_on_context_create = - ngx_proxy_wasm_lookup_func(filter, "proxy_on_context_create"); + get_func(filter, "proxy_on_context_create"); filter->proxy_on_context_finalize = - ngx_proxy_wasm_lookup_func(filter, "proxy_on_context_finalize"); + get_func(filter, "proxy_on_context_finalize"); if (filter->abi_version < NGX_PROXY_WASM_VNEXT) { /* 0.1.0 - 0.2.1 */ filter->proxy_on_done = - ngx_proxy_wasm_lookup_func(filter, "proxy_on_done"); + get_func(filter, "proxy_on_done"); filter->proxy_on_log = - ngx_proxy_wasm_lookup_func(filter, "proxy_on_log"); + get_func(filter, "proxy_on_log"); filter->proxy_on_context_finalize = - ngx_proxy_wasm_lookup_func(filter, "proxy_on_delete"); + get_func(filter, "proxy_on_delete"); } /* configuration */ filter->proxy_on_vm_start = - ngx_proxy_wasm_lookup_func(filter, "proxy_on_vm_start"); + get_func(filter, "proxy_on_vm_start"); filter->proxy_on_plugin_start = - ngx_proxy_wasm_lookup_func(filter, "proxy_on_plugin_start"); + get_func(filter, "proxy_on_plugin_start"); if (filter->abi_version < NGX_PROXY_WASM_VNEXT) { /* 0.1.0 - 0.2.1 */ filter->proxy_on_plugin_start = - ngx_proxy_wasm_lookup_func(filter, "proxy_on_configure"); + get_func(filter, "proxy_on_configure"); } /* stream */ filter->proxy_on_new_connection = - ngx_proxy_wasm_lookup_func(filter, "proxy_on_new_connection"); + get_func(filter, "proxy_on_new_connection"); filter->proxy_on_downstream_data = - ngx_proxy_wasm_lookup_func(filter, "proxy_on_downstream_data"); + get_func(filter, "proxy_on_downstream_data"); filter->proxy_on_upstream_data = - ngx_proxy_wasm_lookup_func(filter, "proxy_on_upstream_data"); + get_func(filter, "proxy_on_upstream_data"); filter->proxy_on_downstream_close = - ngx_proxy_wasm_lookup_func(filter, "proxy_on_downstream_close"); + get_func(filter, "proxy_on_downstream_close"); filter->proxy_on_upstream_close = - ngx_proxy_wasm_lookup_func(filter, "proxy_on_upstream_close"); + get_func(filter, "proxy_on_upstream_close"); if (filter->abi_version < NGX_PROXY_WASM_VNEXT) { /* 0.1.0 - 0.2.1 */ filter->proxy_on_downstream_close = - ngx_proxy_wasm_lookup_func( - filter, "proxy_on_downstream_connection_close"); + get_func(filter, "proxy_on_downstream_connection_close"); filter->proxy_on_upstream_close = - ngx_proxy_wasm_lookup_func( - filter, "proxy_on_upstream_connection_close"); + get_func(filter, "proxy_on_upstream_connection_close"); } /* http */ filter->proxy_on_http_request_headers = - ngx_proxy_wasm_lookup_func(filter, - "proxy_on_http_request_headers"); + get_func(filter, "proxy_on_http_request_headers"); filter->proxy_on_http_request_body = - ngx_proxy_wasm_lookup_func(filter, - "proxy_on_http_request_body"); + get_func(filter, "proxy_on_http_request_body"); filter->proxy_on_http_request_trailers = - ngx_proxy_wasm_lookup_func(filter, - "proxy_on_http_request_trailers"); + get_func(filter, "proxy_on_http_request_trailers"); filter->proxy_on_http_request_metadata = - ngx_proxy_wasm_lookup_func(filter, - "proxy_on_http_request_metadata"); + get_func(filter, "proxy_on_http_request_metadata"); filter->proxy_on_http_response_headers = - ngx_proxy_wasm_lookup_func(filter, - "proxy_on_http_response_headers"); + get_func(filter, "proxy_on_http_response_headers"); filter->proxy_on_http_response_body = - ngx_proxy_wasm_lookup_func(filter, - "proxy_on_http_response_body"); + get_func(filter, "proxy_on_http_response_body"); filter->proxy_on_http_response_trailers = - ngx_proxy_wasm_lookup_func(filter, - "proxy_on_http_response_trailers"); + get_func(filter, "proxy_on_http_response_trailers"); filter->proxy_on_http_response_metadata = - ngx_proxy_wasm_lookup_func(filter, - "proxy_on_http_response_metadata"); + get_func(filter, "proxy_on_http_response_metadata"); if (filter->abi_version < NGX_PROXY_WASM_VNEXT) { /* 0.1.0 - 0.2.1 */ filter->proxy_on_http_request_headers = - ngx_proxy_wasm_lookup_func(filter, "proxy_on_request_headers"); + get_func(filter, "proxy_on_request_headers"); filter->proxy_on_http_request_body = - ngx_proxy_wasm_lookup_func(filter, "proxy_on_request_body"); + get_func(filter, "proxy_on_request_body"); filter->proxy_on_http_request_trailers = - ngx_proxy_wasm_lookup_func(filter, "proxy_on_request_trailers"); + get_func(filter, "proxy_on_request_trailers"); filter->proxy_on_http_request_metadata = - ngx_proxy_wasm_lookup_func(filter, "proxy_on_request_metadata"); + get_func(filter, "proxy_on_request_metadata"); filter->proxy_on_http_response_headers = - ngx_proxy_wasm_lookup_func(filter, "proxy_on_response_headers"); + get_func(filter, "proxy_on_response_headers"); filter->proxy_on_http_response_body = - ngx_proxy_wasm_lookup_func(filter, "proxy_on_response_body"); + get_func(filter, "proxy_on_response_body"); filter->proxy_on_http_response_trailers = - ngx_proxy_wasm_lookup_func(filter, "proxy_on_response_trailers"); + get_func(filter, "proxy_on_response_trailers"); filter->proxy_on_http_response_metadata = - ngx_proxy_wasm_lookup_func(filter, "proxy_on_response_metadata"); + get_func(filter, "proxy_on_response_metadata"); } /* shared queue */ filter->proxy_on_queue_ready = - ngx_proxy_wasm_lookup_func(filter, "proxy_on_queue_ready"); + get_func(filter, "proxy_on_queue_ready"); /* timers */ filter->proxy_create_timer = - ngx_proxy_wasm_lookup_func(filter, "proxy_create_timer"); + get_func(filter, "proxy_create_timer"); filter->proxy_delete_timer = - ngx_proxy_wasm_lookup_func(filter, "proxy_delete_timer"); + get_func(filter, "proxy_delete_timer"); filter->proxy_on_timer_ready = - ngx_proxy_wasm_lookup_func(filter, "proxy_on_timer_ready"); + get_func(filter, "proxy_on_timer_ready"); if (filter->abi_version < NGX_PROXY_WASM_VNEXT) { /* 0.1.0 - 0.2.1 */ filter->proxy_on_timer_ready = - ngx_proxy_wasm_lookup_func(filter, "proxy_on_tick"); + get_func(filter, "proxy_on_tick"); } /* http callouts */ filter->proxy_on_http_call_response = - ngx_proxy_wasm_lookup_func(filter, "proxy_on_http_call_response"); + get_func(filter, "proxy_on_http_call_response"); /* grpc callouts */ filter->proxy_on_grpc_call_response_header_metadata = - ngx_proxy_wasm_lookup_func( - filter, "proxy_on_grpc_call_response_header_metadata"); + get_func(filter, "proxy_on_grpc_call_response_header_metadata"); filter->proxy_on_grpc_call_response_message = - ngx_proxy_wasm_lookup_func( - filter, "proxy_on_grpc_call_response_message"); + get_func(filter, "proxy_on_grpc_call_response_message"); filter->proxy_on_grpc_call_response_trailer_metadata = - ngx_proxy_wasm_lookup_func( - filter, "proxy_on_grpc_call_response_trailer_metadata"); + get_func(filter, "proxy_on_grpc_call_response_trailer_metadata"); filter->proxy_on_grpc_call_close = - ngx_proxy_wasm_lookup_func(filter, "proxy_on_grpc_call_close"); + get_func(filter, "proxy_on_grpc_call_close"); /* custom extensions */ - filter->proxy_on_custom_callback = - ngx_proxy_wasm_lookup_func(filter, "proxy_on_custom_callback"); - - if (filter->abi_version < NGX_PROXY_WASM_VNEXT) { - /* 0.2.0 - 0.2.1 */ - filter->proxy_on_custom_callback = - ngx_proxy_wasm_lookup_func(filter, "proxy_on_foreign_function"); - } - - /* validate */ - - if (filter->proxy_on_context_create == NULL - || filter->proxy_on_vm_start == NULL - || filter->proxy_on_plugin_start == NULL) - { - filter->ecode = NGX_PROXY_WASM_ERR_BAD_MODULE_INTERFACE; - - ngx_proxy_wasm_log_error(NGX_LOG_EMERG, filter->log, filter->ecode, - "\"%V\" filter missing one of: " - "on_context_create, on_vm_start, " - "on_plugin_start", filter->name); - return NGX_ERROR; - } - - return NGX_OK; -} - - -static ngx_int_t -ngx_proxy_wasm_start_filter(ngx_proxy_wasm_filter_t *filter) -{ - ngx_proxy_wasm_instance_t *ictx; - - ngx_wasm_assert(filter->loaded); - - if (filter->ecode) { - return NGX_ERROR; - } - - if (filter->started) { - return NGX_OK; - } - - ictx = ngx_proxy_wasm_get_instance(filter, filter->store, NULL, - filter->log); - if (ictx == NULL) { - return NGX_ERROR; - } - - filter->started = 1; - - return NGX_OK; -} - - -ngx_proxy_wasm_instance_t * -ngx_proxy_wasm_get_instance(ngx_proxy_wasm_filter_t *filter, - ngx_proxy_wasm_store_t *store, ngx_proxy_wasm_exec_t *pwexec, - ngx_log_t *log) -{ - ngx_queue_t *q; - ngx_pool_t *pool; - ngx_wavm_module_t *module = filter->module; - ngx_proxy_wasm_instance_t *ictx; - ngx_proxy_wasm_err_e ecode; - - dd("enter (pwexec: %p)", pwexec); - - if (store == NULL) { - dd("no store, jump to create"); - pool = filter->pool; - goto create; - } - - pool = store->pool; - - dd("lookup instance in store: %p", store); - - for (q = ngx_queue_head(&store->busy); - q != ngx_queue_sentinel(&store->busy); - q = ngx_queue_next(q)) - { - ictx = ngx_queue_data(q, ngx_proxy_wasm_instance_t, q); - - if (ictx->instance->trapped) { - ngx_proxy_wasm_log_error(NGX_LOG_DEBUG, log, 0, - "\"%V\" filter freeing trapped instance " - "(ictx: %p, store: %p)", - filter->name, ictx, store); - q = ngx_queue_next(&ictx->q); - ngx_proxy_wasm_release_instance(ictx, 1); - continue; - } - - if (ictx->module == filter->module) { - dd("reuse busy instance"); - ngx_wasm_assert(ictx->nrefs); - goto reuse; - } - } - - for (q = ngx_queue_head(&store->free); - q != ngx_queue_sentinel(&store->free); - q = ngx_queue_next(q)) - { - ictx = ngx_queue_data(q, ngx_proxy_wasm_instance_t, q); - - if (ictx->instance->trapped) { - ngx_proxy_wasm_log_error(NGX_LOG_DEBUG, log, 0, - "\"%V\" filter freeing trapped instance " - "(ictx: %p, store: %p)", - filter->name, ictx, store); - - q = ngx_queue_next(&ictx->q); - ngx_proxy_wasm_release_instance(ictx, 1); - continue; - } - - if (ictx->module == filter->module) { - dd("reuse free instance"); - ngx_wasm_assert(ictx->nrefs == 0); - ngx_queue_remove(&ictx->q); - goto reuse; - } - } - -create: - - dd("create instance in store: %p", store); + filter->proxy_on_custom_callback = + get_func(filter, "proxy_on_custom_callback"); - ictx = ngx_pcalloc(pool, sizeof(ngx_proxy_wasm_instance_t)); - if (ictx == NULL) { - goto error; + if (filter->abi_version < NGX_PROXY_WASM_VNEXT) { + /* 0.2.0 - 0.2.1 */ + filter->proxy_on_custom_callback = + get_func(filter, "proxy_on_foreign_function"); } - ictx->next_id = 1; - ictx->pool = pool; - ictx->log = log; - ictx->store = store; - ictx->module = module; - - ngx_rbtree_init(&ictx->root_ctxs, &ictx->sentinel_root_ctxs, - ngx_rbtree_insert_value); + /* validate */ - ngx_rbtree_init(&ictx->tree_ctxs, &ictx->sentinel_ctxs, - ngx_rbtree_insert_value); + if (filter->proxy_on_context_create == NULL + || filter->proxy_on_vm_start == NULL + || filter->proxy_on_plugin_start == NULL) + { + filter->ecode = NGX_PROXY_WASM_ERR_BAD_MODULE_INTERFACE; - ictx->instance = ngx_wavm_instance_create(module, pool, log, ictx); - if (ictx->instance == NULL) { - ngx_pfree(pool, ictx); - goto error; + ngx_proxy_wasm_log_error(NGX_LOG_EMERG, filter->log, filter->ecode, + "\"%V\" filter missing one of: " + "on_context_create, on_vm_start, " + "on_plugin_start", filter->name); + return NGX_ERROR; } - ngx_proxy_wasm_log_error(NGX_LOG_DEBUG, log, 0, - "\"%V\" filter new instance (ictx: %p, store: %p)", - filter->name, ictx, store); + return NGX_OK; +} - goto done; -reuse: +static ngx_int_t +ngx_proxy_wasm_filter_start(ngx_proxy_wasm_filter_t *filter) +{ + ngx_proxy_wasm_err_e ecode; - ngx_proxy_wasm_log_error(NGX_LOG_DEBUG, log, 0, - "\"%V\" filter reusing instance " - "(ictx: %p, nrefs: %d, store: %p)", - filter->name, ictx, ictx->nrefs + 1, store); + ngx_wasm_assert(filter->loaded); - goto done; + if (filter->ecode) { + return NGX_ERROR; + } -done: + if (filter->started) { + return NGX_OK; + } - if (store && !ictx->nrefs) { - ngx_queue_insert_tail(&store->busy, &ictx->q); + ecode = ngx_proxy_wasm_create_context(filter, NULL, 0, NULL, NULL); + if (ecode != NGX_PROXY_WASM_ERR_NONE) { + return NGX_ERROR; } - if (pwexec) { - if (pwexec->root_id != NGX_PROXY_WASM_ROOT_CTX_ID) { - ngx_wasm_assert(pwexec->id == 0); - pwexec->id = ictx->next_id++; - } + filter->started = 1; - if (pwexec->ecode) { - /* recycled instance */ - pwexec->ecode = NGX_PROXY_WASM_ERR_NONE; - pwexec->ecode_logged = 0; - } - } + return NGX_OK; +} - ictx->nrefs++; - ictx->pwexec = pwexec; +static void +ngx_proxy_wasm_instance_update(ngx_proxy_wasm_instance_t *ictx, + ngx_proxy_wasm_exec_t *pwexec) +{ /** - * update instance log + * Update instance * FFI-injected filters have a valid log while the instance's * might be outdated. */ - ngx_wavm_instance_set_data(ictx->instance, ictx, log); + ictx->pwexec = pwexec; - /* create wasm context (root/http) */ + if (pwexec) { + ngx_wavm_instance_set_data(ictx->instance, ictx, pwexec->log); + } +} - ecode = ngx_proxy_wasm_on_start(ictx, filter, pwexec == NULL); - if (ecode != NGX_PROXY_WASM_ERR_NONE) { - if (pwexec) { - pwexec->ecode = ecode; - } else { - filter->ecode = ecode; - } +static void +ngx_proxy_wasm_instance_invalidate(ngx_proxy_wasm_instance_t *ictx) +{ + ngx_rbtree_node_t **root, **sentinel, *s, *n; + ngx_proxy_wasm_exec_t *pwexec = NULL; - goto error; - } + dd("enter (ictx: %p)", ictx); - return ictx; + /* root context */ -error: + root = &ictx->root_ctxs.root; + s = &ictx->sentinel_root_ctxs; + sentinel = &s; - return NULL; -} + while (*root != *sentinel) { + n = ngx_rbtree_min(*root, *sentinel); + pwexec = ngx_rbtree_data(n, ngx_proxy_wasm_exec_t, node); + dd("invalidate root ctx #%ld instance", pwexec->id); -void -ngx_proxy_wasm_release_instance(ngx_proxy_wasm_instance_t *ictx, - unsigned sweep) -{ - ngx_queue_t *q; + ngx_rbtree_delete(&ictx->root_ctxs, n); - if (sweep) { - ictx->nrefs = 0; + destroy_pwexec(pwexec); - } else if (ictx->nrefs) { - ictx->nrefs--; + ngx_pfree(pwexec->pool, pwexec); } - dd("ictx: %p (nrefs: %ld, sweep: %d, trapped: %d)", - ictx, ictx->nrefs, sweep, ictx->instance->trapped); - - if (ictx->nrefs == 0) { - if (ictx->store) { - dd("remove from busy"); - ngx_queue_remove(&ictx->q); /* remove from busy/free */ + /* stream context */ - if (sweep || ictx->instance->trapped) { - dd("insert in sweep"); - ngx_queue_insert_tail(&ictx->store->sweep, &ictx->q); + root = &ictx->tree_ctxs.root; + s = &ictx->sentinel_ctxs; + sentinel = &s; - } else { - dd("insert in free"); - ngx_queue_insert_tail(&ictx->store->free, &ictx->q); - } + while (*root != *sentinel) { + n = ngx_rbtree_min(*root, *sentinel); + pwexec = ngx_rbtree_data(n, ngx_proxy_wasm_exec_t, node); - while (!ngx_queue_empty(&ictx->store->sweep)) { - dd("sweep (destroy)"); - q = ngx_queue_head(&ictx->store->sweep); - ictx = ngx_queue_data(q, ngx_proxy_wasm_instance_t, q); + dd("invalidate ctx #%ld instance", pwexec->id); - ngx_queue_remove(&ictx->q); - ngx_proxy_wasm_instance_destroy(ictx); - } + ngx_rbtree_delete(&ictx->tree_ctxs, n); - } else { - dd("destroy"); - ngx_proxy_wasm_instance_destroy(ictx); - } + destroy_pwexec(pwexec); } + /* remove from busy/free, schedule for sweeping */ + + ngx_queue_remove(&ictx->q); + + ngx_queue_insert_tail(&ictx->store->sweep, &ictx->q); + dd("exit"); } @@ -1435,198 +1706,80 @@ ngx_proxy_wasm_instance_destroy(ngx_proxy_wasm_instance_t *ictx) } -static ngx_proxy_wasm_err_e -ngx_proxy_wasm_on_start(ngx_proxy_wasm_instance_t *ictx, - ngx_proxy_wasm_filter_t *filter, unsigned start) +static void +ngx_proxy_wasm_store_destroy(ngx_proxy_wasm_store_t *store) { - ngx_int_t rc; - ngx_log_t *log; - ngx_proxy_wasm_exec_t *rexec, *pwexec = ictx->pwexec; - ngx_wavm_instance_t *instance = ictx->instance; - wasm_val_vec_t *rets; - - dd("enter (pwexec: %p, ictx: %p)", pwexec, ictx); - - rexec = ngx_proxy_wasm_root_lookup(ictx, filter->id); - if (rexec == NULL) { - rexec = ngx_pcalloc(ictx->pool, sizeof(ngx_proxy_wasm_exec_t)); - if (rexec == NULL) { - return NGX_PROXY_WASM_ERR_START_FAILED; - } - - rexec->root_id = NGX_PROXY_WASM_ROOT_CTX_ID; - rexec->id = filter->id; - rexec->pool = ictx->pool; - rexec->log = filter->log; - rexec->filter = filter; - rexec->ictx = ictx; - - log = filter->log; - - rexec->log = ngx_pcalloc(rexec->pool, sizeof(ngx_log_t)); - if (rexec->log == NULL) { - return NGX_PROXY_WASM_ERR_START_FAILED; - } - - rexec->log->file = log->file; - rexec->log->next = log->next; - rexec->log->writer = log->writer; - rexec->log->wdata = log->wdata; - rexec->log->log_level = log->log_level; - rexec->log->handler = ngx_proxy_wasm_log_error_handler; - rexec->log->data = &rexec->log_ctx; - - rexec->log_ctx.pwexec = rexec; - rexec->log_ctx.orig_log = log; + ngx_queue_t *q; + ngx_proxy_wasm_instance_t *ictx; - rexec->parent = ngx_pcalloc(rexec->pool, sizeof(ngx_proxy_wasm_ctx_t)); - if (rexec->parent == NULL) { - return NGX_PROXY_WASM_ERR_START_FAILED; - } + dd("enter"); - rexec->parent->id = NGX_PROXY_WASM_ROOT_CTX_ID; - rexec->parent->pool = rexec->pool; - rexec->parent->log = rexec->log; + while (!ngx_queue_empty(&store->busy)) { + dd("remove busy"); + q = ngx_queue_head(&store->busy); + ictx = ngx_queue_data(q, ngx_proxy_wasm_instance_t, q); - rexec->node.key = rexec->id; - ngx_rbtree_insert(&ictx->root_ctxs, &rexec->node); + ngx_queue_remove(&ictx->q); + ngx_queue_insert_tail(&store->free, &ictx->q); } - ictx->pwexec = rexec; /* set instance current pwexec */ - - if (!rexec->started) { - dd("start root exec ctx (rexec: %p, root_id: %ld, id: %ld, ictx: %p)", - rexec, rexec->root_id, rexec->id, ictx); - - rc = ngx_wavm_instance_call_funcref(instance, - filter->proxy_on_context_create, - NULL, - rexec->id, rexec->root_id); - if (rc != NGX_OK) { - return NGX_PROXY_WASM_ERR_START_FAILED; - } - - if (start) { - rc = ngx_wavm_instance_call_funcref(instance, - filter->proxy_on_vm_start, - &rets, - rexec->id, 0); - if (rc != NGX_OK || !rets->data[0].of.i32) { - return NGX_PROXY_WASM_ERR_VM_START_FAILED; - } - } - - rc = ngx_wavm_instance_call_funcref(instance, - filter->proxy_on_plugin_start, - &rets, - rexec->id, filter->config.len); - if (rc != NGX_OK || !rets->data[0].of.i32) { - return NGX_PROXY_WASM_ERR_CONFIGURE_FAILED; - } + while (!ngx_queue_empty(&store->free)) { + dd("remove free"); + q = ngx_queue_head(&store->free); + ictx = ngx_queue_data(q, ngx_proxy_wasm_instance_t, q); - rexec->started = 1; + ngx_queue_remove(&ictx->q); + ngx_queue_insert_tail(&store->sweep, &ictx->q); } - ictx->pwexec = pwexec; /* set instance current pwexec */ - - if (pwexec && pwexec->root_id != NGX_PROXY_WASM_ROOT_CTX_ID - && ngx_proxy_wasm_ctx_lookup(ictx, pwexec->id) == NULL) - { - dd("start exec ctx (pwexec: %p, root_id: %ld, id: %ld, ictx: %p)", - pwexec, pwexec->root_id, pwexec->id, ictx); - - rc = ngx_wavm_instance_call_funcref(instance, - filter->proxy_on_context_create, - NULL, - pwexec->id, pwexec->root_id); - if (rc != NGX_OK) { - return NGX_PROXY_WASM_ERR_START_FAILED; - } - - pwexec->node.key = pwexec->id; - ngx_rbtree_insert(&ictx->tree_ctxs, &pwexec->node); - } + ngx_proxy_wasm_store_sweep(store); - return NGX_PROXY_WASM_ERR_NONE; + dd("exit"); } static void -ngx_proxy_wasm_on_log(ngx_proxy_wasm_exec_t *pwexec) +ngx_proxy_wasm_store_sweep(ngx_proxy_wasm_store_t *store) { - ngx_proxy_wasm_filter_t *filter = pwexec->filter; - ngx_wavm_instance_t *instance = ngx_proxy_wasm_pwexec2instance(pwexec); + ngx_queue_t *q; + ngx_proxy_wasm_instance_t *ictx; - if (filter->abi_version < NGX_PROXY_WASM_VNEXT) { - /* 0.1.0 - 0.2.1 */ - (void) ngx_wavm_instance_call_funcref(instance, filter->proxy_on_done, - NULL, pwexec->id); - } + while (!ngx_queue_empty(&store->sweep)) { + q = ngx_queue_head(&store->sweep); + ictx = ngx_queue_data(q, ngx_proxy_wasm_instance_t, q); - (void) ngx_wavm_instance_call_funcref(instance, filter->proxy_on_log, - NULL, pwexec->id); + dd("sweep (ictx: %p)", ictx); + + ngx_queue_remove(&ictx->q); + ngx_proxy_wasm_instance_destroy(ictx); + } } +#if 0 static void -ngx_proxy_wasm_on_done(ngx_proxy_wasm_exec_t *pwexec) +ngx_proxy_wasm_store_schedule_sweep_handler(ngx_event_t *ev) { - ngx_wavm_instance_t *instance; - ngx_proxy_wasm_filter_t *filter = pwexec->filter; -#if 1 -#ifdef NGX_WASM_HTTP - ngx_http_proxy_wasm_dispatch_t *call; -#endif -#endif - - instance = ngx_proxy_wasm_pwexec2instance(pwexec); - - ngx_proxy_wasm_log_error(NGX_LOG_DEBUG, pwexec->log, 0, - "filter %l/%l finalizing context", - pwexec->index + 1, pwexec->parent->nfilters); - -#if 1 -#ifdef NGX_WASM_HTTP - call = pwexec->call; - if (call) { - ngx_log_debug3(NGX_LOG_DEBUG_WASM, pwexec->log, 0, - "proxy_wasm \"%V\" filter (%l/%l) " - "cancelling HTTP dispatch", - pwexec->filter->name, pwexec->index + 1, - pwexec->parent->nfilters); - - ngx_http_proxy_wasm_dispatch_destroy(call); - - pwexec->call = NULL; - } -#endif -#endif - - (void) ngx_wavm_instance_call_funcref(instance, - filter->proxy_on_context_finalize, - NULL, pwexec->id); + ngx_proxy_wasm_store_t *store = ev->data; - ngx_rbtree_delete(&pwexec->ictx->tree_ctxs, &pwexec->node); + ngx_proxy_wasm_store_sweep(store); } -static ngx_int_t -ngx_proxy_wasm_on_tick(ngx_proxy_wasm_exec_t *pwexec) +static void +ngx_proxy_wasm_store_schedule_sweep(ngx_proxy_wasm_store_t *store) { - ngx_int_t rc; - wasm_val_vec_t args; - ngx_proxy_wasm_filter_t *filter = pwexec->filter; - - ngx_wasm_assert(pwexec->root_id == NGX_PROXY_WASM_ROOT_CTX_ID); - - wasm_val_vec_new_uninitialized(&args, 1); - ngx_wasm_vec_set_i32(&args, 0, pwexec->id); + ngx_event_t *ev; - rc = ngx_wavm_instance_call_funcref_vec(pwexec->ictx->instance, - filter->proxy_on_timer_ready, - NULL, &args); + ev = ngx_calloc(sizeof(ngx_event_t), store->pool->log); + if (ev == NULL) { + return; + } - wasm_val_vec_delete(&args); + ev->handler = ngx_proxy_wasm_store_schedule_sweep_handler; + ev->data = store; + ev->log = store->pool->log; - return rc; + ngx_post_event(ev, &ngx_posted_events); } +#endif diff --git a/src/common/proxy_wasm/ngx_proxy_wasm.h b/src/common/proxy_wasm/ngx_proxy_wasm.h index 0bc7f19c4..20d27f21f 100644 --- a/src/common/proxy_wasm/ngx_proxy_wasm.h +++ b/src/common/proxy_wasm/ngx_proxy_wasm.h @@ -195,6 +195,7 @@ struct ngx_proxy_wasm_exec_s { ngx_proxy_wasm_ctx_t *parent; ngx_proxy_wasm_filter_t *filter; ngx_proxy_wasm_instance_t *ictx; + ngx_proxy_wasm_store_t *store; ngx_event_t *ev; #ifdef NGX_WASM_HTTP ngx_http_proxy_wasm_dispatch_t *call; @@ -203,8 +204,8 @@ struct ngx_proxy_wasm_exec_s { /* flags */ unsigned started:1; - unsigned ecode_logged:1; unsigned in_tick:1; + unsigned ecode_logged:1; }; @@ -259,9 +260,7 @@ struct ngx_proxy_wasm_ctx_s { struct ngx_proxy_wasm_instance_s { - ngx_uint_t next_id; - ngx_uint_t nrefs; - ngx_queue_t q; + ngx_queue_t q; /* store busy/free/sweep */ ngx_rbtree_t tree_ctxs; ngx_rbtree_t root_ctxs; ngx_rbtree_node_t sentinel_ctxs; @@ -274,7 +273,7 @@ struct ngx_proxy_wasm_instance_s { /* swap */ - ngx_proxy_wasm_exec_t *pwexec; + ngx_proxy_wasm_exec_t *pwexec; /* current pwexec */ }; @@ -379,16 +378,14 @@ struct ngx_proxy_wasm_filter_s { }; -/* root */ -void ngx_proxy_wasm_init(ngx_conf_t *cf); -void ngx_proxy_wasm_exit(); -ngx_uint_t ngx_proxy_wasm_id(ngx_str_t *name, ngx_str_t *config, - uintptr_t data); +/* root context */ +void ngx_proxy_wasm_init(ngx_conf_t *cf, ngx_proxy_wasm_store_t *gstore); +void ngx_proxy_wasm_exit(ngx_proxy_wasm_store_t *gstore); ngx_int_t ngx_proxy_wasm_load(ngx_proxy_wasm_filter_t *filter, ngx_log_t *log); ngx_int_t ngx_proxy_wasm_start(ngx_cycle_t *cycle); -/* ctx/store */ +/* stream context */ ngx_proxy_wasm_ctx_t *ngx_proxy_wasm_ctx_alloc(ngx_pool_t *pool); ngx_proxy_wasm_ctx_t *ngx_proxy_wasm_ctx(ngx_uint_t *filter_ids, size_t nfilters, ngx_uint_t isolation, ngx_proxy_wasm_subsystem_t *subsys, @@ -396,13 +393,12 @@ ngx_proxy_wasm_ctx_t *ngx_proxy_wasm_ctx(ngx_uint_t *filter_ids, void ngx_proxy_wasm_ctx_destroy(ngx_proxy_wasm_ctx_t *pwctx); ngx_int_t ngx_proxy_wasm_resume(ngx_proxy_wasm_ctx_t *pwctx, ngx_wasm_phase_t *phase, ngx_proxy_wasm_step_e step); +ngx_proxy_wasm_err_e ngx_proxy_wasm_run_step(ngx_proxy_wasm_exec_t *pwexec, + ngx_proxy_wasm_step_e step); + + +/* host handlers */ ngx_wavm_ptr_t ngx_proxy_wasm_alloc(ngx_proxy_wasm_exec_t *pwexec, size_t size); -void ngx_proxy_wasm_store_destroy(ngx_proxy_wasm_store_t *store); -ngx_proxy_wasm_instance_t *ngx_proxy_wasm_get_instance( - ngx_proxy_wasm_filter_t *filter, ngx_proxy_wasm_store_t *store, - ngx_proxy_wasm_exec_t *pwexec, ngx_log_t *log); -void ngx_proxy_wasm_release_instance(ngx_proxy_wasm_instance_t *ictx, - unsigned sweep); /* utils */ @@ -418,8 +414,6 @@ ngx_int_t ngx_proxy_wasm_pairs_unmarshal(ngx_proxy_wasm_exec_t *pwexec, unsigned ngx_proxy_wasm_marshal(ngx_proxy_wasm_exec_t *pwexec, ngx_list_t *list, ngx_array_t *extras, ngx_wavm_ptr_t *out, uint32_t *out_size, ngx_uint_t *truncated); -ngx_proxy_wasm_err_e ngx_proxy_wasm_run_step(ngx_proxy_wasm_exec_t *pwexec, - ngx_proxy_wasm_instance_t *ictx, ngx_proxy_wasm_step_e step); static ngx_inline void diff --git a/src/common/proxy_wasm/ngx_proxy_wasm_util.c b/src/common/proxy_wasm/ngx_proxy_wasm_util.c index 361793192..5f65f31ff 100644 --- a/src/common/proxy_wasm/ngx_proxy_wasm_util.c +++ b/src/common/proxy_wasm/ngx_proxy_wasm_util.c @@ -193,14 +193,13 @@ ngx_proxy_wasm_log_error(ngx_uint_t level, ngx_log_t *log, void ngx_proxy_wasm_filter_tick_handler(ngx_event_t *ev) { - ngx_log_t *log = ev->log; - ngx_proxy_wasm_exec_t *pwexec = ev->data; + ngx_proxy_wasm_err_e ecode; + ngx_log_t *log = ev->log; + ngx_proxy_wasm_exec_t *pwexec = ev->data; + ngx_proxy_wasm_filter_t *filter = pwexec->filter; #ifdef NGX_WASM_HTTP - ngx_proxy_wasm_ctx_t *pwctx = pwexec->parent; + ngx_proxy_wasm_ctx_t *pwctx = pwexec->parent; #endif - ngx_proxy_wasm_filter_t *filter = pwexec->filter; - ngx_proxy_wasm_instance_t *ictx; - ngx_proxy_wasm_err_e ecode; dd("enter"); @@ -214,31 +213,16 @@ ngx_proxy_wasm_filter_tick_handler(ngx_event_t *ev) return; } - ictx = ngx_proxy_wasm_get_instance(filter, filter->store, pwexec, - filter->log); - if (ictx == NULL) { - ngx_wasm_log_error(NGX_LOG_ERR, log, 0, - "tick_handler: no instance"); - return; - } - #ifdef NGX_WASM_HTTP pwctx->phase = ngx_wasm_phase_lookup(&ngx_http_wasm_subsystem, NGX_WASM_BACKGROUND_PHASE); #endif - pwexec->in_tick = 1; - - ecode = ngx_proxy_wasm_run_step(pwexec, ictx, NGX_PROXY_WASM_STEP_TICK); - - ngx_proxy_wasm_release_instance(ictx, 0); + ecode = ngx_proxy_wasm_run_step(pwexec, NGX_PROXY_WASM_STEP_TICK); if (ecode != NGX_PROXY_WASM_ERR_NONE) { - pwexec->ecode = ecode; return; } - pwexec->in_tick = 0; - if (!ngx_exiting) { pwexec->ev = ngx_calloc(sizeof(ngx_event_t), log); if (pwexec->ev == NULL) { diff --git a/src/http/ngx_http_wasm_module.c b/src/http/ngx_http_wasm_module.c index cb4f6c29d..e0357dc21 100644 --- a/src/http/ngx_http_wasm_module.c +++ b/src/http/ngx_http_wasm_module.c @@ -265,6 +265,7 @@ ngx_http_wasm_create_main_conf(ngx_conf_t *cf) mcf->vm = ngx_wasm_main_vm(cf->cycle); ngx_queue_init(&mcf->plans); + ngx_proxy_wasm_init(cf, &mcf->store); return mcf; } @@ -281,9 +282,6 @@ ngx_http_wasm_init_main_conf(ngx_conf_t *cf, void *conf) return NGX_CONF_ERROR; } - ngx_proxy_wasm_init(cf); - ngx_proxy_wasm_store_init(&mcf->store, cf->pool); - return NGX_CONF_OK; } @@ -445,8 +443,7 @@ ngx_http_wasm_exit_process(ngx_cycle_t *cycle) mcf = ngx_http_cycle_get_module_main_conf(cycle, ngx_http_wasm_module); if (mcf) { - ngx_proxy_wasm_exit(); - ngx_proxy_wasm_store_destroy(&mcf->store); + ngx_proxy_wasm_exit(&mcf->store); ngx_wasm_ops_destroy(mcf->ops); } diff --git a/src/http/ngx_http_wasm_util.c b/src/http/ngx_http_wasm_util.c index 15123c374..c3841f4bc 100644 --- a/src/http/ngx_http_wasm_util.c +++ b/src/http/ngx_http_wasm_util.c @@ -411,8 +411,8 @@ ngx_http_wasm_ops_add_filter(ngx_wasm_ops_plan_t *plan, goto error; } - filter->pool = plan->pool; filter->log = vm->log; + filter->pool = store->pool; filter->store = store; if (config) { diff --git a/src/http/proxy_wasm/ngx_http_proxy_wasm.h b/src/http/proxy_wasm/ngx_http_proxy_wasm.h index 3392a93ad..37a9a052f 100644 --- a/src/http/proxy_wasm/ngx_http_proxy_wasm.h +++ b/src/http/proxy_wasm/ngx_http_proxy_wasm.h @@ -19,7 +19,7 @@ ngx_http_proxy_wasm_get_rctx(ngx_wavm_instance_t *instance) pwexec = ngx_proxy_wasm_instance2pwexec(instance); pwctx = pwexec->parent; - if (!pwctx) { + if (pwctx == NULL) { return NULL; } diff --git a/src/http/proxy_wasm/ngx_http_proxy_wasm_dispatch.c b/src/http/proxy_wasm/ngx_http_proxy_wasm_dispatch.c index d08f240bb..7ff9eb3c2 100644 --- a/src/http/proxy_wasm/ngx_http_proxy_wasm_dispatch.c +++ b/src/http/proxy_wasm/ngx_http_proxy_wasm_dispatch.c @@ -821,7 +821,7 @@ ngx_http_proxy_wasm_dispatch_resume_handler(ngx_wasm_socket_tcp_t *sock) */ pwexec->call = call; - ecode = ngx_proxy_wasm_run_step(pwexec, pwexec->ictx, + ecode = ngx_proxy_wasm_run_step(pwexec, NGX_PROXY_WASM_STEP_DISPATCH_RESPONSE); if (ecode != NGX_PROXY_WASM_ERR_NONE) { goto error; diff --git a/src/wasm/vm/ngx_wavm.c b/src/wasm/vm/ngx_wavm.c index ade694c87..643b7ea24 100644 --- a/src/wasm/vm/ngx_wavm.c +++ b/src/wasm/vm/ngx_wavm.c @@ -1345,12 +1345,12 @@ ngx_wavm_instance_destroy(ngx_wavm_instance_t *instance) ngx_wavm_func_t *func; ngx_wavm_module_t *module = instance->module; - ngx_log_debug5(NGX_LOG_DEBUG_WASM, + ngx_log_debug6(NGX_LOG_DEBUG_WASM, instance->log ? instance->log : ngx_cycle->log, 0, "wasm freeing \"%V\" instance in \"%V\" vm" - " (vm: %p, module: %p, instance: %p)", + " (vm: %p, module: %p, instance: %p, trapped: %d)", &module->name, module->vm->name, - module->vm, module, instance); + module->vm, module, instance, instance->trapped); if (instance->funcs.nelts) { for (i = 0; i < instance->funcs.nelts; i++) { diff --git a/t/03-proxy_wasm/007-on_http_instance_isolation.t b/t/03-proxy_wasm/007-on_http_instance_isolation.t index 7ac729bd6..a58de9c87 100644 --- a/t/03-proxy_wasm/007-on_http_instance_isolation.t +++ b/t/03-proxy_wasm/007-on_http_instance_isolation.t @@ -94,10 +94,8 @@ Should recycle the global instance when trapped. (.*?(Uncaught RuntimeError: )?unreachable|\s*wasm trap: wasm `unreachable` instruction executed)[^#*]* \*\d+ .*? filter chain failed resuming: previous error \(instance trapped\)[^#*]* \*\d+ .*? filter freeing context #\d+ \(1\/2\)[^#*]* -\*\d+ .*? filter freeing context #\d+ \(2\/2\)\Z/, -qr/\A\*\d+ .*? filter freeing trapped instance[^#*]* -\*\d+ wasm freeing "hostcalls" instance in "main" vm[^#*]* -\*\d+ .*? filter new instance[^#*]* +\*\d+ .*? filter freeing context #\d+ \(2\/2\)[^#*]*\Z/, +qr/\A\*\d+ .*? filter new instance[^#*]* \*\d+ .*? filter reusing instance[^#*]* \*\d+ .*? filter 1\/2 resuming "on_request_headers" step in "rewrite" phase[^#*]* \*\d+ .*? filter 1\/2 resuming "on_response_headers" step in "header_filter" phase[^#*]* @@ -110,6 +108,7 @@ qr/\A\*\d+ .*? filter freeing trapped instance[^#*]* \*\d+ .*? filter 2\/2 resuming "on_done" step in "done" phase[^#*]* \*\d+ .*? filter 2\/2 finalizing context[^#*]* \*\d+ .*? filter freeing context #\d+ \(1\/2\)[^#*]* +\*\d+ wasm freeing "hostcalls" instance in "main" vm[^#*]* \*\d+ .*? filter freeing context #\d+ \(2\/2\)\Z/] --- no_error_log [emerg] diff --git a/t/03-proxy_wasm/hfuncs/114-proxy_set_http_request_body.t b/t/03-proxy_wasm/hfuncs/114-proxy_set_http_request_body.t index bf147dcf2..1c31f21b1 100644 --- a/t/03-proxy_wasm/hfuncs/114-proxy_set_http_request_body.t +++ b/t/03-proxy_wasm/hfuncs/114-proxy_set_http_request_body.t @@ -235,7 +235,7 @@ HelloWorld } location /log { - # cannot set request body + # not executed (subrequest) proxy_wasm hostcalls 'on=log \ test=/t/set_request_body'; echo $request_body; @@ -258,7 +258,6 @@ from_request_body --- grep_error_log_out eval qr/.*?host trap \(bad usage\): cannot set request body.* \[info\] .*? \*\d+ .*? filter chain failed resuming: previous error \(instance trapped\).*? subrequest: "\/response_headers".* -\[info\] .*? \*\d+ .*? filter chain failed resuming: previous error \(instance trapped\).*? request: "GET \/t HTTP\/1\.1".* \z/ --- no_error_log [alert] diff --git a/t/03-proxy_wasm/hfuncs/119-proxy_properties_get_ngx.t b/t/03-proxy_wasm/hfuncs/119-proxy_properties_get_ngx.t index 28b520b3c..c5225b345 100644 --- a/t/03-proxy_wasm/hfuncs/119-proxy_properties_get_ngx.t +++ b/t/03-proxy_wasm/hfuncs/119-proxy_properties_get_ngx.t @@ -131,13 +131,8 @@ qr/\[info\] .*? property not found: n,/ === TEST 5: proxy_wasm - get_property() ngx.* - not available on: tick (isolation: global) - on_tick runs on the root context, so it does not have access to ngx_http_* calls. - -HTTP 500 since instance recycling happens on next request, and isolation -is global (single instance for root/request). - --- wasm_modules: hostcalls --- load_nginx_modules: ngx_http_echo_module --- config @@ -146,7 +141,6 @@ is global (single instance for root/request). echo_sleep 0.150; echo ok; } ---- error_code: 500 --- ignore_response_body --- error_log eval [ diff --git a/t/03-proxy_wasm/hfuncs/120-proxy_properties_get_host.t b/t/03-proxy_wasm/hfuncs/120-proxy_properties_get_host.t index 705c2fa3b..aadd5a76c 100644 --- a/t/03-proxy_wasm/hfuncs/120-proxy_properties_get_host.t +++ b/t/03-proxy_wasm/hfuncs/120-proxy_properties_get_host.t @@ -97,7 +97,8 @@ TODO: is this behavior correct? name=wasmx.nonexistent_property'; echo ok; } ---- ignore_response_body +--- response_body +ok --- error_log eval qr/\[info\] .*? property not found: wasmx.nonexistent_property,/ --- no_error_log @@ -129,10 +130,6 @@ qr/\[info\] .*? property not found: was,/ === TEST 4: proxy_wasm - get_property() wasmx - not available on: tick (isolation: global) - -HTTP 500 since instance recycling happens on next request, and isolation -is global (single instance for root/request). - --- wasm_modules: hostcalls --- load_nginx_modules: ngx_http_echo_module --- config @@ -143,7 +140,6 @@ is global (single instance for root/request). echo_sleep 0.150; echo ok; } ---- error_code: 500 --- ignore_response_body --- error_log eval [ @@ -152,15 +148,10 @@ is global (single instance for root/request). qr/\[crit\] .*? panicked at/, qr/unexpected status: 10/, ] ---- no_error_log -[emerg] === TEST 5: proxy_wasm - get_property() wasmx - not available on: tick (isolation: stream) - -HTTP 200 since the root and request instances are different. - --- wasm_modules: hostcalls --- load_nginx_modules: ngx_http_echo_module --- config diff --git a/t/03-proxy_wasm/hfuncs/122-proxy_properties_set_host.t b/t/03-proxy_wasm/hfuncs/122-proxy_properties_set_host.t index abe7b0aa0..bfc35f836 100644 --- a/t/03-proxy_wasm/hfuncs/122-proxy_properties_set_host.t +++ b/t/03-proxy_wasm/hfuncs/122-proxy_properties_set_host.t @@ -139,10 +139,8 @@ Does not fail when the property is not found. === TEST 5: proxy_wasm - set_property() wasmx - not available on: tick (isolation: global) - -HTTP 500 since instance recycling happens on next request, and -isolation is global (single instance for root/request). - +on_tick runs on the root context, so it does not have access to +ngx_http_* calls. --- wasm_modules: hostcalls --- load_nginx_modules: ngx_http_echo_module --- config @@ -157,7 +155,6 @@ isolation is global (single instance for root/request). echo_sleep 0.150; echo ok; } ---- error_code: 500 --- ignore_response_body --- error_log eval [ @@ -172,12 +169,8 @@ isolation is global (single instance for root/request). === TEST 6: proxy_wasm - set_property() wasmx - not available on: tick (isolation: stream) - -HTTP 200 since the root and request instances are different. - on_tick runs on the root context, so it does not have access to ngx_http_* calls. - --- wasm_modules: hostcalls --- load_nginx_modules: ngx_http_echo_module --- config diff --git a/t/03-proxy_wasm/hfuncs/123-proxy_properties_set_ngx.t b/t/03-proxy_wasm/hfuncs/123-proxy_properties_set_ngx.t index 370854afe..2d8fc6c39 100644 --- a/t/03-proxy_wasm/hfuncs/123-proxy_properties_set_ngx.t +++ b/t/03-proxy_wasm/hfuncs/123-proxy_properties_set_ngx.t @@ -190,13 +190,8 @@ forward the NotFound status back to the caller. === TEST 7: proxy_wasm - set_property() ngx.* - not available on: tick (isolation: global) - on_tick runs on the root context, so it does not have access to ngx_http_* calls. - -HTTP 500 since instance recycling happens on next -request, and isolation is global (single instance for root/request) - --- wasm_modules: hostcalls --- load_nginx_modules: ngx_http_echo_module --- config @@ -211,7 +206,6 @@ request, and isolation is global (single instance for root/request) echo_sleep 0.150; echo ok; } ---- error_code: 500 --- ignore_response_body --- error_log eval [