Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(proxy-wasm) foreign function support #626

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions config
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,8 @@ NGX_WASMX_DEPS="\
$ngx_addon_dir/src/common/metrics/ngx_wa_metrics.h \
$ngx_addon_dir/src/common/proxy_wasm/ngx_proxy_wasm.h \
$ngx_addon_dir/src/common/proxy_wasm/ngx_proxy_wasm_maps.h \
$ngx_addon_dir/src/common/proxy_wasm/ngx_proxy_wasm_properties.h"
$ngx_addon_dir/src/common/proxy_wasm/ngx_proxy_wasm_properties.h \
$ngx_addon_dir/src/common/proxy_wasm/ngx_proxy_wasm_foreign_call.h"

NGX_WASMX_SRCS="\
$ngx_addon_dir/src/ngx_wasmx.c \
Expand All @@ -160,8 +161,9 @@ NGX_WASMX_SRCS="\
$ngx_addon_dir/src/common/proxy_wasm/ngx_proxy_wasm.c \
$ngx_addon_dir/src/common/proxy_wasm/ngx_proxy_wasm_host.c \
$ngx_addon_dir/src/common/proxy_wasm/ngx_proxy_wasm_maps.c \
$ngx_addon_dir/src/common/proxy_wasm/ngx_proxy_wasm_util.c \
$ngx_addon_dir/src/common/proxy_wasm/ngx_proxy_wasm_properties.c \
casimiro marked this conversation as resolved.
Show resolved Hide resolved
$ngx_addon_dir/src/common/proxy_wasm/ngx_proxy_wasm_util.c"
$ngx_addon_dir/src/common/proxy_wasm/ngx_proxy_wasm_foreign_call.c"

# wasm

Expand Down
67 changes: 45 additions & 22 deletions src/common/proxy_wasm/ngx_proxy_wasm.c
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

#include <ngx_proxy_wasm.h>
#include <ngx_proxy_wasm_properties.h>
#include <ngx_proxy_wasm_foreign_call.h>
#ifdef NGX_WASM_HTTP
#include <ngx_http_proxy_wasm.h>
#endif
Expand Down Expand Up @@ -440,8 +441,8 @@ ngx_proxy_wasm_ctx_destroy(ngx_proxy_wasm_ctx_t *pwctx)
ngx_pfree(pwctx->pool, pwctx->root_id.data);
}

