From 3c2c67ffe9f4f79eba9673fea3cfe3664e6d650b Mon Sep 17 00:00:00 2001 From: Maiken Pedersen Date: Fri, 1 Sep 2023 14:12:16 +0200 Subject: [PATCH 01/21] Update conditional-requirements.txt Adding pyarcrest conditional requirement for the ArcRESTJobRunner --- lib/galaxy/dependencies/conditional-requirements.txt | 3 +++ 1 file changed, 3 insertions(+) diff --git a/lib/galaxy/dependencies/conditional-requirements.txt b/lib/galaxy/dependencies/conditional-requirements.txt index 6c7a0cc38f35..899bf24cca49 100644 --- a/lib/galaxy/dependencies/conditional-requirements.txt +++ b/lib/galaxy/dependencies/conditional-requirements.txt @@ -37,6 +37,9 @@ chronos-python==1.2.1 # Kubernetes job runner pykube==0.15.0 +# ARC job runner +pyarcrest==0.1 + # Synnefo / Pithos+ object store client kamaki From 54770d13ef446431bbbc8609c6a677b690b1c7d4 Mon Sep 17 00:00:00 2001 From: Maiken Pedersen Date: Fri, 1 Sep 2023 14:13:32 +0200 Subject: [PATCH 02/21] Update __init__.py ArcRESTJobRunner --- lib/galaxy/dependencies/__init__.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/lib/galaxy/dependencies/__init__.py b/lib/galaxy/dependencies/__init__.py index eb484b9dcbdf..026969d00c02 100644 --- a/lib/galaxy/dependencies/__init__.py +++ b/lib/galaxy/dependencies/__init__.py @@ -208,6 +208,9 @@ def check_pbs_python(self): def check_pykube(self): return "galaxy.jobs.runners.kubernetes:KubernetesJobRunner" in self.job_runners or which("kubectl") + def check_pyarcrest(self): + return "galaxy.jobs.runners.arc:ArcRESTJobRunner" in self.job_runners + def check_chronos_python(self): return "galaxy.jobs.runners.chronos:ChronosJobRunner" in self.job_runners From bffcef8acd02df3f6f997cb5695ebb3037838771 Mon Sep 17 00:00:00 2001 From: Matthias Bernt Date: Thu, 31 Aug 2023 09:44:38 +0200 Subject: [PATCH 03/21] explicitly document default of multiple --- lib/galaxy/tool_util/xsd/galaxy.xsd | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/galaxy/tool_util/xsd/galaxy.xsd b/lib/galaxy/tool_util/xsd/galaxy.xsd index 69c88f4883ba..da3abb1dce55 100644 --- a/lib/galaxy/tool_util/xsd/galaxy.xsd +++ b/lib/galaxy/tool_util/xsd/galaxy.xsd @@ -3646,7 +3646,7 @@ drop-down menu select list. Used only when the ``type`` attribute value is Allow multiple values to be selected. ``select`` parameters with ``multiple="true"`` are optional by default. Used only when the ``type`` attribute value is ``data``, ``group_tag``, -or ``select``. +or ``select``. 
Default is ``false`` From 1dc557e8d172554ac9ed2a1230d10004c0352856 Mon Sep 17 00:00:00 2001 From: Maiken Pedersen Date: Wed, 6 Sep 2023 18:15:08 +0200 Subject: [PATCH 04/21] Adding new job runner - the ARC job runner --- lib/galaxy/jobs/runners/arc.py | 593 +++++++++++++++++++++++++++++++++ 1 file changed, 593 insertions(+) create mode 100644 lib/galaxy/jobs/runners/arc.py diff --git a/lib/galaxy/jobs/runners/arc.py b/lib/galaxy/jobs/runners/arc.py new file mode 100644 index 000000000000..9f16a8d5bdb8 --- /dev/null +++ b/lib/galaxy/jobs/runners/arc.py @@ -0,0 +1,593 @@ +import os, sys +import json +import logging +import time +from datetime import datetime +import shutil + +import requests +from galaxy.authnz.util import provider_name_to_backend + +from galaxy import model +from galaxy.jobs.runners import ( + AsynchronousJobRunner, + AsynchronousJobState +) +from galaxy.util import unicodify + +from xml.etree.ElementTree import Element, SubElement, tostring, fromstring + +from pyarcrest.common import getNullLogger +from pyarcrest.arc import ARCJob, ARCRest, ARCRest_1_1 +from pyarcrest.errors import (ARCError, ARCHTTPError) + +log = logging.getLogger(__name__) + +__all__ = ('ArcRESTJobRunner', ) + +class Arc: + """ + API parameters + """ + + def __init__(self): + + self.cluster = "" + self.job_mapping = {} + + self.ARC_STATE_MAPPING = { + "ACCEPTING": "Accepted", + "Accepted": "Accepted", + "ACCEPTED": "Accepted", + "PREPARING": "Preparing", + "PREPARED": "Preparing", + "SUBMITTING": "Submitting", + "QUEUING": "Queuing", + "RUNNING": "Running", + "HELD": "Hold", + "EXITINGLRMS": "Running", + "OTHER": "Other", + "EXECUTED": "Running", + "FINISHING": "Finishing", + "FINISHED": "Finished", + "FAILED": "Failed", + "KILLING": "Killing", + "KILLED": "Killed", + "WIPED": "Deleted", + "None": "Failed", + "Job not found": "Failed" + } + + + def set_job_cluster(self,cluster): + self.cluster = cluster + + +class ArcRESTJobRunner(AsynchronousJobRunner): + """ + Job runner backed by a finite pool of worker threads. FIFO scheduling + """ + runner_name = "ArcRESTJobRunner" + + def __init__(self, app, nworkers, **kwargs): + """ 1: Get runner_param_specs from job_conf.xml + 2: Initialise job runner parent object + 3: Start the worker and monitor threads + """ + + # Start the job runner parent object + super(ArcRESTJobRunner,self).__init__(app, nworkers, **kwargs) + + self.arc = Arc() + self.arcjob = ARCJob() + self.provider_backend = provider_name_to_backend("wlcg") + #self.arc_url = None + + + """ Following methods starts threads. + These methods invoke threading.Thread(name,target) + which in turn invokes methods monitor() and run_next(). + """ + self._init_monitor_thread() + self._init_worker_threads() + + def queue_job(self, job_wrapper): + """ When a tool is submitted for execution in galaxy """ + """ This method + 1. Fetches the configured ARC endpoint for this user + 2. Prepares an ARC job description based on the jobs destination parameters + 3. Submits the job to the remote ARC endpoint via pyarcrest + 4. 
Adds the job to the galaxy job queue
+        """
+
+        job_destination = job_wrapper.job_destination
+        galaxy_jobid = job_wrapper.job_id
+
+        """ Set the cluster to submit the job to - extracted from the job_destination parameters in job_conf.xml """
+        user_preferences = job_wrapper.get_job().user.extra_preferences
+        arc_url = user_preferences.get("distributed_arc_compute|remote_arc_resources", "None")
+        self.arc.set_job_cluster(arc_url)
+
+        """ Prepare and submit job to arc """
+        self.prepare_job(job_wrapper, self.arcjob)
+
+        token = job_wrapper.get_job().user.get_oidc_tokens(self.provider_backend)['access']
+        # proxypath is ignored if you are using token
+        self.arcrest = ARCRest.getClient(url=self.arc.cluster, version="1.1", token=token, impls={"1.1":ARCRest_1_1})
+
+        # token parameter isn't necessary, unless there is a bug
+        delegationID = self.arcrest.createDelegation()
+
+        bulkdesc = "<ActivityDescriptions>"
+        bulkdesc += self.arcjob.descstr
+        bulkdesc += "</ActivityDescriptions>"
+
+        results = self.arcrest.createJobs(bulkdesc, delegationID=delegationID)
+
+        if isinstance(results[0], ARCHTTPError):
+            # submission error
+            log.error("Job creation failure. No Response from ARC")
+            job_wrapper.fail("Not submitted")
+        else:
+            # successful submission
+            arc_jobid, status = results[0]
+            log.debug(f'Successfully submitted job to remote ARC resource {self.arc.cluster} with ARC id: {arc_jobid}')
+            # beware! this means 1 worker, no timeout and default upload buffer
+            errors = self.arcrest.uploadJobFiles([arc_jobid], [self.arcjob.inputFiles])
+            if errors[0]:  # input upload error
+                log.error("Job creation failure. No Response from ARC")
+                log.debug(f'Could not upload job files for job with galaxy-id: {galaxy_jobid} to ARC resource {self.arc.cluster}. Error was: {errors[0]}')
+                job_wrapper.fail("Not submitted")
+            else:
+                # successful input upload
+                log.debug(f'Successfully uploaded input-files to remote ARC resource {self.arc.cluster} for job with galaxy-id: {galaxy_jobid} and ARC id: {arc_jobid}')
+                self.arc.job_mapping[galaxy_jobid]=arc_jobid
+                # Create an object of AsynchronousJobState and add it to the monitor queue.
+                ajs = AsynchronousJobState(files_dir=job_wrapper.working_directory, job_wrapper=job_wrapper, job_id=galaxy_jobid, job_destination=job_destination)
+                self.monitor_queue.put(ajs)
+
+    def place_output_files(self, job_state, job_status_arc):
+        """ Create log files in galaxy, namely error_file, output_file, exit_code_file
+        Return true, if all the file creations are successful
+        """
+
+        job_dir = job_state.job_wrapper.working_directory
+        galaxy_workdir= job_dir + '/working'
+        galaxy_outputs= job_dir + '/outputs'
+
+        arc_jobid = self.arc.job_mapping[job_state.job_id]
+        outputs_dir = job_state.job_wrapper.outputs_directory
+
+        """ job_state.output_file and job_state.error_file are e.g. galaxy_5.o and galaxy_5.e where 5 is the galaxy job id """
+        """ Hardcoded out and err files - this is ok for now. TODO: handle the case where the tool itself produces stdout that should be kept """
+
+        """ Galaxy stderr and stdout files need to be populated from the arc.out and arc.err files """
+        try:
+            # Read from ARC output_file and write it into galaxy output_file.
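+            # arc.out / arc.err live in the downloaded job directory, which is named
+            # after the ARC job id; their contents are copied both into the runner
+            # log files (job_state.output_file / job_state.error_file) and into
+            # outputs/tool_stdout and outputs/tool_stderr so Galaxy can display them.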
+ out_log = '' + tool_stdout_path = galaxy_outputs + '/tool_stdout' + with open(galaxy_workdir+"/"+arc_jobid+"/arc.out","r") as f: + out_log = f.read() + with open(job_state.output_file,"a+") as log_file: + log_file.write(out_log) + log_file.write("Some hardcoded stdout - as a sample from the arc.py runner.") + with open(tool_stdout_path, "a+") as tool_stdout: + tool_stdout.write(out_log) + + + # Read from ARC error_file and write it into galaxy error_file. + err_log = '' + tool_stderr_path = galaxy_outputs + '/tool_stderr' + with open(galaxy_workdir+"/"+arc_jobid+"/arc.err","r") as f: + err_log = f.read() + with open(job_state.error_file,"w+") as log_file: + log_file.write(err_log) + log_file.write("Some hardcoded stderr - as a sample from the arc.py runner.") + with open(tool_stderr_path, "w+") as tool_stderr: + tool_stderr.write(err_log) + + except OSError as e: + log.error('Could not access task log file: %s', unicodify(e)) + log.debug("IO Error occurred when accessing the files.") + return False + return True + + + def check_watched_item(self, job_state): + + """ Get the job current status from ARC + using job_id and update the status in galaxy. + If the job execution is successful, call + mark_as_finished() and return 'None' to galaxy. + else if the job failed, call mark_as_failed() + and return 'None' to galaxy. + else if the job is running or in pending state, simply + return the 'AsynchronousJobState object' (job_state). + """ + ''' This function is called by check_watched_items() where + param job_state is an object of AsynchronousJobState. + Expected return type of this function is None or + AsynchronousJobState object with updated running status. + ''' + + galaxy_job_wrapper = job_state.job_wrapper + galaxy_job = galaxy_job_wrapper.get_job() + galaxy_workdir = galaxy_job_wrapper.working_directory + mapped_state= '' + + """ Make sure to get a fresh token and client """ + token = self._get_token(galaxy_job_wrapper) + self.arcrest = ARCRest.getClient(url=self.arc.cluster, version="1.1", token=token, impls={"1.1":ARCRest_1_1}) + + """ Get task from ARC """ + arc_jobid = self.arc.job_mapping[job_state.job_id] + arc_job_state = self.job_actions(arc_jobid, "status") + if arc_job_state is None: + return None + + + if arc_job_state: + mapped_state = self.arc.ARC_STATE_MAPPING[arc_job_state] + else: + log.debug(f'Could not map state of ARC job with id: {arc_jobid} and Galaxy job id: {job_state.job_id}') + return None + + self.arcrest = ARCRest.getClient(url=self.arc.cluster, version="1.1", token=self._get_token(galaxy_job_wrapper), impls={"1.1":ARCRest_1_1}) + + if mapped_state == "Finished": + + job_state.running = False + galaxy_job_wrapper.change_state(model.Job.states.OK) + + galaxy_outputdir = galaxy_workdir + '/working' + self.arcrest.downloadJobFiles(galaxy_outputdir,[arc_jobid]) + + self.place_output_files(job_state,mapped_state) + self.mark_as_finished(job_state) + + '''The function mark_as_finished() executes: + self.work_queue.put((self.finish_job, job_state)) + *self.finish_job -> + job_state.job_wrapper.finish( stdout, stderr, exit_code ) + job_state.job_wrapper.reclaim_ownership() + job_state.cleanup() + *self.work_queue.put( method , arg ) -> + The run_next() method starts execution on starting worker threads. 
+ This run_next() method executes method(arg) + by using self.work_queue.get() + *Possible outcomes of finish_job(job_state) -> + job_state.job_wrapper.finish( stdout, stderr, exit_code ) + job_state.job_wrapper.fail( "Unable to finish job", exception=True) + *Similar workflow is done for mark_as_failed() method. + ''' + return None + + elif mapped_state == "Running": + job_state.running = True + galaxy_job_wrapper.change_state(model.Job.states.RUNNING) + return job_state + elif mapped_state == "Accepted" or mapped_state == "Preparing" or mapped_state == "Submitting" or mapped_state == "Queuing" or mapped_state == "Hold" or mapped_state == "Other": + """ Job is in transition status """ + return job_state + elif mapped_state == "Killing" or mapped_state == "Killed": + job_state.running = False + galaxy_job_wrapper.change_state(model.Job.states.DELETING) + return job_state + elif mapped_state == "Failed": + job_state.running = False + galaxy_job_wrapper.change_state(model.Job.states.ERROR) + self.mark_as_failed(job_state) + return None + else: + job_state.running = False + self.mark_as_failed(job_state) + return None + + + def stop_job(self, job_wrapper): + + + """ + TODO: I am not sure this method is working as intended. Is it supposed to be triggered if the user e.g. + deletes an active job from history? + I can not see that this method is called then. It seems to only get called once the external job state is + fetched and rendered as "Finished". + + """ + """ Attempts to delete a dispatched executing Job in ARC """ + '''This function is called by fail_job() + where param job = self.sa_session.query( self.app.model.Job ).get( job_state.job_wrapper.job_id ) + No Return data expected + ''' + job_id = job_wrapper.job_id + arc_jobid = '' + + """ Make sure to get a fresh token and client """ + token = self._get_token(job_wrapper) + self.arcrest = ARCRest.getClient(url=self.arc.cluster, version="1.1", token=token, impls={"1.1":ARCRest_1_1}) + + + # Get task status from ARC. + try: + arc_jobid = self.arc.job_mapping[job_id] + except KeyError: + log.debug(f'Could not find arc_jobid for stopping job {job_id}') + return None + + arc_job_state = self.job_actions([arc_jobid], "status") + if arc_job_state is None: + return None + mapped_state = self.arc.ARC_STATE_MAPPING[arc_job_state] + if not(mapped_state == "Killed" or mapped_state == "Deleted" or mapped_state == "Finished"): + + try: + # Initiate a delete call,if the job is running in ARC. + waskilled = self.job_actions([arc_jobid],"kill") + except Exception as e: + log.debug(f'Job with ARC id: {arc_jobid} and Galaxy id: {job_id} was attempted killed by external request (user or admin), but this did not succeed. Exception was: {e}') + + return None + + def recover(self, job, job_wrapper): + """ Recovers jobs stuck in the queued/running state when Galaxy started """ + """ This method is called by galaxy at the time of startup. 
Jobs in Running & Queued status in galaxy are put in the monitor_queue by creating an AsynchronousJobState object
+        """
+        job_id = job_wrapper.job_id
+        ajs = AsynchronousJobState(files_dir=job_wrapper.working_directory, job_wrapper=job_wrapper)
+        ajs.job_id = str(job_id)
+        ajs.job_destination = job_wrapper.job_destination
+        job_wrapper.command_line = job.command_line
+        ajs.job_wrapper = job_wrapper
+        if job.state == model.Job.states.RUNNING:
+            log.debug("({}/{}) is still in running state, adding to the monitor queue".format(job.id, job.get_job_runner_external_id()))
+            ajs.old_state = 'R'
+            ajs.running = True
+            self.monitor_queue.put(ajs)
+
+        elif job.state == model.Job.states.QUEUED:
+            log.debug("({}/{}) is still in queued state, adding to the monitor queue".format(job.id, job.get_job_runner_external_id()))
+            ajs.old_state = 'Q'
+            ajs.running = False
+            self.monitor_queue.put(ajs)
+
+
+    def _get_token(self, job_wrapper):
+        return job_wrapper.get_job().user.get_oidc_tokens(self.provider_backend)['access']
+
+
+    def prepare_job(self, job_wrapper, arcjob):
+
+        """
+        job_wrapper is a wrapper around the python model galaxy.model.Job
+        input_datasets
+        output_datasets
+        input_dataset_collections...
+        parameters
+
+        https://docs.galaxyproject.org/en/release_21.05/lib/galaxy.model.html?highlight=galaxy%20model#galaxy.model.Job
+
+        Example of simple ARC job-description:
+
+        <ActivityDescription xmlns="http://www.eu-emi.eu/es/2010/12/adl">
+            <ActivityIdentification>
+                <Name>arc_hello_test</Name>
+            </ActivityIdentification>
+            <Application>
+                <Output>arc.out</Output>
+                <Error>arc.err</Error>
+                <Executable>
+                    <Path>./runhello.sh</Path>
+                </Executable>
+            </Application>
+            <Resources>
+                <QueueName>main</QueueName>
+                <IndividualCPUTime>1</IndividualCPUTime>
+                <IndividualPhysicalMemory>100</IndividualPhysicalMemory>
+            </Resources>
+            <DataStaging>
+                <InputFile>
+                    <Name>runhello.sh</Name>
+                </InputFile>
+                <OutputFile>
+                    <Name>arcout1.txt</Name>
+                    <Target>
+                        <URI>file:///storage/galaxy/jobs_directory/007/7595/outputs/arcout1.txt</URI>
+                    </Target>
+                </OutputFile>
+                <OutputFile>
+                    <Name>arcout2.txt</Name>
+                    <Target>
+                        <URI>file:///storage/galaxy/jobs_directory/007/7595/outputs/arcout2.txt</URI>
+                    </Target>
+                </OutputFile>
+                <OutputFile>
+                    <Name>arc.out</Name>
+                </OutputFile>
+                <OutputFile>
+                    <Name>arc.err</Name>
+                </OutputFile>
+            </DataStaging>
+        </ActivityDescription>
+        """
+
+        """ The job_wrapper.job_destination has access to the parameters from the id=arc destination configured in the job_conf"""
+        job_destination = job_wrapper.job_destination
+        galaxy_job = job_wrapper.get_job()
+        galaxy_workdir = job_wrapper.working_directory
+
+        """ job_input_params are the input params fetched from the tool """
+        job_input_params = {}
+        """ Make a dictionary of the job inputs and use this for filling in the job description"""
+        for param in galaxy_job.parameters:
+            job_input_params[str(param.name)]=str(param.value.strip('"'))
+
+        """
+        Organize the galaxy job's input-files into executables, input- and output-files
+
+        This currently works in the following way
+        - The tool (hello_arc.xml) has params with name tags arcjob_exe, arcjob_outputs (and could potentially have arcjob_inputs)
+        Below I match the strings
+        - exe in the tag_name to match the input file uploaded via the arcjob_exe form field
+        - output in the tag_name to match the input file uploaded via the arcjob_outputs form field
+        - currently input is empty - TODO
+        TODO - discuss if the exe box is convenient to use - I think so - this can then be used as a generic tool to run any kind of script. But then... must consider what to do with dependencies... So this option would probably lead to lots of errors for users.
+
+        TODO - This needs to be reconsidered so that any tool can work on an ARC endpoint. Not the special arc-tool created here.
Need a way to reliably identify executables (if local), inputs and outputs, independently of how the tool form is structured
+        """
+        job_files = {}
+
+        job_files['inputs'] = []
+        job_files['outputs'] = []
+        for inputdata in galaxy_job.input_datasets:
+            tag_name = inputdata.name
+            file_name = (inputdata.__dict__["dataset"]).__dict__["name"]
+            file_id = (inputdata.__dict__["dataset"]).__dict__["dataset_id"]
+            isExe = 'exe' in tag_name
+            data_dict = {'tag':tag_name,'name':file_name ,'dataset_id':file_id, 'isexe':isExe}
+
+            if "input" in tag_name or "exe" in tag_name:
+                job_files['inputs'].append(data_dict)
+            elif "output" in tag_name:
+                job_files['outputs'].append(data_dict)
+
+
+        """ Fetch the other job description items from the ARC destination """
+        arc_cpuhrs = str(job_input_params["arcjob_cpuhrs"])
+        arc_mem = str(job_input_params["arcjob_memory"])
+
+
+        """
+        TODO - should probably not be hard-coded:
+        the user should specify which out and err files
+        the executable produces
+        """
+
+        std_out = "arc.out"
+        std_err = "arc.err"
+
+        """ Construct the job description xml object """
+        """ TODO - extend to support fuller ARC job description options """
+
+        descr = Element('ActivityDescription')
+        descr.set("xmlns","http://www.eu-emi.eu/es/2010/12/adl")
+        descr.set("xmlns:emiestypes","http://www.eu-emi.eu/es/2010/12/types")
+        descr.set("xmlns:nordugrid-adl","http://www.nordugrid.org/es/2011/12/nordugrid-adl")
+
+        actid = SubElement(descr,"ActivityIdentification")
+        app = SubElement(descr,"Application")
+        resources = SubElement(descr,"Resources")
+        datastaging = SubElement(descr,"DataStaging")
+
+        actid_name = SubElement(actid,"Name")
+        actid_name.text = "galaxy_arc_hello_test"
+
+        app_out = SubElement(app,"Output")
+        app_out.text = std_out
+
+        app_err = SubElement(app,"Error")
+        app_err.text = std_err
+
+        app_exe = SubElement(app,"Executable")
+
+
+        """ These are the files that are uploaded by the user for this job - store the path in a dict for use later
+        key is dataset_id and value is the file path in the galaxy data folder """
+        inputfile_paths = job_wrapper.job_io.get_input_paths()
+        job_inputfiles_galaxy_paths = {}
+        for idx, input_path in enumerate(inputfile_paths):
+            job_inputfiles_galaxy_paths[input_path.dataset_id] = input_path.real_path
+
+
+        """ Populate datastaging exec tag with all exec files - in addition, populate the arcjob object """
+        for job_file in job_files['inputs']:
+
+            dataset_name = job_file['name']
+            dataset_id = job_file['dataset_id']
+            dataset_path = job_inputfiles_galaxy_paths[dataset_id]
+            isexe = job_file['isexe']
+
+            """ Populate the arcjob object with the source - pyarcrest expects this"""
+            arcjob.inputFiles[dataset_name] = 'file://' + dataset_path
+
+            """ Datastaging tag """
+            sub_el = SubElement(datastaging,"InputFile")
+            subsub_el = SubElement(sub_el,"Name")
+            subsub_el.text = dataset_name
+
+            if isexe:
+                """ Fill the appropriate job description fields expected for executables"""
+                """ App tag """
+                sub_el = SubElement(app_exe,"Path")
+                sub_el.text = "./" + dataset_name
+
+
+        """ Populate datastaging output tag with all output files - in addition, populate the arcjob object"""
+        """ Potentially more than one file - but currently actually only one, so the for-loop here is currently not actually needed """
+
+        """ Fill the appropriate job description fields """
+        sub_el = SubElement(datastaging,"OutputFile")
+        subsub_el = SubElement(sub_el,"Name")
+        subsub_el.text = "/"
+
+
+        """ These are the hardcoded stdout and stderr files from the ARC job defined here
in the runner - TODO - not hardcoded """ + sub_el = SubElement(datastaging,"OutputFile") + subsub_el = SubElement(sub_el,"Name") + subsub_el.text = std_out + + """ This is hardcoded stdout and stderr files from the ARC job defined here in the runner - TODO - not hardcoded """ + sub_el = SubElement(datastaging,"OutputFile") + subsub_el = SubElement(sub_el,"Name") + subsub_el.text = std_err + + """ TODO - just a sample, this will probably be set by the destination itself - to be discussed """ + cpuhrs = arc_cpuhrs + sub_el = SubElement(resources,"IndividualCPUTime") + sub_el.text = arc_cpuhrs + + """ TODO - just a sample, this will probably be set by the destination itself - to be discussed """ + mem = arc_mem + sub_el = SubElement(resources,"IndividualPhysicalMemory") + sub_el.text = arc_mem + + """ Populate the arcjob object with rest of necessary and useful fields including the full job description string""" + """ All files that should be collected by ARC when the job is finished need to be appended to the downloadFiles list - + here it is just the folder / and all files in the folder will be downloaded. + The arc.py in pyarcrest loops over this list to fetch all outputfiles """ + arcjob.downloadFiles.append("/") + arcjob.StdOut = std_out + arcjob.StdErr = std_err + arcjob.RequestedTotalCPUTime = cpuhrs + arcjob.descstr = tostring(descr, encoding='unicode',method='xml') + + + def job_actions(self, arcid, action): + + + """ Kill job in ARC. + Pass job_id to http_delete_request method. + """ + """ Really processing one job at a time, should I do this better? pyarcrest/arc.py takes a list of jobs""" + + if action == "status": + results = self.arcrest.getJobsStatus([arcid]) + for state in results: + try: + job_state = state + return job_state + except Exception: + log.error(f'Could not get status of job with id: {arcid}') + elif action == "kill": + results = self.arcrest.killJobs([arcid]) + for killed in results: + if killed: + return True + else: + return False From 729c275c0480c24e1cd4c662fcb9edba9e0eeed4 Mon Sep 17 00:00:00 2001 From: Maiken Pedersen Date: Thu, 7 Sep 2023 11:13:13 +0200 Subject: [PATCH 05/21] Update conditional-requirements.txt Require version 0.2 which is compatible with the ARC job runner. --- lib/galaxy/dependencies/conditional-requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/galaxy/dependencies/conditional-requirements.txt b/lib/galaxy/dependencies/conditional-requirements.txt index 899bf24cca49..6bcfca2ab3ad 100644 --- a/lib/galaxy/dependencies/conditional-requirements.txt +++ b/lib/galaxy/dependencies/conditional-requirements.txt @@ -38,7 +38,7 @@ chronos-python==1.2.1 pykube==0.15.0 # ARC job runner -pyarcrest==0.1 +pyarcrest==0.2 # Synnefo / Pithos+ object store client kamaki From 3a2963212d30c1330b683938a9f199e5c2afe048 Mon Sep 17 00:00:00 2001 From: Maiken Pedersen Date: Fri, 8 Sep 2023 09:31:46 +0200 Subject: [PATCH 06/21] Reusable abstractions for dealing with pyarcrest. Applying https://github.com/jmchilton/galaxy/commit/6cb9cc4fa0676a18f9a6d0cf917281b390ffc601 by jmchilton. 
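
The helpers centralize the optional pyarcrest import so the runner can fail with a
clear message when the package is missing. A minimal usage sketch (the endpoint
URL, token variable and job id below are illustrative placeholders only):

    from galaxy.jobs.runners.util.arc_util import ensure_pyarc, get_client

    ensure_pyarc()  # raises early if pyarcrest is not installed
    arcrest = get_client("https://arc.example.org", token=access_token)
    statuses = arcrest.getJobsStatus([arc_jobid])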
--- lib/galaxy/jobs/runners/arc.py | 19 +++++++++++-------- lib/galaxy/jobs/runners/util/arc_util.py | 23 +++++++++++++++++++++++ 2 files changed, 34 insertions(+), 8 deletions(-) create mode 100644 lib/galaxy/jobs/runners/util/arc_util.py diff --git a/lib/galaxy/jobs/runners/arc.py b/lib/galaxy/jobs/runners/arc.py index 9f16a8d5bdb8..f0335271d766 100644 --- a/lib/galaxy/jobs/runners/arc.py +++ b/lib/galaxy/jobs/runners/arc.py @@ -13,14 +13,16 @@ AsynchronousJobRunner, AsynchronousJobState ) +from galaxy.jobs.runners.util.arc_util import ( + ARCHTTPError, + ARCJob, + ensure_pyarc, + get_client, +) from galaxy.util import unicodify from xml.etree.ElementTree import Element, SubElement, tostring, fromstring -from pyarcrest.common import getNullLogger -from pyarcrest.arc import ARCJob, ARCRest, ARCRest_1_1 -from pyarcrest.errors import (ARCError, ARCHTTPError) - log = logging.getLogger(__name__) __all__ = ('ArcRESTJobRunner', ) @@ -77,6 +79,7 @@ def __init__(self, app, nworkers, **kwargs): # Start the job runner parent object super(ArcRESTJobRunner,self).__init__(app, nworkers, **kwargs) + ensure_pyarc() self.arc = Arc() self.arcjob = ARCJob() @@ -113,7 +116,7 @@ def queue_job(self, job_wrapper): token = job_wrapper.get_job().user.get_oidc_tokens(self.provider_backend)['access'] # proxypath is ignored if you are using token - self.arcrest = ARCRest.getClient(url=self.arc.cluster, version="1.1", token=token, impls={"1.1":ARCRest_1_1}) + self.arcrest = get_client(self.arc.cluster, token=token) # token parameter isn't necessary, unless there is a bug delegationID = self.arcrest.createDelegation() @@ -218,7 +221,7 @@ def check_watched_item(self, job_state): """ Make sure to get a fresh token and client """ token = self._get_token(galaxy_job_wrapper) - self.arcrest = ARCRest.getClient(url=self.arc.cluster, version="1.1", token=token, impls={"1.1":ARCRest_1_1}) + self.arcrest = get_client(self.arc.cluster, token=token) """ Get task from ARC """ arc_jobid = self.arc.job_mapping[job_state.job_id] @@ -233,7 +236,7 @@ def check_watched_item(self, job_state): log.debug(f'Could not map state of ARC job with id: {arc_jobid} and Galaxy job id: {job_state.job_id}') return None - self.arcrest = ARCRest.getClient(url=self.arc.cluster, version="1.1", token=self._get_token(galaxy_job_wrapper), impls={"1.1":ARCRest_1_1}) + self.arcrest = get_client(self.arc.cluster, token=self._get_token(galaxy_job_wrapper)) if mapped_state == "Finished": @@ -305,7 +308,7 @@ def stop_job(self, job_wrapper): """ Make sure to get a fresh token and client """ token = self._get_token(job_wrapper) - self.arcrest = ARCRest.getClient(url=self.arc.cluster, version="1.1", token=token, impls={"1.1":ARCRest_1_1}) + self.arcrest = get_client(self.arc.cluster, token=token) # Get task status from ARC. 
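         # (pyarcrest's getJobsStatus/killJobs take a list of job ids, which is why
         # single jobs are wrapped in one-element lists throughout this runner)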
diff --git a/lib/galaxy/jobs/runners/util/arc_util.py b/lib/galaxy/jobs/runners/util/arc_util.py new file mode 100644 index 000000000000..0974ec0c1aec --- /dev/null +++ b/lib/galaxy/jobs/runners/util/arc_util.py @@ -0,0 +1,23 @@ +try: + from pyarcrest.arc import ARCJob, ARCRest, ARCRest_1_1 + from pyarcrest.errors import ARCHTTPError +except ImportError: + ARCHTTPError = None + ARCRest_1_1 = None + ARCJob = None + + +def ensure_pyarc() -> None: + if ARCHTTPError is None: + raise Exception("The configured functionality requires the Python package pyarcrest, but it isn't available in the Python environment.") + + +def get_client(cluster_url: str, token: str) -> ARCRest_1_1: + return ARCRest.getClient(url=cluster_url, version="1.1", token=token, impls={"1.1":ARCRest_1_1}) + + +__all__ = ( + 'ensure_pyarc', + 'get_client', + 'ARCJob', +) From 75c07fd70ccb1b1796a262ff8b1217b0dbcc7ebb Mon Sep 17 00:00:00 2001 From: Maiken Pedersen Date: Fri, 8 Sep 2023 09:42:16 +0200 Subject: [PATCH 07/21] Refactor XML parsing out of middle of arc job runner. Applying patch from jmchilton https://github.com/jmchilton/galaxy/commit/96d3807c3cb9f251315c18b4d54faae2fd0eb4f3 --- lib/galaxy/jobs/runners/arc.py | 64 ++++++------------------ lib/galaxy/jobs/runners/util/arc_util.py | 62 +++++++++++++++++++++++ 2 files changed, 76 insertions(+), 50 deletions(-) diff --git a/lib/galaxy/jobs/runners/arc.py b/lib/galaxy/jobs/runners/arc.py index f0335271d766..6e418dad6faa 100644 --- a/lib/galaxy/jobs/runners/arc.py +++ b/lib/galaxy/jobs/runners/arc.py @@ -14,6 +14,7 @@ AsynchronousJobState ) from galaxy.jobs.runners.util.arc_util import ( + ActivityDescriptionBuilder, ARCHTTPError, ARCJob, ensure_pyarc, @@ -21,8 +22,6 @@ ) from galaxy.util import unicodify -from xml.etree.ElementTree import Element, SubElement, tostring, fromstring - log = logging.getLogger(__name__) __all__ = ('ArcRESTJobRunner', ) @@ -478,26 +477,10 @@ def prepare_job(self, job_wrapper, arcjob): """ Construct the job description xml object """ """ TODO - extend to support fuller ARC job description options """ - descr = Element('ActivityDescription') - descr.set("xmlns","http://www.eu-emi.eu/es/2010/12/adl") - descr.set("xmlns:emiestypes","http://www.eu-emi.eu/es/2010/12/types") - descr.set("xmlns:nordugrid-adl","http://www.nordugrid.org/es/2011/12/nordugrid-adl") - - actid = SubElement(descr,"ActivityIdentification") - app = SubElement(descr,"Application") - resources = SubElement(descr,"Resources") - datastaging = SubElement(descr,"DataStaging") - - actid_name = SubElement(actid,"Name") - actid_name.text = "galaxy_arc_hello_test" - - app_out = SubElement(app,"Output") - app_out.text = std_out - - app_err = SubElement(app,"Error") - app_err.text = std_err - - app_exe = SubElement(app,"Executable") + description_builder = ActivityDescriptionBuilder() + description_builder.name = "galaxy_arc_hello_test" + description_builder.stdout = std_out + description_builder.stderr = std_err """ These are the files that are uploaded by the user for this job - store the path in a dict for use later @@ -518,47 +501,28 @@ def prepare_job(self, job_wrapper, arcjob): """ Populate the arcjob object with the source - pyarcrest expects this""" arcjob.inputFiles[dataset_name] = 'file://' + dataset_path - - """ Datastaging tag """ - sub_el = SubElement(datastaging,"InputFile") - subsub_el = SubElement(sub_el,"Name") - subsub_el.text = dataset_name + description_builder.inputs.append(dataset_name) if isexe: """ Fill the appropriate job description fields expected for 
executables""" """ App tag """ - sub_el = SubElement(app_exe,"Path") - sub_el.text = "./" + dataset_name + description_builder.app = "./" + dataset_name """ Populate datastaging output tag with all output files - in addition to populate the arcjob object""" """ Potentially more than one file - but currently actually only one, so the for-loop here is currently not actually needed """ - """ Fill the appropriate job description fields """ - sub_el = SubElement(datastaging,"OutputFile") - subsub_el = SubElement(sub_el,"Name") - subsub_el.text = "/" - - - """ This is hardcoded stdout and stderr files from the ARC job defined here in the runner - TODO - not hardcoded """ - sub_el = SubElement(datastaging,"OutputFile") - subsub_el = SubElement(sub_el,"Name") - subsub_el.text = std_out + description_builder.outputs.append("/") """ This is hardcoded stdout and stderr files from the ARC job defined here in the runner - TODO - not hardcoded """ - sub_el = SubElement(datastaging,"OutputFile") - subsub_el = SubElement(sub_el,"Name") - subsub_el.text = std_err + description_builder.outputs.append(std_out) + description_builder.outputs.append(std_err) """ TODO - just a sample, this will probably be set by the destination itself - to be discussed """ - cpuhrs = arc_cpuhrs - sub_el = SubElement(resources,"IndividualCPUTime") - sub_el.text = arc_cpuhrs + description_builder.cpu_time = arc_cpuhrs """ TODO - just a sample, this will probably be set by the destination itself - to be discussed """ - mem = arc_mem - sub_el = SubElement(resources,"IndividualPhysicalMemory") - sub_el.text = arc_mem + description_builder.memory = arc_mem """ Populate the arcjob object with rest of necessary and useful fields including the full job description string""" """ All files that should be collected by ARC when the job is finished need to be appended to the downloadFiles list - @@ -567,8 +531,8 @@ def prepare_job(self, job_wrapper, arcjob): arcjob.downloadFiles.append("/") arcjob.StdOut = std_out arcjob.StdErr = std_err - arcjob.RequestedTotalCPUTime = cpuhrs - arcjob.descstr = tostring(descr, encoding='unicode',method='xml') + arcjob.RequestedTotalCPUTime = arc_cpuhrs + arcjob.descstr = description_builder.to_xml_str() def job_actions(self, arcid, action): diff --git a/lib/galaxy/jobs/runners/util/arc_util.py b/lib/galaxy/jobs/runners/util/arc_util.py index 0974ec0c1aec..914a503d57af 100644 --- a/lib/galaxy/jobs/runners/util/arc_util.py +++ b/lib/galaxy/jobs/runners/util/arc_util.py @@ -1,3 +1,9 @@ +from typing import ( + List, + Optional, +) +from xml.etree.ElementTree import Element, SubElement, tostring + try: from pyarcrest.arc import ARCJob, ARCRest, ARCRest_1_1 from pyarcrest.errors import ARCHTTPError @@ -16,6 +22,62 @@ def get_client(cluster_url: str, token: str) -> ARCRest_1_1: return ARCRest.getClient(url=cluster_url, version="1.1", token=token, impls={"1.1":ARCRest_1_1}) +class ActivityDescriptionBuilder: + name: str + stdout: str + stderr: str + app: str + cpu_time: str + memory: str + inputs: List[str] = [] + outputs: List[str] = [] + + def to_xml_str(self) -> str: + descr = Element('ActivityDescription') + descr.set("xmlns","http://www.eu-emi.eu/es/2010/12/adl") + descr.set("xmlns:emiestypes","http://www.eu-emi.eu/es/2010/12/types") + descr.set("xmlns:nordugrid-adl","http://www.nordugrid.org/es/2011/12/nordugrid-adl") + + actid = SubElement(descr,"ActivityIdentification") + app = SubElement(descr,"Application") + resources = SubElement(descr,"Resources") + datastaging = SubElement(descr,"DataStaging") + + 
actid_name = SubElement(actid,"Name") + actid_name.text = "galaxy_arc_hello_test" + + app_out = SubElement(app,"Output") + app_out.text = self.stdout + + app_err = SubElement(app,"Error") + app_err.text = self.stderr + + app_exe = SubElement(app,"Executable") + app_exe.text = self.app + + """ Populate datastaging exec tag with all exec files - in addition populate the arcjob object """ + for arc_input in self.inputs: + + """ Datastaging tag """ + sub_el = SubElement(datastaging,"InputFile") + subsub_el = SubElement(sub_el,"Name") + subsub_el.text = arc_input + + for arc_output in self.outputs: + sub_el = SubElement(datastaging,"OutputFile") + subsub_el = SubElement(sub_el,"Name") + subsub_el.text = arc_output + + sub_el = SubElement(resources,"IndividualCPUTime") + sub_el.text = self.cpu_time + + sub_el = SubElement(resources,"IndividualPhysicalMemory") + sub_el.text = self.memory + + return tostring(descr, encoding='unicode',method='xml') + + + __all__ = ( 'ensure_pyarc', 'get_client', From 63d50023a88cfdcce956856728507bdf2bb619bb Mon Sep 17 00:00:00 2001 From: Maiken Pedersen Date: Fri, 8 Sep 2023 09:43:54 +0200 Subject: [PATCH 08/21] Remove unused imports in arc runner. Applied patch from jmchilton https://github.com/jmchilton/galaxy/commit/1906b2b4395ac6f39daac60dd8535dad11fbfe2e --- lib/galaxy/jobs/runners/arc.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/lib/galaxy/jobs/runners/arc.py b/lib/galaxy/jobs/runners/arc.py index 6e418dad6faa..72f1bf405057 100644 --- a/lib/galaxy/jobs/runners/arc.py +++ b/lib/galaxy/jobs/runners/arc.py @@ -1,11 +1,5 @@ -import os, sys -import json import logging -import time -from datetime import datetime -import shutil -import requests from galaxy.authnz.util import provider_name_to_backend from galaxy import model From c30b651486a6079a409bacdcf498f40465af627e Mon Sep 17 00:00:00 2001 From: Maiken Pedersen Date: Fri, 8 Sep 2023 14:08:40 +0200 Subject: [PATCH 09/21] Fixed one job description omission related to executable. Removed unecessary job_actions method. Further improvements to the action on jobs will come. --- lib/galaxy/jobs/runners/arc.py | 48 +++++------------------- lib/galaxy/jobs/runners/util/arc_util.py | 6 ++- 2 files changed, 13 insertions(+), 41 deletions(-) diff --git a/lib/galaxy/jobs/runners/arc.py b/lib/galaxy/jobs/runners/arc.py index 72f1bf405057..f0f763e7494e 100644 --- a/lib/galaxy/jobs/runners/arc.py +++ b/lib/galaxy/jobs/runners/arc.py @@ -78,15 +78,8 @@ def __init__(self, app, nworkers, **kwargs): self.arcjob = ARCJob() self.provider_backend = provider_name_to_backend("wlcg") #self.arc_url = None - + - """ Following methods starts threads. - These methods invoke threading.Thread(name,target) - which in turn invokes methods monitor() and run_next(). - """ - self._init_monitor_thread() - self._init_worker_threads() - def queue_job(self, job_wrapper): """ When a tool is submitted for execution in galaxy """ """ This method @@ -119,6 +112,7 @@ def queue_job(self, job_wrapper): bulkdesc += "" results = self.arcrest.createJobs(bulkdesc, delegationID=delegationID) + arc_jobid = None if isinstance(results[0], ARCHTTPError): # submission error @@ -127,6 +121,7 @@ def queue_job(self, job_wrapper): else: # successful submission arc_jobid, status = results[0] + job_wrapper.get_job().job_runner_external_id = arc_jobid log.debug(f'Successfully submitted job to remote ARC resource {self.arc.cluster} with ARC id: {arc_jobid}') # beware! 
this means 1 worker, no timeout and default upload buffer errors = self.arcrest.uploadJobFiles([arc_jobid], [self.arcjob.inputFiles]) @@ -136,7 +131,7 @@ def queue_job(self, job_wrapper): job_wrapper.fail("Not submitted") else: # successful input upload - log.debug(f'Successfully uploaded input-files to remote ARC resource {self.arc.cluster} for job with galaxy-id: {galaxy_jobid} and ARC id: {arc_jobid}') + log.debug(f'Successfully uploaded input-files {self.arcjob.inputFiles} to remote ARC resource {self.arc.cluster} for job with galaxy-id: {galaxy_jobid} and ARC id: {arc_jobid}') self.arc.job_mapping[galaxy_jobid]=arc_jobid # Create an object of AsynchronousJobState and add it to the monitor queue. ajs = AsynchronousJobState(files_dir=job_wrapper.working_directory, job_wrapper=job_wrapper, job_id=galaxy_jobid, job_destination=job_destination) @@ -218,7 +213,7 @@ def check_watched_item(self, job_state): """ Get task from ARC """ arc_jobid = self.arc.job_mapping[job_state.job_id] - arc_job_state = self.job_actions(arc_jobid, "status") + arc_job_state = self.arcrest.getJobsStatus([arc_jobid])[0] if arc_job_state is None: return None @@ -311,7 +306,7 @@ def stop_job(self, job_wrapper): log.debug(f'Could not find arc_jobid for stopping job {job_id}') return None - arc_job_state = self.job_actions([arc_jobid], "status") + arc_job_state = self.arcrest.getJobsStatus([arc_jobid]) if arc_job_state is None: return None mapped_state = self.arc.ARC_STATE_MAPPING[arc_job_state] @@ -319,7 +314,7 @@ def stop_job(self, job_wrapper): try: # Initiate a delete call,if the job is running in ARC. - waskilled = self.job_actions([arc_jobid],"kill") + waskilled = self.arcrest.killJobs([arc_jobid]) except Exception as e: log.debug(f'Job with ARC id: {arc_jobid} and Galaxy id: {job_id} was attempted killed by external request (user or admin), but this did not succeed. Exception was: {e}') @@ -464,13 +459,12 @@ def prepare_job(self, job_wrapper, arcjob): the user should him/herself enter what oout and err files that the executable produces """ - std_out = "arc.out" std_err = "arc.err" + """ Construct the job description xml object """ """ TODO - extend to support fuller ARC job description options """ - description_builder = ActivityDescriptionBuilder() description_builder.name = "galaxy_arc_hello_test" description_builder.stdout = std_out @@ -500,7 +494,7 @@ def prepare_job(self, job_wrapper, arcjob): if isexe: """ Fill the appropriate job description fields expected for executables""" """ App tag """ - description_builder.app = "./" + dataset_name + description_builder.exe_path = "./" + dataset_name """ Populate datastaging output tag with all output files - in addition to populate the arcjob object""" @@ -528,27 +522,3 @@ def prepare_job(self, job_wrapper, arcjob): arcjob.RequestedTotalCPUTime = arc_cpuhrs arcjob.descstr = description_builder.to_xml_str() - - def job_actions(self, arcid, action): - - - """ Kill job in ARC. - Pass job_id to http_delete_request method. - """ - """ Really processing one job at a time, should I do this better? 
pyarcrest/arc.py takes a list of jobs""" - - if action == "status": - results = self.arcrest.getJobsStatus([arcid]) - for state in results: - try: - job_state = state - return job_state - except Exception: - log.error(f'Could not get status of job with id: {arcid}') - elif action == "kill": - results = self.arcrest.killJobs([arcid]) - for killed in results: - if killed: - return True - else: - return False diff --git a/lib/galaxy/jobs/runners/util/arc_util.py b/lib/galaxy/jobs/runners/util/arc_util.py index 914a503d57af..16dfc98f4ded 100644 --- a/lib/galaxy/jobs/runners/util/arc_util.py +++ b/lib/galaxy/jobs/runners/util/arc_util.py @@ -28,6 +28,7 @@ class ActivityDescriptionBuilder: stderr: str app: str cpu_time: str + exe_path: str memory: str inputs: List[str] = [] outputs: List[str] = [] @@ -53,9 +54,10 @@ def to_xml_str(self) -> str: app_err.text = self.stderr app_exe = SubElement(app,"Executable") - app_exe.text = self.app + app_exe_path = SubElement(app_exe,"Path") + app_exe_path.text = self.exe_path + - """ Populate datastaging exec tag with all exec files - in addition populate the arcjob object """ for arc_input in self.inputs: """ Datastaging tag """ From 2a07bbc2882562cb4f4a23b0336ab04420ca47ae Mon Sep 17 00:00:00 2001 From: Maiken Pedersen Date: Fri, 8 Sep 2023 16:34:44 +0200 Subject: [PATCH 10/21] linting --- lib/galaxy/jobs/runners/arc.py | 362 ++++++++++++----------- lib/galaxy/jobs/runners/util/arc_util.py | 73 +++-- 2 files changed, 223 insertions(+), 212 deletions(-) diff --git a/lib/galaxy/jobs/runners/arc.py b/lib/galaxy/jobs/runners/arc.py index f0f763e7494e..9faa761375d5 100644 --- a/lib/galaxy/jobs/runners/arc.py +++ b/lib/galaxy/jobs/runners/arc.py @@ -1,15 +1,13 @@ import logging -from galaxy.authnz.util import provider_name_to_backend - from galaxy import model +from galaxy.authnz.util import provider_name_to_backend from galaxy.jobs.runners import ( AsynchronousJobRunner, - AsynchronousJobState + AsynchronousJobState, ) from galaxy.jobs.runners.util.arc_util import ( ActivityDescriptionBuilder, - ARCHTTPError, ARCJob, ensure_pyarc, get_client, @@ -18,7 +16,8 @@ log = logging.getLogger(__name__) -__all__ = ('ArcRESTJobRunner', ) +__all__ = ("ArcRESTJobRunner",) + class Arc: """ @@ -26,7 +25,6 @@ class Arc: """ def __init__(self): - self.cluster = "" self.job_mapping = {} @@ -50,11 +48,10 @@ def __init__(self): "KILLED": "Killed", "WIPED": "Deleted", "None": "Failed", - "Job not found": "Failed" + "Job not found": "Failed", } - - def set_job_cluster(self,cluster): + def set_job_cluster(self, cluster): self.cluster = cluster @@ -62,26 +59,26 @@ class ArcRESTJobRunner(AsynchronousJobRunner): """ Job runner backed by a finite pool of worker threads. 
FIFO scheduling """ + runner_name = "ArcRESTJobRunner" def __init__(self, app, nworkers, **kwargs): - """ 1: Get runner_param_specs from job_conf.xml - 2: Initialise job runner parent object - 3: Start the worker and monitor threads + """1: Get runner_param_specs from job_conf.xml + 2: Initialise job runner parent object + 3: Start the worker and monitor threads """ # Start the job runner parent object - super(ArcRESTJobRunner,self).__init__(app, nworkers, **kwargs) + super(ArcRESTJobRunner, self).__init__(app, nworkers, **kwargs) ensure_pyarc() self.arc = Arc() self.arcjob = ARCJob() self.provider_backend = provider_name_to_backend("wlcg") - #self.arc_url = None + # self.arc_url = None - def queue_job(self, job_wrapper): - """ When a tool is submitted for execution in galaxy """ + """When a tool is submitted for execution in galaxy""" """ This method 1. Fetches the configured ARC endpoint for this user 2. Prepares an ARC job description based on the jobs destination parameters @@ -91,30 +88,30 @@ def queue_job(self, job_wrapper): job_destination = job_wrapper.job_destination galaxy_jobid = job_wrapper.job_id - + """ Set the cluster to submit the job to - extracted from the job_destination parameters in job_conf.xml """ user_preferences = job_wrapper.get_job().user.extra_preferences arc_url = user_preferences.get("distributed_arc_compute|remote_arc_resources", "None") self.arc.set_job_cluster(arc_url) - + """ Prepare and submit job to arc """ self.prepare_job(job_wrapper, self.arcjob) - - token = job_wrapper.get_job().user.get_oidc_tokens(self.provider_backend)['access'] + + token = job_wrapper.get_job().user.get_oidc_tokens(self.provider_backend)["access"] # proxypath is ignored if you are using token self.arcrest = get_client(self.arc.cluster, token=token) - + # token parameter isn't necessary, unless there is a bug delegationID = self.arcrest.createDelegation() - + bulkdesc = "" bulkdesc += self.arcjob.descstr bulkdesc += "" - + results = self.arcrest.createJobs(bulkdesc, delegationID=delegationID) arc_jobid = None - - if isinstance(results[0], ARCHTTPError): + + if isinstance(results[0], self.arcrest.ARCHTTPError): # submission error log.error("Job creation failure. No Response from ARC") job_wrapper.fail("Not submitted") @@ -122,122 +119,125 @@ def queue_job(self, job_wrapper): # successful submission arc_jobid, status = results[0] job_wrapper.get_job().job_runner_external_id = arc_jobid - log.debug(f'Successfully submitted job to remote ARC resource {self.arc.cluster} with ARC id: {arc_jobid}') + log.debug(f"Successfully submitted job to remote ARC resource {self.arc.cluster} with ARC id: {arc_jobid}") # beware! this means 1 worker, no timeout and default upload buffer errors = self.arcrest.uploadJobFiles([arc_jobid], [self.arcjob.inputFiles]) if errors[0]: # input upload error log.error("Job creation failure. No Response from ARC") - log.debug(f'Could not upload job files for job with galaxy-id: {galaxy_jobid} to ARC resource {self.arc.cluster}. Error was: {errors[0]}') + log.debug( + f"Could not upload job files for job with galaxy-id: {galaxy_jobid} to ARC resource {self.arc.cluster}. 
Error was: {errors[0]}" + ) job_wrapper.fail("Not submitted") else: # successful input upload - log.debug(f'Successfully uploaded input-files {self.arcjob.inputFiles} to remote ARC resource {self.arc.cluster} for job with galaxy-id: {galaxy_jobid} and ARC id: {arc_jobid}') - self.arc.job_mapping[galaxy_jobid]=arc_jobid + log.debug( + f"Successfully uploaded input-files {self.arcjob.inputFiles} to remote ARC resource {self.arc.cluster} for job with galaxy-id: {galaxy_jobid} and ARC id: {arc_jobid}" + ) + self.arc.job_mapping[galaxy_jobid] = arc_jobid # Create an object of AsynchronousJobState and add it to the monitor queue. - ajs = AsynchronousJobState(files_dir=job_wrapper.working_directory, job_wrapper=job_wrapper, job_id=galaxy_jobid, job_destination=job_destination) + ajs = AsynchronousJobState( + files_dir=job_wrapper.working_directory, + job_wrapper=job_wrapper, + job_id=galaxy_jobid, + job_destination=job_destination, + ) self.monitor_queue.put(ajs) def place_output_files(self, job_state, job_status_arc): - """ Create log files in galaxy, namely error_file, output_file, exit_code_file - Return true, if all the file creations are successful + """Create log files in galaxy, namely error_file, output_file, exit_code_file + Return true, if all the file creations are successful """ job_dir = job_state.job_wrapper.working_directory - galaxy_workdir= job_dir + '/working' - galaxy_outputs= job_dir + '/outputs' - + galaxy_workdir = job_dir + "/working" + galaxy_outputs = job_dir + "/outputs" + arc_jobid = self.arc.job_mapping[job_state.job_id] outputs_dir = job_state.job_wrapper.outputs_directory - """ job_state.output_file and job_state.error_file is e.g. galaxy_5.e and galaxy_5.o where 5 is the galaxy job id """ """ Hardcoded out and err files - this is ok. But TODO - need to handle if the tool itself has some stdout that should be kept""" """ Galaxy stderr and stdout files need to be poupulated from the arc.out and arc.err files """ try: # Read from ARC output_file and write it into galaxy output_file. - out_log = '' - tool_stdout_path = galaxy_outputs + '/tool_stdout' - with open(galaxy_workdir+"/"+arc_jobid+"/arc.out","r") as f: + out_log = "" + tool_stdout_path = galaxy_outputs + "/tool_stdout" + with open(galaxy_workdir + "/" + arc_jobid + "/arc.out", "r") as f: out_log = f.read() - with open(job_state.output_file,"a+") as log_file: + with open(job_state.output_file, "a+") as log_file: log_file.write(out_log) log_file.write("Some hardcoded stdout - as a sample from the arc.py runner.") with open(tool_stdout_path, "a+") as tool_stdout: tool_stdout.write(out_log) - # Read from ARC error_file and write it into galaxy error_file. 
- err_log = '' - tool_stderr_path = galaxy_outputs + '/tool_stderr' - with open(galaxy_workdir+"/"+arc_jobid+"/arc.err","r") as f: + err_log = "" + tool_stderr_path = galaxy_outputs + "/tool_stderr" + with open(galaxy_workdir + "/" + arc_jobid + "/arc.err", "r") as f: err_log = f.read() - with open(job_state.error_file,"w+") as log_file: + with open(job_state.error_file, "w+") as log_file: log_file.write(err_log) log_file.write("Some hardcoded stderr - as a sample from the arc.py runner.") with open(tool_stderr_path, "w+") as tool_stderr: tool_stderr.write(err_log) - + except OSError as e: - log.error('Could not access task log file: %s', unicodify(e)) + log.error("Could not access task log file: %s", unicodify(e)) log.debug("IO Error occurred when accessing the files.") return False return True - def check_watched_item(self, job_state): - - """ Get the job current status from ARC - using job_id and update the status in galaxy. - If the job execution is successful, call - mark_as_finished() and return 'None' to galaxy. - else if the job failed, call mark_as_failed() - and return 'None' to galaxy. - else if the job is running or in pending state, simply - return the 'AsynchronousJobState object' (job_state). + """Get the job current status from ARC + using job_id and update the status in galaxy. + If the job execution is successful, call + mark_as_finished() and return 'None' to galaxy. + else if the job failed, call mark_as_failed() + and return 'None' to galaxy. + else if the job is running or in pending state, simply + return the 'AsynchronousJobState object' (job_state). """ - ''' This function is called by check_watched_items() where + """ This function is called by check_watched_items() where param job_state is an object of AsynchronousJobState. Expected return type of this function is None or AsynchronousJobState object with updated running status. 
- ''' + """ galaxy_job_wrapper = job_state.job_wrapper galaxy_job = galaxy_job_wrapper.get_job() galaxy_workdir = galaxy_job_wrapper.working_directory - mapped_state= '' + mapped_state = "" """ Make sure to get a fresh token and client """ token = self._get_token(galaxy_job_wrapper) self.arcrest = get_client(self.arc.cluster, token=token) - + """ Get task from ARC """ arc_jobid = self.arc.job_mapping[job_state.job_id] arc_job_state = self.arcrest.getJobsStatus([arc_jobid])[0] if arc_job_state is None: return None - if arc_job_state: mapped_state = self.arc.ARC_STATE_MAPPING[arc_job_state] else: - log.debug(f'Could not map state of ARC job with id: {arc_jobid} and Galaxy job id: {job_state.job_id}') + log.debug(f"Could not map state of ARC job with id: {arc_jobid} and Galaxy job id: {job_state.job_id}") return None - + self.arcrest = get_client(self.arc.cluster, token=self._get_token(galaxy_job_wrapper)) if mapped_state == "Finished": - job_state.running = False galaxy_job_wrapper.change_state(model.Job.states.OK) - galaxy_outputdir = galaxy_workdir + '/working' - self.arcrest.downloadJobFiles(galaxy_outputdir,[arc_jobid]) - - self.place_output_files(job_state,mapped_state) + galaxy_outputdir = galaxy_workdir + "/working" + self.arcrest.downloadJobFiles(galaxy_outputdir, [arc_jobid]) + + self.place_output_files(job_state, mapped_state) self.mark_as_finished(job_state) - '''The function mark_as_finished() executes: + """The function mark_as_finished() executes: self.work_queue.put((self.finish_job, job_state)) *self.finish_job -> job_state.job_wrapper.finish( stdout, stderr, exit_code ) @@ -251,15 +251,22 @@ def check_watched_item(self, job_state): job_state.job_wrapper.finish( stdout, stderr, exit_code ) job_state.job_wrapper.fail( "Unable to finish job", exception=True) *Similar workflow is done for mark_as_failed() method. - ''' + """ return None elif mapped_state == "Running": job_state.running = True galaxy_job_wrapper.change_state(model.Job.states.RUNNING) return job_state - elif mapped_state == "Accepted" or mapped_state == "Preparing" or mapped_state == "Submitting" or mapped_state == "Queuing" or mapped_state == "Hold" or mapped_state == "Other": - """ Job is in transition status """ + elif ( + mapped_state == "Accepted" + or mapped_state == "Preparing" + or mapped_state == "Submitting" + or mapped_state == "Queuing" + or mapped_state == "Hold" + or mapped_state == "Other" + ): + """Job is in transition status""" return job_state elif mapped_state == "Killing" or mapped_state == "Killed": job_state.running = False @@ -275,53 +282,50 @@ def check_watched_item(self, job_state): self.mark_as_failed(job_state) return None - def stop_job(self, job_wrapper): - - - """ - TODO: I am not sure this method is working as intended. Is it supposed to be triggered if the user e.g. + """ + TODO: I am not sure this method is working as intended. Is it supposed to be triggered if the user e.g. deletes an active job from history? - I can not see that this method is called then. It seems to only get called once the external job state is - fetched and rendered as "Finished". + I can not see that this method is called then. It seems to only get called once the external job state is + fetched and rendered as "Finished". 
""" """ Attempts to delete a dispatched executing Job in ARC """ - '''This function is called by fail_job() + """This function is called by fail_job() where param job = self.sa_session.query( self.app.model.Job ).get( job_state.job_wrapper.job_id ) No Return data expected - ''' + """ job_id = job_wrapper.job_id - arc_jobid = '' - + arc_jobid = "" + """ Make sure to get a fresh token and client """ token = self._get_token(job_wrapper) self.arcrest = get_client(self.arc.cluster, token=token) - # Get task status from ARC. try: arc_jobid = self.arc.job_mapping[job_id] except KeyError: - log.debug(f'Could not find arc_jobid for stopping job {job_id}') + log.debug(f"Could not find arc_jobid for stopping job {job_id}") return None - + arc_job_state = self.arcrest.getJobsStatus([arc_jobid]) if arc_job_state is None: return None mapped_state = self.arc.ARC_STATE_MAPPING[arc_job_state] - if not(mapped_state == "Killed" or mapped_state == "Deleted" or mapped_state == "Finished"): - + if not (mapped_state == "Killed" or mapped_state == "Deleted" or mapped_state == "Finished"): try: # Initiate a delete call,if the job is running in ARC. waskilled = self.arcrest.killJobs([arc_jobid]) except Exception as e: - log.debug(f'Job with ARC id: {arc_jobid} and Galaxy id: {job_id} was attempted killed by external request (user or admin), but this did not succeed. Exception was: {e}') - + log.debug( + f"Job with ARC id: {arc_jobid} and Galaxy id: {job_id} was attempted killed by external request (user or admin), but this did not succeed. Exception was: {e}" + ) + return None def recover(self, job, job_wrapper): - """ Recovers jobs stuck in the queued/running state when Galaxy started """ + """Recovers jobs stuck in the queued/running state when Galaxy started""" """ This method is called by galaxy at the time of startup. Jobs in Running & Queued status in galaxy are put in the monitor_queue by creating an AsynchronousJobState object """ @@ -332,78 +336,83 @@ def recover(self, job, job_wrapper): job_wrapper.command_line = job.command_line ajs.job_wrapper = job_wrapper if job.state == model.Job.states.RUNNING: - log.debug("({}/{}) is still in running state, adding to the god queue".format(job.id, job.get_job_runner_external_id())) - ajs.old_state = 'R' + log.debug( + "({}/{}) is still in running state, adding to the god queue".format( + job.id, job.get_job_runner_external_id() + ) + ) + ajs.old_state = "R" ajs.running = True self.monitor_queue.put(ajs) elif job.state == model.Job.states.QUEUED: - log.debug("({}/{}) is still in god queued state, adding to the god queue".format(job.id, job.get_job_runner_external_id())) - ajs.old_state = 'Q' + log.debug( + "({}/{}) is still in god queued state, adding to the god queue".format( + job.id, job.get_job_runner_external_id() + ) + ) + ajs.old_state = "Q" ajs.running = False self.monitor_queue.put(ajs) - def _get_token(self, job_wrapper): - return job_wrapper.get_job().user.get_oidc_tokens(self.provider_backend)['access'] + return job_wrapper.get_job().user.get_oidc_tokens(self.provider_backend)["access"] - def prepare_job(self, job_wrapper, arcjob): - """ - job_wrapper is wrapper around python model galaxy.model.Job - input_datasets - output_datasets - input_dataset_collections... 
-        parameters
-
-        https://docs.galaxyproject.org/en/release_21.05/lib/galaxy.model.html?highlight=galaxy%20model#galaxy.model.Job
-
-        Example of simple ARC job-description:
-
-        <ActivityDescription xmlns="http://www.eu-emi.eu/es/2010/12/adl" xmlns:emiestypes="http://www.eu-emi.eu/es/2010/12/types" xmlns:nordugrid-adl="http://www.nordugrid.org/es/2011/12/nordugrid-adl">
-            <ActivityIdentification>
-                <Name>arc_hello_test</Name>
-            </ActivityIdentification>
-            <Application>
-                <Output>arc.out</Output>
-                <Error>arc.err</Error>
-                <Executable>
-                    <Path>./runhello.sh</Path>
-                </Executable>
-            </Application>
-            <Resources>
-                <QueueName>main</QueueName>
-                <IndividualCPUTime>1</IndividualCPUTime>
-                <IndividualPhysicalMemory>100</IndividualPhysicalMemory>
-            </Resources>
-            <DataStaging>
-                <InputFile>
-                    <Name>runhello.sh</Name>
-                </InputFile>
-                <OutputFile>
-                    <Name>arcout1.txt</Name>
-                    <Target>
-                        <URI>file:///storage/galaxy/jobs_directory/007/7595/outputs/arcout1.txt</URI>
-                    </Target>
-                </OutputFile>
-                <OutputFile>
-                    <Name>arcout2.txt</Name>
-                    <Target>
-                        <URI>file:///storage/galaxy/jobs_directory/007/7595/outputs/arcout2.txt</URI>
-                    </Target>
-                </OutputFile>
-                <OutputFile>
-                    <Name>arc.out</Name>
-                </OutputFile>
-                <OutputFile>
-                    <Name>arc.err</Name>
-                </OutputFile>
-            </DataStaging>
-        </ActivityDescription>
-
+        job_wrapper is wrapper around python model galaxy.model.Job
+        input_datasets
+        output_datasets
+        input_dataset_collections...
+        parameters
+
+        https://docs.galaxyproject.org/en/release_21.05/lib/galaxy.model.html?highlight=galaxy%20model#galaxy.model.Job
+
+        Example of simple ARC job-description:
+
+        <ActivityDescription xmlns="http://www.eu-emi.eu/es/2010/12/adl" xmlns:emiestypes="http://www.eu-emi.eu/es/2010/12/types" xmlns:nordugrid-adl="http://www.nordugrid.org/es/2011/12/nordugrid-adl">
+            <ActivityIdentification>
+                <Name>arc_hello_test</Name>
+            </ActivityIdentification>
+            <Application>
+                <Output>arc.out</Output>
+                <Error>arc.err</Error>
+                <Executable>
+                    <Path>./runhello.sh</Path>
+                </Executable>
+            </Application>
+            <Resources>
+                <QueueName>main</QueueName>
+                <IndividualCPUTime>1</IndividualCPUTime>
+                <IndividualPhysicalMemory>100</IndividualPhysicalMemory>
+            </Resources>
+            <DataStaging>
+                <InputFile>
+                    <Name>runhello.sh</Name>
+                </InputFile>
+                <OutputFile>
+                    <Name>arcout1.txt</Name>
+                    <Target>
+                        <URI>file:///storage/galaxy/jobs_directory/007/7595/outputs/arcout1.txt</URI>
+                    </Target>
+                </OutputFile>
+                <OutputFile>
+                    <Name>arcout2.txt</Name>
+                    <Target>
+                        <URI>file:///storage/galaxy/jobs_directory/007/7595/outputs/arcout2.txt</URI>
+                    </Target>
+                </OutputFile>
+                <OutputFile>
+                    <Name>arc.out</Name>
+                </OutputFile>
+                <OutputFile>
+                    <Name>arc.err</Name>
+                </OutputFile>
+            </DataStaging>
+        </ActivityDescription>
+
         """
@@ -416,8 +425,8 @@ def prepare_job(self, job_wrapper, arcjob):
         job_input_params = {}
         """ Make a dictionary of the job inputs and use this for filling in the job description"""
         for param in galaxy_job.parameters:
-            job_input_params[str(param.name)]=str(param.value.strip('"'))
-
+            job_input_params[str(param.name)] = str(param.value.strip('"'))
+
         """
         Organize the galaxy jobs input-files into executables, input- and output-files
@@ -432,27 +441,27 @@ def prepare_job(self, job_wrapper, arcjob):
        TODO - This needs to be reconsidered so that any tool can work on an ARC endpoint. Not the special arc-tool created here.
        Need a way to reliably identify executables (if local) inputs and outputs independent on how the tool form is like
        """
-        job_files = {}
-        job_files['inputs'] = []
-        job_files['outputs'] = []
+        description_builder = ActivityDescriptionBuilder()
+
+        #job_files: Dict = {}
+        description_builder.job_files["inputs"] = []
+        description_builder.job_files["outputs"] = []
         for inputdata in galaxy_job.input_datasets:
             tag_name = inputdata.name
             file_name = (inputdata.__dict__["dataset"]).__dict__["name"]
             file_id = (inputdata.__dict__["dataset"]).__dict__["dataset_id"]
-            isExe = 'exe' in tag_name
-            data_dict = {'tag':tag_name,'name':file_name ,'dataset_id':file_id, 'isexe':isExe}
-
+            isExe = "exe" in tag_name
+            data_dict = {"tag": tag_name, "name": file_name, "dataset_id": file_id, "isexe": isExe}
+
             if "input" in tag_name or "exe" in tag_name:
-                job_files['inputs'].append(data_dict)
+                description_builder.job_files["inputs"].append(data_dict)
             elif "output" in tag_name:
-                job_files['outputs'].append(data_dict)
-
+                description_builder.job_files["outputs"].append(data_dict)

         """ Fetch the other job description items from the ARC destination """
         arc_cpuhrs = str(job_input_params["arcjob_cpuhrs"])
         arc_mem = str(job_input_params["arcjob_memory"])
-
         """
         TODO- should probably not be Hard-coded
@@ -462,15 +471,12 @@ def prepare_job(self, job_wrapper, arcjob):
         std_out = "arc.out"
         std_err = "arc.err"
-
         """ Construct the job description xml object """
         """ TODO - extend to support fuller ARC job description options """
-        description_builder = ActivityDescriptionBuilder()
         description_builder.name = "galaxy_arc_hello_test"
         description_builder.stdout = std_out
         description_builder.stderr = std_err
-
         """ These are the files that are uploaded by the user for this job - store the path in a dict for use
later key is dataset_id and value is the file path in the galaxy data folder """ inputfile_paths = job_wrapper.job_io.get_input_paths() @@ -478,28 +484,25 @@ def prepare_job(self, job_wrapper, arcjob): for idx, input_path in enumerate(inputfile_paths): job_inputfiles_galaxy_paths[input_path.dataset_id] = input_path.real_path - """ Populate datastaging exec tag with all exec files - in addition populate the arcjob object """ - for job_file in job_files['inputs']: - - dataset_name = job_file['name'] - dataset_id = job_file['dataset_id'] + for job_file in description_builder.job_files["inputs"]: + dataset_name = job_file["name"] + dataset_id = job_file["dataset_id"] dataset_path = job_inputfiles_galaxy_paths[dataset_id] - isexe = job_file['isexe'] - + isexe = job_file["isexe"] + """ Populate the arcjob object with the source - pyarcrest expects this""" - arcjob.inputFiles[dataset_name] = 'file://' + dataset_path + arcjob.inputFiles[dataset_name] = "file://" + dataset_path description_builder.inputs.append(dataset_name) if isexe: - """ Fill the appropriate job description fields expected for executables""" + """Fill the appropriate job description fields expected for executables""" """ App tag """ description_builder.exe_path = "./" + dataset_name - """ Populate datastaging output tag with all output files - in addition to populate the arcjob object""" """ Potentially more than one file - but currently actually only one, so the for-loop here is currently not actually needed """ - + description_builder.outputs.append("/") """ This is hardcoded stdout and stderr files from the ARC job defined here in the runner - TODO - not hardcoded """ @@ -521,4 +524,3 @@ def prepare_job(self, job_wrapper, arcjob): arcjob.StdErr = std_err arcjob.RequestedTotalCPUTime = arc_cpuhrs arcjob.descstr = description_builder.to_xml_str() - diff --git a/lib/galaxy/jobs/runners/util/arc_util.py b/lib/galaxy/jobs/runners/util/arc_util.py index 16dfc98f4ded..e2699145d6c9 100644 --- a/lib/galaxy/jobs/runners/util/arc_util.py +++ b/lib/galaxy/jobs/runners/util/arc_util.py @@ -1,11 +1,20 @@ from typing import ( List, + Dict, Optional, ) -from xml.etree.ElementTree import Element, SubElement, tostring +from xml.etree.ElementTree import ( + Element, + SubElement, + tostring, +) try: - from pyarcrest.arc import ARCJob, ARCRest, ARCRest_1_1 + from pyarcrest.arc import ( + ARCJob, + ARCRest, + ARCRest_1_1, + ) from pyarcrest.errors import ARCHTTPError except ImportError: ARCHTTPError = None @@ -15,11 +24,13 @@ def ensure_pyarc() -> None: if ARCHTTPError is None: - raise Exception("The configured functionality requires the Python package pyarcrest, but it isn't available in the Python environment.") + raise Exception( + "The configured functionality requires the Python package pyarcrest, but it isn't available in the Python environment." 
+ ) def get_client(cluster_url: str, token: str) -> ARCRest_1_1: - return ARCRest.getClient(url=cluster_url, version="1.1", token=token, impls={"1.1":ARCRest_1_1}) + return ARCRest.getClient(url=cluster_url, version="1.1", token=token, impls={"1.1": ARCRest_1_1}) class ActivityDescriptionBuilder: @@ -32,56 +43,54 @@ class ActivityDescriptionBuilder: memory: str inputs: List[str] = [] outputs: List[str] = [] - + job_files: Dict[str, List[Dict]] = {} + def to_xml_str(self) -> str: - descr = Element('ActivityDescription') - descr.set("xmlns","http://www.eu-emi.eu/es/2010/12/adl") - descr.set("xmlns:emiestypes","http://www.eu-emi.eu/es/2010/12/types") - descr.set("xmlns:nordugrid-adl","http://www.nordugrid.org/es/2011/12/nordugrid-adl") + descr = Element("ActivityDescription") + descr.set("xmlns", "http://www.eu-emi.eu/es/2010/12/adl") + descr.set("xmlns:emiestypes", "http://www.eu-emi.eu/es/2010/12/types") + descr.set("xmlns:nordugrid-adl", "http://www.nordugrid.org/es/2011/12/nordugrid-adl") - actid = SubElement(descr,"ActivityIdentification") - app = SubElement(descr,"Application") - resources = SubElement(descr,"Resources") - datastaging = SubElement(descr,"DataStaging") + actid = SubElement(descr, "ActivityIdentification") + app = SubElement(descr, "Application") + resources = SubElement(descr, "Resources") + datastaging = SubElement(descr, "DataStaging") - actid_name = SubElement(actid,"Name") + actid_name = SubElement(actid, "Name") actid_name.text = "galaxy_arc_hello_test" - app_out = SubElement(app,"Output") + app_out = SubElement(app, "Output") app_out.text = self.stdout - app_err = SubElement(app,"Error") + app_err = SubElement(app, "Error") app_err.text = self.stderr - app_exe = SubElement(app,"Executable") - app_exe_path = SubElement(app_exe,"Path") + app_exe = SubElement(app, "Executable") + app_exe_path = SubElement(app_exe, "Path") app_exe_path.text = self.exe_path - for arc_input in self.inputs: - - """ Datastaging tag """ - sub_el = SubElement(datastaging,"InputFile") - subsub_el = SubElement(sub_el,"Name") + """Datastaging tag""" + sub_el = SubElement(datastaging, "InputFile") + subsub_el = SubElement(sub_el, "Name") subsub_el.text = arc_input for arc_output in self.outputs: - sub_el = SubElement(datastaging,"OutputFile") - subsub_el = SubElement(sub_el,"Name") + sub_el = SubElement(datastaging, "OutputFile") + subsub_el = SubElement(sub_el, "Name") subsub_el.text = arc_output - sub_el = SubElement(resources,"IndividualCPUTime") + sub_el = SubElement(resources, "IndividualCPUTime") sub_el.text = self.cpu_time - sub_el = SubElement(resources,"IndividualPhysicalMemory") + sub_el = SubElement(resources, "IndividualPhysicalMemory") sub_el.text = self.memory - return tostring(descr, encoding='unicode',method='xml') - + return tostring(descr, encoding="unicode", method="xml") __all__ = ( - 'ensure_pyarc', - 'get_client', - 'ARCJob', + "ensure_pyarc", + "get_client", + "ARCJob", ) From 7c8f1fd240f67914ea1abafcd277bcbf1c15a0e7 Mon Sep 17 00:00:00 2001 From: Maiken Pedersen Date: Fri, 8 Sep 2023 17:26:23 +0200 Subject: [PATCH 11/21] More linting --- lib/galaxy/jobs/runners/arc.py | 9 ++++----- lib/galaxy/jobs/runners/util/arc_util.py | 1 - 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/lib/galaxy/jobs/runners/arc.py b/lib/galaxy/jobs/runners/arc.py index 9faa761375d5..8b288bfa6a12 100644 --- a/lib/galaxy/jobs/runners/arc.py +++ b/lib/galaxy/jobs/runners/arc.py @@ -317,6 +317,7 @@ def stop_job(self, job_wrapper): try: # Initiate a delete call,if the job is running in 
ARC.
             waskilled = self.arcrest.killJobs([arc_jobid])
+            log.debug(f"Job with ARC id: {arc_jobid} and Galaxy id: {job_id} was killed by external request (user or admin). Status waskilled: {waskilled}")
         except Exception as e:
             log.debug(
                 f"Job with ARC id: {arc_jobid} and Galaxy id: {job_id} was attempted killed by external request (user or admin), but this did not succeed. Exception was: {e}"
@@ -417,9 +418,8 @@ def prepare_job(self, job_wrapper, arcjob):
         """

         """ The job_wrapper.job_destination has access to the parameters from the id=arc destination configured in the job_conf"""
-        job_destination = job_wrapper.job_destination
-        galaxy_job = job_wrapper.get_job()
-        galaxy_workdir = job_wrapper.working_directory
+        #will be used later
+        #job_destination = job_wrapper.job_destination

         """ job_input_params are the input params fetched from the tool """
         job_input_params = {}
@@ -444,7 +444,6 @@ def prepare_job(self, job_wrapper, arcjob):

         description_builder = ActivityDescriptionBuilder()

-        #job_files: Dict = {}
         description_builder.job_files["inputs"] = []
@@ -481,7 +480,7 @@ def prepare_job(self, job_wrapper, arcjob):
         key is dataset_id and value is the file path in the galaxy data folder """
         inputfile_paths = job_wrapper.job_io.get_input_paths()
         job_inputfiles_galaxy_paths = {}
-        for idx, input_path in enumerate(inputfile_paths):
+        for input_path in inputfile_paths:
            job_inputfiles_galaxy_paths[input_path.dataset_id] = input_path.real_path

         """ Populate datastaging exec tag with all exec files - in addition populate the arcjob object """
diff --git a/lib/galaxy/jobs/runners/util/arc_util.py b/lib/galaxy/jobs/runners/util/arc_util.py
index e2699145d6c9..36b264ef0439 100644
--- a/lib/galaxy/jobs/runners/util/arc_util.py
+++ b/lib/galaxy/jobs/runners/util/arc_util.py
@@ -1,7 +1,6 @@
 from typing import (
     List,
     Dict,
-    Optional,
 )
 from xml.etree.ElementTree import (
     Element,

From b6acefb539d544966642dd548bbaea0b8a28c651 Mon Sep 17 00:00:00 2001
From: Maiken Pedersen
Date: Mon, 11 Sep 2023 18:29:21 +0200
Subject: [PATCH 12/21] Refactored to remove need for internal job id mapping.

---
 lib/galaxy/jobs/runners/arc.py           | 48 +++++++++++-------------
 lib/galaxy/jobs/runners/util/arc_util.py |  7 +++-
 2 files changed, 28 insertions(+), 27 deletions(-)

diff --git a/lib/galaxy/jobs/runners/arc.py b/lib/galaxy/jobs/runners/arc.py
index 8b288bfa6a12..c2ffbde3af4e 100644
--- a/lib/galaxy/jobs/runners/arc.py
+++ b/lib/galaxy/jobs/runners/arc.py
@@ -11,7 +11,10 @@
     ARCJob,
     ensure_pyarc,
     get_client,
+    ARCHTTPError,
+    NoValueInARCResult,
 )
+
 from galaxy.util import unicodify

 log = logging.getLogger(__name__)
@@ -26,7 +29,6 @@ class Arc:

     def __init__(self):
         self.cluster = ""
-        self.job_mapping = {}

         self.ARC_STATE_MAPPING = {
             "ACCEPTING": "Accepted",
@@ -51,10 +53,9 @@ def __init__(self):
             "Job not found": "Failed",
         }

-    def set_job_cluster(self, cluster):
-        self.cluster = cluster
+

 class ArcRESTJobRunner(AsynchronousJobRunner):
     """
     Job runner backed by a finite pool of worker threads.
FIFO scheduling @@ -69,7 +70,7 @@ def __init__(self, app, nworkers, **kwargs): """ # Start the job runner parent object - super(ArcRESTJobRunner, self).__init__(app, nworkers, **kwargs) + super().__init__(app, nworkers, **kwargs) ensure_pyarc() self.arc = Arc() @@ -92,7 +93,7 @@ def queue_job(self, job_wrapper): """ Set the cluster to submit the job to - extracted from the job_destination parameters in job_conf.xml """ user_preferences = job_wrapper.get_job().user.extra_preferences arc_url = user_preferences.get("distributed_arc_compute|remote_arc_resources", "None") - self.arc.set_job_cluster(arc_url) + self.arc.cluster = arc_url """ Prepare and submit job to arc """ self.prepare_job(job_wrapper, self.arcjob) @@ -111,15 +112,15 @@ def queue_job(self, job_wrapper): results = self.arcrest.createJobs(bulkdesc, delegationID=delegationID) arc_jobid = None - if isinstance(results[0], self.arcrest.ARCHTTPError): + if isinstance(results[0], ARCHTTPError): # submission error log.error("Job creation failure. No Response from ARC") job_wrapper.fail("Not submitted") else: # successful submission arc_jobid, status = results[0] - job_wrapper.get_job().job_runner_external_id = arc_jobid - log.debug(f"Successfully submitted job to remote ARC resource {self.arc.cluster} with ARC id: {arc_jobid}") + job_wrapper.set_external_id(arc_jobid) + log.debug(f"Successfully submitted job to remote ARC resource {self.arc.cluster} with ARC id: {arc_jobid}job_wrapper.external_job_id: {job_wrapper.get_job().job_runner_external_id} job_wrapper.get_job().get-job_runner_external_id(): {job_wrapper.get_job().get_job_runner_external_id()}") # beware! this means 1 worker, no timeout and default upload buffer errors = self.arcrest.uploadJobFiles([arc_jobid], [self.arcjob.inputFiles]) if errors[0]: # input upload error @@ -133,12 +134,11 @@ def queue_job(self, job_wrapper): log.debug( f"Successfully uploaded input-files {self.arcjob.inputFiles} to remote ARC resource {self.arc.cluster} for job with galaxy-id: {galaxy_jobid} and ARC id: {arc_jobid}" ) - self.arc.job_mapping[galaxy_jobid] = arc_jobid # Create an object of AsynchronousJobState and add it to the monitor queue. ajs = AsynchronousJobState( files_dir=job_wrapper.working_directory, job_wrapper=job_wrapper, - job_id=galaxy_jobid, + job_id=arc_jobid, job_destination=job_destination, ) self.monitor_queue.put(ajs) @@ -152,8 +152,7 @@ def place_output_files(self, job_state, job_status_arc): galaxy_workdir = job_dir + "/working" galaxy_outputs = job_dir + "/outputs" - arc_jobid = self.arc.job_mapping[job_state.job_id] - outputs_dir = job_state.job_wrapper.outputs_directory + arc_jobid = job_state.job_id """ job_state.output_file and job_state.error_file is e.g. galaxy_5.e and galaxy_5.o where 5 is the galaxy job id """ """ Hardcoded out and err files - this is ok. 
But TODO - need to handle if the tool itself has some stdout that should be kept""" @@ -205,7 +204,6 @@ def check_watched_item(self, job_state): """ galaxy_job_wrapper = job_state.job_wrapper - galaxy_job = galaxy_job_wrapper.get_job() galaxy_workdir = galaxy_job_wrapper.working_directory mapped_state = "" @@ -214,9 +212,10 @@ def check_watched_item(self, job_state): self.arcrest = get_client(self.arc.cluster, token=token) """ Get task from ARC """ - arc_jobid = self.arc.job_mapping[job_state.job_id] + arc_jobid = job_state.job_id arc_job_state = self.arcrest.getJobsStatus([arc_jobid])[0] - if arc_job_state is None: + + if isinstance(arc_job_state, ARCHTTPError) or isinstance(arc_job_state, NoValueInARCResult): return None if arc_job_state: @@ -296,20 +295,16 @@ def stop_job(self, job_wrapper): No Return data expected """ job_id = job_wrapper.job_id - arc_jobid = "" + arc_jobid = job_wrapper.get_job().job_runner_external_id + """ Make sure to get a fresh token and client """ token = self._get_token(job_wrapper) self.arcrest = get_client(self.arc.cluster, token=token) - # Get task status from ARC. - try: - arc_jobid = self.arc.job_mapping[job_id] - except KeyError: - log.debug(f"Could not find arc_jobid for stopping job {job_id}") - return None - arc_job_state = self.arcrest.getJobsStatus([arc_jobid]) + """ Get the current ARC job status from the remote ARC endpoint """ + arc_job_state = self.arcrest.getJobsStatus([arc_jobid])[0] if arc_job_state is None: return None mapped_state = self.arc.ARC_STATE_MAPPING[arc_job_state] @@ -330,7 +325,7 @@ def recover(self, job, job_wrapper): """ This method is called by galaxy at the time of startup. Jobs in Running & Queued status in galaxy are put in the monitor_queue by creating an AsynchronousJobState object """ - job_id = job_wrapper.job_id + job_id = job_wrapper.job_runner_external_id() ajs = AsynchronousJobState(files_dir=job_wrapper.working_directory, job_wrapper=job_wrapper) ajs.job_id = str(job_id) ajs.job_destination = job_wrapper.job_destination @@ -339,7 +334,7 @@ def recover(self, job, job_wrapper): if job.state == model.Job.states.RUNNING: log.debug( "({}/{}) is still in running state, adding to the god queue".format( - job.id, job.get_job_runner_external_id() + job.id, job.job_runner_external_id() ) ) ajs.old_state = "R" @@ -349,7 +344,7 @@ def recover(self, job, job_wrapper): elif job.state == model.Job.states.QUEUED: log.debug( "({}/{}) is still in god queued state, adding to the god queue".format( - job.id, job.get_job_runner_external_id() + job.id, job.job_runner_external_id() ) ) ajs.old_state = "Q" @@ -420,6 +415,7 @@ def prepare_job(self, job_wrapper, arcjob): """ The job_wrapper.job_destination has access to the parameters from the id=arc destination configured in the job_conf""" #will be used later #job_destination = job_wrapper.job_destination + galaxy_job = job_wrapper.get_job() """ job_input_params are the input params fetched from the tool """ job_input_params = {} diff --git a/lib/galaxy/jobs/runners/util/arc_util.py b/lib/galaxy/jobs/runners/util/arc_util.py index 36b264ef0439..0d9cf5d6f90b 100644 --- a/lib/galaxy/jobs/runners/util/arc_util.py +++ b/lib/galaxy/jobs/runners/util/arc_util.py @@ -14,9 +14,14 @@ ARCRest, ARCRest_1_1, ) - from pyarcrest.errors import ARCHTTPError + from pyarcrest.errors import ( + ARCHTTPError, + NoValueInARCResult, + ) + except ImportError: ARCHTTPError = None + NoValueInARCResult = None ARCRest_1_1 = None ARCJob = None From 91a20f38167fb4da76fbd09226351e1543d24fb7 Mon Sep 17 00:00:00 2001 
From: Maiken Pedersen Date: Wed, 13 Sep 2023 12:38:30 +0200 Subject: [PATCH 13/21] Refactoring and simplification. Especially for the ARC job description part. --- lib/galaxy/jobs/runners/arc.py | 163 +++++++++++++++------------------ 1 file changed, 75 insertions(+), 88 deletions(-) diff --git a/lib/galaxy/jobs/runners/arc.py b/lib/galaxy/jobs/runners/arc.py index c2ffbde3af4e..4246b491fe3a 100644 --- a/lib/galaxy/jobs/runners/arc.py +++ b/lib/galaxy/jobs/runners/arc.py @@ -7,14 +7,13 @@ AsynchronousJobState, ) from galaxy.jobs.runners.util.arc_util import ( - ActivityDescriptionBuilder, + ARCJobBuilder, + ARCHTTPError, + NoValueInARCResult, ARCJob, ensure_pyarc, get_client, - ARCHTTPError, - NoValueInARCResult, ) - from galaxy.util import unicodify log = logging.getLogger(__name__) @@ -28,7 +27,7 @@ class Arc: """ def __init__(self): - self.cluster = "" + self.url = "" self.ARC_STATE_MAPPING = { "ACCEPTING": "Accepted", @@ -73,10 +72,10 @@ def __init__(self, app, nworkers, **kwargs): super().__init__(app, nworkers, **kwargs) ensure_pyarc() - self.arc = Arc() - self.arcjob = ARCJob() + self.arc = None + self.arcjob = None self.provider_backend = provider_name_to_backend("wlcg") - # self.arc_url = None + def queue_job(self, job_wrapper): """When a tool is submitted for execution in galaxy""" @@ -90,23 +89,23 @@ def queue_job(self, job_wrapper): job_destination = job_wrapper.job_destination galaxy_jobid = job_wrapper.job_id - """ Set the cluster to submit the job to - extracted from the job_destination parameters in job_conf.xml """ + """ Set the ARC endpoint url to submit the job to - extracted from the job_destination parameters in job_conf.xml """ user_preferences = job_wrapper.get_job().user.extra_preferences - arc_url = user_preferences.get("distributed_arc_compute|remote_arc_resources", "None") - self.arc.cluster = arc_url + self.arc = Arc() + self.arc.url = user_preferences.get("distributed_arc_compute|remote_arc_resources", "None") """ Prepare and submit job to arc """ - self.prepare_job(job_wrapper, self.arcjob) + #self.arcjob = ARCJob() + arc_job = self.prepare_job(job_wrapper) token = job_wrapper.get_job().user.get_oidc_tokens(self.provider_backend)["access"] - # proxypath is ignored if you are using token - self.arcrest = get_client(self.arc.cluster, token=token) + self.arcrest = get_client(self.arc.url, token=token) # token parameter isn't necessary, unless there is a bug delegationID = self.arcrest.createDelegation() bulkdesc = "" - bulkdesc += self.arcjob.descstr + bulkdesc += arc_job.descstr bulkdesc += "" results = self.arcrest.createJobs(bulkdesc, delegationID=delegationID) @@ -120,19 +119,19 @@ def queue_job(self, job_wrapper): # successful submission arc_jobid, status = results[0] job_wrapper.set_external_id(arc_jobid) - log.debug(f"Successfully submitted job to remote ARC resource {self.arc.cluster} with ARC id: {arc_jobid}job_wrapper.external_job_id: {job_wrapper.get_job().job_runner_external_id} job_wrapper.get_job().get-job_runner_external_id(): {job_wrapper.get_job().get_job_runner_external_id()}") + log.debug(f"Successfully submitted job to remote ARC resource {self.arc.url} with ARC id: {arc_jobid}job_wrapper.external_job_id: {job_wrapper.get_job().job_runner_external_id} job_wrapper.get_job().get-job_runner_external_id(): {job_wrapper.get_job().get_job_runner_external_id()}") # beware! 
this means 1 worker, no timeout and default upload buffer - errors = self.arcrest.uploadJobFiles([arc_jobid], [self.arcjob.inputFiles]) + errors = self.arcrest.uploadJobFiles([arc_jobid], [arc_job.inputs]) if errors[0]: # input upload error log.error("Job creation failure. No Response from ARC") log.debug( - f"Could not upload job files for job with galaxy-id: {galaxy_jobid} to ARC resource {self.arc.cluster}. Error was: {errors[0]}" + f"Could not upload job files for job with galaxy-id: {galaxy_jobid} to ARC resource {self.arc.url}. Error was: {errors[0]}" ) job_wrapper.fail("Not submitted") else: # successful input upload log.debug( - f"Successfully uploaded input-files {self.arcjob.inputFiles} to remote ARC resource {self.arc.cluster} for job with galaxy-id: {galaxy_jobid} and ARC id: {arc_jobid}" + f"Successfully uploaded input-files {arc_job.inputs.keys()} to remote ARC resource {self.arc.url} for job with galaxy-id: {galaxy_jobid} and ARC id: {arc_jobid}" ) # Create an object of AsynchronousJobState and add it to the monitor queue. ajs = AsynchronousJobState( @@ -209,7 +208,7 @@ def check_watched_item(self, job_state): """ Make sure to get a fresh token and client """ token = self._get_token(galaxy_job_wrapper) - self.arcrest = get_client(self.arc.cluster, token=token) + self.arcrest = get_client(self.arc.url, token=token) """ Get task from ARC """ arc_jobid = job_state.job_id @@ -224,7 +223,7 @@ def check_watched_item(self, job_state): log.debug(f"Could not map state of ARC job with id: {arc_jobid} and Galaxy job id: {job_state.job_id}") return None - self.arcrest = get_client(self.arc.cluster, token=self._get_token(galaxy_job_wrapper)) + self.arcrest = get_client(self.arc.url, token=self._get_token(galaxy_job_wrapper)) if mapped_state == "Finished": job_state.running = False @@ -300,7 +299,7 @@ def stop_job(self, job_wrapper): """ Make sure to get a fresh token and client """ token = self._get_token(job_wrapper) - self.arcrest = get_client(self.arc.cluster, token=token) + self.arcrest = get_client(self.arc.url, token=token) """ Get the current ARC job status from the remote ARC endpoint """ @@ -354,7 +353,7 @@ def recover(self, job, job_wrapper): def _get_token(self, job_wrapper): return job_wrapper.get_job().user.get_oidc_tokens(self.provider_backend)["access"] - def prepare_job(self, job_wrapper, arcjob): + def prepare_job(self, job_wrapper): """ job_wrapper is wrapper around python model galaxy.model.Job input_datasets @@ -413,8 +412,6 @@ def prepare_job(self, job_wrapper, arcjob): """ """ The job_wrapper.job_destination has access to the parameters from the id=arc destination configured in the job_conf""" - #will be used later - #job_destination = job_wrapper.job_destination galaxy_job = job_wrapper.get_job() """ job_input_params are the input params fetched from the tool """ @@ -425,35 +422,55 @@ def prepare_job(self, job_wrapper, arcjob): """ Organize the galaxy jobs input-files into executables, input- and output-files + The ARC job description expects a different format for executables compared to other input files. 
- This works currently in the following way + This works currently in the following way for the ARC test-tool - The tool (hello_arc.xml) has param with name tag arcjob_exe, arcjob_outputs (and could potentially have arcjob_inputs) + + If the galaxy_job.get_input_datasets() name attribute has "exe" in it: In the below I match the strings - exe in the tag_name to match the input file uploaded via the arcjob_exe form field - - output in the tag_name to match the input file uploaded via the arcjob_outputs form field - - currently input is empty - TODO - TODO - discuss if the exe box is convenient to use - I think so - This can then be used as a generic tool to run any kind of script. But then... must consider what to do with dependencies... So probably this option would lead to lots of errors for users. + Else I treat it as "ordinary" input file. - TODO - This needs to be reconsidered so that any tool can work on an ARC endpoint. Not the special arc-tool created here. - Need a way to reliably identify executables (if local) inputs and outputs independent on how the tool form is like + For outputs - I get the galaxy_job.get_output_datasets(). + Currently in the ARC test-tool there is no specified specific output files - ARC client will collect all output files generated in the ARC jobs working directory. + + TODO: Use the command-builder to extract the executable command instead of using an executable file uploaded to Galaxy. + TODO: Extend to support fuller ARC job description options - such as ARC runtimeenvironment that inform the ARC client about what capabilities the endpoint has. + e.g. what software is installed. """ + + arc_job = ARCJobBuilder() - description_builder = ActivityDescriptionBuilder() - - description_builder.job_files["inputs"] = [] - description_builder.job_files["outputs"] = [] - for inputdata in galaxy_job.input_datasets: - tag_name = inputdata.name - file_name = (inputdata.__dict__["dataset"]).__dict__["name"] - file_id = (inputdata.__dict__["dataset"]).__dict__["dataset_id"] - isExe = "exe" in tag_name - data_dict = {"tag": tag_name, "name": file_name, "dataset_id": file_id, "isexe": isExe} - - if "input" in tag_name or "exe" in tag_name: - description_builder.job_files["inputs"].append(data_dict) - elif "output" in tag_name: - description_builder.job_files["outputs"].append(data_dict) + """ + These are the files that are uploaded by the user for this job + file_source: is the file path in the galaxy data folder, + file_realname: the filename the uploaded file had + tool_input_tag: - the tools form input name + """ + input_datasets = galaxy_job.get_input_datasets() + + for input_data in input_datasets: + + file_source = input_data.dataset.get_file_name() + tool_input_tag = input_data.name + file_realname = input_data.dataset.get_display_name() + + arc_job.inputs[real_name] = "file://" + file_source + + """ This is just for the ARC test-tool, will not be used in the final version using generic tools. 
""" + if "exe" in tool_input_tag: + arc_job.exe_path = "./" + real_name + + + """ Potentially more than one file - but currently actually only one, so the for-loop here is currently not actually needed """ + output_datasets = galaxy_job.get_output_datasets() + arc_job.outputs.append("/") + for output_data in output_datasets: + file_name = output_data.name + arc_job.outputs.append(file_name) + """ Fetch the other job description items from the ARC destination """ arc_cpuhrs = str(job_input_params["arcjob_cpuhrs"]) arc_mem = str(job_input_params["arcjob_memory"]) @@ -466,56 +483,26 @@ def prepare_job(self, job_wrapper, arcjob): std_out = "arc.out" std_err = "arc.err" + + """ This is hardcoded stdout and stderr files from the ARC job defined here in the runner - TODO - not hardcoded """ + arc_job.stdout = std_out + arc_job.stderr = std_err + """ Construct the job description xml object """ - """ TODO - extend to support fuller ARC job description options """ - description_builder.name = "galaxy_arc_hello_test" - description_builder.stdout = std_out - description_builder.stderr = std_err - - """ These are the files that are uploaded by the user for this job - store the path in a dict for use later - key is dataset_id and value is the file path in the galaxy data folder """ - inputfile_paths = job_wrapper.job_io.get_input_paths() - job_inputfiles_galaxy_paths = {} - for input_path in inputfile_paths: - job_inputfiles_galaxy_paths[input_path.dataset_id] = input_path.real_path - - """ Populate datastaging exec tag with all exec files - in addition populate the arcjob object """ - for job_file in description_builder.job_files["inputs"]: - dataset_name = job_file["name"] - dataset_id = job_file["dataset_id"] - dataset_path = job_inputfiles_galaxy_paths[dataset_id] - isexe = job_file["isexe"] - - """ Populate the arcjob object with the source - pyarcrest expects this""" - arcjob.inputFiles[dataset_name] = "file://" + dataset_path - description_builder.inputs.append(dataset_name) - - if isexe: - """Fill the appropriate job description fields expected for executables""" - """ App tag """ - description_builder.exe_path = "./" + dataset_name - - """ Populate datastaging output tag with all output files - in addition to populate the arcjob object""" - """ Potentially more than one file - but currently actually only one, so the for-loop here is currently not actually needed """ - - description_builder.outputs.append("/") + arc_job.name = "galaxy_arc_hello_test" - """ This is hardcoded stdout and stderr files from the ARC job defined here in the runner - TODO - not hardcoded """ - description_builder.outputs.append(std_out) - description_builder.outputs.append(std_err) """ TODO - just a sample, this will probably be set by the destination itself - to be discussed """ - description_builder.cpu_time = arc_cpuhrs + arc_job.cpu_time = arc_cpuhrs """ TODO - just a sample, this will probably be set by the destination itself - to be discussed """ - description_builder.memory = arc_mem + arc_job.memory = arc_mem """ Populate the arcjob object with rest of necessary and useful fields including the full job description string""" """ All files that should be collected by ARC when the job is finished need to be appended to the downloadFiles list - here it is just the folder / and all files in the folder will be downloaded. 
The arc.py in pyarcrest loops over this list to fetch all outputfiles """ - arcjob.downloadFiles.append("/") - arcjob.StdOut = std_out - arcjob.StdErr = std_err - arcjob.RequestedTotalCPUTime = arc_cpuhrs - arcjob.descstr = description_builder.to_xml_str() + arc_job.descstr = arc_job.to_xml_str() + + + return arc_job From 88451b18e3dbf3e3efb47cf3a794de1fcca41b6c Mon Sep 17 00:00:00 2001 From: Maiken Pedersen Date: Wed, 13 Sep 2023 12:56:58 +0200 Subject: [PATCH 14/21] Fix last-minute change typo --- lib/galaxy/jobs/runners/arc.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/galaxy/jobs/runners/arc.py b/lib/galaxy/jobs/runners/arc.py index 4246b491fe3a..08d08c475bc9 100644 --- a/lib/galaxy/jobs/runners/arc.py +++ b/lib/galaxy/jobs/runners/arc.py @@ -456,11 +456,11 @@ def prepare_job(self, job_wrapper): tool_input_tag = input_data.name file_realname = input_data.dataset.get_display_name() - arc_job.inputs[real_name] = "file://" + file_source + arc_job.inputs[file_realname] = "file://" + file_source """ This is just for the ARC test-tool, will not be used in the final version using generic tools. """ if "exe" in tool_input_tag: - arc_job.exe_path = "./" + real_name + arc_job.exe_path = "./" + file_realname """ Potentially more than one file - but currently actually only one, so the for-loop here is currently not actually needed """ From dda0a0e2969a6025cdbaaf2c17d064913b310c10 Mon Sep 17 00:00:00 2001 From: Maiken Pedersen Date: Wed, 13 Sep 2023 12:58:47 +0200 Subject: [PATCH 15/21] Forgot to also commit the changed arc_util.py --- lib/galaxy/jobs/runners/util/arc_util.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/lib/galaxy/jobs/runners/util/arc_util.py b/lib/galaxy/jobs/runners/util/arc_util.py index 0d9cf5d6f90b..fcbd26def282 100644 --- a/lib/galaxy/jobs/runners/util/arc_util.py +++ b/lib/galaxy/jobs/runners/util/arc_util.py @@ -37,7 +37,7 @@ def get_client(cluster_url: str, token: str) -> ARCRest_1_1: return ARCRest.getClient(url=cluster_url, version="1.1", token=token, impls={"1.1": ARCRest_1_1}) -class ActivityDescriptionBuilder: +class ARCJobBuilder: name: str stdout: str stderr: str @@ -45,9 +45,11 @@ class ActivityDescriptionBuilder: cpu_time: str exe_path: str memory: str - inputs: List[str] = [] + inputs: Dict[str, str] = {} outputs: List[str] = [] - job_files: Dict[str, List[Dict]] = {} + #job_files: Dict[str, List[Dict]] = {} + #job_files: Dict[str, str] = {} + descrstr: str def to_xml_str(self) -> str: descr = Element("ActivityDescription") From 894cc9a62877407c1e487acfa73c0aefb8942788 Mon Sep 17 00:00:00 2001 From: Maiken Pedersen Date: Wed, 13 Sep 2023 13:11:47 +0200 Subject: [PATCH 16/21] Remove unnecessary open mode parameters --- lib/galaxy/jobs/runners/arc.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/lib/galaxy/jobs/runners/arc.py b/lib/galaxy/jobs/runners/arc.py index 08d08c475bc9..841a1f342291 100644 --- a/lib/galaxy/jobs/runners/arc.py +++ b/lib/galaxy/jobs/runners/arc.py @@ -10,7 +10,6 @@ ARCJobBuilder, ARCHTTPError, NoValueInARCResult, - ARCJob, ensure_pyarc, get_client, ) @@ -95,7 +94,6 @@ def queue_job(self, job_wrapper): self.arc.url = user_preferences.get("distributed_arc_compute|remote_arc_resources", "None") """ Prepare and submit job to arc """ - #self.arcjob = ARCJob() arc_job = self.prepare_job(job_wrapper) token = job_wrapper.get_job().user.get_oidc_tokens(self.provider_backend)["access"] @@ -161,7 +159,7 @@ def place_output_files(self, job_state, job_status_arc): # 
Read from ARC output_file and write it into galaxy output_file. out_log = "" tool_stdout_path = galaxy_outputs + "/tool_stdout" - with open(galaxy_workdir + "/" + arc_jobid + "/arc.out", "r") as f: + with open(galaxy_workdir + "/" + arc_jobid + "/arc.out") as f: out_log = f.read() with open(job_state.output_file, "a+") as log_file: log_file.write(out_log) @@ -172,7 +170,7 @@ def place_output_files(self, job_state, job_status_arc): # Read from ARC error_file and write it into galaxy error_file. err_log = "" tool_stderr_path = galaxy_outputs + "/tool_stderr" - with open(galaxy_workdir + "/" + arc_jobid + "/arc.err", "r") as f: + with open(galaxy_workdir + "/" + arc_jobid + "/arc.err") as f: err_log = f.read() with open(job_state.error_file, "w+") as log_file: log_file.write(err_log) From bd1d30fa64f6235a3b0c166a73a012c9dda15881 Mon Sep 17 00:00:00 2001 From: Maiken Pedersen Date: Wed, 13 Sep 2023 13:15:24 +0200 Subject: [PATCH 17/21] Remove comment - linting --- lib/galaxy/jobs/runners/util/arc_util.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/lib/galaxy/jobs/runners/util/arc_util.py b/lib/galaxy/jobs/runners/util/arc_util.py index fcbd26def282..dd9fc17aff18 100644 --- a/lib/galaxy/jobs/runners/util/arc_util.py +++ b/lib/galaxy/jobs/runners/util/arc_util.py @@ -47,8 +47,6 @@ class ARCJobBuilder: memory: str inputs: Dict[str, str] = {} outputs: List[str] = [] - #job_files: Dict[str, List[Dict]] = {} - #job_files: Dict[str, str] = {} descrstr: str def to_xml_str(self) -> str: From 678dd76680f5c34ed8a1cb14b332864ef6476490 Mon Sep 17 00:00:00 2001 From: Maiken Pedersen Date: Wed, 13 Sep 2023 14:13:14 +0200 Subject: [PATCH 18/21] Hopefully fixed all linting errors now --- lib/galaxy/jobs/runners/arc.py | 50 +++++++++++++++--------- lib/galaxy/jobs/runners/util/arc_util.py | 8 ++-- 2 files changed, 35 insertions(+), 23 deletions(-) diff --git a/lib/galaxy/jobs/runners/arc.py b/lib/galaxy/jobs/runners/arc.py index 841a1f342291..f2f8122acf6f 100644 --- a/lib/galaxy/jobs/runners/arc.py +++ b/lib/galaxy/jobs/runners/arc.py @@ -8,11 +8,18 @@ ) from galaxy.jobs.runners.util.arc_util import ( ARCJobBuilder, - ARCHTTPError, - NoValueInARCResult, ensure_pyarc, get_client, ) +try: + from pyarcrest.errors import ( + ARCHTTPError, + NoValueInARCResult, + ) +except ImportError: + ARCHTTPError = None + NoValueInARCResult = None + from galaxy.util import unicodify log = logging.getLogger(__name__) @@ -52,8 +59,6 @@ def __init__(self): } - - class ArcRESTJobRunner(AsynchronousJobRunner): """ Job runner backed by a finite pool of worker threads. 
FIFO scheduling @@ -75,7 +80,6 @@ def __init__(self, app, nworkers, **kwargs): self.arcjob = None self.provider_backend = provider_name_to_backend("wlcg") - def queue_job(self, job_wrapper): """When a tool is submitted for execution in galaxy""" """ This method @@ -117,7 +121,9 @@ def queue_job(self, job_wrapper): # successful submission arc_jobid, status = results[0] job_wrapper.set_external_id(arc_jobid) - log.debug(f"Successfully submitted job to remote ARC resource {self.arc.url} with ARC id: {arc_jobid}job_wrapper.external_job_id: {job_wrapper.get_job().job_runner_external_id} job_wrapper.get_job().get-job_runner_external_id(): {job_wrapper.get_job().get_job_runner_external_id()}") + log.debug( + f"Successfully submitted job to remote ARC resource {self.arc.url} with ARC id: {arc_jobid}job_wrapper.external_job_id: {job_wrapper.get_job().job_runner_external_id} job_wrapper.get_job().get-job_runner_external_id(): {job_wrapper.get_job().get_job_runner_external_id()}" + ) # beware! this means 1 worker, no timeout and default upload buffer errors = self.arcrest.uploadJobFiles([arc_jobid], [arc_job.inputs]) if errors[0]: # input upload error @@ -204,6 +210,13 @@ def check_watched_item(self, job_state): galaxy_workdir = galaxy_job_wrapper.working_directory mapped_state = "" + + """ Set the ARC endpoint url to submit the job to - extracted from the job_destination parameters in job_conf.xml """ + user_preferences = galaxy_job_wrapper.get_job().user.extra_preferences + self.arc = Arc() + self.arc.url = user_preferences.get("distributed_arc_compute|remote_arc_resources", "None") + + """ Make sure to get a fresh token and client """ token = self._get_token(galaxy_job_wrapper) self.arcrest = get_client(self.arc.url, token=token) @@ -295,11 +308,16 @@ def stop_job(self, job_wrapper): arc_jobid = job_wrapper.get_job().job_runner_external_id + """ Set the ARC endpoint url to submit the job to - extracted from the job_destination parameters in job_conf.xml """ + user_preferences = job_wrapper.get_job().user.extra_preferences + self.arc = Arc() + self.arc.url = user_preferences.get("distributed_arc_compute|remote_arc_resources", "None") + + """ Make sure to get a fresh token and client """ token = self._get_token(job_wrapper) self.arcrest = get_client(self.arc.url, token=token) - """ Get the current ARC job status from the remote ARC endpoint """ arc_job_state = self.arcrest.getJobsStatus([arc_jobid])[0] if arc_job_state is None: @@ -437,7 +455,7 @@ def prepare_job(self, job_wrapper): TODO: Extend to support fuller ARC job description options - such as ARC runtimeenvironment that inform the ARC client about what capabilities the endpoint has. e.g. what software is installed. """ - + arc_job = ARCJobBuilder() """ @@ -447,19 +465,17 @@ def prepare_job(self, job_wrapper): tool_input_tag: - the tools form input name """ input_datasets = galaxy_job.get_input_datasets() - + for input_data in input_datasets: - file_source = input_data.dataset.get_file_name() tool_input_tag = input_data.name file_realname = input_data.dataset.get_display_name() - + arc_job.inputs[file_realname] = "file://" + file_source - + """ This is just for the ARC test-tool, will not be used in the final version using generic tools. 
""" if "exe" in tool_input_tag: arc_job.exe_path = "./" + file_realname - """ Potentially more than one file - but currently actually only one, so the for-loop here is currently not actually needed """ output_datasets = galaxy_job.get_output_datasets() @@ -468,7 +484,6 @@ def prepare_job(self, job_wrapper): file_name = output_data.name arc_job.outputs.append(file_name) - """ Fetch the other job description items from the ARC destination """ arc_cpuhrs = str(job_input_params["arcjob_cpuhrs"]) arc_mem = str(job_input_params["arcjob_memory"]) @@ -481,15 +496,13 @@ def prepare_job(self, job_wrapper): std_out = "arc.out" std_err = "arc.err" - """ This is hardcoded stdout and stderr files from the ARC job defined here in the runner - TODO - not hardcoded """ arc_job.stdout = std_out arc_job.stderr = std_err - + """ Construct the job description xml object """ arc_job.name = "galaxy_arc_hello_test" - """ TODO - just a sample, this will probably be set by the destination itself - to be discussed """ arc_job.cpu_time = arc_cpuhrs @@ -500,7 +513,6 @@ def prepare_job(self, job_wrapper): """ All files that should be collected by ARC when the job is finished need to be appended to the downloadFiles list - here it is just the folder / and all files in the folder will be downloaded. The arc.py in pyarcrest loops over this list to fetch all outputfiles """ - arc_job.descstr = arc_job.to_xml_str() - + arc_job.descrstr = arc_job.to_xml_str() return arc_job diff --git a/lib/galaxy/jobs/runners/util/arc_util.py b/lib/galaxy/jobs/runners/util/arc_util.py index dd9fc17aff18..0ec82f1e5f82 100644 --- a/lib/galaxy/jobs/runners/util/arc_util.py +++ b/lib/galaxy/jobs/runners/util/arc_util.py @@ -1,6 +1,6 @@ from typing import ( - List, Dict, + List, ) from xml.etree.ElementTree import ( Element, @@ -17,8 +17,8 @@ from pyarcrest.errors import ( ARCHTTPError, NoValueInARCResult, - ) - + ) + except ImportError: ARCHTTPError = None NoValueInARCResult = None @@ -48,7 +48,7 @@ class ARCJobBuilder: inputs: Dict[str, str] = {} outputs: List[str] = [] descrstr: str - + def to_xml_str(self) -> str: descr = Element("ActivityDescription") descr.set("xmlns", "http://www.eu-emi.eu/es/2010/12/adl") From 83417f8b4d47fb7f355c8db7eaec6a2a394326fa Mon Sep 17 00:00:00 2001 From: Maiken Pedersen Date: Wed, 13 Sep 2023 14:21:19 +0200 Subject: [PATCH 19/21] more linting of whitespace --- lib/galaxy/jobs/runners/arc.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/lib/galaxy/jobs/runners/arc.py b/lib/galaxy/jobs/runners/arc.py index f2f8122acf6f..c5e54b7a1569 100644 --- a/lib/galaxy/jobs/runners/arc.py +++ b/lib/galaxy/jobs/runners/arc.py @@ -11,6 +11,7 @@ ensure_pyarc, get_client, ) + try: from pyarcrest.errors import ( ARCHTTPError, @@ -210,13 +211,11 @@ def check_watched_item(self, job_state): galaxy_workdir = galaxy_job_wrapper.working_directory mapped_state = "" - """ Set the ARC endpoint url to submit the job to - extracted from the job_destination parameters in job_conf.xml """ user_preferences = galaxy_job_wrapper.get_job().user.extra_preferences self.arc = Arc() self.arc.url = user_preferences.get("distributed_arc_compute|remote_arc_resources", "None") - """ Make sure to get a fresh token and client """ token = self._get_token(galaxy_job_wrapper) self.arcrest = get_client(self.arc.url, token=token) @@ -307,13 +306,11 @@ def stop_job(self, job_wrapper): job_id = job_wrapper.job_id arc_jobid = job_wrapper.get_job().job_runner_external_id - """ Set the ARC endpoint url to submit the job to - extracted from the 
job_destination parameters in job_conf.xml """
         user_preferences = job_wrapper.get_job().user.extra_preferences
         self.arc = Arc()
         self.arc.url = user_preferences.get("distributed_arc_compute|remote_arc_resources", "None")
-
         """ Make sure to get a fresh token and client """
         token = self._get_token(job_wrapper)
         self.arcrest = get_client(self.arc.url, token=token)

From 75087afc817f5cf72eaae7258ab8920efc5eaf1c Mon Sep 17 00:00:00 2001
From: Maiken Pedersen
Date: Wed, 13 Sep 2023 14:39:30 +0200
Subject: [PATCH 20/21] More linting and some bugfix related to external job id

---
 lib/galaxy/jobs/runners/arc.py | 16 ++++------------
 1 file changed, 4 insertions(+), 12 deletions(-)

diff --git a/lib/galaxy/jobs/runners/arc.py b/lib/galaxy/jobs/runners/arc.py
index c5e54b7a1569..7126214f188a 100644
--- a/lib/galaxy/jobs/runners/arc.py
+++ b/lib/galaxy/jobs/runners/arc.py
@@ -108,7 +108,7 @@ def queue_job(self, job_wrapper):
         delegationID = self.arcrest.createDelegation()

         bulkdesc = "<ActivityDescriptions>"
-        bulkdesc += arc_job.descstr
+        bulkdesc += arc_job.descrstr
         bulkdesc += "</ActivityDescriptions>"

         results = self.arcrest.createJobs(bulkdesc, delegationID=delegationID)
@@ -337,28 +337,20 @@ def recover(self, job, job_wrapper):
         """ This method is called by galaxy at the time of startup.
         Jobs in Running & Queued status in galaxy are put in the monitor_queue by creating an AsynchronousJobState object
         """
-        job_id = job_wrapper.job_runner_external_id()
+        job_id = job.job_runner_external_id
         ajs = AsynchronousJobState(files_dir=job_wrapper.working_directory, job_wrapper=job_wrapper)
         ajs.job_id = str(job_id)
         ajs.job_destination = job_wrapper.job_destination
         job_wrapper.command_line = job.command_line
         ajs.job_wrapper = job_wrapper
         if job.state == model.Job.states.RUNNING:
-            log.debug(
-                "({}/{}) is still in running state, adding to the god queue".format(
-                    job.id, job.job_runner_external_id()
-                )
-            )
+            log.debug("({}/{}) is still in running state, adding to the monitor queue".format(job.id, ajs.job_id))
             ajs.old_state = "R"
             ajs.running = True
             self.monitor_queue.put(ajs)
         elif job.state == model.Job.states.QUEUED:
-            log.debug(
-                "({}/{}) is still in god queued state, adding to the god queue".format(
-                    job.id, job.job_runner_external_id()
-                )
-            )
+            log.debug("({}/{}) is still in queued state, adding to the monitor queue".format(job.id, ajs.job_id))
             ajs.old_state = "Q"
             ajs.running = False
             self.monitor_queue.put(ajs)

From d1f2fb0922e39b59d740fa61bab622cebfd518e8 Mon Sep 17 00:00:00 2001
From: Maiken Pedersen
Date: Wed, 13 Sep 2023 15:09:52 +0200
Subject: [PATCH 21/21] linting whitespace

---
 lib/galaxy/jobs/runners/arc.py | 45 +++++++++++++++++-----------------
 1 file changed, 22 insertions(+), 23 deletions(-)

diff --git a/lib/galaxy/jobs/runners/arc.py b/lib/galaxy/jobs/runners/arc.py
index 7126214f188a..bd210f0c6d25 100644
--- a/lib/galaxy/jobs/runners/arc.py
+++ b/lib/galaxy/jobs/runners/arc.py
@@ -83,7 +83,7 @@ def __init__(self, app, nworkers, **kwargs):

     def queue_job(self, job_wrapper):
         """When a tool is submitted for execution in galaxy"""
-        """ This method
+        """ This method
         1. Fetches the configured ARC endpoint for this user
         2. Prepares an ARC job description based on the jobs destination parameters
         3.
Submits the job to the remote ARC endpoint via pyarcrest @@ -425,33 +425,32 @@ def prepare_job(self, job_wrapper): for param in galaxy_job.parameters: job_input_params[str(param.name)] = str(param.value.strip('"')) - """ - Organize the galaxy jobs input-files into executables, input- and output-files - The ARC job description expects a different format for executables compared to other input files. - + """ Organize the galaxy jobs input-files into executables, input- and output-files + The ARC job description expects a different format for executables compared to other input files. + This works currently in the following way for the ARC test-tool - - The tool (hello_arc.xml) has param with name tag arcjob_exe, arcjob_outputs (and could potentially have arcjob_inputs) - - If the galaxy_job.get_input_datasets() name attribute has "exe" in it: + - The tool (hello_arc.xml) has param with name tag arcjob_exe, arcjob_outputs (and could potentially have arcjob_inputs) + + If the galaxy_job.get_input_datasets() name attribute has "exe" in it: In the below I match the strings - exe in the tag_name to match the input file uploaded via the arcjob_exe form field - Else I treat it as "ordinary" input file. + Else I treat it as "ordinary" input file. For outputs - I get the galaxy_job.get_output_datasets(). - Currently in the ARC test-tool there is no specified specific output files - ARC client will collect all output files generated in the ARC jobs working directory. - - TODO: Use the command-builder to extract the executable command instead of using an executable file uploaded to Galaxy. - TODO: Extend to support fuller ARC job description options - such as ARC runtimeenvironment that inform the ARC client about what capabilities the endpoint has. + Currently in the ARC test-tool there is no specified specific output files - ARC client will collect all output files generated in the ARC jobs working directory. + + TODO: Use the command-builder to extract the executable command instead of using an executable file uploaded to Galaxy. + TODO: Extend to support fuller ARC job description options - such as ARC runtimeenvironment that inform the ARC client about what capabilities the endpoint has. e.g. what software is installed. 
""" arc_job = ARCJobBuilder() - """ - These are the files that are uploaded by the user for this job - file_source: is the file path in the galaxy data folder, + """ + These are the files that are uploaded by the user for this job + file_source: is the file path in the galaxy data folder, file_realname: the filename the uploaded file had - tool_input_tag: - the tools form input name + tool_input_tag: - the tools form input name """ input_datasets = galaxy_job.get_input_datasets() @@ -477,10 +476,10 @@ def prepare_job(self, job_wrapper): arc_cpuhrs = str(job_input_params["arcjob_cpuhrs"]) arc_mem = str(job_input_params["arcjob_memory"]) - """ - TODO- should probably not be Hard-coded - the user should him/herself enter what oout and err files - that the executable produces + """ + TODO- should probably not be Hard-coded + the user should him/herself enter what oout and err files + that the executable produces """ std_out = "arc.out" std_err = "arc.err" @@ -499,8 +498,8 @@ def prepare_job(self, job_wrapper): arc_job.memory = arc_mem """ Populate the arcjob object with rest of necessary and useful fields including the full job description string""" - """ All files that should be collected by ARC when the job is finished need to be appended to the downloadFiles list - - here it is just the folder / and all files in the folder will be downloaded. + """ All files that should be collected by ARC when the job is finished need to be appended to the downloadFiles list - + here it is just the folder / and all files in the folder will be downloaded. The arc.py in pyarcrest loops over this list to fetch all outputfiles """ arc_job.descrstr = arc_job.to_xml_str()