From a88dbdd177f7e90369831c8e0f55ab87ddb61f30 Mon Sep 17 00:00:00 2001 From: mvdbeek Date: Wed, 17 Jan 2024 11:50:29 +0100 Subject: [PATCH 1/2] Discard connection after task completion Running the test without removing the session yields: ``` celery.app.trace ERROR 2024-01-17 11:49:42,502 [pN:main,p:81158,tN:Thread-2 (start)] Task integration.test_celery_tasks.use_session[0f89df2c-4960-4aac-8e1a-ce431dae6749] raised unexpected: PendingRollbackError("Can't reconnect until invalid transaction is rolled back.") Traceback (most recent call last): File "/Users/mvandenb/src/galaxy/.venv/lib/python3.11/site-packages/celery/app/trace.py", line 477, in trace_task R = retval = fun(*args, **kwargs) ^^^^^^^^^^^^^^^^^^^^ File "/Users/mvandenb/src/galaxy/lib/galaxy/celery/__init__.py", line 181, in wrapper rval = app.magic_partial(func)(*args, **kwds) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/Users/mvandenb/src/galaxy/.venv/lib/python3.11/site-packages/lagom/wrapping.py", line 28, in _bound_func return inner_func(*bound_args, **bound_kwargs) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/Users/mvandenb/src/galaxy/.venv/lib/python3.11/site-packages/lagom/wrapping.py", line 45, in _error_handling_func return func(*args, **kwargs) ^^^^^^^^^^^^^^^^^^^^^ File "/Users/mvandenb/src/galaxy/test/integration/test_celery_tasks.py", line 42, in use_session sa_session().query(HistoryDatasetAssociation).get(1) File "", line 2, in get File "/Users/mvandenb/src/galaxy/.venv/lib/python3.11/site-packages/sqlalchemy/util/deprecations.py", line 468, in warned return fn(*args, **kwargs) ^^^^^^^^^^^^^^^^^^^ File "/Users/mvandenb/src/galaxy/.venv/lib/python3.11/site-packages/sqlalchemy/orm/query.py", line 947, in get return self._get_impl(ident, loading.load_on_pk_identity) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/Users/mvandenb/src/galaxy/.venv/lib/python3.11/site-packages/sqlalchemy/orm/query.py", line 951, in _get_impl return self.session._get_impl( ^^^^^^^^^^^^^^^^^^^^^^^ File "/Users/mvandenb/src/galaxy/.venv/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 2975, in _get_impl return db_load_fn( ^^^^^^^^^^^ File "/Users/mvandenb/src/galaxy/.venv/lib/python3.11/site-packages/sqlalchemy/orm/loading.py", line 530, in load_on_pk_identity session.execute( File "/Users/mvandenb/src/galaxy/.venv/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 1717, in execute result = conn._execute_20(statement, params or {}, execution_options) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/Users/mvandenb/src/galaxy/.venv/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1710, in _execute_20 return meth(self, args_10style, kwargs_10style, execution_options) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/Users/mvandenb/src/galaxy/.venv/lib/python3.11/site-packages/sqlalchemy/sql/elements.py", line 334, in _execute_on_connection return connection._execute_clauseelement( ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/Users/mvandenb/src/galaxy/.venv/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1577, in _execute_clauseelement ret = self._execute_context( ^^^^^^^^^^^^^^^^^^^^^^ File "/Users/mvandenb/src/galaxy/.venv/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1808, in _execute_context conn = self._revalidate_connection() ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/Users/mvandenb/src/galaxy/.venv/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 650, in _revalidate_connection self._invalid_transaction() File "/Users/mvandenb/src/galaxy/.venv/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 622, in _invalid_transaction raise exc.PendingRollbackError( sqlalchemy.exc.PendingRollbackError: Can't reconnect until invalid transaction is rolled back. (Background on this error at: https://sqlalche.me/e/14/8s2b) FAILEDINFO: 127.0.0.1:59776 - "GET /api/jobs?state=running HTTP/1.1" 200 OK ``` The fix here is to set a unique scope for the session registry. This means that the task is guranteed to get a new, unshared session, and this session gets removed after completing the task. The docstring of session.remove says: ``` This will first call :meth:`.Session.close` method on the current :class:`.Session`, which releases any existing transactional/connection resources still being held; transactions specifically are rolled back. The :class:`.Session` is then discarded. Upon next usage within the same scope, the :class:`.scoped_session` will produce a new :class:`.Session` object. ``` which I think is what we want after every celery task (for now, at least). It might make sense to discard the session for any long-running task that doesn't require database access in the future. Fix scope --- lib/galaxy/celery/__init__.py | 8 ++++++++ test/integration/test_celery_tasks.py | 15 +++++++++++++++ 2 files changed, 23 insertions(+) diff --git a/lib/galaxy/celery/__init__.py b/lib/galaxy/celery/__init__.py index 84afb8ee18b2..3bd1c48dfa54 100644 --- a/lib/galaxy/celery/__init__.py +++ b/lib/galaxy/celery/__init__.py @@ -1,4 +1,5 @@ import os +import uuid from functools import ( lru_cache, wraps, @@ -167,6 +168,10 @@ def wrapper(*args, **kwds): app = get_galaxy_app() assert app + # Ensure sqlalchemy session registry scope is specific to this instance of the celery task + scoped_id = str(uuid.uuid4()) + app.model.set_request_id(scoped_id) + desc = func.__name__ if action is not None: desc += f" to {action}" @@ -184,6 +189,9 @@ def wrapper(*args, **kwds): except Exception: log.warning(f"Celery task execution failed for {desc} {timer}") raise + finally: + # Close and remove any open session this task has created + app.model.unset_request_id(scoped_id) return wrapper diff --git a/test/integration/test_celery_tasks.py b/test/integration/test_celery_tasks.py index 30658dc5b3b9..f9c9d1d1f20f 100644 --- a/test/integration/test_celery_tasks.py +++ b/test/integration/test_celery_tasks.py @@ -9,6 +9,7 @@ purge_hda, ) from galaxy.model import HistoryDatasetAssociation +from galaxy.model.scoped_session import galaxy_scoped_session from galaxy.schema import PdfDocumentType from galaxy.schema.schema import CreatePagePayload from galaxy.schema.tasks import GeneratePdfDownload @@ -31,6 +32,16 @@ def process_page(request: CreatePagePayload): return f"content_format is {request.content_format} with annotation {request.annotation}" +@galaxy_task +def invalidate_connection(sa_session: galaxy_scoped_session): + sa_session().connection().invalidate() + + +@galaxy_task +def use_session(sa_session: galaxy_scoped_session): + sa_session().query(HistoryDatasetAssociation).get(1) + + class TestCeleryTasksIntegration(IntegrationTestCase): dataset_populator: DatasetPopulator @@ -38,6 +49,10 @@ def setUp(self): super().setUp() self.dataset_populator = DatasetPopulator(self.galaxy_interactor) + def test_recover_from_invalid_connection(self): + invalidate_connection.delay().get() + use_session.delay().get() + def test_random_simple_task_to_verify_framework_for_testing(self): assert mul.delay(4, 4).get(timeout=10) == 16 From ef26fc46858f3127a74ae10715b8ba9aa05ff0a4 Mon Sep 17 00:00:00 2001 From: mvdbeek Date: Wed, 17 Jan 2024 12:22:08 +0100 Subject: [PATCH 2/2] Use MockApp instead of celery_injected_app_container Sorry, I guess the idea was to not have to build a full MockApp ? Realisitically all tasks access the database at this point. I suppose we could make the decorators more specific if necessary ? --- test/unit/app/test_tasks.py | 51 +++++++++++++------------------------ 1 file changed, 18 insertions(+), 33 deletions(-) diff --git a/test/unit/app/test_tasks.py b/test/unit/app/test_tasks.py index 72ee650c22b3..5e0f2fddff7b 100644 --- a/test/unit/app/test_tasks.py +++ b/test/unit/app/test_tasks.py @@ -1,12 +1,7 @@ -from contextlib import contextmanager -from typing import ( - Iterator, - List, -) +from typing import List -from galaxy.celery import set_thread_app +from galaxy.app_unittest_utils.galaxy_mock import MockApp from galaxy.celery.tasks import clean_object_store_caches -from galaxy.di import Container from galaxy.objectstore import BaseObjectStore from galaxy.objectstore.caching import CacheTarget @@ -20,34 +15,24 @@ def cache_targets(self) -> List[CacheTarget]: def test_clean_object_store_caches(tmp_path): - with celery_injected_app_container() as container: - cache_targets: List[CacheTarget] = [] - container[BaseObjectStore] = MockObjectStore(cache_targets) # type: ignore[assignment] + container = MockApp() + cache_targets: List[CacheTarget] = [] + container[BaseObjectStore] = MockObjectStore(cache_targets) # type: ignore[assignment] - # similar code used in object store unit tests - cache_dir = tmp_path - path = cache_dir / "a_file_0" - path.write_text("this is an example file") + # similar code used in object store unit tests + cache_dir = tmp_path + path = cache_dir / "a_file_0" + path.write_text("this is an example file") - # works fine on an empty list of cache targets... - clean_object_store_caches() + # works fine on an empty list of cache targets... + clean_object_store_caches() - assert path.exists() + assert path.exists() - # place the file in mock object store's cache targets and - # run the task again and the above file should be gone. - cache_targets.append(CacheTarget(cache_dir, 1, 0.000000001)) - # works fine on an empty list of cache targets... - clean_object_store_caches() + # place the file in mock object store's cache targets and + # run the task again and the above file should be gone. + cache_targets.append(CacheTarget(cache_dir, 1, 0.000000001)) + # works fine on an empty list of cache targets... + clean_object_store_caches() - assert not path.exists() - - -@contextmanager -def celery_injected_app_container() -> Iterator[Container]: - container = Container() - set_thread_app(container) - try: - yield container - finally: - set_thread_app(None) + assert not path.exists()