if (pwctx->call_status.data) {
ngx_pfree(pwctx->pool, pwctx->call_status.data);
if (pwctx->dispatch_call_status.data) {
ngx_pfree(pwctx->pool, pwctx->dispatch_call_status.data);
}

if (pwctx->response_status.data) {
Expand Down Expand Up @@ -839,6 +840,9 @@ ngx_proxy_wasm_run_step(ngx_proxy_wasm_exec_t *pwexec,
case NGX_PROXY_WASM_STEP_DISPATCH_RESPONSE:
rc = filter->subsystem->resume(pwexec, step, &action);
break;
case NGX_PROXY_WASM_STEP_FOREIGN_CALLBACK:
rc = filter->subsystem->resume(pwexec, step, &action);
break;
case NGX_PROXY_WASM_STEP_TICK:
pwexec->in_tick = 1;
rc = ngx_proxy_wasm_on_tick(pwexec);
Expand Down Expand Up @@ -889,13 +893,13 @@ ngx_proxy_wasm_run_step(ngx_proxy_wasm_exec_t *pwexec,


ngx_uint_t
ngx_proxy_wasm_dispatch_calls_total(ngx_proxy_wasm_exec_t *pwexec)
ngx_proxy_wasm_dispatch_ops_total(ngx_proxy_wasm_exec_t *pwexec)
{
ngx_queue_t *q;
ngx_uint_t n = 0;

for (q = ngx_queue_head(&pwexec->calls);
q != ngx_queue_sentinel(&pwexec->calls);
for (q = ngx_queue_head(&pwexec->dispatch_ops);
q != ngx_queue_sentinel(&pwexec->dispatch_ops);
q = ngx_queue_next(q), n++) { /* void */ }

dd("n: %ld", n);
Expand All @@ -905,25 +909,44 @@ ngx_proxy_wasm_dispatch_calls_total(ngx_proxy_wasm_exec_t *pwexec)


void
ngx_proxy_wasm_dispatch_calls_cancel(ngx_proxy_wasm_exec_t *pwexec)
ngx_proxy_wasm_dispatch_ops_cancel(ngx_proxy_wasm_exec_t *pwexec)
{
#ifdef NGX_WASM_HTTP
ngx_queue_t *q;
ngx_http_proxy_wasm_dispatch_t *call;
ngx_queue_t *q;
ngx_proxy_wasm_dispatch_op_t *dop;

while (!ngx_queue_empty(&pwexec->calls)) {
q = ngx_queue_head(&pwexec->calls);
call = ngx_queue_data(q, ngx_http_proxy_wasm_dispatch_t, q);
while (!ngx_queue_empty(&pwexec->dispatch_ops)) {
q = ngx_queue_head(&pwexec->dispatch_ops);
dop = ngx_queue_data(q, ngx_proxy_wasm_dispatch_op_t, q);

ngx_log_debug1(NGX_LOG_DEBUG_ALL, pwexec->log, 0,
"proxy_wasm http dispatch cancelled (dispatch: %p)",
call);
#if (NGX_DEBUG)
/* though valid, clang complains if prev/next pointers aren't checked */

ngx_queue_remove(&call->q);
if (!dop->q.next || !dop->q.prev) {
return;
}
#endif

ngx_http_proxy_wasm_dispatch_destroy(call);
}
ngx_queue_remove(&dop->q);

switch (dop->type) {
#ifdef NGX_WASM_HTTP
case NGX_PROXY_WASM_DISPATCH_HTTP_CALL:
ngx_log_debug1(NGX_LOG_DEBUG_ALL, pwexec->log, 0,
"proxy_wasm http dispatch cancelled (dispatch: %p)",
dop->call);

ngx_http_proxy_wasm_dispatch_destroy(dop->call.http);
break;
#endif
default: /* NGX_PROXY_WASM_DISPATCH_FOREIGN_CALL */
ngx_log_debug1(NGX_LOG_DEBUG_ALL, pwexec->log, 0,
"proxy_wasm foreign function callback cancelled "
"(callback: %p)", dop->call);

ngx_proxy_wasm_foreign_call_destroy(dop->call.foreign);
break;
}
}
}


Expand Down Expand Up @@ -1145,7 +1168,7 @@ ngx_proxy_wasm_create_context(ngx_proxy_wasm_filter_t *filter,
rexec->filter = filter;
rexec->ictx = ictx;

ngx_queue_init(&rexec->calls);
ngx_queue_init(&rexec->dispatch_ops);

log = filter->log;

Expand Down Expand Up @@ -1262,7 +1285,7 @@ ngx_proxy_wasm_create_context(ngx_proxy_wasm_filter_t *filter,
pwexec->ictx = ictx;
pwexec->store = ictx->store;

ngx_queue_init(&pwexec->calls);
ngx_queue_init(&pwexec->dispatch_ops);

} else {
if (in->ictx != ictx) {
Expand Down Expand Up @@ -1389,11 +1412,11 @@ ngx_proxy_wasm_on_done(ngx_proxy_wasm_exec_t *pwexec)

#if 0
#ifdef NGX_WASM_HTTP
call = pwexec->call;
call = pwexec->dispatch_call;
if (call) {
ngx_http_proxy_wasm_dispatch_destroy(call);

pwexec->call = NULL;
pwexec->dispatch_call = NULL;
}
#endif
#endif
Expand Down
109 changes: 69 additions & 40 deletions src/common/proxy_wasm/ngx_proxy_wasm.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ typedef enum {
NGX_PROXY_WASM_STEP_DONE,
NGX_PROXY_WASM_STEP_TICK,
NGX_PROXY_WASM_STEP_DISPATCH_RESPONSE,
NGX_PROXY_WASM_STEP_FOREIGN_CALLBACK,
} ngx_proxy_wasm_step_e;


Expand Down Expand Up @@ -124,7 +125,7 @@ typedef enum {
NGX_PROXY_WASM_BUFFER_GRPC_RECEIVE_BUFFER = 5,
NGX_PROXY_WASM_BUFFER_VM_CONFIGURATION = 6,
NGX_PROXY_WASM_BUFFER_PLUGIN_CONFIGURATION = 7,
NGX_PROXY_WASM_BUFFER_CALL_DATA = 8,
NGX_PROXY_WASM_BUFFER_FOREIGN_FUNCTION_ARGUMENTS = 8,
thibaultcha marked this conversation as resolved.
Show resolved Hide resolved
} ngx_proxy_wasm_buffer_type_e;


Expand All @@ -147,50 +148,76 @@ typedef enum {
} ngx_proxy_wasm_metric_type_e;


typedef enum {
NGX_PROXY_WASM_FOREIGN_RESOLVE_LUA = 0,
} ngx_proxy_wasm_foreign_function_e;


typedef struct ngx_proxy_wasm_ctx_s ngx_proxy_wasm_ctx_t;
typedef struct ngx_proxy_wasm_filter_s ngx_proxy_wasm_filter_t;
typedef struct ngx_proxy_wasm_exec_s ngx_proxy_wasm_exec_t;
typedef struct ngx_proxy_wasm_instance_s ngx_proxy_wasm_instance_t;
#ifdef NGX_WASM_HTTP
typedef struct ngx_http_proxy_wasm_dispatch_s ngx_http_proxy_wasm_dispatch_t;
#endif
typedef struct ngx_proxy_wasm_foreign_call_s ngx_proxy_wasm_foreign_call_t;
typedef ngx_str_t ngx_proxy_wasm_marshalled_map_t;


typedef struct {
ngx_queue_t busy;
ngx_queue_t free;
ngx_queue_t sweep;
ngx_pool_t *pool;
ngx_queue_t busy;
ngx_queue_t free;
ngx_queue_t sweep;
ngx_pool_t *pool;
} ngx_proxy_wasm_store_t;


typedef struct {
ngx_str_t log_prefix;
ngx_log_t *orig_log;
ngx_proxy_wasm_exec_t *pwexec;
ngx_str_t log_prefix;
ngx_log_t *orig_log;
ngx_proxy_wasm_exec_t *pwexec;
} ngx_proxy_wasm_log_ctx_t;


typedef enum {
NGX_PROXY_WASM_DISPATCH_HTTP_CALL,
NGX_PROXY_WASM_DISPATCH_FOREIGN_CALL,
} ngx_proxy_wasm_dispatch_op_e;


typedef struct {
ngx_queue_t q; /* stored by caller */
ngx_proxy_wasm_dispatch_op_e type;

union {
#ifdef NGX_WASM_HTTP
ngx_http_proxy_wasm_dispatch_t *http;
#endif
ngx_proxy_wasm_foreign_call_t *foreign;
} call;
} ngx_proxy_wasm_dispatch_op_t;


struct ngx_proxy_wasm_exec_s {
ngx_uint_t root_id;
ngx_uint_t id;
ngx_uint_t index;
ngx_uint_t tick_period;
ngx_rbtree_node_t node;
ngx_proxy_wasm_err_e ecode;
ngx_pool_t *pool;
ngx_log_t *log;
ngx_proxy_wasm_log_ctx_t log_ctx;
ngx_proxy_wasm_ctx_t *parent;
ngx_proxy_wasm_filter_t *filter;
ngx_proxy_wasm_instance_t *ictx;
ngx_proxy_wasm_store_t *store;
ngx_event_t *ev;
ngx_uint_t root_id;
ngx_uint_t id;
ngx_uint_t index;
ngx_uint_t tick_period;
ngx_rbtree_node_t node;
ngx_proxy_wasm_err_e ecode;
ngx_pool_t *pool;
ngx_log_t *log;
ngx_proxy_wasm_log_ctx_t log_ctx;
ngx_proxy_wasm_ctx_t *parent;
ngx_proxy_wasm_filter_t *filter;
ngx_proxy_wasm_instance_t *ictx;
ngx_proxy_wasm_store_t *store;
ngx_event_t *ev;
ngx_queue_t dispatch_ops;
ngx_proxy_wasm_foreign_call_t *foreign_call; /* swap pointer for host functions */
#ifdef NGX_WASM_HTTP
ngx_http_proxy_wasm_dispatch_t *call; /* swap pointer for host functions */
ngx_http_proxy_wasm_dispatch_t *dispatch_call; /* swap pointer for host functions */
#endif
ngx_queue_t calls;

/* flags */

Expand Down Expand Up @@ -231,19 +258,19 @@ struct ngx_proxy_wasm_ctx_s {
size_t req_body_len;
ngx_str_t authority;
ngx_str_t scheme;
ngx_str_t path; /* r->uri + r->args */
ngx_str_t start_time; /* r->start_sec + r->start_msec */
ngx_str_t upstream_address; /* 1st part of ngx.upstream_addr */
ngx_str_t upstream_port; /* 2nd part of ngx.upstsream_addr */
ngx_str_t connection_id; /* r->connection->number */
ngx_str_t mtls; /* ngx.https && ngx.ssl_client_verify */
ngx_str_t root_id; /* pwexec->root_id */
ngx_str_t call_status; /* dispatch response status */
ngx_str_t response_status; /* response status */
ngx_str_t path; /* r->uri + r->args */
ngx_str_t start_time; /* r->start_sec + r->start_msec */
ngx_str_t upstream_address; /* 1st part of ngx.upstream_addr */
ngx_str_t upstream_port; /* 2nd part of ngx.upstsream_addr */
ngx_str_t connection_id; /* r->connection->number */
ngx_str_t mtls; /* ngx.https && ngx.ssl_client_verify */
ngx_str_t root_id; /* pwexec->root_id */
ngx_str_t dispatch_call_status; /* dispatch response status */
ngx_str_t response_status; /* response status */
#if (NGX_DEBUG)
ngx_str_t worker_id; /* ngx_worker */
ngx_str_t worker_id; /* ngx_worker */
#endif
ngx_uint_t call_code;
ngx_uint_t dispatch_call_code;
ngx_uint_t response_code;

/* host properties */
Expand All @@ -258,9 +285,9 @@ struct ngx_proxy_wasm_ctx_s {

/* flags */

unsigned main:1; /* r->main */
unsigned init:1; /* can be utilized (has no filters) */
unsigned ready:1; /* filters chain ready */
unsigned main:1; /* r->main */
unsigned init:1; /* can be utilized (has no filters) */
unsigned ready:1; /* filters chain ready */
unsigned req_headers_in_access:1;
};

Expand Down Expand Up @@ -413,8 +440,10 @@ ngx_int_t ngx_proxy_wasm_resume(ngx_proxy_wasm_ctx_t *pwctx,
ngx_wasm_phase_t *phase, ngx_proxy_wasm_step_e step);
ngx_proxy_wasm_err_e ngx_proxy_wasm_run_step(ngx_proxy_wasm_exec_t *pwexec,
ngx_proxy_wasm_step_e step);
ngx_uint_t ngx_proxy_wasm_dispatch_calls_total(ngx_proxy_wasm_exec_t *pwexec);
void ngx_proxy_wasm_dispatch_calls_cancel(ngx_proxy_wasm_exec_t *pwexec);
ngx_uint_t ngx_proxy_wasm_dispatch_ops_total(ngx_proxy_wasm_exec_t *pwexec);
void ngx_proxy_wasm_dispatch_ops_cancel(ngx_proxy_wasm_exec_t *pwexec);
ngx_uint_t ngx_proxy_wasm_foreign_calls_total(ngx_proxy_wasm_exec_t *pwexec);
void ngx_proxy_wasm_foreign_calls_cancel(ngx_proxy_wasm_exec_t *pwexec);


/* host handlers */
Expand Down
Loading
Loading