Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(dao): add ability to select and delete expired entities #13296

Merged
merged 4 commits into from
Jul 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 15 additions & 6 deletions kong/db/dao/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ local workspaces = require "kong.workspaces"
local new_tab = require "table.new"
local DAO_MAX_TTL = require("kong.constants").DATABASE.DAO_MAX_TTL
local is_valid_uuid = require("kong.tools.uuid").is_valid_uuid
local deep_copy = require("kong.tools.table").deep_copy

local setmetatable = setmetatable
local tostring = tostring
Expand Down Expand Up @@ -290,6 +291,12 @@ local function validate_options_value(self, options)
end
end

if options.skip_ttl ~= nil then
if type(options.skip_ttl) ~= "boolean" then
errors.skip_ttl = "must be a boolean"
end
end

if next(errors) then
return nil, errors
end
Expand Down Expand Up @@ -896,8 +903,9 @@ local function generate_foreign_key_methods(schema)
return nil, err, err_t
end

local show_ws_id = { show_ws_id = true }
local entity, err, err_t = self["select_by_" .. name](self, unique_value, show_ws_id)
local select_options = deep_copy(options or {})
select_options["show_ws_id"] = true
local entity, err, err_t = self["select_by_" .. name](self, unique_value, select_options)
if err then
return nil, err, err_t
end
Expand All @@ -906,7 +914,7 @@ local function generate_foreign_key_methods(schema)
return true
end

local cascade_entries = find_cascade_delete_entities(self, entity, show_ws_id)
local cascade_entries = find_cascade_delete_entities(self, entity, select_options)

local ok, err_t = run_hook("dao:delete_by:pre",
entity,
Expand Down Expand Up @@ -1293,8 +1301,9 @@ function DAO:delete(pk_or_entity, options)
return nil, tostring(err_t), err_t
end

local show_ws_id = { show_ws_id = true }
local entity, err, err_t = self:select(primary_key, show_ws_id)
local select_options = deep_copy(options or {})
select_options["show_ws_id"] = true
local entity, err, err_t = self:select(primary_key, select_options)
if err then
return nil, err, err_t
end
Expand All @@ -1311,7 +1320,7 @@ function DAO:delete(pk_or_entity, options)
end
end

local cascade_entries = find_cascade_delete_entities(self, primary_key, show_ws_id)
local cascade_entries = find_cascade_delete_entities(self, primary_key, select_options)

local ws_id = entity.ws_id
local _
Expand Down
80 changes: 76 additions & 4 deletions kong/db/strategies/postgres/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,7 @@ local function execute(strategy, statement_name, attributes, options)

local is_update = options and options.update
local has_ttl = strategy.schema.ttl

local skip_ttl = options and options.skip_ttl
if has_ws_id then
assert(ws_id == nil or type(ws_id) == "string")
argv[0] = escape_literal(connector, ws_id, "ws_id")
Expand All @@ -433,7 +433,7 @@ local function execute(strategy, statement_name, attributes, options)
for i = 1, argc do
local name = argn[i]
local value
if has_ttl and name == "ttl" then
if has_ttl and name == "ttl" and not skip_ttl then
value = (options and options.ttl)
and get_ttl_value(strategy, attributes, options)

Expand Down Expand Up @@ -576,7 +576,12 @@ end


function _mt:select(primary_key, options)
local res, err = execute(self, "select", self.collapse(primary_key), options)
local statement_name = "select"
if self.schema.ttl and options and options.skip_ttl then
statement_name = "select_skip_ttl"
end

local res, err = execute(self, statement_name, self.collapse(primary_key), options)
if res then
local row = res[1]
if row then
Expand All @@ -592,6 +597,11 @@ end

function _mt:select_by_field(field_name, unique_value, options)
local statement_name = "select_by_" .. field_name

if self.schema.ttl and options and options.skip_ttl then
statement_name = statement_name .. "_skip_ttl"
end

local filter = {
[field_name] = unique_value,
}
Expand Down Expand Up @@ -695,7 +705,11 @@ end


function _mt:delete(primary_key, options)
local res, err = execute(self, "delete", self.collapse(primary_key), options)
local statement_name = "delete"
if self.schema.ttl and options and options.skip_ttl then
statement_name = "delete_skip_ttl"
end
local res, err = execute(self, statement_name, self.collapse(primary_key), options)
if res then
if res.affected_rows == 0 then
return nil, nil
Expand All @@ -710,6 +724,9 @@ end

function _mt:delete_by_field(field_name, unique_value, options)
local statement_name = "delete_by_" .. field_name
if self.schema.ttl and options and options.skip_ttl then
statement_name = statement_name .. "_skip_ttl"
end
local filter = {
[field_name] = unique_value,
}
Expand Down Expand Up @@ -1189,6 +1206,19 @@ function _M.new(connector, schema, errors)
}
})

