Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[23.1] Fix metadata setting in extended metadata + outputs_to_working_directory mode #16678

Merged
merged 10 commits into from
Sep 13, 2023
2 changes: 1 addition & 1 deletion .github/workflows/converter_tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ jobs:
- name: Run tests
run: |
mapfile -t TOOL_ARRAY < tool_list.txt
planemo test --galaxy_python_version ${{ matrix.python-version }} --galaxy_root 'galaxy root' "${TOOL_ARRAY[@]}"
planemo test --biocontainers --galaxy_python_version ${{ matrix.python-version }} --galaxy_root 'galaxy root' "${TOOL_ARRAY[@]}"
- uses: actions/upload-artifact@v3
if: failure()
with:
Expand Down
80 changes: 51 additions & 29 deletions lib/galaxy/jobs/runners/pulsar.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

More information on Pulsar can be found at https://pulsar.readthedocs.io/ .
"""

import copy
import errno
import logging
import os
Expand All @@ -12,6 +12,7 @@
from typing import (
Any,
Dict,
Optional,
)

import pulsar.core
Expand Down Expand Up @@ -54,7 +55,6 @@
specs,
string_as_bool_or_none,
)
from galaxy.util.bunch import Bunch

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -466,7 +466,7 @@ def __prepare_job(self, job_wrapper, job_destination):
command_line = None
client = None
remote_job_config = None
compute_environment = None
compute_environment: Optional[PulsarComputeEnvironment] = None
remote_container = None

fail_or_resubmit = False
Expand All @@ -492,7 +492,13 @@ def __prepare_job(self, job_wrapper, job_destination):
self.__prepare_input_files_locally(job_wrapper)
remote_metadata = PulsarJobRunner.__remote_metadata(client)
dependency_resolution = PulsarJobRunner.__dependency_resolution(client)
metadata_kwds = self.__build_metadata_configuration(client, job_wrapper, remote_metadata, remote_job_config)
metadata_kwds = self.__build_metadata_configuration(
client,
job_wrapper,
remote_metadata,
remote_job_config,
compute_environment=compute_environment,
)
remote_working_directory = remote_job_config["working_directory"]
remote_job_directory = os.path.abspath(os.path.join(remote_working_directory, os.path.pardir))
remote_tool_directory = os.path.abspath(os.path.join(remote_job_directory, "tool_files"))
Expand Down Expand Up @@ -896,45 +902,61 @@ def __use_remote_datatypes_conf(pulsar_client):
def __rewrite_parameters(pulsar_client):
return string_as_bool_or_none(pulsar_client.destination_params.get("rewrite_parameters", False)) or False

def __build_metadata_configuration(self, client, job_wrapper, remote_metadata, remote_job_config):
metadata_kwds = {}
if remote_metadata and not job_wrapper.use_metadata_binary:
remote_system_properties = remote_job_config.get("system_properties", {})
remote_galaxy_home = remote_system_properties.get("galaxy_home", None)
if not remote_galaxy_home:
raise Exception(NO_REMOTE_GALAXY_FOR_METADATA_MESSAGE)
metadata_kwds["exec_dir"] = remote_galaxy_home
outputs_directory = remote_job_config["outputs_directory"]
def __build_metadata_configuration(
self,
client,
job_wrapper,
remote_metadata,
remote_job_config,
compute_environment: Optional["PulsarComputeEnvironment"] = None,
):
metadata_kwds: Dict[str, Any] = {}
if remote_metadata:
working_directory = remote_job_config["working_directory"]
metadata_directory = remote_job_config["metadata_directory"]
# For metadata calculation, we need to build a list of output
# file objects with real path indicating location on Galaxy server
# and false path indicating location on compute server. Since the
# Pulsar disables from_work_dir copying as part of the job command
# line we need to take the list of output locations on the Pulsar
# server (produced by self.get_output_files(job_wrapper)) and for
# server (produced by job_wrapper.job_io.get_output_fnames()) and for
# each work_dir output substitute the effective path on the Pulsar
# server relative to the remote working directory as the
# false_path to send the metadata command generation module.
work_dir_outputs = self.get_work_dir_outputs(job_wrapper, tool_working_directory=working_directory)
outputs = [
Bunch(false_path=os.path.join(outputs_directory, os.path.basename(path)), real_path=path)
for path in self.get_output_files(job_wrapper)
]
for output in outputs:
for pulsar_workdir_path, real_path in work_dir_outputs:
if real_path == output.real_path:
output.false_path = pulsar_workdir_path
metadata_kwds["output_fnames"] = outputs
metadata_kwds["compute_tmp_dir"] = metadata_directory
metadata_kwds["config_root"] = remote_galaxy_home
default_config_file = os.path.join(remote_galaxy_home, "config/galaxy.ini")
metadata_kwds["config_file"] = remote_system_properties.get("galaxy_config_file", default_config_file)
metadata_kwds["dataset_files_path"] = remote_system_properties.get("galaxy_dataset_files_path", None)
outputs = job_wrapper.job_io.get_output_fnames()
# copy fixes 'test/integration/test_pulsar_embedded_remote_metadata.py::test_tools[job_properties]'
real_path_to_output = {o.real_path: copy.copy(o) for o in outputs}
rewritten_outputs = []
for pulsar_workdir_path, real_path in work_dir_outputs:
work_dir_output = real_path_to_output.pop(real_path, None)
if work_dir_output:
work_dir_output.false_path = pulsar_workdir_path
rewritten_outputs.append(work_dir_output)

for output in real_path_to_output.values():
if compute_environment:
output.false_path = compute_environment.path_mapper.remote_output_path_rewrite(str(output))
rewritten_outputs.append(output)

metadata_kwds["output_fnames"] = rewritten_outputs
remote_system_properties = remote_job_config.get("system_properties", {})
remote_galaxy_home = remote_system_properties.get("galaxy_home")
if not job_wrapper.use_metadata_binary:
if not remote_galaxy_home:
raise Exception(NO_REMOTE_GALAXY_FOR_METADATA_MESSAGE)
metadata_kwds["exec_dir"] = remote_galaxy_home
metadata_kwds["compute_tmp_dir"] = metadata_directory
metadata_kwds["config_root"] = remote_galaxy_home
default_config_file = os.path.join(remote_galaxy_home, "config/galaxy.ini")
metadata_kwds["config_file"] = remote_system_properties.get("galaxy_config_file", default_config_file)
metadata_kwds["dataset_files_path"] = remote_system_properties.get("galaxy_dataset_files_path", None)
if PulsarJobRunner.__use_remote_datatypes_conf(client):
remote_datatypes_config = remote_system_properties.get("galaxy_datatypes_config_file", None)
remote_datatypes_config = remote_system_properties.get("galaxy_datatypes_config_file")
if not remote_datatypes_config:
log.warning(NO_REMOTE_DATATYPES_CONFIG)
if not remote_galaxy_home:
raise Exception(NO_REMOTE_GALAXY_FOR_METADATA_MESSAGE)
remote_datatypes_config = os.path.join(remote_galaxy_home, "datatypes_conf.xml")
metadata_kwds["datatypes_config"] = remote_datatypes_config
else:
Expand Down
23 changes: 13 additions & 10 deletions lib/galaxy/metadata/set_metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,15 @@ def reset_external_filename(dataset_instance: DatasetInstance):
dataset_instance.dataset.extra_files_path = None


def push_if_necessary(object_store: ObjectStore, dataset: DatasetInstance, external_filename):
    """Push the dataset's file at ``external_filename`` into the object store, unless it is empty.

    This may be updating a disk-based object store when outputs_to_working_directory
    is used, or refreshing a remote object store from its cache path.
    """
    # Zero-length files can occur when outputs are discovered from the working
    # directory; skip the push for those. The empty-file check is needed for e.g.
    # test/integration/test_extended_metadata_outputs_to_working_directory.py::test_tools[multi_output_assign_primary]
    if not os.path.getsize(external_filename):
        return
    object_store.update_from_file(dataset.dataset, file_name=external_filename, create=True)


def set_validated_state(dataset_instance):
datatype_validation = validate(dataset_instance)

Expand Down Expand Up @@ -399,9 +408,7 @@ def set_meta(new_dataset_instance, file_dict):
if not link_data_only:
# Only set external filename if we're dealing with files in job working directory.
# Fixes link_data_only uploads
if not object_store:
# overriding the external filename would break pushing to object stores
dataset.dataset.external_filename = external_filename
dataset.dataset.external_filename = external_filename
# We derive extra_files_dir_name from external_filename, because OutputsToWorkingDirectoryPathRewriter
# always rewrites the path to include the uuid, even if store_by is set to id, and the extra files
# rewrite is derived from the dataset path (since https://github.com/galaxyproject/galaxy/pull/16541).
Expand Down Expand Up @@ -437,15 +444,11 @@ def set_meta(new_dataset_instance, file_dict):
if not object_store or not export_store:
# Can't happen, but type system doesn't know
raise Exception("object_store not built")
if not is_deferred and not link_data_only and os.path.getsize(external_filename):
# Here we might be updating a disk based objectstore when outputs_to_working_directory is used,
# or a remote object store from its cache path.
if not is_deferred and not link_data_only:
object_store_update_actions.append(
partial(
object_store.update_from_file, dataset.dataset, file_name=external_filename, create=True
)
partial(push_if_necessary, object_store, dataset, external_filename)
)
object_store_update_actions.append(partial(reset_external_filename, dataset))
object_store_update_actions.append(partial(reset_external_filename, dataset))
object_store_update_actions.append(partial(export_store.add_dataset, dataset))
if dataset_instance_id not in unnamed_id_to_path:
object_store_update_actions.append(partial(collect_extra_files, object_store, dataset, "."))
Expand Down
1 change: 1 addition & 0 deletions test/functional/tools/from_work_dir_glob.xml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ echo "hi" > output1.txt
<assert_contents>
<has_text text="hi" />
</assert_contents>
<metadata name="data_lines" value="1" />
<!-- This does not work with the default __copy_if_exists mechanism
<metadata name="created_from_basename" value="output1.txt" />
-->
Expand Down
4 changes: 3 additions & 1 deletion test/functional/tools/job_properties.xml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@
<tests>
<test expect_exit_code="0">
<param name="thebool" value="true" />
<output name="out_file1" file="simple_line.txt" />
<output name="out_file1" file="simple_line.txt" >
<metadata name="data_lines" value="1" />
</output>
<assert_command>
<has_text text="really" />
</assert_command>
Expand Down
1 change: 1 addition & 0 deletions test/integration/test_extended_metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from galaxy_test.driver import integration_util

TEST_TOOL_IDS = [
"from_work_dir_glob",
"job_properties",
"multi_output",
"multi_output_configured",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ class ExtendedMetadataOutputsToWorkingDirIntegrationInstance(ExtendedMetadataInt
def handle_galaxy_config_kwds(cls, config):
config["metadata_strategy"] = "extended"
config["object_store_store_by"] = "uuid"
config["outpus_to_working_dir"] = True
config["outputs_to_working_directory"] = True
config["retry_metadata_internally"] = False


Expand Down