From df17c4506601afeb27739061eb2a5c3dfbf79ff2 Mon Sep 17 00:00:00 2001 From: Thibault Charbonnier Date: Thu, 10 Aug 2023 13:45:12 -0700 Subject: [PATCH] feat(proxy-wasm) implement response body buffering First, this commit fixes `on_response_body` execution of requests when they issue subrequests that produce a body (see 004-on_http_phases.t). This unlocks testing of response body buffering via subrequests producing chunked responses (`ngx_chain_t` buffers). Response body buffering itself is implemented as part of ngx_http_wasm_filter_module, so as to be usable from other components than just proxy-wasm. Response body buffering is enabled via `rctx->resp_buffering` which is enabled when `ngx_wasm_ops_resume` returns `NGX_AGAIN`, which for filters translates to returning `PAUSE` from `on_http_response_body`. If and when the response body buffers are full, then the next `on_http_response_body` call will have `eof=false`, and more invocations can be expected, as the body *must* be proxied in chunks. If the response body fit in the buffers, then the next `on_http_response_body` call will have `eof=true` as one would expected. We catch "response body already requested" in ngx_proxy_wasm to produce a proxy-wasm error message, as this feature is currently only exposed through it. --- src/common/proxy_wasm/ngx_proxy_wasm.c | 10 +- src/http/ngx_http_wasm.h | 10 +- src/http/ngx_http_wasm_filter_module.c | 236 +++++++- src/http/ngx_http_wasm_module.c | 12 + src/http/proxy_wasm/ngx_http_proxy_wasm.c | 12 + src/wasm/ngx_wasm.h | 3 + src/wasm/ngx_wasm_ops.c | 1 + src/wasm/ngx_wasm_util.c | 14 +- t/03-proxy_wasm/004-on_http_phases.t | 49 +- t/03-proxy_wasm/006-on_http_next_action.t | 29 +- .../008-on_http_response_body_buffering.t | 519 ++++++++++++++++++ ...-proxy_wasm_oob.t => 009-proxy_wasm_oob.t} | 0 .../proxy-wasm-tests/on-phases/src/filter.rs | 20 +- 13 files changed, 864 insertions(+), 51 deletions(-) create mode 100644 t/03-proxy_wasm/008-on_http_response_body_buffering.t rename t/03-proxy_wasm/{008-proxy_wasm_oob.t => 009-proxy_wasm_oob.t} (100%) diff --git a/src/common/proxy_wasm/ngx_proxy_wasm.c b/src/common/proxy_wasm/ngx_proxy_wasm.c index c0ee1417d..3c04bba8c 100644 --- a/src/common/proxy_wasm/ngx_proxy_wasm.c +++ b/src/common/proxy_wasm/ngx_proxy_wasm.c @@ -530,6 +530,13 @@ ngx_proxy_wasm_action2rc(ngx_proxy_wasm_ctx_t *pwctx, case NGX_PROXY_WASM_ACTION_PAUSE: switch (pwctx->phase->index) { #ifdef NGX_WASM_HTTP + case NGX_HTTP_WASM_BODY_FILTER_PHASE: + ngx_log_debug3(NGX_LOG_DEBUG_WASM, pwctx->log, 0, + "proxy_wasm buffering response after " + "\"ResponseBody\" step " + "(filter: %l/%l, pwctx: %p)", + pwexec->index + 1, pwctx->nfilters, pwctx); + goto yield; case NGX_HTTP_REWRITE_PHASE: case NGX_HTTP_ACCESS_PHASE: case NGX_HTTP_CONTENT_PHASE: @@ -795,7 +802,8 @@ ngx_proxy_wasm_resume(ngx_proxy_wasm_ctx_t *pwctx, rc = ngx_proxy_wasm_action2rc(pwctx, pwexec); if (rc != NGX_OK) { if (rc == NGX_AGAIN - && pwctx->exec_index + 1 <= pwctx->nfilters) + && pwctx->exec_index + 1 <= pwctx->nfilters + && step != NGX_PROXY_WASM_STEP_RESP_BODY) { dd("yield: resume on next filter " "(idx: %ld -> %ld, nelts: %ld)", diff --git a/src/http/ngx_http_wasm.h b/src/http/ngx_http_wasm.h index 94043832b..92f033f92 100644 --- a/src/http/ngx_http_wasm.h +++ b/src/http/ngx_http_wasm.h @@ -47,10 +47,11 @@ struct ngx_http_wasm_req_ctx_s { ngx_http_handler_pt r_content_handler; ngx_array_t resp_shim_headers; + ngx_int_t resp_bufs_count; /* response buffers count */ + ngx_chain_t *resp_bufs; /* response buffers */ + ngx_chain_t *resp_buf_last; /* last response buffers */ ngx_chain_t *resp_chunk; off_t resp_chunk_len; - unsigned resp_chunk_eof; /* seen last buf flag */ - off_t req_content_length_n; off_t resp_content_length_n; @@ -73,6 +74,8 @@ struct ngx_http_wasm_req_ctx_s { unsigned in_wev:1; /* in wev_handler */ unsigned resp_content_chosen:1; /* content handler has an output to produce */ + unsigned resp_chunk_eof; /* seen last buf flag */ + unsigned resp_buffering; /* enable response buffering */ unsigned resp_content_sent:1; /* has started sending output (may have yielded) */ unsigned resp_finalized:1; /* finalized connection (ourselves) */ unsigned fake_request:1; @@ -91,8 +94,9 @@ typedef struct { ngx_msec_t recv_timeout; size_t socket_buffer_size; /* wasm_socket_buffer_size */ - ngx_bufs_t socket_large_buffers; /* wasm_socket_large_buffer_size */ ngx_flag_t socket_buffer_reuse; /* wasm_socket_buffer_reuse */ + ngx_bufs_t socket_large_buffers; /* wasm_socket_large_buffer_size */ + ngx_bufs_t resp_body_buffers; /* wasm_response_body_buffers */ ngx_flag_t pwm_req_headers_in_access; ngx_flag_t pwm_lua_resolver; diff --git a/src/http/ngx_http_wasm_filter_module.c b/src/http/ngx_http_wasm_filter_module.c index ca3d537dd..f204e1031 100644 --- a/src/http/ngx_http_wasm_filter_module.c +++ b/src/http/ngx_http_wasm_filter_module.c @@ -44,6 +44,12 @@ static ngx_http_output_header_filter_pt ngx_http_next_header_filter; static ngx_http_output_body_filter_pt ngx_http_next_body_filter; +static void ngx_http_wasm_body_filter_resume(ngx_http_wasm_req_ctx_t *rctx, + ngx_chain_t *in); +static ngx_int_t ngx_http_wasm_body_filter_buffer( + ngx_http_wasm_req_ctx_t *rctx, ngx_chain_t *in); + + static ngx_int_t ngx_http_wasm_filter_init(ngx_conf_t *cf) { @@ -125,7 +131,7 @@ static ngx_int_t ngx_http_wasm_body_filter_handler(ngx_http_request_t *r, ngx_chain_t *in) { ngx_int_t rc; - ngx_http_wasm_req_ctx_t *rctx; + ngx_http_wasm_req_ctx_t *rctx = NULL, *mrctx = NULL; dd("enter"); @@ -134,22 +140,59 @@ ngx_http_wasm_body_filter_handler(ngx_http_request_t *r, ngx_chain_t *in) goto done; } - if (rc != NGX_DECLINED && !rctx->entered_header_filter) { - rc = NGX_DECLINED; - } + if (rc == NGX_DECLINED || !rctx->entered_header_filter) { + if (r != r->main) { + /* get main rctx */ + rc = ngx_http_wasm_rctx(r->main, &mrctx); + if (rc == NGX_ERROR) { + goto done; + } + } - if (rc == NGX_DECLINED) { - rc = ngx_http_next_body_filter(r, in); - goto done; - } + if (rc == NGX_DECLINED && rctx == NULL) { + /* r == r->main or r->main has no rctx */ + rc = ngx_http_next_body_filter(r, in); + goto done; + } - ngx_wasm_assert(rc == NGX_OK); + if (r != r->main && rctx == NULL) { + /* subrequest with no rctx; merge main rctx for buffering */ + ngx_wasm_assert(mrctx); + rctx = mrctx; + } + } - rctx->resp_chunk = in; - rctx->resp_chunk_len = ngx_wasm_chain_len(in, &rctx->resp_chunk_eof); + ngx_http_wasm_body_filter_resume(rctx, in); - (void) ngx_wasm_ops_resume(&rctx->opctx, - NGX_HTTP_WASM_BODY_FILTER_PHASE); + if (rctx->resp_buffering) { + rc = ngx_http_wasm_body_filter_buffer(rctx, in); + switch (rc) { + case NGX_ERROR: + goto done; + case NGX_OK: + /* chunk in buffers */ + ngx_wasm_assert(rctx->resp_bufs); + + if (!rctx->resp_chunk_eof) { + /* more to come; go again */ + rc = NGX_AGAIN; + goto done; + } + + dd("eof, resume"); + rctx->resp_buffering = 0; + ngx_http_wasm_body_filter_resume(rctx, rctx->resp_bufs); + break; + case NGX_DONE: + dd("buffers full, resume"); + rctx->resp_buffering = 0; + ngx_http_wasm_body_filter_resume(rctx, rctx->resp_bufs); + break; + default: + ngx_wasm_assert(0); + break; + } + } rc = ngx_http_next_body_filter(r, rctx->resp_chunk); dd("ngx_http_next_body_filter rc: %ld", rc); @@ -159,9 +202,12 @@ ngx_http_wasm_body_filter_handler(ngx_http_request_t *r, ngx_chain_t *in) rctx->resp_chunk = NULL; - ngx_chain_update_chains(r->connection->pool, + ngx_wasm_chain_log_debug(r->connection->log, rctx->resp_chunk, + "rctx->resp_chunk"); + + ngx_chain_update_chains(rctx->pool, &rctx->free_bufs, &rctx->busy_bufs, - &rctx->resp_chunk, buf_tag); + &rctx->resp_chunk, rctx->env.buf_tag); #ifdef NGX_WASM_RESPONSE_TRAILERS if (rctx->resp_chunk_eof && r->parent == NULL) { @@ -179,3 +225,163 @@ ngx_http_wasm_body_filter_handler(ngx_http_request_t *r, ngx_chain_t *in) return rc; } + + +static void +ngx_http_wasm_body_filter_resume(ngx_http_wasm_req_ctx_t *rctx, ngx_chain_t *in) +{ + ngx_int_t rc; + ngx_chain_t *cl; + + rctx->resp_chunk = in; + rctx->resp_chunk_len = 0; + + for (cl = rctx->resp_chunk; cl; cl = cl->next) { + rctx->resp_chunk_len += ngx_buf_size(cl->buf); + + if (cl->buf->last_buf) { + rctx->resp_chunk_eof = 1; + break; + + } else if (cl->buf->last_in_chain) { + if (rctx->r != rctx->r->main) { + rctx->resp_chunk_eof = 1; + } + + break; + } + } + + if (!rctx->resp_buffering) { + rc = ngx_wasm_ops_resume(&rctx->opctx, + NGX_HTTP_WASM_BODY_FILTER_PHASE); + if (rc == NGX_ERROR || rc > NGX_OK) { + return; + } + + if (rc == NGX_AGAIN) { + ngx_wasm_assert(rctx->resp_bufs == NULL); + rctx->resp_buffering = 1; + } + } +} + + +static ngx_int_t +ngx_http_wasm_body_filter_buffer(ngx_http_wasm_req_ctx_t *rctx, ngx_chain_t *in) +{ + off_t n, avail, copy; + ngx_buf_t *b; + ngx_chain_t *cl, *ll, *rl; + ngx_http_request_t *r = rctx->r; + ngx_http_wasm_loc_conf_t *loc; + + dd("enter"); + + ngx_wasm_assert(rctx->resp_buffering); + + cl = rctx->resp_buf_last; + loc = ngx_http_get_module_loc_conf(r, ngx_http_wasm_module); + + for (ll = in; ll; ll = ll->next) { + + n = ngx_buf_size(ll->buf); + dd("n: %ld", n); + + if (n == 0) { + if (ll->buf->last_in_chain || ll->buf->last_buf) { + if (rctx->resp_bufs == NULL) { + rctx->resp_bufs = ll; + + } else { + rctx->resp_buf_last->next = ll; + rctx->resp_buf_last = ll; + } + } + } + + while (n) { + if (cl == NULL) { + if (rctx->resp_bufs_count >= loc->resp_body_buffers.num) { + if (rctx->resp_buf_last) { + for (rl = ll; rl; rl = rl->next) { + if (ngx_buf_size(rl->buf)) { + rctx->resp_buf_last->next = rl; + rctx->resp_buf_last = rl; + } + } + } + + ngx_wasm_chain_log_debug(r->connection->log, + rctx->resp_bufs, + "response buffers: "); + + return NGX_DONE; + } + + b = ngx_create_temp_buf(rctx->pool, + loc->resp_body_buffers.size); + if (b == NULL) { + return NGX_ERROR; + } + + cl = ngx_alloc_chain_link(rctx->pool); + if (cl == NULL) { + return NGX_ERROR; + } + + cl->buf = b; + cl->buf->pos = cl->buf->start; + cl->buf->last = cl->buf->pos; + cl->buf->tag = rctx->env.buf_tag; + cl->buf->memory = 1; + cl->next = NULL; + + if (rctx->resp_bufs == NULL) { + rctx->resp_bufs = cl; + + } else if (rctx->resp_buf_last) { + rctx->resp_buf_last->next = cl; + } + + rctx->resp_buf_last = cl; + rctx->resp_bufs_count++; + } + + avail = cl->buf->end - cl->buf->last; + copy = ngx_min(n, avail); + dd("avail: %ld, copy: %ld", avail, copy); + if (copy == 0) { + cl = NULL; + continue; + } + + ngx_memcpy(cl->buf->last, ll->buf->pos, copy); + + ll->buf->pos += copy; + cl->buf->last += copy; + + cl->buf->flush = ll->buf->flush; + cl->buf->last_buf = ll->buf->last_buf; + cl->buf->last_in_chain = ll->buf->last_in_chain; + + dd("f: %d, l: %d, lic: %d", ll->buf->flush, + ll->buf->last_buf, ll->buf->last_in_chain); + + if (copy < n) { + /* more to copy, next buffer */ + ngx_wasm_assert(cl->buf->last == cl->buf->end); + rctx->resp_buf_last = cl; + cl = NULL; + } + + n -= copy; + dd("next n: %ld", n); + } + } + + ngx_wasm_chain_log_debug(r->connection->log, rctx->resp_bufs, + "response buffers: "); + + return NGX_OK; +} diff --git a/src/http/ngx_http_wasm_module.c b/src/http/ngx_http_wasm_module.c index cb4f6c29d..2bc52572e 100644 --- a/src/http/ngx_http_wasm_module.c +++ b/src/http/ngx_http_wasm_module.c @@ -163,6 +163,13 @@ static ngx_command_t ngx_http_wasm_module_cmds[] = { offsetof(ngx_http_wasm_loc_conf_t, socket_large_buffers), NULL }, + { ngx_string("wasm_response_body_buffers"), + NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_TAKE2, + ngx_conf_set_bufs_slot, + NGX_HTTP_LOC_CONF_OFFSET, + offsetof(ngx_http_wasm_loc_conf_t, resp_body_buffers), + NULL }, + { ngx_string("proxy_wasm"), NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_TAKE12, ngx_http_wasm_proxy_wasm_directive, @@ -357,6 +364,11 @@ ngx_http_wasm_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child) conf->socket_buffer_reuse = 1; #endif + ngx_conf_merge_bufs_value(conf->resp_body_buffers, + prev->resp_body_buffers, + NGX_WASM_DEFAULT_RESP_BODY_BUF_NUM, + NGX_WASM_DEFAULT_RESP_BODY_BUF_SIZE); + ngx_conf_merge_value(conf->pwm_req_headers_in_access, prev->pwm_req_headers_in_access, 0); diff --git a/src/http/proxy_wasm/ngx_http_proxy_wasm.c b/src/http/proxy_wasm/ngx_http_proxy_wasm.c index c38d56622..f4cc9a5b6 100644 --- a/src/http/proxy_wasm/ngx_http_proxy_wasm.c +++ b/src/http/proxy_wasm/ngx_http_proxy_wasm.c @@ -241,6 +241,18 @@ ngx_http_proxy_wasm_on_response_body(ngx_proxy_wasm_exec_t *pwexec, *out = rets->data[0].of.i32; + if (*out == NGX_PROXY_WASM_ACTION_PAUSE && rctx->resp_bufs) { + /** + * If 'on_response_body' has been called again, buffering is over. + * The buffers could be full, or eof has been reached. + */ + ngx_proxy_wasm_log_error(NGX_LOG_ERR, pwexec->log, 0, + "invalid \"on_response_body\" return " + "action (PAUSE): response body buffering " + "already requested"); + *out = NGX_PROXY_WASM_ACTION_CONTINUE; + } + return rc; } diff --git a/src/wasm/ngx_wasm.h b/src/wasm/ngx_wasm.h index e857d8e9c..1dbad995d 100644 --- a/src/wasm/ngx_wasm.h +++ b/src/wasm/ngx_wasm.h @@ -42,6 +42,8 @@ #define NGX_WASM_DEFAULT_SOCK_BUF_SIZE 1024 #define NGX_WASM_DEFAULT_SOCK_LARGE_BUF_NUM 4 #define NGX_WASM_DEFAULT_SOCK_LARGE_BUF_SIZE 8192 +#define NGX_WASM_DEFAULT_RESP_BODY_BUF_NUM 4 +#define NGX_WASM_DEFAULT_RESP_BODY_BUF_SIZE 4096 #define ngx_wasm_core_cycle_get_conf(cycle) \ (ngx_get_conf(cycle->conf_ctx, ngx_wasm_module)) \ @@ -109,6 +111,7 @@ typedef struct { ngx_wavm_t *ngx_wasm_main_vm(ngx_cycle_t *cycle); +void ngx_wasm_chain_log_debug(ngx_log_t *log, ngx_chain_t *in, char *label); size_t ngx_wasm_chain_len(ngx_chain_t *in, unsigned *eof); ngx_uint_t ngx_wasm_chain_clear(ngx_chain_t *in, size_t offset, unsigned *eof, unsigned *flush); diff --git a/src/wasm/ngx_wasm_ops.c b/src/wasm/ngx_wasm_ops.c index e7d59153f..777281ea4 100644 --- a/src/wasm/ngx_wasm_ops.c +++ b/src/wasm/ngx_wasm_ops.c @@ -411,6 +411,7 @@ ngx_wasm_op_proxy_wasm_handler(ngx_wasm_op_ctx_t *opctx, } pwctx->phase = phase; + pwctx->action = NGX_PROXY_WASM_ACTION_CONTINUE; switch (phase->index) { diff --git a/src/wasm/ngx_wasm_util.c b/src/wasm/ngx_wasm_util.c index e0800165e..28d270bd0 100644 --- a/src/wasm/ngx_wasm_util.c +++ b/src/wasm/ngx_wasm_util.c @@ -6,9 +6,8 @@ #include -#if 0 -static void -ngx_wasm_chain_log_debug(ngx_log_t *log, ngx_chain_t *in, char *fmt) +void +ngx_wasm_chain_log_debug(ngx_log_t *log, ngx_chain_t *in, char *label) { #if (NGX_DEBUG) size_t len; @@ -25,17 +24,16 @@ ngx_wasm_chain_log_debug(ngx_log_t *log, ngx_chain_t *in, char *fmt) s.len = len; s.data = buf->pos; - ngx_log_debug7(NGX_LOG_DEBUG_WASM, log, 0, + ngx_log_debug8(NGX_LOG_DEBUG_WASM, log, 0, "%s: \"%V\" (buf: %p, len: %d, last_buf: %d," - " last_in_chain: %d, flush: %d)", - fmt, &s, buf, len, buf->last_buf, - buf->last_in_chain, buf->flush); + " last_in_chain: %d, flush: %d, cl: %p)", + label, &s, buf, len, buf->last_buf, + buf->last_in_chain, buf->flush, cl); cl = cl->next; } #endif } -#endif size_t diff --git a/t/03-proxy_wasm/004-on_http_phases.t b/t/03-proxy_wasm/004-on_http_phases.t index 186f76652..651a9becf 100644 --- a/t/03-proxy_wasm/004-on_http_phases.t +++ b/t/03-proxy_wasm/004-on_http_phases.t @@ -599,7 +599,42 @@ qr#filter 1/2 resuming "on_log" step in "log" phase -=== TEST 24: proxy_wasm - same module in multiple location{} blocks +=== TEST 24: proxy_wasm - as a parent of several subrequests +--- load_nginx_modules: ngx_http_echo_module +--- wasm_modules: on_phases +--- config + location /a { + internal; + echo a; + } + + location /b { + internal; + echo bb; + } + + location /t { + echo_subrequest GET '/a'; + echo_subrequest GET '/b'; + proxy_wasm on_phases; + } +--- response_body +a +bb +--- grep_error_log eval: qr/#\d+ on_(request|response|log).*/ +--- grep_error_log_out eval +qr/#\d+ on_request_headers, 2 headers.* +#\d+ on_response_headers, 5 headers.* +#\d+ on_response_body, 2 bytes, eof: false.* +#\d+ on_response_body, 3 bytes, eof: false.* +#\d+ on_response_body, 0 bytes, eof: true/ +--- no_error_log +[error] +[crit] + + + +=== TEST 25: proxy_wasm - same module in multiple location{} blocks --- load_nginx_modules: ngx_http_echo_module --- wasm_modules: on_phases --- config @@ -637,7 +672,7 @@ on_log -=== TEST 25: proxy_wasm - chained filters in same location{} block +=== TEST 26: proxy_wasm - chained filters in same location{} block should run each filter after the other within each phase --- skip_no_debug: 5 --- load_nginx_modules: ngx_http_echo_module @@ -673,7 +708,7 @@ qr/#\d+ on_request_headers, 3 headers.* -=== TEST 26: proxy_wasm - chained filters in server{} block +=== TEST 27: proxy_wasm - chained filters in server{} block should run each filter after the other within each phase --- load_nginx_modules: ngx_http_echo_module --- wasm_modules: on_phases @@ -704,7 +739,7 @@ qr/#\d+ on_request_headers, 2 headers.* -=== TEST 27: proxy_wasm - chained filters in http{} block +=== TEST 28: proxy_wasm - chained filters in http{} block should run each filter after the other within each phase --- load_nginx_modules: ngx_http_echo_module --- wasm_modules: on_phases @@ -735,7 +770,7 @@ qr/#\d+ on_request_headers, 2 headers.* -=== TEST 28: proxy_wasm - mixed filters in server{} and http{} blocks +=== TEST 29: proxy_wasm - mixed filters in server{} and http{} blocks should run root context of both filters should not chain in request; instead, server{} overrides http{} --- load_nginx_modules: ngx_http_echo_module @@ -765,7 +800,7 @@ qr/log_msg: server .*? request: "GET \/t\s+/ -=== TEST 29: proxy_wasm - mixed filters in server{} and location{} blocks (return in rewrite) +=== TEST 30: proxy_wasm - mixed filters in server{} and location{} blocks (return in rewrite) should not chain; instead, location{} overrides server{} --- wasm_modules: on_phases --- config @@ -790,7 +825,7 @@ qr/log_msg: location .*? request: "GET \/t\s+/ -=== TEST 30: proxy_wasm - mixed filters in http{}, server{}, and location{} blocks +=== TEST 31: proxy_wasm - mixed filters in http{}, server{}, and location{} blocks should not chain; instead, location{} overrides server{}, server{} overrides http{} --- wasm_modules: on_phases --- http_config diff --git a/t/03-proxy_wasm/006-on_http_next_action.t b/t/03-proxy_wasm/006-on_http_next_action.t index cf3ae51d1..b6f00d709 100644 --- a/t/03-proxy_wasm/006-on_http_next_action.t +++ b/t/03-proxy_wasm/006-on_http_next_action.t @@ -75,7 +75,8 @@ NYI === TEST 4: proxy_wasm - on_response_body -> Pause -NYI +Triggers response buffering. +--- skip_no_debug: 5 --- wasm_modules: on_phases --- config location /t { @@ -83,12 +84,11 @@ NYI return 200; } --- response_body ---- error_log eval -[ - qr/pausing after "ResponseBody"/, - qr#\[error\] .*? filter 1/1 cannot pause in "body_filter" phase#, - qr#\[warn\] .*? filter 1/1 failed resuming "on_log" step in "log" phase \(not yieldable\)# -] +--- error_log +buffering response after "ResponseBody" step +--- no_error_log +[error] +[crit] @@ -196,8 +196,7 @@ NYI === TEST 8: proxy_wasm - subrequest on_response_body -> Pause NYI ---- timeout_expected: 1 ---- abort +--- skip_no_debug: 5 --- load_nginx_modules: ngx_http_echo_module --- wasm_modules: on_phases --- config @@ -217,13 +216,11 @@ NYI echo_subrequest GET /pause; echo_subrequest GET /nop; } ---- error_code: 200 --- response_body ok ok ---- error_log eval -[ - qr/pausing after "ResponseBody"/, - qr#\[error\] .*? filter 1/1 cannot pause in "body_filter" phase#, - qr#\[warn\] .*? filter 1/1 failed resuming "on_response_body" step in "body_filter" phase \(not yieldable\)# -] +--- error_log +buffering response after "ResponseBody" step +--- no_error_log +[error] +[crit] diff --git a/t/03-proxy_wasm/008-on_http_response_body_buffering.t b/t/03-proxy_wasm/008-on_http_response_body_buffering.t new file mode 100644 index 000000000..46e467a59 --- /dev/null +++ b/t/03-proxy_wasm/008-on_http_response_body_buffering.t @@ -0,0 +1,519 @@ +# vim:set ft= ts=4 sts=4 sw=4 et fdm=marker: + +BEGIN { + $ENV{TEST_NGINX_EVENT_TYPE} = 'poll'; +} + +use strict; +use lib '.'; +use t::TestWasm; + +skip_valgrind("wasmtime"); + +plan tests => repeat_each() * (blocks() * 5); + +add_block_preprocessor(sub { + my $block = shift; + + if (!defined $block->no_error_log) { + $block->set_value("no_error_log", "[error]\n[crit]"); + } + + if (!defined $block->load_nginx_modules) { + $block->set_value("load_nginx_modules", + "ngx_http_echo_module " . + "ngx_http_headers_more_filter_module"); + } + + if (!defined $block->wasm_modules) { + $block->set_value("wasm_modules", "on_phases"); + } +}); + +run_tests(); + +__DATA__ + +=== TEST 1: proxy_wasm - on_response_body, no buffering +--- config + location /t { + echo -n 'a\n'; + echo_flush; + echo -n 'bb\n'; + echo_flush; + echo -n 'ccc\n'; + echo_flush; + echo -n 'dddd\n'; + proxy_wasm on_phases; + } +--- response_body +a +bb +ccc +dddd +--- grep_error_log eval: qr/on_response_body, .*/ +--- grep_error_log_out eval +qr/on_response_body, 2 bytes, eof: false.* +on_response_body, 3 bytes, eof: false.* +on_response_body, 4 bytes, eof: false.* +on_response_body, 5 bytes, eof: false.* +on_response_body, 0 bytes, eof: true.*/ + + + +=== TEST 2: proxy_wasm - on_response_body buffering chunks, default buffers +--- config + location /t { + wasm_response_body_buffers 4 32; + echo -n 'a\n'; + echo_flush; + echo -n 'bb\n'; + echo_flush; + echo -n 'ccc\n'; + echo_flush; + echo -n 'dddd\n'; + proxy_wasm on_phases 'pause_on=response_body'; + } +--- response_body +a +bb +ccc +dddd +--- grep_error_log eval: qr/on_response_body, .*/ +--- grep_error_log_out eval +qr/^on_response_body, 2 bytes, eof: false.* +on_response_body, 14 bytes, eof: true.*/ + + + +=== TEST 3: proxy_wasm - on_response_body buffering chunks, buffers too small for body +--- config + location /t { + wasm_response_body_buffers 3 2; + echo -n 'a\n'; + echo_flush; + echo -n 'bb\n'; + echo_flush; + echo -n 'ccc\n'; + echo_flush; + echo -n 'dddd\n'; + proxy_wasm on_phases 'pause_on=response_body'; + } +--- response_body +a +bb +ccc +dddd +--- grep_error_log eval: qr/on_response_body, .*/ +--- grep_error_log_out eval +qr/^on_response_body, 2 bytes, eof: false.* +on_response_body, 9 bytes, eof: false.* +on_response_body, 5 bytes, eof: false.* +on_response_body, 0 bytes, eof: true.*/ + + + +=== TEST 4: proxy_wasm - on_response_body buffering chunks, buffers larger than body +--- config + location /t { + wasm_response_body_buffers 4 32; + echo -n 'a\n'; + echo_flush; + echo -n 'bb\n'; + echo_flush; + echo -n 'ccc\n'; + echo_flush; + echo -n 'dddd\n'; + proxy_wasm on_phases 'pause_on=response_body'; + } +--- response_body +a +bb +ccc +dddd +--- grep_error_log eval: qr/on_response_body, .*/ +--- grep_error_log_out eval +qr/^on_response_body, 2 bytes, eof: false.* +on_response_body, 14 bytes, eof: true.*/ + + + +=== TEST 5: proxy_wasm - on_response_body buffering chunks, buffers same size as body +--- config + location /t { + wasm_response_body_buffers 1 14; + echo -n 'a\n'; + echo_flush; + echo -n 'bb\n'; + echo_flush; + echo -n 'ccc\n'; + echo_flush; + echo -n 'dddd\n'; + proxy_wasm on_phases 'pause_on=response_body'; + } +--- response_body +a +bb +ccc +dddd +--- grep_error_log eval: qr/on_response_body, .*/ +--- grep_error_log_out eval +qr/^on_response_body, 2 bytes, eof: false.* +on_response_body, 14 bytes, eof: true.*/ + + + +=== TEST 6: proxy_wasm - on_response_body buffering chunks, buffer of size 1 for larger body +--- config + location /t { + wasm_response_body_buffers 1 1; + echo -n 'a\n'; + echo_flush; + echo -n 'bb\n'; + echo_flush; + echo -n 'ccc\n'; + echo_flush; + echo -n 'dddd\n'; + proxy_wasm on_phases 'pause_on=response_body'; + } +--- response_body +a +bb +ccc +dddd +--- grep_error_log eval: qr/on_response_body, .*/ +--- grep_error_log_out eval +qr/^on_response_body, 2 bytes, eof: false.* +on_response_body, 2 bytes, eof: false.* +on_response_body, 3 bytes, eof: false.* +on_response_body, 4 bytes, eof: false.* +on_response_body, 5 bytes, eof: false.* +on_response_body, 0 bytes, eof: true.*/ + + + +=== TEST 7: proxy_wasm - on_response_body buffering chunks, buffers same size as body +--- config + location /t { + wasm_response_body_buffers 1 14; + # unlike 'echo -n', this format creates 2 chained buffers for each 'echo', + # the 2nd one being the newline buffer. + echo 'a'; + echo_flush; + echo 'bb'; + echo_flush; + echo 'ccc'; + echo_flush; + echo 'dddd'; + proxy_wasm on_phases 'pause_on=response_body'; + } +--- response_body +a +bb +ccc +dddd +--- grep_error_log eval: qr/on_response_body, .*/ +--- grep_error_log_out eval +qr/^on_response_body, 2 bytes, eof: false.* +on_response_body, 14 bytes, eof: true.*/ + + + +=== TEST 8: proxy_wasm - on_response_body buffering chunks, buffer of size 1 for larger body +--- config + location /t { + wasm_response_body_buffers 1 1; + # unlike 'echo -n', this format creates 2 chained buffers for each 'echo', + # the 2nd one being the newline buffer. + echo 'a'; + echo_flush; + echo 'bb'; + echo_flush; + echo 'ccc'; + echo_flush; + echo 'dddd'; + proxy_wasm on_phases 'pause_on=response_body'; + } +--- response_body +a +bb +ccc +dddd +--- grep_error_log eval: qr/on_response_body, .*/ +--- grep_error_log_out eval +qr/^on_response_body, 2 bytes, eof: false.* +on_response_body, 2 bytes, eof: false.* +on_response_body, 3 bytes, eof: false.* +on_response_body, 4 bytes, eof: false.* +on_response_body, 5 bytes, eof: false.* +on_response_body, 0 bytes, eof: true.*/ + + + +=== TEST 9: proxy_wasm - on_response_body buffering chunks, buffers smaller than subrequests bodies +--- config + location /a { + internal; + echo a; + } + + location /b { + internal; + echo bb; + } + + location /c { + internal; + echo ccc; + } + + location /d { + internal; + echo dddd; + } + + location /t { + wasm_response_body_buffers 1 1; + echo_subrequest GET '/a'; + echo_subrequest GET '/b'; + echo_subrequest GET '/c'; + echo_subrequest GET '/d'; + proxy_wasm on_phases 'pause_on=response_body'; + } +--- response_body +a +bb +ccc +dddd +--- grep_error_log eval: qr/on_response_body, .*/ +--- grep_error_log_out eval +qr/^on_response_body, 2 bytes, eof: false.* +on_response_body, 2 bytes, eof: false.* +on_response_body, 3 bytes, eof: false.* +on_response_body, 4 bytes, eof: false.* +on_response_body, 5 bytes, eof: false.* +on_response_body, 0 bytes, eof: true.*/ + + + +=== TEST 10: proxy_wasm - on_response_body buffering with proxy_pass (buffers too small for body) +Clear Server header or else different build modes produce different +bufferring results (no-pool/openresty) +--- http_config eval +qq{ + upstream test_upstream { + server unix:$ENV{TEST_NGINX_UNIX_SOCKET}; + } + + server { + listen unix:$ENV{TEST_NGINX_UNIX_SOCKET}; + + location / { + more_clear_headers 'Server'; + echo_duplicate 128 'a'; + echo_flush; + echo_duplicate 128 'b'; + } + } +} +--- config + location /t { + wasm_response_body_buffers 1 64; + + proxy_buffer_size 256; + proxy_buffers 3 128; + proxy_busy_buffers_size 256; + + proxy_pass http://test_upstream/; + proxy_wasm on_phases 'pause_on=response_body'; + } +--- response_body_like: a{128}b{128} +--- grep_error_log eval: qr/on_response_body, .*/ +--- grep_error_log_out eval +qr/^on_response_body, 155 bytes, eof: false.* +on_response_body, 155 bytes, eof: false.* +on_response_body, 101 bytes, eof: false.* +on_response_body, 0 bytes, eof: true/ + + + +=== TEST 11: proxy_wasm - on_response_body buffering with proxy_pass (buffers larger than body) +Clear Server header or else different build modes produce different +bufferring results (no-pool/openresty) +--- http_config eval +qq{ + upstream test_upstream { + server unix:$ENV{TEST_NGINX_UNIX_SOCKET}; + } + + server { + listen unix:$ENV{TEST_NGINX_UNIX_SOCKET}; + + location / { + more_clear_headers 'Server'; + echo_duplicate 128 "a"; + echo_flush; + echo_duplicate 128 "b"; + echo_flush; + echo_duplicate 128 "c"; + echo_flush; + echo_duplicate 128 "d"; + } + } +} +--- config + location /t { + wasm_response_body_buffers 6 256; + + proxy_buffer_size 256; + proxy_buffers 3 128; + proxy_busy_buffers_size 256; + + proxy_pass http://test_upstream/; + proxy_wasm on_phases 'pause_on=response_body'; + } +--- response_body_like: a{128}b{128}c{128}d{128} +--- grep_error_log eval: qr/on_response_body, .*/ +--- grep_error_log_out eval +qr/^on_response_body, 155 bytes, eof: false.* +on_response_body, 512 bytes, eof: true.*/ + + + +=== TEST 12: proxy_wasm - on_response_body buffering with proxy_pass (buffers same size as body) +Clear Server header or else different build modes produce different +bufferring results (no-pool/openresty) +--- http_config eval +qq{ + upstream test_upstream { + server unix:$ENV{TEST_NGINX_UNIX_SOCKET}; + } + + server { + listen unix:$ENV{TEST_NGINX_UNIX_SOCKET}; + + location / { + more_clear_headers 'Server'; + echo_duplicate 128 "a"; + echo_flush; + echo_duplicate 128 "b"; + echo_flush; + echo_duplicate 128 "c"; + echo_flush; + echo_duplicate 128 "d"; + } + } +} +--- config + location /t { + wasm_response_body_buffers 4 128; + + proxy_buffer_size 256; + proxy_buffers 3 128; + proxy_busy_buffers_size 256; + + proxy_pass http://test_upstream/; + proxy_wasm on_phases 'pause_on=response_body'; + } +--- response_body_like: a{128}b{128}c{128}d{128} +--- grep_error_log eval: qr/on_response_body, .*/ +--- grep_error_log_out eval +qr/^on_response_body, 155 bytes, eof: false.* +on_response_body, 512 bytes, eof: true.*/ + + + +=== TEST 13: proxy_wasm - on_response_body buffering, get_http_response_body() when buffers larger than body +--- config + location /t { + wasm_response_body_buffers 4 32; + echo -n 'a\n'; + echo_flush; + echo -n 'bb\n'; + echo_flush; + echo -n 'ccc\n'; + echo_flush; + echo -n 'dddd\n'; + proxy_wasm on_phases 'pause_on=response_body'; + proxy_wasm on_phases 'log_response_body=true'; + } +--- response_body +a +bb +ccc +dddd +--- grep_error_log eval: qr/on_response_body, .*/ +--- grep_error_log_out eval +qr/^on_response_body, 2 bytes, eof: false.* +on_response_body, 14 bytes, eof: true.*/ +--- error_log +response body chunk: "a\nbb\nccc\ndddd\n" +--- no_error_log +[error] + + + +=== TEST 14: proxy_wasm - on_response_body buffering, get_http_response_body() when buffers too small for body +--- config + location /t { + wasm_response_body_buffers 3 2; + echo -n 'a\n'; + echo_flush; + echo -n 'bb\n'; + echo_flush; + echo -n 'ccc\n'; + echo_flush; + echo -n 'dddd\n'; + proxy_wasm on_phases 'pause_on=response_body'; + proxy_wasm on_phases 'log_response_body=true'; + } +--- response_body +a +bb +ccc +dddd +--- grep_error_log eval: qr/on_response_body, .*/ +--- grep_error_log_out eval +qr/^on_response_body, 2 bytes, eof: false.* +on_response_body, 9 bytes, eof: false.* +on_response_body, 9 bytes, eof: false.* +on_response_body, 5 bytes, eof: false.* +on_response_body, 5 bytes, eof: false.* +on_response_body, 0 bytes, eof: true.* +on_response_body, 0 bytes, eof: true.*/ +--- error_log +response body chunk: "a\nbb\nccc\n" +response body chunk: "dddd\n" +--- no_error_log + + + +=== TEST 15: proxy_wasm - on_response_body buffering, then ask for buffering again +--- config + location /t { + wasm_response_body_buffers 3 32; + echo -n 'a\n'; + echo_flush; + echo -n 'bb\n'; + echo_flush; + echo -n 'ccc\n'; + echo_flush; + echo -n 'dddd\n'; + proxy_wasm on_phases 'pause_on=response_body pause_times=2'; + proxy_wasm on_phases 'log_response_body=true'; + } +--- response_body +a +bb +ccc +dddd +--- grep_error_log eval: qr/on_response_body, .*/ +--- grep_error_log_out eval +qr/^on_response_body, 2 bytes, eof: false.* +on_response_body, 14 bytes, eof: true.*/ +--- error_log eval +[ + "response body chunk: \"a\\nbb\\nccc\\ndddd\\n\"", + qr/\[error\] .*? invalid "on_response_body" return action \(PAUSE\): response body buffering already requested/ +] +--- no_error_log diff --git a/t/03-proxy_wasm/008-proxy_wasm_oob.t b/t/03-proxy_wasm/009-proxy_wasm_oob.t similarity index 100% rename from t/03-proxy_wasm/008-proxy_wasm_oob.t rename to t/03-proxy_wasm/009-proxy_wasm_oob.t diff --git a/t/lib/proxy-wasm-tests/on-phases/src/filter.rs b/t/lib/proxy-wasm-tests/on-phases/src/filter.rs index 447cc1367..815003f6a 100644 --- a/t/lib/proxy-wasm-tests/on-phases/src/filter.rs +++ b/t/lib/proxy-wasm-tests/on-phases/src/filter.rs @@ -82,6 +82,11 @@ impl RootContext for HttpHeadersRoot { .config .get("pause_on") .map(|s| s.parse().expect("bad pause_on")), + pause_times: self + .config + .get("pause_times") + .map_or(1, |v| v.parse().expect("bad pause_on value")), + pause_n: 0, })) } } @@ -90,21 +95,34 @@ struct OnPhases { context_id: u32, config: HashMap, pause_on: Option, + pause_times: usize, + pause_n: usize, } impl OnPhases { fn next_action(&mut self, phase: Phase) -> Action { if let Some(pause_on) = &self.pause_on { - if pause_on == &phase { + if pause_on == &phase && self.pause_n < self.pause_times { info!( "#{} pausing after \"{:?}\" phase", self.context_id, pause_on ); + self.pause_n += 1; + return Action::Pause; } } + if self.config.get("log_response_body").is_some() { + if let Some(bytes) = self.get_http_response_body(0, usize::MAX) { + match String::from_utf8(bytes) { + Ok(s) => info!("response body chunk: {:?}", s), + Err(e) => panic!("Invalid UTF-8 sequence: {}", e), + } + } + } + Action::Continue } }