Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
thibaultcha committed May 14, 2024
1 parent 8fd1fcb commit 7d00f19
Show file tree
Hide file tree
Showing 19 changed files with 215 additions and 175 deletions.
4 changes: 3 additions & 1 deletion src/common/lua/ngx_wasm_lua.c
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,9 @@ thread_handle_rc(ngx_wasm_lua_ctx_t *lctx, ngx_int_t rc)
ngx_queue_remove(&lctx->q);

if (lctx->error_handler) {
(void) lctx->error_handler(lctx);
/* error_handler can override the rc (e.g. lua resolver errors
* are ignored by the request flow */
rc = lctx->error_handler(lctx);
}
}

Expand Down
6 changes: 3 additions & 3 deletions src/common/lua/ngx_wasm_lua_resolver.c
Original file line number Diff line number Diff line change
Expand Up @@ -104,13 +104,13 @@ ngx_wasm_lua_resolver_error_handler(ngx_wasm_lua_ctx_t *lctx)
rslv_ctx->state = NGX_WASM_LUA_RESOLVE_ERR;

rslv_ctx->handler(rslv_ctx);
}

/* if error before yielding, we already freed the thread */
return NGX_OK;
}

dd("exit");

return NGX_OK;
return NGX_ERROR;
}


Expand Down
60 changes: 35 additions & 25 deletions src/common/ngx_wasm_socket_tcp.c
Original file line number Diff line number Diff line change
Expand Up @@ -72,45 +72,37 @@ ngx_wasm_socket_tcp_err(ngx_wasm_socket_tcp_t *sock,
}


static void
static ngx_int_t
ngx_wasm_socket_tcp_resume(ngx_wasm_socket_tcp_t *sock)
{
#if (NGX_WASM_HTTP)
ngx_int_t rc;
ngx_http_wasm_req_ctx_t *rctx;
#endif
ngx_int_t rc = NGX_ERROR;

ngx_log_debug0(NGX_LOG_DEBUG_WASM, sock->log, 0,
"wasm tcp socket resuming");

switch (sock->env->subsys->kind) {
#if (NGX_WASM_HTTP)
case NGX_WASM_SUBSYS_HTTP:
rctx = sock->env->ctx.rctx;
rc = sock->resume_handler(sock); /* handle sock event */

dd("sock->resume rc: %ld", rc);

switch (rc) {
case NGX_AGAIN:
ngx_wasm_yield(&rctx->env);
break;
case NGX_ERROR:
{
rc = sock->resume_handler(sock);
#if 0
/* make HTTP dispatch calls failures produce HTTP 500 */
if (rc == NGX_ERROR) {
ngx_wasm_error(&rctx->env);
break;
default:
ngx_wa_assert(rc == NGX_OK);
ngx_wasm_continue(&rctx->env);
break;
}
#endif

ngx_http_wasm_resume(rctx, 1, 1); /* continue request */
break;
}
#endif
default:
ngx_wasm_bad_subsystem(sock->env);
break;
}

dd("sock->resume_handler rc: %ld", rc);

return rc;
}


Expand Down Expand Up @@ -521,7 +513,15 @@ ngx_wasm_socket_resolve_handler(ngx_resolver_ctx_t *ctx)

ngx_resolve_name_done(ctx);

ngx_wasm_socket_tcp_resume(sock);
(void) ngx_wasm_socket_tcp_resume(sock);

#if (NGX_WASM_LUA)
if (ctx->state != NGX_WASM_LUA_RESOLVE_ERR) {
ngx_wasm_resume(sock->env); /* continue request */
}
#endif

dd("exit");
}


Expand Down Expand Up @@ -652,8 +652,8 @@ ngx_wasm_socket_tcp_ssl_handshake(ngx_wasm_socket_tcp_t *sock)
static void
ngx_wasm_socket_tcp_ssl_handshake_handler(ngx_connection_t *c)
{
ngx_wasm_socket_tcp_t *sock;
ngx_int_t rc;
ngx_wasm_socket_tcp_t *sock;

sock = c->data;

Expand All @@ -671,7 +671,12 @@ ngx_wasm_socket_tcp_ssl_handshake_handler(ngx_connection_t *c)

resume:

ngx_wasm_socket_tcp_resume(sock);
rc = ngx_wasm_socket_tcp_resume(sock);
if (rc != NGX_AGAIN) {
ngx_wasm_resume(sock->env); /* continue request */
}

dd("exit");
}


