Skip to content

Commit

Permalink
feat(shm/ffi) expose kv and metrics write operations to Lua land
Browse files Browse the repository at this point in the history
This commit also changes shm-related FFI functions from returning
NGX_DECLINED to returning NGX_ABORT, when the necessary conditions for
the function completion are not satisfied.
  • Loading branch information
casimiro committed Aug 12, 2024
1 parent 6cc6f37 commit dc68e4a
Show file tree
Hide file tree
Showing 10 changed files with 771 additions and 131 deletions.
242 changes: 224 additions & 18 deletions lib/resty/wasmx/shm.lua
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@ local ngx_log = ngx.log
local initialized = false
local FFI_DECLINED = wasmx.FFI_DECLINED
local FFI_ERROR = wasmx.FFI_ERROR
local FFI_OK = wasmx.FFI_OK
local WASM_SHM_KEY = {} -- ensures shm is only locally accessible


local get_zones_handler

local types = {
Expand All @@ -33,6 +35,13 @@ local types = {
}
}

local metric_type_set = {
[types.ffi_metric.COUNTER] = 1,
[types.ffi_metric.GAUGE] = 2,
[types.ffi_metric.HISTOGRAM] = 3,
}


local _M = {}


Expand Down Expand Up @@ -105,13 +114,23 @@ ffi.cdef [[
int ngx_wa_ffi_shm_get_keys(ngx_wa_shm_t *shm,
ngx_uint_t n,
ngx_str_t **keys);

int ngx_wa_ffi_shm_get_kv_value(ngx_wa_shm_t *shm,
ngx_str_t *k,
ngx_str_t **v,
uint32_t *cas);
int ngx_wa_ffi_shm_get_metric(ngx_str_t *k,
int ngx_wa_ffi_shm_set_kv_value(ngx_wa_shm_t *shm,
ngx_str_t *k, ngx_str_t *v,
uint32_t cas, int *written);

int ngx_wa_ffi_shm_define_metric(ngx_str_t *name,
ngx_wa_metric_type_e m_type);
int ngx_wa_ffi_shm_record_metric(uint32_t metric_id, int value);
int ngx_wa_ffi_shm_increment_metric(uint32_t metric_id, int value);
int ngx_wa_ffi_shm_get_metric(uint32_t metric_id, ngx_str_t *name,
u_char *mbuf, size_t mbs,
u_char *hbuf, size_t hbs);

int ngx_wa_ffi_shm_one_slot_metric_size();
int ngx_wa_ffi_shm_max_histogram_size();
]]
Expand Down Expand Up @@ -157,17 +176,49 @@ local function get_kv_value(zone, key)
local cname = ffi_new("ngx_str_t", { data = key, len = #key })
local cvalue = ffi_new("ngx_str_t *[1]")
local ccas = ffi_new("uint32_t [1]")
local pccas = ffi_cast("uint32_t *", ccas)

local rc = C.ngx_wa_ffi_shm_get_kv_value(shm, cname, cvalue, ccas)
local rc = C.ngx_wa_ffi_shm_get_kv_value(shm, cname, cvalue, pccas)
if rc == FFI_ERROR then
return nil, "failed retrieving kv value"
return nil, nil, "failed retrieving kv value"
end

if rc == FFI_DECLINED then
return nil, nil, "key not found"
end

return ffi_str(cvalue[0].data, cvalue[0].len), tonumber(ccas[0])
end


local function set_kv_value(zone, key, value, cas)
if type(key) ~= "string" then
error("key must be a string", 2)
end

if type(value) ~= "string" then
error("value must be a string", 2)
end

if type(cas) ~= "number" then
error("cas must be a number", 2)
end

local shm = zone[WASM_SHM_KEY]
local cname = ffi_new("ngx_str_t", { data = key, len = #key })
local cvalue = ffi_new("ngx_str_t", { data = value, len = #value })
local written = ffi_new("uint32_t [1]")

local rc = C.ngx_wa_ffi_shm_set_kv_value(shm, cname, cvalue, cas, written)
if rc ~= FFI_OK then
return nil, "failed setting kv value"
end

if rc == FFI_DECLINED then
return nil, "key not found"
end

return ffi_str(cvalue[0].data, cvalue[0].len)
return tonumber(written[0])
end


Expand All @@ -179,27 +230,33 @@ local function new_zero_filled_buffer(ctype, size)
end


local function get_metric(zone, name)
if type(name) ~= "string" then
error("name must be a string", 2)
end
local function new_zero_filled_buffer(ctype, size)
local buf = ffi_new(ctype, size)
ffi_fill(buf, size)

local cname = ffi_new("ngx_str_t", { data = name, len = #name })
return buf
end


local function new_metric_buffer()
local mbs = C.ngx_wa_ffi_shm_one_slot_metric_size()
local hbs = C.ngx_wa_ffi_shm_max_histogram_size()
local mbuf = new_zero_filled_buffer("u_char [?]", mbs)

return mbuf, mbs
end

local function new_histogram_buffer()
local hbs = C.ngx_wa_ffi_shm_max_histogram_size()
local hbuf = new_zero_filled_buffer("u_char [?]", hbs)

local rc = C.ngx_wa_ffi_shm_get_metric(cname, mbuf, mbs, hbuf, hbs)
if rc == FFI_ERROR then
return nil, "failed retrieving metric"
end
return hbuf, hbs
end

if rc == FFI_DECLINED then
return nil, "metric not found"
end

local cmetric = ffi_cast("ngx_wa_metric_t *", mbuf)
local function parse_cmetric(cmetric)
if type(cmetric) ~= "cdata" then
error("cmetric must be a cdata", 2)
end

if cmetric.metric_type == types.ffi_metric.COUNTER then
return { type = "counter", value = tonumber(cmetric.slots[0].counter) }
Expand All @@ -208,6 +265,7 @@ local function get_metric(zone, name)
return { type = "gauge", value = tonumber(cmetric.slots[0].gauge.value) }

elseif cmetric.metric_type == types.ffi_metric.HISTOGRAM then
local hbuf = cmetric.slots[0].histogram
local ch = ffi_cast("ngx_wa_metrics_histogram_t *", hbuf)
local h = { type = "histogram", value = {} }

Expand All @@ -223,6 +281,146 @@ local function get_metric(zone, name)

return h
end

return nil, "unknown metric type"
end


local function get_metric(zone, metric_id)
if type(metric_id) ~= "number" then
error("mid must be a number", 2)
end

local mbuf, mbs = new_metric_buffer()
local hbuf, hbs = new_histogram_buffer()

local rc = C.ngx_wa_ffi_shm_get_metric(metric_id, nil, mbuf, mbs, hbuf, hbs)
if rc == FFI_ERROR then
return nil, "failed retrieving metric"
end

if rc == FFI_DECLINED then
return nil, "metric not found"
end

return parse_cmetric(ffi_cast("ngx_wa_metric_t *", mbuf))
end


local function get_metric_by_name(zone, name, opts)
local prefix = true

if type(name) ~= "string" then
error("name must be a string", 2)
end

if opts ~= nil then
if type(opts) ~= "table" then
error("opts must be a table", 2)
end

if opts.prefix ~= nil then
if type(opts.prefix) ~= "boolean" then
error("opts.prefix must be a boolean")
end

prefix = opts.prefix
end
end

if prefix then
name = "lua." .. name
end

local cname = ffi_new("ngx_str_t", { data = name, len = #name })
local mbuf, mbs = new_metric_buffer()
local hbuf, hbs = new_histogram_buffer()

local rc = C.ngx_wa_ffi_shm_get_metric(0, cname, mbuf, mbs, hbuf, hbs)
if rc == FFI_ERROR then
return nil, "failed retrieving metric"
end

if rc == FFI_DECLINED then
return nil, "metric not found"
end

return parse_cmetric(ffi_cast("ngx_wa_metric_t *", mbuf))
end


local function define_metric(zone, name, metric_type)
if type(name) ~= "string" or name == "" then
error("name must be a non-empty string", 2)
end

if metric_type_set[metric_type] == nil then
error("metric_type must be either \
resty.wasmx.shm.metrics.COUNTER, \
resty.wasmx.shm.metrics.GAUGE or \
resty.wasmx.shm.metrics.HISTOGRAM", 2)
end

local cname = ffi_new("ngx_str_t", { data = name, len = #name })

local m_id = C.ngx_wa_ffi_shm_define_metric(cname, metric_type)
if m_id == FFI_ERROR then
return nil, "failed defining metric"
end

return m_id
end


local function record_metric(zone, metric_id, value)
if type(metric_id) ~= "number" then
error("metric_id must be a number", 2)
end

if type(value) ~= "number" then
error("value must be a number", 2)
end


local rc = C.ngx_wa_ffi_shm_record_metric(metric_id, value)
if rc == FFI_ERROR then
return nil, "failed recording metric"
end

if rc == FFI_DECLINED then
return nil, "metric not found"
end

return true
end


local function increment_metric(zone, metric_id, value)
local increment = 1

if type(metric_id) ~= "number" then
error("metric_id must be a number", 2)
end

if value ~= nil then
if type(value) ~= "number" or value < 1 then
error("value must be greater than zero", 2)
end

increment = value
end


local rc = C.ngx_wa_ffi_shm_increment_metric(metric_id, increment)
if rc == FFI_ERROR then
return nil, "failed incrementing metric"
end

if rc == FFI_DECLINED then
return nil, "metric not found"
end

return true
end


Expand All @@ -234,13 +432,21 @@ function(shm)
if shm.type == types.ffi_shm.SHM_TYPE_KV then
_M[zone_name].get_keys = get_keys
_M[zone_name].get = get_kv_value
_M[zone_name].set = set_kv_value

elseif shm.type == types.ffi_shm.SHM_TYPE_QUEUE then
-- NYI

elseif shm.type == types.ffi_shm.SHM_TYPE_METRICS then
_M[zone_name].define = define_metric
_M[zone_name].record = record_metric
_M[zone_name].increment = increment_metric
_M[zone_name].get_keys = get_keys
_M[zone_name].get = get_metric
_M[zone_name].get_by_name = get_metric_by_name
_M[zone_name].COUNTER = types.ffi_metric.COUNTER
_M[zone_name].GAUGE = types.ffi_metric.GAUGE
_M[zone_name].HISTOGRAM = types.ffi_metric.HISTOGRAM
end
end)

Expand Down
Loading

0 comments on commit dc68e4a

Please sign in to comment.