From e4b21cd0b5bec691a2c2cc9fc53aa9e6205f4d52 Mon Sep 17 00:00:00 2001 From: jackwotherspoon Date: Fri, 25 Oct 2024 13:43:36 +0000 Subject: [PATCH 1/2] feat: support native asyncpg connection pools --- README.md | 92 ++++++++++++++++++--- setup.py | 2 +- tests/system/test_asyncpg_connection.py | 101 +++++++++++++++++++++++- 3 files changed, 180 insertions(+), 15 deletions(-) diff --git a/README.md b/README.md index c3f4c47a..251d42ed 100644 --- a/README.md +++ b/README.md @@ -502,6 +502,8 @@ The `create_async_connector` allows all the same input arguments as the Once a `Connector` object is returned by `create_async_connector` you can call its `connect_async` method, just as you would the `connect` method: +#### SQLAlchemy Async Engine + ```python import asyncpg @@ -511,7 +513,7 @@ from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine from google.cloud.sql.connector import Connector, create_async_connector async def init_connection_pool(connector: Connector) -> AsyncEngine: - # initialize Connector object for connections to Cloud SQL + # creation function to generate asyncpg connections as 'async_creator' arg async def getconn() -> asyncpg.Connection: conn: asyncpg.Connection = await connector.connect_async( "project:region:instance", # Cloud SQL instance connection name @@ -549,6 +551,40 @@ async def main(): await pool.dispose() ``` +#### Asyncpg Connection Pool + +```python +import asyncpg +from google.cloud.sql.connector import Connector, create_async_connector + +async def main(): + # initialize Connector object for connections to Cloud SQL + connector = create_async_connector() + + # creation function to generate asyncpg connections as the 'connect' arg + async def getconn(instance_connection_name, **kwargs) -> asyncpg.Connection: + return await connector.connect_async( + instance_connection_name, + "asyncpg", + user="my-user", + password="my-password", + db="my-db", + **kwargs, # ... additional asyncpg args + ) + + # initialize connection pool + pool = await asyncpg.create_pool( + "my-project:my-region:my-instance", connect=getconn + ) + + # acquire connection and query Cloud SQL database + async with pool.acquire() as conn: + res = await conn.fetch("SELECT NOW()") + + # close Connector + await connector.close_async() +``` + For more details on additional database arguments with an `asyncpg.Connection` , please visit the [official documentation](https://magicstack.github.io/asyncpg/current/api/index.html). @@ -564,6 +600,8 @@ calls to `connector.close_async()` to cleanup resources. > This alternative requires that the running event loop be > passed in as the `loop` argument to `Connector()`. +#### SQLAlchemy Async Engine + ```python import asyncio import asyncpg @@ -574,17 +612,17 @@ from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine from google.cloud.sql.connector import Connector async def init_connection_pool(connector: Connector) -> AsyncEngine: - # initialize Connector object for connections to Cloud SQL + # creation function to generate asyncpg connections as 'async_creator' arg async def getconn() -> asyncpg.Connection: - conn: asyncpg.Connection = await connector.connect_async( - "project:region:instance", # Cloud SQL instance connection name - "asyncpg", - user="my-user", - password="my-password", - db="my-db-name" - # ... additional database driver args - ) - return conn + conn: asyncpg.Connection = await connector.connect_async( + "project:region:instance", # Cloud SQL instance connection name + "asyncpg", + user="my-user", + password="my-password", + db="my-db-name" + # ... additional database driver args + ) + return conn # The Cloud SQL Python Connector can be used along with SQLAlchemy using the # 'async_creator' argument to 'create_async_engine' @@ -609,6 +647,38 @@ async def main(): await pool.dispose() ``` +#### Asyncpg Connection Pool + +```python +import asyncpg +from google.cloud.sql.connector import Connector, create_async_connector + +async def main(): + # initialize Connector object for connections to Cloud SQL + loop = asyncio.get_running_loop() + async with Connector(loop=loop) as connector: + + # creation function to generate asyncpg connections as the 'connect' arg + async def getconn(instance_connection_name, **kwargs) -> asyncpg.Connection: + return await connector.connect_async( + instance_connection_name, + "asyncpg", + user="my-user", + password="my-password", + db="my-db", + **kwargs, # ... additional asyncpg args + ) + + # create connection pool + pool = await asyncpg.create_pool( + "my-project:my-region:my-instance", connect=getconn + ) + + # acquire connection and query Cloud SQL database + async with pool.acquire() as conn: + res = await conn.fetch("SELECT NOW()") +``` + ### Debug Logging The Cloud SQL Python Connector uses the standard [Python logging module][python-logging] diff --git a/setup.py b/setup.py index bdf7a27c..bb70449a 100644 --- a/setup.py +++ b/setup.py @@ -80,7 +80,7 @@ "pymysql": ["PyMySQL>=1.1.0"], "pg8000": ["pg8000>=1.31.1"], "pytds": ["python-tds>=1.15.0"], - "asyncpg": ["asyncpg>=0.29.0"], + "asyncpg": ["asyncpg>=0.30.0"], }, python_requires=">=3.9", include_package_data=True, diff --git a/tests/system/test_asyncpg_connection.py b/tests/system/test_asyncpg_connection.py index 20715c65..98a86a1a 100644 --- a/tests/system/test_asyncpg_connection.py +++ b/tests/system/test_asyncpg_connection.py @@ -16,7 +16,7 @@ import asyncio import os -from typing import Tuple +from typing import Any, Tuple import asyncpg import sqlalchemy @@ -88,7 +88,68 @@ async def getconn() -> asyncpg.Connection: return engine, connector -async def test_connection_with_asyncpg() -> None: +async def create_asyncpg_pool( + instance_connection_name: str, + user: str, + password: str, + db: str, + refresh_strategy: str = "background", +) -> Tuple[asyncpg.Pool, Connector]: + """Creates a native asyncpg connection pool for a Cloud SQL instance and + returns the pool and the connector. Callers are responsible for closing the + pool and the connector. + + A sample invocation looks like: + + pool, connector = await create_asyncpg_pool( + inst_conn_name, + user, + password, + db, + ) + async with pool.acquire() as conn: + hello = await conn.fetch("SELECT 'Hello World!'") + # do something with query result + await connector.close_async() + + Args: + instance_connection_name (str): + The instance connection name specifies the instance relative to the + project and region. For example: "my-project:my-region:my-instance" + user (str): + The database user name, e.g., postgres + password (str): + The database user's password, e.g., secret-password + db (str): + The name of the database, e.g., mydb + refresh_strategy (Optional[str]): + Refresh strategy for the Cloud SQL Connector. Can be one of "lazy" + or "background". For serverless environments use "lazy" to avoid + errors resulting from CPU being throttled. + """ + loop = asyncio.get_running_loop() + connector = Connector(loop=loop, refresh_strategy=refresh_strategy) + + async def getconn( + instance_connection_name: str, **kwargs: Any + ) -> asyncpg.Connection: + conn: asyncpg.Connection = await connector.connect_async( + instance_connection_name, + "asyncpg", + user=user, + password=password, + db=db, + ip_type="public", # can also be "private" or "psc", + **kwargs + ) + return conn + + # create native asyncpg pool (requires asyncpg version >=0.30.0) + pool = await asyncpg.create_pool(instance_connection_name, connect=getconn) + return pool, connector + + +async def test_sqlalchemy_connection_with_asyncpg() -> None: """Basic test to get time from database.""" inst_conn_name = os.environ["POSTGRES_CONNECTION_NAME"] user = os.environ["POSTGRES_USER"] @@ -104,7 +165,7 @@ async def test_connection_with_asyncpg() -> None: await connector.close_async() -async def test_lazy_connection_with_asyncpg() -> None: +async def test_lazy_sqlalchemy_connection_with_asyncpg() -> None: """Basic test to get time from database.""" inst_conn_name = os.environ["POSTGRES_CONNECTION_NAME"] user = os.environ["POSTGRES_USER"] @@ -120,3 +181,37 @@ async def test_lazy_connection_with_asyncpg() -> None: assert res[0] == 1 await connector.close_async() + + +async def test_connection_with_asyncpg() -> None: + """Basic test to get time from database.""" + inst_conn_name = os.environ["POSTGRES_CONNECTION_NAME"] + user = os.environ["POSTGRES_USER"] + password = os.environ["POSTGRES_PASS"] + db = os.environ["POSTGRES_DB"] + + pool, connector = await create_asyncpg_pool(inst_conn_name, user, password, db) + + async with pool.acquire() as conn: + res = await conn.fetch("SELECT 1") + assert res[0][0] == 1 + + await connector.close_async() + + +async def test_lazy_connection_with_asyncpg() -> None: + """Basic test to get time from database.""" + inst_conn_name = os.environ["POSTGRES_CONNECTION_NAME"] + user = os.environ["POSTGRES_USER"] + password = os.environ["POSTGRES_PASS"] + db = os.environ["POSTGRES_DB"] + + pool, connector = await create_asyncpg_pool( + inst_conn_name, user, password, db, "lazy" + ) + + async with pool.acquire() as conn: + res = await conn.fetch("SELECT 1") + assert res[0][0] == 1 + + await connector.close_async() From 37649e9400e86fa0a6999f8b4a54a2a7bf395341 Mon Sep 17 00:00:00 2001 From: jackwotherspoon Date: Fri, 25 Oct 2024 15:21:43 +0000 Subject: [PATCH 2/2] chore: lead with asyncpg pool usage over SQLAlchemy --- README.md | 132 +++++++++++++++++++++++++++--------------------------- 1 file changed, 66 insertions(+), 66 deletions(-) diff --git a/README.md b/README.md index 251d42ed..28553f97 100644 --- a/README.md +++ b/README.md @@ -502,6 +502,40 @@ The `create_async_connector` allows all the same input arguments as the Once a `Connector` object is returned by `create_async_connector` you can call its `connect_async` method, just as you would the `connect` method: +#### Asyncpg Connection Pool + +```python +import asyncpg +from google.cloud.sql.connector import Connector, create_async_connector + +async def main(): + # initialize Connector object for connections to Cloud SQL + connector = create_async_connector() + + # creation function to generate asyncpg connections as the 'connect' arg + async def getconn(instance_connection_name, **kwargs) -> asyncpg.Connection: + return await connector.connect_async( + instance_connection_name, + "asyncpg", + user="my-user", + password="my-password", + db="my-db", + **kwargs, # ... additional asyncpg args + ) + + # initialize connection pool + pool = await asyncpg.create_pool( + "my-project:my-region:my-instance", connect=getconn + ) + + # acquire connection and query Cloud SQL database + async with pool.acquire() as conn: + res = await conn.fetch("SELECT NOW()") + + # close Connector + await connector.close_async() +``` + #### SQLAlchemy Async Engine ```python @@ -551,40 +585,6 @@ async def main(): await pool.dispose() ``` -#### Asyncpg Connection Pool - -```python -import asyncpg -from google.cloud.sql.connector import Connector, create_async_connector - -async def main(): - # initialize Connector object for connections to Cloud SQL - connector = create_async_connector() - - # creation function to generate asyncpg connections as the 'connect' arg - async def getconn(instance_connection_name, **kwargs) -> asyncpg.Connection: - return await connector.connect_async( - instance_connection_name, - "asyncpg", - user="my-user", - password="my-password", - db="my-db", - **kwargs, # ... additional asyncpg args - ) - - # initialize connection pool - pool = await asyncpg.create_pool( - "my-project:my-region:my-instance", connect=getconn - ) - - # acquire connection and query Cloud SQL database - async with pool.acquire() as conn: - res = await conn.fetch("SELECT NOW()") - - # close Connector - await connector.close_async() -``` - For more details on additional database arguments with an `asyncpg.Connection` , please visit the [official documentation](https://magicstack.github.io/asyncpg/current/api/index.html). @@ -600,6 +600,38 @@ calls to `connector.close_async()` to cleanup resources. > This alternative requires that the running event loop be > passed in as the `loop` argument to `Connector()`. +#### Asyncpg Connection Pool + +```python +import asyncpg +from google.cloud.sql.connector import Connector, create_async_connector + +async def main(): + # initialize Connector object for connections to Cloud SQL + loop = asyncio.get_running_loop() + async with Connector(loop=loop) as connector: + + # creation function to generate asyncpg connections as the 'connect' arg + async def getconn(instance_connection_name, **kwargs) -> asyncpg.Connection: + return await connector.connect_async( + instance_connection_name, + "asyncpg", + user="my-user", + password="my-password", + db="my-db", + **kwargs, # ... additional asyncpg args + ) + + # create connection pool + pool = await asyncpg.create_pool( + "my-project:my-region:my-instance", connect=getconn + ) + + # acquire connection and query Cloud SQL database + async with pool.acquire() as conn: + res = await conn.fetch("SELECT NOW()") +``` + #### SQLAlchemy Async Engine ```python @@ -647,38 +679,6 @@ async def main(): await pool.dispose() ``` -#### Asyncpg Connection Pool - -```python -import asyncpg -from google.cloud.sql.connector import Connector, create_async_connector - -async def main(): - # initialize Connector object for connections to Cloud SQL - loop = asyncio.get_running_loop() - async with Connector(loop=loop) as connector: - - # creation function to generate asyncpg connections as the 'connect' arg - async def getconn(instance_connection_name, **kwargs) -> asyncpg.Connection: - return await connector.connect_async( - instance_connection_name, - "asyncpg", - user="my-user", - password="my-password", - db="my-db", - **kwargs, # ... additional asyncpg args - ) - - # create connection pool - pool = await asyncpg.create_pool( - "my-project:my-region:my-instance", connect=getconn - ) - - # acquire connection and query Cloud SQL database - async with pool.acquire() as conn: - res = await conn.fetch("SELECT NOW()") -``` - ### Debug Logging The Cloud SQL Python Connector uses the standard [Python logging module][python-logging]