Skip to content

Commit

Permalink
refactor(proxy-wasm) improve pwexec resurrection and instance lifecycle
Browse files Browse the repository at this point in the history
The main goal of this overhaul is to simplify `on_context_create`, make
it fully re-entrant *and* properly handle instance recycling at the same
time.

The way to do so, in my opinion, was to move `pwexec` creation where
`rexec` already was. In other words, always lookup the context id in the
instance rbtree, and if not found, create it. This means that
surrounding code also needed big overhauls. It also removes the
reference counting poor man's GC of the older implementation. The code
became really ugly by then so I took the time to also review this
module's code structure instead of making a *very* ugly commit.

This new ngx_proxy_wasm.c file should be much easier to read and follow
now.

One change I do not fully like is moving the `next_id` to a global
counter, but we do not have a "global proxy-wasm conf" object yet. I
also started thinking about pre-allocating a number of `pwexecs` (like
`worker_connections`) and use free/busy queue that all filter chains can
dip into to get a context id + context memory zone. Perhaps for a later
time.
  • Loading branch information
thibaultcha committed Oct 4, 2023
1 parent 0609c57 commit bc8649e
Show file tree
Hide file tree
Showing 16 changed files with 1,033 additions and 892 deletions.
1,727 changes: 965 additions & 762 deletions src/common/proxy_wasm/ngx_proxy_wasm.c

Large diffs are not rendered by default.

38 changes: 13 additions & 25 deletions src/common/proxy_wasm/ngx_proxy_wasm.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,6 @@

#define NGX_PROXY_WASM_ROOT_CTX_ID 0

#define ngx_proxy_wasm_store_init(s, p) \
(s)->pool = (p); \
ngx_queue_init(&(s)->sweep); \
ngx_queue_init(&(s)->free); \
ngx_queue_init(&(s)->busy)


