From 6c7f8512a4365166e4dd98986a64799d797514ad Mon Sep 17 00:00:00 2001 From: chronolaw Date: Sat, 12 Oct 2024 11:57:18 +0800 Subject: [PATCH] clean sync postgres --- .../services/sync/strategies/postgres.lua | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/kong/clustering/services/sync/strategies/postgres.lua b/kong/clustering/services/sync/strategies/postgres.lua index 0abd2da3882a..7d35b30bc290 100644 --- a/kong/clustering/services/sync/strategies/postgres.lua +++ b/kong/clustering/services/sync/strategies/postgres.lua @@ -36,7 +36,7 @@ local PURGE_QUERY = [[ function _M:init_worker() - ngx.timer.every(CLEANUP_TIME_DELAY, function(premature) + local function cleanup_handler(premature) if premature then ngx_log(ngx_DEBUG, "[incremental] worker exiting, killing incremental cleanup timer") @@ -54,7 +54,9 @@ function _M:init_worker() ngx_log(ngx_DEBUG, "[incremental] successfully purged old data from incremental delta table") - end) + end + + assert(ngx.timer.every(CLEANUP_TIME_DELAY, cleanup_handler)) end @@ -74,13 +76,15 @@ local NEW_VERSION_QUERY = [[ -- { type = "route", "id" = "0a5bac5c-b795-4981-95d2-919ba3390b7e", "ws_id" = "73478cf6-964f-412d-b1c4-8ac88d9e85e9", row = "JSON", } -- } function _M:insert_delta(deltas) + local escape_literal = self.connector:escape_literal + local buf = buffer.new() for _, d in ipairs(deltas) do buf:putf("(new_version, %s, %s, %s, %s)", - self.connector:escape_literal(d.type), - self.connector:escape_literal(d.id), - self.connector:escape_literal(d.ws_id), - self.connector:escape_literal(cjson_encode(d.row))) + escape_literal(d.type), + escape_literal(d.id), + escape_literal(d.ws_id), + escape_literal(cjson_encode(d.row))) end local sql = string_format(NEW_VERSION_QUERY, buf:get()) @@ -97,7 +101,7 @@ function _M:get_latest_version() return nil, err end - return res[1].max_version + return res[1] and res[1].max_version end