Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
casimiro committed Nov 19, 2024
1 parent d42dee1 commit d4b4267
Show file tree
Hide file tree
Showing 16 changed files with 674 additions and 35 deletions.
2 changes: 2 additions & 0 deletions config
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ NGX_WASMX_DEPS="\
$ngx_addon_dir/src/common/shm/ngx_wa_shm_queue.h \
$ngx_addon_dir/src/common/metrics/ngx_wa_metrics.h \
$ngx_addon_dir/src/common/proxy_wasm/ngx_proxy_wasm.h \
$ngx_addon_dir/src/common/proxy_wasm/ngx_proxy_wasm_foreign_callback.h \
$ngx_addon_dir/src/common/proxy_wasm/ngx_proxy_wasm_maps.h \
$ngx_addon_dir/src/common/proxy_wasm/ngx_proxy_wasm_properties.h"

Expand All @@ -158,6 +159,7 @@ NGX_WASMX_SRCS="\
$ngx_addon_dir/src/common/metrics/ngx_wa_metrics.c \
$ngx_addon_dir/src/common/metrics/ngx_wa_histogram.c \
$ngx_addon_dir/src/common/proxy_wasm/ngx_proxy_wasm.c \
$ngx_addon_dir/src/common/proxy_wasm/ngx_proxy_wasm_foreign_callback.c \
$ngx_addon_dir/src/common/proxy_wasm/ngx_proxy_wasm_host.c \
$ngx_addon_dir/src/common/proxy_wasm/ngx_proxy_wasm_maps.c \
$ngx_addon_dir/src/common/proxy_wasm/ngx_proxy_wasm_properties.c \
Expand Down
44 changes: 44 additions & 0 deletions src/common/proxy_wasm/ngx_proxy_wasm.c
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

#include <ngx_proxy_wasm.h>
#include <ngx_proxy_wasm_properties.h>
#include <ngx_proxy_wasm_foreign_callback.h>
#ifdef NGX_WASM_HTTP
#include <ngx_http_proxy_wasm.h>
#endif
Expand Down Expand Up @@ -839,6 +840,9 @@ ngx_proxy_wasm_run_step(ngx_proxy_wasm_exec_t *pwexec,
case NGX_PROXY_WASM_STEP_DISPATCH_RESPONSE:
rc = filter->subsystem->resume(pwexec, step, &action);
break;
case NGX_PROXY_WASM_STEP_FOREIGN_CALLBACK:
rc = filter->subsystem->resume(pwexec, step, &action);
break;
case NGX_PROXY_WASM_STEP_TICK:
pwexec->in_tick = 1;
rc = ngx_proxy_wasm_on_tick(pwexec);
Expand Down Expand Up @@ -927,6 +931,45 @@ ngx_proxy_wasm_dispatch_calls_cancel(ngx_proxy_wasm_exec_t *pwexec)
}


ngx_uint_t
ngx_proxy_wasm_foreign_callbacks_total(ngx_proxy_wasm_exec_t *pwexec)
{
ngx_queue_t *q;
ngx_uint_t n = 0;

for (q = ngx_queue_head(&pwexec->fcallbacks);
q != ngx_queue_sentinel(&pwexec->fcallbacks);
q = ngx_queue_next(q), n++) { /* void */ }

dd("n: %ld", n);

return n;
}


void
ngx_proxy_wasm_foreign_callbacks_cancel(ngx_proxy_wasm_exec_t *pwexec)
{
#ifdef NGX_WASM_HTTP
ngx_queue_t *q;
ngx_proxy_wasm_foreign_cb_t *cb;

while (!ngx_queue_empty(&pwexec->fcallbacks)) {
q = ngx_queue_head(&pwexec->fcallbacks);
cb = ngx_queue_data(q, ngx_proxy_wasm_foreign_cb_t, q);

ngx_log_debug1(NGX_LOG_DEBUG_ALL, pwexec->log, 0,
"proxy_wasm foreign function callback cancelled"
" (callback: %p)", cb);

ngx_queue_remove(&cb->q);

ngx_proxy_wasm_foreign_callback_destroy(cb);
}
#endif
}