add_statement("delete_skip_ttl", {
operation = "write",
argn = primary_key_names,
argv = primary_key_args,
code = {
"DELETE\n",
" FROM ", table_name_escaped, "\n",
where_clause(
" WHERE ", "(" .. pk_escaped .. ") = (" .. primary_key_placeholders .. ")",
ws_id_select_where), ";"
}
})

add_statement("select", {
operation = "read",
expr = select_expressions,
Expand All @@ -1205,6 +1235,21 @@ function _M.new(connector, schema, errors)
}
})

add_statement("select_skip_ttl", {
operation = "read",
expr = select_expressions,
argn = primary_key_names,
argv = primary_key_args,
code = {
"SELECT ", select_expressions, "\n",
" FROM ", table_name_escaped, "\n",
where_clause(
" WHERE ", "(" .. pk_escaped .. ") = (" .. primary_key_placeholders .. ")",
ws_id_select_where),
" LIMIT 1;"
}
})

add_statement_for_export("page_first", {
operation = "read",
argn = { LIMIT },
Expand Down Expand Up @@ -1387,6 +1432,20 @@ function _M.new(connector, schema, errors)
},
})

add_statement("select_by_" .. field_name .. "_skip_ttl", {
operation = "read",
argn = single_names,
argv = single_args,
code = {
"SELECT ", select_expressions, "\n",
" FROM ", table_name_escaped, "\n",
where_clause(
" WHERE ", unique_escaped .. " = $1",
ws_id_select_where),
" LIMIT 1;"
},
})

local update_by_args_names = {}
for _, update_name in ipairs(update_names) do
insert(update_by_args_names, update_name)
Expand Down Expand Up @@ -1442,6 +1501,19 @@ function _M.new(connector, schema, errors)
ws_id_select_where), ";"
}
})

add_statement("delete_by_" .. field_name .. "_skip_ttl", {
operation = "write",
argn = single_names,
argv = single_args,
code = {
"DELETE\n",
" FROM ", table_name_escaped, "\n",
where_clause(
" WHERE ", unique_escaped .. " = $1",
ws_id_select_where), ";"
}
})
end
end

Expand Down
32 changes: 32 additions & 0 deletions spec/02-integration/03-db/14-dao_spec.lua
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ for _, strategy in helpers.all_strategies() do
"services",
"consumers",
"acls",
"keyauth_credentials",
})
_G.kong.db = db

Expand Down Expand Up @@ -98,6 +99,7 @@ for _, strategy in helpers.all_strategies() do
db.consumers:truncate()
db.plugins:truncate()
db.services:truncate()
db.keyauth_credentials:truncate()
end)

it("select_by_cache_key()", function()
Expand Down Expand Up @@ -185,6 +187,36 @@ for _, strategy in helpers.all_strategies() do
assert.same(new_plugin_config.config.redis.host, read_plugin.config.redis.host)
assert.same(new_plugin_config.config.redis.host, read_plugin.config.redis_host) -- legacy field is included
end)

it("keyauth_credentials can be deleted or selected before run ttl cleanup in background timer", function()
local key = uuid()
local original_keyauth_credentials = bp.keyauth_credentials:insert({
consumer = { id = consumer.id },
key = key,
}, { ttl = 5 })

-- wait for 5 seconds.
ngx.sleep(5)

-- select or delete keyauth_credentials after ttl expired.
local expired_keyauth_credentials
helpers.wait_until(function()
expired_keyauth_credentials = kong.db.keyauth_credentials:select_by_key(key)
return not expired_keyauth_credentials
end, 1)
assert.is_nil(expired_keyauth_credentials)
kong.db.keyauth_credentials:delete_by_key(key)

-- select or delete keyauth_credentials with skip_ttl=true after ttl expired.
expired_keyauth_credentials = kong.db.keyauth_credentials:select_by_key(key, { skip_ttl = true })
assert.not_nil(expired_keyauth_credentials)
assert.same(expired_keyauth_credentials.id, original_keyauth_credentials.id)
kong.db.keyauth_credentials:delete_by_key(key, { skip_ttl = true })

-- check again
expired_keyauth_credentials = kong.db.keyauth_credentials:select_by_key(key, { skip_ttl = true })
assert.is_nil(expired_keyauth_credentials)
end)
end)
end

Loading