From c90cb675d31345ae6d1c61725f08b46ea27524ec Mon Sep 17 00:00:00 2001 From: "Qirui(Keery) Nie" Date: Mon, 18 Sep 2023 11:16:10 +0800 Subject: [PATCH] fix(postgres): close socket actively when timeout happens during query (#11480) Currently, we do set/keep socket keepalive after every Postgres SQL query, based on keepalive timeout configured or lua_socket_keepalive_timeout(default 60s). This could go wrong under some cases, when a query encounters read timeout when trying to receive data from a database with high load, the query ends on Kong's side but the query result may be sent back after timeout happens, and the result data will be lingering inside the socket buffer, and the socket itself get reused for subsequent query, then the subsequent query might get the incorrect result from the previous query. The PR checks the query result's err string, and if any error happens, it'll try to close the socket actively so that the subsequent query will establish new clean ones. Fix FTI-5322 (cherry picked from commit d2da4dbb372db3687f1dfae33ba422c384b61024) --- CHANGELOG/unreleased/kong/11480.yaml | 7 ++++ kong/db/strategies/postgres/connector.lua | 34 ++++++++++++------ spec/02-integration/03-db/01-db_spec.lua | 44 +++++++++++++++++++++++ 3 files changed, 75 insertions(+), 10 deletions(-) create mode 100644 CHANGELOG/unreleased/kong/11480.yaml diff --git a/CHANGELOG/unreleased/kong/11480.yaml b/CHANGELOG/unreleased/kong/11480.yaml new file mode 100644 index 000000000000..96f396355587 --- /dev/null +++ b/CHANGELOG/unreleased/kong/11480.yaml @@ -0,0 +1,7 @@ +message: Fix a problem that abnormal socket connection will be reused when querying Postgres database. +type: bugfix +scope: Core +prs: + - 11480 +jiras: + - "FTI-5322" diff --git a/kong/db/strategies/postgres/connector.lua b/kong/db/strategies/postgres/connector.lua index cd3750ce7654..ba4b73ef9043 100644 --- a/kong/db/strategies/postgres/connector.lua +++ b/kong/db/strategies/postgres/connector.lua @@ -532,6 +532,7 @@ function _mt:query(sql, operation) operation = "write" end + local conn, is_new_conn local res, err, partial, num_queries local ok @@ -540,24 +541,37 @@ function _mt:query(sql, operation) return nil, "error acquiring query semaphore: " .. err end - local conn = self:get_stored_connection(operation) - if conn then - res, err, partial, num_queries = conn:query(sql) - - else - local connection + conn = self:get_stored_connection(operation) + if not conn then local config = operation == "write" and self.config or self.config_ro - connection, err = connect(config) - if not connection then + conn, err = connect(config) + if not conn then self:release_query_semaphore_resource(operation) return nil, err end + is_new_conn = true + end + + res, err, partial, num_queries = conn:query(sql) - res, err, partial, num_queries = connection:query(sql) + -- if err is string then either it is a SQL error + -- or it is a socket error, here we abort connections + -- that encounter errors instead of reusing them, for + -- safety reason + if err and type(err) == "string" then + ngx.log(ngx.DEBUG, "SQL query throw error: ", err, ", close connection") + local _, err = conn:disconnect() + if err then + -- We're at the end of the query - just logging if + -- we cannot cleanup the connection + ngx.log(ngx.ERR, "failed to disconnect: ", err) + end + self.store_connection(nil, operation) + elseif is_new_conn then local keepalive_timeout = self:get_keepalive_timeout(operation) - setkeepalive(connection, keepalive_timeout) + setkeepalive(conn, keepalive_timeout) end self:release_query_semaphore_resource(operation) diff --git a/spec/02-integration/03-db/01-db_spec.lua b/spec/02-integration/03-db/01-db_spec.lua index 91d9c7015880..d776af963396 100644 --- a/spec/02-integration/03-db/01-db_spec.lua +++ b/spec/02-integration/03-db/01-db_spec.lua @@ -401,6 +401,50 @@ for _, strategy in helpers.each_strategy() do end) end) + describe("#testme :query() [#" .. strategy .. "]", function() + lazy_setup(function() + helpers.get_db_utils(strategy, {}) + end) + + postgres_only("establish new connection when error occurred", function() + ngx.IS_CLI = false + + local conf = utils.cycle_aware_deep_copy(helpers.test_conf) + conf.pg_ro_host = conf.pg_host + conf.pg_ro_user = conf.pg_user + + local db, err = DB.new(conf, strategy) + + assert.is_nil(err) + assert.is_table(db) + assert(db:init_connector()) + assert(db:connect()) + + local res, err = db.connector:query("SELECT now();") + assert.not_nil(res) + assert.is_nil(err) + + local old_conn = db.connector:get_stored_connection("write") + assert.not_nil(old_conn) + + local res, err = db.connector:query("SELECT * FROM not_exist_table;") + assert.is_nil(res) + assert.not_nil(err) + + local new_conn = db.connector:get_stored_connection("write") + assert.is_nil(new_conn) + + local res, err = db.connector:query("SELECT now();") + assert.not_nil(res) + assert.is_nil(err) + + local res, err = db.connector:query("SELECT now();") + assert.not_nil(res) + assert.is_nil(err) + + assert(db:close()) + end) + end) describe(":setkeepalive() [#" .. strategy .. "]", function() lazy_setup(function()