Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support native asyncpg connection pools #1182

Merged
merged 2 commits into from
Oct 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 81 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -502,6 +502,42 @@ The `create_async_connector` allows all the same input arguments as the
Once a `Connector` object is returned by `create_async_connector` you can call
its `connect_async` method, just as you would the `connect` method:

#### Asyncpg Connection Pool

```python
import asyncpg
from google.cloud.sql.connector import Connector, create_async_connector

async def main():
# initialize Connector object for connections to Cloud SQL
connector = create_async_connector()

# creation function to generate asyncpg connections as the 'connect' arg
async def getconn(instance_connection_name, **kwargs) -> asyncpg.Connection:
return await connector.connect_async(
instance_connection_name,
"asyncpg",
user="my-user",
password="my-password",
db="my-db",
**kwargs, # ... additional asyncpg args
)

# initialize connection pool
pool = await asyncpg.create_pool(
"my-project:my-region:my-instance", connect=getconn
)

# acquire connection and query Cloud SQL database
async with pool.acquire() as conn:
res = await conn.fetch("SELECT NOW()")

# close Connector
await connector.close_async()
```

#### SQLAlchemy Async Engine

```python
import asyncpg

Expand All @@ -511,7 +547,7 @@ from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine
from google.cloud.sql.connector import Connector, create_async_connector

async def init_connection_pool(connector: Connector) -> AsyncEngine:
# initialize Connector object for connections to Cloud SQL
# creation function to generate asyncpg connections as 'async_creator' arg
async def getconn() -> asyncpg.Connection:
conn: asyncpg.Connection = await connector.connect_async(
"project:region:instance", # Cloud SQL instance connection name
Expand Down Expand Up @@ -564,6 +600,40 @@ calls to `connector.close_async()` to cleanup resources.
> This alternative requires that the running event loop be
> passed in as the `loop` argument to `Connector()`.

#### Asyncpg Connection Pool

```python
import asyncpg
from google.cloud.sql.connector import Connector, create_async_connector

async def main():
# initialize Connector object for connections to Cloud SQL
loop = asyncio.get_running_loop()
async with Connector(loop=loop) as connector:

# creation function to generate asyncpg connections as the 'connect' arg
async def getconn(instance_connection_name, **kwargs) -> asyncpg.Connection:
return await connector.connect_async(
instance_connection_name,
"asyncpg",
user="my-user",
password="my-password",
db="my-db",
**kwargs, # ... additional asyncpg args
)

# create connection pool
pool = await asyncpg.create_pool(
"my-project:my-region:my-instance", connect=getconn
)

# acquire connection and query Cloud SQL database
async with pool.acquire() as conn:
res = await conn.fetch("SELECT NOW()")
```

#### SQLAlchemy Async Engine

```python
import asyncio
import asyncpg
Expand All @@ -574,17 +644,17 @@ from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine
from google.cloud.sql.connector import Connector

async def init_connection_pool(connector: Connector) -> AsyncEngine:
# initialize Connector object for connections to Cloud SQL
# creation function to generate asyncpg connections as 'async_creator' arg
async def getconn() -> asyncpg.Connection:
conn: asyncpg.Connection = await connector.connect_async(
"project:region:instance", # Cloud SQL instance connection name
"asyncpg",
user="my-user",
password="my-password",
db="my-db-name"
# ... additional database driver args
)
return conn
conn: asyncpg.Connection = await connector.connect_async(
"project:region:instance", # Cloud SQL instance connection name
"asyncpg",
user="my-user",
password="my-password",
db="my-db-name"
# ... additional database driver args
)
return conn

# The Cloud SQL Python Connector can be used along with SQLAlchemy using the
# 'async_creator' argument to 'create_async_engine'
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@
"pymysql": ["PyMySQL>=1.1.0"],
"pg8000": ["pg8000>=1.31.1"],
"pytds": ["python-tds>=1.15.0"],
"asyncpg": ["asyncpg>=0.29.0"],
"asyncpg": ["asyncpg>=0.30.0"],
},
python_requires=">=3.9",
include_package_data=True,
Expand Down
101 changes: 98 additions & 3 deletions tests/system/test_asyncpg_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

