diff --git a/kong/db/dao/init.lua b/kong/db/dao/init.lua index f347f03d932c..694dca28eac6 100644 --- a/kong/db/dao/init.lua +++ b/kong/db/dao/init.lua @@ -1462,6 +1462,29 @@ function DAO:row_to_entity(row, options) end +local function post_worker_events(event_data) + -- public worker events propagation + + local schema = event_data.schema + local entity_channel = schema.table or schema.name + local entity_operation_channel = fmt("%s:%s", entity_channel, event_data.operation) + + -- crud:routes + local ok, err = kong.worker_events.post_local("crud", entity_channel, event_data) + if not ok then + log(ERR, "[events] could not broadcast crud event: ", err) + return + end + + -- crud:routes:create + ok, err = kong.worker_events.post_local("crud", entity_operation_channel, event_data) + if not ok then + log(ERR, "[events] could not broadcast crud event: ", err) + return + end +end + + function DAO:post_crud_event(operation, entity, old_entity, options) invalidate(operation, options.workspace, self.schema.name, entity, old_entity) @@ -1470,23 +1493,27 @@ function DAO:post_crud_event(operation, entity, old_entity, options) return end - if self.events then - local entity_without_nulls - if entity then - entity_without_nulls = remove_nulls(utils.cycle_aware_deep_copy(entity, true)) - end + local entity_without_nulls + if entity then + entity_without_nulls = remove_nulls(utils.cycle_aware_deep_copy(entity, true)) + end - local old_entity_without_nulls - if old_entity then - old_entity_without_nulls = remove_nulls(utils.cycle_aware_deep_copy(old_entity, true)) - end + local old_entity_without_nulls + if old_entity then + old_entity_without_nulls = remove_nulls(utils.cycle_aware_deep_copy(old_entity, true)) + end - local ok, err = self.events.post_local("dao:crud", operation, { + if self.events then + local event_data = { operation = operation, schema = self.schema, entity = entity_without_nulls, old_entity = old_entity_without_nulls, - }) + } + + post_worker_events(event_data) + + local ok, err = self.events.post_local("dao:crud", operation, event_data) if not ok then log(ERR, "[db] failed to propagate CRUD operation: ", err) end diff --git a/kong/runloop/events.lua b/kong/runloop/events.lua index 6143df732435..2a33ee805714 100644 --- a/kong/runloop/events.lua +++ b/kong/runloop/events.lua @@ -162,47 +162,6 @@ local function cluster_balancer_upstreams_handler(data) end -local function dao_crud_handler(data) - local schema = data.schema - if not schema then - log(ERR, "[events] missing schema in crud subscriber") - return - end - - local operation = data.operation - if not operation then - log(ERR, "[events] missing operation in crud subscriber") - return - end - - -- public worker events propagation - - local entity_channel = schema.table or schema.name - local entity_operation_channel = fmt("%s:%s", entity_channel, operation) - - -- crud:routes - local ok, err = worker_events.post_local("crud", entity_channel, data) - if not ok then - log(ERR, "[events] could not broadcast crud event: ", err) - return - end - - -- crud:routes:create - ok, err = worker_events.post_local("crud", entity_operation_channel, data) - if not ok then - log(ERR, "[events] could not broadcast crud event: ", err) - return - end -end - - - - -local LOCAL_HANDLERS = { - { "dao:crud", nil , dao_crud_handler }, -} - - local BALANCER_HANDLERS = { { "crud" , "targets" , crud_targets_handler }, { "crud" , "upstreams" , crud_upstreams_handler }, @@ -232,17 +191,6 @@ local function subscribe_cluster_events(source, handler) end -local function register_local_events() - for _, v in ipairs(LOCAL_HANDLERS) do - local source = v[1] - local event = v[2] - local handler = v[3] - - subscribe_worker_events(source, event, handler) - end -end - - local function register_balancer_events() for _, v in ipairs(BALANCER_HANDLERS) do local source = v[1] @@ -268,9 +216,6 @@ local function register_for_db() cluster_events = kong.cluster_events -- events dispatcher - - register_local_events() - register_balancer_events() end