Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
thibaultcha committed May 5, 2024
1 parent fbe227c commit 976e74a
Show file tree
Hide file tree
Showing 13 changed files with 242 additions and 100 deletions.
2 changes: 1 addition & 1 deletion lib/resty/wasmx/proxy_wasm.lua
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ function _M.load(c_plan)
end

if get_request() then
-- FFI GC: hold a reference tied to the request lifecycle so users
-- ffi_gc: hold a reference tied to the request lifecycle so users
-- don't have to (like our test suite).
if not ngx.ctx[_M] then
ngx.ctx[_M] = {}
Expand Down
149 changes: 68 additions & 81 deletions src/common/lua/ngx_wasm_lua.c
Original file line number Diff line number Diff line change
Expand Up @@ -27,33 +27,33 @@ static const char *WASM_LUA_ENTRY_SCRIPT = ""
"end\n";


unsigned
ngx_wasm_lua_running(ngx_wasm_subsys_env_t *env)
{
ngx_wasm_lua_ctx_t *entry_lctx = env->entry_lctx;

if (entry_lctx == NULL) {
return 0;
}

return !ngx_queue_empty(&entry_lctx->sub_ctxs);
}


static void
destroy_thread(ngx_wasm_lua_ctx_t *lctx)
{
ngx_log_debug2(NGX_LOG_DEBUG_WASM, ngx_cycle->log, 0,
"wasm freeing lua%sthread (lctx: %p)",
lctx->entry ? " entry " : " ", lctx);
lctx->entry ? " entry " : " user ", lctx);

ngx_pfree(lctx->pool, lctx->cache_key);
ngx_pfree(lctx->pool, lctx);
}


static ngx_inline unsigned
entry_thread_empty(ngx_wasm_subsys_env_t *env)
{
ngx_wasm_lua_ctx_t *entry_lctx = env->entry_lctx;

if (entry_lctx == NULL) {
return 1;
}

return ngx_queue_empty(&entry_lctx->sub_ctxs);
}


