Skip to content

Commit

Permalink
Linting done
Browse files Browse the repository at this point in the history
  • Loading branch information
Maiken Pedersen committed Sep 27, 2023
1 parent 140ef15 commit 09ce2e2
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 77 deletions.
134 changes: 63 additions & 71 deletions lib/galaxy/jobs/runners/arc.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,11 @@
ARCHTTPError = None
NoValueInARCResult = None

from galaxy.util import unicodify
from galaxy.jobs.command_factory import build_command
from galaxy.job_execution.compute_environment import (
ComputeEnvironment,
dataset_path_to_extra_path,
)
from galaxy.util import unicodify

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -64,21 +63,27 @@ def __init__(self):
"Job not found": "Failed",
}


class ARCComputeEnvironment(ComputeEnvironment):
"""NB! This is just blunt copy-paste and simplification from pulsar runner.
Many things are not used, and will not work properly.
Currently just using input-path rewrite"""

def __init__(self, job_wrapper):
self.job_wrapper = job_wrapper

self.local_path_config = job_wrapper.default_compute_environment()

self.path_rewrites_input_extra = {}
self._working_directory = "."
self._config_directory = "."
self._home_directory = ""
self._tool_dir = ""
self._tmp_directory = ""
self._shared_home_dir = ""
self._sep = ""
self._version_path = ""



def output_names(self):
# Maybe this should use the path mapper, but the path mapper just uses basenames
return self.job_wrapper.job_io.get_output_basenames()
Expand All @@ -88,49 +93,48 @@ def input_path_rewrite(self, dataset):
ARC Jobs run in the ARC remote compute clusters workdir - not known to Galaxy at this point.
But all input-files are all uploaded (by ARC) to this workdir, so a simple relative path will work for all ARC jobs
"""
return f'{str(self._working_directory)}/{str(dataset.get_display_name())}'
return f"{str(self._working_directory)}/{str(dataset.get_display_name())}"

def output_path_rewrite(self, dataset):
"""
ARC Jobs run in the ARC remote compute clusters workdir - not known to Galaxy at this point.
But all outputfiles are created in this workdir, so a simple relative path will work for all ARC jobs
"""
#return f'{str(self._working_directory)}/{str(dataset.get_file_name())}'
return f'{str(dataset.get_file_name())}'
# return f'{str(self._working_directory)}/{str(dataset.get_file_name())}'
return f"{str(dataset.get_file_name())}"


def input_extra_files_rewrite(self, dataset):
""" TODO - find out what this is and if I need it """
"""TODO - find out what this is and if I need it"""
input_path_rewrite = self.input_path_rewrite(dataset)
remote_extra_files_path_rewrite = dataset_path_to_extra_path(input_path_rewrite)
self.path_rewrites_input_extra[dataset.extra_files_path] = remote_extra_files_path_rewrite
return remote_extra_files_path_rewrite

def output_extra_files_rewrite(self, dataset):
""" TODO - find out what this is and if I need it """
"""TODO - find out what this is and if I need it"""
output_path_rewrite = self.output_path_rewrite(dataset)
remote_extra_files_path_rewrite = dataset_path_to_extra_path(output_path_rewrite)
return remote_extra_files_path_rewrite

def input_metadata_rewrite(self, dataset, metadata_val):
""" TODO - find out what this is and if I need it """
"""TODO - find out what this is and if I need it"""
return None

def unstructured_path_rewrite(self, parameter_value):
""" TODO - find out what this is and if I need it """
"""TODO - find out what this is and if I need it"""
return self._working_directory

def working_directory(self):
return self._working_directory

def env_config_directory(self):
return self.config_directory()
return self._config_directory

def config_directory(self):
return self._config_directory

def new_file_path(self):
return self.working_directory() # Problems with doing this?
return self._working_directory

def sep(self):
return self._sep
Expand All @@ -152,8 +156,8 @@ def galaxy_url(self):

def get_file_sources_dict(self):
return self.job_wrapper.job_io.file_sources_dict


class ArcRESTJobRunner(AsynchronousJobRunner):
"""
Job runner backed by a finite pool of worker threads. FIFO scheduling
Expand All @@ -175,10 +179,9 @@ def __init__(self, app, nworkers, **kwargs):
self.arcjob = None
self.provider_backend = provider_name_to_backend("wlcg")