Expand Down Expand Up @@ -1355,8 +1360,10 @@ ngx_wasm_socket_tcp_finalize_write(ngx_wasm_socket_tcp_t *sock)
static void
ngx_wasm_socket_tcp_handler(ngx_event_t *ev)
{
ngx_int_t rc;
ngx_connection_t *c = ev->data;
ngx_wasm_socket_tcp_t *sock = c->data;
ngx_wasm_subsys_env_t *env = sock->env;

ngx_log_debug1(NGX_LOG_DEBUG_WASM, ev->log, 0,
"wasm tcp socket handler (wev: %d)",
Expand All @@ -1375,7 +1382,10 @@ ngx_wasm_socket_tcp_handler(ngx_event_t *ev)
sock->read_event_handler(sock);
}

ngx_wasm_socket_tcp_resume(sock);
rc = ngx_wasm_socket_tcp_resume(sock);
if (rc != NGX_AGAIN) {
ngx_wasm_resume(env); /* continue request */
}

dd("exit");
}
Expand Down
15 changes: 15 additions & 0 deletions src/common/ngx_wasm_subsystem.c
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,18 @@ ngx_wasm_set_resume_handler(ngx_wasm_subsys_env_t *env)
}
#endif
}


ngx_inline void
ngx_wasm_resume(ngx_wasm_subsys_env_t *env)
{
#if (NGX_WASM_HTTP)
ngx_http_wasm_req_ctx_t *rctx;

if (env->subsys->kind == NGX_WASM_SUBSYS_HTTP) {
rctx = env->ctx.rctx;

ngx_http_wasm_resume(rctx);
}
#endif
}
1 change: 1 addition & 0 deletions src/common/ngx_wasm_subsystem.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
ngx_wasm_phase_t *ngx_wasm_phase_lookup(ngx_wasm_subsystem_t *subsys,
ngx_uint_t phaseidx);
void ngx_wasm_set_resume_handler(ngx_wasm_subsys_env_t *env);
void ngx_wasm_resume(ngx_wasm_subsys_env_t *env);


