Skip to content

Commit

Permalink
feat(shm/ffi) expose shm kv/metrics shms through FFI
Browse files Browse the repository at this point in the history
  • Loading branch information
casimiro committed Sep 3, 2024
1 parent b8a931a commit 728c175
Show file tree
Hide file tree
Showing 13 changed files with 1,735 additions and 0 deletions.
506 changes: 506 additions & 0 deletions lib/resty/wasmx/shm.lua

Large diffs are not rendered by default.

216 changes: 216 additions & 0 deletions src/common/lua/ngx_wasm_lua_ffi.c
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include "ddebug.h"

#include <ngx_wasm_lua_ffi.h>
#include <ngx_wa_shm_kv.h>
#include <ngx_http_lua_common.h>


Expand Down Expand Up @@ -257,3 +258,218 @@ ngx_http_wasm_ffi_set_host_properties_handlers(ngx_http_request_t *r,
return ngx_proxy_wasm_properties_set_ffi_handlers(pwctx, getter, setter, r);
}
#endif


void
ngx_wa_ffi_shm_lock(ngx_wa_shm_t *shm)
{
ngx_wa_assert(shm);

ngx_wa_shm_lock(shm);
}


void
ngx_wa_ffi_shm_unlock(ngx_wa_shm_t *shm)
{
ngx_wa_assert(shm);

ngx_wa_shm_unlock(shm);
}


ngx_int_t
ngx_wa_ffi_shm_setup_zones(ngx_wa_ffi_shm_setup_zones_handler_pt setup_handler)
{
ngx_uint_t i;
ngx_array_t *shms = ngx_wasmx_shms((ngx_cycle_t *) ngx_cycle);
ngx_wa_shm_t *shm;
ngx_wa_shm_mapping_t *mappings;

ngx_wa_assert(setup_handler);

if (shms == NULL) {
return NGX_ABORT;
}

mappings = shms->elts;

for (i = 0; i < shms->nelts; i++) {
shm = mappings[i].zone->data;
setup_handler(shm);
}

return NGX_OK;
}


static void
shm_kv_retrieve_keys(ngx_rbtree_node_t *node, ngx_rbtree_node_t *sentinel,
ngx_uint_t start, ngx_uint_t limit,
ngx_str_t **keys, ngx_uint_t *total)
{
ngx_wa_shm_kv_node_t *n = (ngx_wa_shm_kv_node_t *) node;

if (limit > 0 && (*total - start) == limit) {
return;
}

if (keys && *total >= start) {
keys[(*total - start)] = &n->key.str;
}

(*total)++;

if (node->left != sentinel) {
shm_kv_retrieve_keys(node->left, sentinel, start, limit, keys, total);
}

if (node->right != sentinel) {
shm_kv_retrieve_keys(node->right, sentinel, start, limit, keys, total);
}
}


ngx_int_t
ngx_wa_ffi_shm_iterate_keys(ngx_wa_shm_t *shm,
ngx_uint_t page, ngx_uint_t page_size,
ngx_str_t **keys, ngx_uint_t *total)
{
ngx_wa_shm_kv_t *kv;

ngx_wa_assert(shm && shm->type != NGX_WA_SHM_TYPE_QUEUE);

if (!ngx_wa_shm_locked(shm)) {
return NGX_ABORT;
}

kv = shm->data;

if (page >= kv->nelts) {
return NGX_DECLINED;
}

if (kv->rbtree.root != kv->rbtree.sentinel) {
shm_kv_retrieve_keys(kv->rbtree.root, kv->rbtree.sentinel,
page, page_size, keys, total);
}

*total = *total - page;

return NGX_OK;
}


