Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
thibaultcha committed May 4, 2024
1 parent 54be376 commit 6387bae
Show file tree
Hide file tree
Showing 12 changed files with 226 additions and 86 deletions.
119 changes: 53 additions & 66 deletions src/common/lua/ngx_wasm_lua.c
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ static const char *WASM_LUA_ENTRY_SCRIPT = ""
"end\n";


unsigned
static ngx_inline unsigned
ngx_wasm_lua_running(ngx_wasm_subsys_env_t *env)
{
ngx_wasm_lua_ctx_t *entry_lctx = env->entry_lctx;
Expand All @@ -45,7 +45,7 @@ destroy_thread(ngx_wasm_lua_ctx_t *lctx)
{
ngx_log_debug2(NGX_LOG_DEBUG_WASM, ngx_cycle->log, 0,
"wasm freeing lua%sthread (lctx: %p)",
lctx->entry ? " entry " : " ", lctx);
lctx->entry ? " entry " : " user ", lctx);

ngx_pfree(lctx->pool, lctx->cache_key);
ngx_pfree(lctx->pool, lctx);
Expand Down Expand Up @@ -87,9 +87,8 @@ entry_thread_start(ngx_wasm_subsys_env_t *env)

if (entry_lctx == NULL) {
/**
* In OpenResty, all user threads *must* be attached to a parent
* coroutine, so we create a "fake" one simulating a
* *_by_lua_block context.
* In OpenResty, all uthreads *must* be attached to a parent coroutine,
* so we create a "fake" one simulating a *_by_lua_block context.
*/
dd("creating entry thread");

Expand Down Expand Up @@ -216,10 +215,6 @@ thread_init(ngx_wasm_lua_ctx_t *lctx)
#endif

if (!lctx->entry) {
/**
* Attach new user thread to entry_ctx for OpenResty's internals;
* entry_ctx can be a *_by_lua_block context or our fake entry_ctx.
*/
coctx->parent_co_ctx = &ctx->entry_co_ctx;
}

Expand Down Expand Up @@ -266,7 +261,7 @@ thread_handle_rc(ngx_wasm_lua_ctx_t *lctx, ngx_int_t rc)

} else {
/* thread is dead, determine state by checking its return value placed
* on the stack by OpenResty's user thread implementation */
* on the stack by OpenResty's uthread implementation */
ngx_wa_assert(lua_isboolean(lctx->co, 1));

lctx->co_ctx->co_status = NGX_HTTP_LUA_CO_DEAD;
Expand All @@ -281,19 +276,46 @@ thread_handle_rc(ngx_wasm_lua_ctx_t *lctx, ngx_int_t rc)
switch (rc) {
case NGX_AGAIN:
dd("wasm lua thread yield");
ngx_wa_assert(lctx->yielded);

if (lctx->entry) {
/* find the pending sleep timer to cancel at pool cleanup */
sentinel = ngx_event_timer_rbtree.sentinel;
root = ngx_event_timer_rbtree.root;

if (root != sentinel) {
for (node = ngx_rbtree_min(root, sentinel);
node;
node = ngx_rbtree_next(&ngx_event_timer_rbtree, node))
{
ev = ngx_rbtree_data(node, ngx_event_t, timer);

if (ev->data == entry_lctx->co_ctx) {
entry_lctx->ev = ev;
break;
}
}
}
}

ngx_wasm_yield(env);
break;
case NGX_OK:
dd("wasm lua thread finished");
ngx_wa_assert(thread_is_dead(lctx));

if (!lctx->entry) {
lctx->finished = 1;
ngx_queue_remove(&lctx->q);

if (!ngx_wasm_lua_running(env)) {
/* last yielding thread finished */
ngx_wasm_continue(env);
}

if (lctx->success_handler) {
(void) lctx->success_handler(lctx);
}

lctx->finished = 1;
ngx_queue_remove(&lctx->q);
}

break;
Expand All @@ -302,13 +324,12 @@ thread_handle_rc(ngx_wasm_lua_ctx_t *lctx, ngx_int_t rc)
ngx_wasm_error(env);

if (!lctx->entry) {
lctx->finished = 1;
ngx_queue_remove(&lctx->q);

if (lctx->error_handler) {
(void) lctx->error_handler(lctx);
goto done;
}

lctx->finished = 1;
ngx_queue_remove(&lctx->q);
}

break;
Expand All @@ -320,38 +341,14 @@ thread_handle_rc(ngx_wasm_lua_ctx_t *lctx, ngx_int_t rc)
break;
}

if (lctx->entry) {
/* find the pending sleep timer to cancel at pool cleanup */
sentinel = ngx_event_timer_rbtree.sentinel;
root = ngx_event_timer_rbtree.root;

if (root == sentinel) {
goto done;
}

for (node = ngx_rbtree_min(root, sentinel);
node;
node = ngx_rbtree_next(&ngx_event_timer_rbtree, node))
{
ev = ngx_rbtree_data(node, ngx_event_t, timer);

if (ev->data == entry_lctx->co_ctx) {
entry_lctx->ev = ev;
break;
}
}
}
ngx_wa_assert(rc == NGX_OK
|| rc == NGX_AGAIN
|| rc == NGX_ERROR);

if (lctx->finished) {
destroy_thread(lctx);
}

done:

ngx_wa_assert(rc == NGX_OK
|| rc == NGX_AGAIN
|| rc == NGX_ERROR);

return rc;
}

Expand All @@ -365,7 +362,7 @@ thread_resume(ngx_wasm_lua_ctx_t *lctx)
ngx_log_debug4(NGX_LOG_DEBUG_WASM, lctx->log, 0,
"wasm resuming lua%sthread "
"(lctx: %p, L: %p, co: %p)",
lctx->entry ? " entry " : " ",
lctx->entry ? " entry " : " user ",
lctx, lctx->L, lctx->co);

switch (env->subsys->kind) {
Expand All @@ -391,7 +388,7 @@ thread_resume(ngx_wasm_lua_ctx_t *lctx)
}

dd("lua%sthread resume handler rc: %ld",
lctx->entry ? " entry " : " ", rc);
lctx->entry ? " entry " : " user ", rc);

