Skip to content

Commit

Permalink
feat(proxy-wasm) cancel pending calls when one produces a response
Browse files Browse the repository at this point in the history
  • Loading branch information
thibaultcha committed Feb 9, 2024
1 parent 261bd71 commit 45a0c12
Show file tree
Hide file tree
Showing 7 changed files with 105 additions and 13 deletions.
2 changes: 2 additions & 0 deletions src/common/ngx_wasm_socket_tcp.c
Original file line number Diff line number Diff line change
Expand Up @@ -1363,6 +1363,8 @@ ngx_wasm_socket_tcp_handler(ngx_event_t *ev)
(int) ev->write);

if (sock->closed) {
ngx_log_debug0(NGX_LOG_DEBUG_WASM, ev->log, 0,
"wasm tcp socket was closed");
return;
}

Expand Down
41 changes: 41 additions & 0 deletions src/common/proxy_wasm/ngx_proxy_wasm.c
Original file line number Diff line number Diff line change
Expand Up @@ -835,6 +835,43 @@ ngx_proxy_wasm_run_step(ngx_proxy_wasm_exec_t *pwexec,
}


ngx_uint_t
ngx_proxy_wasm_dispatch_calls_total(ngx_proxy_wasm_exec_t *pwexec)
{
ngx_queue_t *q;
ngx_uint_t n = 0;

for (q = ngx_queue_head(&pwexec->calls);
q != ngx_queue_sentinel(&pwexec->calls);
q = ngx_queue_next(q), n++) { /* void */ }

return n;
}


void
ngx_proxy_wasm_dispatch_calls_cancel(ngx_proxy_wasm_exec_t *pwexec)
{
#ifdef NGX_WASM_HTTP
ngx_queue_t *q;
ngx_http_proxy_wasm_dispatch_t *call;

while (!ngx_queue_empty(&pwexec->calls)) {
q = ngx_queue_head(&pwexec->calls);
call = ngx_queue_data(q, ngx_http_proxy_wasm_dispatch_t, q);

ngx_log_debug1(NGX_LOG_DEBUG_ALL, pwexec->log, 0,
"proxy_wasm http dispatch cancelled (dispatch: %p)",
call);

ngx_queue_remove(&call->q);

ngx_http_proxy_wasm_dispatch_destroy(call);
}
#endif
}


/* host handlers */


