Skip to content

Commit

Permalink
feat(proxy-wasm) implement response body buffering
Browse files Browse the repository at this point in the history
First, this commit fixes `on_response_body` execution of requests when
they issue subrequests that produce a body (see 004-on_http_phases.t).
This unlocks testing of response body buffering via subrequests
producing chunked responses (`ngx_chain_t` buffers).

Response body buffering itself is implemented as part of
ngx_http_wasm_filter_module, so as to be usable from other components
than just proxy-wasm. Response body buffering is enabled via
`rctx->resp_buffering` which is enabled when `ngx_wasm_ops_resume`
returns `NGX_AGAIN`, which for filters translates to returning `PAUSE`
from `on_http_response_body`.

If and when the response body buffers are full, then the next
`on_http_response_body` call will have `eof=false`, and more invocations
can be expected, as the body *must* be proxied in chunks.

If the response body fit in the buffers, then the next
`on_http_response_body` call will have `eof=true` as one would expected.

We catch "response body already requested" in ngx_proxy_wasm to produce
a proxy-wasm error message, as this feature is currently only exposed
through it.
  • Loading branch information
thibaultcha committed Aug 25, 2023
1 parent 1bb6034 commit 77b8661
Show file tree
Hide file tree
Showing 13 changed files with 834 additions and 51 deletions.
10 changes: 9 additions & 1 deletion src/common/proxy_wasm/ngx_proxy_wasm.c
Original file line number Diff line number Diff line change
Expand Up @@ -530,6 +530,13 @@ ngx_proxy_wasm_action2rc(ngx_proxy_wasm_ctx_t *pwctx,
case NGX_PROXY_WASM_ACTION_PAUSE:
switch (pwctx->phase->index) {
#ifdef NGX_WASM_HTTP
case NGX_HTTP_WASM_BODY_FILTER_PHASE:
ngx_log_debug3(NGX_LOG_DEBUG_WASM, pwctx->log, 0,
"proxy_wasm buffering response after "
"\"ResponseBody\" step "
"(filter: %l/%l, pwctx: %p)",
pwexec->index + 1, pwctx->nfilters, pwctx);
goto yield;
case NGX_HTTP_REWRITE_PHASE:
case NGX_HTTP_ACCESS_PHASE:
case NGX_HTTP_CONTENT_PHASE:
Expand Down Expand Up @@ -795,7 +802,8 @@ ngx_proxy_wasm_resume(ngx_proxy_wasm_ctx_t *pwctx,
rc = ngx_proxy_wasm_action2rc(pwctx, pwexec);
if (rc != NGX_OK) {
if (rc == NGX_AGAIN
&& pwctx->exec_index + 1 <= pwctx->nfilters)
&& pwctx->exec_index + 1 <= pwctx->nfilters
&& step != NGX_PROXY_WASM_STEP_RESP_BODY)
{
dd("yield: resume on next filter "
"(idx: %ld -> %ld, nelts: %ld)",
Expand Down
10 changes: 7 additions & 3 deletions src/http/ngx_http_wasm.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,11 @@ struct ngx_http_wasm_req_ctx_s {

ngx_http_handler_pt r_content_handler;
ngx_array_t resp_shim_headers;
ngx_int_t resp_bufs_count; /* response buffers count */
ngx_chain_t *resp_bufs; /* response buffers */
ngx_chain_t *resp_buf_last; /* last response buffers */
ngx_chain_t *resp_chunk;
off_t resp_chunk_len;
unsigned resp_chunk_eof; /* seen last buf flag */

off_t req_content_length_n;
off_t resp_content_length_n;

Expand All @@ -73,6 +74,8 @@ struct ngx_http_wasm_req_ctx_s {

unsigned in_wev:1; /* in wev_handler */
unsigned resp_content_chosen:1; /* content handler has an output to produce */
unsigned resp_chunk_eof; /* seen last buf flag */
unsigned resp_buffering; /* enable response buffering */
unsigned resp_content_sent:1; /* has started sending output (may have yielded) */
unsigned resp_finalized:1; /* finalized connection (ourselves) */
unsigned fake_request:1;
Expand All @@ -91,8 +94,9 @@ typedef struct {
ngx_msec_t recv_timeout;

size_t socket_buffer_size; /* wasm_socket_buffer_size */
ngx_bufs_t socket_large_buffers; /* wasm_socket_large_buffer_size */
ngx_flag_t socket_buffer_reuse; /* wasm_socket_buffer_reuse */
ngx_bufs_t socket_large_buffers; /* wasm_socket_large_buffer_size */
ngx_bufs_t resp_body_buffers; /* wasm_response_body_buffers */

ngx_flag_t pwm_req_headers_in_access;
ngx_flag_t pwm_lua_resolver;
Expand Down
240 changes: 225 additions & 15 deletions src/http/ngx_http_wasm_filter_module.c
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,12 @@ static ngx_http_output_header_filter_pt ngx_http_next_header_filter;
static ngx_http_output_body_filter_pt ngx_http_next_body_filter;


static void ngx_http_wasm_body_filter_resume(ngx_http_wasm_req_ctx_t *rctx,
ngx_chain_t *in);
static ngx_int_t ngx_http_wasm_body_filter_buffer(
ngx_http_wasm_req_ctx_t *rctx, ngx_chain_t *in);


static ngx_int_t
ngx_http_wasm_filter_init(ngx_conf_t *cf)
{
Expand Down Expand Up @@ -125,7 +131,7 @@ static ngx_int_t
ngx_http_wasm_body_filter_handler(ngx_http_request_t *r, ngx_chain_t *in)
{
ngx_int_t rc;
ngx_http_wasm_req_ctx_t *rctx;
ngx_http_wasm_req_ctx_t *rctx = NULL, *mrctx = NULL;

dd("enter");

Expand All @@ -134,22 +140,59 @@ ngx_http_wasm_body_filter_handler(ngx_http_request_t *r, ngx_chain_t *in)
goto done;
}

if (rc != NGX_DECLINED && !rctx->entered_header_filter) {
rc = NGX_DECLINED;
}
if (rc == NGX_DECLINED || !rctx->entered_header_filter) {
if (r != r->main) {
/* get main rctx */
rc = ngx_http_wasm_rctx(r->main, &mrctx);
if (rc == NGX_ERROR) {
goto done;
}
}

if (rc == NGX_DECLINED) {
rc = ngx_http_next_body_filter(r, in);
goto done;
}
if (rc == NGX_DECLINED && rctx == NULL) {
/* r == r->main or r->main has no rctx */
rc = ngx_http_next_body_filter(r, in);
goto done;
}

ngx_wasm_assert(rc == NGX_OK);
if (r != r->main && rctx == NULL) {
/* subrequest with no rctx; merge main rctx for buffering */
ngx_wasm_assert(mrctx);
rctx = mrctx;
}
}

rctx->resp_chunk = in;
rctx->resp_chunk_len = ngx_wasm_chain_len(in, &rctx->resp_chunk_eof);
ngx_http_wasm_body_filter_resume(rctx, in);

(void) ngx_wasm_ops_resume(&rctx->opctx,
NGX_HTTP_WASM_BODY_FILTER_PHASE);
if (rctx->resp_buffering) {
rc = ngx_http_wasm_body_filter_buffer(rctx, in);
switch (rc) {
case NGX_ERROR:
goto done;
case NGX_OK:
/* chunk in buffers */
ngx_wasm_assert(rctx->resp_bufs);

if (!rctx->resp_chunk_eof) {
/* more to come; go again */
rc = NGX_AGAIN;
goto done;
}

dd("eof, resume");
rctx->resp_buffering = 0;
ngx_http_wasm_body_filter_resume(rctx, rctx->resp_bufs);
break;
case NGX_DONE:
dd("buffers full, resume");
rctx->resp_buffering = 0;
ngx_http_wasm_body_filter_resume(rctx, rctx->resp_bufs);
break;
default:
ngx_wasm_assert(0);
break;
}
}

rc = ngx_http_next_body_filter(r, rctx->resp_chunk);
dd("ngx_http_next_body_filter rc: %ld", rc);
Expand All @@ -159,9 +202,12 @@ ngx_http_wasm_body_filter_handler(ngx_http_request_t *r, ngx_chain_t *in)

rctx->resp_chunk = NULL;

ngx_chain_update_chains(r->connection->pool,
ngx_wasm_chain_log_debug(r->connection->log, rctx->resp_chunk,
"rctx->resp_chunk");

ngx_chain_update_chains(rctx->pool,
&rctx->free_bufs, &rctx->busy_bufs,
&rctx->resp_chunk, buf_tag);
&rctx->resp_chunk, rctx->env.buf_tag);

#ifdef NGX_WASM_RESPONSE_TRAILERS
if (rctx->resp_chunk_eof && r->parent == NULL) {
Expand All @@ -179,3 +225,167 @@ ngx_http_wasm_body_filter_handler(ngx_http_request_t *r, ngx_chain_t *in)

return rc;
}


static void
ngx_http_wasm_body_filter_resume(ngx_http_wasm_req_ctx_t *rctx, ngx_chain_t *in)
{
ngx_int_t rc;
ngx_chain_t *cl;

rctx->resp_chunk = in;
rctx->resp_chunk_len = 0;

for (cl = rctx->resp_chunk; cl; cl = cl->next) {
rctx->resp_chunk_len += ngx_buf_size(cl->buf);

if (cl->buf->last_buf) {
rctx->resp_chunk_eof = 1;
break;

} else if (cl->buf->last_in_chain) {
if (rctx->r != rctx->r->main) {
rctx->resp_chunk_eof = 1;
}

break;
}
}

if (!rctx->resp_buffering) {
rc = ngx_wasm_ops_resume(&rctx->opctx,
NGX_HTTP_WASM_BODY_FILTER_PHASE);
if (rc == NGX_ERROR || rc > NGX_OK) {
return;
}

if (rc == NGX_AGAIN) {
ngx_wasm_assert(rctx->resp_bufs == NULL);
rctx->resp_buffering = 1;
}
}
}


static ngx_int_t
ngx_http_wasm_body_filter_buffer(ngx_http_wasm_req_ctx_t *rctx, ngx_chain_t *in)
{
off_t n, avail, copy;
ngx_buf_t *b;
ngx_chain_t *cl, *ll, *rl;
ngx_http_request_t *r = rctx->r;
ngx_http_wasm_loc_conf_t *loc;

dd("enter");

ngx_wasm_assert(rctx->resp_buffering);

cl = rctx->resp_buf_last;
loc = ngx_http_get_module_loc_conf(r, ngx_http_wasm_module);

for (ll = in; ll; ll = ll->next) {

n = ngx_buf_size(ll->buf);
dd("n: %ld", n);

if (n == 0) {
if (ll->buf->last_in_chain || ll->buf->last_buf) {
if (rctx->resp_bufs == NULL) {
rctx->resp_bufs = ll;

} else {
rctx->resp_buf_last->next = ll;
rctx->resp_buf_last = ll;
}
}
}

while (n) {
if (cl == NULL) {
if (rctx->resp_bufs_count >= loc->resp_body_buffers.num) {
if (rctx->resp_buf_last) {
for (rl = ll; rl; rl = rl->next) {
if (ngx_buf_size(rl->buf)) {
rctx->resp_buf_last->next = rl;
rctx->resp_buf_last = rl;
}
}
}

#if (DDEBUG)
ngx_wasm_chain_log_debug(r->connection->log,
rctx->resp_bufs,
"response buffers: ");
#endif

return NGX_DONE;
}

b = ngx_create_temp_buf(rctx->pool,
loc->resp_body_buffers.size);
if (b == NULL) {
return NGX_ERROR;
}

cl = ngx_alloc_chain_link(rctx->pool);
if (cl == NULL) {
return NGX_ERROR;
}

cl->buf = b;
cl->buf->pos = cl->buf->start;
cl->buf->last = cl->buf->pos;
cl->buf->tag = rctx->env.buf_tag;
cl->buf->memory = 1;
cl->next = NULL;

if (rctx->resp_bufs == NULL) {
rctx->resp_bufs = cl;

} else if (rctx->resp_buf_last) {
rctx->resp_buf_last->next = cl;
}

rctx->resp_buf_last = cl;
rctx->resp_bufs_count++;
}

avail = cl->buf->end - cl->buf->last;
copy = ngx_min(n, avail);
dd("avail: %ld, copy: %ld", avail, copy);
if (copy == 0) {
cl = NULL;
continue;
}

ngx_memcpy(cl->buf->last, ll->buf->pos, copy);

ll->buf->pos += copy;
cl->buf->last += copy;

cl->buf->flush = ll->buf->flush;
cl->buf->last_buf = ll->buf->last_buf;
cl->buf->last_in_chain = ll->buf->last_in_chain;

dd("f: %d, l: %d, lic: %d", ll->buf->flush,
ll->buf->last_buf, ll->buf->last_in_chain);

if (copy < n) {
/* more to copy, next buffer */
ngx_wasm_assert(cl->buf->last == cl->buf->end);
rctx->resp_buf_last = cl;
cl = NULL;
}

n -= copy;
dd("next n: %ld", n);
}
}

#if (DDEBUG)
ngx_wasm_chain_log_debug(r->connection->log, rctx->resp_bufs,
"response buffers: ");
#endif

return NGX_OK;
}
11 changes: 11 additions & 0 deletions src/http/ngx_http_wasm_module.c
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,13 @@ static ngx_command_t ngx_http_wasm_module_cmds[] = {
offsetof(ngx_http_wasm_loc_conf_t, socket_large_buffers),
NULL },

{ ngx_string("wasm_response_body_buffers"),
NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_TAKE2,
ngx_conf_set_bufs_slot,
NGX_HTTP_LOC_CONF_OFFSET,
offsetof(ngx_http_wasm_loc_conf_t, resp_body_buffers),
NULL },

{ ngx_string("proxy_wasm"),
NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_TAKE12,
ngx_http_wasm_proxy_wasm_directive,
Expand Down Expand Up @@ -349,6 +356,10 @@ ngx_http_wasm_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child)
prev->socket_large_buffers,
NGX_WASM_DEFAULT_SOCK_LARGE_BUF_NUM,
NGX_WASM_DEFAULT_SOCK_LARGE_BUF_SIZE);
ngx_conf_merge_bufs_value(conf->resp_body_buffers,
prev->resp_body_buffers,
NGX_WASM_DEFAULT_RESP_BODY_BUF_NUM,
NGX_WASM_DEFAULT_RESP_BODY_BUF_SIZE);
#if (NGX_DEBUG)
ngx_conf_merge_value(conf->socket_buffer_reuse,
prev->socket_buffer_reuse, 1);
Expand Down
12 changes: 12 additions & 0 deletions src/http/proxy_wasm/ngx_http_proxy_wasm.c
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,18 @@ ngx_http_proxy_wasm_on_response_body(ngx_proxy_wasm_exec_t *pwexec,

*out = rets->data[0].of.i32;

if (*out == NGX_PROXY_WASM_ACTION_PAUSE && rctx->resp_bufs) {
/**
* If 'on_response_body' has been called again, buffering is over.
* The buffers could be full, or eof has been reached.
*/
ngx_proxy_wasm_log_error(NGX_LOG_ERR, pwexec->log, 0,
"invalid \"on_response_body\" return "
"value (PAUSE): response body buffering "
"already requested");
*out = NGX_PROXY_WASM_ACTION_CONTINUE;
}

return rc;
}

Expand Down
3 changes: 3 additions & 0 deletions src/wasm/ngx_wasm.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@
#define NGX_WASM_DEFAULT_SOCK_BUF_SIZE 1024
#define NGX_WASM_DEFAULT_SOCK_LARGE_BUF_NUM 4
#define NGX_WASM_DEFAULT_SOCK_LARGE_BUF_SIZE 8192
#define NGX_WASM_DEFAULT_RESP_BODY_BUF_NUM 4
#define NGX_WASM_DEFAULT_RESP_BODY_BUF_SIZE 4096

#define ngx_wasm_core_cycle_get_conf(cycle) \
(ngx_get_conf(cycle->conf_ctx, ngx_wasm_module)) \
Expand Down Expand Up @@ -109,6 +111,7 @@ typedef struct {


ngx_wavm_t *ngx_wasm_main_vm(ngx_cycle_t *cycle);
void ngx_wasm_chain_log_debug(ngx_log_t *log, ngx_chain_t *in, char *label);
size_t ngx_wasm_chain_len(ngx_chain_t *in, unsigned *eof);
ngx_uint_t ngx_wasm_chain_clear(ngx_chain_t *in, size_t offset, unsigned *eof,
unsigned *flush);
Expand Down
Loading

0 comments on commit 77b8661

Please sign in to comment.