Skip to content

Commit

Permalink
save WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
dndx committed Jan 7, 2024
1 parent cbc0c23 commit a542854
Show file tree
Hide file tree
Showing 8 changed files with 326 additions and 407 deletions.
272 changes: 56 additions & 216 deletions kong/db/declarative/import.lua
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,16 @@ local yield = require("kong.tools.yield").yield
local marshall = require("kong.db.declarative.marshaller").marshall
local schema_topological_sort = require("kong.db.schema.topological_sort")
local nkeys = require("table.nkeys")
local sha256_hex = require("kong.tools.utils").sha256_hex
local pk_string = declarative_config.pk_string

local assert = assert
local sort = table.sort
local type = type
local pairs = pairs
local next = next
local insert = table.insert
local string_format = string.format
local null = ngx.null
local get_phase = ngx.get_phase

Expand Down Expand Up @@ -120,21 +123,26 @@ local function get_current_hash()
end


local function find_default_ws(entities)
local function find_ws(entities, name)
for _, v in pairs(entities.workspaces or {}) do
if v.name == "default" then
if v.name == name then
return v.id
end
end
end


local function unique_field_key(schema_name, ws_id, field, value, unique_across_ws)
if unique_across_ws then
ws_id = ""
local function unique_field_key(schema_name, ws_id, field, value)
return string_format("%s|%s|%s|%s", schema_name, ws_id, field, sha256_hex(value))
end


local function foreign_field_key(schema_name, ws_id, field, foreign_id, pk)
if pk then
return string_format("%s|%s|%s|%s|%s", schema_name, ws_id, field, foreign_id, pk)
end

return schema_name .. "|" .. ws_id .. "|" .. field .. ":" .. value
return string_format("%s|%s|%s|%s|", schema_name, ws_id, field, foreign_id)
end


Expand All @@ -158,102 +166,40 @@ end
-- _transform: true,
-- }
local function load_into_cache(entities, meta, hash)
-- Array of strings with this format:
-- "<tag_name>|<entity_name>|<uuid>".
-- For example, a service tagged "admin" would produce
-- "admin|services|<the service uuid>"
local tags = {}
meta = meta or {}
local default_workspace_id = assert(find_ws(entities, "default"))
local should_transform = meta._transform == nil and true or meta._transform

local default_workspace = assert(find_default_ws(entities))
local fallback_workspace = default_workspace

assert(type(fallback_workspace) == "string")
assert(type(default_workspace_id) == "string")

if not hash or hash == "" or config_is_empty(entities) then
hash = DECLARATIVE_EMPTY_CONFIG_HASH
end

-- Keys: tag name, like "admin"
-- Values: array of encoded tags, similar to the `tags` variable,
-- but filtered for a given tag
local tags_by_name = {}

local db = kong.db

local t = txn.begin(128)
local t = txn.begin(512)
t:db_drop(false)

local phase = get_phase()
local transform = meta._transform == nil and true or meta._transform

for entity_name, items in pairs(entities) do
yield(false, phase)

local dao = db[entity_name]
local dao = kong.db[entity_name]
if not dao then
return nil, "unknown entity: " .. entity_name
end
local schema = dao.schema

-- Keys: tag_name, eg "admin"
-- Values: dictionary of keys associated to this tag,
-- for a specific entity type
-- i.e. "all the services associated to the 'admin' tag"
-- The ids are keys, and the values are `true`
local taggings = {}

local uniques = {}
local page_for = {}
local foreign_fields = {}
for fname, fdata in schema:each_field() do
local is_foreign = fdata.type == "foreign"
local fdata_reference = fdata.reference

if fdata.unique then
if is_foreign then
if #db[fdata_reference].schema.primary_key == 1 then
insert(uniques, fname)
end

else
insert(uniques, fname)
end
end
if is_foreign then
page_for[fdata_reference] = {}
foreign_fields[fname] = fdata_reference
end
end

