diff --git a/src/common/proxy_wasm/ngx_proxy_wasm.c b/src/common/proxy_wasm/ngx_proxy_wasm.c index 05b035172..541fd30a4 100644 --- a/src/common/proxy_wasm/ngx_proxy_wasm.c +++ b/src/common/proxy_wasm/ngx_proxy_wasm.c @@ -10,115 +10,62 @@ #endif -static ngx_int_t ngx_proxy_wasm_init_abi(ngx_proxy_wasm_filter_t *filter); -static ngx_int_t ngx_proxy_wasm_start_filter(ngx_proxy_wasm_filter_t *filter); -static void ngx_proxy_wasm_instance_destroy(ngx_proxy_wasm_instance_t *ictx); -static ngx_proxy_wasm_err_e ngx_proxy_wasm_on_start( - ngx_proxy_wasm_instance_t *ictx, ngx_proxy_wasm_filter_t *filter, - unsigned start); +#define ngx_proxy_wasm_store_init(s, p) \ + (s)->pool = (p); \ + ngx_queue_init(&(s)->sweep); \ + ngx_queue_init(&(s)->free); \ + ngx_queue_init(&(s)->busy) + + +static ngx_proxy_wasm_err_e ngx_proxy_wasm_create_context( + ngx_proxy_wasm_filter_t *filter, ngx_proxy_wasm_ctx_t *pwctx, + ngx_uint_t id, ngx_proxy_wasm_exec_t *in, ngx_proxy_wasm_exec_t **out); static void ngx_proxy_wasm_on_log(ngx_proxy_wasm_exec_t *pwexec); static void ngx_proxy_wasm_on_done(ngx_proxy_wasm_exec_t *pwexec); static ngx_int_t ngx_proxy_wasm_on_tick(ngx_proxy_wasm_exec_t *pwexec); +static ngx_proxy_wasm_filter_t *ngx_proxy_wasm_lookup_filter(ngx_uint_t id); +static ngx_proxy_wasm_exec_t *ngx_proxy_wasm_lookup_root_ctx( + ngx_proxy_wasm_instance_t *ictx, ngx_uint_t id); +static ngx_proxy_wasm_exec_t *ngx_proxy_wasm_lookup_ctx( + ngx_proxy_wasm_instance_t *ictx, ngx_uint_t id); +static ngx_int_t ngx_proxy_wasm_filter_init_abi( + ngx_proxy_wasm_filter_t *filter); +static ngx_int_t ngx_proxy_wasm_filter_start(ngx_proxy_wasm_filter_t *filter); +static void ngx_proxy_wasm_instance_update( + ngx_proxy_wasm_instance_t *ictx, ngx_proxy_wasm_exec_t *pwexec); +static void ngx_proxy_wasm_instance_invalidate(ngx_proxy_wasm_instance_t *ictx); +static void ngx_proxy_wasm_instance_destroy(ngx_proxy_wasm_instance_t *ictx); +static void ngx_proxy_wasm_store_destroy(ngx_proxy_wasm_store_t *store); +static void ngx_proxy_wasm_store_sweep(ngx_proxy_wasm_store_t *store); +#if 0 +static void ngx_proxy_wasm_store_schedule_sweep_handler(ngx_event_t *ev); +static void ngx_proxy_wasm_store_schedule_sweep(ngx_proxy_wasm_store_t *store); +#endif +static ngx_uint_t next_id = 1; static ngx_rbtree_t ngx_proxy_wasm_filters_rbtree; static ngx_rbtree_node_t ngx_proxy_wasm_filters_sentinel; -static ngx_proxy_wasm_filter_t * -ngx_proxy_wasm_filter_lookup(ngx_uint_t id) -{ - ngx_rbtree_t *rbtree; - ngx_rbtree_node_t *node, *sentinel; - ngx_proxy_wasm_filter_t *filter; - - rbtree = &ngx_proxy_wasm_filters_rbtree; - node = rbtree->root; - sentinel = rbtree->sentinel; - - while (node != sentinel) { - - if (id != node->key) { - node = (id < node->key) ? node->left : node->right; - continue; - } - - filter = ngx_rbtree_data(node, ngx_proxy_wasm_filter_t, node); - - return filter; - } - - return NULL; -} - - -static ngx_proxy_wasm_exec_t * -ngx_proxy_wasm_root_lookup(ngx_proxy_wasm_instance_t *ictx, ngx_uint_t id) -{ - ngx_rbtree_t *rbtree; - ngx_rbtree_node_t *node, *sentinel; - ngx_proxy_wasm_exec_t *pwexec; - - rbtree = &ictx->root_ctxs; - node = rbtree->root; - sentinel = rbtree->sentinel; - - while (node != sentinel) { - - if (id != node->key) { - node = (id < node->key) ? node->left : node->right; - continue; - } - - pwexec = ngx_rbtree_data(node, ngx_proxy_wasm_exec_t, node); - - return pwexec; - } - - return NULL; -} - - -static ngx_proxy_wasm_exec_t * -ngx_proxy_wasm_ctx_lookup(ngx_proxy_wasm_instance_t *ictx, ngx_uint_t id) -{ - ngx_rbtree_t *rbtree; - ngx_rbtree_node_t *node, *sentinel; - ngx_proxy_wasm_exec_t *pwexec; - - rbtree = &ictx->tree_ctxs; - node = rbtree->root; - sentinel = rbtree->sentinel; - - while (node != sentinel) { - - if (id != node->key) { - node = (id < node->key) ? node->left : node->right; - continue; - } - - pwexec = ngx_rbtree_data(node, ngx_proxy_wasm_exec_t, node); - - return pwexec; - } - - return NULL; -} +/* context - root */ void -ngx_proxy_wasm_init(ngx_conf_t *cf) +ngx_proxy_wasm_init(ngx_conf_t *cf, ngx_proxy_wasm_store_t *gstore) { ngx_rbtree_init(&ngx_proxy_wasm_filters_rbtree, &ngx_proxy_wasm_filters_sentinel, ngx_rbtree_insert_value); ngx_proxy_wasm_properties_init(cf); + + ngx_proxy_wasm_store_init(gstore, cf->pool); } void -ngx_proxy_wasm_exit() +ngx_proxy_wasm_exit(ngx_proxy_wasm_store_t *gstore) { ngx_rbtree_node_t **root_node, **sentinel, *node; ngx_proxy_wasm_filter_t *filter; @@ -132,17 +79,15 @@ ngx_proxy_wasm_exit() ngx_rbtree_delete(&ngx_proxy_wasm_filters_rbtree, node); - ngx_log_debug1(NGX_LOG_DEBUG_WASM, filter->log, 0, - "proxy_wasm releasing \"%V\" filter instance", - filter->name); - ngx_proxy_wasm_store_destroy(filter->store); } + + ngx_proxy_wasm_store_destroy(gstore); } -ngx_uint_t -ngx_proxy_wasm_id(ngx_str_t *name, ngx_str_t *config, uintptr_t data) +static ngx_uint_t +get_filter_id(ngx_str_t *name, ngx_str_t *config, uintptr_t data) { u_char buf[NGX_INT64_LEN]; uint32_t hash; @@ -178,12 +123,12 @@ ngx_proxy_wasm_load(ngx_proxy_wasm_filter_t *filter, ngx_log_t *log) filter->log = log; filter->name = &filter->module->name; - filter->id = ngx_proxy_wasm_id(filter->name, &filter->config, filter->data); + filter->id = get_filter_id(filter->name, &filter->config, filter->data); filter->node.key = filter->id; ngx_rbtree_insert(&ngx_proxy_wasm_filters_rbtree, &filter->node); - if (ngx_proxy_wasm_init_abi(filter) != NGX_OK) { + if (ngx_proxy_wasm_filter_init_abi(filter) != NGX_OK) { return NGX_ERROR; } @@ -217,7 +162,7 @@ ngx_proxy_wasm_start(ngx_cycle_t *cycle) { filter = ngx_rbtree_data(node, ngx_proxy_wasm_filter_t, node); - rc = ngx_proxy_wasm_start_filter(filter); + rc = ngx_proxy_wasm_filter_start(filter); if (rc != NGX_OK) { ngx_proxy_wasm_log_error(NGX_LOG_EMERG, filter->log, filter->ecode, "failed initializing \"%V\" filter", @@ -232,6 +177,9 @@ ngx_proxy_wasm_start(ngx_cycle_t *cycle) } +/* context - stream */ + + ngx_proxy_wasm_ctx_t * ngx_proxy_wasm_ctx_alloc(ngx_pool_t *pool) { @@ -260,15 +208,11 @@ ngx_proxy_wasm_ctx_t * ngx_proxy_wasm_ctx(ngx_uint_t *filter_ids, size_t nfilters, ngx_uint_t isolation, ngx_proxy_wasm_subsystem_t *subsys, void *data) { - size_t i; - ngx_uint_t id; - ngx_pool_t *pool; - ngx_log_t *log; - ngx_proxy_wasm_ctx_t *pwctx; - ngx_proxy_wasm_exec_t *pwexec; - ngx_proxy_wasm_filter_t *filter; - ngx_proxy_wasm_instance_t *ictx; - ngx_proxy_wasm_store_t *pwstore = NULL; + size_t i; + ngx_uint_t id; + ngx_proxy_wasm_ctx_t *pwctx; + ngx_proxy_wasm_filter_t *filter; + ngx_proxy_wasm_exec_t *pwexec = NULL; pwctx = subsys->get_context(data); if (pwctx == NULL) { @@ -292,115 +236,114 @@ ngx_proxy_wasm_ctx(ngx_uint_t *filter_ids, size_t nfilters, for (i = 0; i < nfilters; i++) { id = filter_ids[i]; - filter = ngx_proxy_wasm_filter_lookup(id); + filter = ngx_proxy_wasm_lookup_filter(id); if (filter == NULL) { /* TODO: log error */ ngx_wasm_assert(0); return NULL; } - pool = pwctx->pool; - log = pwctx->log; - - switch (pwctx->isolation) { - case NGX_PROXY_WASM_ISOLATION_NONE: - pwstore = filter->store; - break; - case NGX_PROXY_WASM_ISOLATION_STREAM: - pwstore = &pwctx->store; - break; - case NGX_PROXY_WASM_ISOLATION_FILTER: - break; - default: - ngx_proxy_wasm_log_error(NGX_LOG_WASM_NYI, pwctx->log, 0, - "NYI - instance isolation: %d", - pwctx->isolation); - return NULL; - } - - pwexec = ngx_array_push(&pwctx->pwexecs); + (void) ngx_proxy_wasm_create_context(filter, pwctx, + next_id++, NULL, &pwexec); if (pwexec == NULL) { return NULL; } - ngx_memzero(pwexec, sizeof(ngx_proxy_wasm_exec_t)); + } /* for () */ + + pwctx->ready = 1; - pwexec->index = i; - pwexec->pool = pool; - pwexec->filter = filter; - pwexec->parent = pwctx; - pwexec->root_id = filter->id; + } /* !ready */ - pwexec->log = ngx_pcalloc(pwexec->pool, sizeof(ngx_log_t)); - if (pwexec->log == NULL) { - return NULL; - } + return pwctx; +} - pwexec->log->file = log->file; - pwexec->log->next = log->next; - pwexec->log->writer = log->writer; - pwexec->log->wdata = log->wdata; - pwexec->log->log_level = log->log_level; - pwexec->log->connection = log->connection; - pwexec->log->handler = ngx_proxy_wasm_log_error_handler; - pwexec->log->data = &pwexec->log_ctx; - pwexec->log_ctx.pwexec = pwexec; - pwexec->log_ctx.orig_log = log; +static void +destroy_pwexec(ngx_proxy_wasm_exec_t *pwexec) +{ + pwexec->ictx = NULL; - ictx = ngx_proxy_wasm_get_instance(filter, pwstore, pwexec, log); - if (ictx == NULL) { - return NULL; - } + if (pwexec->ev) { + ngx_del_timer(pwexec->ev); + ngx_free(pwexec->ev); - pwexec->ictx = ictx; + pwexec->ev = NULL; + } - ngx_wasm_assert(pwexec->index + 1 == pwctx->pwexecs.nelts); + if (pwexec->log) { + if (pwexec->log_ctx.log_prefix.data) { + ngx_pfree(pwexec->pool, pwexec->log_ctx.log_prefix.data); + } - } /* for () */ + ngx_pfree(pwexec->pool, pwexec->log); - pwctx->ready = 1; + pwexec->log = NULL; + } - } /* !ready */ + if (pwexec->parent) { + if (pwexec->log_ctx.log_prefix.data) { + ngx_pfree(pwexec->pool, pwexec->log_ctx.log_prefix.data); - return pwctx; + pwexec->log_ctx.log_prefix.data = NULL; + } + + if (pwexec->root_id == NGX_PROXY_WASM_ROOT_CTX_ID) { + ngx_pfree(pwexec->pool, pwexec->parent); + + pwexec->parent = NULL; + + } else if (pwexec->parent->isolation + == NGX_PROXY_WASM_ISOLATION_FILTER + && pwexec->store) + { + /* store specifically allocated for the pwexec */ + ngx_pfree(pwexec->parent->pool, pwexec->store); + + pwexec->store = NULL; + } + } } void ngx_proxy_wasm_ctx_destroy(ngx_proxy_wasm_ctx_t *pwctx) { - size_t i; - ngx_proxy_wasm_exec_t *pwexec, *pwexecs; - ngx_proxy_wasm_instance_t *ictx; + size_t i; + ngx_proxy_wasm_exec_t *pwexec, *pwexecs; pwexecs = (ngx_proxy_wasm_exec_t *) pwctx->pwexecs.elts; for (i = 0; i < pwctx->pwexecs.nelts; i++) { pwexec = &pwexecs[i]; - ictx = pwexec->ictx; ngx_wasm_assert(pwexec->root_id != NGX_PROXY_WASM_ROOT_CTX_ID); - ngx_proxy_wasm_log_error(NGX_LOG_DEBUG, pwexec->log, 0, + ngx_proxy_wasm_log_error(NGX_LOG_DEBUG, pwctx->log, 0, "\"%V\" filter freeing context #%d " "(%l/%l)", pwexec->filter->name, pwexec->id, pwexec->index + 1, pwctx->nfilters); - if (pwexec->log) { - if (pwexec->log_ctx.log_prefix.data) { - ngx_pfree(pwexec->pool, pwexec->log_ctx.log_prefix.data); + + if (pwexec->ictx) { + if (pwexec->node.key) { + ngx_rbtree_delete(&pwexec->ictx->tree_ctxs, &pwexec->node); } - ngx_pfree(pwexec->pool, pwexec->log); + if (pwctx->isolation == NGX_PROXY_WASM_ISOLATION_NONE) { + /* sweep if an instance has trapped */ + ngx_proxy_wasm_store_sweep(pwexec->ictx->store); + } } - ngx_proxy_wasm_release_instance(ictx, 0); + destroy_pwexec(pwexec); } ngx_proxy_wasm_store_destroy(&pwctx->store); + ngx_array_destroy(&pwctx->pwexecs); + #if 0 if (pwctx->authority.data) { ngx_pfree(pwctx->pool, pwctx->authority.data); @@ -454,7 +397,7 @@ ngx_proxy_wasm_ctx_destroy(ngx_proxy_wasm_ctx_t *pwctx) static ngx_int_t -ngx_proxy_wasm_action2rc(ngx_proxy_wasm_ctx_t *pwctx, +action2rc(ngx_proxy_wasm_ctx_t *pwctx, ngx_proxy_wasm_exec_t *pwexec) { ngx_int_t rc = NGX_ERROR; @@ -480,7 +423,7 @@ ngx_proxy_wasm_action2rc(ngx_proxy_wasm_ctx_t *pwctx, #if (NGX_DEBUG) else { if (pwexec->root_id == NGX_PROXY_WASM_ROOT_CTX_ID) { - ngx_proxy_wasm_log_error(NGX_LOG_DEBUG, pwexec->log, + ngx_proxy_wasm_log_error(NGX_LOG_DEBUG, pwctx->log, pwexec->ecode, "root context skipping \"%V\" " "step in \"%V\" phase", @@ -488,7 +431,7 @@ ngx_proxy_wasm_action2rc(ngx_proxy_wasm_ctx_t *pwctx, &pwctx->phase->name); } else { - ngx_proxy_wasm_log_error(NGX_LOG_DEBUG, pwexec->log, + ngx_proxy_wasm_log_error(NGX_LOG_DEBUG, pwctx->log, pwexec->ecode, "filter %l/%l skipping \"%V\" " "step in \"%V\" phase", @@ -516,6 +459,7 @@ ngx_proxy_wasm_action2rc(ngx_proxy_wasm_ctx_t *pwctx, /* determine current action rc */ switch (action) { + case NGX_PROXY_WASM_ACTION_DONE: ngx_log_debug0(NGX_LOG_DEBUG_WASM, pwctx->log, 0, "proxy_wasm action " "\"DONE\" -> \"CONTINUE\" to resume later phases"); @@ -533,17 +477,16 @@ ngx_proxy_wasm_action2rc(ngx_proxy_wasm_ctx_t *pwctx, case NGX_HTTP_REWRITE_PHASE: case NGX_HTTP_ACCESS_PHASE: case NGX_HTTP_CONTENT_PHASE: - ngx_log_debug7(NGX_LOG_DEBUG_WASM, pwctx->log, 0, + ngx_log_debug6(NGX_LOG_DEBUG_WASM, pwctx->log, 0, "proxy_wasm pausing in \"%V\" phase " - "(root: %d, filter: %l/%l, step: %d, action: %d, " + "(filter: %l/%l, step: %d, action: %d, " "pwctx: %p)", &pwctx->phase->name, - pwexec->id == NGX_PROXY_WASM_ROOT_CTX_ID, pwexec->index + 1, pwctx->nfilters, pwctx->step, action, pwctx); goto yield; #endif default: - ngx_proxy_wasm_log_error(NGX_LOG_ERR, pwexec->log, pwexec->ecode, + ngx_proxy_wasm_log_error(NGX_LOG_ERR, pwctx->log, pwexec->ecode, "bad \"%V\" return action: \"PAUSE\"", ngx_proxy_wasm_step_name(pwctx->step)); @@ -595,31 +538,143 @@ ngx_proxy_wasm_action2rc(ngx_proxy_wasm_ctx_t *pwctx, } +ngx_int_t +ngx_proxy_wasm_resume(ngx_proxy_wasm_ctx_t *pwctx, + ngx_wasm_phase_t *phase, ngx_proxy_wasm_step_e step) +{ + size_t i; + ngx_int_t rc = NGX_OK; + ngx_proxy_wasm_exec_t *pwexec, *pwexecs; + + dd("enter"); + + switch (step) { + case NGX_PROXY_WASM_STEP_TICK: + case NGX_PROXY_WASM_STEP_DONE: + case NGX_PROXY_WASM_STEP_RESP_BODY: + case NGX_PROXY_WASM_STEP_RESP_HEADERS: + case NGX_PROXY_WASM_STEP_DISPATCH_RESPONSE: + break; + default: + if (step <= pwctx->last_step) { + dd("step %d already completed, exit", step); + ngx_wasm_assert(rc == NGX_OK); + goto ret; + } + } + + pwctx->step = step; + + /* resume filters chain */ + + pwexecs = (ngx_proxy_wasm_exec_t *) pwctx->pwexecs.elts; + + dd("pwctx->exec_index: %ld, nelts: %ld", + pwctx->exec_index, pwctx->pwexecs.nelts); + + for (i = pwctx->exec_index; i < pwctx->pwexecs.nelts; i++) { + dd("exec_index: %ld", i); + + pwexec = &pwexecs[i]; + + ngx_wasm_assert(pwexec->root_id != NGX_PROXY_WASM_ROOT_CTX_ID); + + /* check for yielded state */ + + rc = action2rc(pwctx, pwexec); + if (rc != NGX_OK) { + goto ret; + } + + /* run step */ + + pwexec->ecode = ngx_proxy_wasm_run_step(pwexec, step); + dd("pwexec->ecode: %d, pwctx->action: %d (pwctx: %p)", + pwexec->ecode, pwctx->action, pwctx); + if (pwexec->ecode != NGX_PROXY_WASM_ERR_NONE) { + rc = pwexec->filter->subsystem->ecode(pwexec->ecode); + goto ret; + } + + /* check for yield/done */ + + rc = action2rc(pwctx, pwexec); + if (rc != NGX_OK) { + if (rc == NGX_AGAIN + && pwctx->exec_index + 1 <= pwctx->nfilters) + { + dd("yield: resume on next filter " + "(idx: %ld -> %ld, nelts: %ld)", + pwctx->exec_index, pwctx->exec_index + 1, + pwctx->pwexecs.nelts); + + pwctx->exec_index++; + } + + goto ret; + } + + pwctx->exec_index++; + + dd("-------- next filter --------"); + } + + ngx_wasm_assert(rc == NGX_OK); + + /* next step */ + + pwctx->last_step = pwctx->step; + pwctx->exec_index = 0; + +ret: + + if (step == NGX_PROXY_WASM_STEP_DONE) { + ngx_proxy_wasm_ctx_destroy(pwctx); + } + + dd("exit rc: %ld", rc); + + return rc; +} + + ngx_proxy_wasm_err_e ngx_proxy_wasm_run_step(ngx_proxy_wasm_exec_t *pwexec, - ngx_proxy_wasm_instance_t *ictx, ngx_proxy_wasm_step_e step) + ngx_proxy_wasm_step_e step) { ngx_int_t rc; + ngx_proxy_wasm_err_e ecode; ngx_proxy_wasm_action_e action = NGX_PROXY_WASM_ACTION_CONTINUE; ngx_proxy_wasm_ctx_t *pwctx = pwexec->parent; ngx_proxy_wasm_filter_t *filter = pwexec->filter; - ngx_wavm_instance_t *instance = ictx->instance; #if (NGX_DEBUG) ngx_proxy_wasm_action_e old_action = pwctx->action; #endif - ngx_wasm_assert(ictx->module == filter->module); ngx_wasm_assert(pwctx->phase); - ictx->pwexec = pwexec; /* set instance current pwexec */ - pwexec->ictx = ictx; /* link pwexec to instance */ + dd("enter (pwexec: %p, ictx: %p, trapped: %d)", + pwexec, pwexec->ictx, + pwexec->ictx ? pwexec->ictx->instance->trapped : 0); - /* - * update instance log - * (instance->data = ictx already set by instance_new) - */ - ngx_wavm_instance_set_data(instance, ictx, - pwexec ? pwexec->log : filter->log); +#if 0 + /* possible optimization, commented for linter */ + if (pwexec->ictx->instance->trapped) { +#endif + ecode = ngx_proxy_wasm_create_context(filter, pwctx, pwexec->id, + pwexec, NULL); + if (ecode != NGX_PROXY_WASM_ERR_NONE) { + ngx_wasm_assert(0); + return ecode; + } +#if 0 + } +#endif + + ngx_wasm_assert(!pwexec->ictx->instance->trapped); + ngx_wasm_assert(pwexec->ictx->module == filter->module); + + ngx_proxy_wasm_instance_update(pwexec->ictx, pwexec); if (pwexec->root_id == NGX_PROXY_WASM_ROOT_CTX_ID) { ngx_proxy_wasm_log_error(NGX_LOG_DEBUG, pwexec->log, 0, @@ -659,7 +714,9 @@ ngx_proxy_wasm_run_step(ngx_proxy_wasm_exec_t *pwexec, rc = filter->subsystem->resume(pwexec, step, &action); break; case NGX_PROXY_WASM_STEP_TICK: + pwexec->in_tick = 1; rc = ngx_proxy_wasm_on_tick(pwexec); + pwexec->in_tick = 0; break; default: ngx_proxy_wasm_log_error(NGX_LOG_WASM_NYI, pwctx->log, 0, @@ -668,8 +725,8 @@ ngx_proxy_wasm_run_step(ngx_proxy_wasm_exec_t *pwexec, break; } - dd("rc: %ld, old_action: %d, pwctx->action: %d, ret action: %d", - rc, old_action, pwctx->action, action); + dd("rc: %ld, old_action: %d, pwctx->action: %d, ret action: %d, ictx: %p", + rc, old_action, pwctx->action, action, pwexec->ictx); /* pwctx->action writes in host calls overwrite action return value */ @@ -701,181 +758,568 @@ ngx_proxy_wasm_run_step(ngx_proxy_wasm_exec_t *pwexec, done: - ictx->pwexec = NULL; - return pwexec->ecode; } -ngx_int_t -ngx_proxy_wasm_resume(ngx_proxy_wasm_ctx_t *pwctx, - ngx_wasm_phase_t *phase, ngx_proxy_wasm_step_e step) -{ - size_t i; - ngx_int_t rc = NGX_OK; - ngx_proxy_wasm_filter_t *filter; - ngx_proxy_wasm_instance_t *ictx; - ngx_proxy_wasm_exec_t *pwexec, *pwexecs; +/* host handlers */ - dd("enter"); - switch (step) { - case NGX_PROXY_WASM_STEP_TICK: - case NGX_PROXY_WASM_STEP_DONE: - case NGX_PROXY_WASM_STEP_RESP_BODY: - case NGX_PROXY_WASM_STEP_RESP_HEADERS: - case NGX_PROXY_WASM_STEP_DISPATCH_RESPONSE: - break; - default: - if (step <= pwctx->last_step) { - dd("step %d already completed, exit", step); - ngx_wasm_assert(rc == NGX_OK); - goto ret; - } - } +ngx_wavm_ptr_t +ngx_proxy_wasm_alloc(ngx_proxy_wasm_exec_t *pwexec, size_t size) +{ + ngx_wavm_ptr_t p; + ngx_int_t rc; + wasm_val_vec_t *rets; + ngx_proxy_wasm_filter_t *filter = pwexec->filter; + ngx_wavm_instance_t *instance = ngx_proxy_wasm_pwexec2instance(pwexec); - pwctx->step = step; + rc = ngx_wavm_instance_call_funcref(instance, + filter->proxy_on_memory_allocate, + &rets, size); + if (rc != NGX_OK) { + ngx_proxy_wasm_log_error(NGX_LOG_CRIT, pwexec->log, 0, + "proxy_wasm_alloc(%uz) failed", size); + return 0; + } - /* resume filters chain */ + p = rets->data[0].of.i32; - pwexecs = (ngx_proxy_wasm_exec_t *) pwctx->pwexecs.elts; + ngx_log_debug3(NGX_LOG_DEBUG_WASM, pwexec->log, 0, + "proxy_wasm_alloc: %uz:%uz:%uz", + ngx_wavm_memory_data_size(instance->memory), p, size); - for (i = pwctx->exec_index; i < pwctx->pwexecs.nelts; i++) { - dd("exec_index: %ld", i); + return p; +} - pwexec = &pwexecs[i]; - filter = pwexec->filter; - ictx = pwexec->ictx; - ngx_wasm_assert(pwexec->root_id != NGX_PROXY_WASM_ROOT_CTX_ID); +static ngx_proxy_wasm_instance_t * +get_instance(ngx_proxy_wasm_filter_t *filter, + ngx_proxy_wasm_store_t *store, ngx_log_t *log) +{ + ngx_queue_t *q; + ngx_wavm_module_t *module = filter->module; + ngx_proxy_wasm_instance_t *ictx; + + dd("get instance in store: %p", store); + + for (q = ngx_queue_head(&store->busy); + q != ngx_queue_sentinel(&store->busy); + q = ngx_queue_next(q)) + { + ictx = ngx_queue_data(q, ngx_proxy_wasm_instance_t, q); - /* check for trap */ + if (ictx->instance->trapped) { + q = ngx_queue_next(&ictx->q); + ngx_proxy_wasm_log_error(NGX_LOG_DEBUG, log, 0, + "\"%V\" filter invalidating trapped " + "instance (ictx: %p, store: %p)", + filter->name, ictx, store); + ngx_proxy_wasm_instance_invalidate(ictx); + continue; + } - if (ictx->instance->trapped && !pwexec->ecode) { - pwexec->ecode = NGX_PROXY_WASM_ERR_INSTANCE_TRAPPED; + if (ictx->module == module) { + dd("reuse busy instance"); + goto reuse; } + } - /* check for yielded state */ + for (q = ngx_queue_head(&store->free); + q != ngx_queue_sentinel(&store->free); + q = ngx_queue_next(q)) + { + ictx = ngx_queue_data(q, ngx_proxy_wasm_instance_t, q); - rc = ngx_proxy_wasm_action2rc(pwctx, pwexec); - if (rc != NGX_OK) { - goto ret; + ngx_wasm_assert(!ictx->instance->trapped); + + if (ictx->module == module) { + dd("reuse free instance, going to busy"); + ngx_queue_remove(&ictx->q); + ngx_queue_insert_tail(&store->busy, &ictx->q); + goto reuse; } + } - /* run step */ + dd("create instance in store: %p", store); - pwexec->ecode = ngx_proxy_wasm_run_step(pwexec, ictx, step); - dd("pwexec->ecode: %d, pwctx->action: %d (pwctx: %p)", - pwexec->ecode, pwctx->action, pwctx); - if (pwexec->ecode != NGX_PROXY_WASM_ERR_NONE) { - rc = filter->subsystem->ecode(pwexec->ecode); - goto ret; + ictx = ngx_pcalloc(store->pool, sizeof(ngx_proxy_wasm_instance_t)); + if (ictx == NULL) { + goto error; + } + + ictx->pool = store->pool; + ictx->log = log; + ictx->store = store; + ictx->module = module; + + ngx_rbtree_init(&ictx->root_ctxs, &ictx->sentinel_root_ctxs, + ngx_rbtree_insert_value); + + ngx_rbtree_init(&ictx->tree_ctxs, &ictx->sentinel_ctxs, + ngx_rbtree_insert_value); + + ictx->instance = ngx_wavm_instance_create(ictx->module, ictx->pool, + ictx->log, ictx); + if (ictx->instance == NULL) { + ngx_pfree(store->pool, ictx); + goto error; + } + + ngx_proxy_wasm_log_error(NGX_LOG_DEBUG, log, 0, + "\"%V\" filter new instance (ictx: %p, store: %p)", + filter->name, ictx, store); + + ngx_queue_insert_tail(&store->busy, &ictx->q); + + goto done; + +reuse: + + ngx_proxy_wasm_log_error(NGX_LOG_DEBUG, log, 0, + "\"%V\" filter reusing instance " + "(ictx: %p, store: %p)", + filter->name, ictx, store); + +done: + + return ictx; + +error: + + return NULL; +} + + +static ngx_proxy_wasm_err_e +ngx_proxy_wasm_create_context(ngx_proxy_wasm_filter_t *filter, + ngx_proxy_wasm_ctx_t *pwctx, ngx_uint_t id, ngx_proxy_wasm_exec_t *in, + ngx_proxy_wasm_exec_t **out) +{ + ngx_int_t rc; + ngx_log_t *log; + wasm_val_vec_t *rets; + ngx_proxy_wasm_instance_t *ictx; + ngx_proxy_wasm_store_t *store; + ngx_proxy_wasm_exec_t *rexec = NULL, *pwexec = NULL; + ngx_proxy_wasm_err_e ecode = NGX_PROXY_WASM_ERR_UNKNOWN; + + dd("enter (filter: \"%.*s\", id: %ld)", + (int) filter->name->len, filter->name->data, id); + + if (pwctx == NULL) { + /* root context */ + ngx_wasm_assert(id == 0); + log = filter->log; + store = filter->store; + + } else { + /* filter context */ + log = pwctx->log; + + switch (pwctx->isolation) { + case NGX_PROXY_WASM_ISOLATION_NONE: + store = filter->store; + break; + case NGX_PROXY_WASM_ISOLATION_STREAM: + store = &pwctx->store; + break; + case NGX_PROXY_WASM_ISOLATION_FILTER: + store = ngx_palloc(pwctx->pool, sizeof(ngx_proxy_wasm_store_t)); + if (store == NULL) { + goto error; + } + + ngx_proxy_wasm_store_init(store, pwctx->pool); + break; + default: + ngx_proxy_wasm_log_error(NGX_LOG_WASM_NYI, pwctx->log, 0, + "NYI - instance isolation: %d", + pwctx->isolation); + goto error; } + } - /* check for yield/done */ + /* get instance */ + + if (in && in->ictx && !in->ictx->instance->trapped) { + ictx = in->ictx; + + } else { + ictx = get_instance(filter, store, log); + if (ictx == NULL) { + goto error; + } + } + + /* create root context */ + + rexec = ngx_proxy_wasm_lookup_root_ctx(ictx, filter->id); + dd("rexec for id %ld: %p", filter->id, rexec); + if (rexec == NULL) { + rexec = ngx_pcalloc(ictx->pool, sizeof(ngx_proxy_wasm_exec_t)); + if (rexec == NULL) { + ecode = NGX_PROXY_WASM_ERR_START_FAILED; + goto error; + } + + rexec->root_id = NGX_PROXY_WASM_ROOT_CTX_ID; + rexec->id = filter->id; + rexec->pool = ictx->pool; + rexec->log = filter->log; + rexec->filter = filter; + rexec->ictx = ictx; + + log = filter->log; + + rexec->log = ngx_pcalloc(rexec->pool, sizeof(ngx_log_t)); + if (rexec->log == NULL) { + ecode = NGX_PROXY_WASM_ERR_START_FAILED; + goto error; + } + + rexec->log->file = log->file; + rexec->log->next = log->next; + rexec->log->writer = log->writer; + rexec->log->wdata = log->wdata; + rexec->log->log_level = log->log_level; + rexec->log->handler = ngx_proxy_wasm_log_error_handler; + rexec->log->data = &rexec->log_ctx; + + rexec->log_ctx.pwexec = rexec; + rexec->log_ctx.orig_log = log; + + rexec->parent = ngx_pcalloc(rexec->pool, sizeof(ngx_proxy_wasm_ctx_t)); + if (rexec->parent == NULL) { + ecode = NGX_PROXY_WASM_ERR_START_FAILED; + goto error; + } + + rexec->parent->id = NGX_PROXY_WASM_ROOT_CTX_ID; + rexec->parent->pool = rexec->pool; + rexec->parent->log = rexec->log; + rexec->parent->isolation = NGX_PROXY_WASM_ISOLATION_STREAM; + + rexec->node.key = rexec->id; + ngx_rbtree_insert(&ictx->root_ctxs, &rexec->node); + } + + /* start root context */ - rc = ngx_proxy_wasm_action2rc(pwctx, pwexec); + if (!rexec->started) { + dd("start root exec ctx (rexec: %p, root_id: %ld, id: %ld, ictx: %p)", + rexec, rexec->root_id, rexec->id, ictx); + + ngx_proxy_wasm_instance_update(ictx, rexec); + + rc = ngx_wavm_instance_call_funcref(ictx->instance, + filter->proxy_on_context_create, + NULL, + rexec->id, rexec->root_id); if (rc != NGX_OK) { - if (rc == NGX_AGAIN - && pwctx->exec_index + 1 <= pwctx->nfilters) - { - dd("yield: resume on next filter " - "(idx: %ld -> %ld, nelts: %ld)", - pwctx->exec_index, pwctx->exec_index + 1, - pwctx->pwexecs.nelts); + ecode = NGX_PROXY_WASM_ERR_START_FAILED; + goto error; + } - pwctx->exec_index++; + if (id == NGX_PROXY_WASM_ROOT_CTX_ID) { + rc = ngx_wavm_instance_call_funcref(ictx->instance, + filter->proxy_on_vm_start, + &rets, + rexec->id, rexec->root_id); + if (rc != NGX_OK || !rets->data[0].of.i32) { + ecode = NGX_PROXY_WASM_ERR_VM_START_FAILED; + goto error; } - - goto ret; } - pwctx->exec_index++; + rc = ngx_wavm_instance_call_funcref(ictx->instance, + filter->proxy_on_plugin_start, + &rets, + rexec->id, filter->config.len); + if (rc != NGX_OK || !rets->data[0].of.i32) { + ecode = NGX_PROXY_WASM_ERR_CONFIGURE_FAILED; + goto error; + } - dd("-------- next filter --------"); + rexec->started = 1; } - ngx_wasm_assert(rc == NGX_OK); + /* start filter context */ - /* next step */ + if (id) { + pwexec = ngx_proxy_wasm_lookup_ctx(ictx, id); - pwctx->last_step = pwctx->step; - pwctx->exec_index = 0; + dd("pwexec for id %ld: %p", id, pwexec); -ret: + if (pwexec == NULL) { - if (step == NGX_PROXY_WASM_STEP_DONE) { - ngx_proxy_wasm_ctx_destroy(pwctx); + if (in == NULL) { + pwexec = ngx_array_push(&pwctx->pwexecs); + if (pwexec == NULL) { + ecode = NGX_PROXY_WASM_ERR_START_FAILED; + goto error; + } + + ngx_memzero(pwexec, sizeof(ngx_proxy_wasm_exec_t)); + + pwexec->id = id; + pwexec->root_id = filter->id; + pwexec->index = pwctx->pwexecs.nelts - 1; + pwexec->pool = ictx->store->pool; + pwexec->filter = filter; + pwexec->parent = pwctx; + pwexec->ictx = ictx; + pwexec->store = ictx->store; + + } else { + if (in->ictx != ictx) { + dd("replace pwexec instance"); + + in->ictx = ictx; + in->started = 0; + } + + pwexec = in; + } + + + if (pwexec->log == NULL) { + log = pwctx->log; + + pwexec->log = ngx_pcalloc(pwexec->pool, sizeof(ngx_log_t)); + if (pwexec->log == NULL) { + ecode = NGX_PROXY_WASM_ERR_START_FAILED; + goto error; + } + + pwexec->log->file = log->file; + pwexec->log->next = log->next; + pwexec->log->writer = log->writer; + pwexec->log->wdata = log->wdata; + pwexec->log->log_level = log->log_level; + pwexec->log->connection = log->connection; + pwexec->log->handler = ngx_proxy_wasm_log_error_handler; + pwexec->log->data = &pwexec->log_ctx; + + pwexec->log_ctx.pwexec = pwexec; + pwexec->log_ctx.orig_log = log; + } + } +#if (NGX_DEBUG) + else { + ngx_wasm_assert(pwexec->id == id); + dd("pwexec #%ld found in instance %p", pwexec->id, ictx); + } +#endif + + if (!pwexec->started) { + dd("start exec ctx (pwexec: %p, id: %ld, ictx: %p)", + pwexec, id, ictx); + + ngx_proxy_wasm_instance_update(ictx, pwexec); + + rc = ngx_wavm_instance_call_funcref(ictx->instance, + filter->proxy_on_context_create, + NULL, + id, filter->id); + if (rc != NGX_OK) { + ecode = NGX_PROXY_WASM_ERR_START_FAILED; + goto error; + } + + pwexec->node.key = pwexec->id; + ngx_rbtree_insert(&ictx->tree_ctxs, &pwexec->node); + + pwexec->started = 1; + } } - dd("exit rc: %ld", rc); + if (out) { + *out = pwexec; + } - return rc; + return NGX_PROXY_WASM_ERR_NONE; + +error: + + if (ecode != NGX_PROXY_WASM_ERR_NONE) { + if (pwexec) { + pwexec->ecode = ecode; + + } else { + filter->ecode = ecode; + } + } + + return ecode; } -ngx_wavm_ptr_t -ngx_proxy_wasm_alloc(ngx_proxy_wasm_exec_t *pwexec, size_t size) +static void +ngx_proxy_wasm_on_log(ngx_proxy_wasm_exec_t *pwexec) { - ngx_wavm_ptr_t p; - ngx_int_t rc; - wasm_val_vec_t *rets; ngx_proxy_wasm_filter_t *filter = pwexec->filter; ngx_wavm_instance_t *instance = ngx_proxy_wasm_pwexec2instance(pwexec); - rc = ngx_wavm_instance_call_funcref(instance, - filter->proxy_on_memory_allocate, - &rets, size); - if (rc != NGX_OK) { - ngx_proxy_wasm_log_error(NGX_LOG_CRIT, pwexec->log, 0, - "proxy_wasm_alloc(%uz) failed", size); - return 0; + if (filter->abi_version < NGX_PROXY_WASM_VNEXT) { + /* 0.1.0 - 0.2.1 */ + (void) ngx_wavm_instance_call_funcref(instance, filter->proxy_on_done, + NULL, pwexec->id); } - p = rets->data[0].of.i32; + (void) ngx_wavm_instance_call_funcref(instance, filter->proxy_on_log, + NULL, pwexec->id); +} - ngx_log_debug3(NGX_LOG_DEBUG_WASM, pwexec->log, 0, - "proxy_wasm_alloc: %uz:%uz:%uz", - ngx_wavm_memory_data_size(instance->memory), p, size); - return p; +static void +ngx_proxy_wasm_on_done(ngx_proxy_wasm_exec_t *pwexec) +{ + ngx_wavm_instance_t *instance; + ngx_proxy_wasm_filter_t *filter = pwexec->filter; +#ifdef NGX_WASM_HTTP + ngx_http_proxy_wasm_dispatch_t *call; +#endif + + instance = ngx_proxy_wasm_pwexec2instance(pwexec); + + ngx_proxy_wasm_log_error(NGX_LOG_DEBUG, pwexec->log, 0, + "filter %l/%l finalizing context", + pwexec->index + 1, pwexec->parent->nfilters); + +#ifdef NGX_WASM_HTTP + call = pwexec->call; + if (call) { + ngx_log_debug3(NGX_LOG_DEBUG_WASM, pwexec->log, 0, + "proxy_wasm \"%V\" filter (%l/%l) " + "cancelling HTTP dispatch", + pwexec->filter->name, pwexec->index + 1, + pwexec->parent->nfilters); + + ngx_http_proxy_wasm_dispatch_destroy(call); + + pwexec->call = NULL; + } +#endif + + (void) ngx_wavm_instance_call_funcref(instance, + filter->proxy_on_context_finalize, + NULL, pwexec->id); + + if (pwexec->node.key) { + ngx_rbtree_delete(&pwexec->ictx->tree_ctxs, &pwexec->node); + } } -void -ngx_proxy_wasm_store_destroy(ngx_proxy_wasm_store_t *store) +static ngx_int_t +ngx_proxy_wasm_on_tick(ngx_proxy_wasm_exec_t *pwexec) { - ngx_queue_t *q; - ngx_proxy_wasm_instance_t *ictx; + ngx_int_t rc; + wasm_val_vec_t args; + ngx_proxy_wasm_filter_t *filter = pwexec->filter; - dd("enter"); + ngx_wasm_assert(pwexec->root_id == NGX_PROXY_WASM_ROOT_CTX_ID); - while (!ngx_queue_empty(&store->busy)) { - dd("remove busy"); - q = ngx_queue_head(&store->busy); - ictx = ngx_queue_data(q, ngx_proxy_wasm_instance_t, q); + wasm_val_vec_new_uninitialized(&args, 1); + ngx_wasm_vec_set_i32(&args, 0, pwexec->id); - ngx_queue_remove(&ictx->q); - ngx_queue_insert_tail(&store->free, &ictx->q); + rc = ngx_wavm_instance_call_funcref_vec(pwexec->ictx->instance, + filter->proxy_on_timer_ready, + NULL, &args); + + wasm_val_vec_delete(&args); + + return rc; +} + + +/* utils */ + + +static ngx_proxy_wasm_filter_t * +ngx_proxy_wasm_lookup_filter(ngx_uint_t id) +{ + ngx_rbtree_t *rbtree; + ngx_rbtree_node_t *node, *sentinel; + ngx_proxy_wasm_filter_t *filter; + + rbtree = &ngx_proxy_wasm_filters_rbtree; + node = rbtree->root; + sentinel = rbtree->sentinel; + + while (node != sentinel) { + + if (id != node->key) { + node = (id < node->key) ? node->left : node->right; + continue; + } + + filter = ngx_rbtree_data(node, ngx_proxy_wasm_filter_t, node); + + return filter; } - while (!ngx_queue_empty(&store->free)) { - q = ngx_queue_head(&store->free); - ictx = ngx_queue_data(q, ngx_proxy_wasm_instance_t, q); + return NULL; +} - dd("remove free"); - ngx_proxy_wasm_release_instance(ictx, 1); + +static ngx_proxy_wasm_exec_t * +ngx_proxy_wasm_lookup_root_ctx(ngx_proxy_wasm_instance_t *ictx, ngx_uint_t id) +{ + ngx_rbtree_t *rbtree; + ngx_rbtree_node_t *node, *sentinel; + ngx_proxy_wasm_exec_t *pwexec; + + rbtree = &ictx->root_ctxs; + node = rbtree->root; + sentinel = rbtree->sentinel; + + while (node != sentinel) { + + if (id != node->key) { + node = (id < node->key) ? node->left : node->right; + continue; + } + + pwexec = ngx_rbtree_data(node, ngx_proxy_wasm_exec_t, node); + + return pwexec; } - dd("exit"); + return NULL; } -/* static */ +static ngx_proxy_wasm_exec_t * +ngx_proxy_wasm_lookup_ctx(ngx_proxy_wasm_instance_t *ictx, ngx_uint_t id) +{ + ngx_rbtree_t *rbtree; + ngx_rbtree_node_t *node, *sentinel; + ngx_proxy_wasm_exec_t *pwexec; + + rbtree = &ictx->tree_ctxs; + node = rbtree->root; + sentinel = rbtree->sentinel; + + while (node != sentinel) { + + if (id != node->key) { + node = (id < node->key) ? node->left : node->right; + continue; + } + + pwexec = ngx_rbtree_data(node, ngx_proxy_wasm_exec_t, node); + + return pwexec; + } + + return NULL; +} -static ngx_inline ngx_wavm_funcref_t * -ngx_proxy_wasm_lookup_func(ngx_proxy_wasm_filter_t *filter, const char *n) +static ngx_wavm_funcref_t * +get_func(ngx_proxy_wasm_filter_t *filter, const char *n) { ngx_str_t name; @@ -887,7 +1331,7 @@ ngx_proxy_wasm_lookup_func(ngx_proxy_wasm_filter_t *filter, const char *n) static ngx_proxy_wasm_abi_version_e -ngx_proxy_wasm_abi_version(ngx_proxy_wasm_filter_t *filter) +get_abi_version(ngx_proxy_wasm_filter_t *filter) { size_t i; ngx_wavm_module_t *module = filter->module; @@ -930,14 +1374,14 @@ ngx_proxy_wasm_abi_version(ngx_proxy_wasm_filter_t *filter) static ngx_int_t -ngx_proxy_wasm_init_abi(ngx_proxy_wasm_filter_t *filter) +ngx_proxy_wasm_filter_init_abi(ngx_proxy_wasm_filter_t *filter) { ngx_log_debug4(NGX_LOG_DEBUG_WASM, filter->log, 0, "proxy_wasm initializing \"%V\" filter " "(config size: %d, filter: %p, filter->id: %ui)", filter->name, filter->config.len, filter, filter->id); - filter->abi_version = ngx_proxy_wasm_abi_version(filter); + filter->abi_version = get_abi_version(filter); switch (filter->abi_version) { case NGX_PROXY_WASM_0_1_0: @@ -961,12 +1405,11 @@ ngx_proxy_wasm_init_abi(ngx_proxy_wasm_filter_t *filter) /* malloc */ - filter->proxy_on_memory_allocate = - ngx_proxy_wasm_lookup_func(filter, "malloc"); + filter->proxy_on_memory_allocate = get_func(filter, "malloc"); if (filter->proxy_on_memory_allocate == NULL) { filter->proxy_on_memory_allocate = - ngx_proxy_wasm_lookup_func(filter, "proxy_on_memory_allocate"); + get_func(filter, "proxy_on_memory_allocate"); if (filter->proxy_on_memory_allocate == NULL) { filter->ecode = NGX_PROXY_WASM_ERR_BAD_MODULE_INTERFACE; @@ -981,408 +1424,252 @@ ngx_proxy_wasm_init_abi(ngx_proxy_wasm_filter_t *filter) /* context */ filter->proxy_on_context_create = - ngx_proxy_wasm_lookup_func(filter, "proxy_on_context_create"); + get_func(filter, "proxy_on_context_create"); filter->proxy_on_context_finalize = - ngx_proxy_wasm_lookup_func(filter, "proxy_on_context_finalize"); + get_func(filter, "proxy_on_context_finalize"); if (filter->abi_version < NGX_PROXY_WASM_VNEXT) { /* 0.1.0 - 0.2.1 */ filter->proxy_on_done = - ngx_proxy_wasm_lookup_func(filter, "proxy_on_done"); + get_func(filter, "proxy_on_done"); filter->proxy_on_log = - ngx_proxy_wasm_lookup_func(filter, "proxy_on_log"); + get_func(filter, "proxy_on_log"); filter->proxy_on_context_finalize = - ngx_proxy_wasm_lookup_func(filter, "proxy_on_delete"); + get_func(filter, "proxy_on_delete"); } /* configuration */ filter->proxy_on_vm_start = - ngx_proxy_wasm_lookup_func(filter, "proxy_on_vm_start"); + get_func(filter, "proxy_on_vm_start"); filter->proxy_on_plugin_start = - ngx_proxy_wasm_lookup_func(filter, "proxy_on_plugin_start"); + get_func(filter, "proxy_on_plugin_start"); if (filter->abi_version < NGX_PROXY_WASM_VNEXT) { /* 0.1.0 - 0.2.1 */ filter->proxy_on_plugin_start = - ngx_proxy_wasm_lookup_func(filter, "proxy_on_configure"); + get_func(filter, "proxy_on_configure"); } /* stream */ filter->proxy_on_new_connection = - ngx_proxy_wasm_lookup_func(filter, "proxy_on_new_connection"); + get_func(filter, "proxy_on_new_connection"); filter->proxy_on_downstream_data = - ngx_proxy_wasm_lookup_func(filter, "proxy_on_downstream_data"); + get_func(filter, "proxy_on_downstream_data"); filter->proxy_on_upstream_data = - ngx_proxy_wasm_lookup_func(filter, "proxy_on_upstream_data"); + get_func(filter, "proxy_on_upstream_data"); filter->proxy_on_downstream_close = - ngx_proxy_wasm_lookup_func(filter, "proxy_on_downstream_close"); + get_func(filter, "proxy_on_downstream_close"); filter->proxy_on_upstream_close = - ngx_proxy_wasm_lookup_func(filter, "proxy_on_upstream_close"); + get_func(filter, "proxy_on_upstream_close"); if (filter->abi_version < NGX_PROXY_WASM_VNEXT) { /* 0.1.0 - 0.2.1 */ filter->proxy_on_downstream_close = - ngx_proxy_wasm_lookup_func( - filter, "proxy_on_downstream_connection_close"); + get_func(filter, "proxy_on_downstream_connection_close"); filter->proxy_on_upstream_close = - ngx_proxy_wasm_lookup_func( - filter, "proxy_on_upstream_connection_close"); + get_func(filter, "proxy_on_upstream_connection_close"); } /* http */ filter->proxy_on_http_request_headers = - ngx_proxy_wasm_lookup_func(filter, - "proxy_on_http_request_headers"); + get_func(filter, "proxy_on_http_request_headers"); filter->proxy_on_http_request_body = - ngx_proxy_wasm_lookup_func(filter, - "proxy_on_http_request_body"); + get_func(filter, "proxy_on_http_request_body"); filter->proxy_on_http_request_trailers = - ngx_proxy_wasm_lookup_func(filter, - "proxy_on_http_request_trailers"); + get_func(filter, "proxy_on_http_request_trailers"); filter->proxy_on_http_request_metadata = - ngx_proxy_wasm_lookup_func(filter, - "proxy_on_http_request_metadata"); + get_func(filter, "proxy_on_http_request_metadata"); filter->proxy_on_http_response_headers = - ngx_proxy_wasm_lookup_func(filter, - "proxy_on_http_response_headers"); + get_func(filter, "proxy_on_http_response_headers"); filter->proxy_on_http_response_body = - ngx_proxy_wasm_lookup_func(filter, - "proxy_on_http_response_body"); + get_func(filter, "proxy_on_http_response_body"); filter->proxy_on_http_response_trailers = - ngx_proxy_wasm_lookup_func(filter, - "proxy_on_http_response_trailers"); + get_func(filter, "proxy_on_http_response_trailers"); filter->proxy_on_http_response_metadata = - ngx_proxy_wasm_lookup_func(filter, - "proxy_on_http_response_metadata"); + get_func(filter, "proxy_on_http_response_metadata"); if (filter->abi_version < NGX_PROXY_WASM_VNEXT) { /* 0.1.0 - 0.2.1 */ filter->proxy_on_http_request_headers = - ngx_proxy_wasm_lookup_func(filter, "proxy_on_request_headers"); + get_func(filter, "proxy_on_request_headers"); filter->proxy_on_http_request_body = - ngx_proxy_wasm_lookup_func(filter, "proxy_on_request_body"); + get_func(filter, "proxy_on_request_body"); filter->proxy_on_http_request_trailers = - ngx_proxy_wasm_lookup_func(filter, "proxy_on_request_trailers"); + get_func(filter, "proxy_on_request_trailers"); filter->proxy_on_http_request_metadata = - ngx_proxy_wasm_lookup_func(filter, "proxy_on_request_metadata"); + get_func(filter, "proxy_on_request_metadata"); filter->proxy_on_http_response_headers = - ngx_proxy_wasm_lookup_func(filter, "proxy_on_response_headers"); + get_func(filter, "proxy_on_response_headers"); filter->proxy_on_http_response_body = - ngx_proxy_wasm_lookup_func(filter, "proxy_on_response_body"); + get_func(filter, "proxy_on_response_body"); filter->proxy_on_http_response_trailers = - ngx_proxy_wasm_lookup_func(filter, "proxy_on_response_trailers"); + get_func(filter, "proxy_on_response_trailers"); filter->proxy_on_http_response_metadata = - ngx_proxy_wasm_lookup_func(filter, "proxy_on_response_metadata"); + get_func(filter, "proxy_on_response_metadata"); } /* shared queue */ filter->proxy_on_queue_ready = - ngx_proxy_wasm_lookup_func(filter, "proxy_on_queue_ready"); + get_func(filter, "proxy_on_queue_ready"); /* timers */ filter->proxy_create_timer = - ngx_proxy_wasm_lookup_func(filter, "proxy_create_timer"); + get_func(filter, "proxy_create_timer"); filter->proxy_delete_timer = - ngx_proxy_wasm_lookup_func(filter, "proxy_delete_timer"); + get_func(filter, "proxy_delete_timer"); filter->proxy_on_timer_ready = - ngx_proxy_wasm_lookup_func(filter, "proxy_on_timer_ready"); + get_func(filter, "proxy_on_timer_ready"); if (filter->abi_version < NGX_PROXY_WASM_VNEXT) { /* 0.1.0 - 0.2.1 */ filter->proxy_on_timer_ready = - ngx_proxy_wasm_lookup_func(filter, "proxy_on_tick"); + get_func(filter, "proxy_on_tick"); } /* http callouts */ filter->proxy_on_http_call_response = - ngx_proxy_wasm_lookup_func(filter, "proxy_on_http_call_response"); + get_func(filter, "proxy_on_http_call_response"); /* grpc callouts */ filter->proxy_on_grpc_call_response_header_metadata = - ngx_proxy_wasm_lookup_func( - filter, "proxy_on_grpc_call_response_header_metadata"); + get_func(filter, "proxy_on_grpc_call_response_header_metadata"); filter->proxy_on_grpc_call_response_message = - ngx_proxy_wasm_lookup_func( - filter, "proxy_on_grpc_call_response_message"); + get_func(filter, "proxy_on_grpc_call_response_message"); filter->proxy_on_grpc_call_response_trailer_metadata = - ngx_proxy_wasm_lookup_func( - filter, "proxy_on_grpc_call_response_trailer_metadata"); + get_func(filter, "proxy_on_grpc_call_response_trailer_metadata"); filter->proxy_on_grpc_call_close = - ngx_proxy_wasm_lookup_func(filter, "proxy_on_grpc_call_close"); + get_func(filter, "proxy_on_grpc_call_close"); /* custom extensions */ filter->proxy_on_custom_callback = - ngx_proxy_wasm_lookup_func(filter, "proxy_on_custom_callback"); + get_func(filter, "proxy_on_custom_callback"); if (filter->abi_version < NGX_PROXY_WASM_VNEXT) { /* 0.2.0 - 0.2.1 */ filter->proxy_on_custom_callback = - ngx_proxy_wasm_lookup_func(filter, "proxy_on_foreign_function"); - } - - /* validate */ - - if (filter->proxy_on_context_create == NULL - || filter->proxy_on_vm_start == NULL - || filter->proxy_on_plugin_start == NULL) - { - filter->ecode = NGX_PROXY_WASM_ERR_BAD_MODULE_INTERFACE; - - ngx_proxy_wasm_log_error(NGX_LOG_EMERG, filter->log, filter->ecode, - "\"%V\" filter missing one of: " - "on_context_create, on_vm_start, " - "on_plugin_start", filter->name); - return NGX_ERROR; - } - - return NGX_OK; -} - - -static ngx_int_t -ngx_proxy_wasm_start_filter(ngx_proxy_wasm_filter_t *filter) -{ - ngx_proxy_wasm_instance_t *ictx; - - ngx_wasm_assert(filter->loaded); - - if (filter->ecode) { - return NGX_ERROR; - } - - if (filter->started) { - return NGX_OK; - } - - ictx = ngx_proxy_wasm_get_instance(filter, filter->store, NULL, - filter->log); - if (ictx == NULL) { - return NGX_ERROR; + get_func(filter, "proxy_on_foreign_function"); } - filter->started = 1; - - return NGX_OK; -} - - -ngx_proxy_wasm_instance_t * -ngx_proxy_wasm_get_instance(ngx_proxy_wasm_filter_t *filter, - ngx_proxy_wasm_store_t *store, ngx_proxy_wasm_exec_t *pwexec, - ngx_log_t *log) -{ - ngx_queue_t *q; - ngx_pool_t *pool; - ngx_wavm_module_t *module = filter->module; - ngx_proxy_wasm_instance_t *ictx; - ngx_proxy_wasm_err_e ecode; - - dd("enter (pwexec: %p)", pwexec); - - if (store == NULL) { - dd("no store, jump to create"); - pool = filter->pool; - goto create; - } - - pool = store->pool; - - dd("lookup instance in store: %p", store); - - for (q = ngx_queue_head(&store->busy); - q != ngx_queue_sentinel(&store->busy); - q = ngx_queue_next(q)) - { - ictx = ngx_queue_data(q, ngx_proxy_wasm_instance_t, q); - - if (ictx->instance->trapped) { - ngx_proxy_wasm_log_error(NGX_LOG_DEBUG, log, 0, - "\"%V\" filter freeing trapped instance " - "(ictx: %p, store: %p)", - filter->name, ictx, store); - q = ngx_queue_next(&ictx->q); - ngx_proxy_wasm_release_instance(ictx, 1); - continue; - } - - if (ictx->module == filter->module) { - dd("reuse busy instance"); - ngx_wasm_assert(ictx->nrefs); - goto reuse; - } - } - - for (q = ngx_queue_head(&store->free); - q != ngx_queue_sentinel(&store->free); - q = ngx_queue_next(q)) - { - ictx = ngx_queue_data(q, ngx_proxy_wasm_instance_t, q); - - if (ictx->instance->trapped) { - ngx_proxy_wasm_log_error(NGX_LOG_DEBUG, log, 0, - "\"%V\" filter freeing trapped instance " - "(ictx: %p, store: %p)", - filter->name, ictx, store); - - q = ngx_queue_next(&ictx->q); - ngx_proxy_wasm_release_instance(ictx, 1); - continue; - } - - if (ictx->module == filter->module) { - dd("reuse free instance"); - ngx_wasm_assert(ictx->nrefs == 0); - ngx_queue_remove(&ictx->q); - goto reuse; - } - } - -create: - - dd("create instance in store: %p", store); - - ictx = ngx_pcalloc(pool, sizeof(ngx_proxy_wasm_instance_t)); - if (ictx == NULL) { - goto error; - } - - ictx->next_id = 1; - ictx->pool = pool; - ictx->log = log; - ictx->store = store; - ictx->module = module; - - ngx_rbtree_init(&ictx->root_ctxs, &ictx->sentinel_root_ctxs, - ngx_rbtree_insert_value); + /* validate */ - ngx_rbtree_init(&ictx->tree_ctxs, &ictx->sentinel_ctxs, - ngx_rbtree_insert_value); + if (filter->proxy_on_context_create == NULL + || filter->proxy_on_vm_start == NULL + || filter->proxy_on_plugin_start == NULL) + { + filter->ecode = NGX_PROXY_WASM_ERR_BAD_MODULE_INTERFACE; - ictx->instance = ngx_wavm_instance_create(module, pool, log, ictx); - if (ictx->instance == NULL) { - ngx_pfree(pool, ictx); - goto error; + ngx_proxy_wasm_log_error(NGX_LOG_EMERG, filter->log, filter->ecode, + "\"%V\" filter missing one of: " + "on_context_create, on_vm_start, " + "on_plugin_start", filter->name); + return NGX_ERROR; } - ngx_proxy_wasm_log_error(NGX_LOG_DEBUG, log, 0, - "\"%V\" filter new instance (ictx: %p, store: %p)", - filter->name, ictx, store); + return NGX_OK; +} - goto done; -reuse: +static ngx_int_t +ngx_proxy_wasm_filter_start(ngx_proxy_wasm_filter_t *filter) +{ + ngx_proxy_wasm_err_e ecode; - ngx_proxy_wasm_log_error(NGX_LOG_DEBUG, log, 0, - "\"%V\" filter reusing instance " - "(ictx: %p, nrefs: %d, store: %p)", - filter->name, ictx, ictx->nrefs + 1, store); + ngx_wasm_assert(filter->loaded); - goto done; + if (filter->ecode) { + return NGX_ERROR; + } -done: + if (filter->started) { + return NGX_OK; + } - if (store && !ictx->nrefs) { - ngx_queue_insert_tail(&store->busy, &ictx->q); + ecode = ngx_proxy_wasm_create_context(filter, NULL, 0, NULL, NULL); + if (ecode != NGX_PROXY_WASM_ERR_NONE) { + return NGX_ERROR; } - if (pwexec) { - if (pwexec->root_id != NGX_PROXY_WASM_ROOT_CTX_ID) { - ngx_wasm_assert(pwexec->id == 0); - pwexec->id = ictx->next_id++; - } + filter->started = 1; - if (pwexec->ecode) { - /* recycled instance */ - pwexec->ecode = NGX_PROXY_WASM_ERR_NONE; - pwexec->ecode_logged = 0; - } - } + return NGX_OK; +} - ictx->nrefs++; - ictx->pwexec = pwexec; +static void +ngx_proxy_wasm_instance_update(ngx_proxy_wasm_instance_t *ictx, + ngx_proxy_wasm_exec_t *pwexec) +{ /** - * update instance log + * Update instance * FFI-injected filters have a valid log while the instance's * might be outdated. */ - ngx_wavm_instance_set_data(ictx->instance, ictx, log); + ictx->pwexec = pwexec; - /* create wasm context (root/http) */ + if (pwexec) { + ngx_wavm_instance_set_data(ictx->instance, ictx, pwexec->log); + } +} - ecode = ngx_proxy_wasm_on_start(ictx, filter, pwexec == NULL); - if (ecode != NGX_PROXY_WASM_ERR_NONE) { - if (pwexec) { - pwexec->ecode = ecode; - } else { - filter->ecode = ecode; - } +static void +ngx_proxy_wasm_instance_invalidate(ngx_proxy_wasm_instance_t *ictx) +{ + ngx_rbtree_node_t **root, **sentinel, *s, *n; + ngx_proxy_wasm_exec_t *pwexec = NULL; - goto error; - } + dd("enter (ictx: %p)", ictx); - return ictx; + /* root context */ -error: + root = &ictx->root_ctxs.root; + s = &ictx->sentinel_root_ctxs; + sentinel = &s; - return NULL; -} + while (*root != *sentinel) { + n = ngx_rbtree_min(*root, *sentinel); + pwexec = ngx_rbtree_data(n, ngx_proxy_wasm_exec_t, node); + dd("invalidate root ctx #%ld instance", pwexec->id); -void -ngx_proxy_wasm_release_instance(ngx_proxy_wasm_instance_t *ictx, - unsigned sweep) -{ - ngx_queue_t *q; + ngx_rbtree_delete(&ictx->root_ctxs, n); - if (sweep) { - ictx->nrefs = 0; + destroy_pwexec(pwexec); - } else if (ictx->nrefs) { - ictx->nrefs--; + ngx_pfree(pwexec->pool, pwexec); } - dd("ictx: %p (nrefs: %ld, sweep: %d, trapped: %d)", - ictx, ictx->nrefs, sweep, ictx->instance->trapped); - - if (ictx->nrefs == 0) { - if (ictx->store) { - dd("remove from busy"); - ngx_queue_remove(&ictx->q); /* remove from busy/free */ + /* stream context */ - if (sweep || ictx->instance->trapped) { - dd("insert in sweep"); - ngx_queue_insert_tail(&ictx->store->sweep, &ictx->q); + root = &ictx->tree_ctxs.root; + s = &ictx->sentinel_ctxs; + sentinel = &s; - } else { - dd("insert in free"); - ngx_queue_insert_tail(&ictx->store->free, &ictx->q); - } + while (*root != *sentinel) { + n = ngx_rbtree_min(*root, *sentinel); + pwexec = ngx_rbtree_data(n, ngx_proxy_wasm_exec_t, node); - while (!ngx_queue_empty(&ictx->store->sweep)) { - dd("sweep (destroy)"); - q = ngx_queue_head(&ictx->store->sweep); - ictx = ngx_queue_data(q, ngx_proxy_wasm_instance_t, q); + dd("invalidate ctx #%ld instance", pwexec->id); - ngx_queue_remove(&ictx->q); - ngx_proxy_wasm_instance_destroy(ictx); - } + ngx_rbtree_delete(&ictx->tree_ctxs, n); - } else { - dd("destroy"); - ngx_proxy_wasm_instance_destroy(ictx); - } + destroy_pwexec(pwexec); } + /* remove from busy/free, schedule for sweeping */ + + ngx_queue_remove(&ictx->q); + + ngx_queue_insert_tail(&ictx->store->sweep, &ictx->q); + dd("exit"); } @@ -1422,11 +1709,26 @@ ngx_proxy_wasm_instance_destroy(ngx_proxy_wasm_instance_t *ictx) ngx_pfree(rexec->pool, rexec->parent); } + rexec->ictx = NULL; + ngx_rbtree_delete(&ictx->root_ctxs, n); ngx_pfree(rexec->pool, rexec); } + root = &ictx->tree_ctxs.root; + s = &ictx->sentinel_ctxs; + sentinel = &s; + + while (*root != *sentinel) { + n = ngx_rbtree_min(*root, *sentinel); + rexec = ngx_rbtree_data(n, ngx_proxy_wasm_exec_t, node); + + rexec->ictx = NULL; + + ngx_rbtree_delete(&ictx->tree_ctxs, n); + } + ngx_wavm_instance_destroy(ictx->instance); ngx_pfree(ictx->pool, ictx); @@ -1435,198 +1737,80 @@ ngx_proxy_wasm_instance_destroy(ngx_proxy_wasm_instance_t *ictx) } -static ngx_proxy_wasm_err_e -ngx_proxy_wasm_on_start(ngx_proxy_wasm_instance_t *ictx, - ngx_proxy_wasm_filter_t *filter, unsigned start) +static void +ngx_proxy_wasm_store_destroy(ngx_proxy_wasm_store_t *store) { - ngx_int_t rc; - ngx_log_t *log; - ngx_proxy_wasm_exec_t *rexec, *pwexec = ictx->pwexec; - ngx_wavm_instance_t *instance = ictx->instance; - wasm_val_vec_t *rets; - - dd("enter (pwexec: %p, ictx: %p)", pwexec, ictx); - - rexec = ngx_proxy_wasm_root_lookup(ictx, filter->id); - if (rexec == NULL) { - rexec = ngx_pcalloc(ictx->pool, sizeof(ngx_proxy_wasm_exec_t)); - if (rexec == NULL) { - return NGX_PROXY_WASM_ERR_START_FAILED; - } - - rexec->root_id = NGX_PROXY_WASM_ROOT_CTX_ID; - rexec->id = filter->id; - rexec->pool = ictx->pool; - rexec->log = filter->log; - rexec->filter = filter; - rexec->ictx = ictx; - - log = filter->log; - - rexec->log = ngx_pcalloc(rexec->pool, sizeof(ngx_log_t)); - if (rexec->log == NULL) { - return NGX_PROXY_WASM_ERR_START_FAILED; - } - - rexec->log->file = log->file; - rexec->log->next = log->next; - rexec->log->writer = log->writer; - rexec->log->wdata = log->wdata; - rexec->log->log_level = log->log_level; - rexec->log->handler = ngx_proxy_wasm_log_error_handler; - rexec->log->data = &rexec->log_ctx; - - rexec->log_ctx.pwexec = rexec; - rexec->log_ctx.orig_log = log; + ngx_queue_t *q; + ngx_proxy_wasm_instance_t *ictx; - rexec->parent = ngx_pcalloc(rexec->pool, sizeof(ngx_proxy_wasm_ctx_t)); - if (rexec->parent == NULL) { - return NGX_PROXY_WASM_ERR_START_FAILED; - } + dd("enter"); - rexec->parent->id = NGX_PROXY_WASM_ROOT_CTX_ID; - rexec->parent->pool = rexec->pool; - rexec->parent->log = rexec->log; + while (!ngx_queue_empty(&store->busy)) { + dd("remove busy"); + q = ngx_queue_head(&store->busy); + ictx = ngx_queue_data(q, ngx_proxy_wasm_instance_t, q); - rexec->node.key = rexec->id; - ngx_rbtree_insert(&ictx->root_ctxs, &rexec->node); + ngx_queue_remove(&ictx->q); + ngx_queue_insert_tail(&store->free, &ictx->q); } - ictx->pwexec = rexec; /* set instance current pwexec */ - - if (!rexec->started) { - dd("start root exec ctx (rexec: %p, root_id: %ld, id: %ld, ictx: %p)", - rexec, rexec->root_id, rexec->id, ictx); - - rc = ngx_wavm_instance_call_funcref(instance, - filter->proxy_on_context_create, - NULL, - rexec->id, rexec->root_id); - if (rc != NGX_OK) { - return NGX_PROXY_WASM_ERR_START_FAILED; - } - - if (start) { - rc = ngx_wavm_instance_call_funcref(instance, - filter->proxy_on_vm_start, - &rets, - rexec->id, 0); - if (rc != NGX_OK || !rets->data[0].of.i32) { - return NGX_PROXY_WASM_ERR_VM_START_FAILED; - } - } - - rc = ngx_wavm_instance_call_funcref(instance, - filter->proxy_on_plugin_start, - &rets, - rexec->id, filter->config.len); - if (rc != NGX_OK || !rets->data[0].of.i32) { - return NGX_PROXY_WASM_ERR_CONFIGURE_FAILED; - } + while (!ngx_queue_empty(&store->free)) { + dd("remove free"); + q = ngx_queue_head(&store->free); + ictx = ngx_queue_data(q, ngx_proxy_wasm_instance_t, q); - rexec->started = 1; + ngx_queue_remove(&ictx->q); + ngx_queue_insert_tail(&store->sweep, &ictx->q); } - ictx->pwexec = pwexec; /* set instance current pwexec */ - - if (pwexec && pwexec->root_id != NGX_PROXY_WASM_ROOT_CTX_ID - && ngx_proxy_wasm_ctx_lookup(ictx, pwexec->id) == NULL) - { - dd("start exec ctx (pwexec: %p, root_id: %ld, id: %ld, ictx: %p)", - pwexec, pwexec->root_id, pwexec->id, ictx); - - rc = ngx_wavm_instance_call_funcref(instance, - filter->proxy_on_context_create, - NULL, - pwexec->id, pwexec->root_id); - if (rc != NGX_OK) { - return NGX_PROXY_WASM_ERR_START_FAILED; - } - - pwexec->node.key = pwexec->id; - ngx_rbtree_insert(&ictx->tree_ctxs, &pwexec->node); - } + ngx_proxy_wasm_store_sweep(store); - return NGX_PROXY_WASM_ERR_NONE; + dd("exit"); } static void -ngx_proxy_wasm_on_log(ngx_proxy_wasm_exec_t *pwexec) +ngx_proxy_wasm_store_sweep(ngx_proxy_wasm_store_t *store) { - ngx_proxy_wasm_filter_t *filter = pwexec->filter; - ngx_wavm_instance_t *instance = ngx_proxy_wasm_pwexec2instance(pwexec); + ngx_queue_t *q; + ngx_proxy_wasm_instance_t *ictx; - if (filter->abi_version < NGX_PROXY_WASM_VNEXT) { - /* 0.1.0 - 0.2.1 */ - (void) ngx_wavm_instance_call_funcref(instance, filter->proxy_on_done, - NULL, pwexec->id); - } + while (!ngx_queue_empty(&store->sweep)) { + q = ngx_queue_head(&store->sweep); + ictx = ngx_queue_data(q, ngx_proxy_wasm_instance_t, q); - (void) ngx_wavm_instance_call_funcref(instance, filter->proxy_on_log, - NULL, pwexec->id); + dd("sweep (ictx: %p)", ictx); + + ngx_queue_remove(&ictx->q); + ngx_proxy_wasm_instance_destroy(ictx); + } } +#if 0 static void -ngx_proxy_wasm_on_done(ngx_proxy_wasm_exec_t *pwexec) +ngx_proxy_wasm_store_schedule_sweep_handler(ngx_event_t *ev) { - ngx_wavm_instance_t *instance; - ngx_proxy_wasm_filter_t *filter = pwexec->filter; -#if 1 -#ifdef NGX_WASM_HTTP - ngx_http_proxy_wasm_dispatch_t *call; -#endif -#endif - - instance = ngx_proxy_wasm_pwexec2instance(pwexec); - - ngx_proxy_wasm_log_error(NGX_LOG_DEBUG, pwexec->log, 0, - "filter %l/%l finalizing context", - pwexec->index + 1, pwexec->parent->nfilters); - -#if 1 -#ifdef NGX_WASM_HTTP - call = pwexec->call; - if (call) { - ngx_log_debug3(NGX_LOG_DEBUG_WASM, pwexec->log, 0, - "proxy_wasm \"%V\" filter (%l/%l) " - "cancelling HTTP dispatch", - pwexec->filter->name, pwexec->index + 1, - pwexec->parent->nfilters); - - ngx_http_proxy_wasm_dispatch_destroy(call); - - pwexec->call = NULL; - } -#endif -#endif - - (void) ngx_wavm_instance_call_funcref(instance, - filter->proxy_on_context_finalize, - NULL, pwexec->id); + ngx_proxy_wasm_store_t *store = ev->data; - ngx_rbtree_delete(&pwexec->ictx->tree_ctxs, &pwexec->node); + ngx_proxy_wasm_store_sweep(store); } -static ngx_int_t -ngx_proxy_wasm_on_tick(ngx_proxy_wasm_exec_t *pwexec) +static void +ngx_proxy_wasm_store_schedule_sweep(ngx_proxy_wasm_store_t *store) { - ngx_int_t rc; - wasm_val_vec_t args; - ngx_proxy_wasm_filter_t *filter = pwexec->filter; - - ngx_wasm_assert(pwexec->root_id == NGX_PROXY_WASM_ROOT_CTX_ID); - - wasm_val_vec_new_uninitialized(&args, 1); - ngx_wasm_vec_set_i32(&args, 0, pwexec->id); + ngx_event_t *ev; - rc = ngx_wavm_instance_call_funcref_vec(pwexec->ictx->instance, - filter->proxy_on_timer_ready, - NULL, &args); + ev = ngx_calloc(sizeof(ngx_event_t), store->pool->log); + if (ev == NULL) { + return; + } - wasm_val_vec_delete(&args); + ev->handler = ngx_proxy_wasm_store_schedule_sweep_handler; + ev->data = store; + ev->log = store->pool->log; - return rc; + ngx_post_event(ev, &ngx_posted_events); } +#endif diff --git a/src/common/proxy_wasm/ngx_proxy_wasm.h b/src/common/proxy_wasm/ngx_proxy_wasm.h index 0bc7f19c4..35115c6e1 100644 --- a/src/common/proxy_wasm/ngx_proxy_wasm.h +++ b/src/common/proxy_wasm/ngx_proxy_wasm.h @@ -8,12 +8,6 @@ #define NGX_PROXY_WASM_ROOT_CTX_ID 0 -#define ngx_proxy_wasm_store_init(s, p) \ - (s)->pool = (p); \ - ngx_queue_init(&(s)->sweep); \ - ngx_queue_init(&(s)->free); \ - ngx_queue_init(&(s)->busy) - typedef enum { NGX_PROXY_WASM_0_1_0 = 0, @@ -195,6 +189,7 @@ struct ngx_proxy_wasm_exec_s { ngx_proxy_wasm_ctx_t *parent; ngx_proxy_wasm_filter_t *filter; ngx_proxy_wasm_instance_t *ictx; + ngx_proxy_wasm_store_t *store; ngx_event_t *ev; #ifdef NGX_WASM_HTTP ngx_http_proxy_wasm_dispatch_t *call; @@ -203,8 +198,8 @@ struct ngx_proxy_wasm_exec_s { /* flags */ unsigned started:1; - unsigned ecode_logged:1; unsigned in_tick:1; + unsigned ecode_logged:1; }; @@ -259,9 +254,7 @@ struct ngx_proxy_wasm_ctx_s { struct ngx_proxy_wasm_instance_s { - ngx_uint_t next_id; - ngx_uint_t nrefs; - ngx_queue_t q; + ngx_queue_t q; /* store busy/free/sweep */ ngx_rbtree_t tree_ctxs; ngx_rbtree_t root_ctxs; ngx_rbtree_node_t sentinel_ctxs; @@ -274,7 +267,7 @@ struct ngx_proxy_wasm_instance_s { /* swap */ - ngx_proxy_wasm_exec_t *pwexec; + ngx_proxy_wasm_exec_t *pwexec; /* current pwexec */ }; @@ -379,16 +372,14 @@ struct ngx_proxy_wasm_filter_s { }; -/* root */ -void ngx_proxy_wasm_init(ngx_conf_t *cf); -void ngx_proxy_wasm_exit(); -ngx_uint_t ngx_proxy_wasm_id(ngx_str_t *name, ngx_str_t *config, - uintptr_t data); +/* root context */ +void ngx_proxy_wasm_init(ngx_conf_t *cf, ngx_proxy_wasm_store_t *gstore); +void ngx_proxy_wasm_exit(ngx_proxy_wasm_store_t *gstore); ngx_int_t ngx_proxy_wasm_load(ngx_proxy_wasm_filter_t *filter, ngx_log_t *log); ngx_int_t ngx_proxy_wasm_start(ngx_cycle_t *cycle); -/* ctx/store */ +/* stream context */ ngx_proxy_wasm_ctx_t *ngx_proxy_wasm_ctx_alloc(ngx_pool_t *pool); ngx_proxy_wasm_ctx_t *ngx_proxy_wasm_ctx(ngx_uint_t *filter_ids, size_t nfilters, ngx_uint_t isolation, ngx_proxy_wasm_subsystem_t *subsys, @@ -396,13 +387,12 @@ ngx_proxy_wasm_ctx_t *ngx_proxy_wasm_ctx(ngx_uint_t *filter_ids, void ngx_proxy_wasm_ctx_destroy(ngx_proxy_wasm_ctx_t *pwctx); ngx_int_t ngx_proxy_wasm_resume(ngx_proxy_wasm_ctx_t *pwctx, ngx_wasm_phase_t *phase, ngx_proxy_wasm_step_e step); +ngx_proxy_wasm_err_e ngx_proxy_wasm_run_step(ngx_proxy_wasm_exec_t *pwexec, + ngx_proxy_wasm_step_e step); + + +/* host handlers */ ngx_wavm_ptr_t ngx_proxy_wasm_alloc(ngx_proxy_wasm_exec_t *pwexec, size_t size); -void ngx_proxy_wasm_store_destroy(ngx_proxy_wasm_store_t *store); -ngx_proxy_wasm_instance_t *ngx_proxy_wasm_get_instance( - ngx_proxy_wasm_filter_t *filter, ngx_proxy_wasm_store_t *store, - ngx_proxy_wasm_exec_t *pwexec, ngx_log_t *log); -void ngx_proxy_wasm_release_instance(ngx_proxy_wasm_instance_t *ictx, - unsigned sweep); /* utils */ @@ -418,8 +408,6 @@ ngx_int_t ngx_proxy_wasm_pairs_unmarshal(ngx_proxy_wasm_exec_t *pwexec, unsigned ngx_proxy_wasm_marshal(ngx_proxy_wasm_exec_t *pwexec, ngx_list_t *list, ngx_array_t *extras, ngx_wavm_ptr_t *out, uint32_t *out_size, ngx_uint_t *truncated); -ngx_proxy_wasm_err_e ngx_proxy_wasm_run_step(ngx_proxy_wasm_exec_t *pwexec, - ngx_proxy_wasm_instance_t *ictx, ngx_proxy_wasm_step_e step); static ngx_inline void diff --git a/src/common/proxy_wasm/ngx_proxy_wasm_util.c b/src/common/proxy_wasm/ngx_proxy_wasm_util.c index 361793192..5f65f31ff 100644 --- a/src/common/proxy_wasm/ngx_proxy_wasm_util.c +++ b/src/common/proxy_wasm/ngx_proxy_wasm_util.c @@ -193,14 +193,13 @@ ngx_proxy_wasm_log_error(ngx_uint_t level, ngx_log_t *log, void ngx_proxy_wasm_filter_tick_handler(ngx_event_t *ev) { - ngx_log_t *log = ev->log; - ngx_proxy_wasm_exec_t *pwexec = ev->data; + ngx_proxy_wasm_err_e ecode; + ngx_log_t *log = ev->log; + ngx_proxy_wasm_exec_t *pwexec = ev->data; + ngx_proxy_wasm_filter_t *filter = pwexec->filter; #ifdef NGX_WASM_HTTP - ngx_proxy_wasm_ctx_t *pwctx = pwexec->parent; + ngx_proxy_wasm_ctx_t *pwctx = pwexec->parent; #endif - ngx_proxy_wasm_filter_t *filter = pwexec->filter; - ngx_proxy_wasm_instance_t *ictx; - ngx_proxy_wasm_err_e ecode; dd("enter"); @@ -214,31 +213,16 @@ ngx_proxy_wasm_filter_tick_handler(ngx_event_t *ev) return; } - ictx = ngx_proxy_wasm_get_instance(filter, filter->store, pwexec, - filter->log); - if (ictx == NULL) { - ngx_wasm_log_error(NGX_LOG_ERR, log, 0, - "tick_handler: no instance"); - return; - } - #ifdef NGX_WASM_HTTP pwctx->phase = ngx_wasm_phase_lookup(&ngx_http_wasm_subsystem, NGX_WASM_BACKGROUND_PHASE); #endif - pwexec->in_tick = 1; - - ecode = ngx_proxy_wasm_run_step(pwexec, ictx, NGX_PROXY_WASM_STEP_TICK); - - ngx_proxy_wasm_release_instance(ictx, 0); + ecode = ngx_proxy_wasm_run_step(pwexec, NGX_PROXY_WASM_STEP_TICK); if (ecode != NGX_PROXY_WASM_ERR_NONE) { - pwexec->ecode = ecode; return; } - pwexec->in_tick = 0; - if (!ngx_exiting) { pwexec->ev = ngx_calloc(sizeof(ngx_event_t), log); if (pwexec->ev == NULL) { diff --git a/src/http/ngx_http_wasm_module.c b/src/http/ngx_http_wasm_module.c index cb4f6c29d..e0357dc21 100644 --- a/src/http/ngx_http_wasm_module.c +++ b/src/http/ngx_http_wasm_module.c @@ -265,6 +265,7 @@ ngx_http_wasm_create_main_conf(ngx_conf_t *cf) mcf->vm = ngx_wasm_main_vm(cf->cycle); ngx_queue_init(&mcf->plans); + ngx_proxy_wasm_init(cf, &mcf->store); return mcf; } @@ -281,9 +282,6 @@ ngx_http_wasm_init_main_conf(ngx_conf_t *cf, void *conf) return NGX_CONF_ERROR; } - ngx_proxy_wasm_init(cf); - ngx_proxy_wasm_store_init(&mcf->store, cf->pool); - return NGX_CONF_OK; } @@ -445,8 +443,7 @@ ngx_http_wasm_exit_process(ngx_cycle_t *cycle) mcf = ngx_http_cycle_get_module_main_conf(cycle, ngx_http_wasm_module); if (mcf) { - ngx_proxy_wasm_exit(); - ngx_proxy_wasm_store_destroy(&mcf->store); + ngx_proxy_wasm_exit(&mcf->store); ngx_wasm_ops_destroy(mcf->ops); } diff --git a/src/http/ngx_http_wasm_util.c b/src/http/ngx_http_wasm_util.c index 15123c374..c3841f4bc 100644 --- a/src/http/ngx_http_wasm_util.c +++ b/src/http/ngx_http_wasm_util.c @@ -411,8 +411,8 @@ ngx_http_wasm_ops_add_filter(ngx_wasm_ops_plan_t *plan, goto error; } - filter->pool = plan->pool; filter->log = vm->log; + filter->pool = store->pool; filter->store = store; if (config) { diff --git a/src/http/proxy_wasm/ngx_http_proxy_wasm.h b/src/http/proxy_wasm/ngx_http_proxy_wasm.h index 3392a93ad..37a9a052f 100644 --- a/src/http/proxy_wasm/ngx_http_proxy_wasm.h +++ b/src/http/proxy_wasm/ngx_http_proxy_wasm.h @@ -19,7 +19,7 @@ ngx_http_proxy_wasm_get_rctx(ngx_wavm_instance_t *instance) pwexec = ngx_proxy_wasm_instance2pwexec(instance); pwctx = pwexec->parent; - if (!pwctx) { + if (pwctx == NULL) { return NULL; } diff --git a/src/http/proxy_wasm/ngx_http_proxy_wasm_dispatch.c b/src/http/proxy_wasm/ngx_http_proxy_wasm_dispatch.c index d08f240bb..7ff9eb3c2 100644 --- a/src/http/proxy_wasm/ngx_http_proxy_wasm_dispatch.c +++ b/src/http/proxy_wasm/ngx_http_proxy_wasm_dispatch.c @@ -821,7 +821,7 @@ ngx_http_proxy_wasm_dispatch_resume_handler(ngx_wasm_socket_tcp_t *sock) */ pwexec->call = call; - ecode = ngx_proxy_wasm_run_step(pwexec, pwexec->ictx, + ecode = ngx_proxy_wasm_run_step(pwexec, NGX_PROXY_WASM_STEP_DISPATCH_RESPONSE); if (ecode != NGX_PROXY_WASM_ERR_NONE) { goto error; diff --git a/src/wasm/vm/ngx_wavm.c b/src/wasm/vm/ngx_wavm.c index ade694c87..643b7ea24 100644 --- a/src/wasm/vm/ngx_wavm.c +++ b/src/wasm/vm/ngx_wavm.c @@ -1345,12 +1345,12 @@ ngx_wavm_instance_destroy(ngx_wavm_instance_t *instance) ngx_wavm_func_t *func; ngx_wavm_module_t *module = instance->module; - ngx_log_debug5(NGX_LOG_DEBUG_WASM, + ngx_log_debug6(NGX_LOG_DEBUG_WASM, instance->log ? instance->log : ngx_cycle->log, 0, "wasm freeing \"%V\" instance in \"%V\" vm" - " (vm: %p, module: %p, instance: %p)", + " (vm: %p, module: %p, instance: %p, trapped: %d)", &module->name, module->vm->name, - module->vm, module, instance); + module->vm, module, instance, instance->trapped); if (instance->funcs.nelts) { for (i = 0; i < instance->funcs.nelts; i++) { diff --git a/t/03-proxy_wasm/007-on_http_instance_isolation.t b/t/03-proxy_wasm/007-on_http_instance_isolation.t index 7ac729bd6..a58de9c87 100644 --- a/t/03-proxy_wasm/007-on_http_instance_isolation.t +++ b/t/03-proxy_wasm/007-on_http_instance_isolation.t @@ -94,10 +94,8 @@ Should recycle the global instance when trapped. (.*?(Uncaught RuntimeError: )?unreachable|\s*wasm trap: wasm `unreachable` instruction executed)[^#*]* \*\d+ .*? filter chain failed resuming: previous error \(instance trapped\)[^#*]* \*\d+ .*? filter freeing context #\d+ \(1\/2\)[^#*]* -\*\d+ .*? filter freeing context #\d+ \(2\/2\)\Z/, -qr/\A\*\d+ .*? filter freeing trapped instance[^#*]* -\*\d+ wasm freeing "hostcalls" instance in "main" vm[^#*]* -\*\d+ .*? filter new instance[^#*]* +\*\d+ .*? filter freeing context #\d+ \(2\/2\)[^#*]*\Z/, +qr/\A\*\d+ .*? filter new instance[^#*]* \*\d+ .*? filter reusing instance[^#*]* \*\d+ .*? filter 1\/2 resuming "on_request_headers" step in "rewrite" phase[^#*]* \*\d+ .*? filter 1\/2 resuming "on_response_headers" step in "header_filter" phase[^#*]* @@ -110,6 +108,7 @@ qr/\A\*\d+ .*? filter freeing trapped instance[^#*]* \*\d+ .*? filter 2\/2 resuming "on_done" step in "done" phase[^#*]* \*\d+ .*? filter 2\/2 finalizing context[^#*]* \*\d+ .*? filter freeing context #\d+ \(1\/2\)[^#*]* +\*\d+ wasm freeing "hostcalls" instance in "main" vm[^#*]* \*\d+ .*? filter freeing context #\d+ \(2\/2\)\Z/] --- no_error_log [emerg] diff --git a/t/03-proxy_wasm/hfuncs/114-proxy_set_http_request_body.t b/t/03-proxy_wasm/hfuncs/114-proxy_set_http_request_body.t index bf147dcf2..1c31f21b1 100644 --- a/t/03-proxy_wasm/hfuncs/114-proxy_set_http_request_body.t +++ b/t/03-proxy_wasm/hfuncs/114-proxy_set_http_request_body.t @@ -235,7 +235,7 @@ HelloWorld } location /log { - # cannot set request body + # not executed (subrequest) proxy_wasm hostcalls 'on=log \ test=/t/set_request_body'; echo $request_body; @@ -258,7 +258,6 @@ from_request_body --- grep_error_log_out eval qr/.*?host trap \(bad usage\): cannot set request body.* \[info\] .*? \*\d+ .*? filter chain failed resuming: previous error \(instance trapped\).*? subrequest: "\/response_headers".* -\[info\] .*? \*\d+ .*? filter chain failed resuming: previous error \(instance trapped\).*? request: "GET \/t HTTP\/1\.1".* \z/ --- no_error_log [alert] diff --git a/t/03-proxy_wasm/hfuncs/119-proxy_properties_get_ngx.t b/t/03-proxy_wasm/hfuncs/119-proxy_properties_get_ngx.t index 28b520b3c..c5225b345 100644 --- a/t/03-proxy_wasm/hfuncs/119-proxy_properties_get_ngx.t +++ b/t/03-proxy_wasm/hfuncs/119-proxy_properties_get_ngx.t @@ -131,13 +131,8 @@ qr/\[info\] .*? property not found: n,/ === TEST 5: proxy_wasm - get_property() ngx.* - not available on: tick (isolation: global) - on_tick runs on the root context, so it does not have access to ngx_http_* calls. - -HTTP 500 since instance recycling happens on next request, and isolation -is global (single instance for root/request). - --- wasm_modules: hostcalls --- load_nginx_modules: ngx_http_echo_module --- config @@ -146,7 +141,6 @@ is global (single instance for root/request). echo_sleep 0.150; echo ok; } ---- error_code: 500 --- ignore_response_body --- error_log eval [ diff --git a/t/03-proxy_wasm/hfuncs/120-proxy_properties_get_host.t b/t/03-proxy_wasm/hfuncs/120-proxy_properties_get_host.t index 705c2fa3b..aadd5a76c 100644 --- a/t/03-proxy_wasm/hfuncs/120-proxy_properties_get_host.t +++ b/t/03-proxy_wasm/hfuncs/120-proxy_properties_get_host.t @@ -97,7 +97,8 @@ TODO: is this behavior correct? name=wasmx.nonexistent_property'; echo ok; } ---- ignore_response_body +--- response_body +ok --- error_log eval qr/\[info\] .*? property not found: wasmx.nonexistent_property,/ --- no_error_log @@ -129,10 +130,6 @@ qr/\[info\] .*? property not found: was,/ === TEST 4: proxy_wasm - get_property() wasmx - not available on: tick (isolation: global) - -HTTP 500 since instance recycling happens on next request, and isolation -is global (single instance for root/request). - --- wasm_modules: hostcalls --- load_nginx_modules: ngx_http_echo_module --- config @@ -143,7 +140,6 @@ is global (single instance for root/request). echo_sleep 0.150; echo ok; } ---- error_code: 500 --- ignore_response_body --- error_log eval [ @@ -152,15 +148,10 @@ is global (single instance for root/request). qr/\[crit\] .*? panicked at/, qr/unexpected status: 10/, ] ---- no_error_log -[emerg] === TEST 5: proxy_wasm - get_property() wasmx - not available on: tick (isolation: stream) - -HTTP 200 since the root and request instances are different. - --- wasm_modules: hostcalls --- load_nginx_modules: ngx_http_echo_module --- config diff --git a/t/03-proxy_wasm/hfuncs/122-proxy_properties_set_host.t b/t/03-proxy_wasm/hfuncs/122-proxy_properties_set_host.t index abe7b0aa0..bfc35f836 100644 --- a/t/03-proxy_wasm/hfuncs/122-proxy_properties_set_host.t +++ b/t/03-proxy_wasm/hfuncs/122-proxy_properties_set_host.t @@ -139,10 +139,8 @@ Does not fail when the property is not found. === TEST 5: proxy_wasm - set_property() wasmx - not available on: tick (isolation: global) - -HTTP 500 since instance recycling happens on next request, and -isolation is global (single instance for root/request). - +on_tick runs on the root context, so it does not have access to +ngx_http_* calls. --- wasm_modules: hostcalls --- load_nginx_modules: ngx_http_echo_module --- config @@ -157,7 +155,6 @@ isolation is global (single instance for root/request). echo_sleep 0.150; echo ok; } ---- error_code: 500 --- ignore_response_body --- error_log eval [ @@ -172,12 +169,8 @@ isolation is global (single instance for root/request). === TEST 6: proxy_wasm - set_property() wasmx - not available on: tick (isolation: stream) - -HTTP 200 since the root and request instances are different. - on_tick runs on the root context, so it does not have access to ngx_http_* calls. - --- wasm_modules: hostcalls --- load_nginx_modules: ngx_http_echo_module --- config diff --git a/t/03-proxy_wasm/hfuncs/123-proxy_properties_set_ngx.t b/t/03-proxy_wasm/hfuncs/123-proxy_properties_set_ngx.t index 370854afe..2d8fc6c39 100644 --- a/t/03-proxy_wasm/hfuncs/123-proxy_properties_set_ngx.t +++ b/t/03-proxy_wasm/hfuncs/123-proxy_properties_set_ngx.t @@ -190,13 +190,8 @@ forward the NotFound status back to the caller. === TEST 7: proxy_wasm - set_property() ngx.* - not available on: tick (isolation: global) - on_tick runs on the root context, so it does not have access to ngx_http_* calls. - -HTTP 500 since instance recycling happens on next -request, and isolation is global (single instance for root/request) - --- wasm_modules: hostcalls --- load_nginx_modules: ngx_http_echo_module --- config @@ -211,7 +206,6 @@ request, and isolation is global (single instance for root/request) echo_sleep 0.150; echo ok; } ---- error_code: 500 --- ignore_response_body --- error_log eval [