From 678dd76680f5c34ed8a1cb14b332864ef6476490 Mon Sep 17 00:00:00 2001 From: Maiken Pedersen Date: Wed, 13 Sep 2023 14:13:14 +0200 Subject: [PATCH] Hopefully fixed all linting errors now --- lib/galaxy/jobs/runners/arc.py | 50 +++++++++++++++--------- lib/galaxy/jobs/runners/util/arc_util.py | 8 ++-- 2 files changed, 35 insertions(+), 23 deletions(-) diff --git a/lib/galaxy/jobs/runners/arc.py b/lib/galaxy/jobs/runners/arc.py index 841a1f342291..f2f8122acf6f 100644 --- a/lib/galaxy/jobs/runners/arc.py +++ b/lib/galaxy/jobs/runners/arc.py @@ -8,11 +8,18 @@ ) from galaxy.jobs.runners.util.arc_util import ( ARCJobBuilder, - ARCHTTPError, - NoValueInARCResult, ensure_pyarc, get_client, ) +try: + from pyarcrest.errors import ( + ARCHTTPError, + NoValueInARCResult, + ) +except ImportError: + ARCHTTPError = None + NoValueInARCResult = None + from galaxy.util import unicodify log = logging.getLogger(__name__) @@ -52,8 +59,6 @@ def __init__(self): } - - class ArcRESTJobRunner(AsynchronousJobRunner): """ Job runner backed by a finite pool of worker threads. FIFO scheduling @@ -75,7 +80,6 @@ def __init__(self, app, nworkers, **kwargs): self.arcjob = None self.provider_backend = provider_name_to_backend("wlcg") - def queue_job(self, job_wrapper): """When a tool is submitted for execution in galaxy""" """ This method @@ -117,7 +121,9 @@ def queue_job(self, job_wrapper): # successful submission arc_jobid, status = results[0] job_wrapper.set_external_id(arc_jobid) - log.debug(f"Successfully submitted job to remote ARC resource {self.arc.url} with ARC id: {arc_jobid}job_wrapper.external_job_id: {job_wrapper.get_job().job_runner_external_id} job_wrapper.get_job().get-job_runner_external_id(): {job_wrapper.get_job().get_job_runner_external_id()}") + log.debug( + f"Successfully submitted job to remote ARC resource {self.arc.url} with ARC id: {arc_jobid}job_wrapper.external_job_id: {job_wrapper.get_job().job_runner_external_id} job_wrapper.get_job().get-job_runner_external_id(): {job_wrapper.get_job().get_job_runner_external_id()}" + ) # beware! this means 1 worker, no timeout and default upload buffer errors = self.arcrest.uploadJobFiles([arc_jobid], [arc_job.inputs]) if errors[0]: # input upload error @@ -204,6 +210,13 @@ def check_watched_item(self, job_state): galaxy_workdir = galaxy_job_wrapper.working_directory mapped_state = "" + + """ Set the ARC endpoint url to submit the job to - extracted from the job_destination parameters in job_conf.xml """ + user_preferences = galaxy_job_wrapper.get_job().user.extra_preferences + self.arc = Arc() + self.arc.url = user_preferences.get("distributed_arc_compute|remote_arc_resources", "None") + + """ Make sure to get a fresh token and client """ token = self._get_token(galaxy_job_wrapper) self.arcrest = get_client(self.arc.url, token=token) @@ -295,11 +308,16 @@ def stop_job(self, job_wrapper): arc_jobid = job_wrapper.get_job().job_runner_external_id + """ Set the ARC endpoint url to submit the job to - extracted from the job_destination parameters in job_conf.xml """ + user_preferences = job_wrapper.get_job().user.extra_preferences + self.arc = Arc() + self.arc.url = user_preferences.get("distributed_arc_compute|remote_arc_resources", "None") + + """ Make sure to get a fresh token and client """ token = self._get_token(job_wrapper) self.arcrest = get_client(self.arc.url, token=token) - """ Get the current ARC job status from the remote ARC endpoint """ arc_job_state = self.arcrest.getJobsStatus([arc_jobid])[0] if arc_job_state is None: @@ -437,7 +455,7 @@ def prepare_job(self, job_wrapper): TODO: Extend to support fuller ARC job description options - such as ARC runtimeenvironment that inform the ARC client about what capabilities the endpoint has. e.g. what software is installed. """ - + arc_job = ARCJobBuilder() """ @@ -447,19 +465,17 @@ def prepare_job(self, job_wrapper): tool_input_tag: - the tools form input name """ input_datasets = galaxy_job.get_input_datasets() - + for input_data in input_datasets: - file_source = input_data.dataset.get_file_name() tool_input_tag = input_data.name file_realname = input_data.dataset.get_display_name() - + arc_job.inputs[file_realname] = "file://" + file_source - + """ This is just for the ARC test-tool, will not be used in the final version using generic tools. """ if "exe" in tool_input_tag: arc_job.exe_path = "./" + file_realname - """ Potentially more than one file - but currently actually only one, so the for-loop here is currently not actually needed """ output_datasets = galaxy_job.get_output_datasets() @@ -468,7 +484,6 @@ def prepare_job(self, job_wrapper): file_name = output_data.name arc_job.outputs.append(file_name) - """ Fetch the other job description items from the ARC destination """ arc_cpuhrs = str(job_input_params["arcjob_cpuhrs"]) arc_mem = str(job_input_params["arcjob_memory"]) @@ -481,15 +496,13 @@ def prepare_job(self, job_wrapper): std_out = "arc.out" std_err = "arc.err" - """ This is hardcoded stdout and stderr files from the ARC job defined here in the runner - TODO - not hardcoded """ arc_job.stdout = std_out arc_job.stderr = std_err - + """ Construct the job description xml object """ arc_job.name = "galaxy_arc_hello_test" - """ TODO - just a sample, this will probably be set by the destination itself - to be discussed """ arc_job.cpu_time = arc_cpuhrs @@ -500,7 +513,6 @@ def prepare_job(self, job_wrapper): """ All files that should be collected by ARC when the job is finished need to be appended to the downloadFiles list - here it is just the folder / and all files in the folder will be downloaded. The arc.py in pyarcrest loops over this list to fetch all outputfiles """ - arc_job.descstr = arc_job.to_xml_str() - + arc_job.descrstr = arc_job.to_xml_str() return arc_job diff --git a/lib/galaxy/jobs/runners/util/arc_util.py b/lib/galaxy/jobs/runners/util/arc_util.py index dd9fc17aff18..0ec82f1e5f82 100644 --- a/lib/galaxy/jobs/runners/util/arc_util.py +++ b/lib/galaxy/jobs/runners/util/arc_util.py @@ -1,6 +1,6 @@ from typing import ( - List, Dict, + List, ) from xml.etree.ElementTree import ( Element, @@ -17,8 +17,8 @@ from pyarcrest.errors import ( ARCHTTPError, NoValueInARCResult, - ) - + ) + except ImportError: ARCHTTPError = None NoValueInARCResult = None @@ -48,7 +48,7 @@ class ARCJobBuilder: inputs: Dict[str, str] = {} outputs: List[str] = [] descrstr: str - + def to_xml_str(self) -> str: descr = Element("ActivityDescription") descr.set("xmlns", "http://www.eu-emi.eu/es/2010/12/adl")