Commit
Refactoring and simplification. Especially for the ARC job description part.
Maiken Pedersen committed Sep 13, 2023
1 parent b6acefb commit 91a20f3
Showing 1 changed file with 75 additions and 88 deletions.
163 changes: 75 additions & 88 deletions lib/galaxy/jobs/runners/arc.py
@@ -7,14 +7,13 @@
AsynchronousJobState,
)
from galaxy.jobs.runners.util.arc_util import (
ActivityDescriptionBuilder,
ARCJobBuilder,
ARCHTTPError,
NoValueInARCResult,
ARCJob,
ensure_pyarc,
get_client,
ARCHTTPError,
NoValueInARCResult,
)

from galaxy.util import unicodify

log = logging.getLogger(__name__)
@@ -28,7 +27,7 @@ class Arc:
"""

def __init__(self):
self.cluster = ""
self.url = ""

self.ARC_STATE_MAPPING = {
"ACCEPTING": "Accepted",
@@ -73,10 +72,10 @@ def __init__(self, app, nworkers, **kwargs):
super().__init__(app, nworkers, **kwargs)
ensure_pyarc()

self.arc = Arc()
self.arcjob = ARCJob()
self.arc = None
self.arcjob = None
self.provider_backend = provider_name_to_backend("wlcg")
# self.arc_url = None


def queue_job(self, job_wrapper):
"""When a tool is submitted for execution in galaxy"""
@@ -90,23 +89,23 @@ def queue_job(self, job_wrapper):
job_destination = job_wrapper.job_destination
galaxy_jobid = job_wrapper.job_id

""" Set the cluster to submit the job to - extracted from the job_destination parameters in job_conf.xml """
""" Set the ARC endpoint url to submit the job to - extracted from the job_destination parameters in job_conf.xml """
user_preferences = job_wrapper.get_job().user.extra_preferences
arc_url = user_preferences.get("distributed_arc_compute|remote_arc_resources", "None")
self.arc.cluster = arc_url
self.arc = Arc()
self.arc.url = user_preferences.get("distributed_arc_compute|remote_arc_resources", "None")

""" Prepare and submit job to arc """
self.prepare_job(job_wrapper, self.arcjob)
#self.arcjob = ARCJob()
arc_job = self.prepare_job(job_wrapper)

token = job_wrapper.get_job().user.get_oidc_tokens(self.provider_backend)["access"]
# proxypath is ignored if you are using a token
self.arcrest = get_client(self.arc.cluster, token=token)
self.arcrest = get_client(self.arc.url, token=token)

# token parameter isn't necessary, unless there is a bug
delegationID = self.arcrest.createDelegation()

bulkdesc = "<ActivityDescriptions>"
bulkdesc += self.arcjob.descstr
bulkdesc += arc_job.descstr
bulkdesc += "</ActivityDescriptions>"

results = self.arcrest.createJobs(bulkdesc, delegationID=delegationID)
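
As an aside: arc_util also exports ARCHTTPError and NoValueInARCResult (imported at the top of this file). A minimal, hypothetical sketch of how the bulk submission call could be guarded with them - this error handling is an illustration, not part of the commit:

# Hypothetical guard around the bulk submission. Assumptions: pyarcrest raises
# ARCHTTPError for a rejected REST call and NoValueInARCResult when the reply
# lacks the expected payload (names taken from the imports above).
try:
    results = self.arcrest.createJobs(bulkdesc, delegationID=delegationID)
except ARCHTTPError as e:
    log.error(f"ARC REST endpoint rejected the job submission: {e}")
    job_wrapper.fail("Not submitted")
    return
except NoValueInARCResult as e:
    log.error(f"ARC returned no usable submission result: {e}")
    job_wrapper.fail("Not submitted")
    return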
@@ -120,19 +119,19 @@
# successful submission
arc_jobid, status = results[0]
job_wrapper.set_external_id(arc_jobid)
log.debug(f"Successfully submitted job to remote ARC resource {self.arc.cluster} with ARC id: {arc_jobid}job_wrapper.external_job_id: {job_wrapper.get_job().job_runner_external_id} job_wrapper.get_job().get-job_runner_external_id(): {job_wrapper.get_job().get_job_runner_external_id()}")
log.debug(f"Successfully submitted job to remote ARC resource {self.arc.url} with ARC id: {arc_jobid}job_wrapper.external_job_id: {job_wrapper.get_job().job_runner_external_id} job_wrapper.get_job().get-job_runner_external_id(): {job_wrapper.get_job().get_job_runner_external_id()}")
# beware! this means 1 worker, no timeout and default upload buffer
errors = self.arcrest.uploadJobFiles([arc_jobid], [self.arcjob.inputFiles])
errors = self.arcrest.uploadJobFiles([arc_jobid], [arc_job.inputs])
if errors[0]: # input upload error
log.error("Job creation failure. No Response from ARC")
log.debug(
f"Could not upload job files for job with galaxy-id: {galaxy_jobid} to ARC resource {self.arc.cluster}. Error was: {errors[0]}"
f"Could not upload job files for job with galaxy-id: {galaxy_jobid} to ARC resource {self.arc.url}. Error was: {errors[0]}"
)
job_wrapper.fail("Not submitted")
else:
# successful input upload
log.debug(
f"Successfully uploaded input-files {self.arcjob.inputFiles} to remote ARC resource {self.arc.cluster} for job with galaxy-id: {galaxy_jobid} and ARC id: {arc_jobid}"
f"Successfully uploaded input-files {arc_job.inputs.keys()} to remote ARC resource {self.arc.url} for job with galaxy-id: {galaxy_jobid} and ARC id: {arc_jobid}"
)
# Create an object of AsynchronousJobState and add it to the monitor queue.
ajs = AsynchronousJobState(
@@ -209,7 +208,7 @@ def check_watched_item(self, job_state):

""" Make sure to get a fresh token and client """
token = self._get_token(galaxy_job_wrapper)
self.arcrest = get_client(self.arc.cluster, token=token)
self.arcrest = get_client(self.arc.url, token=token)

""" Get task from ARC """
arc_jobid = job_state.job_id
@@ -224,7 +223,7 @@ def check_watched_item(self, job_state):
log.debug(f"Could not map state of ARC job with id: {arc_jobid} and Galaxy job id: {job_state.job_id}")
return None

self.arcrest = get_client(self.arc.cluster, token=self._get_token(galaxy_job_wrapper))
self.arcrest = get_client(self.arc.url, token=self._get_token(galaxy_job_wrapper))

if mapped_state == "Finished":
job_state.running = False
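
To make the truncated watch-loop logic easier to follow, here is a compact, hypothetical sketch of how the ARC_STATE_MAPPING table on the Arc class drives check_watched_item - the getJobsStatus call and the mark_as_finished hand-off are assumptions based on the surrounding runner code, not lines from this commit:

# Hypothetical sketch: fetch the raw ARC state for the watched job and
# translate it through the mapping defined on the Arc helper class.
arc_state = self.arcrest.getJobsStatus([arc_jobid])[0]  # assumed pyarcrest call
mapped_state = self.arc.ARC_STATE_MAPPING.get(arc_state)
if mapped_state == "Finished":
    job_state.running = False
    self.mark_as_finished(job_state)  # hand the finished job back to Galaxy
    return None                       # drop it from the watched list
job_state.running = mapped_state == "Running"
return job_state                      # keep watching queued/running jobs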
@@ -300,7 +299,7 @@ def stop_job(self, job_wrapper):

""" Make sure to get a fresh token and client """
token = self._get_token(job_wrapper)
self.arcrest = get_client(self.arc.cluster, token=token)
self.arcrest = get_client(self.arc.url, token=token)


""" Get the current ARC job status from the remote ARC endpoint """
@@ -354,7 +353,7 @@ def recover(self, job, job_wrapper):
def _get_token(self, job_wrapper):
return job_wrapper.get_job().user.get_oidc_tokens(self.provider_backend)["access"]

def prepare_job(self, job_wrapper, arcjob):
def prepare_job(self, job_wrapper):
"""
job_wrapper is a wrapper around the python model galaxy.model.Job
input_datasets
Expand Down Expand Up @@ -413,8 +412,6 @@ def prepare_job(self, job_wrapper, arcjob):
"""

""" The job_wrapper.job_destination has access to the parameters from the id=arc destination configured in the job_conf"""
#will be used later
#job_destination = job_wrapper.job_destination
galaxy_job = job_wrapper.get_job()

""" job_input_params are the input params fetched from the tool """
@@ -425,35 +422,55 @@

"""
Organize the galaxy job's input-files into executables, input- and output-files
The ARC job description expects a different format for executables compared to other input files.
This works currently in the following way
This works currently in the following way for the ARC test-tool
- The tool (hello_arc.xml) has params with the name tags arcjob_exe and arcjob_outputs (and could potentially have arcjob_inputs)
If the galaxy_job.get_input_datasets() name attribute has "exe" in it:
Below I match the strings
- exe in the tag_name to match the input file uploaded via the arcjob_exe form field
- output in the tag_name to match the input file uploaded via the arcjob_outputs form field
- currently input is empty - TODO
TODO - discuss whether the exe box is convenient to use - I think so - it could then be used as a generic tool to run any kind of script. But we must then consider what to do with dependencies, so this option would probably lead to lots of errors for users.
Else I treat it as an "ordinary" input file.
TODO - This needs to be reconsidered so that any tool can work on an ARC endpoint, not just the special arc-tool created here.
We need a way to reliably identify executables (if local), inputs and outputs, independent of what the tool form looks like.
For outputs - I get the galaxy_job.get_output_datasets().
Currently the ARC test-tool specifies no particular output files - the ARC client will collect all output files generated in the ARC job's working directory.
TODO: Use the command-builder to extract the executable command instead of using an executable file uploaded to Galaxy.
TODO: Extend to support fuller ARC job description options - such as the ARC runtimeenvironment, which informs the ARC client about what capabilities the endpoint has,
e.g. what software is installed.
"""

arc_job = ARCJobBuilder()

description_builder = ActivityDescriptionBuilder()

description_builder.job_files["inputs"] = []
description_builder.job_files["outputs"] = []
for inputdata in galaxy_job.input_datasets:
tag_name = inputdata.name
file_name = (inputdata.__dict__["dataset"]).__dict__["name"]
file_id = (inputdata.__dict__["dataset"]).__dict__["dataset_id"]
isExe = "exe" in tag_name
data_dict = {"tag": tag_name, "name": file_name, "dataset_id": file_id, "isexe": isExe}

if "input" in tag_name or "exe" in tag_name:
description_builder.job_files["inputs"].append(data_dict)
elif "output" in tag_name:
description_builder.job_files["outputs"].append(data_dict)
"""
These are the files that are uploaded by the user for this job
file_source: the file path in the galaxy data folder
file_realname: the filename the uploaded file had
tool_input_tag: the tool's form input name
"""
input_datasets = galaxy_job.get_input_datasets()

for input_data in input_datasets:

file_source = input_data.dataset.get_file_name()
tool_input_tag = input_data.name
file_realname = input_data.dataset.get_display_name()

arc_job.inputs[file_realname] = "file://" + file_source

""" This is just for the ARC test-tool, will not be used in the final version using generic tools. """
if "exe" in tool_input_tag:
arc_job.exe_path = "./" + file_realname


""" Potentially more than one file - but currently actually only one, so the for-loop here is currently not actually needed """
output_datasets = galaxy_job.get_output_datasets()
arc_job.outputs.append("/")
for output_data in output_datasets:
file_name = output_data.name
arc_job.outputs.append(file_name)


""" Fetch the other job description items from the ARC destination """
arc_cpuhrs = str(job_input_params["arcjob_cpuhrs"])
arc_mem = str(job_input_params["arcjob_memory"])
@@ -466,56 +483,26 @@ def prepare_job(self, job_wrapper, arcjob):
std_out = "arc.out"
std_err = "arc.err"


""" This is hardcoded stdout and stderr files from the ARC job defined here in the runner - TODO - not hardcoded """
arc_job.stdout = std_out
arc_job.stderr = std_err

""" Construct the job description xml object """
""" TODO - extend to support fuller ARC job description options """
description_builder.name = "galaxy_arc_hello_test"
description_builder.stdout = std_out
description_builder.stderr = std_err

""" These are the files that are uploaded by the user for this job - store the path in a dict for use later
key is dataset_id and value is the file path in the galaxy data folder """
inputfile_paths = job_wrapper.job_io.get_input_paths()
job_inputfiles_galaxy_paths = {}
for input_path in inputfile_paths:
job_inputfiles_galaxy_paths[input_path.dataset_id] = input_path.real_path

""" Populate datastaging exec tag with all exec files - in addition populate the arcjob object """
for job_file in description_builder.job_files["inputs"]:
dataset_name = job_file["name"]
dataset_id = job_file["dataset_id"]
dataset_path = job_inputfiles_galaxy_paths[dataset_id]
isexe = job_file["isexe"]

""" Populate the arcjob object with the source - pyarcrest expects this"""
arcjob.inputFiles[dataset_name] = "file://" + dataset_path
description_builder.inputs.append(dataset_name)

if isexe:
"""Fill the appropriate job description fields expected for executables"""
""" App tag """
description_builder.exe_path = "./" + dataset_name

""" Populate datastaging output tag with all output files - in addition to populate the arcjob object"""
""" Potentially more than one file - but currently actually only one, so the for-loop here is currently not actually needed """

description_builder.outputs.append("/")
arc_job.name = "galaxy_arc_hello_test"

""" This is hardcoded stdout and stderr files from the ARC job defined here in the runner - TODO - not hardcoded """
description_builder.outputs.append(std_out)
description_builder.outputs.append(std_err)

""" TODO - just a sample, this will probably be set by the destination itself - to be discussed """
description_builder.cpu_time = arc_cpuhrs
arc_job.cpu_time = arc_cpuhrs

""" TODO - just a sample, this will probably be set by the destination itself - to be discussed """
description_builder.memory = arc_mem
arc_job.memory = arc_mem

""" Populate the arcjob object with rest of necessary and useful fields including the full job description string"""
""" All files that should be collected by ARC when the job is finished need to be appended to the downloadFiles list -
here it is just the folder / and all files in the folder will be downloaded.
The arc.py in pyarcrest loops over this list to fetch all outputfiles """
arcjob.downloadFiles.append("/")
arcjob.StdOut = std_out
arcjob.StdErr = std_err
arcjob.RequestedTotalCPUTime = arc_cpuhrs
arcjob.descstr = description_builder.to_xml_str()
arc_job.descstr = arc_job.to_xml_str()


return arc_job
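
Taken together, prepare_job boils down to the following condensed recap of the ARCJobBuilder flow (attribute names are the ones used above; the file names and values are made-up examples):

# Condensed, illustrative recap of prepare_job - all values are hypothetical.
arc_job = ARCJobBuilder()
arc_job.name = "galaxy_arc_hello_test"
arc_job.inputs["runme.sh"] = "file:///galaxy/datasets/dataset_1.dat"  # keyed by display name
arc_job.exe_path = "./runme.sh"         # the "exe"-tagged input becomes the executable
arc_job.outputs.append("/")             # "/" makes ARC collect the whole job directory
arc_job.stdout = "arc.out"
arc_job.stderr = "arc.err"
arc_job.cpu_time = "1"                  # from the arcjob_cpuhrs tool param
arc_job.memory = "2000"                 # from the arcjob_memory tool param
arc_job.descstr = arc_job.to_xml_str()  # serialize to the ADL job description string

The resulting descstr is what queue_job wraps in <ActivityDescriptions> for the bulk createJobs call.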
