Skip to content

Commit

Permalink
Bump psycopg3 version && add timeouts on blocking functions
Browse files Browse the repository at this point in the history
  • Loading branch information
jmeunier28 committed Aug 7, 2023
1 parent 8bb548d commit c80c5e7
Show file tree
Hide file tree
Showing 7 changed files with 32 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ protobuf==3.17.3; python_version < '3.0'
protobuf==3.20.2; python_version > '3.0'
psutil==5.9.0
psycopg2-binary==2.8.6; sys_platform != 'darwin' or platform_machine != 'arm64'
psycopg[binary]==3.1.9; python_version > '3.0'
psycopg[binary]==3.1.10; python_version > '3.0'
pyasn1==0.4.6
pycryptodomex==3.10.1
pydantic==2.0.2; python_version > '3.0'
Expand Down
10 changes: 8 additions & 2 deletions postgres/datadog_checks/postgres/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,10 +171,16 @@ def prune_connections(self):
self._stats.connection_pruned += 1
self._terminate_connection_unsafe(dbname)

def close_all_connections(self):
def close_all_connections(self, timeout=None):
"""
Will block until all connections are terminated, unless the pre-configured timeout is hit
:param timeout:
:return:
"""
success = True
start_time = time.time()
with self._mu:
while self._conns:
while self._conns and (timeout is None or time.time() - start_time < timeout):
dbname = next(iter(self._conns))
if not self._terminate_connection_unsafe(dbname):
success = False
Expand Down
2 changes: 1 addition & 1 deletion postgres/datadog_checks/postgres/postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -705,7 +705,7 @@ def get_main_db(self):
return conn

def _close_db_pool(self):
self.db_pool.close_all_connections()
self.db_pool.close_all_connections(timeout=self._config.min_collection_interval)

def _collect_custom_queries(self, tags):
"""
Expand Down
2 changes: 1 addition & 1 deletion postgres/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ deps = [
"boto3==1.27.0; python_version > '3.0'",
"cachetools==3.1.1; python_version < '3.0'",
"cachetools==5.3.1; python_version > '3.0'",
"psycopg[binary]==3.1.9; python_version > '3.0'",
"psycopg[binary]==3.1.10; python_version > '3.0'",
"semver==3.0.1; python_version > '3.0'",
]

Expand Down
12 changes: 6 additions & 6 deletions postgres/tests/test_connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ def test_conn_pool(pg_instance):
db = pool._get_connection_raw('postgres', 999 * 1000)
assert len(pool._conns) == 1
assert pool._stats.connection_opened == 2
success = pool.close_all_connections()
success = pool.close_all_connections(timeout=5)
assert success
assert len(pool._conns) == 0
assert pool._stats.connection_closed == 2
Expand Down Expand Up @@ -95,11 +95,11 @@ def exec_connection(pool, wg, dbname):
wg.add(1)
thread.start()
# wait for all connections to be opened
wg.wait()
wg.wait(timeout=5)
assert pool._stats.connection_opened == conn_count
assert len(get_activity(pool2, unique_id)) == conn_count

pool.close_all_connections()
pool.close_all_connections(timeout=5)
assert pool._stats.connection_closed == conn_count
assert pool._stats.connection_closed_failed == 0

Expand Down Expand Up @@ -149,7 +149,7 @@ def get_many_connections(count, ttl):
assert len(rows) == 1
assert list(rows[0].values())[0] == dbname

pool.close_all_connections()
pool.close_all_connections(timeout=5)

pool._stats.reset()

Expand Down Expand Up @@ -290,7 +290,7 @@ def pretend_to_run_query(pool, dbname):
thread.start()

# wait until all connections are opened and active
wg.wait()
wg.wait(timeout=5)
assert pool._stats.connection_opened == limit

# ask for one more connection
Expand All @@ -309,7 +309,7 @@ def pretend_to_run_query(pool, dbname):
assert pool._stats.connection_closed == 1

# close the rest
pool.close_all_connections()
pool.close_all_connections(timeout=5)
assert pool._stats.connection_closed == limit + 1


Expand Down
2 changes: 1 addition & 1 deletion postgres/tests/test_statements.py
Original file line number Diff line number Diff line change
Expand Up @@ -1049,7 +1049,7 @@ def execute_in_thread(q):

# wait for query to complete, but commit has not been called,
# so it should remain open and idle
wg.wait()
wg.wait(timeout=5)

# Wait collection interval to make sure dbm events are reported
time.sleep(dbm_instance['query_activity']['collection_interval'])
Expand Down
18 changes: 14 additions & 4 deletions postgres/tests/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,12 @@ def run_one_check(check, db_instance):
check.metadata_samples._job_loop_future.result()


# WaitGroup is used like go's sync.WaitGroup
# WaitGroup is used like go's sync.WaitGroup, but it supports a timeout
class WaitGroup(object):
def __init__(self):
self.count = 0
self.cv = threading.Condition()
self.timeout_event = threading.Event()

def add(self, n):
self.cv.acquire()
Expand All @@ -89,8 +90,17 @@ def done(self):
self.cv.notify_all()
self.cv.release()

def wait(self):
def wait(self, timeout=None):
self.cv.acquire()
while self.count > 0:
self.cv.wait()
if timeout is None:
while self.count > 0:
self.cv.wait()
else:
def timeout_callback():
self.timeout_event.set()
timer = threading.Timer(timeout, timeout_callback)
timer.start()
while self.count > 0 and not self.timeout_event.is_set():
self.cv.wait()
timer.cancel()
self.cv.release()

0 comments on commit c80c5e7

Please sign in to comment.