Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
casimiro committed Dec 10, 2024
1 parent 09fece2 commit f10965e
Show file tree
Hide file tree
Showing 25 changed files with 1,165 additions and 78 deletions.
6 changes: 4 additions & 2 deletions config
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,8 @@ NGX_WASMX_DEPS="\
$ngx_addon_dir/src/common/metrics/ngx_wa_metrics.h \
$ngx_addon_dir/src/common/proxy_wasm/ngx_proxy_wasm.h \
$ngx_addon_dir/src/common/proxy_wasm/ngx_proxy_wasm_maps.h \
$ngx_addon_dir/src/common/proxy_wasm/ngx_proxy_wasm_properties.h"
$ngx_addon_dir/src/common/proxy_wasm/ngx_proxy_wasm_properties.h \
$ngx_addon_dir/src/common/proxy_wasm/ngx_proxy_wasm_foreign_call.h"

NGX_WASMX_SRCS="\
$ngx_addon_dir/src/ngx_wasmx.c \
Expand All @@ -160,8 +161,9 @@ NGX_WASMX_SRCS="\
$ngx_addon_dir/src/common/proxy_wasm/ngx_proxy_wasm.c \
$ngx_addon_dir/src/common/proxy_wasm/ngx_proxy_wasm_host.c \
$ngx_addon_dir/src/common/proxy_wasm/ngx_proxy_wasm_maps.c \
$ngx_addon_dir/src/common/proxy_wasm/ngx_proxy_wasm_util.c \
$ngx_addon_dir/src/common/proxy_wasm/ngx_proxy_wasm_properties.c \
$ngx_addon_dir/src/common/proxy_wasm/ngx_proxy_wasm_util.c"
$ngx_addon_dir/src/common/proxy_wasm/ngx_proxy_wasm_foreign_call.c"

# wasm

Expand Down
59 changes: 41 additions & 18 deletions src/common/proxy_wasm/ngx_proxy_wasm.c
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