return thread_handle_rc(lctx, rc);
}
Expand Down Expand Up @@ -437,6 +434,10 @@ ngx_wasm_lua_thread_new(const char *tag, const char *src,
cln->data = lctx;
}

ngx_log_debug2(NGX_LOG_DEBUG_WASM, lctx->log, 0,
"wasm creating new lua%sthread (lctx: %p)",
lctx->entry ? " entry " : " user ", lctx);

/* Lua VM + thread */

switch (env->subsys->kind) {
Expand Down Expand Up @@ -536,7 +537,7 @@ ngx_wasm_lua_thread_run(ngx_wasm_lua_ctx_t *lctx)

ngx_log_debug4(NGX_LOG_DEBUG_WASM, lctx->log, 0,
"wasm running lua%sthread (lctx: %p, L: %p, co: %p)",
lctx->entry ? " entry " : " ",
lctx->entry ? " entry " : " user ",
lctx, lctx->L, lctx->co);

if (env->entry_lctx == NULL) {
Expand Down Expand Up @@ -582,29 +583,15 @@ ngx_wasm_lua_thread_run(ngx_wasm_lua_ctx_t *lctx)

dd("lua_run_thread rc: %ld", rc);

if (rc == NGX_AGAIN
&& !lctx->entry
&& !lua_isboolean(lctx->co, 1))
{
/* thread is yielded, stash the co_ctx->data pointer and place our
* lctx instead, we will swap it back before resuming */
if (rc == NGX_AGAIN && !lua_isboolean(lctx->co, 1)) {
lctx->yielded = 1;
}

if (entry_lctx && !lctx->entry) {
ngx_queue_insert_tail(&entry_lctx->sub_ctxs, &lctx->q);
}

rc = thread_handle_rc(lctx, rc);

#if (NGX_DEBUG)
if (rc == NGX_OK) {
ngx_wa_assert(!lctx->yielded);
ngx_wa_assert(lctx->finished);
}
#endif

return rc;
return thread_handle_rc(lctx, rc);
}


Expand All @@ -628,7 +615,7 @@ ngx_wasm_lua_resume(ngx_wasm_subsys_env_t *env)
dd("enter");

if (!ngx_wasm_lua_running(env)) {
goto done;
return NGX_OK;
}

ctx = ngx_http_get_module_ctx(env->ctx.rctx->r, ngx_http_lua_module);
Expand Down Expand Up @@ -672,14 +659,14 @@ ngx_wasm_lua_resume(ngx_wasm_subsys_env_t *env)
ngx_wa_assert(coctx != &ctx->entry_co_ctx);
}

dd("resuming%slctx: %p", lctx->entry ? " entry " : " ", lctx);
dd("resuming%slctx: %p", lctx->entry ? " entry " : " user ", lctx);

rc = thread_resume(lctx);
if (rc == NGX_ERROR) {
return NGX_ERROR;
}

done:
dd("rc: %ld, state: %d", rc, env->state);

switch (env->state) {
case NGX_WASM_STATE_YIELD:
Expand Down
2 changes: 0 additions & 2 deletions src/common/lua/ngx_wasm_lua.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ struct ngx_wasm_lua_ctx_s {
ngx_wasm_lua_handler_pt success_handler;
ngx_event_t *ev; /* entry lctx sleep event */
void *data;
void *data2;

const char *code;
u_char *cache_key;
Expand Down Expand Up @@ -55,7 +54,6 @@ struct ngx_wasm_lua_ctx_s {
};


unsigned ngx_wasm_lua_running(ngx_wasm_subsys_env_t *env);
ngx_wasm_lua_ctx_t *ngx_wasm_lua_thread_new(const char *tag, const char *src,
ngx_wasm_subsys_env_t *env, ngx_log_t *log, void *data,
ngx_wasm_lua_handler_pt success_handler,
Expand Down
11 changes: 7 additions & 4 deletions src/common/ngx_wasm_socket_tcp.c
Original file line number Diff line number Diff line change
Expand Up @@ -93,14 +93,14 @@ ngx_wasm_socket_tcp_resume(ngx_wasm_socket_tcp_t *sock)

switch (rc) {
case NGX_AGAIN:
ngx_wasm_yield(&rctx->env);
ngx_wasm_yield(sock->env);
break;
case NGX_ERROR:
ngx_wasm_error(&rctx->env);
ngx_wasm_error(sock->env);
break;
default:
ngx_wa_assert(rc == NGX_OK);
ngx_wasm_continue(&rctx->env);
ngx_wasm_continue(sock->env);
break;
}

Expand Down Expand Up @@ -400,7 +400,6 @@ ngx_wasm_socket_tcp_connect(ngx_wasm_socket_tcp_t *sock)
"wasm tcp socket resolving...");

rc = resolver_pt(rslv_ctx);

if (rc != NGX_OK && rc != NGX_AGAIN) {
ngx_log_debug0(NGX_LOG_DEBUG_WASM, sock->log, 0,
"wasm tcp socket resolver failed before query");
Expand Down Expand Up @@ -510,6 +509,10 @@ ngx_wasm_socket_resolve_handler(ngx_resolver_ctx_t *ctx)

/* connect */

/* Note: the Lua bridge may have finished all threads and
* resumed continuation, but we still need to hold the yield */
ngx_wasm_yield(sock->env);

ngx_wasm_socket_tcp_connect_peer(sock);

return;
Expand Down
2 changes: 2 additions & 0 deletions src/http/ngx_http_wasm_module.c
Original file line number Diff line number Diff line change
Expand Up @@ -771,6 +771,7 @@ ngx_http_wasm_rewrite_handler(ngx_http_request_t *r)
goto done;
}

#if 0
if (rctx->fake_request) {
/* Our wasm lua entry thread sleep handler is resuming on
* ngx_http_core_run_phases as we are in a tick or background phase; we
Expand All @@ -779,6 +780,7 @@ ngx_http_wasm_rewrite_handler(ngx_http_request_t *r)
rc = NGX_DONE;
goto done;
}
#endif
#endif

rc = ngx_wasm_ops_resume(&rctx->opctx, NGX_HTTP_REWRITE_PHASE);
Expand Down
34 changes: 34 additions & 0 deletions src/wasm/ngx_wasm_core_host.c
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,35 @@ ngx_wasm_hfuncs_test_lua_error(ngx_wavm_instance_t *instance,

return NGX_WAVM_OK;
}


ngx_int_t
ngx_wasm_hfuncs_test_lua_sleep(ngx_wavm_instance_t *instance,
wasm_val_t args[], wasm_val_t rets[])
{
ngx_wasm_lua_ctx_t *lctx;
#if (NGX_WASM_HTTP)
ngx_http_wasm_req_ctx_t *rctx = instance->data;
#endif
static const char *SCRIPT_NAME = "sleep_lua_chunk";
static const char *SCRIPT = "for i = 1, 2 do\n"
" print('sleeping for 250ms')\n"
" ngx.sleep(0.25)\n"
"end";

lctx = ngx_wasm_lua_thread_new(SCRIPT_NAME,
SCRIPT,
&rctx->env,
instance->log,
NULL, NULL, NULL);
if (lctx == NULL) {
return NGX_WAVM_ERROR;
}

(void) ngx_wasm_lua_thread_run(lctx);

return NGX_WAVM_OK;
}
#endif


Expand All @@ -138,6 +167,11 @@ static ngx_wavm_host_func_def_t ngx_wasm_core_hfuncs[] = {
&ngx_wasm_hfuncs_test_lua_error,
NULL,
NULL },

{ ngx_string("ngx_wasm_lua_test_sleep"),
&ngx_wasm_hfuncs_test_lua_sleep,
NULL,
NULL },
#endif

ngx_wavm_hfunc_null
Expand Down
Loading

0 comments on commit 6387bae

Please sign in to comment.