Skip to content

Commit

Permalink
Discard connection after task completion
Browse files Browse the repository at this point in the history
Running the test without removing the session yields:
```
celery.app.trace ERROR 2024-01-17 11:49:42,502 [pN:main,p:81158,tN:Thread-2 (start)] Task integration.test_celery_tasks.use_session[0f89df2c-4960-4aac-8e1a-ce431dae6749] raised unexpected: PendingRollbackError("Can't reconnect until invalid transaction is rolled back.")
Traceback (most recent call last):
  File "/Users/mvandenb/src/galaxy/.venv/lib/python3.11/site-packages/celery/app/trace.py", line 477, in trace_task
    R = retval = fun(*args, **kwargs)
                 ^^^^^^^^^^^^^^^^^^^^
  File "/Users/mvandenb/src/galaxy/lib/galaxy/celery/__init__.py", line 181, in wrapper
    rval = app.magic_partial(func)(*args, **kwds)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/mvandenb/src/galaxy/.venv/lib/python3.11/site-packages/lagom/wrapping.py", line 28, in _bound_func
    return inner_func(*bound_args, **bound_kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/mvandenb/src/galaxy/.venv/lib/python3.11/site-packages/lagom/wrapping.py", line 45, in _error_handling_func
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/Users/mvandenb/src/galaxy/test/integration/test_celery_tasks.py", line 42, in use_session
    sa_session().query(HistoryDatasetAssociation).get(1)
  File "<string>", line 2, in get
  File "/Users/mvandenb/src/galaxy/.venv/lib/python3.11/site-packages/sqlalchemy/util/deprecations.py", line 468, in warned
    return fn(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^
  File "/Users/mvandenb/src/galaxy/.venv/lib/python3.11/site-packages/sqlalchemy/orm/query.py", line 947, in get
    return self._get_impl(ident, loading.load_on_pk_identity)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/mvandenb/src/galaxy/.venv/lib/python3.11/site-packages/sqlalchemy/orm/query.py", line 951, in _get_impl
    return self.session._get_impl(
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/mvandenb/src/galaxy/.venv/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 2975, in _get_impl
    return db_load_fn(
           ^^^^^^^^^^^
  File "/Users/mvandenb/src/galaxy/.venv/lib/python3.11/site-packages/sqlalchemy/orm/loading.py", line 530, in load_on_pk_identity
    session.execute(
  File "/Users/mvandenb/src/galaxy/.venv/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 1717, in execute
    result = conn._execute_20(statement, params or {}, execution_options)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/mvandenb/src/galaxy/.venv/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1710, in _execute_20
    return meth(self, args_10style, kwargs_10style, execution_options)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/mvandenb/src/galaxy/.venv/lib/python3.11/site-packages/sqlalchemy/sql/elements.py", line 334, in _execute_on_connection
    return connection._execute_clauseelement(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/mvandenb/src/galaxy/.venv/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1577, in _execute_clauseelement
    ret = self._execute_context(
          ^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/mvandenb/src/galaxy/.venv/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1808, in _execute_context
    conn = self._revalidate_connection()
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/mvandenb/src/galaxy/.venv/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 650, in _revalidate_connection
    self._invalid_transaction()
  File "/Users/mvandenb/src/galaxy/.venv/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 622, in _invalid_transaction
    raise exc.PendingRollbackError(
sqlalchemy.exc.PendingRollbackError: Can't reconnect until invalid transaction is rolled back. (Background on this error at: https://sqlalche.me/e/14/8s2b)
FAILEDINFO:     127.0.0.1:59776 - "GET /api/jobs?state=running HTTP/1.1" 200 OK
```

The docstring of remove says:
```
       This will first call :meth:`.Session.close` method
        on the current :class:`.Session`, which releases any existing
        transactional/connection resources still being held; transactions
        specifically are rolled back.  The :class:`.Session` is then
        discarded.   Upon next usage within the same scope,
        the :class:`.scoped_session` will produce a new
        :class:`.Session` object.
```
which I think is what we want after every celery task.
  • Loading branch information
mvdbeek committed Jan 17, 2024
1 parent 8118f07 commit 081190d
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 0 deletions.
2 changes: 2 additions & 0 deletions lib/galaxy/celery/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,8 @@ def wrapper(*args, **kwds):
except Exception:
log.warning(f"Celery task execution failed for {desc} {timer}")
raise
finally:
app.model.session.remove()

return wrapper

Expand Down
15 changes: 15 additions & 0 deletions test/integration/test_celery_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
purge_hda,
)
from galaxy.model import HistoryDatasetAssociation
from galaxy.model.scoped_session import galaxy_scoped_session
from galaxy.schema import PdfDocumentType
from galaxy.schema.schema import CreatePagePayload
from galaxy.schema.tasks import GeneratePdfDownload
Expand All @@ -31,13 +32,27 @@ def process_page(request: CreatePagePayload):
return f"content_format is {request.content_format} with annotation {request.annotation}"


@galaxy_task
def invalidate_connection(sa_session: galaxy_scoped_session):
sa_session().connection().invalidate()


@galaxy_task
def use_session(sa_session: galaxy_scoped_session):
sa_session().query(HistoryDatasetAssociation).get(1)


class TestCeleryTasksIntegration(IntegrationTestCase):
dataset_populator: DatasetPopulator

def setUp(self):
super().setUp()
self.dataset_populator = DatasetPopulator(self.galaxy_interactor)

def test_recover_from_invalid_connection(self):
invalidate_connection.delay().get()
use_session.delay().get()

def test_random_simple_task_to_verify_framework_for_testing(self):
assert mul.delay(4, 4).get(timeout=10) == 16

Expand Down

0 comments on commit 081190d

Please sign in to comment.