From 8fd1fcb15d8890408bdc1ac15df2c35eb7bc662c Mon Sep 17 00:00:00 2001 From: Thibault Charbonnier Date: Tue, 26 Mar 2024 11:34:29 -0300 Subject: [PATCH] refactor(lua-bridge) rewrite for full yielding support Major refactor of the Lua bridge to support multiple concurrent yielding Lua threads. The old implementation would break down when scheduling more than one yielding Lua thread at a time. The new implementation "tricks" OpenResty by scheduling uthreads via C and passing these threads to the OpenResty runloop as if they were created from Lua (via `ngx.thread`). Because all uthreads must resume their "parent thread" when finished (as per OpenResty's implementation), we schedule a stub "entry thread" whenever we are trying to use the Lua bridge. This entry thread itself does nothing and is collected at request pool cleanup. List of significant changes for this refactor: - **Breaking:** the `proxy_wasm.start()` FFI function is **removed**. Only `proxy_wasm.attach()` is now necessary, and the filter chain is only resumed once the ngx_http_wasm_module `rewrite` or `access` phases are entered. Prior, `proxy_wasm.start()` would resume the filter chain during the ngx_http_lua_module phase handlers, which was incompatible with Lua threads yielding. - In ngx_wasm_socket_tcp, the `sock->env` member is now a pointer to the request's `env` instead of a copy so as to manipulate the `env->state` control variable. - The `wasm_call` directive can now yield, which allows for sanity testing of the Lua bridge yielding functionality. - A new `rctx->resume_handler` pointer holds the resume entry point back from yielding facilities into `ngx_http_core_run_phases`. For now, only the Lua bridge uses it, but other yielding facilities should be refactored to use it so as to factorize our resuming code. Fix #524 --- .../actions/setup-httpbin-server/action.yml | 3 +- .github/workflows/job-unit-tests.yml | 2 +- lib/resty/wasmx/proxy_wasm.lua | 35 +- src/common/lua/ngx_wasm_lua.c | 738 +++++++++++------- src/common/lua/ngx_wasm_lua.h | 13 +- src/common/lua/ngx_wasm_lua_ffi.c | 57 +- src/common/lua/ngx_wasm_lua_ffi.h | 1 - src/common/lua/ngx_wasm_lua_resolver.c | 28 +- src/common/ngx_wasm_socket_tcp.c | 54 +- src/common/ngx_wasm_socket_tcp.h | 2 +- src/common/ngx_wasm_socket_tcp_readers.c | 4 +- src/common/ngx_wasm_subsystem.h | 7 + src/common/proxy_wasm/ngx_proxy_wasm.c | 21 +- src/http/ngx_http_wasm.h | 5 +- src/http/ngx_http_wasm_filter_module.c | 9 + src/http/ngx_http_wasm_module.c | 103 ++- .../proxy_wasm/ngx_http_proxy_wasm_dispatch.c | 12 +- src/wasm/ngx_wasm.h | 5 +- src/wasm/ngx_wasm_core_host.c | 90 ++- src/wasm/ngx_wasm_ops.c | 50 +- src/wasm/ngx_wasm_ops.h | 4 + .../133-proxy_dispatch_http_edge_cases.t | 6 +- t/04-openresty/ffi/101-proxy_wasm_load.t | 14 +- t/04-openresty/ffi/102-proxy_wasm_start.t | 235 ------ t/04-openresty/ffi/103-proxy_wasm_attach.t | 26 +- .../ffi/200-proxy_wasm_and_lua_sanity.t | 10 - .../ffi/300-proxy_wasm_properties_get_ngx.t | 20 +- .../ffi/301-proxy_wasm_properties_get_host.t | 26 +- .../ffi/302-proxy_wasm_properties_set_ngx.t | 12 +- .../ffi/303-proxy_wasm_properties_set_host.t | 16 +- .../305-proxy_wasm_host_properties_getter.t | 51 +- .../306-proxy_wasm_host_properties_setter.t | 42 +- t/04-openresty/lua-bridge/001-sanity.t | 59 ++ .../002-proxy_wasm_lua_resolver_sanity.t | 431 ++++++++-- .../003-proxy_wasm_lua_resolver_timeouts.t | 4 +- t/lib/ngx-lua-tests/src/lua_bridge.rs | 12 + .../proxy-wasm-tests/hostcalls/src/filter.rs | 4 +- t/lib/proxy-wasm-tests/hostcalls/src/lib.rs | 1 - .../hostcalls/src/types/test_http.rs | 14 +- 39 files changed, 1367 insertions(+), 859 deletions(-) delete mode 100644 t/04-openresty/ffi/102-proxy_wasm_start.t diff --git a/.github/actions/setup-httpbin-server/action.yml b/.github/actions/setup-httpbin-server/action.yml index f2210ce39..18ff213d1 100644 --- a/.github/actions/setup-httpbin-server/action.yml +++ b/.github/actions/setup-httpbin-server/action.yml @@ -76,7 +76,8 @@ runs: --no-resolv \ --port=${{ inputs.dns_port }} \ --server=${{ inputs.upstream_dns_server }} \ - --address=/httpbin.org/127.0.0.1 + --address=/httpbin.org/127.0.0.1 \ + --address=/example.com/127.0.0.1 - name: Start httpbin proxy + server shell: bash run: | diff --git a/.github/workflows/job-unit-tests.yml b/.github/workflows/job-unit-tests.yml index ed97be703..7f8f6a1ce 100644 --- a/.github/workflows/job-unit-tests.yml +++ b/.github/workflows/job-unit-tests.yml @@ -124,7 +124,7 @@ jobs: sudo bash -c 'echo "${{ github.workspace }}/coredumps/%e.%p.%t" > /proc/sys/kernel/core_pattern' - run: make setup - run: make - - name: Run make test + - name: Run tests run: | ulimit -c unlimited make test diff --git a/lib/resty/wasmx/proxy_wasm.lua b/lib/resty/wasmx/proxy_wasm.lua index 719e20c8e..523a449a3 100644 --- a/lib/resty/wasmx/proxy_wasm.lua +++ b/lib/resty/wasmx/proxy_wasm.lua @@ -53,7 +53,6 @@ if subsystem == "http" then int ngx_http_wasm_ffi_plan_attach(ngx_http_request_t *r, ngx_wasm_plan_t *plan, unsigned int isolation); - int ngx_http_wasm_ffi_start(ngx_http_request_t *r); int ngx_http_wasm_ffi_set_property(ngx_http_request_t *r, ngx_str_t *key, ngx_str_t *value, @@ -160,6 +159,16 @@ function _M.load(c_plan) return nil, "failed loading plan" end + if get_request() then + -- ffi_gc: hold a reference tied to the request lifecycle so users + -- don't have to (like our test suite). + if not ngx.ctx[_M] then + ngx.ctx[_M] = {} + end + + ngx.ctx[_M][c_plan] = true + end + return true end @@ -215,30 +224,6 @@ function _M.attach(c_plan, opts) end -function _M.start() - local phase = ngx.get_phase() - if phase ~= "rewrite" and phase ~= "access" then - error("start must be called from 'rewrite' or 'access' phase", 2) - end - - local r = get_request() - if not r then - error("no request found", 2) - end - - local rc = C.ngx_http_wasm_ffi_start(r) - if rc == FFI_ERROR then - return nil, "unknown error" - end - - if rc == FFI_DECLINED then - return nil, "plan not loaded and attached" - end - - return true -end - - function _M.set_property(key, value) if type(key) ~= "string" then error("key must be a string", 2) diff --git a/src/common/lua/ngx_wasm_lua.c b/src/common/lua/ngx_wasm_lua.c index cbe60356d..09bd77631 100644 --- a/src/common/lua/ngx_wasm_lua.c +++ b/src/common/lua/ngx_wasm_lua.c @@ -6,45 +6,140 @@ #include #if (NGX_WASM_HTTP) #include +#include #endif +#if (NGX_WASM_STREAM) +#include +#endif + + +static ngx_int_t ngx_http_wasm_lua_resume_handler(ngx_http_request_t *r); + + +static const char *WASM_LUA_ENTRY_SCRIPT_NAME = "wasm_lua_entry_chunk"; +static const char *WASM_LUA_ENTRY_SCRIPT = "" + "while true do\n" +#if (DDEBUG) + " ngx.log(ngx.DEBUG, 'entry lua thread waking up')\n" +#endif +#if (NGX_DEBUG) + " ngx.sleep(0.1)\n" /* greater than 0, must not be a delayed event */ +#else + " ngx.sleep(30)\n" +#endif + "end\n"; + + +static void +destroy_thread(ngx_wasm_lua_ctx_t *lctx) +{ + ngx_log_debug2(NGX_LOG_DEBUG_WASM, ngx_cycle->log, 0, + "wasm freeing lua%sthread (lctx: %p)", + lctx->entry ? " entry " : " user ", lctx); + + ngx_pfree(lctx->pool, lctx->cache_key); + ngx_pfree(lctx->pool, lctx); +} + + +static ngx_inline unsigned +entry_thread_empty(ngx_wasm_subsys_env_t *env) +{ + ngx_wasm_lua_ctx_t *entry_lctx = env->entry_lctx; + + if (entry_lctx == NULL) { + return 1; + } + + return ngx_queue_empty(&entry_lctx->sub_ctxs); +} + + +static void +entry_thread_cleanup_handler(void *data) +{ + ngx_wasm_lua_ctx_t *lctx = data; + + dd("enter"); + + if (lctx->entry && lctx->ev && lctx->ev->timer_set) { + dd("delete pending timer event"); + ngx_event_del_timer(lctx->ev); + } + + destroy_thread(lctx); +} + + +static ngx_int_t +entry_thread_start(ngx_wasm_subsys_env_t *env) +{ + ngx_int_t rc; + ngx_wasm_lua_ctx_t *entry_lctx = env->entry_lctx; + ngx_http_wasm_req_ctx_t *rctx = env->ctx.rctx; + ngx_http_request_t *r = rctx->r; + ngx_http_lua_ctx_t *ctx; + + ctx = ngx_http_get_module_ctx(r, ngx_http_lua_module); + if (ctx == NULL) { + ctx = ngx_http_lua_create_ctx(r); + if (ctx == NULL) { + return NGX_ERROR; + } + } + + if (entry_lctx == NULL) { + /** + * In OpenResty, all uthreads *must* be attached to a parent coroutine, + * so we create a "fake" one simulating a *_by_lua_block context. + */ + dd("creating entry thread"); + + entry_lctx = ngx_wasm_lua_thread_new(WASM_LUA_ENTRY_SCRIPT_NAME, + WASM_LUA_ENTRY_SCRIPT, + env, env->connection->log, + NULL, NULL, NULL); + if (entry_lctx == NULL) { + return NGX_ERROR; + } + + env->entry_lctx = entry_lctx; + + rc = ngx_wasm_lua_thread_run(entry_lctx); + ngx_wa_assert(rc != NGX_OK && rc != NGX_DONE); + /* NGX_ERROR, NGX_AGAIN */ + if (rc == NGX_ERROR) { + return NGX_ERROR; + } + } + + return NGX_OK; +} static ngx_inline unsigned -ngx_wasm_lua_thread_is_dead(ngx_wasm_lua_ctx_t *lctx) +thread_is_dead(ngx_wasm_lua_ctx_t *lctx) { - ngx_wasm_subsys_env_t *env = &lctx->env; + ngx_wasm_subsys_env_t *env = lctx->env; switch (env->subsys->kind) { #if (NGX_WASM_HTTP) case NGX_WASM_SUBSYS_HTTP: - if (lctx->co_ctx->co_status == NGX_HTTP_LUA_CO_DEAD) { - return 1; - } - - break; + return !ngx_http_lua_coroutine_alive(lctx->co_ctx); #endif #if (NGX_WASM_STREAM) case NGX_WASM_SUBSYS_STREAM: - if (lctx->co_ctx->co_status == NGX_STREAM_LUA_CO_DEAD) { - return 1; - } - - break; + return !ngx_stream_lua_coroutine_alive(lctx->co_ctx); #endif default: - ngx_wasm_log_error(NGX_LOG_WASM_NYI, lctx->log, 0, - "NYI - subsystem kind: %d", - env->subsys->kind); - ngx_wa_assert(0); - break; + ngx_wasm_bad_subsystem(env); + return 0; } - - return 0; } static u_char * -ngx_wasm_lua_thread_cache_key(ngx_pool_t *pool, const char *tag, u_char *src, +thread_cache_key(ngx_pool_t *pool, const char *tag, u_char *src, size_t src_len) { size_t tag_len; @@ -59,6 +154,7 @@ ngx_wasm_lua_thread_cache_key(ngx_pool_t *pool, const char *tag, u_char *src, p = ngx_copy(out, tag, tag_len); + /* both subsystems produce an identical md5 hash, use any */ #if (NGX_WASM_HTTP) p = ngx_http_lua_digest_hex(p, src, src_len); #elif (NGX_WASM_STREAM) @@ -75,41 +171,229 @@ ngx_wasm_lua_thread_cache_key(ngx_pool_t *pool, const char *tag, u_char *src, } -static void -ngx_wasm_lua_thread_destroy(ngx_wasm_lua_ctx_t *lctx) +static ngx_int_t +thread_init(ngx_wasm_lua_ctx_t *lctx) { - ngx_wasm_subsys_env_t *env = &lctx->env; - - dd("enter"); - - ngx_wa_assert(env); + ngx_wasm_subsys_env_t *env = lctx->env; switch (env->subsys->kind) { #if (NGX_WASM_HTTP) case NGX_WASM_SUBSYS_HTTP: { ngx_http_wasm_req_ctx_t *rctx = env->ctx.rctx; - ngx_http_lua_ctx_t *ctx = lctx->ctx.rlctx; + ngx_http_request_t *r = rctx->r; + ngx_http_lua_ctx_t *ctx; + ngx_http_lua_co_ctx_t *coctx; + + ctx = ngx_http_get_module_ctx(r, ngx_http_lua_module); + /* existing or created by entry_thread_start */ + ngx_wa_assert(ctx); + /* preserve ngx_wasm_lua_content_wev_handler */ + ngx_wa_assert(!ctx->entered_content_phase); + + lctx->ctx.rlctx = ctx; + + /* coctx */ + + if (!ngx_http_lua_entry_thread_alive(ctx)) { + /* initializing the fake entry_ctx */ + ngx_wa_assert(lctx->entry); - rctx->wasm_lua_ctx = NULL; + ctx->context = NGX_HTTP_LUA_CONTEXT_TIMER; + coctx = &ctx->entry_co_ctx; - if (ctx) { - /* prevent ngx_http_lua_run_thread from running the - * 'done' label, which sends the last_buf chain link */ - ctx->entered_content_phase = 0; + } else { + coctx = ngx_http_lua_create_co_ctx(r, ctx); + if (coctx == NULL) { + return NGX_ERROR; + } } + coctx->co = lctx->co; + coctx->co_ref = lctx->co_ref; + coctx->co_status = NGX_HTTP_LUA_CO_RUNNING; + coctx->is_uthread = 1; +#if (NGX_LUA_USE_ASSERT) + coctx->co_top = 1; +#endif + + if (!lctx->entry) { + coctx->parent_co_ctx = &ctx->entry_co_ctx; + } + + lctx->co_ctx = coctx; + + ngx_http_lua_set_req(lctx->co, r); + ngx_http_lua_attach_co_ctx_to_L(lctx->co, lctx->co_ctx); break; } #endif +#if (NGX_WASM_STREAM) + case NGX_WASM_SUBSYS_STREAM: + break; +#endif + default: + ngx_wasm_bad_subsystem(env); + return NGX_ERROR; + } + + return NGX_OK; +} + + +static ngx_inline ngx_int_t +thread_handle_rc(ngx_wasm_lua_ctx_t *lctx, ngx_int_t rc) +{ + ngx_event_t *ev; + ngx_rbtree_node_t *node, *root, *sentinel; + ngx_wasm_subsys_env_t *env = lctx->env; + ngx_wasm_lua_ctx_t *entry_lctx = env->entry_lctx; + + dd("enter (rc: %ld, lctx: %p)", rc, lctx); + + if (!thread_is_dead(lctx)) { + if (rc == NGX_DONE) { + /** + * The ctx->resume_handler can return NGX_DONE when the thread + * remains in a yielding state, because some Nginx internals expect + * NGX_DONE while yielding; we confidently override it since the + * thread is not dead. + */ + rc = NGX_AGAIN; + } + + } else { + /* thread is dead, determine state by checking its return value placed + * on the stack by OpenResty's uthread implementation */ + ngx_wa_assert(lua_isboolean(lctx->co, 1)); + + lctx->co_ctx->co_status = NGX_HTTP_LUA_CO_DEAD; + + rc = lua_toboolean(lctx->co, 1) + ? NGX_OK /* thread terminated successfully */ + : NGX_ERROR; /* thread error */ + } + + dd("rc at switch: %ld", rc); + + switch (rc) { + case NGX_AGAIN: + dd("wasm lua thread yield"); + ngx_wa_assert(lctx->yielded); + + if (lctx->entry) { + /* find the pending sleep timer to cancel at pool cleanup */ + sentinel = ngx_event_timer_rbtree.sentinel; + root = ngx_event_timer_rbtree.root; + + if (root != sentinel) { + for (node = ngx_rbtree_min(root, sentinel); + node; + node = ngx_rbtree_next(&ngx_event_timer_rbtree, node)) + { + ev = ngx_rbtree_data(node, ngx_event_t, timer); + + if (ev->data == entry_lctx->co_ctx) { + entry_lctx->ev = ev; + break; + } + } + } + } + + ngx_wasm_yield(env); + break; + case NGX_OK: + dd("wasm lua thread finished"); + ngx_wa_assert(thread_is_dead(lctx)); + + if (!lctx->entry) { + lctx->finished = 1; + ngx_queue_remove(&lctx->q); + + if (entry_thread_empty(env)) { + /* last yielding thread finished */ + ngx_wasm_continue(env); + } + + if (lctx->success_handler) { + (void) lctx->success_handler(lctx); + } + } + + break; + case NGX_ERROR: + dd("wasm lua thread error"); + ngx_wasm_error(env); + + if (!lctx->entry) { + lctx->finished = 1; + ngx_queue_remove(&lctx->q); + + if (lctx->error_handler) { + (void) lctx->error_handler(lctx); + } + } + + break; default: + ngx_wasm_log_error(NGX_LOG_WASM_NYI, lctx->log, 0, + "unexpected lua resume handler rc: %d", rc); ngx_wa_assert(0); + rc = NGX_ERROR; break; } - ngx_pfree(lctx->pool, lctx->cache_key); + ngx_wa_assert(rc == NGX_OK + || rc == NGX_AGAIN + || rc == NGX_ERROR); - ngx_destroy_pool(lctx->pool); + if (lctx->finished) { + destroy_thread(lctx); + } + + return rc; +} + + +static ngx_int_t +thread_resume(ngx_wasm_lua_ctx_t *lctx) +{ + ngx_int_t rc = NGX_ERROR; + ngx_wasm_subsys_env_t *env = lctx->env; + + ngx_log_debug4(NGX_LOG_DEBUG_WASM, lctx->log, 0, + "wasm resuming lua%sthread " + "(lctx: %p, L: %p, co: %p)", + lctx->entry ? " entry " : " user ", + lctx, lctx->L, lctx->co); + + switch (env->subsys->kind) { +#if (NGX_WASM_HTTP) + case NGX_WASM_SUBSYS_HTTP: + { + ngx_http_request_t *r = env->ctx.rctx->r; + ngx_http_lua_ctx_t *ctx = lctx->ctx.rlctx; + + ngx_wa_assert(ctx == ngx_http_get_module_ctx(r, ngx_http_lua_module)); + + rc = ctx->resume_handler(r); + break; + } +#endif +#if (NGX_WASM_STREAM) + case NGX_WASM_SUBSYS_STREAM: + break; +#endif + default: + ngx_wasm_bad_subsystem(env); + return NGX_ERROR; + } + + dd("lua%sthread resume handler rc: %ld", + lctx->entry ? " entry " : " user ", rc); + + return thread_handle_rc(lctx, rc); } @@ -121,29 +405,41 @@ ngx_wasm_lua_thread_new(const char *tag, const char *src, { ngx_int_t rc; ngx_pool_t *pool; + ngx_pool_cleanup_t *cln = NULL; ngx_wasm_lua_ctx_t *lctx; - /* pool */ - - pool = ngx_create_pool(512, log); - if (pool == NULL) { - return NULL; - } + pool = env->ctx.rctx->r->pool; /* lctx */ lctx = ngx_pcalloc(pool, sizeof(ngx_wasm_lua_ctx_t)); if (lctx == NULL) { - goto error; + return NULL; } lctx->pool = pool; lctx->log = log; + lctx->env = env; lctx->data = data; lctx->success_handler = success_handler; lctx->error_handler = error_handler; - ngx_memcpy(&lctx->env, env, sizeof(ngx_wasm_subsys_env_t)); + if (src == WASM_LUA_ENTRY_SCRIPT) { + lctx->entry = 1; + ngx_queue_init(&lctx->sub_ctxs); + + cln = ngx_pool_cleanup_add(lctx->pool, 0); + if (cln == NULL) { + goto error; + } + + cln->handler = entry_thread_cleanup_handler; + cln->data = lctx; + } + + ngx_log_debug2(NGX_LOG_DEBUG_WASM, lctx->log, 0, + "wasm creating new lua%sthread (lctx: %p)", + lctx->entry ? " entry " : " user ", lctx); /* Lua VM + thread */ @@ -151,44 +447,32 @@ ngx_wasm_lua_thread_new(const char *tag, const char *src, #if (NGX_WASM_HTTP) case NGX_WASM_SUBSYS_HTTP: { - ngx_http_request_t *r = env->ctx.rctx->r; + ngx_http_request_t *r = env->ctx.rctx->r; lctx->L = ngx_http_lua_get_lua_vm(r, NULL); lctx->co = ngx_http_lua_new_thread(r, lctx->L, &lctx->co_ref); break; } #endif -#if (0 && NGX_WASM_STREAM) +#if (NGX_WASM_STREAM) case NGX_WASM_SUBSYS_STREAM: - { - /* TODO: get stream lua r */ - ngx_stream_lua_request_t *sr = NULL; - - lctx->L = ngx_stream_lua_get_lua_vm(sr, NULL); - lctx->co = ngx_stream_lua_new_thread(sr, lctx->L, lctx->co_ref); break; - } #endif default: - ngx_wasm_log_error(NGX_LOG_WASM_NYI, lctx->log, 0, - "NYI - subsystem kind: %d", - env->subsys->kind); - ngx_wa_assert(0); + ngx_wasm_bad_subsystem(env); goto error; } if (lctx->L == NULL || lctx->co == NULL) { goto error; } - /* code */ lctx->code = src; lctx->code_len = ngx_strlen(src); - lctx->cache_key = ngx_wasm_lua_thread_cache_key(lctx->pool, - tag, - (u_char *) lctx->code, - lctx->code_len); + lctx->cache_key = thread_cache_key(lctx->pool, tag, + (u_char *) lctx->code, + lctx->code_len); if (lctx->cache_key == NULL) { goto error; } @@ -218,7 +502,7 @@ ngx_wasm_lua_thread_new(const char *tag, const char *src, break; #endif default: - ngx_wa_assert(0); + ngx_wasm_bad_subsystem(env); goto error; } @@ -233,277 +517,201 @@ ngx_wasm_lua_thread_new(const char *tag, const char *src, error: - ngx_wasm_lua_thread_destroy(lctx); + if (cln == NULL) { + destroy_thread(lctx); + } return NULL; } -static ngx_int_t -ngx_wasm_lua_thread_init(ngx_wasm_lua_ctx_t *lctx) +/** + * Return values: + * NGX_OK: lua thread finished + * NGX_AGAIN: lua thread yielding + * NGX_ERROR: error + */ +ngx_int_t +ngx_wasm_lua_thread_run(ngx_wasm_lua_ctx_t *lctx) { - ngx_wasm_subsys_env_t *env = &lctx->env; + ngx_int_t rc = NGX_ERROR; + ngx_wasm_subsys_env_t *env = lctx->env; + ngx_wasm_lua_ctx_t *entry_lctx; + + ngx_log_debug4(NGX_LOG_DEBUG_WASM, lctx->log, 0, + "wasm running lua%sthread (lctx: %p, L: %p, co: %p)", + lctx->entry ? " entry " : " user ", + lctx, lctx->L, lctx->co); + + if (env->entry_lctx == NULL) { + rc = entry_thread_start(env); + if (rc != NGX_OK) { + return NGX_ERROR; + } + } + + entry_lctx = env->entry_lctx; + + if (thread_init(lctx) != NGX_OK) { + return NGX_ERROR; + } + + /* lua ctx */ switch (env->subsys->kind) { #if (NGX_WASM_HTTP) case NGX_WASM_SUBSYS_HTTP: { - ngx_http_wasm_req_ctx_t *rctx = env->ctx.rctx; - ngx_http_request_t *r = rctx->r; - ngx_http_lua_ctx_t *ctx; + ngx_http_request_t *r = env->ctx.rctx->r; + ngx_http_lua_ctx_t *ctx = lctx->ctx.rlctx; + ngx_http_wasm_req_ctx_t *rctx; - ctx = ngx_http_get_module_ctx(r, ngx_http_lua_module); - if (ctx == NULL) { - ctx = ngx_http_lua_create_ctx(r); - if (ctx == NULL) { - return NGX_ERROR; - } + ctx->cur_co_ctx = lctx->co_ctx; - } else { - ngx_http_lua_reset_ctx(r, lctx->L, ctx); + if (!lctx->entry) { + ctx->uthreads++; } - /* preserve ngx_wasm_lua_content_wev_handler */ - ngx_wa_assert(!ctx->entered_content_phase); + rc = ngx_http_lua_run_thread(lctx->L, r, ctx, lctx->nargs); + if (rc == NGX_AGAIN) { + rctx = ngx_http_get_module_ctx(r, ngx_http_wasm_module); + rctx->resume_handler = ngx_http_wasm_lua_resume_handler; + } - lctx->ctx.rlctx = ctx; break; } #endif -#if (0 && NGX_WASM_STREAM) +#if (NGX_WASM_STREAM) case NGX_WASM_SUBSYS_STREAM: - { - /* TODO: get stream lua r */ - ngx_stream_lua_request_t *r = NULL; - ngx_stream_lua_ctx_t *ctx; - - ctx = ngx_stream_get_module_ctx(r, ngx_stream_lua_module); - if (ctx == NULL) { - ctx = ngx_stream_lua_create_ctx(r); - if (ctx == NULL) { - return NGX_ERROR; - } - - } else { - ngx_stream_lua_reset_ctx(r, lctx->L, ctx); - } - - lctx->ctx.slctx = ctx; break; - } #endif default: - ngx_wa_assert(0); + ngx_wasm_bad_subsystem(env); return NGX_ERROR; } - ngx_wasm_set_resume_handler(env); - - return NGX_OK; -} + dd("lua_run_thread rc: %ld", rc); - -static ngx_inline ngx_int_t -ngx_wasm_lua_thread_handle_rc(ngx_wasm_lua_ctx_t *lctx, ngx_int_t rc) -{ - ngx_wasm_subsys_env_t *env = &lctx->env; - -#if (NGX_WASM_HTTP) - if (rc == NGX_HTTP_INTERNAL_SERVER_ERROR) { - rc = NGX_ERROR; + if (rc == NGX_AGAIN && !lua_isboolean(lctx->co, 1)) { + lctx->yielded = 1; } -#endif - - switch (rc) { - case NGX_DONE: - rc = NGX_AGAIN; - /* fallthrough */ - case NGX_AGAIN: - if (!ngx_wasm_lua_thread_is_dead(lctx)) { - lctx->yielded = 1; - ngx_wasm_set_resume_handler(env); - } - - break; - case NGX_OK: - ngx_wa_assert(ngx_wasm_lua_thread_is_dead(lctx)); - - if (lctx->success_handler) { -#if (DDEBUG) - rc = lctx->success_handler(lctx); - dd("lua success handler rc: %ld", rc); -#else - (void) lctx->success_handler(lctx); -#endif - } - rc = NGX_DONE; - ngx_wasm_lua_thread_destroy(lctx); - break; - case NGX_ERROR: - if (lctx->error_handler) { - (void) lctx->error_handler(lctx); - } - - ngx_wasm_lua_thread_destroy(lctx); - break; - default: - ngx_wasm_log_error(NGX_LOG_WASM_NYI, lctx->log, 0, - "NYI - lua resume handler rc: %d", rc); - ngx_wa_assert(0); - rc = NGX_ERROR; - break; + if (entry_lctx && !lctx->entry) { + ngx_queue_insert_tail(&entry_lctx->sub_ctxs, &lctx->q); } - ngx_wa_assert(rc == NGX_DONE - || rc == NGX_AGAIN - || rc == NGX_ERROR); - - return rc; + return thread_handle_rc(lctx, rc); } /** * Return values: - * NGX_DONE: lua thread terminated - * NGX_AGAIN: lua thread yielded + * NGX_OK: no lua thread to run + * NGX_AGAIN: lua thread yielding * NGX_ERROR: error */ ngx_int_t -ngx_wasm_lua_thread_run(ngx_wasm_lua_ctx_t *lctx) +ngx_wasm_lua_resume(ngx_wasm_subsys_env_t *env) { ngx_int_t rc; - ngx_wasm_subsys_env_t *env = &lctx->env; + ngx_queue_t *q; + ngx_http_lua_ctx_t *ctx; + ngx_http_lua_co_ctx_t *coctx; + ngx_wasm_lua_ctx_t *lctx = NULL; + ngx_wasm_lua_ctx_t *entry_lctx = env->entry_lctx; - ngx_log_debug3(NGX_LOG_DEBUG_WASM, lctx->log, 0, - "wasm running lua thread " - "(lctx: %p, L: %p, co: %p)", - lctx, lctx->L, lctx->co); + dd("enter"); - if (ngx_wasm_lua_thread_init(lctx) != NGX_OK) { - goto error; + if (entry_thread_empty(env)) { + return NGX_OK; } - /* lua ctx */ + ctx = ngx_http_get_module_ctx(env->ctx.rctx->r, ngx_http_lua_module); + if (ctx == NULL) { + return NGX_ERROR; + } - switch (env->subsys->kind) { -#if (NGX_WASM_HTTP) - case NGX_WASM_SUBSYS_HTTP: - { - ngx_http_request_t *r = env->ctx.rctx->r; - ngx_http_lua_ctx_t *ctx = lctx->ctx.rlctx; + coctx = ctx->cur_co_ctx; + if (coctx == NULL) { + return NGX_OK; + } - ctx->context = NGX_HTTP_LUA_CONTEXT_TIMER; - ctx->cur_co_ctx = &ctx->entry_co_ctx; - ctx->cur_co_ctx->co = lctx->co; - ctx->cur_co_ctx->co_ref = lctx->co_ref; -#if (NGX_LUA_USE_ASSERT) - ctx->cur_co_ctx->co_top = 1; -#endif + if (coctx == entry_lctx->co_ctx) { + /* the entry thread is resuming */ + lctx = entry_lctx; -#ifndef OPENRESTY_LUAJIT - ngx_http_lua_get_globals_table(lctx->co); - lua_setfenv(lctx->co, -2); -#endif + } else { + /* one of our user threads is resuming */ - ngx_http_lua_set_req(lctx->co, r); - ngx_http_lua_attach_co_ctx_to_L(lctx->co, ctx->cur_co_ctx); + for (q = ngx_queue_head(&entry_lctx->sub_ctxs); + q != ngx_queue_sentinel(&entry_lctx->sub_ctxs); + q = ngx_queue_next(q)) + { + lctx = ngx_queue_data(q, ngx_wasm_lua_ctx_t, q); - lctx->co_ctx = ctx->cur_co_ctx; - lctx->env.ctx.rctx->wasm_lua_ctx = lctx; + if (lctx->co_ctx == coctx) { + break; + } - rc = ngx_http_lua_run_thread(lctx->L, r, ctx, lctx->nargs); - break; + lctx = NULL; + } + + if (lctx == NULL) { + ngx_wasm_log_error(NGX_LOG_CRIT, env->connection->log, 0, + "wasm lua bridge could not find resuming " + "coroutine context"); + return NGX_ERROR; + } + + ngx_wa_assert(lctx->yielded); + ngx_wa_assert(coctx != &ctx->entry_co_ctx); } -#endif -#if (0 && NGX_WASM_STREAM) - case NGX_WASM_SUBSYS_STREAM: - { - /* TODO: get stream lua r */ - ngx_stream_lua_request_t *r = NULL; - ngx_stream_lua_ctx_t *ctx = lctx->rctx.slctx; - - ctx->context = NGX_STREAM_LUA_CONTEXT_TIMER; - ctx->cur_co_ctx = &ctx->entry_co_ctx; - ctx->cur_co_ctx->co = lctx->co; - ctx->cur_co_ctx->co_ref = lctx->co_ref; -#if (NGX_LUA_USE_ASSERT) - ctx->cur_co_ctx->co_top = 1; -#endif -#ifndef OPENRESTY_LUAJIT - ngx_stream_lua_get_globals_table(lctx->co); - lua_setfenv(lctx->co, -2); -#endif + dd("resuming%slctx: %p", lctx->entry ? " entry " : " user ", lctx); - ngx_stream_lua_set_req(co, s); + rc = thread_resume(lctx); + if (rc == NGX_ERROR) { + return NGX_ERROR; + } - lctx->co_ctx = ctx->cur_co_ctx; + dd("rc: %ld, state: %d", rc, env->state); - rc = ngx_stream_lua_run_thread(lctx->L, s, ctx, lctx->nargs); + switch (env->state) { + case NGX_WASM_STATE_YIELD: + rc = NGX_AGAIN; + break; + case NGX_WASM_STATE_ERROR: + case NGX_WASM_STATE_CONTINUE: + rc = NGX_OK; break; - } -#endif default: - ngx_wa_assert(0); - goto error; + rc = NGX_ERROR; + break; } - dd("lua thread run rc: %ld", rc); - - return ngx_wasm_lua_thread_handle_rc(lctx, rc); + dd("exit (rc: %ld)", rc); -error: - - ngx_wasm_lua_thread_destroy(lctx); - - return NGX_ERROR; + return rc; } -/** - * Return values: - * NGX_DONE: lua thread terminated - * NGX_AGAIN: lua thread yielded - * NGX_ERROR: error - */ -ngx_int_t -ngx_wasm_lua_thread_resume(ngx_wasm_lua_ctx_t *lctx) -{ - ngx_int_t rc; - ngx_wasm_subsys_env_t *env = &lctx->env; - - if (ngx_wasm_lua_thread_is_dead(lctx)) { - return NGX_DONE; - } - - ngx_log_debug3(NGX_LOG_DEBUG_WASM, lctx->log, 0, - "wasm resuming lua thread " - "(lctx: %p, L: %p, co: %p)", - lctx, lctx->L, lctx->co); - - /* lua ctx */ - - switch (env->subsys->kind) { #if (NGX_WASM_HTTP) - case NGX_WASM_SUBSYS_HTTP: - { - ngx_http_request_t *r = env->ctx.rctx->r; - ngx_http_lua_ctx_t *ctx = lctx->ctx.rlctx; - - ngx_wa_assert(ctx == ngx_http_get_module_ctx(r, ngx_http_lua_module)); +static ngx_int_t +ngx_http_wasm_lua_resume_handler(ngx_http_request_t *r) +{ + ngx_int_t rc; + ngx_http_wasm_req_ctx_t *rctx; - ngx_wasm_set_resume_handler(env); - rc = ctx->resume_handler(r); - break; - } -#endif - default: - ngx_wa_assert(0); + rctx = ngx_http_get_module_ctx(r, ngx_http_wasm_module); + if (rctx == NULL) { return NGX_ERROR; } - dd("lua resume handler rc: %ld", rc); + rc = ngx_wasm_lua_resume(&rctx->env); + if (rc != NGX_AGAIN) { + rctx->resume_handler = NULL; + } - return ngx_wasm_lua_thread_handle_rc(lctx, rc); + return rc; } +#endif diff --git a/src/common/lua/ngx_wasm_lua.h b/src/common/lua/ngx_wasm_lua.h index bd3f15afd..71236b9de 100644 --- a/src/common/lua/ngx_wasm_lua.h +++ b/src/common/lua/ngx_wasm_lua.h @@ -5,6 +5,7 @@ #include #if (NGX_WASM_HTTP) #include +#include #endif #if (NGX_WASM_STREAM) #include @@ -16,11 +17,15 @@ typedef ngx_int_t (*ngx_wasm_lua_handler_pt)(ngx_wasm_lua_ctx_t *lctx); struct ngx_wasm_lua_ctx_s { + ngx_queue_t q; /* entry sub_ctxs */ + ngx_queue_t sub_ctxs; /* entry lctx only */ + ngx_pool_t *pool; ngx_log_t *log; - ngx_wasm_subsys_env_t env; + ngx_wasm_subsys_env_t *env; ngx_wasm_lua_handler_pt error_handler; ngx_wasm_lua_handler_pt success_handler; + ngx_event_t *ev; /* entry lctx sleep event */ void *data; const char *code; @@ -43,7 +48,9 @@ struct ngx_wasm_lua_ctx_s { #endif } ctx; - unsigned yielded:1; + unsigned entry:1; /* is entry lctx */ + unsigned yielded:1; /* has yielded at least once */ + unsigned finished:1; /* has finished */ }; @@ -52,7 +59,7 @@ ngx_wasm_lua_ctx_t *ngx_wasm_lua_thread_new(const char *tag, const char *src, ngx_wasm_lua_handler_pt success_handler, ngx_wasm_lua_handler_pt error_handler); ngx_int_t ngx_wasm_lua_thread_run(ngx_wasm_lua_ctx_t *lctx); -ngx_int_t ngx_wasm_lua_thread_resume(ngx_wasm_lua_ctx_t *lctx); +ngx_int_t ngx_wasm_lua_resume(ngx_wasm_subsys_env_t *env); #if 0 diff --git a/src/common/lua/ngx_wasm_lua_ffi.c b/src/common/lua/ngx_wasm_lua_ffi.c index d877b1b16..1c24ae600 100644 --- a/src/common/lua/ngx_wasm_lua_ffi.c +++ b/src/common/lua/ngx_wasm_lua_ffi.c @@ -4,6 +4,7 @@ #include "ddebug.h" #include +#include ngx_wavm_t * @@ -89,6 +90,8 @@ ngx_http_wasm_ffi_plan_new(ngx_wavm_t *vm, void ngx_http_wasm_ffi_plan_free(ngx_wasm_ops_plan_t *plan) { + dd("enter (plan: %p)", plan); + if (plan->conf.proxy_wasm.pwroot) { ngx_proxy_wasm_root_destroy(plan->conf.proxy_wasm.pwroot); @@ -118,6 +121,7 @@ ngx_http_wasm_ffi_plan_attach(ngx_http_request_t *r, ngx_wasm_ops_plan_t *plan, ngx_uint_t isolation) { ngx_int_t rc; + ngx_http_lua_ctx_t *ctx; ngx_http_wasm_req_ctx_t *rctx; ngx_http_wasm_loc_conf_t *loc; ngx_wasm_ops_plan_t *old_plan; @@ -126,6 +130,7 @@ ngx_http_wasm_ffi_plan_attach(ngx_http_request_t *r, ngx_wasm_ops_plan_t *plan, return NGX_DECLINED; } + ctx = ngx_http_get_module_ctx(r, ngx_http_lua_module); loc = ngx_http_get_module_loc_conf(r, ngx_http_wasm_module); old_plan = loc->plan; @@ -145,56 +150,8 @@ ngx_http_wasm_ffi_plan_attach(ngx_http_request_t *r, ngx_wasm_ops_plan_t *plan, rctx->ffi_attached = 1; rctx->opctx.ctx.proxy_wasm.isolation = isolation; - - return NGX_OK; -} - - -ngx_int_t -ngx_http_wasm_ffi_start(ngx_http_request_t *r) -{ - ngx_int_t rc, phase; - ngx_http_wasm_req_ctx_t *rctx; - ngx_http_wasm_loc_conf_t *loc; - - rc = ngx_http_wasm_rctx(r, &rctx); - if (rc != NGX_OK) { - return rc; - } - -#if 1 - ngx_wa_assert(rctx->ffi_attached); -#else - /* - * presently, the above rctx rc is already NGX_DECLINED - * since loc->plan is empty - */ - if (!rctx->ffi_attached) { - ngx_wa_assert(0); - return NGX_DECLINED; - } -#endif - - loc = ngx_http_get_module_loc_conf(r, ngx_http_wasm_module); - - phase = (loc->pwm_req_headers_in_access == 1) - ? NGX_HTTP_ACCESS_PHASE - : NGX_HTTP_REWRITE_PHASE; - - rc = ngx_wasm_ops_resume(&rctx->opctx, phase); - if (rc == NGX_AGAIN) { - ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0, - "wasm \"ffi_start\" yield"); - - ngx_wasm_yield(&rctx->env); - } - - /* ignore errors: resume could trap, but FFI call succeeded */ - - ngx_wa_assert(rc == NGX_OK - || rc == NGX_DONE - || rc == NGX_AGAIN - || rc >= NGX_HTTP_SPECIAL_RESPONSE); + rctx->opctx.ctx.proxy_wasm.req_headers_in_access = + ctx->context == NGX_HTTP_LUA_CONTEXT_ACCESS; return NGX_OK; } diff --git a/src/common/lua/ngx_wasm_lua_ffi.h b/src/common/lua/ngx_wasm_lua_ffi.h index 6e06e2009..0ed226f86 100644 --- a/src/common/lua/ngx_wasm_lua_ffi.h +++ b/src/common/lua/ngx_wasm_lua_ffi.h @@ -31,7 +31,6 @@ void ngx_http_wasm_ffi_plan_free(ngx_wasm_ops_plan_t *plan); ngx_int_t ngx_http_wasm_ffi_plan_load(ngx_wasm_ops_plan_t *plan); ngx_int_t ngx_http_wasm_ffi_plan_attach(ngx_http_request_t *r, ngx_wasm_ops_plan_t *plan, ngx_uint_t isolation); -ngx_int_t ngx_http_wasm_ffi_start(ngx_http_request_t *r); ngx_int_t ngx_http_wasm_ffi_set_property(ngx_http_request_t *r, ngx_str_t *key, ngx_str_t *value, u_char *err, size_t *errlen); ngx_int_t ngx_http_wasm_ffi_get_property(ngx_http_request_t *r, diff --git a/src/common/lua/ngx_wasm_lua_resolver.c b/src/common/lua/ngx_wasm_lua_resolver.c index 89adc14ac..a71600853 100644 --- a/src/common/lua/ngx_wasm_lua_resolver.c +++ b/src/common/lua/ngx_wasm_lua_resolver.c @@ -57,7 +57,7 @@ static const char *DNS_SOLVING_SCRIPT = "" " 'nameserver ' .. nameserver, \n" " 'options timeout:' .. timeout, \n" " }, \n" - " noSynchronisation = true, \n" + " --noSynchronisation = true, \n" " }) \n" " if not ok then \n" " error(fmt('wasm lua failed initializing dns_client ' .. \n" @@ -119,7 +119,7 @@ ngx_wasm_lua_resolver_success_handler(ngx_wasm_lua_ctx_t *lctx) { ngx_resolver_ctx_t *rslv_ctx = lctx->data; - dd("enter"); + dd("enter (rslv_ctx:%p)", rslv_ctx); /* resolution should have succeeded */ ngx_wa_assert(rslv_ctx->naddrs); @@ -142,9 +142,11 @@ ngx_wasm_lua_resolver_resolve(ngx_resolver_ctx_t *rslv_ctx) /* lctx */ + dd("resolving with rslv_ctx: %p", rslv_ctx); + lctx = ngx_wasm_lua_thread_new(DNS_SOLVING_SCRIPT_NAME, DNS_SOLVING_SCRIPT, - &sock->env, + sock->env, sock->log, rslv_ctx, ngx_wasm_lua_resolver_success_handler, @@ -174,26 +176,20 @@ ngx_wasm_lua_resolver_resolve(ngx_resolver_ctx_t *rslv_ctx) /* run */ rc = ngx_wasm_lua_thread_run(lctx); - - dd("lua thread run rc: %ld", rc); - - switch (rc) { - case NGX_ERROR: + if (rc == NGX_ERROR) { goto error; - case NGX_DONE: - /* lua thread finished, mock ngx_resolve_name */ - rc = NGX_OK; - break; - default: - break; } + dd("exit (rc: %ld)", rc); + ngx_wa_assert(rc == NGX_OK || rc == NGX_AGAIN); return rc; error: + dd("error exit"); + ngx_free(rslv_ctx); return NGX_ERROR; @@ -282,6 +278,6 @@ ngx_wasm_lua_resolver_handler(ngx_wasm_lua_ctx_t *lctx, u_char *ip, dd("exit"); - /* end of the lua thread - * the handler only updated the socket error */ + /* end of the lua thread, the handler only updated the socket + * error */ } diff --git a/src/common/ngx_wasm_socket_tcp.c b/src/common/ngx_wasm_socket_tcp.c index 482e34ccf..3f69d9103 100644 --- a/src/common/ngx_wasm_socket_tcp.c +++ b/src/common/ngx_wasm_socket_tcp.c @@ -83,10 +83,10 @@ ngx_wasm_socket_tcp_resume(ngx_wasm_socket_tcp_t *sock) ngx_log_debug0(NGX_LOG_DEBUG_WASM, sock->log, 0, "wasm tcp socket resuming"); - switch (sock->env.subsys->kind) { + switch (sock->env->subsys->kind) { #if (NGX_WASM_HTTP) case NGX_WASM_SUBSYS_HTTP: - rctx = sock->env.ctx.rctx; + rctx = sock->env->ctx.rctx; rc = sock->resume_handler(sock); /* handle sock event */ dd("sock->resume rc: %ld", rc); @@ -108,10 +108,7 @@ ngx_wasm_socket_tcp_resume(ngx_wasm_socket_tcp_t *sock) break; #endif default: - ngx_wasm_log_error(NGX_LOG_WASM_NYI, sock->log, 0, - "NYI - subsystem kind: %d", - sock->env.subsys->kind); - ngx_wa_assert(0); + ngx_wasm_bad_subsystem(sock->env); break; } } @@ -126,9 +123,9 @@ ngx_wasm_socket_tcp_init(ngx_wasm_socket_tcp_t *sock, ngx_memzero(sock, sizeof(ngx_wasm_socket_tcp_t)); - ngx_memcpy(&sock->env, env, sizeof(ngx_wasm_subsys_env_t)); + sock->env = env; - switch (sock->env.subsys->kind) { + switch (sock->env->subsys->kind) { #if (NGX_WASM_HTTP) case NGX_WASM_SUBSYS_HTTP: sock->free_bufs = env->ctx.rctx->free_bufs; @@ -142,10 +139,7 @@ ngx_wasm_socket_tcp_init(ngx_wasm_socket_tcp_t *sock, break; #endif default: - ngx_wasm_log_error(NGX_LOG_WASM_NYI, sock->log, 0, - "NYI - subsystem kind: %d", - sock->env.subsys->kind); - ngx_wa_assert(0); + ngx_wasm_bad_subsystem(sock->env); return NGX_ERROR; } @@ -262,10 +256,10 @@ ngx_wasm_socket_tcp_connect(ngx_wasm_socket_tcp_t *sock) } #if (NGX_WASM_HTTP) - rctx = sock->env.ctx.rctx; + rctx = sock->env->ctx.rctx; r = rctx->r; - if (sock->env.subsys->kind == NGX_WASM_SUBSYS_HTTP) { + if (sock->env->subsys->kind == NGX_WASM_SUBSYS_HTTP) { if (rctx->fake_request) { wcf = ngx_wasm_core_cycle_get_conf(ngx_cycle); @@ -312,7 +306,7 @@ ngx_wasm_socket_tcp_connect(ngx_wasm_socket_tcp_t *sock) } #endif - ngx_wasm_set_resume_handler(&sock->env); + ngx_wasm_set_resume_handler(sock->env); if (sock->url.addrs && sock->url.addrs[0].sockaddr) { sock->resolved.sockaddr = sock->url.addrs[0].sockaddr; @@ -350,7 +344,7 @@ ngx_wasm_socket_tcp_connect(ngx_wasm_socket_tcp_t *sock) rslv_tmp.name = sock->url.host; - switch (sock->env.subsys->kind) { + switch (sock->env->subsys->kind) { #if (NGX_WASM_HTTP) case NGX_WASM_SUBSYS_HTTP: if (!rctx->fake_request) { @@ -376,7 +370,7 @@ ngx_wasm_socket_tcp_connect(ngx_wasm_socket_tcp_t *sock) #endif #if (NGX_WASM_STREAM) case NGX_WASM_SUBSYS_STREAM: - s = sock->env.ctx.sctx->s; + s = sock->env->ctx.sctx->s; ssrvcf = ngx_stream_get_module_srv_conf(s, ngx_stream_core_module); rslv_tmp.timeout = ssrvcf->resolver_timeout; @@ -384,7 +378,7 @@ ngx_wasm_socket_tcp_connect(ngx_wasm_socket_tcp_t *sock) break; #endif default: - ngx_wa_assert(0); + ngx_wasm_bad_subsystem(sock->env); return NGX_ERROR; } @@ -395,6 +389,8 @@ ngx_wasm_socket_tcp_connect(ngx_wasm_socket_tcp_t *sock) ngx_wa_assert(rslv_ctx != NGX_NO_RESOLVER); + dd("new rslv_ctx:%p", rslv_ctx); + rslv_ctx->name = rslv_tmp.name; rslv_ctx->timeout = rslv_tmp.timeout; rslv_ctx->handler = ngx_wasm_socket_resolve_handler; @@ -513,6 +509,10 @@ ngx_wasm_socket_resolve_handler(ngx_resolver_ctx_t *ctx) /* connect */ + /* Note: the Lua bridge may have finished all threads and + * resumed continuation, but we still need to hold the yield */ + ngx_wasm_yield(sock->env); + ngx_wasm_socket_tcp_connect_peer(sock); return; @@ -569,7 +569,7 @@ ngx_wasm_socket_tcp_connect_peer(ngx_wasm_socket_tcp_t *sock) if (c->pool == NULL) { c->pool = sock->pool; #if 0 - /* disabled: inherited from sock->env.pool for reusing the socket */ + /* disabled: inherited from sock->env->pool for reusing the socket */ c->pool = ngx_create_pool(128, sock->log); if (c->pool == NULL) { return NGX_ERROR; @@ -584,13 +584,13 @@ ngx_wasm_socket_tcp_connect_peer(ngx_wasm_socket_tcp_t *sock) c->read->log = c->log; c->write->log = c->log; c->data = sock; - c->sendfile &= sock->env.connection->sendfile; + c->sendfile &= sock->env->connection->sendfile; if (rc == NGX_OK) { ngx_wasm_socket_tcp_connect_handler(sock); } else if (rc == NGX_AGAIN) { - ngx_wasm_set_resume_handler(&sock->env); + ngx_wasm_set_resume_handler(sock->env); ngx_add_timer(c->write, sock->connect_timeout); } @@ -878,7 +878,7 @@ ngx_wasm_socket_tcp_send(ngx_wasm_socket_tcp_t *sock, ngx_chain_t *cl) ngx_chain_update_chains(sock->pool, &sock->free_bufs, &sock->busy_bufs, - &cl, sock->env.buf_tag); + &cl, sock->env->buf_tag); sock->write_event_handler = ngx_wasm_socket_tcp_nop_handler; @@ -916,7 +916,7 @@ ngx_wasm_socket_tcp_send(ngx_wasm_socket_tcp_t *sock, ngx_chain_t *cl) return NGX_ERROR; } - ngx_wasm_set_resume_handler(&sock->env); + ngx_wasm_set_resume_handler(sock->env); return NGX_AGAIN; } @@ -1032,7 +1032,7 @@ ngx_wasm_socket_tcp_read(ngx_wasm_socket_tcp_t *sock, if (sock->bufs_in == NULL) { cl = ngx_wasm_chain_get_free_buf(sock->pool, &sock->free_bufs, - sock->buffer_size, sock->env.buf_tag, + sock->buffer_size, sock->env->buf_tag, sock->buffer_reuse); if (cl == NULL) { return NGX_ERROR; @@ -1106,7 +1106,7 @@ ngx_wasm_socket_tcp_read(ngx_wasm_socket_tcp_t *sock, cl = ngx_wasm_chain_get_free_buf(sock->pool, &sock->free_bufs, sock->buffer_size, - sock->env.buf_tag, + sock->env->buf_tag, sock->buffer_reuse); if (cl == NULL) { return NGX_ERROR; @@ -1162,7 +1162,7 @@ ngx_wasm_socket_tcp_read(ngx_wasm_socket_tcp_t *sock, } } else if (rc == NGX_AGAIN) { - ngx_wasm_set_resume_handler(&sock->env); + ngx_wasm_set_resume_handler(sock->env); if (rev->active) { ngx_add_timer(rev, sock->read_timeout); @@ -1230,7 +1230,7 @@ ngx_wasm_socket_tcp_destroy(ngx_wasm_socket_tcp_t *sock) if (c && c->pool) { #if 0 - /* disabled: c->pool is inherited from sock->env.pool */ + /* disabled: c->pool is inherited from sock->env->pool */ ngx_destroy_pool(c->pool); #endif c->pool = NULL; diff --git a/src/common/ngx_wasm_socket_tcp.h b/src/common/ngx_wasm_socket_tcp.h index fd4894b6d..144724d62 100644 --- a/src/common/ngx_wasm_socket_tcp.h +++ b/src/common/ngx_wasm_socket_tcp.h @@ -36,7 +36,7 @@ typedef struct { struct ngx_wasm_socket_tcp_s { ngx_pool_t *pool; ngx_log_t *log; - ngx_wasm_subsys_env_t env; + ngx_wasm_subsys_env_t *env; ngx_wasm_socket_tcp_resume_handler_pt resume_handler; void *data; diff --git a/src/common/ngx_wasm_socket_tcp_readers.c b/src/common/ngx_wasm_socket_tcp_readers.c index c4e1f0386..9f94749f0 100644 --- a/src/common/ngx_wasm_socket_tcp_readers.c +++ b/src/common/ngx_wasm_socket_tcp_readers.c @@ -542,7 +542,7 @@ ngx_wasm_read_http_response(ngx_buf_t *src, ngx_chain_t *buf_in, ssize_t bytes, cl = ngx_wasm_chain_get_free_buf(in_ctx->pool, &rctx->free_bufs, in_ctx->rest, - in_ctx->sock->env.buf_tag, + in_ctx->sock->env->buf_tag, in_ctx->sock->buffer_reuse); if (cl == NULL) { return NGX_ERROR; @@ -600,7 +600,7 @@ ngx_wasm_read_http_response(ngx_buf_t *src, ngx_chain_t *buf_in, ssize_t bytes, in_ctx->body = ngx_wasm_chain_get_free_buf(in_ctx->pool, &rctx->free_bufs, in_ctx->body_len, - in_ctx->sock->env.buf_tag, + in_ctx->sock->env->buf_tag, in_ctx->sock->buffer_reuse); if (in_ctx->body == NULL) { return NGX_ERROR; diff --git a/src/common/ngx_wasm_subsystem.h b/src/common/ngx_wasm_subsystem.h index 0ed431004..7f016ee10 100644 --- a/src/common/ngx_wasm_subsystem.h +++ b/src/common/ngx_wasm_subsystem.h @@ -12,6 +12,7 @@ #define ngx_wasm_continue(env) \ + ngx_wasm_set_resume_handler(env); \ (env)->state = NGX_WASM_STATE_CONTINUE #define ngx_wasm_error(env) \ @@ -24,6 +25,12 @@ #define ngx_wasm_yielding(env) \ (env)->state == NGX_WASM_STATE_YIELD +#define ngx_wasm_bad_subsystem(env) \ + ngx_wasm_log_error(NGX_LOG_WASM_NYI, env->connection->log, 0, \ + "unexpected subsystem kind: %d", \ + env->subsys->kind); \ + ngx_wa_assert(0) + ngx_wasm_phase_t *ngx_wasm_phase_lookup(ngx_wasm_subsystem_t *subsys, ngx_uint_t phaseidx); diff --git a/src/common/proxy_wasm/ngx_proxy_wasm.c b/src/common/proxy_wasm/ngx_proxy_wasm.c index c4def6a95..2e4861019 100644 --- a/src/common/proxy_wasm/ngx_proxy_wasm.c +++ b/src/common/proxy_wasm/ngx_proxy_wasm.c @@ -637,7 +637,7 @@ ngx_proxy_wasm_resume(ngx_proxy_wasm_ctx_t *pwctx, ngx_int_t rc = NGX_OK; ngx_proxy_wasm_exec_t *pwexec, *pwexecs; - dd("enter"); + dd("enter (action: %d)", pwctx->action); pwctx->step = step; @@ -668,6 +668,8 @@ ngx_proxy_wasm_resume(ngx_proxy_wasm_ctx_t *pwctx, ngx_wa_assert(pwctx->pwexecs.nelts == pwctx->nfilters); + dd("pwctx->exec_index: %ld", pwctx->exec_index); + for (i = pwctx->exec_index; i < pwctx->nfilters; i++) { pwexec = &pwexecs[i]; @@ -733,12 +735,19 @@ ngx_proxy_wasm_resume(ngx_proxy_wasm_ctx_t *pwctx, if (rc != NGX_OK && rc != NGX_AGAIN) { goto ret; } - } - /* next step */ + dd("end of loop pwctx->exec_index = %ld", pwctx->exec_index); + + /* next step */ - pwctx->last_completed_step = pwctx->step; - pwctx->exec_index = 0; + ngx_wa_assert(pwctx->exec_index <= pwctx->nfilters); + + if (pwctx->exec_index >= pwctx->nfilters) { + dd("next step"); + pwctx->last_completed_step = pwctx->step; + pwctx->exec_index = 0; + } + } ret: @@ -888,6 +897,8 @@ ngx_proxy_wasm_dispatch_calls_total(ngx_proxy_wasm_exec_t *pwexec) q != ngx_queue_sentinel(&pwexec->calls); q = ngx_queue_next(q), n++) { /* void */ } + dd("n: %ld", n); + return n; } diff --git a/src/http/ngx_http_wasm.h b/src/http/ngx_http_wasm.h index f5ca25c2d..e18b58af5 100644 --- a/src/http/ngx_http_wasm.h +++ b/src/http/ngx_http_wasm.h @@ -25,15 +25,12 @@ struct ngx_http_wasm_req_ctx_s { ngx_http_request_t *r; ngx_connection_t *connection; ngx_pool_t *pool; /* r->pool */ + ngx_http_handler_pt resume_handler; ngx_wasm_subsys_env_t env; ngx_wasm_op_ctx_t opctx; ngx_wasm_ops_t *ffi_engine; void *data; /* per-stream extra context */ -#if (NGX_WASM_LUA) - ngx_wasm_lua_ctx_t *wasm_lua_ctx; -#endif - ngx_chain_t *free_bufs; ngx_chain_t *busy_bufs; diff --git a/src/http/ngx_http_wasm_filter_module.c b/src/http/ngx_http_wasm_filter_module.c index 26e9c0e19..e62168e06 100644 --- a/src/http/ngx_http_wasm_filter_module.c +++ b/src/http/ngx_http_wasm_filter_module.c @@ -273,6 +273,15 @@ ngx_http_wasm_body_filter_resume(ngx_http_wasm_req_ctx_t *rctx, ngx_chain_t *in) } if (!rctx->resp_buffering) { + if (rctx->data) { + /* resp_buffering triggered a yield in ngx_proxy_wasm.c + * action2rc, so we force a continue here as we know it + * has finished or was never enabled */ + ngx_proxy_wasm_ctx_set_next_action((ngx_proxy_wasm_ctx_t *) + rctx->data, + NGX_PROXY_WASM_ACTION_CONTINUE); + } + (void) ngx_wasm_ops_resume(&rctx->opctx, NGX_HTTP_WASM_BODY_FILTER_PHASE); } diff --git a/src/http/ngx_http_wasm_module.c b/src/http/ngx_http_wasm_module.c index 7867c3d6a..86d3e5e1a 100644 --- a/src/http/ngx_http_wasm_module.c +++ b/src/http/ngx_http_wasm_module.c @@ -581,9 +581,10 @@ ngx_http_wasm_rctx(ngx_http_request_t *r, ngx_http_wasm_req_ctx_t **out) rctx->env.ssl_conf = ngx_wasm_core_ssl_conf((ngx_cycle_t *) ngx_cycle); #endif - ngx_log_debug4(NGX_LOG_DEBUG_HTTP, r->connection->log, 0, - "wasm rctx created: %p (r: %p, main: %d, fake: %d)", - rctx, r, r->main == r, rctx->fake_request); + ngx_log_debug5(NGX_LOG_DEBUG_HTTP, r->connection->log, 0, + "wasm rctx created: %p (r: %p, main: %d, fake: %d, " + "env: %p)", rctx, r, r->main == r, rctx->fake_request, + &rctx->env); ngx_http_set_ctx(r, rctx, ngx_http_wasm_module); @@ -601,6 +602,7 @@ ngx_http_wasm_rctx(ngx_http_request_t *r, ngx_http_wasm_req_ctx_t **out) opctx = &rctx->opctx; opctx->ops = mcf->ops; opctx->pool = r->pool; + opctx->env = &rctx->env; opctx->log = r->connection->log; opctx->data = rctx; @@ -752,15 +754,20 @@ ngx_http_wasm_rewrite_handler(ngx_http_request_t *r) goto done; } -#if (NGX_WASM_LUA) - if (rctx->wasm_lua_ctx && rctx->wasm_lua_ctx->yielded) { - /* previous Lua yield in previous rewrite, resume */ - dd("wasm lua forcing content wev handler"); - ngx_http_wasm_wev_handler(r); - rc = NGX_DONE; - goto done; + if (rctx->resume_handler) { + rc = rctx->resume_handler(r); + dd("resume_handler rc: %ld", rc); + switch (rc) { + case NGX_AGAIN: + rc = NGX_DONE; + goto done; + case NGX_OK: + break; + default: + ngx_wa_assert(rc == NGX_ERROR); + goto done; + } } -#endif rc = ngx_wasm_ops_resume(&rctx->opctx, NGX_HTTP_REWRITE_PHASE); rc = ngx_http_wasm_check_finalize(rctx, rc); @@ -878,6 +885,21 @@ ngx_http_wasm_access_handler(ngx_http_request_t *r) goto done; } + if (rctx->resume_handler) { + rc = rctx->resume_handler(r); + dd("resume_handler rc: %ld", rc); + switch (rc) { + case NGX_AGAIN: + goto done; + case NGX_OK: + break; + default: + ngx_wa_assert(rc == NGX_ERROR); + rc = NGX_HTTP_INTERNAL_SERVER_ERROR; + goto done; + } + } + rc = ngx_wasm_ops_resume(&rctx->opctx, NGX_HTTP_ACCESS_PHASE); rc = ngx_http_wasm_check_finalize(rctx, rc); @@ -1044,17 +1066,29 @@ ngx_http_wasm_content_handler(ngx_http_request_t *r) goto done; } -#if (NGX_WASM_LUA) - if (rctx->entered_content_phase && rctx->wasm_lua_ctx) { - dd("wasm lua forcing content wev handler"); - ngx_http_wasm_wev_handler(r); - rc = ngx_http_wasm_check_finalize(rctx, NGX_AGAIN); - goto done; - } -#endif - rctx->entered_content_phase = 1; + if (rctx->resume_handler) { + rc = rctx->resume_handler(r); + dd("resume_handler rc: %ld", rc); + switch (rc) { + case NGX_AGAIN: + if (r == r->main) { + r->main->count++; + dd("r->main->count++: %d", r->main->count); + rc = NGX_DONE; + } + + goto done; + case NGX_OK: + break; + default: + ngx_wa_assert(rc == NGX_ERROR); + rc = NGX_HTTP_INTERNAL_SERVER_ERROR; + goto done; + } + } + rc = ngx_http_wasm_content(rctx); done: @@ -1108,6 +1142,9 @@ ngx_http_wasm_wev_handler(ngx_http_request_t *r) if (rctx->env.state != NGX_WASM_STATE_ERROR) { rctx->in_wev = 1; + + } else if (rctx->in_wev) { + return; } ngx_log_debug8(NGX_LOG_DEBUG_HTTP, r->connection->log, 0, @@ -1116,26 +1153,20 @@ ngx_http_wasm_wev_handler(ngx_http_request_t *r) &r->uri, &r->args, wev->timedout, wev->ready, r == r->main, r->main->count, rctx->resp_finalized, rctx->env.state); -#if (NGX_WASM_LUA) - if (rctx->wasm_lua_ctx) { - rc = ngx_wasm_lua_thread_resume(rctx->wasm_lua_ctx); - - dd("lua thread resume rc: %ld", rc); - + if (rctx->resume_handler) { + rc = rctx->resume_handler(r); + dd("resume_handler rc: %ld", rc); switch (rc) { - case NGX_ERROR: - rc = NGX_HTTP_INTERNAL_SERVER_ERROR; - goto last_finalize; - case NGX_DONE: - rctx->wasm_lua_ctx = NULL; + case NGX_AGAIN: + return; + case NGX_OK: break; default: - break; + ngx_wa_assert(rc == NGX_ERROR); + rc = NGX_HTTP_INTERNAL_SERVER_ERROR; + goto last_finalize; } - - return; } -#endif dd("entered_content_phase: %d, resp_content_chosen: %d, fake: %d", rctx->entered_content_phase, rctx->resp_content_chosen, @@ -1209,7 +1240,7 @@ ngx_http_wasm_set_resume_handler(ngx_http_wasm_req_ctx_t *rctx) r->write_event_handler = ngx_http_wasm_wev_handler; - } else { + } else if (r->write_event_handler != ngx_http_core_run_phases) { dd("resume_handler = ngx_http_core_run_phases (r: %p, rctx: %p)", r, rctx); diff --git a/src/http/proxy_wasm/ngx_http_proxy_wasm_dispatch.c b/src/http/proxy_wasm/ngx_http_proxy_wasm_dispatch.c index 087cc3ef5..34c3172f0 100644 --- a/src/http/proxy_wasm/ngx_http_proxy_wasm_dispatch.c +++ b/src/http/proxy_wasm/ngx_http_proxy_wasm_dispatch.c @@ -228,6 +228,9 @@ ngx_http_proxy_wasm_dispatch(ngx_proxy_wasm_exec_t *pwexec, ngx_memcpy(call->host.data, host->data, host->len); call->host.data[call->host.len] = '\0'; + ngx_log_debug1(NGX_LOG_DEBUG_ALL, r->connection->log, 0, + "wasm new dispatch call to \"%V\"", &call->host); + /* headers/trailers */ if (ngx_proxy_wasm_pairs_unmarshal(pwexec, &call->headers, headers) @@ -904,12 +907,17 @@ ngx_http_proxy_wasm_dispatch_resume_handler(ngx_wasm_socket_tcp_t *sock) error2: - if (ecode != NGX_PROXY_WASM_ERR_NONE) { - /* catch trap for tcp socket resume retval */ + if (ecode != NGX_PROXY_WASM_ERR_NONE + || rc == NGX_ABORT) + { + /* catch trap for tcp socket resume retval or an instance + * that trapped before the response was received */ rc = NGX_ERROR; } ngx_wasm_error(&rctx->env); + ngx_proxy_wasm_ctx_set_next_action(pwexec->parent, + NGX_PROXY_WASM_ACTION_CONTINUE); ngx_http_proxy_wasm_dispatch_err(call); ngx_wa_assert(rc == NGX_ERROR); diff --git a/src/wasm/ngx_wasm.h b/src/wasm/ngx_wasm.h index c570d10b0..523caf801 100644 --- a/src/wasm/ngx_wasm.h +++ b/src/wasm/ngx_wasm.h @@ -222,13 +222,16 @@ typedef struct { typedef struct ngx_wasm_lua_ctx_s ngx_wasm_lua_ctx_t; typedef struct { + ngx_wasm_state_e state; ngx_connection_t *connection; ngx_buf_tag_t *buf_tag; ngx_wasm_subsystem_t *subsys; #if (NGX_SSL) ngx_wasm_ssl_conf_t *ssl_conf; #endif - ngx_wasm_state_e state; +#if (NGX_WASM_LUA) + ngx_wasm_lua_ctx_t *entry_lctx; +#endif union { #if (NGX_WASM_HTTP) diff --git a/src/wasm/ngx_wasm_core_host.c b/src/wasm/ngx_wasm_core_host.c index 501fc2d73..6241e7bfe 100644 --- a/src/wasm/ngx_wasm_core_host.c +++ b/src/wasm/ngx_wasm_core_host.c @@ -31,7 +31,6 @@ ngx_int_t ngx_wasm_hfuncs_test_lua_argsrets(ngx_wavm_instance_t *instance, wasm_val_t args[], wasm_val_t rets[]) { - ngx_wasm_subsys_env_t env; ngx_wasm_lua_ctx_t *lctx; #if (NGX_WASM_HTTP) ngx_http_wasm_req_ctx_t *rctx = instance->data; @@ -41,18 +40,9 @@ ngx_wasm_hfuncs_test_lua_argsrets(ngx_wavm_instance_t *instance, "ngx.log(ngx.INFO, 'arg: ', arg)\n" "return 123, 456"; - ngx_memzero(&env, sizeof(ngx_wasm_subsys_env_t)); - -#if (NGX_WASM_HTTP) - env.connection = rctx->r->connection; - env.buf_tag = buf_tag; - env.subsys = &ngx_http_wasm_subsystem; - env.ctx.rctx = rctx; -#endif - lctx = ngx_wasm_lua_thread_new(SCRIPT_NAME, SCRIPT, - &env, + &rctx->env, instance->log, NULL, NULL, NULL); if (lctx == NULL) { @@ -72,33 +62,81 @@ ngx_int_t ngx_wasm_hfuncs_test_lua_bad_chunk(ngx_wavm_instance_t *instance, wasm_val_t args[], wasm_val_t rets[]) { - ngx_wasm_subsys_env_t env; - ngx_wasm_lua_ctx_t *lctx; #if (NGX_WASM_HTTP) + ngx_wasm_lua_ctx_t *lctx; ngx_http_wasm_req_ctx_t *rctx = instance->data; -#endif static const char *SCRIPT_NAME = "bad_lua_chunk"; static const char *SCRIPT = "local x = {"; - ngx_memzero(&env, sizeof(ngx_wasm_subsys_env_t)); + lctx = ngx_wasm_lua_thread_new(SCRIPT_NAME, + SCRIPT, + &rctx->env, + rctx->connection->log, + NULL, NULL, NULL); + if (lctx == NULL) { + return NGX_WAVM_ERROR; + } +#endif + + return NGX_WAVM_OK; +} + +ngx_int_t +ngx_wasm_hfuncs_test_lua_error(ngx_wavm_instance_t *instance, + wasm_val_t args[], wasm_val_t rets[]) +{ #if (NGX_WASM_HTTP) - env.connection = rctx->r->connection; - env.buf_tag = buf_tag; - env.subsys = &ngx_http_wasm_subsystem; - env.ctx.rctx = rctx; + ngx_wasm_lua_ctx_t *lctx; + ngx_http_wasm_req_ctx_t *rctx = instance->data; + static const char *SCRIPT_NAME = "error_lua_chunk"; + static const char *SCRIPT = "if _G.sleep_before_error then\n" + " print('sleeping before error')\n" + " ngx.sleep(0.2)\n" + "end\n" + "\n" + "error('my error')"; + + lctx = ngx_wasm_lua_thread_new(SCRIPT_NAME, + SCRIPT, + &rctx->env, + rctx->connection->log, + NULL, NULL, NULL); + if (lctx == NULL) { + return NGX_WAVM_ERROR; + } + + (void) ngx_wasm_lua_thread_run(lctx); #endif + return NGX_WAVM_OK; +} + + +ngx_int_t +ngx_wasm_hfuncs_test_lua_sleep(ngx_wavm_instance_t *instance, + wasm_val_t args[], wasm_val_t rets[]) +{ +#if (NGX_WASM_HTTP) + ngx_wasm_lua_ctx_t *lctx; + ngx_http_wasm_req_ctx_t *rctx = instance->data; + static const char *SCRIPT_NAME = "sleep_lua_chunk"; + static const char *SCRIPT = "for i = 1, 2 do\n" + " print('sleeping for 250ms')\n" + " ngx.sleep(0.25)\n" + "end"; + lctx = ngx_wasm_lua_thread_new(SCRIPT_NAME, SCRIPT, - &env, - instance->log, + &rctx->env, + rctx->connection->log, NULL, NULL, NULL); if (lctx == NULL) { return NGX_WAVM_ERROR; } (void) ngx_wasm_lua_thread_run(lctx); +#endif return NGX_WAVM_OK; } @@ -122,6 +160,16 @@ static ngx_wavm_host_func_def_t ngx_wasm_core_hfuncs[] = { &ngx_wasm_hfuncs_test_lua_bad_chunk, NULL, NULL }, + + { ngx_string("ngx_wasm_lua_test_error"), + &ngx_wasm_hfuncs_test_lua_error, + NULL, + NULL }, + + { ngx_string("ngx_wasm_lua_test_sleep"), + &ngx_wasm_hfuncs_test_lua_sleep, + NULL, + NULL }, #endif ngx_wavm_hfunc_null diff --git a/src/wasm/ngx_wasm_ops.c b/src/wasm/ngx_wasm_ops.c index f8b0219b1..7f73d7c49 100644 --- a/src/wasm/ngx_wasm_ops.c +++ b/src/wasm/ngx_wasm_ops.c @@ -160,6 +160,7 @@ ngx_wasm_ops_plan_load(ngx_wasm_ops_plan_t *plan, ngx_log_t *log) break; case NGX_WASM_OP_CALL: op->handler = &ngx_wasm_op_call_handler; + op->conf.call.idx = j; op->conf.call.funcref = ngx_wavm_module_func_lookup(op->module, &op->conf.call.func_name); @@ -254,9 +255,9 @@ ngx_wasm_ops_resume(ngx_wasm_op_ctx_t *ctx, ngx_uint_t phaseidx) dd("enter (phaseidx: %ld, phase: \"%.*s\")", phaseidx, (int) phase->name.len, phase->name.data); +#if 0 /* check last phase */ -#if 0 switch (phaseidx) { default: if (ctx->last_phase @@ -299,6 +300,10 @@ ngx_wasm_ops_resume(ngx_wasm_op_ctx_t *ctx, ngx_uint_t phaseidx) || rc == NGX_AGAIN || rc == NGX_DONE); + if (phase != ctx->last_phase) { + ctx->cur_idx = 0; + } + ctx->last_phase = phase; dd("ops resume: setting last phase to \"%.*s\" (%ld)", @@ -318,11 +323,17 @@ ngx_wasm_op_call_handler(ngx_wasm_op_ctx_t *opctx, ngx_wasm_phase_t *phase, ngx_wasm_op_t *op) { ngx_int_t rc; + ngx_uint_t idx; ngx_wavm_instance_t *instance; ngx_wavm_funcref_t *funcref; ngx_wa_assert(op->code == NGX_WASM_OP_CALL); + idx = op->conf.call.idx; + + dd("enter (op: %p, cur_idx: %ld, op idx: %ld)", + op, opctx->cur_idx, idx); + funcref = op->conf.call.funcref; if (funcref == NULL) { ngx_wasm_log_error(NGX_LOG_ERR, opctx->log, 0, @@ -332,6 +343,23 @@ ngx_wasm_op_call_handler(ngx_wasm_op_ctx_t *opctx, ngx_wasm_phase_t *phase, return NGX_ERROR; } + if (opctx->cur_idx > idx) { + return NGX_DECLINED; + } + +#if 0 + if (opctx->cur_idx == idx) { + switch (opctx->env->state) { + case NGX_WASM_STATE_YIELD: + return NGX_AGAIN; + case NGX_WASM_STATE_ERROR: + return NGX_ERROR; + default: + break; + } + } +#endif + ngx_log_debug3(NGX_LOG_DEBUG_WASM, opctx->log, 0, "wasm ops calling \"%V.%V\" in \"%V\" phase", &op->module->name, &funcref->name, &phase->name); @@ -346,12 +374,25 @@ ngx_wasm_op_call_handler(ngx_wasm_op_ctx_t *opctx, ngx_wasm_phase_t *phase, ngx_wavm_instance_destroy(instance); + opctx->cur_idx++; + if (rc == NGX_ERROR || rc == NGX_ABORT) { return NGX_ERROR; } ngx_wa_assert(rc == NGX_OK); + dd("ops state: %d", opctx->env->state); + + switch (opctx->env->state) { + case NGX_WASM_STATE_YIELD: + return NGX_AGAIN; + case NGX_WASM_STATE_ERROR: + return NGX_ERROR; + default: + break; + } + /* next call op */ return NGX_DECLINED; @@ -400,7 +441,14 @@ ngx_wasm_op_proxy_wasm_handler(ngx_wasm_op_ctx_t *opctx, } pwctx->phase = phase; +#if 0 pwctx->action = NGX_PROXY_WASM_ACTION_CONTINUE; +#endif + + if (opctx->ctx.proxy_wasm.req_headers_in_access) { + pwctx->req_headers_in_access = 1; + } + switch (phase->index) { diff --git a/src/wasm/ngx_wasm_ops.h b/src/wasm/ngx_wasm_ops.h index 7c370bd00..a1f2b79e6 100644 --- a/src/wasm/ngx_wasm_ops.h +++ b/src/wasm/ngx_wasm_ops.h @@ -33,10 +33,12 @@ typedef struct { typedef struct { ngx_uint_t isolation; + unsigned req_headers_in_access:1; } ngx_wasm_op_proxy_wasm_ctx_t; typedef struct { + ngx_uint_t idx; ngx_str_t func_name; ngx_wavm_funcref_t *funcref; } ngx_wasm_op_call_t; @@ -97,10 +99,12 @@ typedef struct { struct ngx_wasm_op_ctx_s { ngx_pool_t *pool; ngx_log_t *log; + ngx_wasm_subsys_env_t *env; ngx_wasm_ops_t *ops; ngx_wasm_ops_plan_t *plan; ngx_wasm_phase_t *last_phase; void *data; + ngx_uint_t cur_idx; union { ngx_wasm_op_call_ctx_t call; diff --git a/t/03-proxy_wasm/hfuncs/133-proxy_dispatch_http_edge_cases.t b/t/03-proxy_wasm/hfuncs/133-proxy_dispatch_http_edge_cases.t index 31d1bdbde..c8ac4634d 100644 --- a/t/03-proxy_wasm/hfuncs/133-proxy_dispatch_http_edge_cases.t +++ b/t/03-proxy_wasm/hfuncs/133-proxy_dispatch_http_edge_cases.t @@ -207,9 +207,9 @@ called 2 times echo fail; } --- response_headers_like -pwm-call-1: dispatch 1 -pwm-call-2: dispatch 2 -pwm-call-3: dispatch 3 +pwm-call-0: dispatch 1 +pwm-call-1: dispatch 2 +pwm-call-2: dispatch 3 --- response_body called 3 times --- no_error_log diff --git a/t/04-openresty/ffi/101-proxy_wasm_load.t b/t/04-openresty/ffi/101-proxy_wasm_load.t index f1b110027..9378e0fec 100644 --- a/t/04-openresty/ffi/101-proxy_wasm_load.t +++ b/t/04-openresty/ffi/101-proxy_wasm_load.t @@ -208,19 +208,9 @@ qr/#0 on_configure, config_size: 0.*/ ngx.log(ngx.ERR, err) return end + } - local ok, err = proxy_wasm.start() - if not ok then - ngx.log(ngx.ERR, err) - return - end - - local ok, err = proxy_wasm.start() - if not ok then - ngx.log(ngx.ERR, err) - return - end - + content_by_lua_block { ngx.say("ok") } } diff --git a/t/04-openresty/ffi/102-proxy_wasm_start.t b/t/04-openresty/ffi/102-proxy_wasm_start.t deleted file mode 100644 index 03546385b..000000000 --- a/t/04-openresty/ffi/102-proxy_wasm_start.t +++ /dev/null @@ -1,235 +0,0 @@ -# vim:set ft= ts=4 sw=4 et fdm=marker: - -use strict; -use lib '.'; -use t::TestWasmX::Lua; - -skip_no_openresty(); - -plan_tests(4); -run_tests(); - -__DATA__ - -=== TEST 1: start() - fail with no plan ---- wasm_modules: on_phases ---- config - location /t { - rewrite_by_lua_block { - local proxy_wasm = require "resty.wasmx.proxy_wasm" - - local ok, err = proxy_wasm.start() - if not ok then - return ngx.say(err) - end - - ngx.say("failed") - } - } ---- response_body -plan not loaded and attached ---- no_error_log -[error] -[crit] - - - -=== TEST 2: start() - fail with plan not loaded ---- load_nginx_modules: ngx_http_echo_module ---- wasm_modules: on_phases ---- config - location /t { - rewrite_by_lua_block { - local proxy_wasm = require "resty.wasmx.proxy_wasm" - local filters = { - { name = "on_phases" }, - } - - local c_plan, err = proxy_wasm.new(filters) - if not c_plan then - return ngx.say(err) - end - - local ok, err = proxy_wasm.start() - if not ok then - return ngx.say(err) - end - } - - echo failed; - } ---- response_body -plan not loaded and attached ---- no_error_log -[error] -[crit] - - - -=== TEST 3: start() - fail with plan loaded but not attached ---- load_nginx_modules: ngx_http_echo_module ---- wasm_modules: on_phases ---- config - location /t { - rewrite_by_lua_block { - local proxy_wasm = require "resty.wasmx.proxy_wasm" - local filters = { - { name = "on_phases" }, - } - - local c_plan, err = proxy_wasm.new(filters) - if not c_plan then - return ngx.say(err) - end - - local ok, err = proxy_wasm.load(c_plan) - if not ok then - return ngx.say(err) - end - - ok, err = proxy_wasm.start() - if not ok then - return ngx.say(err) - end - } - - echo failed; - } ---- response_body -plan not loaded and attached ---- no_error_log -[error] -[crit] - - - -=== TEST 4: start() with plan loaded and attached on rewrite ---- load_nginx_modules: ngx_http_echo_module ---- wasm_modules: on_phases ---- config - location /t { - rewrite_by_lua_block { - local proxy_wasm = require "resty.wasmx.proxy_wasm" - local filters = { - { name = "on_phases" }, - } - - local c_plan, err = proxy_wasm.new(filters) - if not c_plan then - return ngx.say(err) - end - - local ok, err = proxy_wasm.load(c_plan) - if not ok then - return ngx.say(err) - end - - ok, err = proxy_wasm.attach(c_plan) - if not ok then - return ngx.say(err) - end - - ok, err = proxy_wasm.start() - if not ok then - return ngx.say(err) - end - } - - echo ok; - } ---- request -POST /t -Hello world ---- response_body -ok ---- grep_error_log eval: qr/#\d+ on_(configure|vm_start|request|response|log).*/ ---- grep_error_log_out eval -qr/^#0 on_vm_start[^#]* -#0 on_configure, config_size: 0[^#]* -#\d+ on_request_headers, 3 headers[^#]* -#\d+ on_request_body, 11 bytes[^#]* -#\d+ on_response_headers, 5 headers[^#]* -#\d+ on_response_body, 3 bytes, eof: false[^#]* -#\d+ on_response_body, 0 bytes, eof: true[^#]* -#\d+ on_log[^#]*/ ---- no_error_log -[error] - - - -=== TEST 5: start() with plan loaded and attached on access ---- load_nginx_modules: ngx_http_echo_module ---- wasm_modules: on_phases ---- config - location /t { - access_by_lua_block { - local proxy_wasm = require "resty.wasmx.proxy_wasm" - local filters = { - { name = "on_phases" }, - } - - local c_plan, err = proxy_wasm.new(filters) - if not c_plan then - return ngx.say(err) - end - - local ok, err = proxy_wasm.load(c_plan) - if not ok then - return ngx.say(err) - end - - ok, err = proxy_wasm.attach(c_plan) - if not ok then - return ngx.say(err) - end - - ok, err = proxy_wasm.start() - if not ok then - return ngx.say(err) - end - } - - echo ok; - } ---- request -POST /t -Hello world ---- response_body -ok ---- grep_error_log eval: qr/#\d+ on_(configure|vm_start|request|response|log).*/ ---- grep_error_log_out eval -qr/^#0 on_vm_start[^#]* -#0 on_configure, config_size: 0[^#]* -#\d+ on_request_headers, 3 headers[^#]* -#\d+ on_request_body, 11 bytes[^#]* -#\d+ on_response_headers, 5 headers[^#]* -#\d+ on_response_body, 3 bytes, eof: false[^#]* -#\d+ on_response_body, 0 bytes, eof: true[^#]* -#\d+ on_log[^#]*/ ---- no_error_log -[error] - - - -=== TEST 6: start() - fail if attempt to start on init_worker ---- load_nginx_modules: ngx_http_echo_module ---- wasm_modules: on_phases ---- http_config - init_worker_by_lua_block { - local proxy_wasm = require "resty.wasmx.proxy_wasm" - - assert(proxy_wasm.start()) - } ---- config - location /t { - echo ok; - } ---- request -POST /t -Hello world ---- ignore_response_body ---- error_log eval -qr/start must be called from 'rewrite' or 'access' phase/ ---- no_error_log -[crit] -[emerg] diff --git a/t/04-openresty/ffi/103-proxy_wasm_attach.t b/t/04-openresty/ffi/103-proxy_wasm_attach.t index 56226126f..bbcb37d6b 100644 --- a/t/04-openresty/ffi/103-proxy_wasm_attach.t +++ b/t/04-openresty/ffi/103-proxy_wasm_attach.t @@ -88,8 +88,6 @@ plan not loaded if not ok then ngx.log(ngx.ERR, err) end - - assert(proxy_wasm.start()) } echo ok; @@ -136,8 +134,6 @@ qr/\[error\] .*? previous plan already attached/ if not ok then return ngx.say(err) end - - assert(proxy_wasm.start()) } echo ok; @@ -188,8 +184,6 @@ qr/^[^#]*#0 on_vm_start[^#]* if not ok then return ngx.say(err) end - - assert(proxy_wasm.start()) } echo ok; @@ -240,8 +234,6 @@ qr/^[^#]*#0 on_vm_start[^#]* if not ok then return ngx.say(err) end - - assert(proxy_wasm.start()) } echo ok; @@ -292,11 +284,6 @@ qr/^[^#]*#0 on_vm_start[^#]* if not ok then return ngx.say(err) end - - local ok, err = proxy_wasm.start() - if not ok then - return ngx.say(err) - end } echo ok; @@ -326,7 +313,6 @@ qr/^[^#]*#0 on_vm_start[^#]* local c_plan = assert(proxy_wasm.new(filters)) assert(proxy_wasm.load(c_plan)) assert(proxy_wasm.attach(c_plan)) - assert(proxy_wasm.start()) } echo ok; @@ -445,7 +431,9 @@ bad opts.isolation value: -1 rewrite_by_lua_block { local proxy_wasm = require "resty.wasmx.proxy_wasm" assert(proxy_wasm.attach(_G.c_plan)) - assert(proxy_wasm.start()) + } + + content_by_lua_block { ngx.say("ok") } } @@ -502,7 +490,9 @@ qr/\A\*\d+ proxy_wasm initializing filter chain \(nfilters: 1, isolation: 1\)[^# rewrite_by_lua_block { local proxy_wasm = require "resty.wasmx.proxy_wasm" assert(proxy_wasm.attach(_G.c_plan)) - assert(proxy_wasm.start()) + } + + content_by_lua_block { ngx.say("ok") } } @@ -567,7 +557,9 @@ qr/\A\*\d+ proxy_wasm initializing filter chain \(nfilters: 1, isolation: 2\)[^# assert(proxy_wasm.attach(_G.c_plan, { isolation = proxy_wasm.isolations.NONE })) - assert(proxy_wasm.start()) + } + + content_by_lua_block { ngx.say("ok") } } diff --git a/t/04-openresty/ffi/200-proxy_wasm_and_lua_sanity.t b/t/04-openresty/ffi/200-proxy_wasm_and_lua_sanity.t index a075c3b78..1f53c2b86 100644 --- a/t/04-openresty/ffi/200-proxy_wasm_and_lua_sanity.t +++ b/t/04-openresty/ffi/200-proxy_wasm_and_lua_sanity.t @@ -34,7 +34,6 @@ __DATA__ rewrite_by_lua_block { local proxy_wasm = require "resty.wasmx.proxy_wasm" assert(proxy_wasm.attach(_G.c_plan)) - assert(proxy_wasm.start()) ngx.req.set_header("Lua", "yes") } @@ -88,7 +87,6 @@ request header "Lua: yes" while logging request rewrite_by_lua_block { local proxy_wasm = require "resty.wasmx.proxy_wasm" assert(proxy_wasm.attach(_G.c_plan)) - assert(proxy_wasm.start()) } echo ok; @@ -132,7 +130,6 @@ resp header "hello: " while logging request rewrite_by_lua_block { local proxy_wasm = require "resty.wasmx.proxy_wasm" assert(proxy_wasm.attach(_G.c_plan)) - assert(proxy_wasm.start()) } content_by_lua_block { @@ -173,7 +170,6 @@ request body: From Lua while logging request rewrite_by_lua_block { local proxy_wasm = require "resty.wasmx.proxy_wasm" assert(proxy_wasm.attach(_G.c_plan)) - assert(proxy_wasm.start()) } content_by_lua_block { @@ -229,7 +225,6 @@ Hello world rewrite_by_lua_block { local proxy_wasm = require "resty.wasmx.proxy_wasm" assert(proxy_wasm.attach(_G.c_plan)) - assert(proxy_wasm.start()) } content_by_lua_block { @@ -295,7 +290,6 @@ qr/\A\*\d+ .*? filter 1\/1 resuming "on_request_headers" step in "rewrite" phase rewrite_by_lua_block { local proxy_wasm = require "resty.wasmx.proxy_wasm" assert(proxy_wasm.attach(_G.c_plan)) - assert(proxy_wasm.start()) } content_by_lua_block { @@ -361,7 +355,6 @@ qr/\A\*\d+ .*? filter 1\/1 resuming "on_request_headers" step in "rewrite" phase rewrite_by_lua_block { local proxy_wasm = require "resty.wasmx.proxy_wasm" assert(proxy_wasm.attach(_G.c_plan)) - assert(proxy_wasm.start()) } content_by_lua_block { @@ -413,7 +406,6 @@ freed. /error does not concern itself with any chain. rewrite_by_lua_block { local proxy_wasm = require "resty.wasmx.proxy_wasm" assert(proxy_wasm.attach(_G.c_plan)) - assert(proxy_wasm.start()) } access_by_lua_block { @@ -480,7 +472,6 @@ are freed. rewrite_by_lua_block { local proxy_wasm = require "resty.wasmx.proxy_wasm" assert(proxy_wasm.attach(_G.t_plan)) - assert(proxy_wasm.start()) } access_by_lua_block { @@ -496,7 +487,6 @@ are freed. rewrite_by_lua_block { local proxy_wasm = require "resty.wasmx.proxy_wasm" assert(proxy_wasm.attach(_G.error_plan)) - assert(proxy_wasm.start()) } echo failed; diff --git a/t/04-openresty/ffi/300-proxy_wasm_properties_get_ngx.t b/t/04-openresty/ffi/300-proxy_wasm_properties_get_ngx.t index a74ca3bc7..bc19aa79e 100644 --- a/t/04-openresty/ffi/300-proxy_wasm_properties_get_ngx.t +++ b/t/04-openresty/ffi/300-proxy_wasm_properties_get_ngx.t @@ -71,9 +71,9 @@ ok if prop then ngx.log(ngx.INFO, "ngx.my_var is ", prop) end + } - assert(proxy_wasm.start()) - + content_by_lua_block { ngx.say("ok") } } @@ -112,9 +112,9 @@ qr/\[info\] .*? ngx.my_var is 456/ if prop then ngx.log(ngx.INFO, "ngx.my_var is ", prop) end + } - assert(proxy_wasm.start()) - + content_by_lua_block { ngx.say("ok") } } @@ -149,7 +149,9 @@ qr/\[info\] .*? ngx.my_var is 456/ access_by_lua_block { local proxy_wasm = require "resty.wasmx.proxy_wasm" assert(proxy_wasm.attach(_G.c_plan)) - assert(proxy_wasm.start()) + } + + content_by_lua_block { ngx.say("ok") } @@ -193,7 +195,9 @@ qr/\[info\] .*? ngx.my_var is 456/ access_by_lua_block { local proxy_wasm = require "resty.wasmx.proxy_wasm" assert(proxy_wasm.attach(_G.c_plan)) - assert(proxy_wasm.start()) + } + + content_by_lua_block { ngx.say("ok") } @@ -237,7 +241,9 @@ qr/\[info\] .*? ngx.my_var is 456/ access_by_lua_block { local proxy_wasm = require "resty.wasmx.proxy_wasm" assert(proxy_wasm.attach(_G.c_plan)) - assert(proxy_wasm.start()) + } + + content_by_lua_block { ngx.say("ok") } diff --git a/t/04-openresty/ffi/301-proxy_wasm_properties_get_host.t b/t/04-openresty/ffi/301-proxy_wasm_properties_get_host.t index fb7925f9e..2fd92a0b8 100644 --- a/t/04-openresty/ffi/301-proxy_wasm_properties_get_host.t +++ b/t/04-openresty/ffi/301-proxy_wasm_properties_get_host.t @@ -68,9 +68,9 @@ ok if prop then ngx.log(ngx.INFO, "wasmx.my_prop is ", prop) end + } - assert(proxy_wasm.start()) - + content_by_lua_block { ngx.say("ok") } } @@ -108,9 +108,9 @@ qr/\[info\] .*? wasmx.my_prop is my_value/ if prop then ngx.log(ngx.INFO, "wasmx.my_prop is ", prop) end + } - assert(proxy_wasm.start()) - + content_by_lua_block { ngx.say("ok") } } @@ -143,7 +143,9 @@ qr/\[info\] .*? wasmx.my_prop is my_value/ local proxy_wasm = require "resty.wasmx.proxy_wasm" assert(proxy_wasm.attach(_G.c_plan)) assert(proxy_wasm.set_property("wasmx.my_prop", "my_value")) - assert(proxy_wasm.start()) + } + + content_by_lua_block { ngx.say("ok") } @@ -185,7 +187,9 @@ qr/\[info\] .*? wasmx.my_prop is my_value/ local proxy_wasm = require "resty.wasmx.proxy_wasm" assert(proxy_wasm.attach(_G.c_plan)) assert(proxy_wasm.set_property("wasmx.my_prop", "my_value")) - assert(proxy_wasm.start()) + } + + content_by_lua_block { ngx.say("ok") } @@ -227,7 +231,9 @@ qr/\[info\] .*? wasmx.my_prop is my_value/ local proxy_wasm = require "resty.wasmx.proxy_wasm" assert(proxy_wasm.attach(_G.c_plan)) assert(proxy_wasm.set_property("wasmx.my_prop", "my_value")) - assert(proxy_wasm.start()) + } + + content_by_lua_block { ngx.say("ok") } @@ -268,13 +274,14 @@ qr/\[info\] .*? wasmx.my_prop is my_value/ access_by_lua_block { local proxy_wasm = require "resty.wasmx.proxy_wasm" assert(proxy_wasm.attach(_G.c_plan)) - assert(proxy_wasm.start()) local prop, err, ecode = proxy_wasm.get_property("wasmx.my_prop") assert(prop == nil) assert(err == "property \"wasmx.my_prop\" not found") assert(ecode == "missing") + } + content_by_lua_block { ngx.say("ok") } } @@ -306,13 +313,14 @@ ok local proxy_wasm = require "resty.wasmx.proxy_wasm" assert(proxy_wasm.attach(_G.c_plan)) assert(proxy_wasm.set_property("wasmx.my_prop", "")) - assert(proxy_wasm.start()) local prop, err = proxy_wasm.get_property("wasmx.my_prop") ngx.log(ngx.INFO, "wasmx.my_prop: \"", prop, "\", err: ", tostring(err)) assert(prop == "") assert(err == nil) + } + content_by_lua_block { ngx.say("ok") } } diff --git a/t/04-openresty/ffi/302-proxy_wasm_properties_set_ngx.t b/t/04-openresty/ffi/302-proxy_wasm_properties_set_ngx.t index d835de15f..f1cf75ea6 100644 --- a/t/04-openresty/ffi/302-proxy_wasm_properties_set_ngx.t +++ b/t/04-openresty/ffi/302-proxy_wasm_properties_set_ngx.t @@ -104,8 +104,9 @@ ok assert(proxy_wasm.attach(_G.c_plan)) assert(proxy_wasm.set_property("ngx.my_var", "123")) + } - assert(proxy_wasm.start()) + content_by_lua_block { ngx.say("ok") } } @@ -146,8 +147,9 @@ ok access_by_lua_block { local proxy_wasm = require "resty.wasmx.proxy_wasm" assert(proxy_wasm.attach(_G.c_plan)) + } - assert(proxy_wasm.start()) + content_by_lua_block { ngx.say("ok") } @@ -193,8 +195,9 @@ ok access_by_lua_block { local proxy_wasm = require "resty.wasmx.proxy_wasm" assert(proxy_wasm.attach(_G.c_plan)) + } - assert(proxy_wasm.start()) + content_by_lua_block { ngx.say("ok") } @@ -240,8 +243,9 @@ ok access_by_lua_block { local proxy_wasm = require "resty.wasmx.proxy_wasm" assert(proxy_wasm.attach(_G.c_plan)) + } - assert(proxy_wasm.start()) + content_by_lua_block { ngx.say("ok") } diff --git a/t/04-openresty/ffi/303-proxy_wasm_properties_set_host.t b/t/04-openresty/ffi/303-proxy_wasm_properties_set_host.t index dddeaf0a4..74bda3026 100644 --- a/t/04-openresty/ffi/303-proxy_wasm_properties_set_host.t +++ b/t/04-openresty/ffi/303-proxy_wasm_properties_set_host.t @@ -98,8 +98,9 @@ ok assert(proxy_wasm.attach(_G.c_plan)) assert(proxy_wasm.set_property("wasmx.my_property", "my_value")) + } - assert(proxy_wasm.start()) + content_by_lua_block { ngx.say("ok") } } @@ -138,8 +139,9 @@ ok access_by_lua_block { local proxy_wasm = require "resty.wasmx.proxy_wasm" assert(proxy_wasm.attach(_G.c_plan)) + } - assert(proxy_wasm.start()) + content_by_lua_block { ngx.say("ok") } @@ -183,8 +185,9 @@ ok access_by_lua_block { local proxy_wasm = require "resty.wasmx.proxy_wasm" assert(proxy_wasm.attach(_G.c_plan)) + } - assert(proxy_wasm.start()) + content_by_lua_block { ngx.say("ok") } @@ -228,8 +231,9 @@ ok access_by_lua_block { local proxy_wasm = require "resty.wasmx.proxy_wasm" assert(proxy_wasm.attach(_G.c_plan)) + } - assert(proxy_wasm.start()) + content_by_lua_block { ngx.say("ok") } @@ -275,9 +279,9 @@ ok assert(proxy_wasm.attach(_G.c_plan)) assert(proxy_wasm.set_property("wasmx.startup_property", "foo")) + } - assert(proxy_wasm.start()) - + content_by_lua_block { ngx.say("ok") } } diff --git a/t/04-openresty/ffi/305-proxy_wasm_host_properties_getter.t b/t/04-openresty/ffi/305-proxy_wasm_host_properties_getter.t index 13ccf1b26..0a354ae9c 100644 --- a/t/04-openresty/ffi/305-proxy_wasm_host_properties_getter.t +++ b/t/04-openresty/ffi/305-proxy_wasm_host_properties_getter.t @@ -37,7 +37,9 @@ Also testing that setting the getter in-between attach() and start() works. assert(proxy_wasm.attach(_G.c_plan)) assert(proxy_wasm.set_host_properties_handlers(getter, nil)) - assert(proxy_wasm.start()) + } + + content_by_lua_block { ngx.say("ok") } } @@ -78,7 +80,9 @@ wasmx.my_property: my_value assert(proxy_wasm.attach(_G.c_plan)) assert(proxy_wasm.set_host_properties_handlers(getter, nil)) - assert(proxy_wasm.start()) + } + + content_by_lua_block { ngx.say("ok") } } @@ -114,7 +118,9 @@ Also testing that setting the getter after attach() and start() works. rewrite_by_lua_block { local proxy_wasm = require "resty.wasmx.proxy_wasm" assert(proxy_wasm.attach(_G.c_plan)) - assert(proxy_wasm.start()) + } + + content_by_lua_block { ngx.say("ok") } @@ -159,7 +165,9 @@ wasmx.my_property: my_value rewrite_by_lua_block { local proxy_wasm = require "resty.wasmx.proxy_wasm" assert(proxy_wasm.attach(_G.c_plan)) - assert(proxy_wasm.start()) + } + + content_by_lua_block { ngx.say("ok") } @@ -209,7 +217,9 @@ wasmx.my_property: my_value access_by_lua_block { local proxy_wasm = require "resty.wasmx.proxy_wasm" assert(proxy_wasm.attach(_G.c_plan)) - assert(proxy_wasm.start()) + } + + content_by_lua_block { ngx.say("ok") } @@ -259,7 +269,9 @@ wasmx.my_property: my_value assert(proxy_wasm.attach(_G.c_plan)) assert(proxy_wasm.set_host_properties_handlers(getter, nil)) - assert(proxy_wasm.start()) + } + + content_by_lua_block { ngx.say("ok") } } @@ -302,7 +314,9 @@ qr/host trap \(internal error\): could not get \"wasmx.some_property\": some err assert(proxy_wasm.attach(_G.c_plan)) assert(proxy_wasm.set_host_properties_handlers(getter, nil)) - assert(proxy_wasm.start()) + } + + content_by_lua_block { ngx.say("ok") } } @@ -342,7 +356,9 @@ qr/property not found: wasmx.missing_property/ assert(proxy_wasm.attach(_G.c_plan)) assert(proxy_wasm.set_host_properties_handlers(getter, nil)) - assert(proxy_wasm.start()) + } + + content_by_lua_block { ngx.say("ok") } } @@ -404,7 +420,9 @@ With and without 3rd return value (caches result) assert(proxy_wasm.attach(_G.c_plan)) assert(proxy_wasm.set_host_properties_handlers(getter, nil)) - assert(proxy_wasm.start()) + } + + content_by_lua_block { ngx.say("ok") } @@ -493,7 +511,9 @@ Produces a wasm host trap. assert(proxy_wasm.attach(_G.c_plan)) assert(proxy_wasm.set_host_properties_handlers(getter, nil)) - assert(proxy_wasm.start()) + } + + content_by_lua_block { ngx.say("ok") } } @@ -535,8 +555,9 @@ qr/host trap \(internal error\): could not get \"wasmx\.my_property\": error in if not ok then ngx.log(ngx.ERR, err) end + } - assert(proxy_wasm.start()) + content_by_lua_block { ngx.say("ok") } } @@ -576,7 +597,9 @@ qr/\[error\] .*? error in property getter: rewrite_by_lua.*?: crash\!/ assert(proxy_wasm.attach(_G.c_plan)) assert(proxy_wasm.set_host_properties_handlers(getter, nil)) - assert(proxy_wasm.start()) + } + + content_by_lua_block { ngx.say("ok") } } @@ -617,7 +640,9 @@ Produces a wasm host trap. assert(proxy_wasm.attach(_G.c_plan)) assert(proxy_wasm.set_host_properties_handlers(getter, nil)) - assert(proxy_wasm.start()) + } + + content_by_lua_block { ngx.say("ok") } } diff --git a/t/04-openresty/ffi/306-proxy_wasm_host_properties_setter.t b/t/04-openresty/ffi/306-proxy_wasm_host_properties_setter.t index 44e0d99f7..1c2074a78 100644 --- a/t/04-openresty/ffi/306-proxy_wasm_host_properties_setter.t +++ b/t/04-openresty/ffi/306-proxy_wasm_host_properties_setter.t @@ -42,7 +42,9 @@ Also testing that setting the getter in-between attach() and start() works. assert(proxy_wasm.attach(_G.c_plan)) assert(proxy_wasm.set_host_properties_handlers(nil, setter)) assert(proxy_wasm.set_property("wasmx.my_property", "my_value")) - assert(proxy_wasm.start()) + } + + content_by_lua_block { ngx.say("ok") } } @@ -88,7 +90,9 @@ wasmx.my_property: MY_VALUE assert(proxy_wasm.attach(_G.c_plan)) assert(proxy_wasm.set_host_properties_handlers(nil, setter)) assert(proxy_wasm.set_property("wasmx.my_property", "my_value")) - assert(proxy_wasm.start()) + } + + content_by_lua_block { ngx.say("ok") } } @@ -124,7 +128,9 @@ Also testing that setting the getter after attach() and start() works. rewrite_by_lua_block { local proxy_wasm = require "resty.wasmx.proxy_wasm" assert(proxy_wasm.attach(_G.c_plan)) - assert(proxy_wasm.start()) + } + + content_by_lua_block { ngx.say("ok") } @@ -174,7 +180,9 @@ wasmx.my_property: MY_VALUE rewrite_by_lua_block { local proxy_wasm = require "resty.wasmx.proxy_wasm" assert(proxy_wasm.attach(_G.c_plan)) - assert(proxy_wasm.start()) + } + + content_by_lua_block { ngx.say("ok") } @@ -230,7 +238,9 @@ wasmx.my_property: MY_VALUE access_by_lua_block { local proxy_wasm = require "resty.wasmx.proxy_wasm" assert(proxy_wasm.attach(_G.c_plan)) - assert(proxy_wasm.start()) + } + + content_by_lua_block { ngx.say("ok") } @@ -305,8 +315,9 @@ With and without 2nd return value (err) ngx.log(ngx.ERR, key, ": ", err) end end + } - assert(proxy_wasm.start()) + content_by_lua_block { ngx.say("ok") } } @@ -376,7 +387,9 @@ With and without 3rd return value (caches result) assert(proxy_wasm.attach(_G.c_plan)) assert(proxy_wasm.set_host_properties_handlers(getter, setter)) - assert(proxy_wasm.start()) + } + + content_by_lua_block { ngx.say("ok") } @@ -474,7 +487,9 @@ Produces a wasm host trap. assert(proxy_wasm.attach(_G.c_plan)) assert(proxy_wasm.set_host_properties_handlers(nil, setter)) - assert(proxy_wasm.start()) + } + + content_by_lua_block { ngx.say("ok") } } @@ -516,8 +531,9 @@ qr/host trap \(internal error\): could not set \"wasmx\.my_property\": error in if not ok then ngx.log(ngx.ERR, err) end + } - assert(proxy_wasm.start()) + content_by_lua_block { ngx.say("ok") } } @@ -557,7 +573,9 @@ qr/\[error\] .*? error in property setter: rewrite_by_lua.*?: crash\!/ assert(proxy_wasm.attach(_G.c_plan)) assert(proxy_wasm.set_host_properties_handlers(nil, setter)) - assert(proxy_wasm.start()) + } + + content_by_lua_block { ngx.say("ok") } } @@ -598,7 +616,9 @@ Produces a wasm host trap. assert(proxy_wasm.attach(_G.c_plan)) assert(proxy_wasm.set_host_properties_handlers(nil, setter)) - assert(proxy_wasm.start()) + } + + content_by_lua_block { ngx.say("ok") } } diff --git a/t/04-openresty/lua-bridge/001-sanity.t b/t/04-openresty/lua-bridge/001-sanity.t index 03457d8b9..16c7e3db3 100644 --- a/t/04-openresty/lua-bridge/001-sanity.t +++ b/t/04-openresty/lua-bridge/001-sanity.t @@ -19,6 +19,7 @@ run_tests(); __DATA__ === TEST 1: Lua bridge - Lua chunk arguments and return values +--- valgrind --- config location /t { wasm_call log ngx_lua_tests test_lua_argsrets; @@ -33,6 +34,7 @@ qr/\[info\] .*? arg: argument/ === TEST 2: Lua bridge - bad Lua chunk +--- valgrind --- config location /t { wasm_call log ngx_lua_tests test_bad_lua_chunk; @@ -45,3 +47,60 @@ qr/\[info\] .*? arg: argument/ ] --- no_error_log [crit] + + + +=== TEST 3: Lua bridge - Lua chunk can error +--- valgrind +--- config + location /t { + wasm_call rewrite ngx_lua_tests test_lua_error; + return 200; + } +--- error_code: 500 +--- error_log eval +[ + qr/\[error\] .*? lua user thread aborted: runtime error:.*? my error/, + "stack traceback:" +] +--- no_error_log +[crit] + + + +=== TEST 4: Lua bridge - Lua chunk can yield +--- config + location /t { + wasm_call rewrite ngx_lua_tests test_lua_sleep; + return 200; + } +--- error_log eval +[ + qr/\[notice\] .*? sleeping for 250ms/, + qr/\[notice\] .*? sleeping for 250ms/ +] +--- no_error_log +[crit] + + + +=== TEST 5: Lua bridge - Lua chunk can error after yielding +--- valgrind +--- load_nginx_modules: ngx_http_echo_module +--- http_config + init_worker_by_lua_block { + _G.sleep_before_error = true + } +--- config + location /t { + wasm_call access ngx_lua_tests test_lua_error; + echo ok; + } +--- error_code: 500 +--- error_log eval +[ + qr/\[error\] .*? lua user thread aborted: runtime error:.*? my error/, + qr/\[notice\] .*? sleeping before error/ +] +--- no_error_log +[crit] diff --git a/t/04-openresty/lua-bridge/002-proxy_wasm_lua_resolver_sanity.t b/t/04-openresty/lua-bridge/002-proxy_wasm_lua_resolver_sanity.t index 096e6003a..02ddca5c7 100644 --- a/t/04-openresty/lua-bridge/002-proxy_wasm_lua_resolver_sanity.t +++ b/t/04-openresty/lua-bridge/002-proxy_wasm_lua_resolver_sanity.t @@ -79,7 +79,64 @@ ok -=== TEST 3: Lua bridge - proxy_wasm_lua_resolver, disabled by default +=== TEST 3: proxy_wasm - proxy_wasm_lua_resolver, sanity + parallel (on_http_call_response) +--- valgrind +--- timeout eval: $::ExtTimeout +--- load_nginx_modules: ngx_http_echo_module +--- wasm_modules: hostcalls +--- config + location /t { + proxy_wasm_lua_resolver on; + proxy_wasm hostcalls 'on=request_headers \ + test=/t/dispatch_http_call \ + hosts=httpbin.org,example.com \ + path=/headers \ + ncalls=2 \ + n_sync_calls=2 \ + on_http_call_response=call_again'; + echo ok; + } +--- response_headers_like +pwm-call-1: .* +pwm-call-2: .* +pwm-call-3: .* +--- response_body +called 3 times +--- no_error_log + + + +=== TEST 4: proxy_wasm - proxy_wasm_lua_resolver, sanity + parallel (on_request_body) +--- timeout eval: $::ExtTimeout +--- load_nginx_modules: ngx_http_echo_module +--- wasm_modules: hostcalls +--- config + location /t { + proxy_wasm_lua_resolver on; + proxy_wasm hostcalls 'on=request_body \ + test=/t/dispatch_http_call \ + hosts=httpbin.org,example.com \ + path=/headers \ + ncalls=2'; + echo ok; + } +--- request +GET /t + +Hello world +--- response_body +ok +--- error_log eval +[ + qr/\[debug\] .*? wasm lua resolved "httpbin\.org" to ".*?"/, + qr/\[debug\] .*? wasm lua resolved "example\.com" to ".*?"/, +] +--- no_error_log +[error] + + + +=== TEST 5: Lua bridge - proxy_wasm_lua_resolver, disabled by default --- load_nginx_modules: ngx_http_echo_module --- wasm_modules: hostcalls --- http_config @@ -114,7 +171,7 @@ lua resolver -=== TEST 4: Lua bridge - proxy_wasm_lua_resolver, explicitly disabled +=== TEST 6: Lua bridge - proxy_wasm_lua_resolver, explicitly disabled --- load_nginx_modules: ngx_http_echo_module --- wasm_modules: hostcalls --- http_config @@ -149,7 +206,7 @@ lua resolver -=== TEST 5: Lua bridge - proxy_wasm_lua_resolver, explicitly disabled while enabled in wasm{} +=== TEST 7: Lua bridge - proxy_wasm_lua_resolver, explicitly disabled while enabled in wasm{} --- load_nginx_modules: ngx_http_echo_module --- main_config eval qq{ @@ -190,7 +247,7 @@ lua resolver -=== TEST 6: Lua bridge - proxy_wasm_lua_resolver, explicitly disabled while enabled in http{} +=== TEST 8: Lua bridge - proxy_wasm_lua_resolver, explicitly disabled while enabled in http{} --- load_nginx_modules: ngx_http_echo_module --- wasm_modules: hostcalls --- http_config @@ -227,7 +284,7 @@ lua resolver -=== TEST 7: Lua bridge - proxy_wasm_lua_resolver, default client settings +=== TEST 9: Lua bridge - proxy_wasm_lua_resolver, default client settings lua-resty-dns-client default timeout is 2000ms NGX_WASM_DEFAULT_RESOLVER_TIMEOUT is 30000ms Needs IPv4 resolution + external I/O to succeed. @@ -261,7 +318,7 @@ Succeeds on: -=== TEST 8: Lua bridge - proxy_wasm_lua_resolver, existing client +=== TEST 10: Lua bridge - proxy_wasm_lua_resolver, existing client --- skip_no_debug --- load_nginx_modules: ngx_http_echo_module --- wasm_modules: hostcalls @@ -278,7 +335,6 @@ Succeeds on: init_worker_by_lua_block { dns_client = require 'resty.dns.client' assert(dns_client.init({ - noSynchronisation = true, hosts = { '127.0.0.1 localhost' } })) } @@ -302,51 +358,7 @@ qr/\[debug\] .*? wasm lua resolver using existing dns_client/ -=== TEST 9: Lua bridge - proxy_wasm_lua_resolver, synchronized client -Too slow for Valgrind. -Succeeds on: -- HTTP 200 (httpbin.org/headers success) -- HTTP 502 (httpbin.org Bad Gateway) -- HTTP 504 (httpbin.org Gateway timeout) ---- skip_no_debug ---- load_nginx_modules: ngx_http_echo_module ---- wasm_modules: hostcalls ---- http_config eval -qq{ - init_worker_by_lua_block { - dns_client = require 'resty.dns.client' - assert(dns_client.init({ - --noSynchronisation = false, -- default - order = { 'A' }, - hosts = { '127.0.0.1 localhost' }, - resolvConf = { - 'nameserver $::ExtResolver', - 'options timeout:$::ExtTimeout', - } - })) - } -} ---- config - location /t { - proxy_wasm_lua_resolver on; - proxy_wasm hostcalls 'on=request_headers \ - test=/t/dispatch_http_call \ - host=httpbin.org \ - path=/headers \ - on_http_call_response=echo_response_body'; - echo failed; - } ---- error_code_like: (200|502|504) ---- response_body_like: ("Host": "httpbin(\.org)?"|.*?502 Bad Gateway.*|.*?504 Gateway Time-out.*) ---- error_log eval -qr/\[debug\] .*? wasm lua resolver using existing dns_client/ ---- no_error_log -[error] -[crit] - - - -=== TEST 10: Lua bridge - proxy_wasm_lua_resolver, SRV record +=== TEST 11: Lua bridge - proxy_wasm_lua_resolver, SRV record --- skip_no_debug --- load_nginx_modules: ngx_http_echo_module --- wasm_modules: hostcalls @@ -397,7 +409,6 @@ qr/\[debug\] .*? wasm lua resolver using existing dns_client/ dns_client = require 'resty.dns.client' assert(dns_client.init({ - noSynchronisation = true, hosts = { '127.0.0.1 localhost' } })) } @@ -423,7 +434,7 @@ hello world -=== TEST 11: Lua bridge - proxy_wasm_lua_resolver, IPv6 record +=== TEST 12: Lua bridge - proxy_wasm_lua_resolver, IPv6 record --- skip_eval: 5: system("cat /sys/module/ipv6/parameters/disable") ne 0 || defined $ENV{GITHUB_ACTIONS} --- timeout eval: $::ExtTimeout --- load_nginx_modules: ngx_http_echo_module @@ -446,7 +457,7 @@ hello world -=== TEST 12: Lua bridge - proxy_wasm_lua_resolver, NXDOMAIN +=== TEST 13: Lua bridge - proxy_wasm_lua_resolver, NXDOMAIN --- load_nginx_modules: ngx_http_echo_module --- wasm_modules: hostcalls --- config @@ -461,7 +472,7 @@ hello world --- response_body_like: 500 Internal Server Error --- grep_error_log eval: qr/\[error\].*/ --- grep_error_log_out eval -qr/\[error\] .*? lua entry thread aborted: .*? wasm lua failed resolving "foo": dns client error: 101 empty record received.*? +qr/\[error\] .*? lua user thread aborted: .*? wasm lua failed resolving "foo": dns client error: 101 empty record received.*? \[error\] .*? dispatch failed: tcp socket - lua resolver failed.*?/ --- no_error_log [crit] @@ -469,8 +480,9 @@ qr/\[error\] .*? lua entry thread aborted: .*? wasm lua failed resolving "foo": -=== TEST 13: Lua bridge - proxy_wasm_lua_resolver, failed before query +=== TEST 14: Lua bridge - proxy_wasm_lua_resolver, failed before query Failure before the Lua thread gets a chance to yield (immediate resolver failure) +--- valgrind --- skip_no_debug --- load_nginx_modules: ngx_http_echo_module --- wasm_modules: hostcalls @@ -484,7 +496,7 @@ Failure before the Lua thread gets a chance to yield (immediate resolver failure dns_client = require 'resty.dns.client' assert(dns_client.init({ - noSynchronisation = true, + noSynchronisation = true, -- avoids the Lua thread from yielding, forcing 'resolver.query' call order = { 'A' }, hosts = { '127.0.0.1 localhost' } })) @@ -503,9 +515,308 @@ Failure before the Lua thread gets a chance to yield (immediate resolver failure --- response_body_like: 500 Internal Server Error --- grep_error_log eval: qr/\[error\].*/ --- grep_error_log_out eval -qr/\[error\] .*? lua entry thread aborted: .*? wasm lua failed resolving "httpbin\.org": some made-up error.*? -\[error\] .*? dispatch failed.*?/ +qr/\[error\] .*? lua user thread aborted: .*? wasm lua failed resolving "httpbin\.org": some made-up error.*? +\[error\] .*? dispatch failed\Z/ --- error_log wasm tcp socket resolver failed before query --- no_error_log [crit] + + + +=== TEST 15: Lua bridge - proxy_wasm_lua_resolver, synchronized client, 2 parallel calls in rewrite +--- valgrind +--- timeout eval: $::ExtTimeout +--- load_nginx_modules: ngx_http_echo_module +--- wasm_modules: hostcalls +--- http_config eval +qq{ + init_worker_by_lua_block { + dns_client = require 'resty.dns.client' + assert(dns_client.init({ + noSynchronisation = false, -- default + order = { 'A' }, + resolvConf = { + 'nameserver $::ExtResolver', + 'options timeout:$::ExtTimeout', + } + })) + } +} +--- config + location /t { + proxy_wasm_lua_resolver on; + proxy_wasm hostcalls 'on=request_headers \ + test=/t/dispatch_http_call \ + host=httpbin.org \ + path=/headers \ + ncalls=2'; + echo ok; + } +--- response_headers_like +pwm-call-id: \d, \d +--- response_body +ok +--- no_error_log +[error] +[crit] + + + +=== TEST 16: Lua bridge - proxy_wasm_lua_resolver, synchronized client, 2 parallel calls in access +--- timeout eval: $::ExtTimeout +--- load_nginx_modules: ngx_http_echo_module +--- wasm_modules: hostcalls +--- http_config eval +qq{ + init_worker_by_lua_block { + dns_client = require 'resty.dns.client' + assert(dns_client.init({ + noSynchronisation = false, -- default + order = { 'A' }, + resolvConf = { + 'nameserver $::ExtResolver', + 'options timeout:$::ExtTimeout', + } + })) + } +} +--- config + location /t { + proxy_wasm_lua_resolver on; + proxy_wasm_request_headers_in_access on; + proxy_wasm hostcalls 'on=request_headers \ + test=/t/dispatch_http_call \ + host=httpbin.org \ + path=/headers \ + ncalls=2'; + echo ok; + } +--- response_headers_like +pwm-call-id: \d, \d +--- response_body +ok +--- no_error_log +[error] +[crit] + + + +=== TEST 17: proxy_wasm - proxy_wasm_lua_resolver, 2 distinct parallel calls in rewrite (no FFI) +--- valgrind +--- timeout eval: $::ExtTimeout +--- load_nginx_modules: ngx_http_echo_module +--- wasm_modules: hostcalls +--- config + location /t { + proxy_wasm_lua_resolver on; + proxy_wasm hostcalls 'on=request_headers \ + test=/t/dispatch_http_call \ + hosts=httpbin.org,example.com \ + path=/headers \ + ncalls=2'; + echo ok; + } +--- response_headers_like +pwm-call-id: \d, \d +--- response_body +ok +--- no_error_log +[error] +[crit] + + + +=== TEST 18: proxy_wasm - proxy_wasm_lua_resolver, 2 distinct parallel calls in rewrite (FFI) +TODO: also test with no_postpone +--- valgrind +--- timeout eval: $::ExtTimeout +--- load_nginx_modules: ngx_http_echo_module +--- wasm_modules: hostcalls +--- config + location /t { + proxy_wasm_lua_resolver on; + + rewrite_by_lua_block { + local proxy_wasm = require "resty.wasmx.proxy_wasm" + local filters = { + { + name = "hostcalls", + config = "test=/t/dispatch_http_call " .. + "hosts=httpbin.org,example.com " .. + "path=/headers " .. + "ncalls=2" + } + } + + local c_plan, err = proxy_wasm.new(filters) + if not c_plan then + ngx.log(ngx.ERR, "failed creating plan: ", err) + return ngx.exit(500) + end + + local ok, err = proxy_wasm.load(c_plan) + if not ok then + ngx.log(ngx.ERR, "failed loading plan: ", err) + return ngx.exit(500) + end + + ok, err = proxy_wasm.attach(c_plan) + if not ok then + ngx.log(ngx.ERR, "failed attaching plan: ", err) + return ngx.exit(500) + end + } + + echo ok; + } +--- response_headers_like +pwm-call-id: \d, \d +--- response_body +ok +--- no_error_log +[error] +[crit] + + + +=== TEST 19: proxy_wasm - proxy_wasm_lua_resolver, 2 distinct parallel calls in access (no FFI) +--- timeout eval: $::ExtTimeout +--- load_nginx_modules: ngx_http_echo_module +--- wasm_modules: hostcalls +--- config + location /t { + proxy_wasm_lua_resolver on; + proxy_wasm_request_headers_in_access on; + proxy_wasm hostcalls 'on=request_headers \ + test=/t/dispatch_http_call \ + hosts=httpbin.org,example.com \ + path=/headers \ + ncalls=2'; + echo ok; + } +--- response_headers_like +pwm-call-id: \d, \d +--- response_body +ok +--- no_error_log +[error] +[crit] + + + +=== TEST 20: proxy_wasm - proxy_wasm_lua_resolver, 2 distinct parallel calls in access (FFI) +TODO: also test with no_postpone +--- timeout eval: $::ExtTimeout +--- load_nginx_modules: ngx_http_echo_module +--- wasm_modules: hostcalls +--- config + location /t { + proxy_wasm_lua_resolver on; + + access_by_lua_block { + local proxy_wasm = require "resty.wasmx.proxy_wasm" + local filters = { + { + name = "hostcalls", + config = "test=/t/dispatch_http_call " .. + "hosts=httpbin.org,example.com " .. + "path=/headers " .. + "ncalls=2" + } + } + + local c_plan, err = proxy_wasm.new(filters) + if not c_plan then + ngx.log(ngx.ERR, "failed creating plan: ", err) + return ngx.exit(500) + end + + local ok, err = proxy_wasm.load(c_plan) + if not ok then + ngx.log(ngx.ERR, "failed loading plan: ", err) + return ngx.exit(500) + end + + ok, err = proxy_wasm.attach(c_plan) + if not ok then + ngx.log(ngx.ERR, "failed attaching plan: ", err) + return ngx.exit(500) + end + } + + echo ok; + } +--- response_headers_like +pwm-call-id: \d, \d +--- response_body +ok +--- no_error_log +[error] +[crit] + + + +=== TEST 21: proxy_wasm - proxy_wasm_lua_resolver, subsequent multiple calls in rewrite + content +--- timeout eval: $::ExtTimeout +--- load_nginx_modules: ngx_http_echo_module +--- wasm_modules: hostcalls +--- config + location /t { + proxy_wasm_lua_resolver on; + proxy_wasm hostcalls 'on=request_headers \ + test=/t/dispatch_http_call \ + hosts=httpbin.org,example.com \ + path=/headers \ + ncalls=2'; + proxy_wasm hostcalls 'on=request_body \ + test=/t/dispatch_http_call \ + hosts=httpbin.org,example.com \ + path=/headers \ + ncalls=2'; + echo ok; + } +--- request +POST /t +Hello world +--- response_headers_like +pwm-call-id: \d, \d, \d, \d +--- response_body +ok +--- no_error_log +[error] +[crit] + + + +=== TEST 22: proxy_wasm - proxy_wasm_lua_resolver, subsequent multiple calls in access + content +--- timeout eval: $::ExtTimeout +--- load_nginx_modules: ngx_http_echo_module +--- wasm_modules: hostcalls +--- config + location /t { + proxy_wasm_lua_resolver on; + proxy_wasm_request_headers_in_access on; + proxy_wasm hostcalls 'on=request_headers \ + test=/t/dispatch_http_call \ + hosts=httpbin.org,example.com \ + path=/headers \ + ncalls=2'; + proxy_wasm hostcalls 'on=request_body \ + test=/t/dispatch_http_call \ + hosts=httpbin.org,example.com \ + path=/headers \ + ncalls=2'; + echo ok; + } +--- request +GET /t + +Hello world +--- response_headers_like +pwm-call-id: \d, \d, \d, \d +--- response_body +ok +--- no_error_log +[error] +[crit] diff --git a/t/04-openresty/lua-bridge/003-proxy_wasm_lua_resolver_timeouts.t b/t/04-openresty/lua-bridge/003-proxy_wasm_lua_resolver_timeouts.t index b7a7c72c7..0c76c8939 100644 --- a/t/04-openresty/lua-bridge/003-proxy_wasm_lua_resolver_timeouts.t +++ b/t/04-openresty/lua-bridge/003-proxy_wasm_lua_resolver_timeouts.t @@ -90,7 +90,7 @@ qq{ --- grep_error_log eval: qr/\[error\].*/ --- grep_error_log_out eval qr/\[error\] .*? lua udp socket read timed out.*? -\[error\] .*? lua entry thread aborted: .*? wasm lua failed resolving "timeout_trigger": failed to receive reply.*? +\[error\] .*? lua user thread aborted: .*? wasm lua failed resolving "timeout_trigger": failed to receive reply.*? \[error\] .*? dispatch failed: tcp socket - lua resolver failed.*?/ --- no_error_log [crit] @@ -144,7 +144,7 @@ ok --- grep_error_log eval: qr/\[error\].*/ --- grep_error_log_out eval qr/\[error\] .*? lua udp socket read timed out.*? -\[error\] .*? lua entry thread aborted: .*? wasm lua failed resolving "httpbin\.org": failed to receive reply.*? +\[error\] .*? lua user thread aborted: .*? wasm lua failed resolving "httpbin\.org": failed to receive reply.*? \[error\] .*? dispatch failed: tcp socket - lua resolver failed.*?/ --- no_error_log [crit] diff --git a/t/lib/ngx-lua-tests/src/lua_bridge.rs b/t/lib/ngx-lua-tests/src/lua_bridge.rs index c1033f152..a99a1e702 100644 --- a/t/lib/ngx-lua-tests/src/lua_bridge.rs +++ b/t/lib/ngx-lua-tests/src/lua_bridge.rs @@ -1,6 +1,8 @@ extern "C" { fn ngx_wasm_lua_test_argsrets(); fn ngx_wasm_lua_test_bad_chunk(); + fn ngx_wasm_lua_test_error(); + fn ngx_wasm_lua_test_sleep(); } #[no_mangle] @@ -12,3 +14,13 @@ pub fn test_lua_argsrets() { pub fn test_bad_lua_chunk() { unsafe { ngx_wasm_lua_test_bad_chunk() } } + +#[no_mangle] +pub fn test_lua_error() { + unsafe { ngx_wasm_lua_test_error() } +} + +#[no_mangle] +pub fn test_lua_sleep() { + unsafe { ngx_wasm_lua_test_sleep() } +} diff --git a/t/lib/proxy-wasm-tests/hostcalls/src/filter.rs b/t/lib/proxy-wasm-tests/hostcalls/src/filter.rs index bea8a9391..9f771c645 100644 --- a/t/lib/proxy-wasm-tests/hostcalls/src/filter.rs +++ b/t/lib/proxy-wasm-tests/hostcalls/src/filter.rs @@ -81,7 +81,7 @@ impl Context for TestHttp { if let Some(response) = bytes { let body = String::from_utf8_lossy(&response); self.add_http_response_header( - format!("pwm-call-{}", self.n_sync_calls + 1).as_str(), + format!("pwm-call-{}", token_id).as_str(), body.trim(), ); } @@ -93,7 +93,7 @@ impl Context for TestHttp { if self.n_sync_calls < again { self.n_sync_calls += 1; - self.send_http_dispatch(); + self.send_http_dispatch(self.n_sync_calls - 1); return; } diff --git a/t/lib/proxy-wasm-tests/hostcalls/src/lib.rs b/t/lib/proxy-wasm-tests/hostcalls/src/lib.rs index 783432b53..7e992e757 100644 --- a/t/lib/proxy-wasm-tests/hostcalls/src/lib.rs +++ b/t/lib/proxy-wasm-tests/hostcalls/src/lib.rs @@ -173,7 +173,6 @@ impl RootContext for TestRoot { config: self.config.clone(), on_phases: phases, n_sync_calls: 0, - ncalls: 0, })) } } diff --git a/t/lib/proxy-wasm-tests/hostcalls/src/types/test_http.rs b/t/lib/proxy-wasm-tests/hostcalls/src/types/test_http.rs index 0fda3ce29..ace8a7381 100644 --- a/t/lib/proxy-wasm-tests/hostcalls/src/types/test_http.rs +++ b/t/lib/proxy-wasm-tests/hostcalls/src/types/test_http.rs @@ -7,7 +7,6 @@ pub struct TestHttp { pub on_phases: Vec, pub config: HashMap, pub n_sync_calls: usize, - pub ncalls: usize, } impl TestHttp { @@ -101,8 +100,8 @@ impl TestHttp { .get("ncalls") .map_or(1, |v| v.parse().expect("bad ncalls value")); - for _ in 0..n { - self.send_http_dispatch(); + for i in 0..n { + self.send_http_dispatch(i); } return Action::Pause; @@ -141,7 +140,7 @@ impl TestHttp { Action::Continue } - pub fn send_http_dispatch(&mut self) { + pub fn send_http_dispatch(&mut self, i: usize) { let mut timeout = Duration::from_secs(0); let mut headers = Vec::new(); let mut path = self @@ -207,8 +206,13 @@ impl TestHttp { headers.push(("Host", "ignoreme")); headers.push(("connection", "ignoreme")); + let host = match self.get_config("hosts") { + Some(list) => list.split(',').collect::>()[i], + None => self.get_config("host").unwrap_or(""), + }; + self.dispatch_http_call( - self.get_config("host").unwrap_or(""), + host, headers, self.get_config("body").map(|v| v.as_bytes()), vec![],