/* host handlers */


Expand Down Expand Up @@ -1263,6 +1306,7 @@ ngx_proxy_wasm_create_context(ngx_proxy_wasm_filter_t *filter,
pwexec->store = ictx->store;

ngx_queue_init(&pwexec->calls);
ngx_queue_init(&pwexec->fcallbacks);

} else {
if (in->ictx != ictx) {
Expand Down
60 changes: 36 additions & 24 deletions src/common/proxy_wasm/ngx_proxy_wasm.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ typedef enum {
NGX_PROXY_WASM_STEP_DONE,
NGX_PROXY_WASM_STEP_TICK,
NGX_PROXY_WASM_STEP_DISPATCH_RESPONSE,
NGX_PROXY_WASM_STEP_FOREIGN_CALLBACK,
} ngx_proxy_wasm_step_e;


Expand Down Expand Up @@ -124,7 +125,7 @@ typedef enum {
NGX_PROXY_WASM_BUFFER_GRPC_RECEIVE_BUFFER = 5,
NGX_PROXY_WASM_BUFFER_VM_CONFIGURATION = 6,
NGX_PROXY_WASM_BUFFER_PLUGIN_CONFIGURATION = 7,
NGX_PROXY_WASM_BUFFER_CALL_DATA = 8,
NGX_PROXY_WASM_BUFFER_FOREIGN_FUNCTION_ARGUMENTS = 8,
} ngx_proxy_wasm_buffer_type_e;


Expand All @@ -147,50 +148,58 @@ typedef enum {
} ngx_proxy_wasm_metric_type_e;


typedef enum {
NGX_PROXY_WASM_FOREIGN_RESOLVE = 0,
} ngx_proxy_wasm_foreign_function_e;


typedef struct ngx_proxy_wasm_ctx_s ngx_proxy_wasm_ctx_t;
typedef struct ngx_proxy_wasm_filter_s ngx_proxy_wasm_filter_t;
typedef struct ngx_proxy_wasm_exec_s ngx_proxy_wasm_exec_t;
typedef struct ngx_proxy_wasm_instance_s ngx_proxy_wasm_instance_t;
#ifdef NGX_WASM_HTTP
typedef struct ngx_http_proxy_wasm_dispatch_s ngx_http_proxy_wasm_dispatch_t;
#endif
typedef struct ngx_proxy_wasm_foreign_cb_s ngx_proxy_wasm_foreign_cb_t;
typedef ngx_str_t ngx_proxy_wasm_marshalled_map_t;


typedef struct {
ngx_queue_t busy;
ngx_queue_t free;
ngx_queue_t sweep;
ngx_pool_t *pool;
ngx_queue_t busy;
ngx_queue_t free;
ngx_queue_t sweep;
ngx_pool_t *pool;
} ngx_proxy_wasm_store_t;


typedef struct {
ngx_str_t log_prefix;
ngx_log_t *orig_log;
ngx_proxy_wasm_exec_t *pwexec;
ngx_str_t log_prefix;
ngx_log_t *orig_log;
ngx_proxy_wasm_exec_t *pwexec;
} ngx_proxy_wasm_log_ctx_t;


struct ngx_proxy_wasm_exec_s {
ngx_uint_t root_id;
ngx_uint_t id;
ngx_uint_t index;
ngx_uint_t tick_period;
ngx_rbtree_node_t node;
ngx_proxy_wasm_err_e ecode;
ngx_pool_t *pool;
ngx_log_t *log;
ngx_proxy_wasm_log_ctx_t log_ctx;
ngx_proxy_wasm_ctx_t *parent;
ngx_proxy_wasm_filter_t *filter;
ngx_proxy_wasm_instance_t *ictx;
ngx_proxy_wasm_store_t *store;
ngx_event_t *ev;
ngx_uint_t root_id;
ngx_uint_t id;
ngx_uint_t index;
ngx_uint_t tick_period;
ngx_rbtree_node_t node;
ngx_proxy_wasm_err_e ecode;
ngx_pool_t *pool;
ngx_log_t *log;
ngx_proxy_wasm_log_ctx_t log_ctx;
ngx_proxy_wasm_ctx_t *parent;
ngx_proxy_wasm_filter_t *filter;
ngx_proxy_wasm_instance_t *ictx;
ngx_proxy_wasm_store_t *store;
ngx_event_t *ev;
#ifdef NGX_WASM_HTTP
ngx_http_proxy_wasm_dispatch_t *call; /* swap pointer for host functions */
ngx_http_proxy_wasm_dispatch_t *call; /* swap pointer for host functions */
#endif
ngx_queue_t calls;
ngx_queue_t calls;
ngx_proxy_wasm_foreign_cb_t *fcallback;
ngx_queue_t fcallbacks;

/* flags */

Expand Down Expand Up @@ -415,6 +424,9 @@ ngx_proxy_wasm_err_e ngx_proxy_wasm_run_step(ngx_proxy_wasm_exec_t *pwexec,
ngx_proxy_wasm_step_e step);
ngx_uint_t ngx_proxy_wasm_dispatch_calls_total(ngx_proxy_wasm_exec_t *pwexec);
void ngx_proxy_wasm_dispatch_calls_cancel(ngx_proxy_wasm_exec_t *pwexec);
ngx_uint_t ngx_proxy_wasm_foreign_callbacks_total(
ngx_proxy_wasm_exec_t *pwexec);
void ngx_proxy_wasm_foreign_callbacks_cancel(ngx_proxy_wasm_exec_t *pwexec);


/* host handlers */
Expand Down
131 changes: 131 additions & 0 deletions src/common/proxy_wasm/ngx_proxy_wasm_foreign_callback.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
#ifndef DDEBUG
#define DDEBUG 0
#endif
#include "ddebug.h"

#include <ngx_proxy_wasm_foreign_callback.h>


void
ngx_proxy_wasm_foreign_callback_destroy(ngx_proxy_wasm_foreign_cb_t *cb)
{

ngx_pfree(cb->pwexec->pool, cb);
}


ngx_proxy_wasm_foreign_cb_t *
ngx_proxy_wasm_foreign_callback_alloc(ngx_proxy_wasm_exec_t *pwexec)
{
ngx_proxy_wasm_foreign_cb_t *cb;

cb = ngx_palloc(pwexec->pool, sizeof(ngx_proxy_wasm_foreign_cb_t));
cb->pwexec = pwexec;

return cb;
}


void
ngx_proxy_wasm_foreign_callback(ngx_proxy_wasm_foreign_cb_t *cb)
{
ngx_proxy_wasm_exec_t *pwexec = cb->pwexec;
ngx_proxy_wasm_err_e ecode = NGX_PROXY_WASM_ERR_NONE;
ngx_proxy_wasm_step_e step = pwexec->parent->step;

ngx_queue_remove(&cb->q);
pwexec->fcallback = cb;

#if (NGX_WASM_HTTP)
pwexec->parent->phase = ngx_wasm_phase_lookup(&ngx_http_wasm_subsystem,
NGX_WASM_BACKGROUND_PHASE);
#endif

ecode = ngx_proxy_wasm_run_step(pwexec,
NGX_PROXY_WASM_STEP_FOREIGN_CALLBACK);
if (ecode != NGX_PROXY_WASM_ERR_NONE) {
/* TODO: error handling */
}

pwexec->parent->step = step;
pwexec->fcallback = NULL;

if (ngx_proxy_wasm_foreign_callbacks_total(pwexec)) {
ngx_log_debug0(NGX_LOG_DEBUG_WASM, pwexec->log, 0,
"proxy_wasm more foreign function callbacks pending...");

#if (NGX_WASM_HTTP)
ngx_wasm_yield(&cb->rctx->env);
#endif
ngx_proxy_wasm_ctx_set_next_action(pwexec->parent,
NGX_PROXY_WASM_ACTION_PAUSE);

} else {
ngx_log_debug0(NGX_LOG_DEBUG_WASM, pwexec->log, 0,
"proxy_wasm last foreign function callback handled");

#if (NGX_WASM_HTTP)
ngx_wasm_continue(&cb->rctx->env);
#endif
ngx_proxy_wasm_ctx_set_next_action(pwexec->parent,
NGX_PROXY_WASM_ACTION_CONTINUE);

/* resume current step if unfinished */
ngx_proxy_wasm_resume(pwexec->parent, pwexec->parent->phase, step);
}

ngx_proxy_wasm_foreign_callback_destroy(cb);
}


ngx_int_t
ngx_proxy_wasm_foreign_callback_buffer_create(ngx_proxy_wasm_foreign_cb_t *cb,
size_t size)
{
ngx_buf_t *b;
ngx_chain_t *cl;
ngx_proxy_wasm_exec_t *pwexec = cb->pwexec;

ngx_wa_assert(pwexec);
ngx_wa_assert(size);

cl = ngx_alloc_chain_link(pwexec->pool);
if (cl == NULL) {
return NGX_ERROR;
}

/* TODO: if size exceeds a threshold, split allocation into N buffers */

b = ngx_create_temp_buf(pwexec->pool, size);
if (b == NULL) {
return NGX_ERROR;
}

cl->buf = b;
cl->next = NULL;

cb->args_out = cl;

return NGX_OK;
}


ngx_int_t
ngx_proxy_wasm_foreign_callback_buffer_write(ngx_proxy_wasm_foreign_cb_t *cb,
ngx_str_t *data)
{
size_t b_size;
ngx_buf_t *b = cb->args_out->buf;

b_size = b->end - b->start;

ngx_wa_assert(data->len <= b_size);

if (data->len <= b_size) {
b->last = ngx_cpymem(b->last, data->data, data->len);
}

/* TODO: data->len > b_size */

return NGX_OK;
}
27 changes: 27 additions & 0 deletions src/common/proxy_wasm/ngx_proxy_wasm_foreign_callback.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
#ifndef _NGX_PROXY_WASM_FOREIGN_CALLBACK_H_INCLUDED_
#define _NGX_PROXY_WASM_FOREIGN_CALLBACK_H_INCLUDED_


#include <ngx_wasm_subsystem.h>


struct ngx_proxy_wasm_foreign_cb_s {
ngx_queue_t q;
ngx_proxy_wasm_exec_t *pwexec;
#if (NGX_WASM_HTTP)
ngx_http_wasm_req_ctx_t *rctx;
#endif
ngx_proxy_wasm_foreign_function_e fcode;
ngx_chain_t *args_out;
};


ngx_proxy_wasm_foreign_cb_t * ngx_proxy_wasm_foreign_callback_alloc(
ngx_proxy_wasm_exec_t *pwexec);
void ngx_proxy_wasm_foreign_callback(ngx_proxy_wasm_foreign_cb_t *cb);
ngx_int_t ngx_proxy_wasm_foreign_callback_buffer_create(
ngx_proxy_wasm_foreign_cb_t *cb, size_t size);
ngx_int_t ngx_proxy_wasm_foreign_callback_buffer_write(
ngx_proxy_wasm_foreign_cb_t *cb, ngx_str_t *data);
void ngx_proxy_wasm_foreign_callback_destroy(ngx_proxy_wasm_foreign_cb_t *cb);
#endif
Loading

0 comments on commit d4b4267

Please sign in to comment.