#include <ngx_proxy_wasm.h>
#include <ngx_proxy_wasm_properties.h>
#include <ngx_proxy_wasm_foreign_call.h>
#ifdef NGX_WASM_HTTP
#include <ngx_http_proxy_wasm.h>
#endif
Expand Down Expand Up @@ -839,6 +840,9 @@ ngx_proxy_wasm_run_step(ngx_proxy_wasm_exec_t *pwexec,
case NGX_PROXY_WASM_STEP_DISPATCH_RESPONSE:
rc = filter->subsystem->resume(pwexec, step, &action);
break;
case NGX_PROXY_WASM_STEP_FOREIGN_CALLBACK:
rc = filter->subsystem->resume(pwexec, step, &action);
break;
case NGX_PROXY_WASM_STEP_TICK:
pwexec->in_tick = 1;
rc = ngx_proxy_wasm_on_tick(pwexec);
Expand Down Expand Up @@ -889,13 +893,13 @@ ngx_proxy_wasm_run_step(ngx_proxy_wasm_exec_t *pwexec,


ngx_uint_t
ngx_proxy_wasm_dispatch_calls_total(ngx_proxy_wasm_exec_t *pwexec)
ngx_proxy_wasm_dispatch_ops_total(ngx_proxy_wasm_exec_t *pwexec)
{
ngx_queue_t *q;
ngx_uint_t n = 0;

for (q = ngx_queue_head(&pwexec->dispatch_calls);
q != ngx_queue_sentinel(&pwexec->dispatch_calls);
for (q = ngx_queue_head(&pwexec->dispatch_ops);
q != ngx_queue_sentinel(&pwexec->dispatch_ops);
q = ngx_queue_next(q), n++) { /* void */ }

dd("n: %ld", n);
Expand All @@ -905,25 +909,44 @@ ngx_proxy_wasm_dispatch_calls_total(ngx_proxy_wasm_exec_t *pwexec)


void
ngx_proxy_wasm_dispatch_calls_cancel(ngx_proxy_wasm_exec_t *pwexec)
ngx_proxy_wasm_dispatch_ops_cancel(ngx_proxy_wasm_exec_t *pwexec)
{
#ifdef NGX_WASM_HTTP
ngx_queue_t *q;
ngx_http_proxy_wasm_dispatch_t *call;
ngx_queue_t *q;
ngx_proxy_wasm_dispatch_op_t *dop;

while (!ngx_queue_empty(&pwexec->dispatch_calls)) {
q = ngx_queue_head(&pwexec->dispatch_calls);
call = ngx_queue_data(q, ngx_http_proxy_wasm_dispatch_t, q);
while (!ngx_queue_empty(&pwexec->dispatch_ops)) {
q = ngx_queue_head(&pwexec->dispatch_ops);
dop = ngx_queue_data(q, ngx_proxy_wasm_dispatch_op_t, q);

ngx_log_debug1(NGX_LOG_DEBUG_ALL, pwexec->log, 0,
"proxy_wasm http dispatch cancelled (dispatch: %p)",
call);
#if (NGX_DEBUG)
/* though valid, clang complains if prev/next pointers aren't checked */

ngx_queue_remove(&call->q);
if (!dop->q.next || !dop->q.prev) {
return;
}
#endif

ngx_http_proxy_wasm_dispatch_destroy(call);
}
ngx_queue_remove(&dop->q);

switch (dop->type) {
#ifdef NGX_WASM_HTTP
case NGX_PROXY_WASM_DISPATCH_HTTP_CALL:
ngx_log_debug1(NGX_LOG_DEBUG_ALL, pwexec->log, 0,
"proxy_wasm http dispatch cancelled (dispatch: %p)",
dop->call);

ngx_http_proxy_wasm_dispatch_destroy(dop->call.http);
break;
#endif
default: /* NGX_PROXY_WASM_DISPATCH_FOREIGN_CALL */
ngx_log_debug1(NGX_LOG_DEBUG_ALL, pwexec->log, 0,
"proxy_wasm foreign function callback cancelled "
"(callback: %p)", dop->call);

ngx_proxy_wasm_foreign_call_destroy(dop->call.foreign);
break;
}
}
}


Expand Down Expand Up @@ -1145,7 +1168,7 @@ ngx_proxy_wasm_create_context(ngx_proxy_wasm_filter_t *filter,
rexec->filter = filter;
rexec->ictx = ictx;

ngx_queue_init(&rexec->dispatch_calls);
ngx_queue_init(&rexec->dispatch_ops);

log = filter->log;

Expand Down Expand Up @@ -1262,7 +1285,7 @@ ngx_proxy_wasm_create_context(ngx_proxy_wasm_filter_t *filter,
pwexec->ictx = ictx;
pwexec->store = ictx->store;

ngx_queue_init(&pwexec->dispatch_calls);
ngx_queue_init(&pwexec->dispatch_ops);

} else {
if (in->ictx != ictx) {
Expand Down
81 changes: 55 additions & 26 deletions src/common/proxy_wasm/ngx_proxy_wasm.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ typedef enum {
NGX_PROXY_WASM_STEP_DONE,
NGX_PROXY_WASM_STEP_TICK,
NGX_PROXY_WASM_STEP_DISPATCH_RESPONSE,
NGX_PROXY_WASM_STEP_FOREIGN_CALLBACK,
} ngx_proxy_wasm_step_e;


Expand Down Expand Up @@ -124,7 +125,7 @@ typedef enum {
NGX_PROXY_WASM_BUFFER_GRPC_RECEIVE_BUFFER = 5,
NGX_PROXY_WASM_BUFFER_VM_CONFIGURATION = 6,
NGX_PROXY_WASM_BUFFER_PLUGIN_CONFIGURATION = 7,
NGX_PROXY_WASM_BUFFER_CALL_DATA = 8,
NGX_PROXY_WASM_BUFFER_FOREIGN_FUNCTION_ARGUMENTS = 8,
} ngx_proxy_wasm_buffer_type_e;


Expand All @@ -147,50 +148,76 @@ typedef enum {
} ngx_proxy_wasm_metric_type_e;


typedef enum {
NGX_PROXY_WASM_FOREIGN_RESOLVE_LUA = 0,
} ngx_proxy_wasm_foreign_function_e;


typedef struct ngx_proxy_wasm_ctx_s ngx_proxy_wasm_ctx_t;
typedef struct ngx_proxy_wasm_filter_s ngx_proxy_wasm_filter_t;
typedef struct ngx_proxy_wasm_exec_s ngx_proxy_wasm_exec_t;
typedef struct ngx_proxy_wasm_instance_s ngx_proxy_wasm_instance_t;
#ifdef NGX_WASM_HTTP
typedef struct ngx_http_proxy_wasm_dispatch_s ngx_http_proxy_wasm_dispatch_t;
#endif
typedef struct ngx_proxy_wasm_foreign_call_s ngx_proxy_wasm_foreign_call_t;
typedef ngx_str_t ngx_proxy_wasm_marshalled_map_t;


typedef struct {
ngx_queue_t busy;
ngx_queue_t free;
ngx_queue_t sweep;
ngx_pool_t *pool;
ngx_queue_t busy;
ngx_queue_t free;
ngx_queue_t sweep;
ngx_pool_t *pool;
} ngx_proxy_wasm_store_t;


typedef struct {
ngx_str_t log_prefix;
ngx_log_t *orig_log;
ngx_proxy_wasm_exec_t *pwexec;
ngx_str_t log_prefix;
ngx_log_t *orig_log;
ngx_proxy_wasm_exec_t *pwexec;
} ngx_proxy_wasm_log_ctx_t;


typedef enum {
NGX_PROXY_WASM_DISPATCH_HTTP_CALL,
NGX_PROXY_WASM_DISPATCH_FOREIGN_CALL,
} ngx_proxy_wasm_dispatch_op_e;


typedef struct {
ngx_queue_t q; /* stored by caller */
ngx_proxy_wasm_dispatch_op_e type;

union {
#ifdef NGX_WASM_HTTP
ngx_http_proxy_wasm_dispatch_t *http;
#endif
ngx_proxy_wasm_foreign_call_t *foreign;
} call;
} ngx_proxy_wasm_dispatch_op_t;


struct ngx_proxy_wasm_exec_s {
ngx_uint_t root_id;
ngx_uint_t id;
ngx_uint_t index;
ngx_uint_t tick_period;
ngx_rbtree_node_t node;
ngx_proxy_wasm_err_e ecode;
ngx_pool_t *pool;
ngx_log_t *log;
ngx_proxy_wasm_log_ctx_t log_ctx;
ngx_proxy_wasm_ctx_t *parent;
ngx_proxy_wasm_filter_t *filter;
ngx_proxy_wasm_instance_t *ictx;
ngx_proxy_wasm_store_t *store;
ngx_event_t *ev;
ngx_uint_t root_id;
ngx_uint_t id;
ngx_uint_t index;
ngx_uint_t tick_period;
ngx_rbtree_node_t node;
ngx_proxy_wasm_err_e ecode;
ngx_pool_t *pool;
ngx_log_t *log;
ngx_proxy_wasm_log_ctx_t log_ctx;
ngx_proxy_wasm_ctx_t *parent;
ngx_proxy_wasm_filter_t *filter;
ngx_proxy_wasm_instance_t *ictx;
ngx_proxy_wasm_store_t *store;
ngx_event_t *ev;
ngx_queue_t dispatch_ops;
ngx_proxy_wasm_foreign_call_t *foreign_call; /* swap pointer for host functions */
#ifdef NGX_WASM_HTTP
ngx_http_proxy_wasm_dispatch_t *dispatch_call; /* swap pointer for host functions */
ngx_http_proxy_wasm_dispatch_t *dispatch_call; /* swap pointer for host functions */
#endif
ngx_queue_t dispatch_calls;

/* flags */

Expand Down Expand Up @@ -413,8 +440,10 @@ ngx_int_t ngx_proxy_wasm_resume(ngx_proxy_wasm_ctx_t *pwctx,
ngx_wasm_phase_t *phase, ngx_proxy_wasm_step_e step);
ngx_proxy_wasm_err_e ngx_proxy_wasm_run_step(ngx_proxy_wasm_exec_t *pwexec,
ngx_proxy_wasm_step_e step);
ngx_uint_t ngx_proxy_wasm_dispatch_calls_total(ngx_proxy_wasm_exec_t *pwexec);
void ngx_proxy_wasm_dispatch_calls_cancel(ngx_proxy_wasm_exec_t *pwexec);
ngx_uint_t ngx_proxy_wasm_dispatch_ops_total(ngx_proxy_wasm_exec_t *pwexec);
void ngx_proxy_wasm_dispatch_ops_cancel(ngx_proxy_wasm_exec_t *pwexec);
ngx_uint_t ngx_proxy_wasm_foreign_calls_total(ngx_proxy_wasm_exec_t *pwexec);
void ngx_proxy_wasm_foreign_calls_cancel(ngx_proxy_wasm_exec_t *pwexec);


/* host handlers */
Expand Down
Loading

0 comments on commit f10965e

Please sign in to comment.