import asyncio
import os
from typing import Tuple
from typing import Any, Tuple

import asyncpg
import sqlalchemy
Expand Down Expand Up @@ -88,7 +88,68 @@ async def getconn() -> asyncpg.Connection:
return engine, connector


async def test_connection_with_asyncpg() -> None:
async def create_asyncpg_pool(
instance_connection_name: str,
user: str,
password: str,
db: str,
refresh_strategy: str = "background",
) -> Tuple[asyncpg.Pool, Connector]:
"""Creates a native asyncpg connection pool for a Cloud SQL instance and
returns the pool and the connector. Callers are responsible for closing the
pool and the connector.

A sample invocation looks like:

pool, connector = await create_asyncpg_pool(
inst_conn_name,
user,
password,
db,
)
async with pool.acquire() as conn:
hello = await conn.fetch("SELECT 'Hello World!'")
# do something with query result
await connector.close_async()

Args:
instance_connection_name (str):
The instance connection name specifies the instance relative to the
project and region. For example: "my-project:my-region:my-instance"
user (str):
The database user name, e.g., postgres
password (str):
The database user's password, e.g., secret-password
db (str):
The name of the database, e.g., mydb
refresh_strategy (Optional[str]):
Refresh strategy for the Cloud SQL Connector. Can be one of "lazy"
or "background". For serverless environments use "lazy" to avoid
errors resulting from CPU being throttled.
"""
loop = asyncio.get_running_loop()
connector = Connector(loop=loop, refresh_strategy=refresh_strategy)

async def getconn(
instance_connection_name: str, **kwargs: Any
) -> asyncpg.Connection:
conn: asyncpg.Connection = await connector.connect_async(
instance_connection_name,
"asyncpg",
user=user,
password=password,
db=db,
ip_type="public", # can also be "private" or "psc",
**kwargs
)
return conn

# create native asyncpg pool (requires asyncpg version >=0.30.0)
pool = await asyncpg.create_pool(instance_connection_name, connect=getconn)
return pool, connector


async def test_sqlalchemy_connection_with_asyncpg() -> None:
"""Basic test to get time from database."""
inst_conn_name = os.environ["POSTGRES_CONNECTION_NAME"]
user = os.environ["POSTGRES_USER"]
Expand All @@ -104,7 +165,7 @@ async def test_connection_with_asyncpg() -> None:
await connector.close_async()


async def test_lazy_connection_with_asyncpg() -> None:
async def test_lazy_sqlalchemy_connection_with_asyncpg() -> None:
"""Basic test to get time from database."""
inst_conn_name = os.environ["POSTGRES_CONNECTION_NAME"]
user = os.environ["POSTGRES_USER"]
Expand All @@ -120,3 +181,37 @@ async def test_lazy_connection_with_asyncpg() -> None:
assert res[0] == 1

await connector.close_async()


async def test_connection_with_asyncpg() -> None:
"""Basic test to get time from database."""
inst_conn_name = os.environ["POSTGRES_CONNECTION_NAME"]
user = os.environ["POSTGRES_USER"]
password = os.environ["POSTGRES_PASS"]
db = os.environ["POSTGRES_DB"]

pool, connector = await create_asyncpg_pool(inst_conn_name, user, password, db)

async with pool.acquire() as conn:
res = await conn.fetch("SELECT 1")
assert res[0][0] == 1

await connector.close_async()


async def test_lazy_connection_with_asyncpg() -> None:
"""Basic test to get time from database."""
inst_conn_name = os.environ["POSTGRES_CONNECTION_NAME"]
user = os.environ["POSTGRES_USER"]
password = os.environ["POSTGRES_PASS"]
db = os.environ["POSTGRES_DB"]

pool, connector = await create_asyncpg_pool(
inst_conn_name, user, password, db, "lazy"
)

async with pool.acquire() as conn:
res = await conn.fetch("SELECT 1")
assert res[0][0] == 1

await connector.close_async()
Loading