Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(*) proxy-wasm runloop + req state enhancements #508

Merged
merged 3 commits into from
Feb 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/common/ngx_wasm_socket_tcp.c
Original file line number Diff line number Diff line change
Expand Up @@ -96,11 +96,11 @@ ngx_wasm_socket_tcp_resume(ngx_wasm_socket_tcp_t *sock)
ngx_wasm_yield(&rctx->env);
break;
case NGX_ERROR:
ngx_http_wasm_error(rctx);
ngx_wasm_error(&rctx->env);
break;
default:
ngx_wasm_assert(rc == NGX_OK);
ngx_http_wasm_continue(rctx);
ngx_wasm_continue(&rctx->env);
break;
}

Expand Down
17 changes: 0 additions & 17 deletions src/common/ngx_wasm_subsystem.c
Original file line number Diff line number Diff line change
Expand Up @@ -36,20 +36,3 @@ ngx_wasm_set_resume_handler(ngx_wasm_subsys_env_t *env)
}
#endif
}


ngx_inline void
ngx_wasm_yield(ngx_wasm_subsys_env_t *env)
{
#if (NGX_WASM_HTTP)
ngx_http_wasm_req_ctx_t *rctx;

if (env->subsys->kind == NGX_WASM_SUBSYS_HTTP) {
rctx = env->ctx.rctx;

rctx->state = NGX_HTTP_WASM_REQ_STATE_YIELD;
}
#endif

ngx_wasm_set_resume_handler(env);
}
15 changes: 14 additions & 1 deletion src/common/ngx_wasm_subsystem.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,23 @@
#endif


#define ngx_wasm_continue(env) \
(env)->state = NGX_WASM_STATE_CONTINUE

#define ngx_wasm_error(env) \
(env)->state = NGX_WASM_STATE_ERROR

#define ngx_wasm_yield(env) \
(env)->state = NGX_WASM_STATE_YIELD; \
ngx_wasm_set_resume_handler(env)

#define ngx_wasm_yielding(env) \
(env)->state == NGX_WASM_STATE_YIELD


ngx_wasm_phase_t *ngx_wasm_phase_lookup(ngx_wasm_subsystem_t *subsys,
ngx_uint_t phaseidx);
void ngx_wasm_set_resume_handler(ngx_wasm_subsys_env_t *env);
void ngx_wasm_yield(ngx_wasm_subsys_env_t *env);


#endif /* _NGX_WASM_SUBSYSTEM_H_INCLUDED_ */
110 changes: 56 additions & 54 deletions src/common/proxy_wasm/ngx_proxy_wasm.c
Original file line number Diff line number Diff line change
Expand Up @@ -445,27 +445,13 @@ action2rc(ngx_proxy_wasm_ctx_t *pwctx,
}
#if (NGX_DEBUG)
else {
#if 0
if (pwexec->root_id == NGX_PROXY_WASM_ROOT_CTX_ID) {
ngx_proxy_wasm_log_error(NGX_LOG_DEBUG, pwctx->log,
pwexec->ecode,
"root context skipping \"%V\" "
"step in \"%V\" phase",
ngx_proxy_wasm_step_name(pwctx->step),
&pwctx->phase->name);

} else {
#endif
ngx_proxy_wasm_log_error(NGX_LOG_DEBUG, pwctx->log,
pwexec->ecode,
"filter %l/%l skipping \"%V\" "
"step in \"%V\" phase",
pwexec->index + 1, pwctx->nfilters,
ngx_proxy_wasm_step_name(pwctx->step),
&pwctx->phase->name);
#if 0
}
#endif
ngx_proxy_wasm_log_error(NGX_LOG_DEBUG, pwctx->log,
pwexec->ecode,
"filter %l/%l skipping \"%V\" "
"step in \"%V\" phase: previous error",
pwexec->index + 1, pwctx->nfilters,
ngx_proxy_wasm_step_name(pwctx->step),
&pwctx->phase->name);
}
#endif

Expand Down Expand Up @@ -522,13 +508,20 @@ action2rc(ngx_proxy_wasm_ctx_t *pwctx,
}

goto yield;

case NGX_HTTP_WASM_BODY_FILTER_PHASE:
instance = ngx_proxy_wasm_pwexec2instance(pwexec);
rctx = ngx_http_proxy_wasm_get_rctx(instance);

ngx_log_debug3(NGX_LOG_DEBUG_WASM, pwctx->log, 0,
"proxy_wasm buffering response after "
"\"ResponseBody\" step "
"(filter: %l/%l, pwctx: %p)",
pwexec->index + 1, pwctx->nfilters, pwctx);

rctx->resp_buffering = 1;
goto yield;

