Skip to content

Commit

Permalink
perf(clustering): store entity only once in lmdb
Browse files Browse the repository at this point in the history
Signed-off-by: Aapo Talvensaari <[email protected]>
  • Loading branch information
bungle committed Nov 15, 2023
1 parent c90a5aa commit 683b06b
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 17 deletions.
6 changes: 3 additions & 3 deletions kong/clustering/protocol.lua
Original file line number Diff line number Diff line change
Expand Up @@ -454,11 +454,11 @@ local function process_entities(msg)
validate_entity(schema, entity)
local entity_marshalled = assert(marshall(entity))
LMDB_TXN:set(pk_key, entity_marshalled)
LMDB_TXN:set(dao:cache_key(id, nil, nil, nil, nil, "*"), entity_marshalled)
LMDB_TXN:set(dao:cache_key(id, nil, nil, nil, nil, "*"), pk_key)
if schema.cache_key then
local cache_key = dao:cache_key(entity)
if cache_key ~= pk_key then
LMDB_TXN:set(cache_key, entity_marshalled)
LMDB_TXN:set(cache_key, pk_key)
end
end

Expand All @@ -470,7 +470,7 @@ local function process_entities(msg)
_, unique_key = next(unique_key)
end
LMDB_TXN:set(ENTITY_NAME .. "|" .. (schema.fields[unique].unique_across_ws and "" or ws)
.. "|" .. unique .. ":" .. unique_key, entity_marshalled)
.. "|" .. unique .. ":" .. unique_key, pk_key)
end
end
end
Expand Down
20 changes: 10 additions & 10 deletions kong/db/declarative/import.lua
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ local function load_into_cache(entities, meta, hash)

assert(type(ws_id) == "string")

local cache_key = dao:cache_key(id, nil, nil, nil, nil, item.ws_id)
local item_cache_key = dao:cache_key(id, nil, nil, nil, nil, item.ws_id)

item = remove_nulls(item)
if transform then
Expand All @@ -273,24 +273,24 @@ local function load_into_cache(entities, meta, hash)
return nil, err
end

t:set(cache_key, item_marshalled)
t:set(item_cache_key, item_marshalled)

local global_query_cache_key = dao:cache_key(id, nil, nil, nil, nil, "*")
t:set(global_query_cache_key, item_marshalled)
t:set(global_query_cache_key, item_cache_key)

-- insert individual entry for global query
insert(keys_by_ws["*"], cache_key)
insert(keys_by_ws["*"], item_cache_key)

-- insert individual entry for workspaced query
if ws_id ~= "" then
keys_by_ws[ws_id] = keys_by_ws[ws_id] or {}
local keys = keys_by_ws[ws_id]
insert(keys, cache_key)
insert(keys, item_cache_key)
end

if schema.cache_key then
local cache_key = dao:cache_key(item)
t:set(cache_key, item_marshalled)
t:set(cache_key, item_cache_key)
end

for i = 1, #uniques do
Expand All @@ -306,7 +306,7 @@ local function load_into_cache(entities, meta, hash)
local key = unique_field_key(entity_name, ws_id, unique, unique_key,
schema.fields[unique].unique_across_ws)

t:set(key, item_marshalled)
t:set(key, item_cache_key)
end
end

Expand All @@ -320,12 +320,12 @@ local function load_into_cache(entities, meta, hash)
-- insert paged search entry for global query
page_for[ref]["*"] = page_for[ref]["*"] or {}
page_for[ref]["*"][fid] = page_for[ref]["*"][fid] or {}
insert(page_for[ref]["*"][fid], cache_key)
insert(page_for[ref]["*"][fid], item_cache_key)

-- insert paged search entry for workspaced query
page_for[ref][ws_id] = page_for[ref][ws_id] or {}
page_for[ref][ws_id][fid] = page_for[ref][ws_id][fid] or {}
insert(page_for[ref][ws_id][fid], cache_key)
insert(page_for[ref][ws_id][fid], item_cache_key)
end
end

Expand All @@ -341,7 +341,7 @@ local function load_into_cache(entities, meta, hash)

taggings[tag_name] = taggings[tag_name] or {}
taggings[tag_name][ws] = taggings[tag_name][ws] or {}
taggings[tag_name][ws][cache_key] = true
taggings[tag_name][ws][item_cache_key] = true
end
end
end
Expand Down
21 changes: 17 additions & 4 deletions kong/db/strategies/off/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,16 @@ local function select_by_key(schema, key)
end


local function select_by_ref(schema, ref)
local key, err = lmdb_get(ref)
if not key then
return nil, err
end

return select_by_key(schema, key)
end


local function page(self, size, offset, options)
local schema = self.schema
local ws_id = ws(schema, options)
Expand All @@ -257,6 +267,9 @@ local function select(self, pk, options)
local ws_id = ws(schema, options)
local id = declarative_config.pk_string(schema, pk)
local key = schema.name .. ":" .. id .. ":::::" .. ws_id
if ws_id == "*" then
return select_by_ref(schema, key)
end
return select_by_key(schema, key)
end

Expand All @@ -270,19 +283,19 @@ local function select_by_field(self, field, value, options)
local schema = self.schema
local ws_id = ws(schema, options)

local key
local ref
if field ~= "cache_key" then
local unique_across_ws = schema.fields[field].unique_across_ws
-- only accept global query by field if field is unique across workspaces
assert(not options or options.workspace ~= null or unique_across_ws)

key = unique_field_key(schema.name, ws_id, field, value, unique_across_ws)
ref = unique_field_key(schema.name, ws_id, field, value, unique_across_ws)
else
-- if select_by_cache_key, use the provided cache_key as key directly
key = value
ref = value
end

return select_by_key(schema, key)
return select_by_ref(schema, ref)
end


Expand Down

0 comments on commit 683b06b

Please sign in to comment.