ngx_int_t
ngx_wa_ffi_shm_get_kv_value(ngx_wa_shm_t *shm,
ngx_str_t *k, ngx_str_t **v, uint32_t *cas)
{
unsigned unlock = 0;
ngx_int_t rc;

ngx_wa_assert(shm && k && v && cas);

if (!ngx_wa_shm_locked(shm)) {
ngx_wa_shm_lock(shm);
unlock = 1;
}

rc = ngx_wa_shm_kv_get_locked(shm, k, NULL, v, cas);

if (unlock) {
ngx_wa_shm_unlock(shm);
}

return rc;
}


ngx_int_t
ngx_wa_ffi_shm_set_kv_value(ngx_wa_shm_t *shm,
ngx_str_t *k, ngx_str_t *v, uint32_t cas,
ngx_int_t *written)
{
unsigned unlock = 0;
ngx_int_t rc;

ngx_wa_assert(shm && k && v && written);

if (!ngx_wa_shm_locked(shm)) {
ngx_wa_shm_lock(shm);
unlock = 1;
}

rc = ngx_wa_shm_kv_set_locked(shm, k, v, cas, written);

if (unlock) {
ngx_wa_shm_unlock(shm);
}

return rc;
}


ngx_int_t
ngx_wa_ffi_shm_define_metric(ngx_str_t *name, ngx_wa_metric_type_e type,
uint32_t *metric_id)
{
ngx_int_t rc;
ngx_str_t prefixed_name;
ngx_wa_metrics_t *metrics = ngx_wasmx_metrics((ngx_cycle_t *) ngx_cycle);
u_char buf[NGX_MAX_ERROR_STR];

ngx_wa_assert(metrics && name);

prefixed_name.data = buf;
prefixed_name.len = ngx_sprintf(buf, "lua.%V", name) - buf;

rc = ngx_wa_metrics_define(metrics, &prefixed_name, type, metric_id);
if (rc != NGX_OK) {
return rc;
}

return NGX_OK;
}


ngx_int_t
ngx_wa_ffi_shm_increment_metric(uint32_t metric_id, ngx_uint_t value)
{
ngx_wa_metrics_t *metrics = ngx_wasmx_metrics((ngx_cycle_t *) ngx_cycle);

ngx_wa_assert(metrics);

return ngx_wa_metrics_increment(metrics, metric_id, value);
}


ngx_int_t
ngx_wa_ffi_shm_record_metric(uint32_t metric_id, ngx_uint_t value)
{
ngx_wa_metrics_t *metrics = ngx_wasmx_metrics((ngx_cycle_t *) ngx_cycle);

ngx_wa_assert(metrics);

return ngx_wa_metrics_record(metrics, metric_id, value);
}


ngx_int_t
ngx_wa_ffi_shm_get_metric(uint32_t metric_id, ngx_str_t *name,
u_char *m_buf, size_t mbs, u_char *h_buf, size_t hbs)
{
ngx_wa_metrics_t *metrics = ngx_wasmx_metrics((ngx_cycle_t *) ngx_cycle);
ngx_wa_metric_t *m;

ngx_wa_assert(metrics && (name || metric_id) && m_buf && h_buf);

m = (ngx_wa_metric_t *) m_buf;
ngx_wa_metrics_histogram_set_buffer(m, h_buf, hbs);

if (metric_id) {
return ngx_wa_metrics_get(metrics, metric_id, m);
}

return ngx_wa_metrics_get(metrics,
ngx_crc32_long(name->data, name->len), m);
}
38 changes: 38 additions & 0 deletions src/common/lua/ngx_wasm_lua_ffi.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ typedef struct {
} ngx_wasm_ffi_filter_t;


typedef void (*ngx_wa_ffi_shm_setup_zones_handler_pt)(ngx_wa_shm_t *shm);


