Skip to content

Commit

Permalink
post worker events synchronously instead of from dao:crud handler
Browse files Browse the repository at this point in the history
  • Loading branch information
hanshuebner committed Nov 30, 2023
1 parent 36abf74 commit 2f8eb74
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 66 deletions.
49 changes: 38 additions & 11 deletions kong/db/dao/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -1462,6 +1462,29 @@ function DAO:row_to_entity(row, options)
end


local function post_worker_events(event_data)
-- public worker events propagation

local schema = event_data.schema
local entity_channel = schema.table or schema.name
local entity_operation_channel = fmt("%s:%s", entity_channel, event_data.operation)

-- crud:routes
local ok, err = kong.worker_events.post_local("crud", entity_channel, event_data)
if not ok then
log(ERR, "[events] could not broadcast crud event: ", err)
return
end

-- crud:routes:create
ok, err = kong.worker_events.post_local("crud", entity_operation_channel, event_data)
if not ok then
log(ERR, "[events] could not broadcast crud event: ", err)
return
end
end


function DAO:post_crud_event(operation, entity, old_entity, options)
invalidate(operation, options.workspace, self.schema.name, entity, old_entity)

Expand All @@ -1470,23 +1493,27 @@ function DAO:post_crud_event(operation, entity, old_entity, options)
return
end

if self.events then
local entity_without_nulls
if entity then
entity_without_nulls = remove_nulls(utils.cycle_aware_deep_copy(entity, true))
end
local entity_without_nulls
if entity then
entity_without_nulls = remove_nulls(utils.cycle_aware_deep_copy(entity, true))
end

local old_entity_without_nulls
if old_entity then
old_entity_without_nulls = remove_nulls(utils.cycle_aware_deep_copy(old_entity, true))
end
local old_entity_without_nulls
if old_entity then
old_entity_without_nulls = remove_nulls(utils.cycle_aware_deep_copy(old_entity, true))
end

local ok, err = self.events.post_local("dao:crud", operation, {
if self.events then
local event_data = {
operation = operation,
schema = self.schema,
entity = entity_without_nulls,
old_entity = old_entity_without_nulls,
})
}

post_worker_events(event_data)

local ok, err = self.events.post_local("dao:crud", operation, event_data)
if not ok then
log(ERR, "[db] failed to propagate CRUD operation: ", err)
end
Expand Down
55 changes: 0 additions & 55 deletions kong/runloop/events.lua
Original file line number Diff line number Diff line change
Expand Up @@ -162,47 +162,6 @@ local function cluster_balancer_upstreams_handler(data)
end


local function dao_crud_handler(data)
local schema = data.schema
if not schema then
log(ERR, "[events] missing schema in crud subscriber")
return
end

local operation = data.operation
if not operation then
log(ERR, "[events] missing operation in crud subscriber")
return
end

-- public worker events propagation

local entity_channel = schema.table or schema.name
local entity_operation_channel = fmt("%s:%s", entity_channel, operation)

-- crud:routes
local ok, err = worker_events.post_local("crud", entity_channel, data)
if not ok then
log(ERR, "[events] could not broadcast crud event: ", err)
return
end

-- crud:routes:create
ok, err = worker_events.post_local("crud", entity_operation_channel, data)
if not ok then
log(ERR, "[events] could not broadcast crud event: ", err)
return
end
end




local LOCAL_HANDLERS = {
{ "dao:crud", nil , dao_crud_handler },
}


local BALANCER_HANDLERS = {
{ "crud" , "targets" , crud_targets_handler },
{ "crud" , "upstreams" , crud_upstreams_handler },
Expand Down Expand Up @@ -232,17 +191,6 @@ local function subscribe_cluster_events(source, handler)
end


local function register_local_events()
for _, v in ipairs(LOCAL_HANDLERS) do
local source = v[1]
local event = v[2]
local handler = v[3]

subscribe_worker_events(source, event, handler)
end
end


local function register_balancer_events()
for _, v in ipairs(BALANCER_HANDLERS) do
local source = v[1]
Expand All @@ -268,9 +216,6 @@ local function register_for_db()
cluster_events = kong.cluster_events

-- events dispatcher

register_local_events()

register_balancer_events()
end

Expand Down

0 comments on commit 2f8eb74

Please sign in to comment.