diff --git a/.github/workflows/api.yaml b/.github/workflows/api.yaml index 93ef7e855840..e0f6d98b6472 100644 --- a/.github/workflows/api.yaml +++ b/.github/workflows/api.yaml @@ -2,6 +2,7 @@ name: API tests on: [push, pull_request] env: GALAXY_TEST_DBURI: 'postgresql://postgres:postgres@localhost:5432/galaxy?client_encoding=utf8' + GALAXY_TEST_RAISE_EXCEPTION_ON_HISTORYLESS_HDA: '1' concurrency: group: api-${{ github.ref }} cancel-in-progress: true diff --git a/.github/workflows/api_paste.yaml b/.github/workflows/api_paste.yaml index dbb9d62ca9a0..69cb6e108098 100644 --- a/.github/workflows/api_paste.yaml +++ b/.github/workflows/api_paste.yaml @@ -3,6 +3,7 @@ on: [push, pull_request] env: GALAXY_TEST_DBURI: 'postgresql://postgres:postgres@localhost:5432/galaxy?client_encoding=utf8' GALAXY_TEST_USE_UVICORN: false + GALAXY_TEST_RAISE_EXCEPTION_ON_HISTORYLESS_HDA: '1' concurrency: group: api-legacy-${{ github.ref }} cancel-in-progress: true diff --git a/.github/workflows/converter_tests.yaml b/.github/workflows/converter_tests.yaml index e01d907018ec..a4363cc50b1f 100644 --- a/.github/workflows/converter_tests.yaml +++ b/.github/workflows/converter_tests.yaml @@ -1,5 +1,7 @@ name: Converter tests on: [push, pull_request] +env: + GALAXY_TEST_RAISE_EXCEPTION_ON_HISTORYLESS_HDA: '1' concurrency: group: converter-${{ github.ref }} cancel-in-progress: true @@ -65,4 +67,4 @@ jobs: if: failure() with: name: Converter test results (${{ matrix.python-version }}) - path: tool_test_output.html \ No newline at end of file + path: tool_test_output.html diff --git a/.github/workflows/framework.yaml b/.github/workflows/framework.yaml index a04ecfe2e83d..38e96c44f5e1 100644 --- a/.github/workflows/framework.yaml +++ b/.github/workflows/framework.yaml @@ -1,5 +1,7 @@ name: Framework tests on: [push, pull_request] +env: + GALAXY_TEST_RAISE_EXCEPTION_ON_HISTORYLESS_HDA: '1' concurrency: group: framework-${{ github.ref }} cancel-in-progress: true diff --git a/.github/workflows/integration.yaml b/.github/workflows/integration.yaml index d368d5ea6aac..a3ded7cefe2b 100644 --- a/.github/workflows/integration.yaml +++ b/.github/workflows/integration.yaml @@ -6,6 +6,7 @@ concurrency: env: GALAXY_TEST_DBURI: 'postgresql://postgres:postgres@localhost:5432/galaxy?client_encoding=utf8' GALAXY_TEST_AMQP_URL: 'amqp://localhost:5672//' + GALAXY_TEST_RAISE_EXCEPTION_ON_HISTORYLESS_HDA: '1' jobs: test: name: Test diff --git a/.github/workflows/integration_selenium.yaml b/.github/workflows/integration_selenium.yaml index b0d1cd41497a..6907c2301bf9 100644 --- a/.github/workflows/integration_selenium.yaml +++ b/.github/workflows/integration_selenium.yaml @@ -8,6 +8,7 @@ env: GALAXY_SKIP_CLIENT_BUILD: '0' GALAXY_TEST_SELENIUM_RETRIES: 1 YARN_INSTALL_OPTS: --frozen-lockfile + GALAXY_TEST_RAISE_EXCEPTION_ON_HISTORYLESS_HDA: '1' jobs: test: name: Test diff --git a/.github/workflows/selenium.yaml b/.github/workflows/selenium.yaml index 4bba7d61b345..b73af41ee76b 100644 --- a/.github/workflows/selenium.yaml +++ b/.github/workflows/selenium.yaml @@ -8,6 +8,7 @@ env: GALAXY_TEST_SKIP_FLAKEY_TESTS_ON_ERROR: 'true' GALAXY_TEST_SELENIUM_RETRIES: 1 YARN_INSTALL_OPTS: --frozen-lockfile + GALAXY_TEST_RAISE_EXCEPTION_ON_HISTORYLESS_HDA: '1' jobs: test: name: Test diff --git a/.github/workflows/selenium_beta.yaml b/.github/workflows/selenium_beta.yaml index 52824eaa0afb..6c97089d4fd5 100644 --- a/.github/workflows/selenium_beta.yaml +++ b/.github/workflows/selenium_beta.yaml @@ -6,6 +6,7 @@ env: GALAXY_TEST_SELENIUM_RETRIES: 1 
GALAXY_TEST_SELENIUM_BETA_HISTORY: 1 YARN_INSTALL_OPTS: --frozen-lockfile + GALAXY_TEST_RAISE_EXCEPTION_ON_HISTORYLESS_HDA: '1' jobs: test: name: Test diff --git a/client/src/components/History/providers/HistoryContentProvider/contentPayload.js b/client/src/components/History/providers/HistoryContentProvider/contentPayload.js index 704f013d6545..88bb2b05f60d 100644 --- a/client/src/components/History/providers/HistoryContentProvider/contentPayload.js +++ b/client/src/components/History/providers/HistoryContentProvider/contentPayload.js @@ -150,7 +150,7 @@ export const contentPayload = (cfg = {}) => { filter(([a,b]) => !isNaN(a.maxHid) && !isNaN(b.maxHid)), withLatestFrom(pos$, hid$), map(([[lastResponse, response], pos, hid]) => { - const updatesAtTop = response.maxHid > lastResponse.maxHid; + const updatesAtTop = response.maxHid >= lastResponse.maxHid; const scrollerExactlyAtTop = pos.cursor === 0 || pos.key === lastResponse.maxHid; const fudge = 2; diff --git a/lib/galaxy/datatypes/test/1.yaml b/lib/galaxy/datatypes/test/1.yaml new file mode 100644 index 000000000000..d32738e86f9e --- /dev/null +++ b/lib/galaxy/datatypes/test/1.yaml @@ -0,0 +1,7 @@ +test: + - test1 + - test2 +testing: testing 1 +test2: + - Lorem ipsum dolor sit amet, consectetur adipiscing elit. Phasellus ac justo metus. Praesent eget mollis enim, sed facilisis felis. Pellentesque vulputate varius mi, vel eleifend mauris luctus vestibulum. Suspendisse elementum nisi lorem, in consequat nisl commodo nec. Curabitur faucibus nec diam id interdum. Sed lobortis sed libero id feugiat. Duis diam odio, elementum sed suscipit ac, venenatis a eros. Phasellus at sollicitudin ligula. Suspendisse congue, tortor vitae aliquet ornare, diam mi scelerisque eros, vel tempus elit odio ac ligula. Pellentesque ultricies diam ornare tempor sagittis. Donec vel elit ac elit placerat dictum. Pellentesque et diam at sem porttitor blandit. + - Lorem ipsum dolor sit amet, consectetur adipiscing elit. Phasellus ac justo metus. Praesent eget mollis enim, sed facilisis felis. Pellentesque vulputate varius mi, vel eleifend mauris luctus vestibulum. Suspendisse elementum nisi lorem, in consequat nisl commodo nec. Curabitur faucibus nec diam id interdum. Sed lobortis sed libero id feugiat. Duis diam odio, elementum sed suscipit ac, venenatis a eros. Phasellus at sollicitudin ligula. Suspendisse congue, tortor vitae aliquet ornare, diam mi scelerisque eros, vel tempus elit odio ac ligula. Pellentesque ultricies diam ornare tempor sagittis. Donec vel elit ac elit placerat dictum. Pellentesque et diam at sem porttitor blandit. 
diff --git a/lib/galaxy/job_execution/output_collect.py b/lib/galaxy/job_execution/output_collect.py index 0e67ca67ca04..dc85351cda3c 100644 --- a/lib/galaxy/job_execution/output_collect.py +++ b/lib/galaxy/job_execution/output_collect.py @@ -17,7 +17,6 @@ ModelPersistenceContext, persist_elements_to_folder, persist_elements_to_hdca, - persist_extra_files, persist_hdas, RegexCollectedDatasetMatch, SessionlessModelPersistenceContext, @@ -207,6 +206,13 @@ def __init__( self.object_store = object_store self.final_job_state = final_job_state self.flush_per_n_datasets = flush_per_n_datasets + self._tag_handler = None + + @property + def tag_handler(self): + if self._tag_handler is None: + self._tag_handler = self.app.tag_handler.create_tag_handler_session() + return self._tag_handler @property def work_context(self): @@ -221,10 +227,6 @@ def user(self): user = None return user - @property - def tag_handler(self): - return self.app.tag_handler - def persist_object(self, obj): self.sa_session.add(obj) @@ -284,7 +286,8 @@ def add_library_dataset_to_folder(self, library_folder, ld): def add_datasets_to_history(self, datasets, for_output_dataset=None): sa_session = self.sa_session - self.job.history.add_datasets(sa_session, datasets) + self.job.history.stage_addition(datasets) + pending_histories = {self.job.history} if for_output_dataset is not None: # Need to update all associated output hdas, i.e. history was # shared with job running @@ -293,9 +296,11 @@ def add_datasets_to_history(self, datasets, for_output_dataset=None): continue for dataset in datasets: new_data = dataset.copy() - copied_dataset.history.add_dataset(new_data) + copied_dataset.history.stage_addition(new_data) + pending_histories.add(copied_dataset.history) sa_session.add(new_data) - sa_session.flush() + for history in pending_histories: + history.add_pending_items() def output_collection_def(self, name): tool = self.tool @@ -388,6 +393,7 @@ def collect_primary_datasets(job_context, output, input_ext): primary_output_assigned = False new_outdata_name = None primary_datasets = {} + storage_callbacks = [] for output_index, (name, outdata) in enumerate(output.items()): dataset_collectors = [DEFAULT_DATASET_COLLECTOR] output_def = job_context.output_def(name) @@ -429,7 +435,11 @@ def collect_primary_datasets(job_context, output, input_ext): # TODO: should be able to disambiguate files in different directories... 
new_primary_filename = os.path.split(filename)[-1] new_primary_datasets_attributes = job_context.tool_provided_metadata.get_new_dataset_meta_by_basename(name, new_primary_filename) - + extra_files = None + if new_primary_datasets_attributes: + extra_files_path = new_primary_datasets_attributes.get('extra_files', None) + if extra_files_path: + extra_files = os.path.join(job_working_directory, extra_files_path) primary_data = job_context.create_dataset( ext, designation, @@ -437,19 +447,15 @@ def collect_primary_datasets(job_context, output, input_ext): dbkey, new_primary_name, filename, + extra_files=extra_files, info=info, init_from=outdata, dataset_attributes=new_primary_datasets_attributes, - creating_job_id=job_context.get_job_id() if job_context else None + creating_job_id=job_context.get_job_id() if job_context else None, + storage_callbacks=storage_callbacks ) # Associate new dataset with job job_context.add_output_dataset_association(f'__new_primary_file_{name}|{designation}__', primary_data) - - if new_primary_datasets_attributes: - extra_files_path = new_primary_datasets_attributes.get('extra_files', None) - if extra_files_path: - extra_files_path_joined = os.path.join(job_working_directory, extra_files_path) - persist_extra_files(job_context.object_store, extra_files_path_joined, primary_data) job_context.add_datasets_to_history([primary_data], for_output_dataset=outdata) # Add dataset to return dict primary_datasets[name][designation] = primary_data @@ -462,7 +468,9 @@ def collect_primary_datasets(job_context, output, input_ext): if sa_session: sa_session.add(outdata) - job_context.flush() + # Move discovered outputs to storage and set metadata / peeks + for callback in storage_callbacks: + callback() return primary_datasets diff --git a/lib/galaxy/jobs/actions/post.py b/lib/galaxy/jobs/actions/post.py index be0bd5a70cc7..1f9deb2620a0 100644 --- a/lib/galaxy/jobs/actions/post.py +++ b/lib/galaxy/jobs/actions/post.py @@ -209,7 +209,7 @@ def _gen_new_name(self, action, input_names, replacement_dict): @classmethod def execute(cls, app, sa_session, action, job, replacement_dict, final_job_state=None): input_names = {} - # Lookp through inputs find one with "to_be_replaced" input + # Loop through inputs to find one with "to_be_replaced" input # variable name, and get the replacement name for input_assoc in job.input_datasets: if input_assoc.dataset: @@ -399,31 +399,31 @@ class TagDatasetAction(DefaultJobAction): @classmethod def execute_on_mapped_over(cls, trans, sa_session, action, step_inputs, step_outputs, replacement_dict, final_job_state=None): + tag_handler = trans.app.tag_handler.create_tag_handler_session() if action.action_arguments: tags = [t.replace('#', 'name:') if t.startswith('#') else t for t in [t.strip() for t in action.action_arguments.get('tags', '').split(',') if t.strip()]] if tags: for name, step_output in step_outputs.items(): if action.output_name == '' or name == action.output_name: - cls._execute(trans.app, trans.user, step_output, tags) + cls._execute(tag_handler, trans.user, step_output, tags) @classmethod def execute(cls, app, sa_session, action, job, replacement_dict, final_job_state=None): if action.action_arguments: + tag_handler = app.tag_handler.create_tag_handler_session() tags = [t.replace('#', 'name:') if t.startswith('#') else t for t in [t.strip() for t in action.action_arguments.get('tags', '').split(',') if t.strip()]] if tags: for dataset_assoc in job.output_datasets: if action.output_name == '' or dataset_assoc.name == action.output_name: -
cls._execute(app, job.user, dataset_assoc.dataset, tags) + cls._execute(tag_handler, job.user, dataset_assoc.dataset, tags) for dataset_collection_assoc in job.output_dataset_collection_instances: if action.output_name == '' or dataset_collection_assoc.name == action.output_name: - cls._execute(app, job.user, dataset_collection_assoc.dataset_collection_instance, tags) - - sa_session.flush() + cls._execute(tag_handler, job.user, dataset_collection_assoc.dataset_collection_instance, tags) @classmethod - def _execute(cls, app, user, output, tags): - app.tag_handler.add_tags_from_list(user, output, tags) + def _execute(cls, tag_handler, user, output, tags): + tag_handler.add_tags_from_list(user, output, tags, flush=False) @classmethod def get_short_str(cls, pja): @@ -443,8 +443,8 @@ class RemoveTagDatasetAction(TagDatasetAction): direction = "from" @classmethod - def _execute(cls, app, user, output, tags): - app.tag_handler.remove_tags_from_list(user, output, tags) + def _execute(cls, tag_handler, user, output, tags): + tag_handler.remove_tags_from_list(user, output, tags) class ActionBox: diff --git a/lib/galaxy/managers/collections.py b/lib/galaxy/managers/collections.py index 93ff9b3246bd..48949e509524 100644 --- a/lib/galaxy/managers/collections.py +++ b/lib/galaxy/managers/collections.py @@ -57,7 +57,7 @@ def __init__( self.hda_manager = hda_manager self.history_manager = history_manager - self.tag_handler = tag_handler + self.tag_handler = tag_handler.create_tag_handler_session() self.ldda_manager = ldda_manager def precreate_dataset_collection_instance(self, trans, parent, name, structure, implicit_inputs=None, implicit_output_name=None, tags=None, completed_collection=None): @@ -182,7 +182,7 @@ def _create_instance_for_collection(self, trans, parent, name, dataset_collectio # values. if isinstance(tags, list): assert implicit_inputs is None, implicit_inputs - tags = self.tag_handler.add_tags_from_list(trans.user, dataset_collection_instance, tags) + tags = self.tag_handler.add_tags_from_list(trans.user, dataset_collection_instance, tags, flush=False) else: tags = self._append_tags(dataset_collection_instance, implicit_inputs, tags) return self.__persist(dataset_collection_instance, flush=flush) @@ -208,6 +208,8 @@ def create_dataset_collection(self, trans, collection_type, element_identifiers= hide_source_items=hide_source_items, copy_elements=copy_elements, history=history) + if history: + history.add_pending_items() else: if has_subcollections: # Nested collection - recursively create collections as needed. @@ -339,7 +341,7 @@ def copy(self, trans, parent, source, encoded_source_id, copy_elements=False, da copy_kwds["element_destination"] = parent # e.g. 
a history if dataset_instance_attributes is not None: copy_kwds["dataset_instance_attributes"] = dataset_instance_attributes - new_hdca = source_hdca.copy(**copy_kwds) + new_hdca = source_hdca.copy(flush=False, **copy_kwds) new_hdca.copy_tags_from(target_user=trans.get_user(), source=source_hdca) if not copy_elements: parent.add_dataset_collection(new_hdca) @@ -493,16 +495,16 @@ def __load_element(self, trans, element_identifier, hide_source_items, copy_elem decoded_id = int(trans.app.security.decode_id(encoded_id)) hda = self.hda_manager.get_accessible(decoded_id, trans.user) if copy_elements: - element = self.hda_manager.copy(hda, history=history or trans.history, hide_copy=True) + element = self.hda_manager.copy(hda, history=history or trans.history, hide_copy=True, flush=False) else: element = hda if hide_source_items and self.hda_manager.get_owned(hda.id, user=trans.user, current_history=history or trans.history): hda.visible = False - self.tag_handler.apply_item_tags(user=trans.user, item=element, tags_str=tag_str) + self.tag_handler.apply_item_tags(user=trans.user, item=element, tags_str=tag_str, flush=False) elif src_type == 'ldda': element = self.ldda_manager.get(trans, encoded_id, check_accessible=True) element = element.to_history_dataset_association(history or trans.history, add_to_history=True, visible=not hide_source_items) - self.tag_handler.apply_item_tags(user=trans.user, item=element, tags_str=tag_str) + self.tag_handler.apply_item_tags(user=trans.user, item=element, tags_str=tag_str, flush=False) elif src_type == 'hdca': # TODO: Option to copy? Force copy? Copy or allow if not owned? element = self.__get_history_collection_instance(trans, encoded_id).collection diff --git a/lib/galaxy/managers/hdas.py b/lib/galaxy/managers/hdas.py index 6662c154300e..dbcf53d748ca 100644 --- a/lib/galaxy/managers/hdas.py +++ b/lib/galaxy/managers/hdas.py @@ -8,6 +8,8 @@ import logging import os +from sqlalchemy.orm.session import object_session + from galaxy import ( datatypes, exceptions, @@ -110,26 +112,24 @@ def create(self, history=None, dataset=None, flush=True, **kwargs): self.session().flush() return hda - def copy(self, hda, history=None, hide_copy=False, **kwargs): + def copy(self, hda, history=None, hide_copy=False, flush=True, **kwargs): """ Copy hda, including annotation and tags, add to history and return the given HDA. """ - copy = hda.copy(parent_id=kwargs.get('parent_id'), copy_hid=False) + copy = hda.copy(parent_id=kwargs.get('parent_id'), copy_hid=False, copy_tags=hda.tags, flush=flush) if hide_copy: copy.visible = False - # add_dataset will update the hid to the next avail. 
in history if history: - history.add_dataset(copy) + history.stage_addition(copy) - copy.copied_from_history_dataset_association = hda copy.set_size() original_annotation = self.annotation(hda) - self.annotate(copy, original_annotation, user=hda.history.user) - - # these use a session flush - original_tags = self.get_tags(hda) - self.set_tags(copy, original_tags, user=hda.history.user) + self.annotate(copy, original_annotation, user=hda.history.user, flush=False) + if flush: + if history: + history.add_pending_items() + object_session(copy).flush() return copy diff --git a/lib/galaxy/model/__init__.py b/lib/galaxy/model/__init__.py index 35e9ac1ba144..a3a02d607301 100644 --- a/lib/galaxy/model/__init__.py +++ b/lib/galaxy/model/__init__.py @@ -2420,10 +2420,9 @@ def init_on_load(self): def stage_addition(self, items): history_id = self.id for item in listify(items): + item.history = self if history_id: item.history_id = history_id - else: - item.history = self self._pending_additions.append(item) @property @@ -2568,10 +2567,9 @@ def copy(self, name=None, target_user=None, activatable=False, all_datasets=Fals else: hdcas = self.active_dataset_collections for hdca in hdcas: - new_hdca = hdca.copy() + new_hdca = hdca.copy(flush=False) new_history.add_dataset_collection(new_hdca, set_hid=False) db_session.add(new_hdca) - db_session.flush() if target_user: new_hdca.copy_item_annotation(db_session, self.user, hdca, target_user, new_hdca) @@ -5666,7 +5664,7 @@ def find_implicit_input_collection(self, name): break return matching_collection - def copy(self, element_destination=None, dataset_instance_attributes=None): + def copy(self, element_destination=None, dataset_instance_attributes=None, flush=True): """ Create a copy of this history dataset collection association. Copy underlying collection. @@ -5696,7 +5694,7 @@ def copy(self, element_destination=None, dataset_instance_attributes=None): if element_destination: element_destination.stage_addition(hdca) element_destination.add_pending_items() - else: + if flush: object_session(self).flush() return hdca diff --git a/lib/galaxy/model/base.py b/lib/galaxy/model/base.py index cdf95d4a7f4d..4a4e3346b5df 100644 --- a/lib/galaxy/model/base.py +++ b/lib/galaxy/model/base.py @@ -2,6 +2,7 @@ Shared model and mapping code between Galaxy and Tool Shed, trying to generalize to generic database connections. """ +import os import threading from contextvars import ContextVar from inspect import ( @@ -108,6 +109,24 @@ def versioned_objects(iter): yield obj +def versioned_objects_strict(iter): + for obj in iter: + if hasattr(obj, '__create_version__'): + if obj.extension != "len": + # TODO: Custom builds (with .len extension) do not get a history or a HID. + # These should get some other type of permanent storage, perhaps UserDatasetAssociation ? 
+ # Everything else needs to have a hid and a history + if not obj.history and not obj.history_id: + raise Exception(f'HistoryDatasetAssociation {obj} without history detected, this is not valid') + elif not obj.hid: + raise Exception(f'HistoryDatasetAssociation {obj} has no hid, this is not valid') + yield obj + + +if os.environ.get("GALAXY_TEST_RAISE_EXCEPTION_ON_HISTORYLESS_HDA"): + versioned_objects = versioned_objects_strict # noqa: F811 + + def versioned_session(session): @event.listens_for(session, 'before_flush') def before_flush(session, flush_context, instances): diff --git a/lib/galaxy/model/mapping.py b/lib/galaxy/model/mapping.py index d4b32513a992..de6414c4ab40 100644 --- a/lib/galaxy/model/mapping.py +++ b/lib/galaxy/model/mapping.py @@ -45,19 +45,13 @@ def db_next_hid(self, n=1): """ session = object_session(self) table = self.table - trans = session.begin() - try: - if "postgres" not in session.bind.dialect.name: - next_hid = select([table.c.hid_counter], table.c.id == model.cached_id(self)).with_for_update().scalar() - table.update(table.c.id == self.id).execute(hid_counter=(next_hid + n)) - else: - stmt = table.update().where(table.c.id == model.cached_id(self)).values(hid_counter=(table.c.hid_counter + n)).returning(table.c.hid_counter) - next_hid = session.execute(stmt).scalar() - n - trans.commit() - return next_hid - except Exception: - trans.rollback() - raise + if "postgres" not in session.bind.dialect.name: + next_hid = select([table.c.hid_counter], table.c.id == model.cached_id(self)).with_for_update().scalar() + table.update(table.c.id == self.id).execute(hid_counter=(next_hid + n)) + else: + stmt = table.update().where(table.c.id == model.cached_id(self)).values(hid_counter=(table.c.hid_counter + n)).returning(table.c.hid_counter) + next_hid = session.execute(stmt).scalar() - n + return next_hid model.History._next_hid = db_next_hid # type: ignore diff --git a/lib/galaxy/model/store/__init__.py b/lib/galaxy/model/store/__init__.py index 495f202cca0f..e096286d8d28 100644 --- a/lib/galaxy/model/store/__init__.py +++ b/lib/galaxy/model/store/__init__.py @@ -275,7 +275,6 @@ def remap_objects(p, k, obj): setattr(hda, attribute, value) handle_dataset_object_edit(hda) - self._flush() else: metadata = dataset_attrs['metadata'] @@ -324,7 +323,6 @@ def remap_objects(p, k, obj): dataset_instance.dataset.uuid = dataset_attrs["dataset_uuid"] self._session_add(dataset_instance) - self._flush() if model_class == "HistoryDatasetAssociation": hda = cast(model.HistoryDatasetAssociation, dataset_instance) @@ -336,7 +334,6 @@ def remap_objects(p, k, obj): else: object_import_tracker.requires_hid.append(hda) - self._flush() file_source_root = self.file_source_root # If dataset is in the dictionary - we will assert this dataset is tied to the Galaxy instance @@ -447,7 +444,6 @@ def import_folder(folder_attrs): library_folder.deleted = deleted self._session_add(library_folder) - self._flush() for sub_folder_attrs in folder_attrs.get("folders", []): sub_folder = import_folder(sub_folder_attrs) @@ -465,8 +461,7 @@ def import_folder(folder_attrs): ld.library_dataset_dataset_association = ldda self._session_add(ld) - self._flush() - + self.sa_session.flush() return library_folder if 'root_folder' in library_attrs: @@ -682,7 +677,6 @@ def _import_jobs(self, object_import_tracker, history): assert not self.sessionless job = self.sa_session.query(model.Job).get(job_attrs["id"]) self._connect_job_io(job, job_attrs, _find_hda, _find_hdca, _find_dce) - self._flush() continue imported_job
= model.Job() @@ -716,7 +710,6 @@ def _import_jobs(self, object_import_tracker, history): except Exception: pass self._session_add(imported_job) - self._flush() # Connect jobs to input and output datasets. params = self._normalize_job_parameters(imported_job, job_attrs, _find_hda, _find_hdca, _find_dce) @@ -725,7 +718,6 @@ def _import_jobs(self, object_import_tracker, history): imported_job.add_parameter(name, dumps(value)) self._connect_job_io(imported_job, job_attrs, _find_hda, _find_hdca, _find_dce) - self._flush() if object_key in job_attrs: object_import_tracker.jobs_by_key[job_attrs[object_key]] = imported_job @@ -747,7 +739,6 @@ def _import_implicit_collection_jobs(self, object_import_tracker): self._session_add(icja) self._session_add(icj) - self._flush() def _session_add(self, obj): self.sa_session.add(obj) diff --git a/lib/galaxy/model/store/discover.py b/lib/galaxy/model/store/discover.py index 9f1ec661030e..c6b945385969 100644 --- a/lib/galaxy/model/store/discover.py +++ b/lib/galaxy/model/store/discover.py @@ -59,6 +59,7 @@ def create_dataset( created_from_basename=None, final_job_state='ok', creating_job_id=None, + storage_callbacks=None, ): tag_list = tag_list or [] sources = sources or [] @@ -129,31 +130,8 @@ def create_dataset( if created_from_basename is not None: primary_data.created_from_basename = created_from_basename - has_flushed = False if tag_list: - # If we have a tag we need a primary id, so need to flush here - # TODO: eliminate creating tag associations within create dataset - # We can do this incrementally by not passing in a tag list. - self.flush() - has_flushed = True - self.tag_handler.add_tags_from_list(self.job.user, primary_data, tag_list) - - # Move data from temp location to dataset location - if filename: - # TODO: eliminate this, should happen outside of create_dataset so that we don't need to flush - if not has_flushed: - self.flush() - has_flushed = True - if not link_data: - self.object_store.update_from_file(primary_data.dataset, file_name=filename, create=True) - else: - primary_data.link_to(filename) - if extra_files: - persist_extra_files(self.object_store, extra_files, primary_data) - primary_data.set_size() - else: - # We are sure there are no extra files, so optimize things that follow by settting total size also. - primary_data.set_size(no_extra_files=True) + self.tag_handler.add_tags_from_list(self.job.user, primary_data, tag_list, flush=False) # If match specified a name use otherwise generate one from # designation. 
@@ -174,12 +152,28 @@ def create_dataset( if info is not None: primary_data.info = info - if filename: - self.set_datasets_metadata(datasets=[primary_data], datasets_attributes=[dataset_attributes]) - + if storage_callbacks is None: + self.finalize_storage(primary_data=primary_data, dataset_attributes=dataset_attributes, extra_files=extra_files, filename=filename, link_data=link_data) + else: + storage_callbacks.append(lambda: self.finalize_storage(primary_data=primary_data, dataset_attributes=dataset_attributes, extra_files=extra_files, filename=filename, link_data=link_data)) return primary_data + def finalize_storage(self, primary_data, dataset_attributes, extra_files, filename, link_data): + # Move data from temp location to dataset location + if not link_data: + self.object_store.update_from_file(primary_data.dataset, file_name=filename, create=True) + else: + primary_data.link_to(filename) + if extra_files: + persist_extra_files(self.object_store, extra_files, primary_data) + primary_data.set_size() + else: + # We are sure there are no extra files, so optimize things that follow by setting total size also. + primary_data.set_size(no_extra_files=True) + # TODO: this might run set_meta after copying the file to the object store, which could be inefficient if job working directory is closer to the node. + self.set_datasets_metadata(datasets=[primary_data], datasets_attributes=[dataset_attributes]) + @staticmethod def set_datasets_metadata(datasets, datasets_attributes=None): datasets_attributes = datasets_attributes or [{} for _ in datasets] @@ -297,9 +291,9 @@ def _populate_elements(self, chunk, name, root_collection_builder, metadata_sour association_name = f'__new_primary_file_{name}|{element_identifier_str}__' self.add_output_dataset_association(association_name, dataset) - self.update_object_store_with_datasets(datasets=element_datasets['datasets'], paths=element_datasets['paths'], extra_files=element_datasets['extra_files']) add_datasets_timer = ExecutionTimer() self.add_datasets_to_history(element_datasets['datasets']) + self.update_object_store_with_datasets(datasets=element_datasets['datasets'], paths=element_datasets['paths'], extra_files=element_datasets['extra_files']) log.debug( "(%s) Add dynamic collection datasets to history for output [%s] %s", self.job_id(), @@ -314,9 +308,8 @@ def add_tags_to_datasets(self, datasets, tag_lists): # that's not better or worse than what we previously did in create_datasets # TDOD: implement that or figure out why it is not implemented and find a better solution. # Could it be that SessionlessModelPersistenceContext doesn't support tags?
- tag_session = self.tag_handler.create_tag_handler_session() for dataset, tags in zip(datasets, tag_lists): - tag_session.add_tags_from_list(self.job.user, dataset, tags, flush=False) + self.tag_handler.add_tags_from_list(self.job.user, dataset, tags, flush=False) def update_object_store_with_datasets(self, datasets, paths, extra_files): for dataset, path, extra_file in zip(datasets, paths, extra_files): @@ -603,6 +596,7 @@ def persist_elements_to_folder(model_persistence_context, elements, library_fold def persist_hdas(elements, model_persistence_context, final_job_state='ok'): # discover files as individual datasets for the target history datasets = [] + storage_callbacks = [] def collect_elements_for_history(elements): for element in elements: @@ -648,12 +642,15 @@ def collect_elements_for_history(elements): hashes=hashes, created_from_basename=created_from_basename, final_job_state=state, + storage_callbacks=storage_callbacks, ) if not hda_id: datasets.append(dataset) collect_elements_for_history(elements) model_persistence_context.add_datasets_to_history(datasets) + for callback in storage_callbacks: + callback() def add_datasets_to_history(self, datasets, for_output_dataset=None): if for_output_dataset is not None: diff --git a/lib/galaxy/tool_util/linters/inputs.py b/lib/galaxy/tool_util/linters/inputs.py index 437efe5e76f9..ea887af358ba 100644 --- a/lib/galaxy/tool_util/linters/inputs.py +++ b/lib/galaxy/tool_util/linters/inputs.py @@ -19,16 +19,16 @@ ATTRIB_VALIDATOR_COMPATIBILITY = { "check": ["metadata"], - "expression": ["regex", "substitute_value_in_message"], + "expression": ["substitute_value_in_message"], "table_name": ["dataset_metadata_in_data_table", "dataset_metadata_not_in_data_table", "value_in_data_table", "value_not_in_data_table"], "filename": ["dataset_metadata_in_file"], "metadata_name": ["dataset_metadata_in_data_table", "dataset_metadata_not_in_data_table", "dataset_metadata_in_file"], - "metadata_column": ["dataset_metadata_in_data_table", "dataset_metadata_not_in_data_table", "value_in_data_table", "value_not_in_data_table", "dataset_metadata_in_file options"], + "metadata_column": ["dataset_metadata_in_data_table", "dataset_metadata_not_in_data_table", "value_in_data_table", "value_not_in_data_table", "dataset_metadata_in_file"], "line_startswith": ["dataset_metadata_in_file"], - "min": ["in_range", "length"], - "max": ["in_range", "length"], - "exclude_min": ["in_range"], - "exclude_max": ["in_range"], + "min": ["in_range", "length", "dataset_metadata_in_range"], + "max": ["in_range", "length", "dataset_metadata_in_range"], + "exclude_min": ["in_range", "dataset_metadata_in_range"], + "exclude_max": ["in_range", "dataset_metadata_in_range"], "split": ["dataset_metadata_in_file"], "skip": ["metadata"] } @@ -132,7 +132,7 @@ def lint_inputs(tool_xml, lint_ctx): # lint statically defined options if any(['value' not in option.attrib for option in select_options]): lint_ctx.error(f"Select parameter [{param_name}] has option without value") - if len(set([option.text.strip() for option in select_options])) != len(select_options): + if len(set([option.text.strip() for option in select_options if option.text is not None])) != len(select_options): lint_ctx.error(f"Select parameter [{param_name}] has multiple options with the same text content") if len(set([option.attrib.get("value") for option in select_options])) != len(select_options): lint_ctx.error(f"Select parameter [{param_name}] has multiple options with the same value") @@ -161,14 +161,12 @@ def 
lint_inputs(tool_xml, lint_ctx): lint_ctx.error(f"Parameter [{param_name}]: attribute '{attrib}' is incompatible with validator of type '{vtype}'") if vtype == "expression" and validator.text is None: lint_ctx.error(f"Parameter [{param_name}]: expression validator without content") - if vtype != "expression" and validator.text is not None: + if vtype not in ["expression", "regex"] and validator.text is not None: lint_ctx.warn(f"Parameter [{param_name}]: '{vtype}' validators are not expected to contain text (found '{validator.text}')") - if vtype == "regex" and "expression" not in validator.attrib: - lint_ctx.error(f"Parameter [{param_name}]: '{vtype}' validators need to define an 'expression' attribute") - if vtype in ["in_range", "length", "dataset_metadata_in_range"] and ("min" not in validator.attrib or "max" not in validator.attrib): + if vtype in ["in_range", "length", "dataset_metadata_in_range"] and ("min" not in validator.attrib and "max" not in validator.attrib): lint_ctx.error(f"Parameter [{param_name}]: '{vtype}' validators need to define the 'min' or 'max' attribute(s)") - if vtype in ["metadata"] and ("check" not in validator.attrib or "skip" not in validator.attrib): - lint_ctx.error(f"Parameter [{param_name}]: '{vtype}' validators need to define the 'check' or 'skip' attribute(s)") + if vtype in ["metadata"] and ("check" not in validator.attrib and "skip" not in validator.attrib): + lint_ctx.error(f"Parameter [{param_name}]: '{vtype}' validators need to define the 'check' or 'skip' attribute(s) {validator.attrib}") if vtype in ["value_in_data_table", "value_not_in_data_table", "dataset_metadata_in_data_table", "dataset_metadata_not_in_data_table"] and "table_name" not in validator.attrib: lint_ctx.error(f"Parameter [{param_name}]: '{vtype}' validators need to define the 'table_name' attribute") diff --git a/lib/galaxy/tools/actions/__init__.py b/lib/galaxy/tools/actions/__init__.py index ac810fd0718a..8882daae2836 100644 --- a/lib/galaxy/tools/actions/__init__.py +++ b/lib/galaxy/tools/actions/__init__.py @@ -84,8 +84,6 @@ def process_dataset(data, formats=None): if formats is None: formats = input.formats - # Need to refresh in case this conversion just took place, i.e. input above in tool performed the same conversion - trans.sa_session.refresh(data) direct_match, target_ext, converted_dataset = data.find_conversion_destination(formats) if not direct_match and target_ext: if converted_dataset: @@ -539,6 +537,7 @@ def handle_output(name, output, hidden=None): for name, data in out_data.items(): if name not in child_dataset_names and name not in incoming: # don't add children; or already existing datasets, i.e. async created history.stage_addition(data) + history.add_pending_items(set_output_hid=set_output_hid) # Add all the children to their parents for parent_name, child_name in parent_to_child_pairs: @@ -560,13 +559,16 @@ def handle_output(name, output, hidden=None): if completed_job: job.set_copied_from_job_id(completed_job.id) trans.sa_session.add(job) - # Now that we have a job id, we can remap any outputs if this is a rerun and the user chose to continue dependent jobs + # Remap any outputs if this is a rerun and the user chose to continue dependent jobs # This functionality requires tracking jobs in the database. if app.config.track_jobs_in_database and rerun_remap_job_id is not None: - # We need a flush here and get hids in order to rewrite jobs parameter, - # but remapping jobs should only affect single jobs anyway, so this is not too costly. 
- history.add_pending_items(set_output_hid=set_output_hid) - trans.sa_session.flush() + # Need to flush here so that referencing outputs by id works + session = trans.sa_session() + try: + session.expire_on_commit = False + session.flush() + finally: + session.expire_on_commit = True self._remap_job_on_rerun(trans=trans, galaxy_session=galaxy_session, rerun_remap_job_id=rerun_remap_job_id, @@ -597,7 +599,6 @@ def handle_output(name, output, hidden=None): else: if flush_job: # Set HID and add to history. - history.add_pending_items(set_output_hid=set_output_hid) job_flush_timer = ExecutionTimer() trans.sa_session.flush() log.info(f"Flushed transaction for job {job.log_str()} {job_flush_timer}") @@ -820,6 +821,7 @@ class OutputCollections: def __init__(self, trans, history, tool, tool_action, input_collections, dataset_collection_elements, on_text, incoming, params, job_params, tags, hdca_tags): self.trans = trans + self.tag_handler = trans.app.tag_handler.create_tag_handler_session() self.history = history self.tool = tool self.tool_action = tool_action diff --git a/lib/galaxy/tools/actions/upload_common.py b/lib/galaxy/tools/actions/upload_common.py index d4fb3a2fc5fd..834cfec7f792 100644 --- a/lib/galaxy/tools/actions/upload_common.py +++ b/lib/galaxy/tools/actions/upload_common.py @@ -232,12 +232,12 @@ def __new_library_upload(trans, cntrller, uploaded_dataset, library_bunch, tag_h sa_session=trans.sa_session) if uploaded_dataset.get('tag_using_filenames', False): tag_from_filename = os.path.splitext(os.path.basename(uploaded_dataset.name))[0] - tag_handler.apply_item_tag(item=ldda, user=trans.user, name='name', value=tag_from_filename) + tag_handler.apply_item_tag(item=ldda, user=trans.user, name='name', value=tag_from_filename, flush=False) tags_list = uploaded_dataset.get('tags', False) if tags_list: for tag in tags_list: - tag_handler.apply_item_tag(item=ldda, user=trans.user, name='name', value=tag) + tag_handler.apply_item_tag(item=ldda, user=trans.user, name='name', value=tag, flush=False) trans.sa_session.add(ldda) if state: @@ -294,12 +294,12 @@ def new_upload(trans, cntrller, uploaded_dataset, library_bunch=None, history=No if library_bunch.tags and not uploaded_dataset.tags: new_tags = tag_handler.parse_tags_list(library_bunch.tags) for tag in new_tags: - tag_handler.apply_item_tag(user=trans.user, item=upload_target_dataset_instance, name=tag[0], value=tag[1]) + tag_handler.apply_item_tag(user=trans.user, item=upload_target_dataset_instance, name=tag[0], value=tag[1], flush=False) else: upload_target_dataset_instance = __new_history_upload(trans, uploaded_dataset, history=history, state=state) if tag_list: - tag_handler.add_tags_from_list(trans.user, upload_target_dataset_instance, tag_list) + tag_handler.add_tags_from_list(trans.user, upload_target_dataset_instance, tag_list, flush=False) return upload_target_dataset_instance diff --git a/lib/galaxy/tools/execute.py b/lib/galaxy/tools/execute.py index 750f6818243c..d45419b7535a 100644 --- a/lib/galaxy/tools/execute.py +++ b/lib/galaxy/tools/execute.py @@ -111,18 +111,16 @@ def execute_single_job(execution_slice, completed_job): history = execution_slice.history or history jobs_executed += 1 - if execution_slice: - # a side effect of adding datasets to a history is a commit within db_next_hid (even with flush=False). 
- history.add_pending_items() - else: - # Make sure collections, implicit jobs etc are flushed even if there are no precreated output datasets - trans.sa_session.flush() - if job_datasets: for job, datasets in job_datasets.items(): for dataset_instance in datasets: dataset_instance.dataset.job = job + if execution_slice: + history.add_pending_items() + # Make sure collections, implicit jobs etc are flushed even if there are no precreated output datasets + trans.sa_session.flush() + tool_id = tool.id for job in execution_tracker.successful_jobs: # Put the job in the queue if tracking in memory diff --git a/lib/galaxy/webapps/galaxy/api/dataset_collections.py b/lib/galaxy/webapps/galaxy/api/dataset_collections.py index be12e0080759..e9430705a4f5 100644 --- a/lib/galaxy/webapps/galaxy/api/dataset_collections.py +++ b/lib/galaxy/webapps/galaxy/api/dataset_collections.py @@ -42,7 +42,7 @@ def create(self, trans: ProvidesHistoryContext, payload: dict, **kwd): :type payload: dict :param payload: (optional) dictionary structure containing: - * collection_type: dataset colltion type to create. + * collection_type: dataset collection type to create. * instance_type: Instance type - 'history' or 'library'. * name: the new dataset collections's name * datasets: object describing datasets for collection @@ -57,6 +57,7 @@ def create(self, trans: ProvidesHistoryContext, payload: dict, **kwd): history_id = decode_id(self.app, history_id) history = self.history_manager.get_owned(history_id, trans.user, current_history=trans.history) create_params["parent"] = history + create_params["history"] = history elif instance_type == "library": folder_id = payload.get('folder_id') library_folder = self.get_library_folder(trans, folder_id, check_accessible=True) diff --git a/lib/galaxy/workflow/run_request.py b/lib/galaxy/workflow/run_request.py index 055663d370c1..be275d17ca6d 100644 --- a/lib/galaxy/workflow/run_request.py +++ b/lib/galaxy/workflow/run_request.py @@ -326,11 +326,8 @@ def build_workflow_run_configs(trans, workflow, payload): else: raise exceptions.RequestParameterInvalidException(f"Unknown workflow input source '{input_source}' specified.") if add_to_history and content.history != history: - content = content.copy() - if isinstance(content, app.model.HistoryDatasetAssociation): - history.add_dataset(content) - else: - history.add_dataset_collection(content) + content = content.copy(flush=False) + history.stage_addition(content) input_dict['content'] = content except AssertionError: raise exceptions.ItemAccessibilityException(f"Invalid workflow input '{input_id}' specified") @@ -368,7 +365,7 @@ def build_workflow_run_configs(trans, workflow, payload): valid_option = True if not valid_option: raise exceptions.RequestParameterInvalidException(f"Invalid value for parameter '{name}' found.") - + history.add_pending_items() run_configs.append(WorkflowRunConfig( target_history=history, replacement_dict=payload.get('replacement_params', {}), diff --git a/test/unit/app/jobs/test_job_context.py b/test/unit/app/jobs/test_job_context.py index 6bbacc623310..019ab2e87dfe 100644 --- a/test/unit/app/jobs/test_job_context.py +++ b/test/unit/app/jobs/test_job_context.py @@ -55,6 +55,7 @@ def test_job_context_discover_outputs_flushes_once(mocker): job = model.Job() job.history = h sa_session.add(job) + sa_session.flush() job_working_directory = tempfile.mkdtemp() setup_data(job_working_directory) permission_provider = PermissionProvider() diff --git a/test/unit/app/tools/test_history_imp_exp.py 
b/test/unit/app/tools/test_history_imp_exp.py index 6d0bf81ecd42..b35cbeddfce4 100644 --- a/test/unit/app/tools/test_history_imp_exp.py +++ b/test/unit/app/tools/test_history_imp_exp.py @@ -535,6 +535,7 @@ def test_export_copied_objects_copied_outside_history(): other_h = model.History(name=h.name + "-other", user=h.user) sa_session.add(other_h) + sa_session.flush() hc3 = hc2.copy(element_destination=other_h) other_h.add_pending_items() diff --git a/test/unit/data/model/test_model_store.py b/test/unit/data/model/test_model_store.py index 7e67f5c84194..048e54f2abb1 100644 --- a/test/unit/data/model/test_model_store.py +++ b/test/unit/data/model/test_model_store.py @@ -228,10 +228,9 @@ def test_import_export_edit_collection(): sa_session.add(hc1) sa_session.add(h) - sa_session.flush() - import_history = model.History(name="Test History for Import", user=u) sa_session.add(import_history) + sa_session.flush() temp_directory = mkdtemp() with store.DirectoryModelExportStore(temp_directory, app=app, for_edit=True) as export_store: @@ -411,6 +410,7 @@ def _setup_simple_export(export_kwds): import_history = model.History(name="Test History for Import", user=u) sa_session.add(import_history) + sa_session.flush() temp_directory = mkdtemp() with store.DirectoryModelExportStore(temp_directory, app=app, **export_kwds) as export_store: diff --git a/test/unit/tool_util/test_tool_linters.py b/test/unit/tool_util/test_tool_linters.py index 88dbbc19e03a..ee27e371c53c 100644 --- a/test/unit/tool_util/test_tool_linters.py +++ b/test/unit/tool_util/test_tool_linters.py @@ -146,6 +146,51 @@ """ +VALIDATOR_CORRECT = """ + +""" + # check that linter accepts format source for collection elements as means to specify format # and that the linter warns if format and format_source are used OUTPUTS_COLLECTION_FORMAT_SOURCE = """ @@ -246,8 +291,11 @@ and "Parameter [param_name]: validator with an incompatible type 'in_range'" in x.error_messages and "Parameter [param_name]: 'in_range' validators need to define the 'min' or 'max' attribute(s)" in x.error_messages and "Parameter [param_name]: attribute 'filename' is incompatible with validator of type 'regex'" in x.error_messages - and "Parameter [param_name]: 'regex' validators need to define an 'expression' attribute" in x.error_messages - and len(x.warn_messages) == 1 and len(x.error_messages) == 4 + and len(x.warn_messages) == 1 and len(x.error_messages) == 3 + ), + ( + VALIDATOR_CORRECT, inputs.lint_inputs, + lambda x: len(x.warn_messages) == 0 and len(x.error_messages) == 0 ), ( OUTPUTS_COLLECTION_FORMAT_SOURCE, outputs.lint_output, @@ -272,6 +320,7 @@ 'select deprecations', 'select option definitions', 'validator imcompatibilities', + 'validator all correct', 'outputs collection static elements with format_source', 'outputs discover datatsets with tool provided metadata' ]
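
Reviewer note: the recurring change across output_collect.py and discover.py is that dataset discovery no longer persists files and flushes per dataset; instead each create_dataset() call appends a storage callback, and the callbacks run once after all datasets have been staged into the history and flushed in a single batch. The sketch below illustrates that pattern only; the Dataset class and print() stand in for Galaxy's model objects and object store and are purely illustrative, not the patch's actual API.

# Minimal sketch of the deferred-finalization pattern introduced by this patch.
from typing import Callable, List, Optional


class Dataset:
    def __init__(self, name: str, filename: str):
        self.name = name
        self.filename = filename


def create_dataset(name: str, filename: str,
                   storage_callbacks: Optional[List[Callable[[], None]]] = None) -> Dataset:
    dataset = Dataset(name, filename)

    def finalize_storage() -> None:
        # Expensive work: move the file into the object store, set size and metadata.
        print(f"persisting {dataset.filename} for {dataset.name}")

    if storage_callbacks is None:
        finalize_storage()  # old behaviour: persist inline, forcing per-dataset flushes
    else:
        storage_callbacks.append(finalize_storage)  # new behaviour: defer until after one batch flush
    return dataset


callbacks: List[Callable[[], None]] = []
datasets = [create_dataset(f"out{i}", f"/tmp/out{i}.dat", callbacks) for i in range(3)]
# ... add all datasets to the history and flush the session once ...
for callback in callbacks:
    callback()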