ngx_int_t ngx_http_wasm_ffi_plan_new(ngx_wavm_t *vm,
ngx_wasm_ffi_filter_t *filters, size_t n_filters,
ngx_wasm_ops_plan_t **out, u_char *err, size_t *errlen);
Expand All @@ -43,4 +46,39 @@ ngx_int_t ngx_http_wasm_ffi_set_host_properties_handlers(ngx_http_request_t *r,
#endif


ngx_int_t ngx_wa_ffi_shm_setup_zones(
ngx_wa_ffi_shm_setup_zones_handler_pt handler);
ngx_int_t ngx_wa_ffi_shm_iterate_keys(ngx_wa_shm_t *shm, ngx_uint_t page,
ngx_uint_t page_size, ngx_str_t **keys, ngx_uint_t *total);
void ngx_wa_ffi_shm_lock(ngx_wa_shm_t *shm);
void ngx_wa_ffi_shm_unlock(ngx_wa_shm_t *shm);

ngx_int_t ngx_wa_ffi_shm_get_kv_value(ngx_wa_shm_t *shm, ngx_str_t *k,
ngx_str_t **v, uint32_t *cas);
ngx_int_t ngx_wa_ffi_shm_set_kv_value(ngx_wa_shm_t *shm, ngx_str_t *k,
ngx_str_t *v, uint32_t cas, ngx_int_t *written);


ngx_int_t ngx_wa_ffi_shm_define_metric(ngx_str_t *name,
ngx_wa_metric_type_e type, uint32_t *metric_id);
ngx_int_t ngx_wa_ffi_shm_increment_metric(uint32_t metric_id, ngx_uint_t value);
ngx_int_t ngx_wa_ffi_shm_record_metric(uint32_t metric_id, ngx_uint_t value);
ngx_int_t ngx_wa_ffi_shm_get_metric(uint32_t metric_id, ngx_str_t *name,
u_char *m_buf, size_t mbs, u_char *h_buf, size_t hbs);


ngx_int_t
ngx_wa_ffi_shm_one_slot_metric_size()
{
return NGX_WA_METRICS_ONE_SLOT_METRIC_SIZE;
}


ngx_int_t
ngx_wa_ffi_shm_max_histogram_size()
{
return NGX_WA_METRICS_MAX_HISTOGRAM_SIZE;
}


#endif /* _NGX_WASM_LUA_FFI_H_INCLUDED_ */
7 changes: 7 additions & 0 deletions src/common/shm/ngx_wa_shm.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,4 +59,11 @@ ngx_wa_shm_unlock(ngx_wa_shm_t *shm)
}


static ngx_inline unsigned
ngx_wa_shm_locked(ngx_wa_shm_t *shm)
{
return (ngx_pid_t) *shm->shpool->mutex.lock == ngx_pid;
}


#endif /* _NGX_WA_SHM_H_INCLUDED_ */
69 changes: 69 additions & 0 deletions t/04-openresty/ffi/shm/001-setup_zones.t
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
# vim:set ft= ts=4 sts=4 sw=4 et fdm=marker:

use strict;
use lib '.';
use t::TestWasmX;
use t::TestWasmX::Lua;

skip_no_openresty();

plan_tests(7);
run_tests();

__DATA__

=== TEST 1: shm - setup_zones()
setup_zones() is silently called when resty.wasmx.shm module is loaded.

--- valgrind
--- main_config
wasm {
shm_kv kv1 16k;
shm_queue q1 16k;
}

--- config
location /t {
access_by_lua_block {
local shm = require "resty.wasmx.shm"

assert(shm.kv1)
assert(shm.q1)
assert(shm.metrics)

assert(shm.inexistent_zone == nil)

ngx.say("ok")
}
}
--- response_body
ok
--- no_error_log
[error]
[crit]
[emerg]
[alert]
[stub]



=== TEST 2: shm - setup_zones(), no zones
--- valgrind
--- config
location /t {
access_by_lua_block {
local shm = require "resty.wasmx.shm"

assert(shm.metrics == nil)

ngx.say("ok")
}
}
--- response_body
ok
--- no_error_log
[error]
[crit]
[emerg]
[alert]
[stub]
Loading

0 comments on commit 728c175

Please sign in to comment.