diff --git a/kong/clustering/rpc/manager.lua b/kong/clustering/rpc/manager.lua index d596f80691b..2ebf4b6fcaa 100644 --- a/kong/clustering/rpc/manager.lua +++ b/kong/clustering/rpc/manager.lua @@ -187,11 +187,11 @@ function _M:connect(premature, node_id, host, path, cert, key) do local s = socket.new(self, c, node_id) + assert(s:start()) -- capability advertisement local fut = future.new(s, "kong.meta.v1.capability_advertisement", { self.callbacks:get_capabilities(), }) assert(fut:start()) - assert(s:start()) ok, err = fut:wait(5) if not ok then diff --git a/kong/clustering/rpc/socket.lua b/kong/clustering/rpc/socket.lua index b8715dc10c5..5c718693e0b 100644 --- a/kong/clustering/rpc/socket.lua +++ b/kong/clustering/rpc/socket.lua @@ -25,7 +25,9 @@ local ngx_time = ngx.time local ngx_log = ngx.log -local PING_WAIT = constants.CLUSTERING_PING_INTERVAL * 1.5 +local CLUSTERING_PING_INTERVAL = constants.CLUSTERING_PING_INTERVAL +local PING_WAIT = CLUSTERING_PING_INTERVAL * 1.5 +local PING_TYPE = "PING" local PONG_TYPE = "PONG" local ngx_WARN = ngx.WARN local ngx_DEBUG = ngx.DEBUG @@ -70,6 +72,13 @@ function _M:start() end local waited = ngx_time() - last_seen + if waited > CLUSTERING_PING_INTERVAL then + local res, err = self.outgoing:push(PING_TYPE) + if not res then + return nil, "unable to send ping: " .. err + end + end + if waited > PING_WAIT then return nil, "did not receive ping frame from other end within " .. PING_WAIT .. " seconds" @@ -90,6 +99,12 @@ function _M:start() goto continue end + if typ == "pong" then + ngx_log(ngx_DEBUG, "[rpc] got PONG frame") + + goto continue + end + if typ == "close" then return true end @@ -184,30 +199,33 @@ function _M:start() end if payload then - if payload == PONG_TYPE then - local _, err = self.wb:send_pong() + if payload == PING_TYPE then + local _, err = self.wb:send_ping() if err then - if not is_timeout(err) then - return nil, "failed to send pong frame to data plane: " .. err - end + return nil, "failed to send PING frame to peer: " .. err - ngx_log(ngx_WARN, "[rpc] failed to send PONG frame to peer: ", err) + else + ngx_log(ngx_DEBUG, "[rpc] sent PING frame to peer") + end + + elseif payload == PONG_TYPE then + local _, err = self.wb:send_pong() + if err then + return nil, "failed to send PONG frame to peer: " .. err else ngx_log(ngx_DEBUG, "[rpc] sent PONG frame to peer") end - -- pong ok - goto continue - end + else + assert(type(payload) == "table") - local bytes, err = self.wb:send_binary(cjson_encode(payload)) - if not bytes then - return nil, err + local bytes, err = self.wb:send_binary(cjson_encode(payload)) + if not bytes then + return nil, err + end end end - - ::continue:: end end)