local keys_by_ws = {
-- map of keys for global queries
["*"] = {}
}
for id, item in pairs(items) do
-- When loading the entities, when we load the default_ws, we
-- set it to the current. But this only works in the worker that
-- is doing the loading (0), other ones still won't have it
local ws_id = default_workspace_id

yield(true, phase)

assert(type(fallback_workspace) == "string")

local ws_id = ""
if schema.workspaceable then
local item_ws_id = item.ws_id
if item_ws_id == null or item_ws_id == nil then
item_ws_id = fallback_workspace
end
item.ws_id = item_ws_id
ws_id = item_ws_id
if schema.workspaceable and item.ws_id == null or item.ws_id == nil then
item.ws_id = ws_id
end

assert(type(ws_id) == "string")

local cache_key = dao:cache_key(id, nil, nil, nil, nil, item.ws_id)
local pk = pk_string(schema, item)

local item_key = string_format("%s|%s|*|%s", entity_name, ws_id, pk)

item = remove_nulls(item)

if transform then
local err
item, err = schema:transform(item)
Expand All @@ -267,156 +213,51 @@ local function load_into_cache(entities, meta, hash)
return nil, err
end

t:set(cache_key, item_marshalled)

local global_query_cache_key = dao:cache_key(id, nil, nil, nil, nil, "*")
t:set(global_query_cache_key, item_marshalled)

-- insert individual entry for global query
insert(keys_by_ws["*"], cache_key)

-- insert individual entry for workspaced query
if ws_id ~= "" then
keys_by_ws[ws_id] = keys_by_ws[ws_id] or {}
local keys = keys_by_ws[ws_id]
insert(keys, cache_key)
end
t:set(item_key, item_marshalled)

-- select_by_cache_key
if schema.cache_key then
local cache_key = dao:cache_key(item)
t:set(cache_key, item_marshalled)
end

for i = 1, #uniques do
local unique = uniques[i]
local unique_key = item[unique]
if unique_key then
if type(unique_key) == "table" then
local _
-- this assumes that foreign keys are not composite
_, unique_key = next(unique_key)
end

local key = unique_field_key(entity_name, ws_id, unique, unique_key,
schema.fields[unique].unique_across_ws)

t:set(key, item_marshalled)
end
end

for fname, ref in pairs(foreign_fields) do
local item_fname = item[fname]
if item_fname then
local fschema = db[ref].schema

local fid = declarative_config.pk_string(fschema, item_fname)

-- insert paged search entry for global query
page_for[ref]["*"] = page_for[ref]["*"] or {}
page_for[ref]["*"][fid] = page_for[ref]["*"][fid] or {}
insert(page_for[ref]["*"][fid], cache_key)

-- insert paged search entry for workspaced query
page_for[ref][ws_id] = page_for[ref][ws_id] or {}
page_for[ref][ws_id][fid] = page_for[ref][ws_id][fid] or {}
insert(page_for[ref][ws_id][fid], cache_key)
end
local key = unique_field_key(entity_name, ws_id, "cache_key", cache_key)
t:set(key, item_key)
end

local item_tags = item.tags
if item_tags then
local ws = schema.workspaceable and ws_id or ""
for i = 1, #item_tags do
local tag_name = item_tags[i]
insert(tags, tag_name .. "|" .. entity_name .. "|" .. id)

tags_by_name[tag_name] = tags_by_name[tag_name] or {}
insert(tags_by_name[tag_name], tag_name .. "|" .. entity_name .. "|" .. id)

taggings[tag_name] = taggings[tag_name] or {}
taggings[tag_name][ws] = taggings[tag_name][ws] or {}
taggings[tag_name][ws][cache_key] = true
end
end
end

for ws_id, keys in pairs(keys_by_ws) do
local entity_prefix = entity_name .. "|" .. (schema.workspaceable and ws_id or "")

