From c80c5e77a68f91b102cf24510c75014662398e50 Mon Sep 17 00:00:00 2001 From: Jo-Ann Meunier Date: Mon, 7 Aug 2023 11:13:00 -0400 Subject: [PATCH] Bump psycopg3 version && add timeouts on blocking functions --- .../base/data/agent_requirements.in | 2 +- .../datadog_checks/postgres/connections.py | 10 ++++++++-- postgres/datadog_checks/postgres/postgres.py | 2 +- postgres/pyproject.toml | 2 +- postgres/tests/test_connections.py | 12 ++++++------ postgres/tests/test_statements.py | 2 +- postgres/tests/utils.py | 18 ++++++++++++++---- 7 files changed, 32 insertions(+), 16 deletions(-) diff --git a/datadog_checks_base/datadog_checks/base/data/agent_requirements.in b/datadog_checks_base/datadog_checks/base/data/agent_requirements.in index 562bd8c86f12b..81a16972e538e 100644 --- a/datadog_checks_base/datadog_checks/base/data/agent_requirements.in +++ b/datadog_checks_base/datadog_checks/base/data/agent_requirements.in @@ -57,7 +57,7 @@ protobuf==3.17.3; python_version < '3.0' protobuf==3.20.2; python_version > '3.0' psutil==5.9.0 psycopg2-binary==2.8.6; sys_platform != 'darwin' or platform_machine != 'arm64' -psycopg[binary]==3.1.9; python_version > '3.0' +psycopg[binary]==3.1.10; python_version > '3.0' pyasn1==0.4.6 pycryptodomex==3.10.1 pydantic==2.0.2; python_version > '3.0' diff --git a/postgres/datadog_checks/postgres/connections.py b/postgres/datadog_checks/postgres/connections.py index eab57945bb899..fa56b2918ace5 100644 --- a/postgres/datadog_checks/postgres/connections.py +++ b/postgres/datadog_checks/postgres/connections.py @@ -171,10 +171,16 @@ def prune_connections(self): self._stats.connection_pruned += 1 self._terminate_connection_unsafe(dbname) - def close_all_connections(self): + def close_all_connections(self, timeout=None): + """ + Will block until all connections are terminated, unless the pre-configured timeout is hit + :param timeout: + :return: + """ success = True + start_time = time.time() with self._mu: - while self._conns: + while self._conns and (timeout is None or time.time() - start_time < timeout): dbname = next(iter(self._conns)) if not self._terminate_connection_unsafe(dbname): success = False diff --git a/postgres/datadog_checks/postgres/postgres.py b/postgres/datadog_checks/postgres/postgres.py index b105b9f241dc5..f23da48430bec 100644 --- a/postgres/datadog_checks/postgres/postgres.py +++ b/postgres/datadog_checks/postgres/postgres.py @@ -705,7 +705,7 @@ def get_main_db(self): return conn def _close_db_pool(self): - self.db_pool.close_all_connections() + self.db_pool.close_all_connections(timeout=self._config.min_collection_interval) def _collect_custom_queries(self, tags): """ diff --git a/postgres/pyproject.toml b/postgres/pyproject.toml index 3ebab24ffef69..a30d50f17ae2b 100644 --- a/postgres/pyproject.toml +++ b/postgres/pyproject.toml @@ -40,7 +40,7 @@ deps = [ "boto3==1.27.0; python_version > '3.0'", "cachetools==3.1.1; python_version < '3.0'", "cachetools==5.3.1; python_version > '3.0'", - "psycopg[binary]==3.1.9; python_version > '3.0'", + "psycopg[binary]==3.1.10; python_version > '3.0'", "semver==3.0.1; python_version > '3.0'", ] diff --git a/postgres/tests/test_connections.py b/postgres/tests/test_connections.py index 685019944fdd0..69310cceb64d3 100644 --- a/postgres/tests/test_connections.py +++ b/postgres/tests/test_connections.py @@ -49,7 +49,7 @@ def test_conn_pool(pg_instance): db = pool._get_connection_raw('postgres', 999 * 1000) assert len(pool._conns) == 1 assert pool._stats.connection_opened == 2 - success = pool.close_all_connections() + success = pool.close_all_connections(timeout=5) assert success assert len(pool._conns) == 0 assert pool._stats.connection_closed == 2 @@ -95,11 +95,11 @@ def exec_connection(pool, wg, dbname): wg.add(1) thread.start() # wait for all connections to be opened - wg.wait() + wg.wait(timeout=5) assert pool._stats.connection_opened == conn_count assert len(get_activity(pool2, unique_id)) == conn_count - pool.close_all_connections() + pool.close_all_connections(timeout=5) assert pool._stats.connection_closed == conn_count assert pool._stats.connection_closed_failed == 0 @@ -149,7 +149,7 @@ def get_many_connections(count, ttl): assert len(rows) == 1 assert list(rows[0].values())[0] == dbname - pool.close_all_connections() + pool.close_all_connections(timeout=5) pool._stats.reset() @@ -290,7 +290,7 @@ def pretend_to_run_query(pool, dbname): thread.start() # wait until all connections are opened and active - wg.wait() + wg.wait(timeout=5) assert pool._stats.connection_opened == limit # ask for one more connection @@ -309,7 +309,7 @@ def pretend_to_run_query(pool, dbname): assert pool._stats.connection_closed == 1 # close the rest - pool.close_all_connections() + pool.close_all_connections(timeout=5) assert pool._stats.connection_closed == limit + 1 diff --git a/postgres/tests/test_statements.py b/postgres/tests/test_statements.py index 921a5f03372c7..f9a3cfa1d411c 100644 --- a/postgres/tests/test_statements.py +++ b/postgres/tests/test_statements.py @@ -1049,7 +1049,7 @@ def execute_in_thread(q): # wait for query to complete, but commit has not been called, # so it should remain open and idle - wg.wait() + wg.wait(timeout=5) # Wait collection interval to make sure dbm events are reported time.sleep(dbm_instance['query_activity']['collection_interval']) diff --git a/postgres/tests/utils.py b/postgres/tests/utils.py index b36cbf62369be..d8d7a12b95fc1 100644 --- a/postgres/tests/utils.py +++ b/postgres/tests/utils.py @@ -71,11 +71,12 @@ def run_one_check(check, db_instance): check.metadata_samples._job_loop_future.result() -# WaitGroup is used like go's sync.WaitGroup +# WaitGroup is used like go's sync.WaitGroup, but it supports a timeout class WaitGroup(object): def __init__(self): self.count = 0 self.cv = threading.Condition() + self.timeout_event = threading.Event() def add(self, n): self.cv.acquire() @@ -89,8 +90,17 @@ def done(self): self.cv.notify_all() self.cv.release() - def wait(self): + def wait(self, timeout=None): self.cv.acquire() - while self.count > 0: - self.cv.wait() + if timeout is None: + while self.count > 0: + self.cv.wait() + else: + def timeout_callback(): + self.timeout_event.set() + timer = threading.Timer(timeout, timeout_callback) + timer.start() + while self.count > 0 and not self.timeout_event.is_set(): + self.cv.wait() + timer.cancel() self.cv.release()