Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(balancer): record the cache flag of each connection which used during balancer retry #12760

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions changelog/unreleased/kong/recore-cached-in-retry.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
message: "**Balancer**: record the cache flag of each connection which used during balancer retry."
type: feature
scope: Core
5 changes: 5 additions & 0 deletions kong/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -1279,6 +1279,11 @@ function Kong.balancer()
local previous_try = tries[try_count - 1]
previous_try.state, previous_try.code = get_last_failure()

if ngx.config.subsystem == "http" then
tzssangglass marked this conversation as resolved.
Show resolved Hide resolved
local peer_conn = require "resty.kong.peer_conn"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have any reason to require this library in runtime code path?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

require at the top of the file will cause it to fail when running it in the stream.

previous_try.cached = peer_conn.get_last_peer_connection_cached()
end

-- Report HTTP status for health checks
local balancer_instance = balancer_data.balancer
if balancer_instance then
Expand Down
158 changes: 158 additions & 0 deletions spec/02-integration/05-proxy/10-balancer/08-retry_spec.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
local helpers = require "spec.helpers"
local cjson = require "cjson"

local function get_log(typ, n)
local entries
helpers.wait_until(function()
local client = assert(helpers.http_client(helpers.mock_upstream_host,
helpers.mock_upstream_port))
local res = client:get("/read_log/" .. typ, {
headers = {
Accept = "application/json"
}
})
local raw = assert.res_status(200, res)
local body = cjson.decode(raw)

entries = body.entries
return #entries > 0
end, 10)

if n then
tzssangglass marked this conversation as resolved.
Show resolved Hide resolved
assert(#entries == n, "expected " .. n .. " log entries, but got " .. #entries)
end
return entries
end

for _, strategy in helpers.each_strategy() do
describe("Balancer: respect max retries [#" .. strategy .. "]", function()
local service

lazy_setup(function()
local bp = helpers.get_db_utils(strategy, {
"routes",
"services",
"plugins",
})

service = bp.services:insert {
name = "retry_service",
host = "127.0.0.1",
port = 62351,
retries = 5,
}

local route = bp.routes:insert {
service = service,
paths = { "/hello" },
strip_path = false,
}

bp.plugins:insert {
route = { id = route.id },
name = "http-log",
config = {
queue = {
max_batch_size = 1,
max_coalescing_delay = 0.1,
},
http_endpoint = "http://" .. helpers.mock_upstream_host
.. ":"
.. helpers.mock_upstream_port
.. "/post_log/http"
}
}

local fixtures = {
http_mock = {}
}

fixtures.http_mock.my_server_block = [[
server {
listen 0.0.0.0:62351;
location /hello {
content_by_lua_block {
local request_counter = ngx.shared.request_counter
local first_request = request_counter:get("first_request")
if first_request == nil then
request_counter:set("first_request", "yes")
ngx.say("hello")
else
ngx.exit(ngx.HTTP_CLOSE)
end
}
}
}
]]

assert(helpers.start_kong({
database = strategy,
nginx_conf = "spec/fixtures/custom_nginx.template",
nginx_http_lua_shared_dict = "request_counter 1m",
log_level = "info",
}, nil, nil, fixtures))
end)

lazy_teardown(function()
helpers.stop_kong()
end)

it("exceeded limit", function()
local proxy_client1 = helpers.proxy_client()
local res = assert(proxy_client1:send {
method = "GET",
path = "/hello",
})

assert.res_status(200, res)

proxy_client1:close()

local proxy_client2 = helpers.proxy_client()

res = assert(proxy_client2:send {
method = "GET",
path = "/hello",
})

assert.res_status(502, res)

-- wait for the http-log plugin to flush the log
ngx.sleep(1)

local entries = get_log("http", 2)

-- first request successful, connection with upstream put into upstream connection pool.
assert.equal(#entries[1].tries, 1)
-- without balancer retry, so no cached flag
assert.equal(entries[1].tries[1].cached, nil)

-- second request
-- first balancer try reused the cached connection from upstream connection pool(put by first request) x1
-- but the upstream closed the connection to this cache, causing it to enter the balancer retry process
-- and performing an additional compensatory try

-- additional compensatory try x 1 (create new connection to upstream)
-- balancer retry x 5 (create new connection to upstream)

for i, try in ipairs(entries[2].tries) do
if i == 1 then
assert.equal(try.cached, true)
goto continue
end

-- last try without cached flag
if i == #entries[2].tries then
assert.equal(try.cached, nil)
goto continue
end

assert.equal(try.cached, false)
::continue::
end

assert.equal(#entries[2].tries, 7)
assert.equal(entries[2].upstream_status, "502, 502, 502, 502, 502, 502, 502")
end)
end)
end
Loading