static void
thread_cleanup_handler(void *data)
entry_thread_cleanup_handler(void *data)
{
ngx_wasm_lua_ctx_t *lctx = data;

Expand Down Expand Up @@ -87,9 +87,8 @@ entry_thread_start(ngx_wasm_subsys_env_t *env)

if (entry_lctx == NULL) {
/**
* In OpenResty, all user threads *must* be attached to a parent
* coroutine, so we create a "fake" one simulating a
* *_by_lua_block context.
* In OpenResty, all uthreads *must* be attached to a parent coroutine,
* so we create a "fake" one simulating a *_by_lua_block context.
*/
dd("creating entry thread");

Expand Down Expand Up @@ -216,10 +215,6 @@ thread_init(ngx_wasm_lua_ctx_t *lctx)
#endif

if (!lctx->entry) {
/**
* Attach new user thread to entry_ctx for OpenResty's internals;
* entry_ctx can be a *_by_lua_block context or our fake entry_ctx.
*/
coctx->parent_co_ctx = &ctx->entry_co_ctx;
}

Expand Down Expand Up @@ -266,7 +261,7 @@ thread_handle_rc(ngx_wasm_lua_ctx_t *lctx, ngx_int_t rc)

} else {
/* thread is dead, determine state by checking its return value placed
* on the stack by OpenResty's user thread implementation */
* on the stack by OpenResty's uthread implementation */
ngx_wa_assert(lua_isboolean(lctx->co, 1));

lctx->co_ctx->co_status = NGX_HTTP_LUA_CO_DEAD;
Expand All @@ -281,19 +276,46 @@ thread_handle_rc(ngx_wasm_lua_ctx_t *lctx, ngx_int_t rc)
switch (rc) {
case NGX_AGAIN:
dd("wasm lua thread yield");
ngx_wa_assert(lctx->yielded);

if (lctx->entry) {
/* find the pending sleep timer to cancel at pool cleanup */
sentinel = ngx_event_timer_rbtree.sentinel;
root = ngx_event_timer_rbtree.root;

if (root != sentinel) {
for (node = ngx_rbtree_min(root, sentinel);
node;
node = ngx_rbtree_next(&ngx_event_timer_rbtree, node))
{
ev = ngx_rbtree_data(node, ngx_event_t, timer);

if (ev->data == entry_lctx->co_ctx) {
entry_lctx->ev = ev;
break;
}
}
}
}

ngx_wasm_yield(env);
break;
case NGX_OK:
dd("wasm lua thread finished");
ngx_wa_assert(thread_is_dead(lctx));

if (!lctx->entry) {
lctx->finished = 1;
ngx_queue_remove(&lctx->q);

if (entry_thread_empty(env)) {
/* last yielding thread finished */
ngx_wasm_continue(env);
}

if (lctx->success_handler) {
(void) lctx->success_handler(lctx);
}

lctx->finished = 1;
ngx_queue_remove(&lctx->q);
}

break;
Expand All @@ -302,13 +324,12 @@ thread_handle_rc(ngx_wasm_lua_ctx_t *lctx, ngx_int_t rc)
ngx_wasm_error(env);

if (!lctx->entry) {
lctx->finished = 1;
ngx_queue_remove(&lctx->q);

if (lctx->error_handler) {
(void) lctx->error_handler(lctx);
goto done;
}

lctx->finished = 1;
ngx_queue_remove(&lctx->q);
}

break;
Expand All @@ -320,38 +341,14 @@ thread_handle_rc(ngx_wasm_lua_ctx_t *lctx, ngx_int_t rc)
break;
}

if (lctx->entry) {
/* find the pending sleep timer to cancel at pool cleanup */
sentinel = ngx_event_timer_rbtree.sentinel;
root = ngx_event_timer_rbtree.root;

if (root == sentinel) {
goto done;
}

for (node = ngx_rbtree_min(root, sentinel);
node;
node = ngx_rbtree_next(&ngx_event_timer_rbtree, node))
{
ev = ngx_rbtree_data(node, ngx_event_t, timer);

if (ev->data == entry_lctx->co_ctx) {
entry_lctx->ev = ev;
break;
}
}
}
ngx_wa_assert(rc == NGX_OK
|| rc == NGX_AGAIN
|| rc == NGX_ERROR);

if (lctx->finished) {
destroy_thread(lctx);
}

done:

ngx_wa_assert(rc == NGX_OK
|| rc == NGX_AGAIN
|| rc == NGX_ERROR);

return rc;
}

Expand All @@ -365,7 +362,7 @@ thread_resume(ngx_wasm_lua_ctx_t *lctx)
ngx_log_debug4(NGX_LOG_DEBUG_WASM, lctx->log, 0,
"wasm resuming lua%sthread "
"(lctx: %p, L: %p, co: %p)",
lctx->entry ? " entry " : " ",
lctx->entry ? " entry " : " user ",
lctx, lctx->L, lctx->co);

switch (env->subsys->kind) {
Expand All @@ -391,7 +388,7 @@ thread_resume(ngx_wasm_lua_ctx_t *lctx)
}

dd("lua%sthread resume handler rc: %ld",
lctx->entry ? " entry " : " ", rc);
lctx->entry ? " entry " : " user ", rc);

return thread_handle_rc(lctx, rc);
}
Expand Down Expand Up @@ -433,10 +430,14 @@ ngx_wasm_lua_thread_new(const char *tag, const char *src,
goto error;
}

cln->handler = thread_cleanup_handler;
cln->handler = entry_thread_cleanup_handler;
cln->data = lctx;
}

ngx_log_debug2(NGX_LOG_DEBUG_WASM, lctx->log, 0,
"wasm creating new lua%sthread (lctx: %p)",
lctx->entry ? " entry " : " user ", lctx);

/* Lua VM + thread */