#endif /* _NGX_WASM_SUBSYSTEM_H_INCLUDED_ */
6 changes: 1 addition & 5 deletions src/common/proxy_wasm/ngx_proxy_wasm_host.c
Original file line number Diff line number Diff line change
Expand Up @@ -1030,7 +1030,7 @@ static ngx_int_t
ngx_proxy_wasm_hfuncs_send_local_response(ngx_wavm_instance_t *instance,
wasm_val_t args[], wasm_val_t rets[])
{
int32_t status, reason_len, body_len, cl;
int32_t status, reason_len, body_len;
#if (NGX_DEBUG)
int32_t grpc_status;
#endif
Expand Down Expand Up @@ -1067,7 +1067,6 @@ ngx_proxy_wasm_hfuncs_send_local_response(ngx_wavm_instance_t *instance,

if (rctx->entered_header_filter && !rctx->entered_body_filter) {
r = rctx->r;
cl = body_len;
s.data = body;
s.len = body_len;

Expand All @@ -1078,8 +1077,6 @@ ngx_proxy_wasm_hfuncs_send_local_response(ngx_wavm_instance_t *instance,

if (body_len) {
/* append linefeed */
cl++;

rc = ngx_wasm_chain_append(r->connection->pool, &rctx->resp_chunk,
body_len, &lf, &rctx->free_bufs,
rctx->env.buf_tag, 0);
Expand All @@ -1089,7 +1086,6 @@ ngx_proxy_wasm_hfuncs_send_local_response(ngx_wavm_instance_t *instance,
}

ngx_http_wasm_set_resp_status(rctx, status, reason, reason_len);
ngx_http_wasm_set_resp_content_length(r, cl);

rctx->resp_chunk_override = 1;

Expand Down
3 changes: 1 addition & 2 deletions src/http/ngx_http_wasm.h
Original file line number Diff line number Diff line change
Expand Up @@ -164,8 +164,7 @@ ngx_int_t ngx_http_wasm_prepend_resp_body(ngx_http_wasm_req_ctx_t *rctx,

/* resume handler */
void ngx_http_wasm_set_resume_handler(ngx_http_wasm_req_ctx_t *rctx);
void ngx_http_wasm_resume(ngx_http_wasm_req_ctx_t *rctx, unsigned main,
unsigned wev);
void ngx_http_wasm_resume(ngx_http_wasm_req_ctx_t *rctx);


/* externs */
Expand Down
10 changes: 9 additions & 1 deletion src/http/ngx_http_wasm_filter_module.c
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ ngx_http_wasm_header_filter_handler(ngx_http_request_t *r)
rc = ngx_http_next_header_filter(r);
goto done;

} else if (rc == NGX_ERROR || rc >= NGX_HTTP_SPECIAL_RESPONSE) {
} else if (rc == NGX_ERROR) {
if (rc == NGX_ERROR) {
rc = NGX_HTTP_INTERNAL_SERVER_ERROR;
goto done;
Expand All @@ -110,6 +110,8 @@ ngx_http_wasm_header_filter_handler(ngx_http_request_t *r)
if (rctx->resp_content_chosen) {
goto done;
}

ngx_wa_assert(0);
}
#if (NGX_DEBUG)
else if (rc == NGX_AGAIN) {
Expand All @@ -121,6 +123,12 @@ ngx_http_wasm_header_filter_handler(ngx_http_request_t *r)
"wasm \"header_filter\" ops resume rc: %d "
"(resp_chunk_override: %d)", rc,
rctx->resp_chunk_override);

} else {
/* ignore */
ngx_wa_assert(rc == NGX_OK
|| rc == NGX_DECLINED
|| rc >= NGX_HTTP_SPECIAL_RESPONSE);
}
#endif

Expand Down
30 changes: 10 additions & 20 deletions src/http/ngx_http_wasm_module.c
Original file line number Diff line number Diff line change
Expand Up @@ -765,6 +765,7 @@ ngx_http_wasm_rewrite_handler(ngx_http_request_t *r)
break;
default:
ngx_wa_assert(rc == NGX_ERROR);
rc = NGX_HTTP_INTERNAL_SERVER_ERROR;
goto done;
}
}
Expand Down Expand Up @@ -985,6 +986,10 @@ ngx_http_wasm_content(ngx_http_wasm_req_ctx_t *rctx)
}

rc = ngx_wasm_ops_resume(&rctx->opctx, NGX_HTTP_CONTENT_PHASE);
if (rctx->env.state == NGX_WASM_STATE_ERROR) {
rc = NGX_HTTP_INTERNAL_SERVER_ERROR;
}

dd("content ops resume rc: %ld", rc);
rc = ngx_http_wasm_check_finalize(rctx, rc);
if (rc == NGX_ERROR
Expand Down Expand Up @@ -1022,8 +1027,6 @@ ngx_http_wasm_content(ngx_http_wasm_req_ctx_t *rctx)
ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
"wasm running orig \"content\" handler");

rctx->resp_content_chosen = 1;

rc = rctx->r_content_handler(r);

} else if (r->header_sent || rctx->resp_content_sent) {
Expand Down Expand Up @@ -1197,7 +1200,7 @@ ngx_http_wasm_wev_handler(ngx_http_request_t *r)
if (rc == NGX_OK || rc == NGX_DONE) {
if (r == r->main) {
r->write_event_handler = ngx_http_core_run_phases;
ngx_http_wasm_resume(rctx, r == r->main, 1);
ngx_http_wasm_resume(rctx);
return;
}

Expand Down Expand Up @@ -1250,34 +1253,21 @@ ngx_http_wasm_set_resume_handler(ngx_http_wasm_req_ctx_t *rctx)


void
ngx_http_wasm_resume(ngx_http_wasm_req_ctx_t *rctx, unsigned main, unsigned wev)
ngx_http_wasm_resume(ngx_http_wasm_req_ctx_t *rctx)
{
ngx_http_request_t *r = rctx->r;
ngx_connection_t *c = r->connection;

dd("enter");

ngx_wa_assert(wev);

if (ngx_wasm_yielding(&rctx->env)) {
dd("yielding");
return;
}

if (main) {
if (wev) {
dd("resuming request wev...");
r->write_event_handler(r);
dd("...done resuming request wev");
}
#if 0
else {
dd("resuming request rev...");
r->read_event_handler(r);
dd("...done resuming request");
}
#endif
}
dd("resuming request wev...");
r->write_event_handler(r);
dd("...done resuming request wev");

dd("running posted requests...");
ngx_http_run_posted_requests(c);
Expand Down
3 changes: 3 additions & 0 deletions src/http/proxy_wasm/ngx_http_proxy_wasm.c
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,9 @@ ngx_http_proxy_wasm_on_request_body_handler(ngx_http_request_t *r)
NGX_PROXY_WASM_STEP_REQ_BODY);
if (rc == NGX_AGAIN) {
ngx_wasm_yield(&rctx->env);

} else if (rc == NGX_ERROR || rc >= NGX_HTTP_SPECIAL_RESPONSE) {
ngx_wasm_error(&rctx->env);
}
}

Expand Down
Loading

0 comments on commit 7d00f19

Please sign in to comment.