diff --git a/kong/clustering/rpc/manager.lua b/kong/clustering/rpc/manager.lua index 7881b1661ffe..ceaa781507b9 100644 --- a/kong/clustering/rpc/manager.lua +++ b/kong/clustering/rpc/manager.lua @@ -5,7 +5,6 @@ local _MT = { __index = _M, } local server = require("resty.websocket.server") local client = require("resty.websocket.client") local socket = require("kong.clustering.rpc.socket") -local concentrator = require("kong.clustering.rpc.concentrator") local future = require("kong.clustering.rpc.future") local utils = require("kong.clustering.rpc.utils") local callbacks = require("kong.clustering.rpc.callbacks") @@ -42,7 +41,6 @@ function _M.new(conf, node_id) -- clients[node_id]: { socket1 => true, socket2 => true, ... } clients = {}, client_capabilities = {}, - client_ips = {}, -- store DP node's ip addr node_id = node_id, conf = conf, cluster_cert = assert(clustering_tls.get_cluster_cert(conf)), @@ -50,7 +48,10 @@ function _M.new(conf, node_id) callbacks = callbacks.new(), } - self.concentrator = concentrator.new(self, kong.db) + if conf.role == "control_plane" then + self.concentrator = require("kong.clustering.rpc.concentrator").new(self, kong.db) + self.client_ips = {}, -- store DP node's ip addr + end return setmetatable(self, _MT) end @@ -59,7 +60,10 @@ end function _M:_add_socket(socket, capabilities_list) local sockets = self.clients[socket.node_id] if not sockets then - assert(self.concentrator:_enqueue_subscribe(socket.node_id)) + if self.concentrator then + assert(self.concentrator:_enqueue_subscribe(socket.node_id)) + end + sockets = setmetatable({}, { __mode = "k", }) self.clients[socket.node_id] = sockets end @@ -87,7 +91,10 @@ function _M:_remove_socket(socket) self.clients[node_id] = nil self.client_ips[node_id] = nil self.client_capabilities[node_id] = nil - assert(self.concentrator:_enqueue_unsubscribe(node_id)) + + if self.concentrator then + assert(self.concentrator:_enqueue_unsubscribe(node_id)) + end end end