case NGX_HTTP_REWRITE_PHASE:
case NGX_HTTP_ACCESS_PHASE:
case NGX_HTTP_CONTENT_PHASE:
Expand Down Expand Up @@ -604,41 +597,47 @@ ngx_proxy_wasm_resume(ngx_proxy_wasm_ctx_t *pwctx,

dd("enter");

pwctx->step = step;

switch (step) {
case NGX_PROXY_WASM_STEP_TICK:
case NGX_PROXY_WASM_STEP_DONE:
case NGX_PROXY_WASM_STEP_RESP_BODY:
case NGX_PROXY_WASM_STEP_DISPATCH_RESPONSE:
break;
case NGX_PROXY_WASM_STEP_RESP_HEADERS:
if (pwctx->last_step < NGX_PROXY_WASM_STEP_RESP_HEADERS) {
if (pwctx->last_completed_step < NGX_PROXY_WASM_STEP_RESP_HEADERS) {
/* first execution of response phases, ensure the chain is reset */
ngx_proxy_wasm_ctx_reset_chain(pwctx);
}

break;
default:
if (step <= pwctx->last_step) {
if (step <= pwctx->last_completed_step) {
dd("step %d already completed, exit", step);
ngx_wasm_assert(rc == NGX_OK);
goto ret;
}
}

pwctx->step = step;

/* resume filters chain */

pwexecs = (ngx_proxy_wasm_exec_t *) pwctx->pwexecs.elts;

dd("pwctx->exec_index: %ld, nelts: %ld",
pwctx->exec_index, pwctx->pwexecs.nelts);

for (i = pwctx->exec_index; i < pwctx->pwexecs.nelts; i++) {
dd("exec_index: %ld", i);
ngx_wasm_assert(pwctx->pwexecs.nelts == pwctx->nfilters);

for (i = pwctx->exec_index; i < pwctx->nfilters; i++) {
pwexec = &pwexecs[i];

dd("---> exec_index: %ld (%ld/%ld), pwexec: %p (\"%.*s\"), pwctx: %p",
pwctx->exec_index,
ngx_min(pwctx->exec_index + 1, pwctx->nfilters),
pwctx->nfilters,
pwexec,
(int) pwexec->filter->name->len,
pwexec->filter->name->data,
pwctx);

ngx_wasm_assert(pwexec->root_id != NGX_PROXY_WASM_ROOT_CTX_ID);

/* check for yielded state */
Expand All @@ -659,42 +658,45 @@ ngx_proxy_wasm_resume(ngx_proxy_wasm_ctx_t *pwctx,
/* run step */

pwexec->ecode = ngx_proxy_wasm_run_step(pwexec, step);
dd("pwexec->ecode: %d, pwctx->action: %d (pwctx: %p)",
pwexec->ecode, pwctx->action, pwctx);
dd("<--- run_step ecode: %d, pwctx->action: %d",
pwexec->ecode, pwctx->action);
if (pwexec->ecode != NGX_PROXY_WASM_ERR_NONE) {
rc = pwexec->filter->subsystem->ecode(pwexec->ecode);
goto ret;
}

/* check for yield/done */

rc = action2rc(pwctx, pwexec);
if (rc != NGX_OK) {
if (rc == NGX_AGAIN
&& pwctx->exec_index + 1 <= pwctx->nfilters
&& step != NGX_PROXY_WASM_STEP_RESP_BODY)
{
dd("yield: resume on next filter "
"(idx: %ld -> %ld, nelts: %ld)",
pwctx->exec_index, pwctx->exec_index + 1,
pwctx->pwexecs.nelts);

switch (pwctx->action) {
case NGX_PROXY_WASM_ACTION_CONTINUE:
dd("-------- next filter --------");
pwctx->exec_index++;
break;
case NGX_PROXY_WASM_ACTION_PAUSE:
/**
* Exception for response body buffering which re-enters
* the same filter once the response is buffered.
*/
if (step != NGX_PROXY_WASM_STEP_RESP_BODY) {
dd("-------- pause --------");
pwctx->exec_index++;
}

goto ret;
/* fallthrough */

default:
break;
}

pwctx->exec_index++;
/* check for yield/done */

dd("-------- next filter --------");
rc = action2rc(pwctx, pwexec);
if (rc != NGX_OK && rc != NGX_AGAIN) {
goto ret;
}
}

ngx_wasm_assert(rc == NGX_OK);

/* next step */

pwctx->last_step = pwctx->step;
pwctx->last_completed_step = pwctx->step;
pwctx->exec_index = 0;

ret:
Expand Down Expand Up @@ -723,7 +725,7 @@ ngx_proxy_wasm_run_step(ngx_proxy_wasm_exec_t *pwexec,

ngx_wasm_assert(pwctx->phase);

dd("enter (pwexec: %p, ictx: %p, trapped: %d)",
dd("--> enter (pwexec: %p, ictx: %p, trapped: %d)",
pwexec, pwexec->ictx,
pwexec->ictx ? pwexec->ictx->instance->trapped : 0);

Expand Down Expand Up @@ -798,8 +800,8 @@ ngx_proxy_wasm_run_step(ngx_proxy_wasm_exec_t *pwexec,
break;
}

dd("rc: %ld, old_action: %d, pwctx->action: %d, ret action: %d, ictx: %p",
rc, old_action, pwctx->action, action, pwexec->ictx);
dd("<-- step rc: %ld, old_action: %d, ret action: %d, pwctx->action: %d, "
" ictx: %p", rc, old_action, action, pwctx->action, pwexec->ictx);

/* pwctx->action writes in host calls overwrite action return value */

Expand Down
2 changes: 1 addition & 1 deletion src/common/proxy_wasm/ngx_proxy_wasm.h
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ struct ngx_proxy_wasm_ctx_s {
ngx_wasm_phase_t *phase;
ngx_proxy_wasm_action_e action;
ngx_proxy_wasm_step_e step;
ngx_proxy_wasm_step_e last_step;
ngx_proxy_wasm_step_e last_completed_step;
ngx_uint_t exec_index;

/* cache */
Expand Down
2 changes: 1 addition & 1 deletion src/common/proxy_wasm/ngx_proxy_wasm_host.c
Original file line number Diff line number Diff line change
Expand Up @@ -998,7 +998,7 @@ ngx_proxy_wasm_hfuncs_resume_http_request(ngx_wavm_instance_t *instance,
rctx = ngx_http_proxy_wasm_get_rctx(instance);

/* force resume */
ngx_http_wasm_continue(rctx);
ngx_wasm_continue(&rctx->env);

return ngx_proxy_wasm_result_ok(rets);
}
Expand Down
18 changes: 1 addition & 17 deletions src/http/ngx_http_wasm.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,11 @@
#endif


typedef enum {
NGX_HTTP_WASM_REQ_STATE_CONTINUE,
NGX_HTTP_WASM_REQ_STATE_ERROR,
NGX_HTTP_WASM_REQ_STATE_YIELD,
} ngx_http_wasm_req_state_e;


struct ngx_http_wasm_req_ctx_s {
ngx_http_request_t *r;
ngx_connection_t *connection;
ngx_pool_t *pool; /* r->pool */
ngx_wasm_subsys_env_t env;
ngx_http_wasm_req_state_e state; /* determines next step on resume */
ngx_wasm_op_ctx_t opctx;
ngx_wasm_ops_t *ffi_engine;
void *data; /* per-stream extra context */
Expand Down Expand Up @@ -170,15 +162,7 @@ ngx_int_t ngx_http_wasm_prepend_resp_body(ngx_http_wasm_req_ctx_t *rctx,
ngx_str_t *body);


/* yielding */
#define ngx_http_wasm_continue(rctx) \
(rctx->state = NGX_HTTP_WASM_REQ_STATE_CONTINUE)
#define ngx_http_wasm_error(rctx) \
(rctx->state = NGX_HTTP_WASM_REQ_STATE_ERROR)
#define ngx_http_wasm_yielding(rctx) \
(rctx->state == NGX_HTTP_WASM_REQ_STATE_YIELD)


/* resume handler */
void ngx_http_wasm_set_resume_handler(ngx_http_wasm_req_ctx_t *rctx);
void ngx_http_wasm_resume(ngx_http_wasm_req_ctx_t *rctx, unsigned main,
unsigned wev);
Expand Down
9 changes: 2 additions & 7 deletions src/http/ngx_http_wasm_filter_module.c
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,6 @@ ngx_http_wasm_body_filter_handler(ngx_http_request_t *r, ngx_chain_t *in)
static void
ngx_http_wasm_body_filter_resume(ngx_http_wasm_req_ctx_t *rctx, ngx_chain_t *in)
{
ngx_int_t rc;
ngx_chain_t *cl;

rctx->resp_chunk = in;
Expand All @@ -274,12 +273,8 @@ ngx_http_wasm_body_filter_resume(ngx_http_wasm_req_ctx_t *rctx, ngx_chain_t *in)
}

if (!rctx->resp_buffering) {
rc = ngx_wasm_ops_resume(&rctx->opctx,
NGX_HTTP_WASM_BODY_FILTER_PHASE);
if (rc == NGX_AGAIN) {
ngx_wasm_assert(rctx->resp_bufs == NULL);
rctx->resp_buffering = 1;
}
(void) ngx_wasm_ops_resume(&rctx->opctx,
NGX_HTTP_WASM_BODY_FILTER_PHASE);
}
}

Expand Down
Loading