
Update docs to showcase cleanup of Connector object #914

Open
lauraseidler opened this issue Nov 24, 2023 · 13 comments
Assignees
Labels
priority: p2 Moderately-important priority. Fix may not be included in next release. type: cleanup An internal cleanup or hygiene concern. type: docs Improvement to the documentation for an API.

Comments

@lauraseidler

Bug Description

We use the connector with IAM Auth + Cloud SQL for Postgres. It generally works okay, but we occasionally see errors like this on shutdown of our application server:

ERROR:google.cloud.sql.connector.instance:['XXX']: An error occurred while performing refresh. Scheduling another refresh attempt immediately

Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/google/cloud/sql/connector/instance.py", line 376, in _refresh_task
    refresh_data = await refresh_task
  File "/usr/local/lib/python3.10/site-packages/google/cloud/sql/connector/instance.py", line 311, in _perform_refresh
    metadata = await metadata_task
  File "/usr/local/lib/python3.10/site-packages/google/cloud/sql/connector/refresh_utils.py", line 106, in _get_metadata
    resp = await client_session.get(url, headers=headers, raise_for_status=True)
  File "/usr/local/lib/python3.10/site-packages/aiohttp/client.py", line 586, in _request
    await resp.start(conn)
  File "/usr/local/lib/python3.10/site-packages/aiohttp/client_reqrep.py", line 905, in start
    message, payload = await protocol.read()  # type: ignore[union-attr]
  File "/usr/local/lib/python3.10/site-packages/aiohttp/streams.py", line 616, in read
    await self._waiter
aiohttp.client_exceptions.ClientOSError: [Errno 32] Broken pipe

ERROR:asyncio:Task exception was never retrieved

future: <Task finished name='Task-507' coro=<Instance._schedule_refresh.<locals>._refresh_task() done, defined at /usr/local/lib/python3.10/site-packages/google/cloud/sql/connector/instance.py:365> exception=ClientOSError(32, 'Broken pipe')>

The exception itself varies: mostly aiohttp.client_exceptions.ClientOSError: [Errno 32] Broken pipe, but we've also seen aiohttp.client_exceptions.ServerDisconnectedError: Server disconnected.

To me, this looks like an async task is not checked for exceptions, even though at least the one in the referenced line looks okay:

refresh_data = await refresh_task

Because these errors are only logged when the application shuts down, it is hard to tell what is causing the connection issues, and whether they cause real problems or whether the connection is re-established successfully.
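For context, this shutdown-only surfacing matches how asyncio reports unretrieved task exceptions: an exception raised inside a background task is stored on the task and only logged ("Task exception was never retrieved") when the task is garbage-collected without anyone awaiting it or calling result(). A minimal stdlib sketch, unrelated to the connector's internals (failing_refresh is a hypothetical stand-in):

```python
import asyncio

async def failing_refresh():
    # hypothetical stand-in for a background refresh hitting a broken pipe
    raise OSError(32, "Broken pipe")

async def main():
    task = asyncio.create_task(failing_refresh())
    await asyncio.sleep(0.01)  # the task runs and fails; nothing is raised here
    # the exception now sits on the task; if the task were simply dropped,
    # asyncio would log "Task exception was never retrieved" at garbage
    # collection time, often only when the event loop shuts down
    try:
        task.result()  # retrieving the result surfaces the stored exception
    except OSError as exc:
        print(f"retrieved: {exc}")

asyncio.run(main())
```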

Example code (or command)

No response

Stacktrace

No response

Steps to reproduce?

  1. Use Python connector as documented here https://cloud.google.com/sql/docs/postgres/iam-logins#log-in-with-automatic
  2. Run SQL queries for a while (we're doing it within Cloud Run, and it's only reproducible under load, not locally)
  3. Shut down application

Environment

  1. OS type and version: Debian 11.7 (python:3.10.11-slim docker image)
  2. Python version: 3.10.11
  3. Cloud SQL Python Connector version: 1.4.3

Additional Details

No response

@lauraseidler lauraseidler added the type: bug Error or flaw in code with unintended results or allowing sub-optimal usage patterns. label Nov 24, 2023
@jackwotherspoon
Collaborator

Hi @lauraseidler, thanks for opening an issue on the Cloud SQL Python Connector 😄

Let me see if I am understanding the issue correctly...

we are occasionally seeing errors on shut down of our application server

So all the queries run successfully without any errors, and you only see the errors surface when the application shuts down? That would hint at the Connector not cleaning up its resources gracefully, hence the tasks not being cancelled or awaited. There are two ways to explicitly clean up resources and make sure the Connector exits gracefully.

  1. You can call connector.close() yourself on shutdown.

Details in our README:
(screenshot of the README section on closing the Connector)

  2. You can use the Connector as a context manager so that it cleans up after itself.

Details in our README:

# initialize Cloud SQL Python Connector as context manager
with Connector() as connector:
    # ... use connector.connect(...) to create connections ...
    # resources are released automatically when the block exits

When you say "our application shuts down", can you give me a bit more detail on what this looks like for your use case? Do you just mean when Cloud Run scales down? This will help me try to reproduce the issue you are seeing.

Let me know if this helps and if it does I can update the sample or documentation as needed. 😄

@jackwotherspoon jackwotherspoon added the priority: p2 Moderately-important priority. Fix may not be included in next release. label Nov 24, 2023
@lauraseidler
Author

Hi @jackwotherspoon,

I think we're currently not explicitly (or implicitly) closing the Connector object, only the connection itself. I hadn't read the README that far (my bad), as the first part looked identical to the GCP documentation, but the GCP docs don't mention this part, so we never included it. It sounds like this might indeed be the issue and it would make sense to me, so I will try it out and report back, thanks!

When you say "our application shuts down" can you provide me a bit more details into what this looks like specifically for your use-case, do you just mean when Cloud Run scales down? This will be useful for me to try and reproduce the issue you are seeing?

Yes, when Cloud Run scales down. This is especially noticeable when we roll out a new version and the old version has been running for a while, and errors have "accumulated" over multiple instances that get scaled down in rapid succession.

@jackwotherspoon
Collaborator

Hi @lauraseidler, let me know if closing the Connector resolves the error messages you were previously seeing. 👍

as the first part looked identical to the GCP documentation

If closing the Connector does resolve the issue I will update this issue to track updating the code sample used in the docs to include closing the Connector so that this is not encountered in the future by other users 😄

@NickNaskida

NickNaskida commented Dec 1, 2023

@jackwotherspoon Hello, I am having similar issues with my FastAPI application.

Task exception was never retrieved
future: <Task finished name='Task-6523' coro=<Instance._schedule_refresh.<locals>._refresh_task() done, defined at /usr/local/lib/python3.11/site-packages/google/cloud/sql/connector/instance.py:378> exception=TimeoutError()>
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/google/cloud/sql/connector/instance.py", line 389, in _refresh_task
    refresh_data = await refresh_task
                   ^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/google/cloud/sql/connector/instance.py", line 313, in _perform_refresh
    metadata = await metadata_task
               ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/google/cloud/sql/connector/refresh_utils.py", line 103, in _get_metadata
    resp = await client_session.get(url, headers=headers, raise_for_status=True)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/aiohttp/client.py", line 467, in _request
    with timer:
  File "/usr/local/lib/python3.11/site-packages/aiohttp/helpers.py", line 721, in __exit__
    raise asyncio.TimeoutError from None
TimeoutError

and

[2023-12-01 07:07:10,627] ERROR logger=asyncio module=base_events func=default_exception_handler() L1771 message=Task exception was never retrieved
future: <Task finished name='Task-7093' coro=<Instance._schedule_refresh.<locals>._refresh_task() done, defined at /usr/local/lib/python3.11/site-packages/google/cloud/sql/connector/instance.py:378> exception=RetryError(<Future at 0x3e8b0c830a10 state=finished raised SSLError>)>
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/google/cloud/sql/connector/instance.py", line 389, in _refresh_task
    refresh_data = await refresh_task
                   ^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/google/cloud/sql/connector/instance.py", line 313, in _perform_refresh
    metadata = await metadata_task
               ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/google/cloud/sql/connector/refresh_utils.py", line 103, in _get_metadata
    resp = await client_session.get(url, headers=headers, raise_for_status=True)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/aiohttp/client.py", line 467, in _request
    with timer:
  File "/usr/local/lib/python3.11/site-packages/aiohttp/helpers.py", line 721, in __exit__
    raise asyncio.TimeoutError from None
TimeoutError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/urllib3/connectionpool.py", line 467, in _make_request
    self._validate_conn(conn)
  File "/usr/local/lib/python3.11/site-packages/urllib3/connectionpool.py", line 1096, in _validate_conn
    conn.connect()
  File "/usr/local/lib/python3.11/site-packages/urllib3/connection.py", line 642, in connect
    sock_and_verified = _ssl_wrap_socket_and_match_hostname(
                        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/urllib3/connection.py", line 782, in _ssl_wrap_socket_and_match_hostname
    ssl_sock = ssl_wrap_socket(
               ^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/urllib3/util/ssl_.py", line 470, in ssl_wrap_socket
    ssl_sock = _ssl_wrap_socket_impl(sock, context, tls_in_tls, server_hostname)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/urllib3/util/ssl_.py", line 514, in _ssl_wrap_socket_impl
    return ssl_context.wrap_socket(sock, server_hostname=server_hostname)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/ssl.py", line 517, in wrap_socket
    return self.sslsocket_class._create(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/ssl.py", line 1108, in _create
    self.do_handshake()
  File "/usr/local/lib/python3.11/ssl.py", line 1379, in do_handshake
    self._sslobj.do_handshake()
ssl.SSLEOFError: [SSL: UNEXPECTED_EOF_WHILE_READING] EOF occurred in violation of protocol (_ssl.c:1006)

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/urllib3/connectionpool.py", line 790, in urlopen
    response = self._make_request(
               ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/urllib3/connectionpool.py", line 491, in _make_request
    raise new_e
urllib3.exceptions.SSLError: [SSL: UNEXPECTED_EOF_WHILE_READING] EOF occurred in violation of protocol (_ssl.c:1006)

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/requests/adapters.py", line 486, in send
    resp = conn.urlopen(
           ^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/urllib3/connectionpool.py", line 844, in urlopen

and

[2023-12-01 05:56:47,122] ERROR logger=asyncio module=base_events func=default_exception_handler() L1771 message=Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x3e8b0d31dbd0>

Here is my code for db connector and session:

from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.asyncio.session import AsyncSession

import asyncpg
from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine
from google.cloud.sql.connector import Connector, create_async_connector

from src.core.config import settings


async def init_connection_pool(connector: Connector) -> AsyncEngine:
    async def getconn() -> asyncpg.Connection:
        conn: asyncpg.Connection = await connector.connect_async(
            settings.POSTGRES_CONN_NAME,
            "asyncpg",
            user=settings.POSTGRES_USER,
            password=settings.POSTGRES_PASSWORD,
            db=settings.POSTGRES_DB,
        )
        return conn

    pool = create_async_engine(
        "postgresql+asyncpg://",
        async_creator=getconn,
        pool_size=20,
        max_overflow=10,
        pool_timeout=10,
        pool_recycle=1200,
    )
    return pool


async def get_session():
    # initialize Connector object for connections to Cloud SQL
    connector = await create_async_connector()

    # initialize connection pool
    engine = await init_connection_pool(connector)

    async_session = sessionmaker(bind=engine, class_=AsyncSession, expire_on_commit=False)
    async with async_session() as session:
        try:
            yield session
        finally:
            await session.close()

I am running this app on Cloud Run and receive a lot of these errors. Is there any way to get rid of them?

@jackwotherspoon
Collaborator

@NickNaskida Yes, it seems you are running into the same issue where the Connector is not cleaning itself up properly.

To close the Connector properly when configured for an async driver, you can do one of two things.

  1. Call connector.close_async() (when instantiating Connector via create_async_connector)
async def get_session():
    # initialize Connector object for connections to Cloud SQL
    connector = await create_async_connector()

    # initialize connection pool
    engine = await init_connection_pool(connector)

    async_session = sessionmaker(bind=engine, class_=AsyncSession, expire_on_commit=False)
    async with async_session() as session:
        try:
            yield session
        finally:
            await session.close()
    # explicitly close connector
    await connector.close_async()
  2. Use the Connector as an async context manager for implicit cleanup
import asyncio

async def get_session():
    # initialize Connector as async context manager
    loop = asyncio.get_running_loop()
    async with Connector(loop=loop) as connector:
        # initialize connection pool
        engine = await init_connection_pool(connector)

        async_session = sessionmaker(bind=engine, class_=AsyncSession, expire_on_commit=False)
        async with async_session() as session:
            try:
                yield session
            finally:
                await session.close()

@jackwotherspoon jackwotherspoon added type: cleanup An internal cleanup or hygiene concern. type: docs Improvement to the documentation for an API. and removed type: bug Error or flaw in code with unintended results or allowing sub-optimal usage patterns. labels Dec 1, 2023
@jackwotherspoon jackwotherspoon changed the title Async task exceptions not always handled Update docs to showcase cleanup of Connector object Dec 1, 2023
@jackwotherspoon
Collaborator

I've updated this issue to track updating the code samples to properly document cleaning up the Connector object. Please provide additional details if the comments above turn out not to resolve the underlying issues.

@NickNaskida

NickNaskida commented Dec 2, 2023

@jackwotherspoon thanks for the quick response. I applied your changes yesterday and some of the issues were resolved; however, this error was logged today:

[2023-12-02 08:00:11,804] ERROR logger=asyncio module=base_events func=default_exception_handler() L1771 message=Task exception was never retrieved
future: <Task finished name='Task-2881' coro=<_get_ephemeral() done, defined at /usr/local/lib/python3.11/site-packages/google/cloud/sql/connector/refresh_utils.py:128> exception=ClientOSError(1, '[SSL: APPLICATION_DATA_AFTER_CLOSE_NOTIFY] application data after close notify (_ssl.c:2706)')>
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/google/cloud/sql/connector/refresh_utils.py", line 201, in _get_ephemeral
    resp = await client_session.post(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/aiohttp/client.py", line 560, in _request
    await resp.start(conn)
  File "/usr/local/lib/python3.11/site-packages/aiohttp/client_reqrep.py", line 899, in start
    message, payload = await protocol.read()  # type: ignore[union-attr]
                       ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/aiohttp/streams.py", line 616, in read
    await self._waiter
aiohttp.client_exceptions.ClientOSError: [Errno 1] [SSL: APPLICATION_DATA_AFTER_CLOSE_NOTIFY] application data after close notify (_ssl.c:2706)

My aiohttp version is 3.8.5. Let me know if you need any more details.


UPDATE: I upgraded aiohttp to version 3.9.1 and the issue seems to have gone away. I will report back if it reappears in the next few days.

@NickNaskida

NickNaskida commented Dec 3, 2023

Hey @jackwotherspoon, the issue I posted above still exists even after updating aiohttp. I'm using the first approach you suggested above (connector.close_async()).

I think this issue is also related to too many idle connections on my database, a problem I haven't managed to solve yet. Because of it I usually get this error: remaining connection slots are reserved for non-replication superuser connections.

my engine config

pool = create_async_engine(
    "postgresql+asyncpg://",
    async_creator=getconn,
    pool_size=5,
    max_overflow=10,
    pool_timeout=10,
    pool_recycle=1200,
)
return pool

I believe this happens because the get_session function is called on every request in my app, so the engine is recreated every time:

async def get_session():
    # initialize Connector object for connections to Cloud SQL
    connector = await create_async_connector()

    # initialize connection pool
    engine = await init_connection_pool(connector)

    async_session = sessionmaker(bind=engine, class_=AsyncSession, expire_on_commit=False)
    async with async_session() as session:
        try:
            yield session
        finally:
            await session.close()

    # explicitly close connector
    await connector.close_async()

@jackwotherspoon
Collaborator

I usually get this error: remaining connection slots are reserved for non-replication superuser connections

@NickNaskida The error you are seeing is indeed most likely because you are hitting the maximum number of connections allowed by Cloud SQL. This is normally due to confusion around connection pooling and it not being properly configured, as you pointed out. It is most likely no longer related to cleanup of the Connector object, so I recommend opening a separate issue on this repo to discuss further if need be. This will allow others to find the solution without getting confused 😄
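Back-of-the-envelope numbers (illustrative only, not from the actual deployment) for why a per-request engine exhausts connection slots: each engine can hold open up to pool_size + max_overflow connections, so many short-lived engines multiply that ceiling:

```python
pool_size = 5        # values from the engine config above
max_overflow = 10

# upper bound of connections a single engine can hold open
per_engine_cap = pool_size + max_overflow  # 15

# a new engine per request means each in-flight request brings its own cap;
# the request count below is an illustrative assumption
concurrent_requests = 30
worst_case = concurrent_requests * per_engine_cap
print(worst_case)  # 450, easily past a small instance's max_connections
```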

I believe this happens because the get_session function is called on every request in my app, so the engine is recreated every time.

You are correct. Sorry, I should have caught that previously, but my FastAPI knowledge is limited. If you call get_session on every request and create a new engine each time, you aren't getting the actual benefits of connection pooling. You want to cache your connection pool engine and share it across requests/sessions so that connections can be re-used and pooled properly. Your current approach is a 1:1 request-to-engine mapping, which is why you are hitting the max number of connections shown in the error. To fix this, move the Connector and engine initialization out of get_session, for example into module-level globals.

A couple of tips to optimize performance with the Cloud SQL Connectors on Cloud Run: lazily initialize the database engine (past issue on this) and set your Cloud Run service to "always on" CPU allocation (picture below), as the connector runs background tasks that can be negatively affected by cold starts.

(screenshot of the Cloud Run CPU allocation setting)

The lazy init strategy works really well with FastAPI's lifespan event 🤞 :

from contextlib import asynccontextmanager

from fastapi import FastAPI
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import sessionmaker
from google.cloud.sql.connector import create_async_connector

# init_connection_pool is the helper defined earlier in this thread

# global engine variable to be shared across sessions
engine = None

@asynccontextmanager
async def lifespan(app: FastAPI):
    global engine
    # initialize Connector object for connections to Cloud SQL
    connector = await create_async_connector()
    # init the engine
    engine = await init_connection_pool(connector)
    yield
    # clean up the Cloud SQL Connector
    await connector.close_async()

app = FastAPI(lifespan=lifespan)

async def get_session():
    global engine
    async_session = sessionmaker(bind=engine, class_=AsyncSession, expire_on_commit=False)
    async with async_session() as session:
        try:
            yield session
        finally:
            await session.close()

This will initialize an engine and connector for the lifespan of the FastAPI app.
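The startup/shutdown ordering of the lifespan pattern can be sketched without FastAPI or Cloud SQL installed; FakeConnector below is a hypothetical stand-in for the async Connector, shown only to illustrate that cleanup runs exactly once when the app exits:

```python
import asyncio
from contextlib import asynccontextmanager

class FakeConnector:
    """Hypothetical stand-in for the Cloud SQL async Connector."""
    def __init__(self):
        self.closed = False

    async def close_async(self):
        self.closed = True

@asynccontextmanager
async def lifespan():
    # startup: create the connector (and engine) exactly once
    connector = FakeConnector()
    try:
        yield connector
    finally:
        # shutdown: explicit cleanup, mirroring connector.close_async()
        await connector.close_async()

async def main():
    async with lifespan() as connector:
        assert not connector.closed  # requests are served while open
    assert connector.closed          # cleaned up once on shutdown
    print("ok")

asyncio.run(main())
```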

@NickNaskida

@jackwotherspoon That was it! Thank you very much!

PS: You should definitely add this to the README & docs. I searched a lot on the web over the weekend and didn't find anything like this.

@jackwotherspoon
Collaborator

Improved the connector.close method in #985 to be called upon garbage collection; this should help resolve these errors when users don't explicitly call close.

@nioncode

nioncode commented Sep 5, 2024

It seems the garbage collection improvement by @jackwotherspoon does not fix the issue (at least not for us). We had to close the connector manually via atexit, since we did not find a good place to use it as a context manager within our Flask + SQLAlchemy setup:

connector = Connector()
atexit.register(lambda: connector.close())

@jackwotherspoon
Collaborator

@nioncode We actually reverted most of the garbage collection PR in #1010, as there seemed to be a race condition that could cause the cleanup to hang, and re-opened #1011 to re-investigate properly cleaning up on garbage collection.

4 participants