From fa15534f1ca6da4016f8eae7436409a81d818536 Mon Sep 17 00:00:00 2001 From: Caio Ramos Casimiro Date: Fri, 2 Aug 2024 16:47:40 +0100 Subject: [PATCH] feat(shm/ffi) expose shm kv/metrics shms through FFI --- lib/resty/wasmx/shm.lua | 513 ++++++++++++++++++ src/common/lua/ngx_wasm_lua_ffi.c | 188 +++++++ src/common/lua/ngx_wasm_lua_ffi.h | 38 ++ t/04-openresty/ffi/shm/001-setup_zones.t | 68 +++ t/04-openresty/ffi/shm/002-iterate_keys.t | 178 ++++++ t/04-openresty/ffi/shm/003-get_keys.t | 153 ++++++ t/04-openresty/ffi/shm/101-kv_get.t | 62 +++ t/04-openresty/ffi/shm/102-kv_set.t | 57 ++ t/04-openresty/ffi/shm/201-metrics_get.t | 145 +++++ t/04-openresty/ffi/shm/202-metrics_define.t | 109 ++++ t/04-openresty/ffi/shm/203-metrics_record.t | 80 +++ .../ffi/shm/204-metrics_increment.t | 74 +++ 12 files changed, 1665 insertions(+) create mode 100644 lib/resty/wasmx/shm.lua create mode 100644 t/04-openresty/ffi/shm/001-setup_zones.t create mode 100644 t/04-openresty/ffi/shm/002-iterate_keys.t create mode 100644 t/04-openresty/ffi/shm/003-get_keys.t create mode 100644 t/04-openresty/ffi/shm/101-kv_get.t create mode 100644 t/04-openresty/ffi/shm/102-kv_set.t create mode 100644 t/04-openresty/ffi/shm/201-metrics_get.t create mode 100644 t/04-openresty/ffi/shm/202-metrics_define.t create mode 100644 t/04-openresty/ffi/shm/203-metrics_record.t create mode 100644 t/04-openresty/ffi/shm/204-metrics_increment.t diff --git a/lib/resty/wasmx/shm.lua b/lib/resty/wasmx/shm.lua new file mode 100644 index 000000000..2d388a1b2 --- /dev/null +++ b/lib/resty/wasmx/shm.lua @@ -0,0 +1,513 @@ +-- vim:set ts=4 sw=4 sts=4 et: + +local ffi = require "ffi" +local wasmx = require "resty.wasmx" + + +local C = ffi.C +local error = error +local type = type +local table_concat = table.concat +local table_new = table.new +local str_fmt = string.format +local ffi_cast = ffi.cast +local ffi_fill = ffi.fill +local ffi_new = ffi.new +local ffi_str = ffi.string +local ngx_log = ngx.log +local ngx_debug = ngx.DEBUG +local ngx_warn = ngx.WARN +local ngx_sleep = ngx.sleep +local FFI_ABORT = wasmx.FFI_ABORT +local FFI_DECLINED = wasmx.FFI_DECLINED +local FFI_ERROR = wasmx.FFI_ERROR +local FFI_OK = wasmx.FFI_OK + + +ffi.cdef [[ + typedef unsigned char u_char; + typedef uintptr_t ngx_uint_t; + typedef ngx_uint_t ngx_msec_t; + typedef struct ngx_log_s ngx_log_t; + typedef struct ngx_slab_pool_s ngx_slab_pool_t; + + typedef enum { + NGX_WA_SHM_TYPE_KV, + NGX_WA_SHM_TYPE_QUEUE, + NGX_WA_SHM_TYPE_METRICS, + } ngx_wa_shm_type_e; + + typedef enum { + NGX_WA_SHM_EVICTION_LRU, + NGX_WA_SHM_EVICTION_SLRU, + NGX_WA_SHM_EVICTION_NONE, + } ngx_wa_shm_eviction_e; + + typedef struct { + ngx_wa_shm_type_e type; + ngx_wa_shm_eviction_e eviction; + ngx_str_t name; + ngx_log_t *log; + ngx_slab_pool_t *shpool; + void *data; + } ngx_wa_shm_t; + + typedef enum { + NGX_WA_METRIC_COUNTER, + NGX_WA_METRIC_GAUGE, + NGX_WA_METRIC_HISTOGRAM, + } ngx_wa_metric_type_e; + + + typedef struct { + ngx_uint_t value; + ngx_msec_t last_update; + } ngx_wa_metrics_gauge_t; + + typedef struct { + uint32_t upper_bound; + uint32_t count; + } ngx_wa_metrics_bin_t; + + typedef struct { + uint8_t n_bins; + ngx_wa_metrics_bin_t bins[]; + } ngx_wa_metrics_histogram_t; + + typedef union { + ngx_uint_t counter; + ngx_wa_metrics_gauge_t gauge; + ngx_wa_metrics_histogram_t *histogram; + } ngx_wa_metric_val_t; + + typedef struct { + ngx_wa_metric_type_e metric_type; + ngx_wa_metric_val_t slots[]; + } ngx_wa_metric_t; + + + typedef void (*ngx_wa_ffi_shm_setup_zones_handler)(ngx_wa_shm_t *shm); + + + void ngx_wa_ffi_shm_lock(ngx_wa_shm_t *shm); + void ngx_wa_ffi_shm_unlock(ngx_wa_shm_t *shm); + int ngx_wa_ffi_shm_setup_zones(ngx_wa_ffi_shm_setup_zones_handler handler); + int ngx_wa_ffi_shm_iterate_keys(ngx_wa_shm_t *shm, + ngx_uint_t page, + ngx_uint_t page_size, + ngx_str_t **keys, + ngx_uint_t *total); + + int ngx_wa_ffi_shm_get_kv_value(ngx_wa_shm_t *shm, + ngx_str_t *k, + ngx_str_t **v, + uint32_t *cas); + int ngx_wa_ffi_shm_set_kv_value(ngx_wa_shm_t *shm, + ngx_str_t *k, + ngx_str_t *v, + uint32_t cas, + int *written); + + int ngx_wa_ffi_shm_define_metric(ngx_str_t *name, + ngx_wa_metric_type_e m_type); + int ngx_wa_ffi_shm_record_metric(uint32_t metric_id, int value); + int ngx_wa_ffi_shm_increment_metric(uint32_t metric_id, int value); + int ngx_wa_ffi_shm_get_metric(uint32_t metric_id, + ngx_str_t *name, + u_char *mbuf, size_t mbs, + u_char *hbuf, size_t hbs); + + int ngx_wa_ffi_shm_one_slot_metric_size(); + int ngx_wa_ffi_shm_max_histogram_size(); +]] + + +local _M = {} +local _setup_zones_handler +local _initialized = false +local _WASM_SHM_KEY = {} +local _DEFAULT_KEY_BATCH_SIZE = 100 +local _types = { + ffi_shm = { + SHM_TYPE_KV = 0, + SHM_TYPE_QUEUE = 1, + SHM_TYPE_METRICS = 2, + }, + ffi_metric = { + COUNTER = 0, + GAUGE = 1, + HISTOGRAM = 2, + } +} +local _metric_type_set = { + [_types.ffi_metric.COUNTER] = 1, + [_types.ffi_metric.GAUGE] = 2, + [_types.ffi_metric.HISTOGRAM] = 3, +} +local _kbuf = ffi_new("ngx_str_t *[?]", _DEFAULT_KEY_BATCH_SIZE) +local _mbs = C.ngx_wa_ffi_shm_one_slot_metric_size() +local _mbuf = ffi_new("u_char [?]", _mbs) +local _hbs = C.ngx_wa_ffi_shm_max_histogram_size() +local _hbuf = ffi_new("u_char [?]", _hbs) + + +local function lock_shm(zone) + local shm = zone[_WASM_SHM_KEY] + + C.ngx_wa_ffi_shm_lock(shm) +end + + +local function unlock_shm(zone) + local shm = zone[_WASM_SHM_KEY] + + C.ngx_wa_ffi_shm_unlock(shm) +end + + +local function key_iterator(ctx) + if ctx.i < ctx.page_total then + local ckey = ctx.ckeys[ctx.i] + ctx.i = ctx.i + 1 + + return ffi_str(ckey.data, ckey.len) + + else + ctx.cpage_total[0] = 0 + + local rc = C.ngx_wa_ffi_shm_iterate_keys(ctx.shm, + ctx.page, ctx.page_size, + ctx.ckeys, ctx.cpage_total) + if rc == FFI_ABORT then + local zone_name = ffi_str(ctx.shm.name.data, ctx.shm.name.len) + local err = { + "attempt to iterate an unlocked shm zone.", + "please call resty.wasmx.shm.%s:lock() before calling", + "iterate_keys() and resty.wasmx.shm.%s:unlock() after", + } + + ngx_log(ngx.ERR, str_fmt(table_concat(err), zone_name, zone_name)) + + return nil + end + + if rc == FFI_DECLINED then + return nil + end + + ngx_sleep(0) + + ctx.page_total = tonumber(ctx.cpage_total[0]) + ctx.page = ctx.page + ctx.page_total + ctx.i = 1 + + local ckey = ctx.ckeys[0] + + return ffi_str(ckey.data, ckey.len) + end +end + + +local function iterate_keys(zone, page_size) + if page_size ~= nil and type(page_size) ~= "number" then + error("page_size must be a number", 2) + end + + local ctx = { + shm = zone[_WASM_SHM_KEY], + i = 0, + page = 0, + page_size = page_size and page_size or _DEFAULT_KEY_BATCH_SIZE, + page_total = 0, + cpage_total = ffi_new("ngx_uint_t [1]"), + ckeys = page_size and ffi_new("ngx_str_t *[?]", page_size) or _kbuf + } + + return key_iterator, ctx +end + + +local function get_keys(zone, max) + if max ~= nil and type(max) ~= "number" then + error("max must be number", 2) + end + + local shm = zone[_WASM_SHM_KEY] + local page = 0 + local nkeys + local ctotal = ffi_new("ngx_uint_t [1]") + + lock_shm(zone) + + if max == 0 then + local rc = C.ngx_wa_ffi_shm_iterate_keys(shm, page, 0, nil, ctotal) + nkeys = tonumber(ctotal[0]) + + if rc == FFI_DECLINED then + unlock_shm(zone) + return {} + end + + else + nkeys = max and max or _DEFAULT_KEY_BATCH_SIZE + end + + local ckeys = ffi_new("ngx_str_t *[?]", nkeys) + + C.ngx_wa_ffi_shm_iterate_keys(shm, page, nkeys, ckeys, ctotal) + + unlock_shm(zone) + + local total = tonumber(ctotal[0]) + local keys = table_new(0, total) + + for i = 1, total do + local ckey = ckeys[i - 1] + keys[i] = ffi_str(ckey.data, ckey.len) + end + + return keys +end + + +local function get_kv_value(zone, key) + if type(key) ~= "string" then + error("key must be string", 2) + end + + local shm = zone[_WASM_SHM_KEY] + local cname = ffi_new("ngx_str_t", { data = key, len = #key }) + local cvalue = ffi_new("ngx_str_t *[1]") + local ccas = ffi_new("uint32_t [1]") + + local rc = C.ngx_wa_ffi_shm_get_kv_value(shm, cname, cvalue, ccas) + if rc == FFI_DECLINED then + return nil, nil, "key not found" + end + + return ffi_str(cvalue[0].data, cvalue[0].len), tonumber(ccas[0]) +end + + +local function set_kv_value(zone, key, value, cas) + if type(key) ~= "string" then + error("key must be a string", 2) + end + + if type(value) ~= "string" then + error("value must be a string", 2) + end + + if type(cas) ~= "number" then + error("cas must be a number", 2) + end + + local shm = zone[_WASM_SHM_KEY] + local cname = ffi_new("ngx_str_t", { data = key, len = #key }) + local cvalue = ffi_new("ngx_str_t", { data = value, len = #value }) + local written = ffi_new("uint32_t [1]") + + local rc = C.ngx_wa_ffi_shm_set_kv_value(shm, cname, cvalue, cas, written) + if rc == FFI_ERR then + return nil, "failed setting kv value, no memory" + end + + return tonumber(written[0]) +end + + +local function parse_cmetric(cmetric) + if cmetric.metric_type == _types.ffi_metric.COUNTER then + return { type = "counter", value = tonumber(cmetric.slots[0].counter) } + + elseif cmetric.metric_type == _types.ffi_metric.GAUGE then + return { type = "gauge", value = tonumber(cmetric.slots[0].gauge.value) } + + elseif cmetric.metric_type == _types.ffi_metric.HISTOGRAM then + local hbuf = cmetric.slots[0].histogram + local ch = ffi_cast("ngx_wa_metrics_histogram_t *", hbuf) + local h = { type = "histogram", value = {} } + + for i = 0, ch.n_bins do + local cb = ch.bins[i] + + if cb.upper_bound == 0 then + break + end + + h.value[cb.upper_bound] = cb.count + end + + return h + end +end + + +local function get_metric(zone, metric_id) + if type(metric_id) ~= "number" then + error("metric_id must be a number", 2) + end + + ffi_fill(_mbuf, _mbs) + ffi_fill(_hbuf, _hbs) + + local rc = C.ngx_wa_ffi_shm_get_metric(metric_id, nil, + _mbuf, _mbs, _hbuf, _hbs) + if rc == FFI_DECLINED then + return nil, "metric not found" + end + + return parse_cmetric(ffi_cast("ngx_wa_metric_t *", _mbuf)) +end + + +local function get_metric_by_name(zone, name, opts) + if type(name) ~= "string" or name == "" then + error("name must be a non-empty string", 2) + end + + if opts ~= nil then + if type(opts) ~= "table" then + error("opts must be a table", 2) + end + + if opts.prefix ~= nil and type(opts.prefix) ~= "boolean" then + error("opts.prefix must be a boolean", 2) + end + end + + name = (opts and opts.prefix == false) and name or "lua." .. name + local cname = ffi_new("ngx_str_t", { data = name, len = #name }) + + ffi_fill(_mbuf, _mbs) + ffi_fill(_hbuf, _hbs) + + local rc = C.ngx_wa_ffi_shm_get_metric(0, cname, _mbuf, _mbs, _hbuf, _hbs) + if rc == FFI_DECLINED then + return nil, "metric not found" + end + + return parse_cmetric(ffi_cast("ngx_wa_metric_t *", _mbuf)) +end + + +local function define_metric(zone, name, metric_type) + if type(name) ~= "string" or name == "" then + error("name must be a non-empty string", 2) + end + + if _metric_type_set[metric_type] == nil then + local err = { + "metric_type must be either", + "resty.wasmx.shm.metrics.COUNTER,", + "resty.wasmx.shm.metrics.GAUGE, or", + "resty.wasmx.shm.metrics.HISTOGRAM", + } + + error(table_concat(err, " "), 2) + end + + local cname = ffi_new("ngx_str_t", { data = name, len = #name }) + + local m_id = C.ngx_wa_ffi_shm_define_metric(cname, metric_type) + if m_id == FFI_ABORT then + return nil, "failed defining metric, metric name too long" + end + + if m_id == FFI_ERROR then + return nil, "failed defining metric, no memory" + end + + return m_id +end + + +local function record_metric(zone, metric_id, value) + if type(metric_id) ~= "number" then + error("metric_id must be a number", 2) + end + + if type(value) ~= "number" then + error("value must be a number", 2) + end + + local rc = C.ngx_wa_ffi_shm_record_metric(metric_id, value) + if rc == FFI_DECLINED then + return nil, "metric not found" + end + + return true +end + + +local function increment_metric(zone, metric_id, value) + if type(metric_id) ~= "number" then + error("metric_id must be a number", 2) + end + + if value ~= nil and (type(value) ~= "number" or value < 1) then + error("value must be a number greater than zero", 2) + end + + value = value and value or 1 + + local rc = C.ngx_wa_ffi_shm_increment_metric(metric_id, value) + if rc == FFI_DECLINED then + return nil, "metric not found" + end + + return true +end + + +local _setup_zones_handler = ffi_cast("ngx_wa_ffi_shm_setup_zones_handler", +function(shm) + local zone_name = ffi_str(shm.name.data, shm.name.len) + _M[zone_name] = { + [_WASM_SHM_KEY] = shm, + lock = lock_shm, + unlock = unlock_shm + } + + if shm.type == _types.ffi_shm.SHM_TYPE_KV then + _M[zone_name].get_keys = get_keys + _M[zone_name].iterate_keys = iterate_keys + _M[zone_name].get = get_kv_value + _M[zone_name].set = set_kv_value + + elseif shm.type == _types.ffi_shm.SHM_TYPE_QUEUE then + -- NYI + + elseif shm.type == _types.ffi_shm.SHM_TYPE_METRICS then + _M[zone_name].get_keys = get_keys + _M[zone_name].iterate_keys = iterate_keys + _M[zone_name].get = get_metric + _M[zone_name].get_by_name = get_metric_by_name + _M[zone_name].define = define_metric + _M[zone_name].increment = increment_metric + _M[zone_name].record = record_metric + _M[zone_name].COUNTER = _types.ffi_metric.COUNTER + _M[zone_name].GAUGE = _types.ffi_metric.GAUGE + _M[zone_name].HISTOGRAM = _types.ffi_metric.HISTOGRAM + end +end) + + +function _M.init() + if _initialized then + return + end + + local rc = C.ngx_wa_ffi_shm_setup_zones(_setup_zones_handler) + if rc == FFI_ABORT then + ngx_log(ngx.DEBUG, "no shm zones found for resty.wasmx interface") + end + + _initialized = true +end + + +_M.init() + + +return _M diff --git a/src/common/lua/ngx_wasm_lua_ffi.c b/src/common/lua/ngx_wasm_lua_ffi.c index 1c24ae600..726016a1b 100644 --- a/src/common/lua/ngx_wasm_lua_ffi.c +++ b/src/common/lua/ngx_wasm_lua_ffi.c @@ -4,6 +4,7 @@ #include "ddebug.h" #include +#include #include @@ -257,3 +258,190 @@ ngx_http_wasm_ffi_set_host_properties_handlers(ngx_http_request_t *r, return ngx_proxy_wasm_properties_set_ffi_handlers(pwctx, getter, setter, r); } #endif + + +void +ngx_wa_ffi_shm_lock(ngx_wa_shm_t *shm) +{ + ngx_wa_assert(shm); + + ngx_shmtx_lock(&shm->shpool->mutex); +} + + +void +ngx_wa_ffi_shm_unlock(ngx_wa_shm_t *shm) +{ + ngx_wa_assert(shm); + + ngx_shmtx_unlock(&shm->shpool->mutex); +} + + +ngx_int_t +ngx_wa_ffi_shm_setup_zones(ngx_wa_ffi_shm_setup_zones_handler_pt setup_handler) +{ + ngx_uint_t i; + ngx_array_t *shms = ngx_wasmx_shms((ngx_cycle_t *) ngx_cycle); + ngx_wa_shm_t *shm; + ngx_wa_shm_mapping_t *mappings; + + ngx_wa_assert(setup_handler); + + if (shms == NULL) { + return NGX_ABORT; + } + + mappings = shms->elts; + + for (i = 0; i < shms->nelts; i++) { + shm = mappings[i].zone->data; + setup_handler(shm); + } + + return NGX_OK; +} + + +static void +shm_kv_retrieve_keys(ngx_rbtree_node_t *node, ngx_rbtree_node_t *sentinel, + ngx_uint_t start, ngx_uint_t limit, + ngx_str_t **keys, ngx_uint_t *total) +{ + ngx_wa_shm_kv_node_t *n = (ngx_wa_shm_kv_node_t *) node; + + if (limit > 0 && (*total - start) == limit) { + return; + } + + if (keys && *total >= start) { + keys[(*total - start)] = &n->key.str; + } + + (*total)++; + + if (node->left != sentinel) { + shm_kv_retrieve_keys(node->left, sentinel, start, limit, keys, total); + } + + if (node->right != sentinel) { + shm_kv_retrieve_keys(node->right, sentinel, start, limit, keys, total); + } +} + + +ngx_int_t +ngx_wa_ffi_shm_iterate_keys(ngx_wa_shm_t *shm, + ngx_uint_t page, ngx_uint_t page_size, + ngx_str_t **keys, ngx_uint_t *total) +{ + ngx_wa_shm_kv_t *kv; + + ngx_wa_assert(shm && shm->type != NGX_WA_SHM_TYPE_QUEUE); + + if ((ngx_pid_t) *shm->shpool->mutex.lock != ngx_pid) { + return NGX_ABORT; + } + + kv = shm->data; + + if (page >= kv->nelts) { + return NGX_DECLINED; + } + + if (kv->rbtree.root != kv->rbtree.sentinel) { + shm_kv_retrieve_keys(kv->rbtree.root, kv->rbtree.sentinel, + page, page_size, keys, total); + } + + *total = *total - page; + + return NGX_OK; +} + + +ngx_int_t +ngx_wa_ffi_shm_get_kv_value(ngx_wa_shm_t *shm, + ngx_str_t *k, ngx_str_t **v, uint32_t *cas) +{ + ngx_wa_assert(shm && k && v && cas); + + return ngx_wa_shm_kv_get_locked(shm, k, NULL, v, cas); +} + + +ngx_int_t +ngx_wa_ffi_shm_set_kv_value(ngx_wa_shm_t *shm, + ngx_str_t *k, ngx_str_t *v, uint32_t cas, + ngx_int_t *written) +{ + ngx_wa_assert(shm && k && v && written); + + return ngx_wa_shm_kv_set_locked(shm, k, v, cas, written); +} + + +ngx_int_t +ngx_wa_ffi_shm_define_metric(ngx_str_t *name, ngx_wa_metric_type_e type) +{ + uint32_t metric_id; + ngx_int_t rc; + ngx_str_t prefixed_name; + ngx_wa_metrics_t *metrics = ngx_wasmx_metrics((ngx_cycle_t *) ngx_cycle); + u_char buf[NGX_MAX_ERROR_STR]; + + ngx_wa_assert(metrics && name); + + prefixed_name.data = buf; + prefixed_name.len = ngx_sprintf(buf, "lua.%V", name) - buf; + + rc = ngx_wa_metrics_define(metrics, &prefixed_name, type, &metric_id); + if (rc != NGX_OK) { + return rc; + } + + return metric_id; +} + + +ngx_int_t +ngx_wa_ffi_shm_increment_metric(uint32_t metric_id, ngx_uint_t value) +{ + ngx_wa_metrics_t *metrics = ngx_wasmx_metrics((ngx_cycle_t *) ngx_cycle); + + ngx_wa_assert(metrics); + + return ngx_wa_metrics_increment(metrics, metric_id, value); +} + + +ngx_int_t +ngx_wa_ffi_shm_record_metric(uint32_t metric_id, ngx_uint_t value) +{ + ngx_wa_metrics_t *metrics = ngx_wasmx_metrics((ngx_cycle_t *) ngx_cycle); + + ngx_wa_assert(metrics); + + return ngx_wa_metrics_record(metrics, metric_id, value); +} + + +ngx_int_t +ngx_wa_ffi_shm_get_metric(uint32_t metric_id, ngx_str_t *name, + u_char *m_buf, size_t mbs, u_char *h_buf, size_t hbs) +{ + ngx_wa_metrics_t *metrics = ngx_wasmx_metrics((ngx_cycle_t *) ngx_cycle); + ngx_wa_metric_t *m; + + ngx_wa_assert(metrics && (name || metric_id) && m_buf && h_buf); + + m = (ngx_wa_metric_t *) m_buf; + ngx_wa_metrics_histogram_set_buffer(m, h_buf, hbs); + + if (metric_id) { + return ngx_wa_metrics_get(metrics, metric_id, m); + } + + return ngx_wa_metrics_get(metrics, + ngx_crc32_long(name->data, name->len), m); +} diff --git a/src/common/lua/ngx_wasm_lua_ffi.h b/src/common/lua/ngx_wasm_lua_ffi.h index 0ed226f86..2274b2833 100644 --- a/src/common/lua/ngx_wasm_lua_ffi.h +++ b/src/common/lua/ngx_wasm_lua_ffi.h @@ -24,6 +24,9 @@ typedef struct { } ngx_wasm_ffi_filter_t; +typedef void (*ngx_wa_ffi_shm_setup_zones_handler_pt)(ngx_wa_shm_t *shm); + + ngx_int_t ngx_http_wasm_ffi_plan_new(ngx_wavm_t *vm, ngx_wasm_ffi_filter_t *filters, size_t n_filters, ngx_wasm_ops_plan_t **out, u_char *err, size_t *errlen); @@ -43,4 +46,39 @@ ngx_int_t ngx_http_wasm_ffi_set_host_properties_handlers(ngx_http_request_t *r, #endif +ngx_int_t ngx_wa_ffi_shm_setup_zones( + ngx_wa_ffi_shm_setup_zones_handler_pt handler); +ngx_int_t ngx_wa_ffi_shm_iterate_keys(ngx_wa_shm_t *shm, ngx_uint_t page, + ngx_uint_t page_size, ngx_str_t **keys, ngx_uint_t *total); +void ngx_wa_ffi_shm_lock(ngx_wa_shm_t *shm); +void ngx_wa_ffi_shm_unlock(ngx_wa_shm_t *shm); + +ngx_int_t ngx_wa_ffi_shm_get_kv_value(ngx_wa_shm_t *shm, ngx_str_t *k, + ngx_str_t **v, uint32_t *cas); +ngx_int_t ngx_wa_ffi_shm_set_kv_value(ngx_wa_shm_t *shm, ngx_str_t *k, + ngx_str_t *v, uint32_t cas, ngx_int_t *written); + + +ngx_int_t ngx_wa_ffi_shm_define_metric(ngx_str_t *name, + ngx_wa_metric_type_e type); +ngx_int_t ngx_wa_ffi_shm_increment_metric(uint32_t metric_id, ngx_uint_t value); +ngx_int_t ngx_wa_ffi_shm_record_metric(uint32_t metric_id, ngx_uint_t value); +ngx_int_t ngx_wa_ffi_shm_get_metric(uint32_t metric_id, ngx_str_t *name, + u_char *m_buf, size_t mbs, u_char *h_buf, size_t hbs); + + +ngx_int_t +ngx_wa_ffi_shm_one_slot_metric_size() +{ + return NGX_WA_METRICS_ONE_SLOT_METRIC_SIZE; +} + + +ngx_int_t +ngx_wa_ffi_shm_max_histogram_size() +{ + return NGX_WA_METRICS_MAX_HISTOGRAM_SIZE; +} + + #endif /* _NGX_WASM_LUA_FFI_H_INCLUDED_ */ diff --git a/t/04-openresty/ffi/shm/001-setup_zones.t b/t/04-openresty/ffi/shm/001-setup_zones.t new file mode 100644 index 000000000..a86af7fa6 --- /dev/null +++ b/t/04-openresty/ffi/shm/001-setup_zones.t @@ -0,0 +1,68 @@ +# vim:set ft= ts=4 sts=4 sw=4 et fdm=marker: + +use strict; +use lib '.'; +use t::TestWasmX; +use t::TestWasmX::Lua; + +skip_no_openresty(); + +plan_tests(7); +run_tests(); + +__DATA__ + +=== TEST 1: shm - setup_zones() +setup_zones() is silently called when resty.wasmx.shm module is loaded. + +--- valgrind +--- load_nginx_modules: ngx_http_echo_module +--- main_config + wasm { + shm_kv kv1 1m; + shm_queue q1 1m; + } + +--- config + location /t { + access_by_lua_block { + local shm = require "resty.wasmx.shm" + + assert(shm.kv1) + assert(shm.q1) + assert(shm.metrics) + + ngx.say("ok") + } + } +--- response_body +ok +--- no_error_log +[error] +[crit] +[emerg] +[alert] +[stub] + + + +=== TEST 2: shm - setup_zones(), no zones +--- valgrind +--- config + location /t { + access_by_lua_block { + local shm = require "resty.wasmx.shm" + + assert(shm.metrics == nil) + + ngx.say("ok") + } + } +--- response_body +ok +--- no_error_log +[error] +[crit] +[emerg] +[alert] +[stub] diff --git a/t/04-openresty/ffi/shm/002-iterate_keys.t b/t/04-openresty/ffi/shm/002-iterate_keys.t new file mode 100644 index 000000000..ffb295817 --- /dev/null +++ b/t/04-openresty/ffi/shm/002-iterate_keys.t @@ -0,0 +1,178 @@ +# vim:set ft= ts=4 sts=4 sw=4 et fdm=marker: + +use strict; +use lib '.'; +use t::TestWasmX; +use t::TestWasmX::Lua; + +skip_no_openresty(); + +plan_tests(6); +run_tests(); + +__DATA__ + +=== TEST 1: shm - iterate_keys() +--- valgrind +--- main_config + wasm { + shm_kv kv1 16k; + } +--- config + location /t { + access_by_lua_block { + local shm = require "resty.wasmx.shm" + + for i, k in ipairs({ "k1", "k2", "k3", "k4", "k5" }) do + shm.kv1:set(k, "a_value", 0) + end + + shm.kv1:lock() + + for key in shm.kv1:iterate_keys() do + ngx.say("kv1 key: " .. key) + end + + shm.kv1:unlock() + + } + } +--- response_body_like +kv1 key: k\d +kv1 key: k\d +kv1 key: k\d +kv1 key: k\d +--- no_error_log +[error] +[crit] +[emerg] +[alert] + + + +=== TEST 2: shm - iterate_keys(), with limit +--- valgrind +--- main_config + wasm { + shm_kv kv1 16k; + } +--- config + location /t { + access_by_lua_block { + local shm = require "resty.wasmx.shm" + + for i, k in ipairs({ "k1", "k2", "k3", "k4", "k5" }) do + shm.kv1:set(k, "a_value", 0) + end + + shm.kv1:lock() + + for key in shm.kv1:iterate_keys() do + ngx.say("kv1 key: " .. key) + end + + shm.kv1:unlock() + } + } +--- response_body_like +kv1 key: k\d +kv1 key: k\d +kv1 key: k\d +kv1 key: k\d +kv1 key: k\d +--- no_error_log +[error] +[crit] +[emerg] +[alert] + + + +=== TEST 3: shm - iterate_keys(), no keys +--- valgrind +--- main_config + wasm { + shm_kv kv1 16k; + } +--- config + location /t { + access_by_lua_block { + local shm = require "resty.wasmx.shm" + + shm.kv1:lock() + + for key in shm.kv1:iterate_keys() do + ngx.say("kv1 key: " .. key) + end + + shm.kv1:unlock() + + ngx.say("ok") + } + } +--- response_body +ok +--- no_error_log +[error] +[crit] +[emerg] +[alert] + + + +=== TEST 4: shm - iterate_keys(), unlocked +--- valgrind +--- main_config + wasm { + shm_kv kv1 16k; + } +--- config + location /t { + access_by_lua_block { + local shm = require "resty.wasmx.shm" + + for i, k in ipairs({ "k1", "k2", "k3", "k4", "k5" }) do + shm.kv1:set(k, "a_value", 0) + end + + for key in shm.kv1:iterate_keys() do + ngx.say("kv1 key: " .. key) + end + + ngx.say("ok") + } + } +--- response_body +ok +--- error_log eval +qr/\[error\] .*? attempt to iterate an unlocked shm zone/ + +--- no_error_log +[crit] +[emerg] +[alert] + + + +=== TEST 5: shm - iterate_keys(), queue +--- valgrind +--- wasm_modules: hostcalls +--- shm_queue: q1 1m +--- config + location /t { + access_by_lua_block { + local shm = require "resty.wasmx.shm" + + shm.q1:iterate_keys() + + ngx.say("fail") + } + } +--- error_code: 500 +--- error_log eval +qr/\[error\] .*? attempt to call method 'iterate_keys' \(a nil value\)/ +--- no_error_log +[crit] +[emerg] +[alert] +[stub] diff --git a/t/04-openresty/ffi/shm/003-get_keys.t b/t/04-openresty/ffi/shm/003-get_keys.t new file mode 100644 index 000000000..6dfb082d8 --- /dev/null +++ b/t/04-openresty/ffi/shm/003-get_keys.t @@ -0,0 +1,153 @@ +# vim:set ft= ts=4 sts=4 sw=4 et fdm=marker: + +use strict; +use lib '.'; +use t::TestWasmX; +use t::TestWasmX::Lua; + +skip_no_openresty(); + +plan_tests(8); +run_tests(); + +__DATA__ + +=== TEST 1: shm - get_keys() +--- valgrind +--- wasm_modules: hostcalls +--- shm_kv: kv1 16k +--- config + location /t { + proxy_wasm hostcalls 'test=/t/shm/set_shared_data \ + key=kv1/k1 \ + value=hello1'; + + proxy_wasm hostcalls 'test=/t/shm/set_shared_data \ + key=kv1/k2 \ + value=hello2'; + + access_by_lua_block { + local shm = require "resty.wasmx.shm" + + local keys = shm.kv1:get_keys() + + for i, k in ipairs(keys) do + ngx.log(ngx.INFO,"kv1 key: ", k) + end + + ngx.say("ok") + } + } +--- response_body +ok +--- grep_error_log eval: qr/\[info\] .*? kv1 key: kv1\/k[12]/ +--- grep_error_log_out eval +qr/\[info\] .*? kv1 key: kv1\/k1.*? +\[info\] .*? kv1 key: kv1\/k2/ +--- no_error_log +[error] +[crit] +[emerg] +[alert] +[stub] + + + +=== TEST 2: shm - get_keys(), with limit +--- valgrind +--- wasm_modules: hostcalls +--- shm_kv: kv1 16k +--- config + location /t { + proxy_wasm hostcalls 'test=/t/shm/set_shared_data \ + key=kv1/k1 \ + value=hello1'; + + proxy_wasm hostcalls 'test=/t/shm/set_shared_data \ + key=kv1/k2 \ + value=hello2'; + + access_by_lua_block { + local shm = require "resty.wasmx.shm" + + local kv1 = shm.kv1 + local keys = kv1:get_keys(1) + + ngx.log(ngx.INFO, #keys, " key(s) retrieved") + + for i, k in ipairs(keys) do + ngx.log(ngx.INFO,"kv1 key: ", k) + end + + ngx.say("ok") + } + } +--- response_body +ok +--- grep_error_log eval: qr/\[info\] .*? kv1 key: kv1\/k[12]/ +--- grep_error_log_out eval +qr/\[info\] .*? kv1 key: kv1\/k1/ +--- no_error_log +[error] +[crit] +[emerg] +[alert] +qr/\[info\] .*? kv1 key: kv1\/k2/, + + + +=== TEST 3: shm - get_keys(), no keys +--- valgrind +--- main_config + wasm { + shm_kv kv1 16k; + } +--- config + location /t { + access_by_lua_block { + local shm = require "resty.wasmx.shm" + + assert(#shm.kv1:get_keys() == 0) + assert(#shm.kv1:get_keys(0) == 0) + assert(#shm.kv1:get_keys(1) == 0) + + ngx.say("ok") + } + } +--- no_error_log +[error] +[crit] +[emerg] +[alert] +[stub] +[stub] +[stub] + + + +=== TEST 4: shm - get_keys(), queue +--- valgrind +--- main_config + wasm { + shm_queue q1 16k; + } +--- config + location /t { + access_by_lua_block { + local shm = require "resty.wasmx.shm" + + shm.q1:get_keys() + + ngx.say("fail") + } + } +--- error_code: 500 +--- error_log eval +qr/\[error\] .*? attempt to call method 'get_keys' \(a nil value\)/ +--- no_error_log +[crit] +[emerg] +[alert] +[stub] +[stub] +[stub] diff --git a/t/04-openresty/ffi/shm/101-kv_get.t b/t/04-openresty/ffi/shm/101-kv_get.t new file mode 100644 index 000000000..923cf5dde --- /dev/null +++ b/t/04-openresty/ffi/shm/101-kv_get.t @@ -0,0 +1,62 @@ +# vim:set ft= ts=4 sts=4 sw=4 et fdm=marker: + +use strict; +use lib '.'; +use t::TestWasmX; +use t::TestWasmX::Lua; + +skip_no_openresty(); + +plan_tests(8); +run_tests(); + +__DATA__ + +=== TEST 1: shm_kv - get() +--- valgrind +--- wasm_modules: hostcalls +--- shm_kv: kv1 1m +--- config + location /t { + proxy_wasm hostcalls 'test=/t/shm/set_shared_data \ + key=kv1/k1 \ + value=hello1'; + + proxy_wasm hostcalls 'test=/t/shm/set_shared_data \ + key=kv1/k2 \ + value=hello2'; + + access_by_lua_block { + local shm = require "resty.wasmx.shm" + + local str_format = string.format + + shm.kv1:lock() + + for k in shm.kv1:iterate_keys() do + local v, cas, err = shm.kv1:get(k) + + if not v then + ngx.say(err) + end + + ngx.log(ngx.INFO, str_format("kv1 %s: %s, cas: %d", k, v, cas)) + end + + shm.kv1:unlock() + + ngx.say("ok") + } + } +--- response_body +ok +--- error_log eval +[ + qr/\[info\] .*? kv1 kv1\/k1: hello1, cas: 1/, + qr/\[info\] .*? kv1 kv1\/k2: hello2, cas: 1/, +] +--- no_error_log +[error] +[crit] +[emerg] +[alert] diff --git a/t/04-openresty/ffi/shm/102-kv_set.t b/t/04-openresty/ffi/shm/102-kv_set.t new file mode 100644 index 000000000..21b9a70a2 --- /dev/null +++ b/t/04-openresty/ffi/shm/102-kv_set.t @@ -0,0 +1,57 @@ +# vim:set ft= ts=4 sts=4 sw=4 et fdm=marker: + +use strict; +use lib '.'; +use t::TestWasmX; +use t::TestWasmX::Lua; + +skip_no_openresty(); + +plan_tests(7); +run_tests(); + +__DATA__ + +=== TEST 1: shm_kv - set() +--- valgrind +--- shm_kv: kv1 1m +--- wasm_modules: on_phases +--- config + location /t { + access_by_lua_block { + local shm = require "resty.wasmx.shm" + + local str_fmt = string.format + local k = "a_key" + local v = "a_value" + local cas = 0 + + local written, err = shm.kv1:set(k, v, cas) + if not written then + ngx.say(err) + return + end + + ngx.log(ngx.INFO, str_fmt("written: %d", written)) + assert(written == 1) + + local retrieved_v, cas, err = shm.kv1:get(k) + if not retrieved_v then + ngx.say(err) + return + end + + ngx.log(ngx.INFO, str_fmt("kv1 %s: %s, cas: %d", k, retrieved_v, cas)) + + ngx.say("ok") + } + } +--- response_body +ok +--- error_log eval +qr/\[info\] .*? kv1 a_key: a_value, cas: 1/ +--- no_error_log +[error] +[crit] +[emerg] +[alert] diff --git a/t/04-openresty/ffi/shm/201-metrics_get.t b/t/04-openresty/ffi/shm/201-metrics_get.t new file mode 100644 index 000000000..3e3765f8c --- /dev/null +++ b/t/04-openresty/ffi/shm/201-metrics_get.t @@ -0,0 +1,145 @@ +# vim:set ft= ts=4 sts=4 sw=4 et fdm=marker: + +use strict; +use lib '.'; +use t::TestWasmX; +use t::TestWasmX::Lua; + +skip_no_openresty(); + +plan_tests(6); +run_tests(); + +__DATA__ + +=== TEST 1: shm_metrics - get, by name +--- valgrind +--- main_config + wasm {} +--- config + location /t { + access_by_lua_block { + local pretty = require "pl.pretty" + local shm = require "resty.wasmx.shm" + + local buf = {} + local c1 = shm.metrics:define("c1", shm.metrics.COUNTER) + local g1 = shm.metrics:define("g1", shm.metrics.GAUGE) + local h1 = shm.metrics:define("h1", shm.metrics.HISTOGRAM) + + shm.metrics:increment(c1) + shm.metrics:record(g1, 10) + shm.metrics:record(h1, 100) + + for i, name in ipairs({ "c1", "g1", "h1" }) do + local m, err = shm.metrics:get_by_name(name) + + if not m then + ngx.say(err) + end + + buf[#buf + 1] = string.format("%s: %s", name, pretty.write(m, "")) + end + + ngx.say(table.concat(buf, "\n")) + } + } +--- response_body +c1: {type="counter",value=1} +g1: {type="gauge",value=10} +h1: {type="histogram",value={[4294967295]=0,[128]=1}} +--- no_error_log +[error] +[crit] +[emerg] +[alert] + + + +=== TEST 2: shm_metrics - get, by name (prefix = false) +--- valgrind +--- wasm_modules: hostcalls +--- config + location /t { + proxy_wasm hostcalls 'on_configure=define_metrics \ + on=request_headers \ + n_increments=13 \ + test=/t/metrics/increment_counters \ + metrics=c1'; + + proxy_wasm hostcalls 'on_configure=define_metrics \ + on=request_headers \ + test=/t/metrics/toggle_gauges \ + metrics=g1'; + + proxy_wasm hostcalls 'on_configure=define_metrics \ + on=request_headers \ + test=/t/metrics/record_histograms \ + metrics=h1 \ + value=10'; + + proxy_wasm hostcalls 'on_configure=define_metrics \ + on=request_headers \ + test=/t/metrics/record_histograms \ + metrics=h1 \ + value=100'; + + access_by_lua_block { + local pretty = require "pl.pretty" + local shm = require "resty.wasmx.shm" + + local buf = {} + + shm.metrics:lock() + + for name in shm.metrics:iterate_keys() do + local m = shm.metrics:get_by_name(name, { prefix = false }) + + if not m then + ngx.say(err) + end + + buf[#buf + 1] = string.format("%s: %s", name, pretty.write(m, "")) + end + + shm.metrics:unlock() + + ngx.say(table.concat(buf, "\n")) + } + } +--- response_body +pw.hostcalls.c1: {type="counter",value=13} +pw.hostcalls.g1: {type="gauge",value=1} +pw.hostcalls.h1: {type="histogram",value={[4294967295]=0,[128]=1,[16]=1}} +--- no_error_log +[error] +[crit] +[emerg] +[alert] + + + +=== TEST 3: shm_metrics - get, by name (invalid name) +--- valgrind +--- main_config + wasm {} +--- config + location /t { + access_by_lua_block { + local pretty = require "pl.pretty" + local shm = require "resty.wasmx.shm" + + local m, err = shm.metrics:get_by_name("invalid_name") + + if not m then + ngx.say(err) + end + } + } +--- response_body +metric not found +--- no_error_log +[error] +[crit] +[emerg] +[alert] diff --git a/t/04-openresty/ffi/shm/202-metrics_define.t b/t/04-openresty/ffi/shm/202-metrics_define.t new file mode 100644 index 000000000..9c9f3b7d8 --- /dev/null +++ b/t/04-openresty/ffi/shm/202-metrics_define.t @@ -0,0 +1,109 @@ +# vim:set ft= ts=4 sts=4 sw=4 et fdm=marker: + +use strict; +use lib '.'; +use t::TestWasmX; +use t::TestWasmX::Lua; + +skip_no_openresty(); + +plan_tests(9); +run_tests(); + +__DATA__ + +=== TEST 1: shm_metrics - define() +--- valgrind +--- main_config + wasm {} +--- config + location /t { + access_by_lua_block { + local shm = require "resty.wasmx.shm" + + local metrics = shm.metrics + + metrics:define("c1", metrics.COUNTER) + metrics:define("g1", metrics.GAUGE) + metrics:define("h1", metrics.HISTOGRAM) + + ngx.say("ok") + } + } +--- response_body +ok +--- error_log eval +[ + qr/.*? \[info\] .*? defined counter "lua.c1" with id \d+/, + qr/.*? \[info\] .*? defined gauge "lua.g1" with id \d+/, + qr/.*? \[info\] .*? defined histogram "lua.h1" with id \d+/, +] +--- no_error_log +[error] +[crit] +[emerg] +[alert] + + + +=== TEST 2: shm_metrics - define(), name too long +--- valgrind +--- main_config + wasm {} +--- config + location /t { + access_by_lua_block { + local shm = require "resty.wasmx.shm" + + local str_rep = string.rep + local metrics = shm.metrics + + local long_name = str_rep("x", 500) + local mid, err = metrics:define(long_name, metrics.HISTOGRAM) + + assert(mid == nil) + assert(err == "failed defining metric, metric name too long") + + ngx.say("ok") + } + } +--- response_body +ok +--- no_error_log +[error] +[crit] +[emerg] +[alert] +defined histogram +[stub] +[stub] + + + +=== TEST 3: shm_metrics - define(), invalid metric type +--- valgrind +--- main_config + wasm {} +--- config + location /t { + access_by_lua_block { + local shm = require "resty.wasmx.shm" + + local metrics = shm.metrics + + metrics:define("c1", 10) + + ngx.say("fail") + } + } +--- error_code: 500 +--- error_log eval +qr/\[error\] .*? metric_type must be either resty.wasmx.shm.metrics.COUNTER, resty.wasmx.shm.metrics.GAUGE, or resty.wasmx.shm.metrics.HISTOGRAM/ +--- no_error_log +[crit] +[emerg] +[alert] +defined counter +[stub] +[stub] +[stub] diff --git a/t/04-openresty/ffi/shm/203-metrics_record.t b/t/04-openresty/ffi/shm/203-metrics_record.t new file mode 100644 index 000000000..1439eade0 --- /dev/null +++ b/t/04-openresty/ffi/shm/203-metrics_record.t @@ -0,0 +1,80 @@ +# vim:set ft= ts=4 sts=4 sw=4 et fdm=marker: + +use strict; +use lib '.'; +use t::TestWasmX; +use t::TestWasmX::Lua; + +skip_no_openresty(); + +plan_tests(6); +run_tests(); + +__DATA__ + +=== TEST 1: shm_metrics - record() +--- valgrind +--- main_config + wasm {} +--- config + location /t { + access_by_lua_block { + local pretty = require "pl.pretty" + local shm = require "resty.wasmx.shm" + + local pwrite = pretty.write + local str_format = string.format + + local buf = "" + local g1 = shm.metrics:define("g1", shm.metrics.GAUGE) + local h1 = shm.metrics:define("h1", shm.metrics.HISTOGRAM) + + shm.metrics:record(g1, 10) + shm.metrics:record(h1, 100) + + buf = str_format("%s\ng1: %s", buf, pwrite(shm.metrics:get(g1), "")) + buf = str_format("%s\nh1: %s", buf, pwrite(shm.metrics:get(h1), "")) + + ngx.say(buf:sub(2)) + } + } +--- response_body +g1: {type="gauge",value=10} +h1: {type="histogram",value={[4294967295]=0,[128]=1}} +--- no_error_log +[error] +[crit] +[emerg] +[alert] + + + +=== TEST 2: shm_metrics - record(), inexistent metric +--- valgrind +--- main_config + wasm {} +--- config + location /t { + access_by_lua_block { + local wasmx = require "resty.wasmx" + local shm = require "resty.wasmx.shm" + + local FFI_DECLINED = wasmx.FFI_DECLINED + local metrics = shm.metrics + + local ok, err = metrics:record(1, 10) + + if not ok then + return ngx.say(err) + end + + ngx.say("fail") + } + } +--- response_body +metric not found +--- no_error_log +[error] +[crit] +[emerg] +[alert] diff --git a/t/04-openresty/ffi/shm/204-metrics_increment.t b/t/04-openresty/ffi/shm/204-metrics_increment.t new file mode 100644 index 000000000..50e1871a6 --- /dev/null +++ b/t/04-openresty/ffi/shm/204-metrics_increment.t @@ -0,0 +1,74 @@ +# vim:set ft= ts=4 sts=4 sw=4 et fdm=marker: + +use strict; +use lib '.'; +use t::TestWasmX; +use t::TestWasmX::Lua; + +skip_no_openresty(); + +plan_tests(6); +run_tests(); + +__DATA__ + +=== TEST 1: shm_metrics - increment() +--- valgrind +--- main_config + wasm {} +--- config + location /t { + access_by_lua_block { + local pretty = require "pl.pretty" + local shm = require "resty.wasmx.shm" + + local pwrite = pretty.write + local str_format = string.format + + local c1 = shm.metrics:define("c1", shm.metrics.COUNTER) + + shm.metrics:increment(c1) + shm.metrics:increment(c1, 10) + + ngx.say(str_format("c1: %s", pwrite(shm.metrics:get(c1), ""))) + } + } +--- response_body +c1: {type="counter",value=11} +--- no_error_log +[error] +[crit] +[emerg] +[alert] + + + +=== TEST 2: shm_metrics - increment(), inexistent metric +--- valgrind +--- main_config + wasm {} +--- config + location /t { + access_by_lua_block { + local wasmx = require "resty.wasmx" + local shm = require "resty.wasmx.shm" + + local FFI_DECLINED = wasmx.FFI_DECLINED + local metrics = shm.metrics + + local ok, err = metrics:increment(1, 10) + + if not ok then + return ngx.say(err) + end + + ngx.say("fail") + } + } +--- response_body +metric not found +--- no_error_log +[error] +[crit] +[emerg] +[alert]