switch (env->subsys->kind) {
Expand Down Expand Up @@ -536,7 +537,7 @@ ngx_wasm_lua_thread_run(ngx_wasm_lua_ctx_t *lctx)

ngx_log_debug4(NGX_LOG_DEBUG_WASM, lctx->log, 0,
"wasm running lua%sthread (lctx: %p, L: %p, co: %p)",
lctx->entry ? " entry " : " ",
lctx->entry ? " entry " : " user ",
lctx, lctx->L, lctx->co);

if (env->entry_lctx == NULL) {
Expand Down Expand Up @@ -582,29 +583,15 @@ ngx_wasm_lua_thread_run(ngx_wasm_lua_ctx_t *lctx)

dd("lua_run_thread rc: %ld", rc);

if (rc == NGX_AGAIN
&& !lctx->entry
&& !lua_isboolean(lctx->co, 1))
{
/* thread is yielded, stash the co_ctx->data pointer and place our
* lctx instead, we will swap it back before resuming */
if (rc == NGX_AGAIN && !lua_isboolean(lctx->co, 1)) {
lctx->yielded = 1;
}

if (entry_lctx && !lctx->entry) {
ngx_queue_insert_tail(&entry_lctx->sub_ctxs, &lctx->q);
}

rc = thread_handle_rc(lctx, rc);

#if (NGX_DEBUG)
if (rc == NGX_OK) {
ngx_wa_assert(!lctx->yielded);
ngx_wa_assert(lctx->finished);
}
#endif

return rc;
return thread_handle_rc(lctx, rc);
}


Expand All @@ -627,8 +614,8 @@ ngx_wasm_lua_resume(ngx_wasm_subsys_env_t *env)

dd("enter");

if (!ngx_wasm_lua_running(env)) {
goto done;
if (entry_thread_empty(env)) {
return NGX_OK;
}

ctx = ngx_http_get_module_ctx(env->ctx.rctx->r, ngx_http_lua_module);
Expand Down Expand Up @@ -672,14 +659,14 @@ ngx_wasm_lua_resume(ngx_wasm_subsys_env_t *env)
ngx_wa_assert(coctx != &ctx->entry_co_ctx);
}

dd("resuming%slctx: %p", lctx->entry ? " entry " : " ", lctx);
dd("resuming%slctx: %p", lctx->entry ? " entry " : " user ", lctx);

rc = thread_resume(lctx);
if (rc == NGX_ERROR) {
return NGX_ERROR;
}

done:
dd("rc: %ld, state: %d", rc, env->state);

switch (env->state) {
case NGX_WASM_STATE_YIELD:
Expand Down
2 changes: 0 additions & 2 deletions src/common/lua/ngx_wasm_lua.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ struct ngx_wasm_lua_ctx_s {
ngx_wasm_lua_handler_pt success_handler;
ngx_event_t *ev; /* entry lctx sleep event */
void *data;
void *data2;

const char *code;
u_char *cache_key;
Expand Down Expand Up @@ -55,7 +54,6 @@ struct ngx_wasm_lua_ctx_s {
};


unsigned ngx_wasm_lua_running(ngx_wasm_subsys_env_t *env);
ngx_wasm_lua_ctx_t *ngx_wasm_lua_thread_new(const char *tag, const char *src,
ngx_wasm_subsys_env_t *env, ngx_log_t *log, void *data,
ngx_wasm_lua_handler_pt success_handler,
Expand Down
5 changes: 4 additions & 1 deletion src/common/ngx_wasm_socket_tcp.c
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,6 @@ ngx_wasm_socket_tcp_connect(ngx_wasm_socket_tcp_t *sock)
"wasm tcp socket resolving...");

rc = resolver_pt(rslv_ctx);

if (rc != NGX_OK && rc != NGX_AGAIN) {
ngx_log_debug0(NGX_LOG_DEBUG_WASM, sock->log, 0,
"wasm tcp socket resolver failed before query");
Expand Down Expand Up @@ -510,6 +509,10 @@ ngx_wasm_socket_resolve_handler(ngx_resolver_ctx_t *ctx)

/* connect */

/* Note: the Lua bridge may have finished all threads and
* resumed continuation, but we still need to hold the yield */
ngx_wasm_yield(sock->env);

ngx_wasm_socket_tcp_connect_peer(sock);

return;
Expand Down
3 changes: 2 additions & 1 deletion src/http/ngx_http_wasm_module.c
Original file line number Diff line number Diff line change
Expand Up @@ -771,6 +771,7 @@ ngx_http_wasm_rewrite_handler(ngx_http_request_t *r)
goto done;
}

#if 0
if (rctx->fake_request) {
/* Our wasm lua entry thread sleep handler is resuming on
* ngx_http_core_run_phases as we are in a tick or background phase; we
Expand All @@ -779,6 +780,7 @@ ngx_http_wasm_rewrite_handler(ngx_http_request_t *r)
rc = NGX_DONE;
goto done;
}
#endif
#endif

rc = ngx_wasm_ops_resume(&rctx->opctx, NGX_HTTP_REWRITE_PHASE);
Expand Down Expand Up @@ -1137,7 +1139,6 @@ ngx_http_wasm_wev_handler(ngx_http_request_t *r)
rctx->in_wev = 1;

} else if (rctx->in_wev) {
/* TODO */
return;
}

Expand Down
Loading

0 comments on commit 976e74a

Please sign in to comment.