Skip to content

Commit

Permalink
Don't store job in JobIO instance attributes
Browse files Browse the repository at this point in the history
and invalidate `_output_hdas_and_paths` when current session is not the
same session that was used to populate `_output_hdas_and_paths`.

The Job instance may originate from a session that is associated to
another thread, and when that thread closes the session the instance
(along with other instances retrieved through loading relationships)
becomes detached.

I am not sure at all if this will fix
```
DetachedInstanceError: Instance <HistoryDatasetAssociation at 0x7fe68bbf14f0> is not bound to a Session; attribute refresh operation cannot proceed (Background on this error at: https://sqlalche.me/e/14/bhk3)
  File "galaxy/jobs/runners/__init__.py", line 291, in prepare_job
    job_wrapper.prepare()
  File "galaxy/jobs/__init__.py", line 1248, in prepare
    tool_evaluator.set_compute_environment(compute_environment, get_special=get_special)
  File "galaxy/tools/evaluation.py", line 162, in set_compute_environment
    self.param_dict = self.build_param_dict(
  File "galaxy/tools/evaluation.py", line 204, in build_param_dict
    self.__populate_output_dataset_wrappers(param_dict, output_datasets, job_working_directory)
  File "galaxy/tools/evaluation.py", line 447, in __populate_output_dataset_wrappers
    param_dict[name] = DatasetFilenameWrapper(
  File "galaxy/tools/wrappers.py", line 403, in __init__
    path_rewrite = compute_environment and compute_environment.output_path_rewrite(dataset_instance)
  File "galaxy/job_execution/compute_environment.py", line 132, in output_path_rewrite
    return str(self.job_io.get_output_path(dataset))
  File "galaxy/job_execution/setup.py", line 226, in get_output_path
    if hda.id == dataset.id:
  File "sqlalchemy/orm/attributes.py", line 487, in __get__
    return self.impl.get(state, dict_)
  File "sqlalchemy/orm/attributes.py", line 959, in get
    value = self._fire_loader_callables(state, key, passive)
  File "sqlalchemy/orm/attributes.py", line 990, in _fire_loader_callables
    return state._load_expired(state, passive)
  File "sqlalchemy/orm/state.py", line 712, in _load_expired
    self.manager.expired_attribute_loader(self, toload, passive)
  File "sqlalchemy/orm/loading.py", line 1369, in load_scalar_attributes
    raise orm_exc.DetachedInstanceError(
```
but it seems to make some sense. JobIO crosses thread boundaries as part
of the job wrapper getting put into threading queues.

Ideally we'd make sure that no ORM instance crosses the thread boundary
(or we'd systematically re-associate them with a session).

I also tried flagging these patterns automatically using something like:

```

    @event.listens_for(session, "persistent_to_detached")
    def on_detach(sess, instance):
        if not getattr(instance, "allow_detach", False):
            raise Exception(f"{instance} detached. This isn't good for how we do things?")
```

but it seems tricky to figure out when this is fine and when it is not.
  • Loading branch information
mvdbeek committed Nov 2, 2023
1 parent 12b20c6 commit d83493d
Showing 1 changed file with 8 additions and 5 deletions.
13 changes: 8 additions & 5 deletions lib/galaxy/job_execution/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,6 @@ def __init__(
user_context_instance = user_context
self.user_context = user_context_instance
self.sa_session = sa_session
self.job = job
self.job_id = job.id
self.working_directory = working_directory
self.outputs_directory = outputs_directory
Expand All @@ -122,9 +121,14 @@ def __init__(
self.tool_source = tool_source
self.tool_source_class = tool_source_class
self._output_paths: Optional[OutputPaths] = None
self._output_paths_for_job_object_id: Optional[int] = None
self._output_hdas_and_paths: Optional[OutputHdasAndType] = None
self._dataset_path_rewriter: Optional[DatasetPathRewriter] = None

@property
def job(self):
    """Fetch the Job for ``self.job_id`` from the current session on every access.

    Deliberately not cached on the instance: JobIO crosses thread
    boundaries (it travels inside the job wrapper through threading
    queues), and a Job instance loaded in another thread's session
    becomes detached once that session closes, raising
    DetachedInstanceError on attribute access. Re-querying here always
    yields an instance bound to the session this thread is using.
    """
    return self.sa_session.query(Job).get(self.job_id)

@classmethod
def from_json(cls, path, sa_session):
with open(path) as job_io_serialized:
Expand All @@ -134,9 +138,7 @@ def from_json(cls, path, sa_session):
@classmethod
def from_dict(cls, io_dict, sa_session):
    """Reconstruct a JobIO from its serialized dict form.

    :param io_dict: dict produced by ``to_dict()``; ``model_class`` is
        dropped, everything else is forwarded to ``__init__`` unchanged.
    :param sa_session: the SQLAlchemy session to bind the new instance to.
    :returns: a new instance of ``cls``.

    Note that ``job_id`` stays inside ``io_dict`` and is passed through;
    the Job itself is no longer eagerly loaded here — it is resolved
    lazily via the ``job`` property against the current session, so no
    ORM instance from a foreign thread's session is captured at
    construction time.
    """
    io_dict.pop("model_class")
    return cls(sa_session=sa_session, **io_dict)

def to_dict(self):
io_dict = super().to_dict()
Expand Down Expand Up @@ -171,7 +173,7 @@ def output_paths(self) -> OutputPaths:

@property
def output_hdas_and_paths(self) -> OutputHdasAndType:
    """Return the cached output HDAs and paths, recomputing when needed.

    The cache is (re)built when it has never been populated, or when the
    current ``self.job`` is a different ORM object than the one the cache
    was computed from (e.g. the job was re-fetched in a new session after
    the original instance was detached).
    """
    # Bug fix: the previous guard used `is None and id(self.job) == ...`,
    # which could never be true on first access (the sentinel starts as
    # None and never equals an object id), so compute_outputs() was never
    # invoked — and a cache built for a stale, detached Job object was
    # returned instead of being invalidated. Recompute when the cache is
    # empty OR belongs to a different job object.
    if self._output_hdas_and_paths is None or id(self.job) != self._output_paths_for_job_object_id:
        self.compute_outputs()
    return cast(OutputHdasAndType, self._output_hdas_and_paths)

Expand Down Expand Up @@ -237,6 +239,7 @@ def compute_outputs(self) -> None:
dataset_path_rewriter = self.dataset_path_rewriter

job = self.job
self._output_paths_for_job_object_id = id(job)
# Job output datasets are combination of history, library, and jeha datasets.
special = self.sa_session.query(JobExportHistoryArchive).filter_by(job=job).first()
false_path = None
Expand Down

0 comments on commit d83493d

Please sign in to comment.