From 78577cca8c717e7561e59385e23ec979f0de3ec8 Mon Sep 17 00:00:00 2001 From: Chrono Date: Fri, 29 Nov 2024 14:51:42 +0800 Subject: [PATCH] refactor(clustreing/rpc): constant for json rpc version (#13902) https://konghq.atlassian.net/browse/KAG-5870 --- kong/clustering/rpc/concentrator.lua | 8 ++++---- kong/clustering/rpc/future.lua | 3 ++- kong/clustering/rpc/json_rpc_v2.lua | 4 +++- kong/clustering/rpc/manager.lua | 9 +++++---- kong/clustering/rpc/socket.lua | 6 +++--- 5 files changed, 17 insertions(+), 13 deletions(-) diff --git a/kong/clustering/rpc/concentrator.lua b/kong/clustering/rpc/concentrator.lua index c1370a58e19..68bb0bc3388 100644 --- a/kong/clustering/rpc/concentrator.lua +++ b/kong/clustering/rpc/concentrator.lua @@ -116,7 +116,7 @@ function _M:_event_loop(lconn) if n.channel == rpc_resp_channel_name then -- an response for a previous RPC call we asked for local payload = cjson_decode(n.payload) - assert(payload.jsonrpc == "2.0") + assert(payload.jsonrpc == jsonrpc.VERSION) -- response local cb = self.interest[payload.id] @@ -158,7 +158,7 @@ function _M:_event_loop(lconn) if res then -- call success res, err = self:_enqueue_rpc_response(reply_to, { - jsonrpc = "2.0", + jsonrpc = jsonrpc.VERSION, id = payload.id, result = res, }) @@ -169,7 +169,7 @@ function _M:_event_loop(lconn) else -- call failure res, err = self:_enqueue_rpc_response(reply_to, { - jsonrpc = "2.0", + jsonrpc = jsonrpc.VERSION, id = payload.id, error = { code = jsonrpc.SERVER_ERROR, @@ -292,7 +292,7 @@ function _M:call(node_id, method, params, callback) self.interest[id] = callback return self:_enqueue_rpc_request(node_id, { - jsonrpc = "2.0", + jsonrpc = jsonrpc.VERSION, method = method, params = params, id = id, diff --git a/kong/clustering/rpc/future.lua b/kong/clustering/rpc/future.lua index 230d8bf0998..68ed82720f0 100644 --- a/kong/clustering/rpc/future.lua +++ b/kong/clustering/rpc/future.lua @@ -3,6 +3,7 @@ local _MT = { __index = _M, } local semaphore = require("ngx.semaphore") +local jsonrpc = require("kong.clustering.rpc.json_rpc_v2") local STATE_NEW = 1 @@ -34,7 +35,7 @@ function _M:start() self.state = STATE_IN_PROGRESS local callback = function(resp) - assert(resp.jsonrpc == "2.0") + assert(resp.jsonrpc == jsonrpc.VERSION) if resp.result then -- succeeded diff --git a/kong/clustering/rpc/json_rpc_v2.lua b/kong/clustering/rpc/json_rpc_v2.lua index 7d155e314a3..8bb6fa11e71 100644 --- a/kong/clustering/rpc/json_rpc_v2.lua +++ b/kong/clustering/rpc/json_rpc_v2.lua @@ -9,6 +9,8 @@ local _M = { INVALID_PARAMS = -32602, INTERNAL_ERROR = -32603, SERVER_ERROR = -32000, + + VERSION = "2.0", } @@ -40,7 +42,7 @@ function _M.new_error(id, code, msg) end return { - jsonrpc = "2.0", + jsonrpc = _M.VERSION, id = id, error = { code = code, diff --git a/kong/clustering/rpc/manager.lua b/kong/clustering/rpc/manager.lua index 2d2d22901ad..c3925c5073c 100644 --- a/kong/clustering/rpc/manager.lua +++ b/kong/clustering/rpc/manager.lua @@ -7,6 +7,7 @@ local client = require("resty.websocket.client") local socket = require("kong.clustering.rpc.socket") local future = require("kong.clustering.rpc.future") local utils = require("kong.clustering.rpc.utils") +local jsonrpc = require("kong.clustering.rpc.json_rpc_v2") local callbacks = require("kong.clustering.rpc.callbacks") local clustering_tls = require("kong.clustering.tls") local constants = require("kong.constants") @@ -154,7 +155,7 @@ function _M:_handle_meta_call(c) end local payload = cjson_decode(data) - assert(payload.jsonrpc == "2.0") + assert(payload.jsonrpc == jsonrpc.VERSION) if payload.method ~= RPC_MATA_V1 .. ".hello" then return nil, "wrong RPC meta call: " .. tostring(payload.method) @@ -181,7 +182,7 @@ function _M:_handle_meta_call(c) assert(type(info.kong_conf) == "table") local payload = { - jsonrpc = "2.0", + jsonrpc = jsonrpc.VERSION, result = { rpc_capabilities = self.callbacks:get_capabilities_list(), -- now we only support snappy @@ -232,7 +233,7 @@ function _M:_meta_call(c, meta_cap, node_id) } local payload = { - jsonrpc = "2.0", + jsonrpc = jsonrpc.VERSION, method = meta_cap .. ".hello", params = { info }, id = 1, @@ -253,7 +254,7 @@ function _M:_meta_call(c, meta_cap, node_id) end local payload = cjson_decode(data) - assert(payload.jsonrpc == "2.0") + assert(payload.jsonrpc == jsonrpc.VERSION) -- now we only support snappy if payload.result.rpc_frame_encoding ~= RPC_SNAPPY_FRAMED then diff --git a/kong/clustering/rpc/socket.lua b/kong/clustering/rpc/socket.lua index ca8a80d622c..045ca8c7557 100644 --- a/kong/clustering/rpc/socket.lua +++ b/kong/clustering/rpc/socket.lua @@ -79,7 +79,7 @@ function _M._dispatch(premature, self, cb, payload) -- success res, err = self.outgoing:push({ - jsonrpc = "2.0", + jsonrpc = jsonrpc.VERSION, id = payload.id, result = res, }) @@ -143,7 +143,7 @@ function _M:start() assert(typ == "binary") local payload = decompress_payload(data) - assert(payload.jsonrpc == "2.0") + assert(payload.jsonrpc == jsonrpc.VERSION) if payload.method then -- invoke @@ -276,7 +276,7 @@ function _M:call(node_id, method, params, callback) self.interest[id] = callback return self.outgoing:push({ - jsonrpc = "2.0", + jsonrpc = jsonrpc.VERSION, method = method, params = params, id = id,