def queue_job(self, job_wrapper):
"""When a tool is submitted for execution in galaxy"""
""" This method
""" This method
1. Fetches the configured ARC endpoint for this user
2. Prepares an ARC job description based on the jobs destination parameters
3. Submits the job to the remote ARC endpoint via pyarcrest
Expand All @@ -188,14 +191,14 @@ def queue_job(self, job_wrapper):
job_destination = job_wrapper.job_destination
job_id = job_wrapper.job_id


""" Build the command line - needs a rewrite of input-paths as ARC is a remote cluster """
if not self.prepare_job(job_wrapper,
include_metadata=False,
include_work_dir_outputs=False,
modify_command_for_container=False,
stream_stdout_stderr=False
):
if not self.prepare_job(
job_wrapper,
include_metadata=False,
include_work_dir_outputs=False,
modify_command_for_container=False,
stream_stdout_stderr=False,
):
return
""" prepare_job() calls prepare() but not allowing to pass a compute_environment object
As I need to define my own compute_environment for the remote compute I must call it here passing the compute_environment
Expand All @@ -214,20 +217,19 @@ def queue_job(self, job_wrapper):
log.exception("(%s) Failure preparing job", job_id)
job_wrapper.fail(unicodify(e), exception=True)
return

if not job_wrapper.runner_command_line:
job_wrapper.finish("", "")
return



""" Set the ARC endpoint url to submit the job to - extracted from the job_destination parameters in job_conf.xml """
user_preferences = job_wrapper.get_job().user.extra_preferences
self.arc = Arc()
self.arc.url = user_preferences.get("distributed_arc_compute|remote_arc_resources", "None")

""" Prepare and submit job to arc """
arc_job = self.prepare_job_arc(job_wrapper)

token = job_wrapper.get_job().user.get_oidc_tokens(self.provider_backend)["access"]
self.arcrest = get_client(self.arc.url, token=token)

Expand Down Expand Up @@ -366,8 +368,10 @@ def check_watched_item(self, job_state):
galaxy_job_wrapper.change_state(model.Job.states.OK)

galaxy_outputdir = galaxy_workdir + "/working"
#self.arcrest.downloadJobFiles(galaxy_outputdir, [arc_job_id])
self.arcrest.downloadJobFiles(galaxy_outputdir, [arc_job_id],outputFilters={f"{arc_job_id}":'(?!user.proxy$)'})
# self.arcrest.downloadJobFiles(galaxy_outputdir, [arc_job_id])
self.arcrest.downloadJobFiles(
galaxy_outputdir, [arc_job_id], outputFilters={f"{arc_job_id}": "(?!user.proxy$)"}
)

self.place_output_files(job_state, mapped_state)
self.mark_as_finished(job_state)
Expand Down Expand Up @@ -471,21 +475,13 @@ def recover(self, job, job_wrapper):
job_wrapper.command_line = job.command_line
ajs.job_wrapper = job_wrapper
if job.state == model.Job.states.RUNNING:
log.debug(
"({}/{}) is still in running state, adding to the god queue".format(
job.id, ajs.job_id
)
)
log.debug("({}/{}) is still in running state, adding to the god queue".format(job.id, ajs.job_id))
ajs.old_state = "R"
ajs.running = True
self.monitor_queue.put(ajs)

elif job.state == model.Job.states.QUEUED:
log.debug(
"({}/{}) is still in god queued state, adding to the god queue".format(
job.id, ajs.job_id
)
)
log.debug("({}/{}) is still in god queued state, adding to the god queue".format(job.id, ajs.job_id))
ajs.old_state = "Q"
ajs.running = False
self.monitor_queue.put(ajs)
Expand Down Expand Up @@ -554,22 +550,21 @@ def prepare_job_arc(self, job_wrapper):
""" The job_wrapper.job_destination has access to the parameters from the id=arc destination configured in the job_conf """
galaxy_job = job_wrapper.get_job()

"""
Organize the galaxy jobs input-files into executables, input- and output-files
The ARC job description expects a different format for executables compared to other input files.
TODO: Use the command-builder to extract the executable command instead of using an executable file uploaded to Galaxy.
TODO: Extend to support fuller ARC job description options - such as ARC runtimeenvironment that inform the ARC client about what capabilities the endpoint has.
"""
Organize the galaxy jobs input-files into executables, input- and output-files
The ARC job description expects a different format for executables compared to other input files.
TODO: Use the command-builder to extract the executable command instead of using an executable file uploaded to Galaxy.
TODO: Extend to support fuller ARC job description options - such as ARC runtimeenvironment that inform the ARC client about what capabilities the endpoint has.
e.g. what software is installed.
"""

arc_job = ARCJobBuilder()

