Skip to content

Commit

Permalink
fix(postgres): close socket actively when timeout happens during query (
Browse files Browse the repository at this point in the history
#11480)

Currently, we put the socket into keepalive after every Postgres SQL query, based on the configured keepalive timeout or lua_socket_keepalive_timeout (default 60s).
This can go wrong in some cases: when a query hits a read timeout while receiving data from a heavily loaded database, the query ends on Kong's side, but the query result may still be sent back after the timeout. That result data lingers in the socket buffer, and if the socket is then reused for a subsequent query, that query may receive the stale result of the previous one.

This PR checks the query result's err string; if an error occurred, it actively closes the socket so that subsequent queries establish a new, clean connection.

Fix FTI-5322

(cherry picked from commit d2da4db)
  • Loading branch information
windmgc committed Sep 18, 2023
1 parent a3b85c7 commit c90cb67
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 10 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG/unreleased/kong/11480.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
message: Fixed an issue where an abnormal socket connection could be reused when querying the Postgres database.
type: bugfix
scope: Core
prs:
- 11480
jiras:
- "FTI-5322"
34 changes: 24 additions & 10 deletions kong/db/strategies/postgres/connector.lua
Original file line number Diff line number Diff line change
Expand Up @@ -532,6 +532,7 @@ function _mt:query(sql, operation)
operation = "write"
end

local conn, is_new_conn
local res, err, partial, num_queries

local ok
Expand All @@ -540,24 +541,37 @@ function _mt:query(sql, operation)
return nil, "error acquiring query semaphore: " .. err
end

local conn = self:get_stored_connection(operation)
if conn then
res, err, partial, num_queries = conn:query(sql)

else
local connection
conn = self:get_stored_connection(operation)
if not conn then
local config = operation == "write" and self.config or self.config_ro

connection, err = connect(config)
if not connection then
conn, err = connect(config)
if not conn then
self:release_query_semaphore_resource(operation)
return nil, err
end
is_new_conn = true
end

res, err, partial, num_queries = conn:query(sql)

res, err, partial, num_queries = connection:query(sql)
-- if err is string then either it is a SQL error
-- or it is a socket error, here we abort connections
-- that encounter errors instead of reusing them, for
-- safety reason
if err and type(err) == "string" then
ngx.log(ngx.DEBUG, "SQL query throw error: ", err, ", close connection")
local _, err = conn:disconnect()
if err then
-- We're at the end of the query - just logging if
-- we cannot cleanup the connection
ngx.log(ngx.ERR, "failed to disconnect: ", err)
end
self.store_connection(nil, operation)

elseif is_new_conn then
local keepalive_timeout = self:get_keepalive_timeout(operation)
setkeepalive(connection, keepalive_timeout)
setkeepalive(conn, keepalive_timeout)
end

self:release_query_semaphore_resource(operation)
Expand Down
44 changes: 44 additions & 0 deletions spec/02-integration/03-db/01-db_spec.lua
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,50 @@ for _, strategy in helpers.each_strategy() do
end)
end)

-- Regression suite for connector:query(): after a query fails, the stored
-- "write" connection must be dropped, and later queries must still succeed.
describe(":query() [#" .. strategy .. "]", function()
  lazy_setup(function()
    helpers.get_db_utils(strategy, {})
  end)

  postgres_only("establish new connection when error occurred", function()
    -- NOTE(review): presumably forces the non-CLI (cosocket) code path, as
    -- the other connection tests in this file do -- confirm. The flag is
    -- left set afterwards, exactly as in the original test.
    ngx.IS_CLI = false

    -- Point the read-only settings at the primary host/user so a single
    -- database serves both configurations.
    local conf = utils.cycle_aware_deep_copy(helpers.test_conf)
    conf.pg_ro_host = conf.pg_host
    conf.pg_ro_user = conf.pg_user

    local db, err = DB.new(conf, strategy)

    assert.is_nil(err)
    assert.is_table(db)
    assert(db:init_connector())
    assert(db:connect())

    -- A healthy query succeeds and a "write" connection is stored.
    local res
    res, err = db.connector:query("SELECT now();")
    assert.not_nil(res)
    assert.is_nil(err)

    local stored_before_failure = db.connector:get_stored_connection("write")
    assert.not_nil(stored_before_failure)

    -- A failing query (unknown table) must return an error...
    res, err = db.connector:query("SELECT * FROM not_exist_table;")
    assert.is_nil(res)
    assert.not_nil(err)

    -- ...and must leave no stored "write" connection behind.
    local stored_after_failure = db.connector:get_stored_connection("write")
    assert.is_nil(stored_after_failure)

    -- Subsequent queries must succeed again; it is run twice, as in the
    -- original test.
    res, err = db.connector:query("SELECT now();")
    assert.not_nil(res)
    assert.is_nil(err)

    res, err = db.connector:query("SELECT now();")
    assert.not_nil(res)
    assert.is_nil(err)

    assert(db:close())
  end)
end)

describe(":setkeepalive() [#" .. strategy .. "]", function()
lazy_setup(function()
Expand Down

0 comments on commit c90cb67

Please sign in to comment.