Skip to content

Commit

Permalink
feat(sync): full sync pagination
Browse files Browse the repository at this point in the history
  • Loading branch information
StarlightIbuki committed Dec 9, 2024
1 parent c3a5ed6 commit c10dde3
Show file tree
Hide file tree
Showing 7 changed files with 580 additions and 0 deletions.
5 changes: 5 additions & 0 deletions kong-3.10.0-0.rockspec
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,11 @@ build = {
["kong.db.strategies.off.connector"] = "kong/db/strategies/off/connector.lua",
["kong.db.strategies.off.tags"] = "kong/db/strategies/off/tags.lua",

["kong.db.resumable_chunker] = "kong/db/resumable_chunker/init.lua",
["kong.db.resumable_chunker.chain] = "kong/db/resumable_chunker/chain.lua",
["kong.db.resumable_chunker.strategy] = "kong/db/resumable_chunker/strategy.lua",
["kong.db.resumable_chunker.utils] = "kong/db/resumable_chunker/utils.lua",

["kong.db.migrations.state"] = "kong/db/migrations/state.lua",
["kong.db.migrations.subsystems"] = "kong/db/migrations/subsystems.lua",
["kong.db.migrations.core"] = "kong/db/migrations/core/init.lua",
Expand Down
41 changes: 41 additions & 0 deletions kong/db/resumable_chunker/chain.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
local EMPTY = require("kong.tools.table").EMPTY
local inplace_merge = require("kong.db.resumable_chunker.utils").inplace_merge

local _M = {}
local _MT = { __index = _M }

local BEGIN = { 1, nil }

function _M.from_chain(list, options)
options = options or EMPTY
list.options = options
return setmetatable(list, _MT)
end

function _M:next(size, offset)
size = size or self.options.size
offset = offset or BEGIN
local ind, inner_ind = offset[1], offset[2]

if not self[ind] then
return EMPTY
end

local rows, len = nil, 0
repeat
local next_row, err
next_row, err, inner_ind = self[ind]:next(size - len, inner_ind)
if not next_row then
return nil, err, { ind, inner_ind }
end
rows, len = inplace_merge(rows, next_row)

if not inner_ind then -- end of the current chain. continue with the next one
ind = ind + 1
end
until len >= size or not self[ind]

return rows or EMPTY, nil, self[ind] and { ind, inner_ind } or nil
end

return _M
25 changes: 25 additions & 0 deletions kong/db/resumable_chunker/dao.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
local _M = {}

function _M.from_dao(dao, options)
return setmetatable({
dao = dao,
name = dao.schema.name,
options = options,
}, {
__index = _M,
})
end

function _M:next(size, offset)
local rows, err, err_t, offset = self.dao:page(size, offset, self.options)

if rows then
for _, row in ipairs(rows) do
row.__type = self.name
end
end

return rows, err, offset
end

return _M
54 changes: 54 additions & 0 deletions kong/db/resumable_chunker/init.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
local schema_topological_sort = require("kong.db.schema.topological_sort")
local from_chain = require("kong.db.resumable_chunker.chain").from_chain
local from_dao = require("kong.db.resumable_chunker.dao").from_dao
local inplace_merge = require("kong.db.resumable_chunker.utils").inplace_merge
local EMPTY = require("kong.tools.table").EMPTY


local _M = {}
local _MT = { __index = _M }

-- TODO: handling disabled entities
-- it may require a change to the dao or even the strategy (by filtering the rows when querying)
function _M.from_db(db, options)
options = options or EMPTY
local schemas, n = {}, 0

local skip_ws = options.skip_ws

for a, dao in pairs(db.daos) do
local schema = dao.schema
if schema.db_export ~= false and not (skip_ws and schema.name == "workspaces") then
n = n + 1
schemas[n] = schema
end
end

local sorted_schemas, err = schema_topological_sort(schemas)
if not sorted_schemas then
return nil, err
end

local sorted_daos = {}
for i, schema in ipairs(sorted_schemas) do
sorted_daos[i] = db.daos[schema.name]
end

return _M.from_daos(sorted_daos, options)
end

function _M.from_daos(sorted_daos, options)
options = options or EMPTY

local chains, n = {}, 0
for _, dao in ipairs(sorted_daos) do
local chain = from_dao(dao, options)
n = n + 1
chains[n] = chain
end

return from_chain(chains, options)
end


return _M
20 changes: 20 additions & 0 deletions kong/db/resumable_chunker/utils.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
-- to avoid unnecessary table creation
local function inplace_merge(lst, lst2)
if lst == nil then
return lst2, #lst2
end


local n = #lst
local m = #lst2
for i = 1, m do
n = n + 1
lst[n] = lst2[i]
end

return lst, n
end

return {
inplace_merge = inplace_merge,
}
184 changes: 184 additions & 0 deletions spec/01-unit/01-db/12-resumable_chunker_spec.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
local resumable_chunker = require("kong.db.resumable_chunker")

local function insert_dao(db, daos, name)
local dao = {}
table.insert(daos, dao)
db[name] = dao
dao.schema = { name = name }
return dao
end

local function mock_field(db, daos, name, tbl)
local dao = insert_dao(db, daos, name)

local rows = {}
for _, row in ipairs(tbl) do
table.insert(rows, { field = row })
end

function dao.page(self, size, offset)
offset = offset or 1
local ret = {}
for i = 1, size do
local row = rows[offset]
if not row then
return ret, nil, nil, nil
end
ret[i] = row
offset = offset + 1
end

return ret, nil, nil, rows[offset] and offset or nil
end
end

local function mock_error_field(db, daos, name)
local dao = insert_dao(db, daos, name)

function dao.page(self, size, offset)
return nil, "error: " .. name
end
end

local function process_row(rows)
for i, row in ipairs(rows) do
rows[i] = row.field
end
return rows
end

describe("resumable_chunker.from_daos", function()
it("handling empty table", function ()
local db, daos = {}, {}
local chunker = resumable_chunker.from_daos(daos)
local rows, err, offset = chunker:next(1)
assert.same({}, rows)
assert.is_nil(err)
assert.is_nil(offset)

mock_field(db, daos, "field", {})
local chunker = resumable_chunker.from_daos(daos)
local rows, err, offset = chunker:next(1)
assert.same({}, rows)
assert.is_nil(err)
assert.is_nil(offset)
end)

it("handling exact size", function ()
local strategy = {}
local db, daos = {}, {}
mock_field(db, daos, "field", { 1, 2, 3, 4, 5 })
local chunker = resumable_chunker.from_daos(daos)
local rows, err, offset = chunker:next(5)
assert.are.same({ 1, 2, 3, 4, 5 }, process_row(rows))
assert.is_nil(err)
assert.is_nil(offset)
end)

it("handling less than size", function ()
local strategy = {}
local db, daos = {}, {}
mock_field(db, daos, "field", { 1, 2, 3, 4, 5 })
local chunker = resumable_chunker.from_daos(daos)
local rows, err, offset = chunker:next(6)
assert.are.same({ 1, 2, 3, 4, 5 }, process_row(rows))
assert.is_nil(err)
assert.is_nil(offset)
end)

it("handling more than size", function ()
local strategy = {}
local db, daos = {}, {}
mock_field(db, daos, "field", { 1, 2, 3, 4, 5 })
local chunker = resumable_chunker.from_daos(daos)
local rows, err, offset = chunker:next(4)
assert.are.same({ 1, 2, 3, 4 }, process_row(rows))
assert.is_nil(err)
assert.truthy(offset)
local rows, err, offset = chunker:next(4, offset)
assert.are.same({ 5 }, process_row(rows))
assert.is_nil(err)
assert.is_nil(offset)
end)

it("handling multiple table", function ()
local strategy = {}
local db, daos = {}, {}
mock_field(db, daos, "field1", { 1, 2, 3, 4, 5 })
mock_field(db, daos, "field2", { 6, 7, 8, 9, 10 })
local chunker = resumable_chunker.from_daos(daos)
local rows, err, offset = chunker:next(6)
assert.are.same({ 1, 2, 3, 4, 5, 6 }, process_row(rows))
assert.is_nil(err)
assert.truthy(offset)
local rows, err, offset = chunker:next(6, offset)
assert.are.same({ 7, 8, 9, 10 }, process_row(rows))
assert.is_nil(err)
assert.is_nil(offset)
end)

it("handling exhausted table", function ()
local strategy = {}
local db, daos = {}, {}
mock_field(db, daos, "field1", { 1, 2, 3, 4, 5 })
mock_field(db, daos, "field2", { 6, 7, 8, 9, 10 })
local chunker = resumable_chunker.from_daos(daos)
local rows, err, offset = chunker:next(11)
assert.are.same({ 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 }, process_row(rows))
assert.is_nil(err)
assert.is_nil(offset)
end)

it("handling error", function ()
local strategy = {}
local db, daos = {}, {}
mock_field(db, daos, "field", { 1, 2, 3, 4, 5 })
mock_error_field(db, daos, "error")
local chunker = resumable_chunker.from_daos(daos)
local rows, err, offset = chunker:next(4)
assert.are.same({ 1, 2, 3, 4 }, process_row(rows))
assert.is_nil(err)
assert.truthy(offset)
local rows, err, offset = chunker:next(4, offset)
assert.is_nil(rows)
assert.are.same("error: error", err)
end)

it("resumable", function ()
local strategy = {}
local db, daos = {}, {}
mock_field(db, daos, "field1", { 1, 2, 3, 4, 5 })
mock_field(db, daos, "field2", { 6, 7, 8, 9, 10 })
local chunker = resumable_chunker.from_daos(daos)
local rows, err, offset = chunker:next(6)
assert.are.same({ 1, 2, 3, 4, 5, 6 }, process_row(rows))
assert.is_nil(err)
assert.truthy(offset)
local offset1 = offset
local rows, err, offset = chunker:next(6, offset)
assert.are.same({ 7, 8, 9, 10 }, process_row(rows))
assert.is_nil(err)
assert.is_nil(offset)
local rows, err, offset = chunker:next(5)
assert.are.same({ 1, 2, 3, 4, 5 }, process_row(rows))
assert.is_nil(err)
assert.truthy(offset)
local offset2 = offset
local rows, err, offset = chunker:next(6, offset)
assert.are.same({ 6, 7, 8, 9, 10 }, process_row(rows))
assert.is_nil(err)
assert.is_nil(offset)
local rows, err, offset = chunker:next(3, offset1)
assert.are.same({ 7, 8, 9 }, process_row(rows))
assert.is_nil(err)
assert.truthy(offset)
local rows, err, offset = chunker:next(3, offset2)
assert.are.same({ 6, 7, 8 }, process_row(rows))
assert.is_nil(err)
assert.truthy(offset)
local rows, err, offset = chunker:next(3, offset)
assert.are.same({ 9, 10 }, process_row(rows))
assert.is_nil(err)
assert.is_nil(offset)
end)
end)
Loading

0 comments on commit c10dde3

Please sign in to comment.