"""
These are the files that are uploaded by the user for this job
file_source: is the file path in the galaxy data folder,
"""
These are the files that are uploaded by the user for this job
file_source: is the file path in the galaxy data folder,
file_realname: the filename the uploaded file had
tool_input_tag: - the tools form input name
tool_input_tag: - the tools form input name
"""
input_datasets = galaxy_job.get_input_datasets()

Expand All @@ -578,25 +573,22 @@ def prepare_job_arc(self, job_wrapper):
tool_input_tag = input_data.name
file_realname = input_data.dataset.get_display_name()

if 'arcjob_remote_filelist' in tool_input_tag:
log.debug(f'==MAIKEN== arcjob_remote_filelist')
""" Demo gymnastics just to show how ARC can handle fetching remote files - this needs to be discussed how to achieve in Galaxy - relies on the hello_arc.xml tool """
with open(file_source,'r') as f:
if "arcjob_remote_filelist" in tool_input_tag:
"""Demo gymnastics just to show how ARC can handle fetching remote files - this needs to be discussed how to achieve in Galaxy - relies on the hello_arc.xml tool"""
with open(file_source) as f:
files = f.readlines()
for idx,file_url in enumerate(files):
file_n_xrls = f'remote_file_{idx}'
for idx, file_url in enumerate(files):
file_n_xrls = f"remote_file_{idx}"
arc_job.inputs[file_n_xrls] = file_url.strip()
else:
""" Example of file local to the Galaxy server """
"""Example of file local to the Galaxy server"""
arc_job.inputs[file_realname] = "file://" + file_source



""" Need also to upload the Executable produced by Galaxy - the tool_script.sh """
file_realname = "tool_script.sh"
file_source = "file://" + job_wrapper.working_directory + "/" + file_realname
arc_job.inputs[file_realname] = file_source

""" Use the tool_script.sh created by Galaxy as the executable to run """
arc_job.exe_path = "./" + file_realname

Expand All @@ -612,10 +604,10 @@ def prepare_job_arc(self, job_wrapper):
arc_cpuhrs = str(job_wrapper.job_destination.params["arc_cpuhrs"])
arc_mem = str(job_wrapper.job_destination.params["arc_mem"])

"""
TODO- should probably not be Hard-coded
the user should him/herself enter what oout and err files
that the executable produces
"""
TODO- should probably not be Hard-coded
the user should him/herself enter what oout and err files
that the executable produces
"""
std_out = "arc.out"
std_err = "arc.err"
Expand All @@ -634,10 +626,10 @@ def prepare_job_arc(self, job_wrapper):
arc_job.memory = arc_mem

""" Populate the arcjob object with rest of necessary and useful fields including the full job description string"""
""" All files that should be collected by ARC when the job is finished need to be appended to the downloadFiles list -
here it is just the folder / and all files in the folder will be downloaded.
""" All files that should be collected by ARC when the job is finished need to be appended to the downloadFiles list -
here it is just the folder / and all files in the folder will be downloaded.
The arc.py in pyarcrest loops over this list to fetch all outputfiles """
arc_job.descrstr = arc_job.to_xml_str()
log.debug(f'{arc_job.descrstr}')
log.debug(f"{arc_job.descrstr}")

return arc_job
12 changes: 6 additions & 6 deletions lib/galaxy/jobs/runners/util/arc_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

log = logging.getLogger(__name__)


def ensure_pyarc() -> None:
if ARCHTTPError is None:
raise Exception(
Expand Down Expand Up @@ -73,16 +74,15 @@ def to_xml_str(self) -> str:
for file_name, file_remote_source in self.inputs.items():
sub_el = SubElement(datastaging, "InputFile")
SubElement(sub_el, "Name").text = file_name
if 'file://' not in file_remote_source:
""" Only supply url for remote files, not local ones - local ones are handled by the ARC client on the client side (Galaxy)"""
source_el = SubElement(sub_el,"Source")
SubElement(source_el,"URI").text = file_remote_source
if "file://" not in file_remote_source:
"""Only supply url for remote files, not local ones - local ones are handled by the ARC client on the client side (Galaxy)"""
source_el = SubElement(sub_el, "Source")
SubElement(source_el, "URI").text = file_remote_source

for file_name in self.outputs:
sub_el = SubElement(datastaging, "OutputFile")
SubElement(sub_el, "Name").text = file_name
"""----------- End datastaging ---------"""


SubElement(resources, "IndividualCPUTime").text = self.cpu_time
SubElement(resources, "IndividualPhysicalMemory").text = self.memory
Expand Down

0 comments on commit 09ce2e2

Please sign in to comment.