local keys, err = marshall(keys)
if not keys then
return nil, err
end

t:set(entity_prefix .. "|@list", keys)

for ref, wss in pairs(page_for) do
local fids = wss[ws_id]
if fids then
for fid, entries in pairs(fids) do
local key = entity_prefix .. "|" .. ref .. "|" .. fid .. "|@list"
for fname, fdata in schema:each_field() do
local is_foreign = fdata.type == "foreign"
local fdata_reference = fdata.reference
local value = item[fname]

if value then
if fdata.unique then
-- unique and not a foreign key, or is a foreign key, but non-composite
if type(value) == "table" then
assert(is_foreign)
value = pk_string(kong.db[fdata_reference].schema, value)
end

local entries, err = marshall(entries)
if not entries then
return nil, err
if fdata.unique_across_ws then
ws_id = default_workspace_id
end

t:set(key, entries)
end
end
end
end
local key = unique_field_key(entity_name, ws_id, fname, value)
t:set(key, item_key)

-- taggings:admin|services|ws_id|@list -> uuids of services tagged "admin" on workspace ws_id
for tag_name, workspaces_dict in pairs(taggings) do
for ws_id, keys_dict in pairs(workspaces_dict) do
local key = "taggings:" .. tag_name .. "|" .. entity_name .. "|" .. ws_id .. "|@list"

-- transform the dict into a sorted array
local arr = {}
local len = 0
for id in pairs(keys_dict) do
len = len + 1
arr[len] = id
end
-- stay consistent with pagination
sort(arr)
elseif is_foreign then
-- not unique and is foreign, generate page_for_foo indexes
assert(type(value) == "table")
value = pk_string(kong.db[fdata_reference].schema, value)

local arr, err = marshall(arr)
if not arr then
return nil, err
local key = foreign_field_key(entity_name, ws_id, fname, value, pk)
t:set(key, item_key)
end
end

t:set(key, arr)
end
end
end

for tag_name, tags in pairs(tags_by_name) do
yield(true, phase)

-- tags:admin|@list -> all tags tagged "admin", regardless of the entity type
-- each tag is encoded as a string with the format "admin|services|uuid", where uuid is the service uuid
local key = "tags:" .. tag_name .. "|@list"
local tags, err = marshall(tags)
if not tags then
return nil, err
end

t:set(key, tags)
end

-- tags||@list -> all tags, with no distinction of tag name or entity type.
-- each tag is encoded as a string with the format "admin|services|uuid", where uuid is the service uuid
local tags, err = marshall(tags)
if not tags then
return nil, err
end

t:set("tags||@list", tags)
t:set(DECLARATIVE_HASH_KEY, hash)

kong.default_workspace = default_workspace
kong.default_workspace = default_workspace_id

local ok, err = t:commit()
if not ok then
Expand All @@ -426,9 +267,7 @@ local function load_into_cache(entities, meta, hash)
kong.core_cache:purge()
kong.cache:purge()

yield(false, phase)

return true, nil, default_workspace
return true, nil, default_workspace_id
end


Expand Down Expand Up @@ -537,6 +376,7 @@ end
return {
get_current_hash = get_current_hash,
unique_field_key = unique_field_key,
foreign_field_key = foreign_field_key,

load_into_db = load_into_db,
load_into_cache = load_into_cache,
Expand Down
1 change: 1 addition & 0 deletions kong/db/declarative/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,7 @@ _M.sanitize_output = declarative_export.sanitize_output
-- import
_M.get_current_hash = declarative_import.get_current_hash
_M.unique_field_key = declarative_import.unique_field_key
_M.foreign_field_key = declarative_import.foreign_field_key
_M.load_into_db = declarative_import.load_into_db
_M.load_into_cache = declarative_import.load_into_cache
_M.load_into_cache_with_events = declarative_import.load_into_cache_with_events
Expand Down
Loading

0 comments on commit a542854

Please sign in to comment.