diff --git a/lib/galaxy/jobs/runners/arc.py b/lib/galaxy/jobs/runners/arc.py
index fce05a75cdfa..b21532214e67 100644
--- a/lib/galaxy/jobs/runners/arc.py
+++ b/lib/galaxy/jobs/runners/arc.py
@@ -21,12 +21,11 @@
     ARCHTTPError = None
    NoValueInARCResult = None
 
-from galaxy.util import unicodify
-from galaxy.jobs.command_factory import build_command
 from galaxy.job_execution.compute_environment import (
     ComputeEnvironment,
     dataset_path_to_extra_path,
 )
+from galaxy.util import unicodify
 
 log = logging.getLogger(__name__)
 
@@ -64,21 +63,27 @@ def __init__(self):
         "Job not found": "Failed",
     }
 
+
 class ARCComputeEnvironment(ComputeEnvironment):
+    """NB! This is just blunt copy-paste and simplification from pulsar runner.
+    Many things are not used, and will not work properly.
+    Currently just using input-path rewrite"""
+
     def __init__(self, job_wrapper):
         self.job_wrapper = job_wrapper
-
+
         self.local_path_config = job_wrapper.default_compute_environment()
-
+
         self.path_rewrites_input_extra = {}
         self._working_directory = "."
+        self._config_directory = "."
         self._home_directory = ""
         self._tool_dir = ""
         self._tmp_directory = ""
         self._shared_home_dir = ""
+        self._sep = ""
         self._version_path = ""
-
-
+
     def output_names(self):
         # Maybe this should use the path mapper, but the path mapper just uses basenames
         return self.job_wrapper.job_io.get_output_basenames()
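For context on the class added above: ARC stages every input file into the remote job's working directory, so the path rewrites in the next hunk reduce Galaxy's absolute dataset paths to workdir-relative ones. A minimal, self-contained sketch of that idea (illustrative only; `FakeDataset` is an invented stand-in for a Galaxy dataset object):

```python
from dataclasses import dataclass


@dataclass
class FakeDataset:
    """Invented stand-in for a Galaxy dataset; only get_display_name() matters here."""

    display_name: str

    def get_display_name(self) -> str:
        return self.display_name


def input_path_rewrite(working_directory: str, dataset: FakeDataset) -> str:
    # ARC uploads every input into the remote job's workdir, so a
    # workdir-relative path is always valid on the compute side.
    return f"{working_directory}/{dataset.get_display_name()}"


print(input_path_rewrite(".", FakeDataset("mydata.fasta")))  # -> ./mydata.fasta
```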
@@ -88,49 +93,48 @@ def input_path_rewrite(self, dataset):
         """
         ARC Jobs run in the ARC remote compute clusters workdir - not known to Galaxy at this point.
         But all input-files are all uploaded (by ARC) to this workdir, so a simple relative path will work for all ARC jobs
         """
-        return f'{str(self._working_directory)}/{str(dataset.get_display_name())}'
-
+        return f"{str(self._working_directory)}/{str(dataset.get_display_name())}"
+
     def output_path_rewrite(self, dataset):
         """
         ARC Jobs run in the ARC remote compute clusters workdir - not known to Galaxy at this point.
         But all outputfiles are created in this workdir, so a simple relative path will work for all ARC jobs
         """
-        #return f'{str(self._working_directory)}/{str(dataset.get_file_name())}'
-        return f'{str(dataset.get_file_name())}'
+        # return f'{str(self._working_directory)}/{str(dataset.get_file_name())}'
+        return f"{str(dataset.get_file_name())}"
 
-
     def input_extra_files_rewrite(self, dataset):
-        """ TODO - find out what this is and if I need it """
+        """TODO - find out what this is and if I need it"""
         input_path_rewrite = self.input_path_rewrite(dataset)
         remote_extra_files_path_rewrite = dataset_path_to_extra_path(input_path_rewrite)
         self.path_rewrites_input_extra[dataset.extra_files_path] = remote_extra_files_path_rewrite
         return remote_extra_files_path_rewrite
 
     def output_extra_files_rewrite(self, dataset):
-        """ TODO - find out what this is and if I need it """
+        """TODO - find out what this is and if I need it"""
         output_path_rewrite = self.output_path_rewrite(dataset)
         remote_extra_files_path_rewrite = dataset_path_to_extra_path(output_path_rewrite)
         return remote_extra_files_path_rewrite
 
     def input_metadata_rewrite(self, dataset, metadata_val):
-        """ TODO - find out what this is and if I need it """
+        """TODO - find out what this is and if I need it"""
         return None
 
     def unstructured_path_rewrite(self, parameter_value):
-        """ TODO - find out what this is and if I need it """
+        """TODO - find out what this is and if I need it"""
         return self._working_directory
 
     def working_directory(self):
         return self._working_directory
 
     def env_config_directory(self):
-        return self.config_directory()
+        return self._config_directory
 
     def config_directory(self):
         return self._config_directory
 
     def new_file_path(self):
-        return self.working_directory()  # Problems with doing this?
+        return self._working_directory
 
     def sep(self):
         return self._sep
@@ -152,8 +156,8 @@ def galaxy_url(self):
 
     def get_file_sources_dict(self):
         return self.job_wrapper.job_io.file_sources_dict
-
-
+
+
 class ArcRESTJobRunner(AsynchronousJobRunner):
     """
     Job runner backed by a finite pool of worker threads. FIFO scheduling
@@ -175,10 +179,9 @@ def __init__(self, app, nworkers, **kwargs):
         self.arcjob = None
         self.provider_backend = provider_name_to_backend("wlcg")
 
-
     def queue_job(self, job_wrapper):
         """When a tool is submitted for execution in galaxy"""
-        """ This method 
+        """ This method
         1. Fetches the configured ARC endpoint for this user
         2. Prepares an ARC job description based on the jobs destination parameters
         3. Submits the job to the remote ARC endpoint via pyarcrest
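The numbered steps in the docstring above drive the hunks that follow. A hedged, pure-Python outline of that data flow (all names and stub bodies here are invented placeholders; only the ordering mirrors the runner):

```python
def fetch_arc_endpoint(user_prefs: dict) -> str:
    # Step 1: the endpoint URL comes from the user's extra preferences.
    return user_prefs["arc_endpoint"]


def build_job_description(params: dict) -> str:
    # Step 2: stands in for prepare_job_arc() + ARCJobBuilder.to_xml_str().
    return f"<ActivityDescription cpuhrs='{params['arc_cpuhrs']}'/>"


def submit(endpoint: str, token: str, description: str) -> str:
    # Step 3: stands in for the pyarcrest submission call; returns the ARC job id.
    return "arc-job-0001"


endpoint = fetch_arc_endpoint({"arc_endpoint": "https://arc.example.org"})
job_id = submit(endpoint, "oidc-access-token", build_job_description({"arc_cpuhrs": "1"}))
print(job_id)  # the id is what check_watched_item() later polls
```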
@@ -188,14 +191,14 @@ def queue_job(self, job_wrapper):
         job_destination = job_wrapper.job_destination
         job_id = job_wrapper.job_id
 
-        """ Build the command line - needs a rewrite of input-paths as ARC is a remote cluster """
-        if not self.prepare_job(job_wrapper,
-                                include_metadata=False,
-                                include_work_dir_outputs=False,
-                                modify_command_for_container=False,
-                                stream_stdout_stderr=False
-                                ):
+        if not self.prepare_job(
+            job_wrapper,
+            include_metadata=False,
+            include_work_dir_outputs=False,
+            modify_command_for_container=False,
+            stream_stdout_stderr=False,
+        ):
             return
 
         """ prepare_job() calls prepare() but not allowing to pass a compute_environment object
         As I need to define my own compute_environment for the remote compute I must call it here passing the compute_environment
@@ -214,12 +217,11 @@ def queue_job(self, job_wrapper):
             log.exception("(%s) Failure preparing job", job_id)
             job_wrapper.fail(unicodify(e), exception=True)
             return
-
+
         if not job_wrapper.runner_command_line:
             job_wrapper.finish("", "")
             return
-
-
+
         """ Set the ARC endpoint url to submit the job to - extracted from the job_destination parameters in job_conf.xml """
         user_preferences = job_wrapper.get_job().user.extra_preferences
         self.arc = Arc()
@@ -227,7 +229,7 @@ def queue_job(self, job_wrapper):
 
         """ Prepare and submit job to arc """
         arc_job = self.prepare_job_arc(job_wrapper)
-
+
         token = job_wrapper.get_job().user.get_oidc_tokens(self.provider_backend)["access"]
         self.arcrest = get_client(self.arc.url, token=token)
 
@@ -366,8 +368,10 @@ def check_watched_item(self, job_state):
             galaxy_job_wrapper.change_state(model.Job.states.OK)
 
             galaxy_outputdir = galaxy_workdir + "/working"
-            #self.arcrest.downloadJobFiles(galaxy_outputdir, [arc_job_id])
-            self.arcrest.downloadJobFiles(galaxy_outputdir, [arc_job_id],outputFilters={f"{arc_job_id}":'(?!user.proxy$)'})
+            # self.arcrest.downloadJobFiles(galaxy_outputdir, [arc_job_id])
+            self.arcrest.downloadJobFiles(
+                galaxy_outputdir, [arc_job_id], outputFilters={f"{arc_job_id}": "(?!user.proxy$)"}
+            )
 
             self.place_output_files(job_state, mapped_state)
             self.mark_as_finished(job_state)
@@ -471,21 +475,13 @@ def recover(self, job, job_wrapper):
         job_wrapper.command_line = job.command_line
         ajs.job_wrapper = job_wrapper
         if job.state == model.Job.states.RUNNING:
-            log.debug(
-                "({}/{}) is still in running state, adding to the god queue".format(
-                    job.id, ajs.job_id
-                )
-            )
+            log.debug("({}/{}) is still in running state, adding to the god queue".format(job.id, ajs.job_id))
             ajs.old_state = "R"
             ajs.running = True
             self.monitor_queue.put(ajs)
         elif job.state == model.Job.states.QUEUED:
-            log.debug(
-                "({}/{}) is still in god queued state, adding to the god queue".format(
-                    job.id, ajs.job_id
-                )
-            )
+            log.debug("({}/{}) is still in god queued state, adding to the god queue".format(job.id, ajs.job_id))
             ajs.old_state = "Q"
             ajs.running = False
             self.monitor_queue.put(ajs)
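One detail from the `check_watched_item` hunk above is worth spelling out: the `outputFilters` value is a negative-lookahead regex intended to skip the delegated `user.proxy` credential while downloading every other output. Assuming pyarcrest applies the pattern with `re.match`-style anchoring (an assumption, not verified here), the behaviour is:

```python
import re

# The pattern from the diff, verbatim. Note the unescaped dot: a file named
# "userXproxy" would also be excluded; r"(?!user\.proxy$)" would be stricter.
keep = re.compile("(?!user.proxy$)")

for name in ["arc.out", "arc.err", "user.proxy"]:
    print(name, bool(keep.match(name)))  # user.proxy -> False, others -> True
```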
@@ -554,22 +550,21 @@ def prepare_job_arc(self, job_wrapper):
 
         """ The job_wrapper.job_destination has access to the parameters from the id=arc destination configured in the job_conf """
         galaxy_job = job_wrapper.get_job()
 
-        """
-        Organize the galaxy jobs input-files into executables, input- and output-files
-        The ARC job description expects a different format for executables compared to other input files.
-
-        TODO: Use the command-builder to extract the executable command instead of using an executable file uploaded to Galaxy.
-        TODO: Extend to support fuller ARC job description options - such as ARC runtimeenvironment that inform the ARC client about what capabilities the endpoint has.
+        """
+        Organize the galaxy jobs input-files into executables, input- and output-files
+        The ARC job description expects a different format for executables compared to other input files.
+        TODO: Use the command-builder to extract the executable command instead of using an executable file uploaded to Galaxy.
+        TODO: Extend to support fuller ARC job description options - such as ARC runtimeenvironment that inform the ARC client about what capabilities the endpoint has.
         e.g. what software is installed.
         """
         arc_job = ARCJobBuilder()
 
-        """
-        These are the files that are uploaded by the user for this job
-        file_source: is the file path in the galaxy data folder,
+        """
+        These are the files that are uploaded by the user for this job
+        file_source: is the file path in the galaxy data folder,
         file_realname: the filename the uploaded file had
-        tool_input_tag: - the tools form input name
+        tool_input_tag: - the tools form input name
         """
 
         input_datasets = galaxy_job.get_input_datasets()
@@ -578,25 +573,22 @@ def prepare_job_arc(self, job_wrapper):
             tool_input_tag = input_data.name
             file_realname = input_data.dataset.get_display_name()
 
-            if 'arcjob_remote_filelist' in tool_input_tag:
-                log.debug(f'==MAIKEN== arcjob_remote_filelist')
-                """ Demo gymnastics just to show how ARC can handle fetching remote files - this needs to be discussed how to achieve in Galaxy - relies on the hello_arc.xml tool """
-                with open(file_source,'r') as f:
+            if "arcjob_remote_filelist" in tool_input_tag:
+                """Demo gymnastics just to show how ARC can handle fetching remote files - this needs to be discussed how to achieve in Galaxy - relies on the hello_arc.xml tool"""
+                with open(file_source) as f:
                     files = f.readlines()
-                    for idx,file_url in enumerate(files):
-                        file_n_xrls = f'remote_file_{idx}'
+                    for idx, file_url in enumerate(files):
+                        file_n_xrls = f"remote_file_{idx}"
                         arc_job.inputs[file_n_xrls] = file_url.strip()
             else:
-                """ Example of file local to the Galaxy server """
+                """Example of file local to the Galaxy server"""
                 arc_job.inputs[file_realname] = "file://" + file_source
-
-
 
         """ Need also to upload the Executable produced by Galaxy - the tool_script.sh """
         file_realname = "tool_script.sh"
         file_source = "file://" + job_wrapper.working_directory + "/" + file_realname
         arc_job.inputs[file_realname] = file_source
-
+
         """ Use the tool_script.sh created by Galaxy as the executable to run """
         arc_job.exe_path = "./" + file_realname
 
@@ -612,10 +604,10 @@ def prepare_job_arc(self, job_wrapper):
         arc_cpuhrs = str(job_wrapper.job_destination.params["arc_cpuhrs"])
         arc_mem = str(job_wrapper.job_destination.params["arc_mem"])
 
-        """
-        TODO- should probably not be Hard-coded
-        the user should him/herself enter what oout and err files
-        that the executable produces
+        """
+        TODO- should probably not be Hard-coded
+        the user should him/herself enter what oout and err files
+        that the executable produces
         """
         std_out = "arc.out"
         std_err = "arc.err"
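The remote-filelist branch above turns an uploaded list of URLs into named ARC inputs that the compute element fetches itself. A stand-alone illustration (`StringIO` replaces the uploaded file and `arc_inputs` stands in for `arc_job.inputs`; the URLs are invented):

```python
from io import StringIO

filelist = StringIO("https://example.org/a.txt\nhttps://example.org/b.txt\n")

arc_inputs = {}
for idx, file_url in enumerate(filelist.readlines()):
    # Strip the trailing newline; the key becomes the file's name in the ARC workdir.
    arc_inputs[f"remote_file_{idx}"] = file_url.strip()

print(arc_inputs)
# {'remote_file_0': 'https://example.org/a.txt', 'remote_file_1': 'https://example.org/b.txt'}
```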
@@ -634,10 +626,10 @@ def prepare_job_arc(self, job_wrapper):
         arc_job.memory = arc_mem
 
         """ Populate the arcjob object with rest of necessary and useful fields including the full job description string"""
-        """ All files that should be collected by ARC when the job is finished need to be appended to the downloadFiles list -
-        here it is just the folder / and all files in the folder will be downloaded.
+        """ All files that should be collected by ARC when the job is finished need to be appended to the downloadFiles list -
+        here it is just the folder / and all files in the folder will be downloaded.
         The arc.py in pyarcrest loops over this list to fetch all outputfiles """
         arc_job.descrstr = arc_job.to_xml_str()
-        log.debug(f'{arc_job.descrstr}')
+        log.debug(f"{arc_job.descrstr}")
 
         return arc_job
diff --git a/lib/galaxy/jobs/runners/util/arc_util.py b/lib/galaxy/jobs/runners/util/arc_util.py
index b568203995b2..deee2c445e98 100644
--- a/lib/galaxy/jobs/runners/util/arc_util.py
+++ b/lib/galaxy/jobs/runners/util/arc_util.py
@@ -28,6 +28,7 @@
 
 log = logging.getLogger(__name__)
 
+
 def ensure_pyarc() -> None:
     if ARCHTTPError is None:
         raise Exception(
@@ -73,16 +74,15 @@ def to_xml_str(self) -> str:
 
         for file_name, file_remote_source in self.inputs.items():
             sub_el = SubElement(datastaging, "InputFile")
             SubElement(sub_el, "Name").text = file_name
-            if 'file://' not in file_remote_source:
-                """ Only supply url for remote files, not local ones - local ones are handled by the ARC client on the client side (Galaxy)"""
-                source_el = SubElement(sub_el,"Source")
-                SubElement(source_el,"URI").text = file_remote_source
-
+            if "file://" not in file_remote_source:
+                """Only supply url for remote files, not local ones - local ones are handled by the ARC client on the client side (Galaxy)"""
+                source_el = SubElement(sub_el, "Source")
+                SubElement(source_el, "URI").text = file_remote_source
+
         for file_name in self.outputs:
             sub_el = SubElement(datastaging, "OutputFile")
             SubElement(sub_el, "Name").text = file_name
         """----------- End datastaging ---------"""
-
         SubElement(resources, "IndividualCPUTime").text = self.cpu_time
         SubElement(resources, "IndividualPhysicalMemory").text = self.memory
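To make the `to_xml_str` datastaging logic above concrete, here is a stand-alone reduction of it. Element names follow the diff; the wrapping job-description document and schema details are omitted, and the sample file names are invented:

```python
from xml.etree.ElementTree import Element, SubElement, tostring

inputs = {
    "mydata.fasta": "file:///galaxy/files/dataset_1.dat",  # local: staged by the Galaxy-side client
    "remote_file_0": "https://example.org/data.txt",  # remote: fetched by the ARC CE itself
}
outputs = ["arc.out", "arc.err"]

datastaging = Element("DataStaging")
for file_name, file_remote_source in inputs.items():
    sub_el = SubElement(datastaging, "InputFile")
    SubElement(sub_el, "Name").text = file_name
    if "file://" not in file_remote_source:
        # Only remote files get a Source/URI element, mirroring the hunk above.
        source_el = SubElement(sub_el, "Source")
        SubElement(source_el, "URI").text = file_remote_source

for file_name in outputs:
    sub_el = SubElement(datastaging, "OutputFile")
    SubElement(sub_el, "Name").text = file_name

print(tostring(datastaging, encoding="unicode"))
```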