Skip to content

Commit

Permalink
self:push_result
Browse files Browse the repository at this point in the history
  • Loading branch information
chronolaw committed Dec 20, 2024
1 parent be7e356 commit 2c983c1
Showing 1 changed file with 34 additions and 14 deletions.
48 changes: 34 additions & 14 deletions kong/clustering/rpc/socket.lua
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ local utils = require("kong.clustering.rpc.utils")
local queue = require("kong.clustering.rpc.queue")
local jsonrpc = require("kong.clustering.rpc.json_rpc_v2")
local constants = require("kong.constants")
--local isarray = require("table.isarray")


local assert = assert
Expand Down Expand Up @@ -73,10 +74,10 @@ function _M._dispatch(premature, self, cb, payload)
return
end

res, err = self.outgoing:push(new_error(payload.id, jsonrpc.SERVER_ERROR,
err))
res, err = self:push_result(new_error(payload.id, jsonrpc.SERVER_ERROR, err),
"[rpc] unable to push RPC call error: ")
if not res then
ngx_log(ngx_WARN, "[rpc] unable to push RPC call error: ", err)
ngx_log(ngx_WARN, err)
end

return
Expand All @@ -89,17 +90,27 @@ function _M._dispatch(premature, self, cb, payload)
end

-- success
res, err = self.outgoing:push({
res, err = self:push_result({
jsonrpc = jsonrpc.VERSION,
id = payload.id,
result = res,
})
}, "[rpc] unable to push RPC call result: ")
if not res then
ngx_log(ngx_WARN, "[rpc] unable to push RPC call result: ", err)
ngx_log(ngx_WARN, err)
end
end


function _M:push_result(msg, err_prefix)
local res, err = self.outgoing:push(msg)
if not res then
return nil, err_prefix .. err
end

return true
end


-- start reader and writer thread and event loop
function _M:start()
self.read_thread = ngx.thread.spawn(function()
Expand All @@ -120,9 +131,9 @@ function _M:start()
end

if waited > CLUSTERING_PING_INTERVAL then
local res, err = self.outgoing:push(PING_TYPE)
local res, err = self:push_result(PING_TYPE, "unable to send ping: ")
if not res then
return nil, "unable to send ping: " .. err
return nil, err
end
end

Expand All @@ -133,9 +144,9 @@ function _M:start()
last_seen = ngx_time()

if typ == "ping" then
local res, err = self.outgoing:push(PONG_TYPE)
local res, err = self:push_result(PONG_TYPE, "unable to handle ping: ")
if not res then
return nil, "unable to handle ping: " .. err
return nil, err
end

goto continue
Expand All @@ -154,6 +165,13 @@ function _M:start()
assert(typ == "binary")

local payload = decompress_payload(data)

-- rpc batching
--if isarray(payload) then
-- for _, v in ipairs(payload) do
-- end
--end -- isarray

assert(payload.jsonrpc == jsonrpc.VERSION)

if payload.method then
Expand All @@ -163,9 +181,10 @@ function _M:start()

local dispatch_cb = self.manager.callbacks.callbacks[payload.method]
if not dispatch_cb and payload.id then
local res, err = self.outgoing:push(new_error(payload.id, jsonrpc.METHOD_NOT_FOUND))
local res, err = self:push_result(new_error(payload.id, jsonrpc.METHOD_NOT_FOUND),
"unable to send \"METHOD_NOT_FOUND\" error back to client: ")
if not res then
return nil, "unable to send \"METHOD_NOT_FOUND\" error back to client: " .. err
return nil, err
end

goto continue
Expand All @@ -176,9 +195,10 @@ function _M:start()
self.node_id, payload.id or 0, payload.method),
0, _M._dispatch, self, dispatch_cb, payload)
if not res and payload.id then
local reso, erro = self.outgoing:push(new_error(payload.id, jsonrpc.INTERNAL_ERROR))
local reso, erro = self:push_result(new_error(payload.id, jsonrpc.INTERNAL_ERROR),
"unable to send \"INTERNAL_ERROR\" error back to client: ")
if not reso then
return nil, "unable to send \"INTERNAL_ERROR\" error back to client: " .. erro
return nil, erro
end

return nil, "unable to dispatch JSON-RPC callback: " .. err
Expand Down

0 comments on commit 2c983c1

Please sign in to comment.