From 7c7a2f6f23cea161bf19b48104002e3289357318 Mon Sep 17 00:00:00 2001 From: David Lougheed Date: Fri, 30 Aug 2024 14:07:55 -0400 Subject: [PATCH 1/2] feat(db): more parallel awareness for Postgres DB class also return if initialize()/close() actually init/closed the pool --- bento_lib/db/pg_async.py | 72 ++++++++++++++++++++++++++------ poetry.lock | 14 +++---- pyproject.toml | 2 +- tests/test_db.py | 90 +++++++++++++++++++++++++++++++++++----- 4 files changed, 146 insertions(+), 32 deletions(-) diff --git a/bento_lib/db/pg_async.py b/bento_lib/db/pg_async.py index 4d50f2b..9bc2c58 100644 --- a/bento_lib/db/pg_async.py +++ b/bento_lib/db/pg_async.py @@ -1,15 +1,21 @@ +import asyncio + import aiofiles import asyncpg import contextlib from pathlib import Path from typing import AsyncIterator - __all__ = [ + "PgAsyncDatabaseException", "PgAsyncDatabase", ] +class PgAsyncDatabaseException(Exception): + pass + + class PgAsyncDatabase: def __init__(self, db_uri: str, schema_path: Path): @@ -17,31 +23,71 @@ def __init__(self, db_uri: str, schema_path: Path): self._schema_path: Path = schema_path self._pool: asyncpg.Pool | None = None + self._pool_init_task: asyncio.Task | None = None + self._pool_closing_task: asyncio.Task | None = None - async def initialize(self, pool_size: int = 10): - conn: asyncpg.Connection + async def initialize(self, pool_size: int = 10) -> bool: + if self._pool_closing_task: + raise PgAsyncDatabaseException("Cannot open the pool while it is closing") if not self._pool: # Initialize the connection pool if needed - self._pool = await asyncpg.create_pool(self._db_uri, min_size=pool_size, max_size=pool_size) + if not self._pool_init_task: + async def _init(): + pool = await asyncpg.create_pool(self._db_uri, min_size=pool_size, max_size=pool_size) + + # If we freshly initialized the connection pool, connect to the database and execute the schema + # script. Don't use our own self.connect() method, since that'll end up in a circular task await. + async with aiofiles.open(self._schema_path, "r") as sf: + conn: asyncpg.Connection + async with pool.acquire() as conn: + async with conn.transaction(): + await conn.execute(await sf.read()) + + self._pool = pool + self._pool_init_task = None + + return True # Freshly initialized the pool + executed the schema script - # Connect to the database and execute the schema script - async with aiofiles.open(self._schema_path, "r") as sf: - async with self.connect() as conn: - async with conn.transaction(): - await conn.execute(await sf.read()) + self._pool_init_task = asyncio.create_task(_init()) + + # self._pool_init_task is now guaranteed to not be None - can be awaited + return await self._pool_init_task + + return False # Pool already initialized + + async def close(self) -> bool: + if self._pool_init_task: + raise PgAsyncDatabaseException("Cannot close the pool while it is opening") - async def close(self): if self._pool: - await self._pool.close() - self._pool = None + if not self._pool_closing_task: + async def _close(): + await self._pool.close() + # must come after the "await" in this function, so that we can properly re-use the task that is + # checked for IF self._pool is not None: + self._pool = None + self._pool_closing_task = None + return True # Just closed the pool + + self._pool_closing_task = asyncio.create_task(_close()) + + # self._pool_closing_task is now guaranteed to not be None - can be awaited + return await self._pool_closing_task + + return False # Pool already closed @contextlib.asynccontextmanager async def connect(self, existing_conn: asyncpg.Connection | None = None) -> AsyncIterator[asyncpg.Connection]: # TODO: raise raise DatabaseError("Pool is not available") when FastAPI has lifespan dependencies # + manage pool lifespan in lifespan fn. + # If we're currently closing, wait for closing to finish before trying to re-open + if self._pool_closing_task: + await self._pool_closing_task + if self._pool is None: - await self.initialize() # initialize if this is the first time we're using the pool + # initialize if this is the first time we're using the pool, or wait for existing initialization to finish: + await self.initialize() if existing_conn is not None: yield existing_conn diff --git a/poetry.lock b/poetry.lock index aae0ae6..c5912e9 100644 --- a/poetry.lock +++ b/poetry.lock @@ -323,13 +323,13 @@ files = [ [[package]] name = "certifi" -version = "2024.7.4" +version = "2024.8.30" description = "Python package for providing Mozilla's CA Bundle." optional = false python-versions = ">=3.6" files = [ - {file = "certifi-2024.7.4-py3-none-any.whl", hash = "sha256:c198e21b1289c2ab85ee4e67bb4b4ef3ead0892059901a8d5b622f24a1101e90"}, - {file = "certifi-2024.7.4.tar.gz", hash = "sha256:5a1e7645bc0ec61a09e26c36f6106dd4cf40c6db3a1fb6352b0244e7fb057c7b"}, + {file = "certifi-2024.8.30-py3-none-any.whl", hash = "sha256:922820b53db7a7257ffbda3f597266d435245903d80737e34f8a45ff3e3230d8"}, + {file = "certifi-2024.8.30.tar.gz", hash = "sha256:bec941d2aa8195e248a60b31ff9f0558284cf01a52591ceda73ea9afffd69fd9"}, ] [[package]] @@ -2170,13 +2170,13 @@ urllib3 = ">=2" [[package]] name = "types-setuptools" -version = "73.0.0.20240822" +version = "74.0.0.20240830" description = "Typing stubs for setuptools" optional = false python-versions = ">=3.8" files = [ - {file = "types-setuptools-73.0.0.20240822.tar.gz", hash = "sha256:3a060681098eb3fbc2fea0a86f7f6af6aa1ca71906039d88d891ea2cecdd4dbf"}, - {file = "types_setuptools-73.0.0.20240822-py3-none-any.whl", hash = "sha256:b9eba9b68546031317a0fa506d4973641d987d74f79e7dd8369ad4f7a93dea17"}, + {file = "types-setuptools-74.0.0.20240830.tar.gz", hash = "sha256:2019cb0ef0fc11c3550946057e1fe78cb934284b856cec5f4ab45b43d9288685"}, + {file = "types_setuptools-74.0.0.20240830-py3-none-any.whl", hash = "sha256:3ccb05f218012b6d1643f15c3395fced6a290a32bc49338b0e93ece73585a5a2"}, ] [[package]] @@ -2389,4 +2389,4 @@ flask = ["flask"] [metadata] lock-version = "2.0" python-versions = "^3.10" -content-hash = "f200cb6724c656a96a0e8d8ccf4618c3b3e600fa4af7d20d15edaaaf05de455b" +content-hash = "6852f3d30571b20d67b107a43918c9552c3b0683e13e2ed45744583d992b7551" diff --git a/pyproject.toml b/pyproject.toml index f73d15e..c9a0a13 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -46,7 +46,7 @@ flake8 = "^7.0.0" httpx = "^0.27.0" mypy = "~1.11.0" pytest = "^8.3.2" -pytest-asyncio = "^0.23.5" +pytest-asyncio = "^0.23.8" pytest-cov = "^5.0.0" pytest-django = "^4.8.0" python-dateutil = "^2.8.2" diff --git a/tests/test_db.py b/tests/test_db.py index c1863c7..f3603ca 100644 --- a/tests/test_db.py +++ b/tests/test_db.py @@ -1,8 +1,9 @@ +import asyncio import pathlib import asyncpg import pytest import pytest_asyncio -from bento_lib.db.pg_async import PgAsyncDatabase +from bento_lib.db.pg_async import PgAsyncDatabaseException, PgAsyncDatabase from typing import AsyncGenerator @@ -18,6 +19,14 @@ async def get_test_db() -> AsyncGenerator[PgAsyncDatabase, None]: db_fixture = pytest_asyncio.fixture(get_test_db, name="pg_async_db") +async def get_test_db_no_init() -> AsyncGenerator[PgAsyncDatabase, None]: + db_instance = PgAsyncDatabase("postgresql://postgres@localhost:5432/postgres", TEST_SCHEMA) + yield db_instance + + +db_fixture_no_init = pytest_asyncio.fixture(get_test_db_no_init, name="pg_async_db_no_init") + + @pytest_asyncio.fixture async def db_cleanup(pg_async_db: PgAsyncDatabase): yield @@ -27,28 +36,87 @@ async def db_cleanup(pg_async_db: PgAsyncDatabase): await pg_async_db.close() +@pytest_asyncio.fixture +async def db_cleanup_no_init(pg_async_db_no_init: PgAsyncDatabase): + yield + conn: asyncpg.Connection + async with pg_async_db_no_init.connect() as conn: + await conn.execute("DROP TABLE IF EXISTS test_table") + await pg_async_db_no_init.close() + + # noinspection PyUnusedLocal @pytest.mark.asyncio -async def test_pg_async_db_open_close(pg_async_db: PgAsyncDatabase, db_cleanup): - await pg_async_db.close() +async def test_pg_async_db_close_auto_open(pg_async_db: PgAsyncDatabase, db_cleanup): + r = await pg_async_db.close() + assert r # did in fact close the pool assert pg_async_db._pool is None # duplicate request: should be idempotent - await pg_async_db.close() + r = await pg_async_db.close() + assert not r # didn't close the pool, since it was already closed. assert pg_async_db._pool is None - # should not be able to connect conn: asyncpg.Connection async with pg_async_db.connect() as conn: assert pg_async_db._pool is not None # Connection auto-initialized async with pg_async_db.connect(existing_conn=conn) as conn2: assert conn == conn2 # Re-using existing connection should be possible - # try re-opening - await pg_async_db.initialize() - assert pg_async_db._pool is not None - old_pool = pg_async_db._pool + +@pytest.mark.asyncio +async def test_pg_async_db_open(pg_async_db_no_init: PgAsyncDatabase, db_cleanup_no_init): + # try opening + r = await pg_async_db_no_init.initialize(pool_size=1) + assert r + assert pg_async_db_no_init._pool is not None + old_pool = pg_async_db_no_init._pool # duplicate request: should be idempotent - await pg_async_db.initialize() - assert pg_async_db._pool == old_pool # same instance + r = await pg_async_db_no_init.initialize() + assert not r # didn't actually initialize the pool; re-used the old object + assert pg_async_db_no_init._pool == old_pool # same instance + + +@pytest.mark.asyncio +async def test_pg_async_db_parallel_open(pg_async_db_no_init: PgAsyncDatabase, db_cleanup): + # start opening in one coroutine, check with the other - should re-use task + c = pg_async_db_no_init.initialize(pool_size=1) + c2 = pg_async_db_no_init.initialize(pool_size=1) + assert await asyncio.gather(c, c2) == [True, True] + + +@pytest.mark.asyncio +async def test_pg_async_db_parallel_close(pg_async_db: PgAsyncDatabase, db_cleanup): + # start closing in one coroutine, check with the other - should re-use task + c = pg_async_db.close() + c2 = pg_async_db.close() + assert await asyncio.gather(c, c2) == [True, True] # should both internally use the same coroutine & return True + + +@pytest.mark.asyncio +async def test_pg_async_db_parallel_exc_close_while_opening(pg_async_db_no_init: PgAsyncDatabase, db_cleanup): + # while opening, try closing - should trigger error + with pytest.raises(PgAsyncDatabaseException) as e: + await asyncio.gather(pg_async_db_no_init.initialize(), pg_async_db_no_init.close()) + + assert str(e.value) == "Cannot close the pool while it is opening" + + +@pytest.mark.asyncio +async def test_pg_async_db_parallel_exc_open_while_closing(pg_async_db: PgAsyncDatabase, db_cleanup): + # while closing, try opening - should trigger error + with pytest.raises(PgAsyncDatabaseException) as e: + await asyncio.gather(pg_async_db.close(), pg_async_db.initialize()) + + assert str(e.value) == "Cannot open the pool while it is closing" + + +@pytest.mark.asyncio +async def test_pg_async_db_parallel_exc_close_then_connect(pg_async_db: PgAsyncDatabase, db_cleanup): + # connect should wait for the pool to close, then re-open it + async def _c(): + async with pg_async_db.connect(): + pass + await asyncio.gather(pg_async_db.close(), _c()) + assert pg_async_db._pool is not None From 4b39b677940f1ddaf4384c353eba55c096c04528 Mon Sep 17 00:00:00 2001 From: David Lougheed Date: Fri, 30 Aug 2024 14:08:22 -0400 Subject: [PATCH 2/2] chore: bump version to 12.2.0 --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index c9a0a13..1a2f01e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "bento-lib" -version = "12.1.1" +version = "12.2.0" description = "A set of common utilities and helpers for Bento platform services." authors = [ "David Lougheed ",