Skip to content

Commit

Permalink
feat(shm/ffi) expose kv and metrics write operations to Lua
Browse files Browse the repository at this point in the history
This commit also changes shm-related FFI functions from returning
NGX_DECLINED to returning NGX_ABORT, when the necessary conditions for
their completion are not satisfied.
  • Loading branch information
casimiro committed Aug 14, 2024
1 parent 803dc8e commit 3feee24
Show file tree
Hide file tree
Showing 13 changed files with 893 additions and 257 deletions.
308 changes: 259 additions & 49 deletions lib/resty/wasmx/shm.lua

Large diffs are not rendered by default.

146 changes: 112 additions & 34 deletions src/common/lua/ngx_wasm_lua_ffi.c
Original file line number Diff line number Diff line change
Expand Up @@ -260,27 +260,22 @@ ngx_http_wasm_ffi_set_host_properties_handlers(ngx_http_request_t *r,


ngx_int_t
ngx_wa_ffi_shm_get_zones(ngx_wa_ffi_shm_get_zones_handler_pt handler)
ngx_wa_ffi_shm_setup_zones(ngx_wa_ffi_shm_setup_zones_handler_pt setup_handler)
{
ngx_uint_t i;
ngx_cycle_t *cycle = (ngx_cycle_t *) ngx_cycle;
ngx_array_t *shms = ngx_wasmx_shms(cycle);
ngx_array_t *shms = ngx_wasmx_shms((ngx_cycle_t *) ngx_cycle);
ngx_wa_shm_t *shm;
ngx_wa_shm_mapping_t *mappings;

if (!handler) {
return NGX_ERROR;
}

if (!shms) {
return NGX_DECLINED;
if (setup_handler == NULL || shms == NULL) {
return NGX_ABORT;
}

mappings = shms->elts;

for (i = 0; i < shms->nelts; i++) {
shm = mappings[i].zone->data;
handler(shm);
setup_handler(shm);
}

return NGX_OK;
Expand All @@ -289,78 +284,161 @@ ngx_wa_ffi_shm_get_zones(ngx_wa_ffi_shm_get_zones_handler_pt handler)

static void
shm_kv_retrieve_keys(ngx_rbtree_node_t *node, ngx_rbtree_node_t *sentinel,
ngx_int_t *total, ngx_int_t limit, ngx_str_t **keys)
ngx_uint_t start, ngx_uint_t limit, ngx_uint_t *total,
ngx_str_t **keys)
{
ngx_wa_shm_kv_node_t *n = (ngx_wa_shm_kv_node_t *) node;

if (!node || node == sentinel || (limit > 0 && *total == limit)) {
if (limit > 0 && (*total - start) == limit) {
return;
}

if (keys) {
keys[*total] = &n->key.str;
if (keys && *total >= start) {
keys[(*total - start)] = &n->key.str;
}

(*total)++;

shm_kv_retrieve_keys(node->right, sentinel, total, limit, keys);
shm_kv_retrieve_keys(node->left, sentinel, total, limit, keys);
if (node->left != sentinel) {
shm_kv_retrieve_keys(node->left, sentinel, start, limit, total, keys);
}

if (node->right != sentinel) {
shm_kv_retrieve_keys(node->right, sentinel, start, limit, total, keys);
}
}


ngx_int_t
ngx_wa_ffi_shm_get_keys(ngx_wa_shm_t *shm, ngx_uint_t n, ngx_str_t **keys)
ngx_wa_ffi_shm_get_keys(ngx_wa_shm_t *shm, ngx_uint_t start, ngx_uint_t max,
ngx_str_t **keys, ngx_uint_t *total)
{
ngx_int_t total = 0;
ngx_wa_shm_kv_t *kv;

if (!shm || shm->type == NGX_WASM_SHM_TYPE_QUEUE) {
return NGX_ERROR;
if (shm == NULL || shm->type == NGX_WASM_SHM_TYPE_QUEUE) {
return NGX_ABORT;
}

kv = shm->data;

if (kv->nelts <= start) {
return NGX_ABORT;
}

ngx_shmtx_lock(&shm->shpool->mutex);

shm_kv_retrieve_keys(kv->rbtree.root, kv->rbtree.sentinel, &total, n, keys);
if (kv->rbtree.root != kv->rbtree.sentinel) {
shm_kv_retrieve_keys(kv->rbtree.root, kv->rbtree.sentinel, start, max,
total, keys);
}

ngx_shmtx_unlock(&shm->shpool->mutex);

return total;
*total = *total - start;

return NGX_OK;
}


ngx_int_t
ngx_wa_ffi_shm_get_kv_value(ngx_wa_shm_t *shm, ngx_str_t *k, ngx_str_t **v,
uint32_t *cas)
{
if (!shm || !k || !v || !cas) {
return NGX_ERROR;
if (shm == NULL || k == NULL || v == NULL || cas == NULL) {
return NGX_ABORT;
}

return ngx_wa_shm_kv_get_locked(shm, k, NULL, v, cas);
}


ngx_int_t
ngx_wa_ffi_shm_get_metric(ngx_str_t *name,
u_char *m_buf, size_t mbs,
u_char *h_buf, size_t hbs)
ngx_wa_ffi_shm_set_kv_value(ngx_wa_shm_t *shm, ngx_str_t *k, ngx_str_t *v,
uint32_t cas, ngx_int_t *written)
{
if (shm == NULL || k == NULL || v == NULL || written == NULL) {
return NGX_ABORT;
}

return ngx_wa_shm_kv_set_locked(shm, k, v, cas, written);
}


ngx_int_t
ngx_wa_ffi_shm_define_metric(ngx_str_t *name, ngx_wa_metric_type_e type)
{
uint32_t metric_id;
ngx_wa_metrics_t *metrics;
ngx_int_t rc;
ngx_str_t prefixed_name;
ngx_wa_metrics_t *metrics = ngx_wasmx_metrics((ngx_cycle_t *) ngx_cycle);

if (name == NULL || metrics == NULL) {
return NGX_ABORT;
}

u_char buf[metrics->config.max_metric_name_length];

prefixed_name.data = buf;
prefixed_name.len = ngx_sprintf(buf, "lua.%V", name) - buf;

rc = ngx_wa_metrics_define(metrics, &prefixed_name, type, &metric_id);
if (rc != NGX_OK) {
return rc;
}

return metric_id;
}


ngx_int_t
ngx_wa_ffi_shm_increment_metric(uint32_t metric_id, ngx_uint_t value)
{
ngx_wa_metrics_t *metrics = ngx_wasmx_metrics((ngx_cycle_t *) ngx_cycle);

if (metrics == NULL) {
return NGX_ABORT;
}

return ngx_wa_metrics_increment(metrics, metric_id, value);
}


ngx_int_t
ngx_wa_ffi_shm_record_metric(uint32_t metric_id, ngx_uint_t value)
{
ngx_wa_metrics_t *metrics = ngx_wasmx_metrics((ngx_cycle_t *) ngx_cycle);

if (metrics == NULL) {
return NGX_ABORT;
}

return ngx_wa_metrics_record(metrics, metric_id, value);
}


ngx_int_t
ngx_wa_ffi_shm_get_metric(uint32_t metric_id, ngx_str_t *name,
u_char *m_buf, size_t mbs, u_char *h_buf, size_t hbs)
{
ngx_wa_metrics_t *metrics = ngx_wasmx_metrics((ngx_cycle_t *) ngx_cycle);
ngx_wa_metric_t *m;

if (!name || !m_buf || !h_buf) {
return NGX_ERROR;
if ((name == NULL && !metric_id)
|| m_buf == NULL || mbs < NGX_WA_METRICS_ONE_SLOT_METRIC_SIZE
|| h_buf == NULL || hbs < NGX_WA_METRICS_MAX_HISTOGRAM_SIZE
|| metrics == NULL)
{
return NGX_ABORT;
}

metrics = ngx_wasmx_metrics((ngx_cycle_t *) ngx_cycle);
metric_id = ngx_crc32_long(name->data, name->len);
m = (ngx_wa_metric_t *) m_buf;
ngx_wa_metrics_histogram_set_buffer(m, h_buf, hbs);

ngx_wa_metrics_set_histogram_buffer(m, h_buf, hbs);
if (metric_id) {
return ngx_wa_metrics_get(metrics, metric_id, m);
}

return ngx_wa_metrics_get(metrics, metric_id, m);
return ngx_wa_metrics_get(metrics,
ngx_crc32_long(name->data, name->len), m);
}
#endif
21 changes: 15 additions & 6 deletions src/common/lua/ngx_wasm_lua_ffi.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ typedef struct {
} ngx_wasm_ffi_filter_t;


typedef void (*ngx_wa_ffi_shm_get_zones_handler_pt)(ngx_wa_shm_t *shm);
typedef void (*ngx_wa_ffi_shm_setup_zones_handler_pt)(ngx_wa_shm_t *shm);


ngx_int_t ngx_http_wasm_ffi_plan_new(ngx_wavm_t *vm,
Expand All @@ -46,13 +46,22 @@ ngx_int_t ngx_http_wasm_ffi_set_host_properties_handlers(ngx_http_request_t *r,
#endif


ngx_int_t ngx_wa_ffi_shm_get_zones(ngx_wa_ffi_shm_get_zones_handler_pt handler);
ngx_int_t ngx_wa_ffi_shm_get_keys(ngx_wa_shm_t *shm, ngx_uint_t n,
ngx_str_t **keys);
ngx_int_t ngx_wa_ffi_shm_setup_zones(
ngx_wa_ffi_shm_setup_zones_handler_pt handler);
ngx_int_t ngx_wa_ffi_shm_get_keys(ngx_wa_shm_t *shm, ngx_uint_t start,
ngx_uint_t n, ngx_str_t **keys, ngx_uint_t *total);
ngx_int_t ngx_wa_ffi_shm_get_kv_value(ngx_wa_shm_t *shm, ngx_str_t *k,
ngx_str_t **v, uint32_t *cas);
ngx_int_t ngx_wa_ffi_shm_get_metric(ngx_str_t *k, u_char *m_buf, size_t mbs,
u_char *h_buf, size_t hbs);
ngx_int_t ngx_wa_ffi_shm_set_kv_value(ngx_wa_shm_t *shm, ngx_str_t *k,
ngx_str_t *v, uint32_t cas, ngx_int_t *written);


ngx_int_t ngx_wa_ffi_shm_define_metric(ngx_str_t *name,
ngx_wa_metric_type_e type);
ngx_int_t ngx_wa_ffi_shm_increment_metric(uint32_t metric_id, ngx_uint_t value);
ngx_int_t ngx_wa_ffi_shm_record_metric(uint32_t metric_id, ngx_uint_t value);
ngx_int_t ngx_wa_ffi_shm_get_metric(uint32_t metric_id, ngx_str_t *name,
u_char *m_buf, size_t mbs, u_char *h_buf, size_t hbs);


ngx_int_t
Expand Down
3 changes: 2 additions & 1 deletion src/common/metrics/ngx_wa_metrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ void ngx_wa_metrics_histogram_get(ngx_wa_metrics_t *metrics, ngx_wa_metric_t *m,


static ngx_inline ngx_wa_metrics_histogram_t *
ngx_wa_metrics_set_histogram_buffer(ngx_wa_metric_t *m, u_char *b, size_t s)
ngx_wa_metrics_histogram_set_buffer(ngx_wa_metric_t *m, u_char *b, size_t s)
{
m->slots[0].histogram = (ngx_wa_metrics_histogram_t *) b;
m->slots[0].histogram->n_bins = (s - sizeof(ngx_wa_metrics_histogram_t))
Expand All @@ -109,4 +109,5 @@ ngx_wa_metrics_set_histogram_buffer(ngx_wa_metric_t *m, u_char *b, size_t s)
return m->slots[0].histogram;
}


#endif /* _NGX_WA_METRICS_H_INCLUDED_ */
2 changes: 1 addition & 1 deletion src/common/proxy_wasm/ngx_proxy_wasm_host.c
Original file line number Diff line number Diff line change
Expand Up @@ -1679,7 +1679,7 @@ ngx_proxy_wasm_hfuncs_get_metric(ngx_wavm_instance_t *instance,
ret_value = NGX_WAVM_HOST_LIFT(instance, args[1].of.i32, ngx_uint_t);

m = (ngx_wa_metric_t *) m_buf;
ngx_wa_metrics_set_histogram_buffer(m, h_buf, sizeof(h_buf));
ngx_wa_metrics_histogram_set_buffer(m, h_buf, sizeof(h_buf));

rc = ngx_wa_metrics_get(metrics, metric_id, m);
if (rc == NGX_OK && m->type != NGX_WA_METRIC_HISTOGRAM) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,25 +12,28 @@ run_tests();

__DATA__

=== TEST 1: shm - get_zones()
get_zones() is silently called when resty.wasmx.shm module is loaded.
=== TEST 1: shm - setup_zones()
setup_zones() is silently called when resty.wasmx.shm module is loaded.

--- valgrind
--- load_nginx_modules: ngx_http_echo_module
--- wasm_modules: hostcalls
--- shm_kv: kv1 1m
--- shm_queue: q1 1m
--- main_config
wasm {
shm_kv kv1 1m;
shm_queue q1 1m;
}

--- config
location /t {
access_by_lua_block {
local shm = require "resty.wasmx.shm"

assert(shm.kv1 ~= nil)
assert(shm.q1 ~= nil)
assert(shm.metrics ~= nil)
}
assert(shm.kv1)
assert(shm.q1)
assert(shm.metrics)

echo ok;
ngx.say("ok")
}
}
--- response_body
ok
Expand All @@ -43,32 +46,30 @@ ok



=== TEST 2: shm - get_zones(), no zones
=== TEST 2: shm - setup_zones(), no zones
--- valgrind
--- load_nginx_modules: ngx_http_echo_module
--- config
location /t {
access_by_lua_block {
local shm = require "resty.wasmx.shm"

assert(shm.metrics == nil)
}

echo ok;
ngx.say("ok")
}
}
--- response_body
ok
--- error_log eval
qr/\[info\] .*? no shm zones found/,
--- no_error_log
[error]
[crit]
[emerg]
[alert]
[stub]



=== TEST 3: shm - get_zones(), bad pointer
=== TEST 3: shm - setup_zones(), bad pointer
A call with a bad pointer is simply ignored.

--- valgrind
Expand All @@ -82,12 +83,12 @@ A call with a bad pointer is simply ignored.
ffi.cdef [[
typedef struct ngx_wa_shm_t ngx_wa_shm_t;

typedef void (*ngx_wa_ffi_shm_get_zones_handler_pt)(ngx_wa_shm_t *shm);
typedef void (*ngx_wa_ffi_shm_setup_zones_handler_pt)(ngx_wa_shm_t *shm);

int ngx_wa_ffi_shm_get_zones(ngx_wa_ffi_shm_get_zones_handler_pt handler);
int ngx_wa_ffi_shm_setup_zones(ngx_wa_ffi_shm_setup_zones_handler_pt handler);
]]

C.ngx_wa_ffi_shm_get_zones(nil)
C.ngx_wa_ffi_shm_setup_zones(nil)
}

echo ok;
Expand Down
Loading

0 comments on commit 3feee24

Please sign in to comment.