Expand Down Expand Up @@ -1053,6 +1090,8 @@ ngx_proxy_wasm_create_context(ngx_proxy_wasm_filter_t *filter,
rexec->filter = filter;
rexec->ictx = ictx;

ngx_queue_init(&rexec->calls);

log = filter->log;

rexec->log = ngx_pcalloc(rexec->pool, sizeof(ngx_log_t));
Expand Down Expand Up @@ -1168,6 +1207,8 @@ ngx_proxy_wasm_create_context(ngx_proxy_wasm_filter_t *filter,
pwexec->ictx = ictx;
pwexec->store = ictx->store;

ngx_queue_init(&pwexec->calls);

} else {
if (in->ictx != ictx) {
dd("replace pwexec instance");
Expand Down
6 changes: 4 additions & 2 deletions src/common/proxy_wasm/ngx_proxy_wasm.h
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,6 @@ struct ngx_proxy_wasm_exec_s {
ngx_uint_t id;
ngx_uint_t index;
ngx_uint_t tick_period;
ngx_uint_t ncalls;
ngx_rbtree_node_t node;
ngx_proxy_wasm_err_e ecode;
ngx_pool_t *pool;
Expand All @@ -192,8 +191,9 @@ struct ngx_proxy_wasm_exec_s {
ngx_proxy_wasm_store_t *store;
ngx_event_t *ev;
#ifdef NGX_WASM_HTTP
ngx_http_proxy_wasm_dispatch_t *call;
ngx_http_proxy_wasm_dispatch_t *call; /* swap pointer for host functions */
#endif
ngx_queue_t calls;

/* flags */

Expand Down Expand Up @@ -400,6 +400,8 @@ ngx_int_t ngx_proxy_wasm_resume(ngx_proxy_wasm_ctx_t *pwctx,
ngx_wasm_phase_t *phase, ngx_proxy_wasm_step_e step);
ngx_proxy_wasm_err_e ngx_proxy_wasm_run_step(ngx_proxy_wasm_exec_t *pwexec,
ngx_proxy_wasm_step_e step);
ngx_uint_t ngx_proxy_wasm_dispatch_calls_total(ngx_proxy_wasm_exec_t *pwexec);
void ngx_proxy_wasm_dispatch_calls_cancel(ngx_proxy_wasm_exec_t *pwexec);


/* host handlers */
Expand Down
10 changes: 10 additions & 0 deletions src/common/proxy_wasm/ngx_proxy_wasm_host.c
Original file line number Diff line number Diff line change
Expand Up @@ -1087,6 +1087,16 @@ ngx_proxy_wasm_hfuncs_send_local_response(ngx_wavm_instance_t *instance,
NGX_PROXY_WASM_ACTION_DONE);
}

/* pwexec->step == NGX_PROXY_WASM_STEP_DISPATCH_RESPONSE) */

if (ngx_proxy_wasm_dispatch_calls_total(pwexec)) {
ngx_proxy_wasm_log_error(NGX_LOG_NOTICE, pwexec->log, 0,
"local response produced, cancelling "
"pending dispatch calls");

ngx_proxy_wasm_dispatch_calls_cancel(pwexec);
}

break;

case NGX_ERROR:
Expand Down
10 changes: 5 additions & 5 deletions src/http/proxy_wasm/ngx_http_proxy_wasm_dispatch.c
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,7 @@ ngx_http_proxy_wasm_dispatch(ngx_proxy_wasm_exec_t *pwexec,

ngx_post_event(ev, &ngx_posted_events);

pwexec->ncalls++;
ngx_queue_insert_head(&pwexec->calls, &call->q);

ngx_proxy_wasm_ctx_set_next_action(pwctx, NGX_PROXY_WASM_ACTION_PAUSE);

Expand Down Expand Up @@ -817,7 +817,7 @@ ngx_http_proxy_wasm_dispatch_resume_handler(ngx_wasm_socket_tcp_t *sock)
}

/* call has finished */
pwexec->ncalls--;
ngx_queue_remove(&call->q);

/**
* Set current call for subsequent call detection after the step
Expand Down Expand Up @@ -853,11 +853,10 @@ ngx_http_proxy_wasm_dispatch_resume_handler(ngx_wasm_socket_tcp_t *sock)
/* remove current call now that callback was invoked */
pwexec->call = NULL;

if (pwexec->ncalls) {
if (ngx_proxy_wasm_dispatch_calls_total(pwexec)) {
ngx_log_debug0(NGX_LOG_DEBUG_ALL, pwexec->log, 0,
"proxy_wasm more http dispatch calls pending...");

/* another call was setup during the callback */
ngx_proxy_wasm_ctx_set_next_action(pwexec->parent,
NGX_PROXY_WASM_ACTION_PAUSE);

Expand Down Expand Up @@ -900,7 +899,8 @@ ngx_http_proxy_wasm_dispatch_resume_handler(ngx_wasm_socket_tcp_t *sock)

error:

pwexec->ncalls--;
/* call has errored */
ngx_queue_remove(&call->q);

error2:

Expand Down
1 change: 1 addition & 0 deletions src/http/proxy_wasm/ngx_http_proxy_wasm_dispatch.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ typedef enum {

struct ngx_http_proxy_wasm_dispatch_s {
ngx_pool_t *pool; /* owned */
ngx_queue_t q; /* stored by caller */
uint32_t id;
ngx_msec_t timeout;
ngx_wasm_socket_tcp_t sock;
Expand Down
48 changes: 42 additions & 6 deletions t/03-proxy_wasm/hfuncs/133-proxy_dispatch_http_edge_cases.t
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,43 @@ qr/\A.*? on_request_headers.*



=== TEST 4: proxy_wasm - dispatch_http_call() on dispatch response (1 subsequent call)
=== TEST 4: proxy_wasm - dispatch_http_call() 2 parallel calls producing a response
--- valgrind
--- load_nginx_modules: ngx_http_echo_module
--- wasm_modules: hostcalls
--- config
location /dispatched {
echo "Hello world";
}

location /t {
proxy_wasm hostcalls 'on=request_headers \
test=/t/dispatch_http_call \
host=127.0.0.1:$TEST_NGINX_SERVER_PORT \
path=/dispatched \
ncalls=2 \
on_http_call_response=echo_response_body';
echo ok;
}
--- response_headers_like
pwm-call-id: 0
--- response_body
Hello world
--- grep_error_log eval: qr/\[info\] .*? on_(http_call_response|request|response|log).*/
--- grep_error_log_out eval
qr/\A.*? on_request_headers.*
.*? on_http_call_response \(id: 0, status: 200, headers: 5, body_bytes: 12, trailers: 0, op: echo_response_body\).*
.*? on_response_headers.*
.*? on_response_body.*
.*? on_log/
--- error_log eval
qr/\[notice\] .*? local response produced, cancelling pending dispatch calls/
--- no_error_log
[error]



=== TEST 5: proxy_wasm - dispatch_http_call() on dispatch response (1 subsequent call)
--- load_nginx_modules: ngx_http_echo_module
--- wasm_modules: hostcalls
--- config
Expand All @@ -144,7 +180,7 @@ called 2 times



=== TEST 5: proxy_wasm - dispatch_http_call() on dispatch response (2 subsequent calls)
=== TEST 6: proxy_wasm - dispatch_http_call() on dispatch response (2 subsequent calls)
--- valgrind
--- load_nginx_modules: ngx_http_echo_module
--- wasm_modules: hostcalls
Expand Down Expand Up @@ -181,7 +217,7 @@ called 3 times



=== TEST 6: proxy_wasm - dispatch_http_call() on_tick (1 call)
=== TEST 7: proxy_wasm - dispatch_http_call() on_tick (1 call)
--- load_nginx_modules: ngx_http_echo_module
--- wasm_modules: hostcalls
--- config
Expand All @@ -208,7 +244,7 @@ qr/on_root_http_call_response \(id: 0, status: 200, headers: 5, body_bytes: 12,



=== TEST 7: proxy_wasm - dispatch_http_call() on_tick (10 calls)
=== TEST 8: proxy_wasm - dispatch_http_call() on_tick (10 calls)
--- valgrind
--- load_nginx_modules: ngx_http_echo_module
--- wasm_modules: hostcalls
Expand Down Expand Up @@ -247,7 +283,7 @@ on_root_http_call_response \(id: 9, status: 200, headers: 5, body_bytes: 12, tra



=== TEST 8: proxy_wasm - dispatch_http_call() on_request_headers, filter chain execution sanity
=== TEST 9: proxy_wasm - dispatch_http_call() on_request_headers, filter chain execution sanity
should execute all filters request and response steps
--- load_nginx_modules: ngx_http_echo_module
--- wasm_modules: hostcalls
Expand Down Expand Up @@ -292,7 +328,7 @@ qr/\A.*? on_request_headers.*



=== TEST 9: proxy_wasm - dispatch_http_call() on_request_body, filter chain execution sanity
=== TEST 10: proxy_wasm - dispatch_http_call() on_request_body, filter chain execution sanity
should execute all filters request and response steps
--- load_nginx_modules: ngx_http_echo_module
--- wasm_modules: hostcalls
Expand Down

0 comments on commit 45a0c12

Please sign in to comment.