typedef enum {
NGX_PROXY_WASM_0_1_0 = 0,
Expand Down Expand Up @@ -195,6 +189,7 @@ struct ngx_proxy_wasm_exec_s {
ngx_proxy_wasm_ctx_t *parent;
ngx_proxy_wasm_filter_t *filter;
ngx_proxy_wasm_instance_t *ictx;
ngx_proxy_wasm_store_t *store;
ngx_event_t *ev;
#ifdef NGX_WASM_HTTP
ngx_http_proxy_wasm_dispatch_t *call;
Expand All @@ -203,8 +198,8 @@ struct ngx_proxy_wasm_exec_s {
/* flags */

unsigned started:1;
unsigned ecode_logged:1;
unsigned in_tick:1;
unsigned ecode_logged:1;
};


Expand Down Expand Up @@ -259,9 +254,7 @@ struct ngx_proxy_wasm_ctx_s {


struct ngx_proxy_wasm_instance_s {
ngx_uint_t next_id;
ngx_uint_t nrefs;
ngx_queue_t q;
ngx_queue_t q; /* store busy/free/sweep */
ngx_rbtree_t tree_ctxs;
ngx_rbtree_t root_ctxs;
ngx_rbtree_node_t sentinel_ctxs;
Expand All @@ -274,7 +267,7 @@ struct ngx_proxy_wasm_instance_s {

/* swap */

ngx_proxy_wasm_exec_t *pwexec;
ngx_proxy_wasm_exec_t *pwexec; /* current pwexec */
};


Expand Down Expand Up @@ -379,30 +372,27 @@ struct ngx_proxy_wasm_filter_s {
};


/* root */
void ngx_proxy_wasm_init(ngx_conf_t *cf);
void ngx_proxy_wasm_exit();
ngx_uint_t ngx_proxy_wasm_id(ngx_str_t *name, ngx_str_t *config,
uintptr_t data);
/* root context */
void ngx_proxy_wasm_init(ngx_conf_t *cf, ngx_proxy_wasm_store_t *gstore);
void ngx_proxy_wasm_exit(ngx_proxy_wasm_store_t *gstore);
ngx_int_t ngx_proxy_wasm_load(ngx_proxy_wasm_filter_t *filter, ngx_log_t *log);
ngx_int_t ngx_proxy_wasm_start(ngx_cycle_t *cycle);


/* ctx/store */
/* stream context */
ngx_proxy_wasm_ctx_t *ngx_proxy_wasm_ctx_alloc(ngx_pool_t *pool);
ngx_proxy_wasm_ctx_t *ngx_proxy_wasm_ctx(ngx_uint_t *filter_ids,
size_t nfilters, ngx_uint_t isolation, ngx_proxy_wasm_subsystem_t *subsys,
void *data);
void ngx_proxy_wasm_ctx_destroy(ngx_proxy_wasm_ctx_t *pwctx);
ngx_int_t ngx_proxy_wasm_resume(ngx_proxy_wasm_ctx_t *pwctx,
ngx_wasm_phase_t *phase, ngx_proxy_wasm_step_e step);
ngx_proxy_wasm_err_e ngx_proxy_wasm_run_step(ngx_proxy_wasm_exec_t *pwexec,
ngx_proxy_wasm_step_e step);


/* host handlers */
ngx_wavm_ptr_t ngx_proxy_wasm_alloc(ngx_proxy_wasm_exec_t *pwexec, size_t size);
void ngx_proxy_wasm_store_destroy(ngx_proxy_wasm_store_t *store);
ngx_proxy_wasm_instance_t *ngx_proxy_wasm_get_instance(
ngx_proxy_wasm_filter_t *filter, ngx_proxy_wasm_store_t *store,
ngx_proxy_wasm_exec_t *pwexec, ngx_log_t *log);
void ngx_proxy_wasm_release_instance(ngx_proxy_wasm_instance_t *ictx,
unsigned sweep);


/* utils */
Expand All @@ -418,8 +408,6 @@ ngx_int_t ngx_proxy_wasm_pairs_unmarshal(ngx_proxy_wasm_exec_t *pwexec,
unsigned ngx_proxy_wasm_marshal(ngx_proxy_wasm_exec_t *pwexec,
ngx_list_t *list, ngx_array_t *extras, ngx_wavm_ptr_t *out,
uint32_t *out_size, ngx_uint_t *truncated);
ngx_proxy_wasm_err_e ngx_proxy_wasm_run_step(ngx_proxy_wasm_exec_t *pwexec,
ngx_proxy_wasm_instance_t *ictx, ngx_proxy_wasm_step_e step);


static ngx_inline void
Expand Down
25 changes: 12 additions & 13 deletions src/common/proxy_wasm/ngx_proxy_wasm_host.c
Original file line number Diff line number Diff line change
Expand Up @@ -205,44 +205,43 @@ ngx_proxy_wasm_hfuncs_set_tick_period(ngx_wavm_instance_t *instance,
{
uint32_t period = args[0].of.i32;
ngx_event_t *ev;
ngx_proxy_wasm_exec_t *pwexec = ngx_proxy_wasm_instance2pwexec(instance);
ngx_proxy_wasm_exec_t *rexec = ngx_proxy_wasm_instance2pwexec(instance);

ngx_wasm_assert(pwexec->root_id == NGX_PROXY_WASM_ROOT_CTX_ID);
ngx_wasm_assert(rexec->root_id == NGX_PROXY_WASM_ROOT_CTX_ID);

if (pwexec->root_id != NGX_PROXY_WASM_ROOT_CTX_ID) {
if (rexec->root_id != NGX_PROXY_WASM_ROOT_CTX_ID) {
/* ignore */
return ngx_proxy_wasm_result_ok(rets);
}

if (ngx_exiting) {
return ngx_proxy_wasm_result_trap(pwexec, "process exiting", rets,
NGX_WAVM_OK);
return ngx_proxy_wasm_result_trap(rexec, "process exiting",
rets, NGX_WAVM_OK);
}

if (pwexec->tick_period) {
return ngx_proxy_wasm_result_trap(pwexec, "tick_period already set",
if (rexec->tick_period) {
return ngx_proxy_wasm_result_trap(rexec, "tick_period already set",
rets, NGX_WAVM_OK);
}

pwexec->tick_period = period;
rexec->tick_period = period;

ev = ngx_calloc(sizeof(ngx_event_t), instance->log);
if (ev == NULL) {
goto nomem;
}

ev->handler = ngx_proxy_wasm_filter_tick_handler;
ev->data = pwexec;
ev->log = pwexec->log;
ev->data = rexec;
ev->log = rexec->log;

ngx_add_timer(ev, pwexec->tick_period);
ngx_add_timer(ev, rexec->tick_period);

return ngx_proxy_wasm_result_ok(rets);

nomem:

return ngx_proxy_wasm_result_trap(pwexec, "no memory",
rets, NGX_WAVM_ERROR);
return ngx_proxy_wasm_result_trap(rexec, "no memory", rets, NGX_WAVM_ERROR);
}


Expand Down
46 changes: 15 additions & 31 deletions src/common/proxy_wasm/ngx_proxy_wasm_util.c
Original file line number Diff line number Diff line change
Expand Up @@ -193,65 +193,49 @@ ngx_proxy_wasm_log_error(ngx_uint_t level, ngx_log_t *log,
void
ngx_proxy_wasm_filter_tick_handler(ngx_event_t *ev)
{
ngx_log_t *log = ev->log;
ngx_proxy_wasm_exec_t *pwexec = ev->data;
ngx_proxy_wasm_err_e ecode;
ngx_log_t *log = ev->log;
ngx_proxy_wasm_exec_t *rexec = ev->data;
ngx_proxy_wasm_filter_t *filter = rexec->filter;
#ifdef NGX_WASM_HTTP
ngx_proxy_wasm_ctx_t *pwctx = pwexec->parent;
ngx_proxy_wasm_ctx_t *pwctx = rexec->parent;
#endif
ngx_proxy_wasm_filter_t *filter = pwexec->filter;
ngx_proxy_wasm_instance_t *ictx;
ngx_proxy_wasm_err_e ecode;

dd("enter");

ngx_wasm_assert(pwexec->root_id == NGX_PROXY_WASM_ROOT_CTX_ID);
ngx_wasm_assert(rexec->root_id == NGX_PROXY_WASM_ROOT_CTX_ID);

ngx_free(ev);

pwexec->ev = NULL;
rexec->ev = NULL;

if (ngx_exiting || !filter->proxy_on_timer_ready) {
return;
}

ictx = ngx_proxy_wasm_get_instance(filter, filter->store, pwexec,
filter->log);
if (ictx == NULL) {
ngx_wasm_log_error(NGX_LOG_ERR, log, 0,
"tick_handler: no instance");
return;
}

#ifdef NGX_WASM_HTTP
pwctx->phase = ngx_wasm_phase_lookup(&ngx_http_wasm_subsystem,
NGX_WASM_BACKGROUND_PHASE);
#endif
pwexec->in_tick = 1;

ecode = ngx_proxy_wasm_run_step(pwexec, ictx, NGX_PROXY_WASM_STEP_TICK);

ngx_proxy_wasm_release_instance(ictx, 0);

ecode = ngx_proxy_wasm_run_step(rexec, NGX_PROXY_WASM_STEP_TICK);
if (ecode != NGX_PROXY_WASM_ERR_NONE) {
pwexec->ecode = ecode;
return;
}

pwexec->in_tick = 0;

if (!ngx_exiting) {
pwexec->ev = ngx_calloc(sizeof(ngx_event_t), log);
if (pwexec->ev == NULL) {
rexec->ev = ngx_calloc(sizeof(ngx_event_t), log);
if (rexec->ev == NULL) {
goto nomem;
}

pwexec->ev->handler = ngx_proxy_wasm_filter_tick_handler;
pwexec->ev->data = pwexec;
pwexec->ev->log = log;
rexec->ev->handler = ngx_proxy_wasm_filter_tick_handler;
rexec->ev->data = rexec;
rexec->ev->log = log;

dd("scheduling next tick in %ld", pwexec->tick_period);
dd("scheduling next tick in %ld", rexec->tick_period);

ngx_add_timer(pwexec->ev, pwexec->tick_period);
ngx_add_timer(rexec->ev, rexec->tick_period);
}

return;
Expand Down
7 changes: 2 additions & 5 deletions src/http/ngx_http_wasm_module.c
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,7 @@ ngx_http_wasm_create_main_conf(ngx_conf_t *cf)
mcf->vm = ngx_wasm_main_vm(cf->cycle);

ngx_queue_init(&mcf->plans);
ngx_proxy_wasm_init(cf, &mcf->store);

return mcf;
}
Expand All @@ -281,9 +282,6 @@ ngx_http_wasm_init_main_conf(ngx_conf_t *cf, void *conf)
return NGX_CONF_ERROR;
}

ngx_proxy_wasm_init(cf);
ngx_proxy_wasm_store_init(&mcf->store, cf->pool);

return NGX_CONF_OK;
}

Expand Down Expand Up @@ -445,8 +443,7 @@ ngx_http_wasm_exit_process(ngx_cycle_t *cycle)

mcf = ngx_http_cycle_get_module_main_conf(cycle, ngx_http_wasm_module);
if (mcf) {
ngx_proxy_wasm_exit();
ngx_proxy_wasm_store_destroy(&mcf->store);
ngx_proxy_wasm_exit(&mcf->store);

ngx_wasm_ops_destroy(mcf->ops);
}
Expand Down
2 changes: 1 addition & 1 deletion src/http/ngx_http_wasm_util.c
Original file line number Diff line number Diff line change
Expand Up @@ -411,8 +411,8 @@ ngx_http_wasm_ops_add_filter(ngx_wasm_ops_plan_t *plan,
goto error;
}

filter->pool = plan->pool;
filter->log = vm->log;
filter->pool = store->pool;
filter->store = store;

if (config) {
Expand Down
2 changes: 1 addition & 1 deletion src/http/proxy_wasm/ngx_http_proxy_wasm.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ ngx_http_proxy_wasm_get_rctx(ngx_wavm_instance_t *instance)

pwexec = ngx_proxy_wasm_instance2pwexec(instance);
pwctx = pwexec->parent;
if (!pwctx) {
if (pwctx == NULL) {
return NULL;
}

Expand Down
2 changes: 1 addition & 1 deletion src/http/proxy_wasm/ngx_http_proxy_wasm_dispatch.c
Original file line number Diff line number Diff line change
Expand Up @@ -821,7 +821,7 @@ ngx_http_proxy_wasm_dispatch_resume_handler(ngx_wasm_socket_tcp_t *sock)
*/
pwexec->call = call;

ecode = ngx_proxy_wasm_run_step(pwexec, pwexec->ictx,
ecode = ngx_proxy_wasm_run_step(pwexec,
NGX_PROXY_WASM_STEP_DISPATCH_RESPONSE);
if (ecode != NGX_PROXY_WASM_ERR_NONE) {
goto error;
Expand Down
6 changes: 3 additions & 3 deletions src/wasm/vm/ngx_wavm.c
Original file line number Diff line number Diff line change
Expand Up @@ -1345,12 +1345,12 @@ ngx_wavm_instance_destroy(ngx_wavm_instance_t *instance)
ngx_wavm_func_t *func;
ngx_wavm_module_t *module = instance->module;

ngx_log_debug5(NGX_LOG_DEBUG_WASM,
ngx_log_debug6(NGX_LOG_DEBUG_WASM,
instance->log ? instance->log : ngx_cycle->log, 0,
"wasm freeing \"%V\" instance in \"%V\" vm"
" (vm: %p, module: %p, instance: %p)",
" (vm: %p, module: %p, instance: %p, trapped: %d)",
&module->name, module->vm->name,
module->vm, module, instance);
module->vm, module, instance, instance->trapped);

if (instance->funcs.nelts) {
for (i = 0; i < instance->funcs.nelts; i++) {
Expand Down
23 changes: 11 additions & 12 deletions t/03-proxy_wasm/007-on_http_instance_isolation.t
Original file line number Diff line number Diff line change
Expand Up @@ -94,10 +94,8 @@ Should recycle the global instance when trapped.
(.*?(Uncaught RuntimeError: )?unreachable|\s*wasm trap: wasm `unreachable` instruction executed)[^#*]*
\*\d+ .*? filter chain failed resuming: previous error \(instance trapped\)[^#*]*
\*\d+ .*? filter freeing context #\d+ \(1\/2\)[^#*]*
\*\d+ .*? filter freeing context #\d+ \(2\/2\)\Z/,
qr/\A\*\d+ .*? filter freeing trapped instance[^#*]*
\*\d+ wasm freeing "hostcalls" instance in "main" vm[^#*]*
\*\d+ .*? filter new instance[^#*]*
\*\d+ .*? filter freeing context #\d+ \(2\/2\)[^#*]*\Z/,
qr/\A\*\d+ .*? filter new instance[^#*]*
\*\d+ .*? filter reusing instance[^#*]*
\*\d+ .*? filter 1\/2 resuming "on_request_headers" step in "rewrite" phase[^#*]*
\*\d+ .*? filter 1\/2 resuming "on_response_headers" step in "header_filter" phase[^#*]*
Expand All @@ -110,6 +108,7 @@ qr/\A\*\d+ .*? filter freeing trapped instance[^#*]*
\*\d+ .*? filter 2\/2 resuming "on_done" step in "done" phase[^#*]*
\*\d+ .*? filter 2\/2 finalizing context[^#*]*
\*\d+ .*? filter freeing context #\d+ \(1\/2\)[^#*]*
\*\d+ wasm freeing "hostcalls" instance in "main" vm[^#*]*
\*\d+ .*? filter freeing context #\d+ \(2\/2\)\Z/]
--- no_error_log
[emerg]
Expand Down Expand Up @@ -153,9 +152,9 @@ should use an instance per stream
\*\d+ .*? filter 1\/2 finalizing context[^#*]*
\*\d+ .*? filter 2\/2 resuming "on_done" step in "done" phase[^#*]*
\*\d+ .*? filter 2\/2 finalizing context[^#*]*
\*\d+ .*? freeing "hostcalls" instance in "main" vm[^#*]*
\*\d+ .*? filter freeing context #\d+ \(1\/2\)[^#*]*
\*\d+ .*? filter freeing context #\d+ \(2\/2\)[^#*]*
\*\d+ .*? freeing "hostcalls" instance in "main" vm[^#*]*\Z/,
\*\d+ .*? filter freeing context #\d+ \(2\/2\)[^#*]*\Z/,
qr/\A\*\d+ .*? filter new instance[^#*]*
#0 on_configure[^#*]*
\*\d+ .*? filter reusing instance[^#*]*
Expand All @@ -172,9 +171,9 @@ qr/\A\*\d+ .*? filter new instance[^#*]*
\*\d+ .*? filter 1\/2 finalizing context[^#*]*
\*\d+ .*? filter 2\/2 resuming "on_done" step in "done" phase[^#*]*
\*\d+ .*? filter 2\/2 finalizing context[^#*]*
\*\d+ .*? freeing "hostcalls" instance in "main" vm[^#*]*
\*\d+ .*? filter freeing context #\d+ \(1\/2\)[^#*]*
\*\d+ .*? filter freeing context #\d+ \(2\/2\)[^#*]*
\*\d+ .*? freeing "hostcalls" instance in "main" vm[^#*]*\Z/]
\*\d+ .*? filter freeing context #\d+ \(2\/2\)[^#*]*\Z/]
--- no_error_log
[error]
[crit]
Expand Down Expand Up @@ -204,9 +203,9 @@ qr/\A\*\d+ .*? filter new instance[^#*]*
\*\d+ .*? filter 1\/2 resuming "on_request_headers" step in "rewrite" phase[^#*]*
(.*?(Uncaught RuntimeError: )?unreachable|\s*wasm trap: wasm `unreachable` instruction executed)[^#*]*
\*\d+ .*? filter chain failed resuming: previous error \(instance trapped\)[^#*]*
\*\d+ .*? freeing "hostcalls" instance in "main" vm[^#*]*
\*\d+ .*? filter freeing context #\d+ \(1\/2\)[^#*]*
\*\d+ .*? filter freeing context #\d+ \(2\/2\)[^#*]*
\*\d+ .*? freeing "hostcalls" instance in "main" vm[^#*]*\Z/,
\*\d+ .*? filter freeing context #\d+ \(2\/2\)[^#*]*\Z/,
qr/\A\*\d+ .*? filter new instance[^#*]*
\*\d+ .*? filter reusing instance[^#*]*
\*\d+ .*? filter 1\/2 resuming "on_request_headers" step in "rewrite" phase[^#*]*
Expand All @@ -219,9 +218,9 @@ qr/\A\*\d+ .*? filter new instance[^#*]*
\*\d+ .*? filter 1\/2 finalizing context[^#*]*
\*\d+ .*? filter 2\/2 resuming "on_done" step in "done" phase[^#*]*
\*\d+ .*? filter 2\/2 finalizing context[^#*]*
\*\d+ .*? freeing "hostcalls" instance in "main" vm[^#*]*
\*\d+ .*? filter freeing context #\d+ \(1\/2\)[^#*]*
\*\d+ .*? filter freeing context #\d+ \(2\/2\)[^#*]*
\*\d+ .*? freeing "hostcalls" instance in "main" vm[^#*]*\Z/]
\*\d+ .*? filter freeing context #\d+ \(2\/2\)[^#*]*\Z/]
--- no_error_log
[emerg]
[alert]
Expand Down
3 changes: 1 addition & 2 deletions t/03-proxy_wasm/hfuncs/114-proxy_set_http_request_body.t
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ HelloWorld
}
location /log {
# cannot set request body
# not executed (subrequest)
proxy_wasm hostcalls 'on=log \
test=/t/set_request_body';
echo $request_body;
Expand All @@ -258,7 +258,6 @@ from_request_body
--- grep_error_log_out eval
qr/.*?host trap \(bad usage\): cannot set request body.*
\[info\] .*? \*\d+ .*? filter chain failed resuming: previous error \(instance trapped\).*? subrequest: "\/response_headers".*
\[info\] .*? \*\d+ .*? filter chain failed resuming: previous error \(instance trapped\).*? request: "GET \/t HTTP\/1\.1".*
\z/
--- no_error_log
[alert]
Expand Down
Loading

0 comments on commit bc8649e

Please sign in to comment.