From 73a44a6bc1598b039dd6b2e265af69e918f853f5 Mon Sep 17 00:00:00 2001 From: Pablo Moreno Date: Mon, 19 Dec 2022 22:36:56 +0000 Subject: [PATCH 01/20] Enables resubmissions for the k8s runner --- lib/galaxy/jobs/runners/kubernetes.py | 31 +++++++++++++++++++-------- 1 file changed, 22 insertions(+), 9 deletions(-) diff --git a/lib/galaxy/jobs/runners/kubernetes.py b/lib/galaxy/jobs/runners/kubernetes.py index 1f9e984605c3..87a28d815b8c 100644 --- a/lib/galaxy/jobs/runners/kubernetes.py +++ b/lib/galaxy/jobs/runners/kubernetes.py @@ -95,7 +95,8 @@ def __init__(self, app, nworkers, **kwargs): map=str, valid=lambda s: s == "$gid" or isinstance(s, int) or not s or s.isdigit(), default=None ), k8s_cleanup_job=dict(map=str, valid=lambda s: s in {"onsuccess", "always", "never"}, default="always"), - k8s_pod_retries=dict(map=int, valid=lambda x: int(x) >= 0, default=3), + k8s_pod_retries=dict(map=int, valid=lambda x: int(x) >= 0, default=1), # note that if the backOffLimit is lower, this paramer will have not effect. + k8s_job_spec_back_off_limit=dict(map=int, valid=lambda x: int(x) >= 0, default=0), # this means that it will stop retrying after 1 failure. k8s_walltime_limit=dict(map=int, valid=lambda x: int(x) >= 0, default=172800), k8s_unschedulable_walltime_limit=dict(map=int, valid=lambda x: not x or int(x) >= 0, default=None), k8s_interactivetools_use_ssl=dict(map=bool, default=False), @@ -323,6 +324,7 @@ def __get_k8s_job_spec(self, ajs): job_ttl = self.runner_params["k8s_job_ttl_secs_after_finished"] if self.runner_params["k8s_cleanup_job"] != "never" and job_ttl is not None: k8s_job_spec["ttlSecondsAfterFinished"] = job_ttl + k8s_job_spec["backoffLimit"] = self.runner_params["k8s_job_spec_back_off_limit"] return k8s_job_spec def __force_label_conformity(self, value): @@ -526,7 +528,7 @@ def __get_k8s_containers(self, ajs): # command line execution, separated by ;, which is what Galaxy does # to assemble the command. "command": [ajs.job_wrapper.shell], - "args": ["-c", ajs.job_file], + "args": ["-c", f"{ajs.job_file}; exit $(cat {ajs.exit_code_file})"], "workingDir": ajs.job_wrapper.working_directory, "volumeMounts": deduplicate_entries(mounts), } @@ -715,6 +717,10 @@ def check_watched_item(self, job_state): else: max_pod_retries = 1 + # make sure that we don't have any conditions by which the runner + # would wait forever for a pod that never gets sent. 
+ max_pod_retries = min(max_pod_retries, self.runner_params["k8s_job_spec_back_off_limit"]) + # Check if job.obj['status'] is empty, # return job_state unchanged if this is the case # as probably this means that the k8s API server hasn't @@ -736,7 +742,7 @@ def check_watched_item(self, job_state): job_state.running = False self.mark_as_finished(job_state) return None - elif active > 0 and failed <= max_pod_retries: + elif active > 0 and failed < max_pod_retries + 1: if not job_state.running: if self.__job_pending_due_to_unschedulable_pod(job_state): if self.runner_params.get("k8s_unschedulable_walltime_limit"): @@ -760,7 +766,10 @@ def check_watched_item(self, job_state): job_state.job_wrapper.cleanup() return None else: - return self._handle_job_failure(job, job_state) + self._handle_job_failure(job, job_state) + # changes for resubmission (removed self.mark_as_failed from handle_job_failure) + self.work_queue.put((self.mark_as_failed, job_state)) + return None elif len(jobs.response["items"]) == 0: if job_state.job_wrapper.get_job().state == model.Job.states.DELETED: @@ -798,6 +807,8 @@ def _handle_unschedulable_job(self, job, job_state): def _handle_job_failure(self, job, job_state): # Figure out why job has failed with open(job_state.error_file, "a") as error_file: + # TODO we need to remove probably these error_file.writes, as they remove the stderr / stdout capture + # from failed Galaxy k8s jobs. if self.__job_failed_due_to_low_memory(job_state): error_file.write("Job killed after running out of memory. Try with more memory.\n") job_state.fail_message = "Tool failed due to insufficient memory. Try with more memory." @@ -809,8 +820,9 @@ def _handle_job_failure(self, job, job_state): else: error_file.write("Exceeded max number of Kubernetes pod retries allowed for job\n") job_state.fail_message = "More pods failed than allowed. See stdout for pods details." 
- job_state.running = False - self.mark_as_failed(job_state) + # changes for resubmission, to mimick what happens in the LSF-cli runner + # job_state.running = False + # self.mark_as_failed(job_state) try: if self.__has_guest_ports(job_state.job_wrapper): self.__cleanup_k8s_guest_ports(job_state.job_wrapper, job) @@ -855,11 +867,12 @@ def __job_failed_due_to_low_memory(self, job_state): if not pods.response["items"]: return False - pod = self._get_pod_for_job(job_state) + # pod = self._get_pod_for_job(job_state) # this was always None + pod = pods.response["items"][0] if ( pod - and pod.obj["status"]["phase"] == "Failed" - and pod.obj["status"]["containerStatuses"][0]["state"]["terminated"]["reason"] == "OOMKilled" + and "terminated" in pod["status"]["containerStatuses"][0]["state"] + and pod["status"]["containerStatuses"][0]["state"]["terminated"]["reason"] == "OOMKilled" ): return True From 4ed002540cafc6ef7411cf4ca9838f2a4680d24f Mon Sep 17 00:00:00 2001 From: Pablo Moreno Date: Mon, 27 Feb 2023 11:38:05 +0000 Subject: [PATCH 02/20] Fix detection of stderr / stdout and placement on UI --- lib/galaxy/jobs/runners/kubernetes.py | 25 ++++++++++++++++++------- 1 file changed, 18 insertions(+), 7 deletions(-) diff --git a/lib/galaxy/jobs/runners/kubernetes.py b/lib/galaxy/jobs/runners/kubernetes.py index 87a28d815b8c..4cb85284bfc7 100644 --- a/lib/galaxy/jobs/runners/kubernetes.py +++ b/lib/galaxy/jobs/runners/kubernetes.py @@ -741,10 +741,12 @@ def check_watched_item(self, job_state): if succeeded > 0 or job_state == model.Job.states.STOPPED: job_state.running = False self.mark_as_finished(job_state) + log.debug("k8s job succeeded") return None elif active > 0 and failed < max_pod_retries + 1: if not job_state.running: if self.__job_pending_due_to_unschedulable_pod(job_state): + log.debug("k8s job pending..") if self.runner_params.get("k8s_unschedulable_walltime_limit"): creation_time_str = job.obj["metadata"].get("creationTimestamp") creation_time = datetime.strptime(creation_time_str, "%Y-%m-%dT%H:%M:%SZ") @@ -756,19 +758,28 @@ def check_watched_item(self, job_state): else: pass else: + log.debug("k8s job is running..") job_state.running = True job_state.job_wrapper.change_state(model.Job.states.RUNNING) return job_state elif job_persisted_state == model.Job.states.DELETED: # Job has been deleted via stop_job and job has not been deleted, # remove from watched_jobs by returning `None` + log.debug("PP Job is DELETED..") if job_state.job_wrapper.cleanup_job in ("always", "onsuccess"): job_state.job_wrapper.cleanup() return None else: + log.debug("k8s job is failed and not deleted, looking at failure") self._handle_job_failure(job, job_state) # changes for resubmission (removed self.mark_as_failed from handle_job_failure) self.work_queue.put((self.mark_as_failed, job_state)) + # If the job was not resubmitted after being put in the failed queue, + # we mark it as finished as well for stderr / stdout detection. + # Otherwise, the user doesn't see any stdout/stderr in the UI. + if job_state.job_wrapper.get_state() != model.Job.states.RESUBMITTED: + self.mark_as_finished(job_state) + return None elif len(jobs.response["items"]) == 0: @@ -807,22 +818,21 @@ def _handle_unschedulable_job(self, job, job_state): def _handle_job_failure(self, job, job_state): # Figure out why job has failed with open(job_state.error_file, "a") as error_file: - # TODO we need to remove probably these error_file.writes, as they remove the stderr / stdout capture - # from failed Galaxy k8s jobs. 
+ log.debug("Trying with error file in _handle_job_failure") if self.__job_failed_due_to_low_memory(job_state): + log.debug("OOM condition reached") error_file.write("Job killed after running out of memory. Try with more memory.\n") job_state.fail_message = "Tool failed due to insufficient memory. Try with more memory." job_state.runner_state = JobState.runner_states.MEMORY_LIMIT_REACHED elif self.__job_failed_due_to_walltime_limit(job): + log.debug("Walltime condition reached") error_file.write("DeadlineExceeded") job_state.fail_message = "Job was active longer than specified deadline" job_state.runner_state = JobState.runner_states.WALLTIME_REACHED else: - error_file.write("Exceeded max number of Kubernetes pod retries allowed for job\n") - job_state.fail_message = "More pods failed than allowed. See stdout for pods details." - # changes for resubmission, to mimick what happens in the LSF-cli runner - # job_state.running = False - # self.mark_as_failed(job_state) + log.debug("Runner cannot detect a specific reason for failure, must be a tool failure.") + error_file.write("Exceeded max number of job retries allowed for job\n") + job_state.fail_message = "More job retries failed than allowed. See standard output and standard error within the info section for details." try: if self.__has_guest_ports(job_state.job_wrapper): self.__cleanup_k8s_guest_ports(job_state.job_wrapper, job) @@ -962,6 +972,7 @@ def recover(self, job, job_wrapper): ajs.old_state = model.Job.states.QUEUED ajs.running = False self.monitor_queue.put(ajs) + def finish_job(self, job_state): self._handle_metadata_externally(job_state.job_wrapper, resolve_requirements=True) From e4aeae0892ad88858ed228861ad99f295e74557e Mon Sep 17 00:00:00 2001 From: Pablo Moreno Date: Wed, 8 Mar 2023 18:06:44 +0000 Subject: [PATCH 03/20] Everything working --- lib/galaxy/jobs/runners/kubernetes.py | 196 ++++++++++++++++++++------ 1 file changed, 152 insertions(+), 44 deletions(-) diff --git a/lib/galaxy/jobs/runners/kubernetes.py b/lib/galaxy/jobs/runners/kubernetes.py index 4cb85284bfc7..da212d2a68e6 100644 --- a/lib/galaxy/jobs/runners/kubernetes.py +++ b/lib/galaxy/jobs/runners/kubernetes.py @@ -3,14 +3,19 @@ """ import logging +import json # for debugging of API objects import math import os import re +import time from datetime import datetime import yaml from galaxy import model +from galaxy.util import ( + unicodify, +) from galaxy.jobs.runners import ( AsynchronousJobRunner, AsynchronousJobState, @@ -214,7 +219,11 @@ def queue_job(self, job_wrapper): self.monitor_queue.put(ajs) def __has_guest_ports(self, job_wrapper): - return bool(job_wrapper.guest_ports) + # Check if job has guest ports or interactive tool entry points that would signal that + # this is an interactive tool. 
+ log.debug(f"Checking if job {job_wrapper.get_id_tag()} has guest ports: {job_wrapper.guest_ports}") + log.debug(f"Checking if job {job_wrapper.get_id_tag()} has interactive entry points: {job_wrapper.guest_ports}") + return bool(job_wrapper.guest_ports) or bool(job_wrapper.get_job().interactivetool_entry_points) def __configure_port_routing(self, ajs): # Configure interactive tool entry points first @@ -231,9 +240,19 @@ def __configure_port_routing(self, ajs): k8s_service_obj = service_object_dict(self.runner_params, k8s_job_name, self.__get_k8s_service_spec(ajs)) k8s_ingress_obj = ingress_object_dict(self.runner_params, k8s_job_name, self.__get_k8s_ingress_spec(ajs)) + # pretty print the objects for debugging + log.debug(f"Kubernetes service object: {json.dumps(k8s_service_obj, indent=4)}") + log.debug(f"Kubernetes ingress object: {json.dumps(k8s_ingress_obj, indent=4)}") + # We avoid creating service and ingress if they already exist (e.g. when Galaxy is restarted or resubmitting a job) service = Service(self._pykube_api, k8s_service_obj) + # if service.exists(): + # log.debug(f"Service {k8s_job_name} already exists, skipping creation") + # else: service.create() ingress = Ingress(self._pykube_api, k8s_ingress_obj) + # if ingress.exists(): + # log.debug(f"Ingress {k8s_job_name} already exists, skipping creation") + # else: ingress.create() def __get_overridable_params(self, job_wrapper, param_key): @@ -456,26 +475,27 @@ def __get_k8s_ingress_spec(self, ajs): "annotations": {"app.galaxyproject.org/tool_id": ajs.job_wrapper.tool.id}, }, "spec": { - "rules": [ - { - "host": ep["domain"], - "http": { - "paths": [ - { - "backend": { - "serviceName": self.__get_k8s_job_name( - self.__produce_k8s_job_prefix(), ajs.job_wrapper - ), - "servicePort": int(ep["tool_port"]), - }, - "path": ep.get("entry_path", "/"), - "pathType": "Prefix", - } - ] - }, - } - for ep in entry_points - ] + "ingressClassName": "nginx", + "rules":[ { + "host": ep["domain"], + "http": { + "paths": [ { + "backend": { + "service": { + "name": self.__get_k8s_job_name( + self.__produce_k8s_job_prefix(), ajs.job_wrapper + ), + "port": { "number": int(ep["tool_port"])} + } + }, + "path": ep.get("entry_path", "/"), + "pathType": "Prefix" + } + ] + }, + } + for ep in entry_points + ] }, } if self.runner_params.get("k8s_interactivetools_use_ssl"): @@ -741,12 +761,12 @@ def check_watched_item(self, job_state): if succeeded > 0 or job_state == model.Job.states.STOPPED: job_state.running = False self.mark_as_finished(job_state) - log.debug("k8s job succeeded") + log.debug("Job succeeded") return None elif active > 0 and failed < max_pod_retries + 1: if not job_state.running: if self.__job_pending_due_to_unschedulable_pod(job_state): - log.debug("k8s job pending..") + log.debug("PP Job pending..") if self.runner_params.get("k8s_unschedulable_walltime_limit"): creation_time_str = job.obj["metadata"].get("creationTimestamp") creation_time = datetime.strptime(creation_time_str, "%Y-%m-%dT%H:%M:%SZ") @@ -758,39 +778,39 @@ def check_watched_item(self, job_state): else: pass else: - log.debug("k8s job is running..") + log.debug("Job set to running..") job_state.running = True job_state.job_wrapper.change_state(model.Job.states.RUNNING) return job_state elif job_persisted_state == model.Job.states.DELETED: # Job has been deleted via stop_job and job has not been deleted, # remove from watched_jobs by returning `None` - log.debug("PP Job is DELETED..") + log.debug("Job is DELETED..") if job_state.job_wrapper.cleanup_job in ("always", 
"onsuccess"): job_state.job_wrapper.cleanup() return None else: - log.debug("k8s job is failed and not deleted, looking at failure") + log.debug(f"Job is failed and not deleted, looking at failure") + log.debug(f"Job state before handle job failure: {job_state.job_wrapper.get_state()}") self._handle_job_failure(job, job_state) # changes for resubmission (removed self.mark_as_failed from handle_job_failure) self.work_queue.put((self.mark_as_failed, job_state)) - # If the job was not resubmitted after being put in the failed queue, - # we mark it as finished as well for stderr / stdout detection. - # Otherwise, the user doesn't see any stdout/stderr in the UI. - if job_state.job_wrapper.get_state() != model.Job.states.RESUBMITTED: - self.mark_as_finished(job_state) return None elif len(jobs.response["items"]) == 0: if job_state.job_wrapper.get_job().state == model.Job.states.DELETED: - # Job has been deleted via stop_job and job has been deleted, - # cleanup and remove from watched_jobs by returning `None` if job_state.job_wrapper.cleanup_job in ("always", "onsuccess"): job_state.job_wrapper.cleanup() return None + if job_state.job_wrapper.get_job().state == model.Job.states.STOPPED and self.__has_guest_ports(job_state.job_wrapper): + # Interactive job has been stopped via stop_job (most likely by the user), + # cleanup and remove from watched_jobs by returning `None`. STOPPED jobs are cleaned up elsewhere. + # Marking as finished makes sure that the interactive job output is available in the UI. + self.mark_as_finished(job_state) + return None # there is no job responding to this job_id, it is either lost or something happened. - log.error("No Jobs are available under expected selector app=%s", job_state.job_id) + log.error(f"No Jobs are available under expected selector app={job_state.job_id} and they are not deleted or stopped either.") self.mark_as_failed(job_state) # job is no longer viable - remove from watched jobs return None @@ -818,21 +838,24 @@ def _handle_unschedulable_job(self, job, job_state): def _handle_job_failure(self, job, job_state): # Figure out why job has failed with open(job_state.error_file, "a") as error_file: - log.debug("Trying with error file in _handle_job_failure") + log.debug("PP Trying with error file in _handle_job_failure") if self.__job_failed_due_to_low_memory(job_state): - log.debug("OOM condition reached") + log.debug("PP OOM reached!") error_file.write("Job killed after running out of memory. Try with more memory.\n") job_state.fail_message = "Tool failed due to insufficient memory. Try with more memory." job_state.runner_state = JobState.runner_states.MEMORY_LIMIT_REACHED elif self.__job_failed_due_to_walltime_limit(job): - log.debug("Walltime condition reached") + log.debug("PP checking for walltime") error_file.write("DeadlineExceeded") job_state.fail_message = "Job was active longer than specified deadline" job_state.runner_state = JobState.runner_states.WALLTIME_REACHED else: - log.debug("Runner cannot detect a specific reason for failure, must be a tool failure.") + log.debug("PP no idea!") error_file.write("Exceeded max number of job retries allowed for job\n") - job_state.fail_message = "More job retries failed than allowed. See standard output and standard error within the info section for details." + job_state.fail_message = "More job retries failed than allowed. See standard output within the info section for details." 
+ # changes for resubmission + # job_state.running = False + # self.mark_as_failed(job_state) try: if self.__has_guest_ports(job_state.job_wrapper): self.__cleanup_k8s_guest_ports(job_state.job_wrapper, job) @@ -845,11 +868,11 @@ def __cleanup_k8s_job(self, job): k8s_cleanup_job = self.runner_params["k8s_cleanup_job"] delete_job(job, k8s_cleanup_job) - def __cleanup_k8s_ingress(self, ingress, job_failed): + def __cleanup_k8s_ingress(self, ingress, job_failed=False): k8s_cleanup_job = self.runner_params["k8s_cleanup_job"] delete_ingress(ingress, k8s_cleanup_job, job_failed) - def __cleanup_k8s_service(self, service, job_failed): + def __cleanup_k8s_service(self, service, job_failed=False): k8s_cleanup_job = self.runner_params["k8s_cleanup_job"] delete_service(service, k8s_cleanup_job, job_failed) @@ -903,32 +926,48 @@ def __cleanup_k8s_guest_ports(self, job_wrapper, k8s_job): k8s_job_prefix = self.__produce_k8s_job_prefix() k8s_job_name = f"{k8s_job_prefix}-{self.__force_label_conformity(job_wrapper.get_id_tag())}" log.debug(f"Deleting service/ingress for job with ID {job_wrapper.get_id_tag()}") - job_failed = k8s_job.obj["status"]["failed"] > 0 if "failed" in k8s_job.obj["status"] else False ingress_to_delete = find_ingress_object_by_name( self._pykube_api, k8s_job_name, self.runner_params["k8s_namespace"] ) if ingress_to_delete and len(ingress_to_delete.response["items"]) > 0: k8s_ingress = Ingress(self._pykube_api, ingress_to_delete.response["items"][0]) - self.__cleanup_k8s_ingress(k8s_ingress, job_failed) + self.__cleanup_k8s_ingress(k8s_ingress) + else: + log.debug(f"No ingress found for job with k8s_job_name {k8s_job_name}") service_to_delete = find_service_object_by_name( self._pykube_api, k8s_job_name, self.runner_params["k8s_namespace"] ) if service_to_delete and len(service_to_delete.response["items"]) > 0: k8s_service = Service(self._pykube_api, service_to_delete.response["items"][0]) - self.__cleanup_k8s_service(k8s_service, job_failed) + self.__cleanup_k8s_service(k8s_service) + else: + log.debug(f"No service found for job with k8s_job_name {k8s_job_name}") + # remove the interactive environment entrypoints + eps = job_wrapper.get_job().interactivetool_entry_points + if eps: + log.debug(f"Removing entry points for job with ID {job_wrapper.get_id_tag()}") + self.app.interactivetool_manager.remove_entry_points(eps) def stop_job(self, job_wrapper): """Attempts to delete a dispatched job to the k8s cluster""" job = job_wrapper.get_job() try: + log.debug(f"Attempting to stop job {job.id} ({job.job_runner_external_id})") job_to_delete = find_job_object_by_name( self._pykube_api, job.get_job_runner_external_id(), self.runner_params["k8s_namespace"] ) if job_to_delete and len(job_to_delete.response["items"]) > 0: k8s_job = Job(self._pykube_api, job_to_delete.response["items"][0]) + log.debug(f"Found job with id {job.get_job_runner_external_id()} to delete") + # For interactive jobs, at this point because the job stopping has been partly handled by the + # interactive tool manager, the job wrapper no longer shows any guest ports. We need another way + # to check if the job is an interactive job. 
if self.__has_guest_ports(job_wrapper): + log.debug(f"Job {job.id} ({job.job_runner_external_id}) has guest ports, cleaning them up") self.__cleanup_k8s_guest_ports(job_wrapper, k8s_job) self.__cleanup_k8s_job(k8s_job) + else: + log.debug(f"Could not find job with id {job.get_job_runner_external_id()} to delete") # TODO assert whether job parallelism == 0 # assert not job_to_delete.exists(), "Could not delete job,"+job.job_runner_external_id+" it still exists" log.debug(f"({job.id}/{job.job_runner_external_id}) Terminated at user's request") @@ -972,6 +1011,75 @@ def recover(self, job, job_wrapper): ajs.old_state = model.Job.states.QUEUED ajs.running = False self.monitor_queue.put(ajs) + + # added to make sure that stdout and stderr is captured for Kubernetes + def fail_job(self, job_state: "JobState", exception=False, message="Job failed", full_status=None): + log.debug("PP Getting into fail_job in k8s runner") + job = job_state.job_wrapper.get_job() + + #### Get STDOUT and STDERR from the job and tool to be stored in the database #### + #### This is needed because when calling finish_job on a failed job, the check_output method + #### overrides the job error state and tries to figure it out from the job output files + #### breaking OOM resubmissions. + # To ensure that files below are readable, ownership must be reclaimed first + job_state.job_wrapper.reclaim_ownership() + + # wait for the files to appear + which_try = 0 + while which_try < self.app.config.retry_job_output_collection + 1: + try: + with open(job_state.output_file, "rb") as stdout_file, open(job_state.error_file, "rb") as stderr_file: + job_stdout = self._job_io_for_db(stdout_file) + job_stderr = self._job_io_for_db(stderr_file) + break + except Exception as e: + if which_try == self.app.config.retry_job_output_collection: + job_stdout = "" + job_stderr = job_state.runner_states.JOB_OUTPUT_NOT_RETURNED_FROM_CLUSTER + log.error(f"{job.id}/{job.job_runner_external_id} {job_stderr}: {unicodify(e)}") + else: + time.sleep(1) + which_try += 1 + + # get stderr and stdout to database + outputs_directory = os.path.join(job_state.job_wrapper.working_directory, "outputs") + if not os.path.exists(outputs_directory): + outputs_directory = job_state.job_wrapper.working_directory + + tool_stdout_path = os.path.join(outputs_directory, "tool_stdout") + tool_stderr_path = os.path.join(outputs_directory, "tool_stderr") + + # TODO: These might not exist for running jobs at the upgrade to 19.XX, remove that + # assumption in 20.XX. + tool_stderr = None + if os.path.exists(tool_stdout_path): + with open(tool_stdout_path, "rb") as stdout_file: + tool_stdout = self._job_io_for_db(stdout_file) + else: + # Legacy job, were getting a merged output - assume it is mostly tool output. + tool_stdout = job_stdout + job_stdout = None + + tool_stdout = None + if os.path.exists(tool_stderr_path): + with open(tool_stderr_path, "rb") as stdout_file: + tool_stderr = self._job_io_for_db(stdout_file) + else: + # Legacy job, were getting a merged output - assume it is mostly tool output. 
+ tool_stderr = job_stderr + job_stderr = None + + #### END Get STDOUT and STDERR from the job and tool to be stored in the database #### + + # full status empty leaves the UI without stderr/stdout + full_status = { "stderr" : tool_stderr, "stdout" : tool_stdout} + log.debug(f"({job.id}/{job.job_runner_external_id}) tool_stdout: {tool_stdout}") + log.debug(f"({job.id}/{job.job_runner_external_id}) tool_stderr: {tool_stderr}") + log.debug(f"({job.id}/{job.job_runner_external_id}) job_stdout: {job_stdout}") + log.debug(f"({job.id}/{job.job_runner_external_id}) job_stderr: {job_stderr}") + + # run super method + super().fail_job(job_state, exception, message, full_status) def finish_job(self, job_state): From 8100e6966f3709c886c2d51f72af758d83f71027 Mon Sep 17 00:00:00 2001 From: Pablo Moreno Date: Tue, 3 Jan 2023 12:56:44 +0000 Subject: [PATCH 04/20] Move to pykube-ng (cherry picked from commit 652c04f315e45626570da6420606f74541f0117b) (cherry picked from commit 7d4b0b4b4e15654323aa30835ff307b12d4b40d1) --- lib/galaxy/dependencies/conditional-requirements.txt | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/lib/galaxy/dependencies/conditional-requirements.txt b/lib/galaxy/dependencies/conditional-requirements.txt index 17b16118f5a8..392cbb23f7c2 100644 --- a/lib/galaxy/dependencies/conditional-requirements.txt +++ b/lib/galaxy/dependencies/conditional-requirements.txt @@ -35,7 +35,8 @@ custos-sdk chronos-python==1.2.1 # Kubernetes job runner -pykube==0.15.0 +# pykube==0.15.0 +pykube-ng==22.9.0 # Synnefo / Pithos+ object store client kamaki From 8ba802083dd12647bbbd92ca8b2cc392672fccaf Mon Sep 17 00:00:00 2001 From: Pablo Moreno Date: Tue, 3 Jan 2023 22:41:12 +0000 Subject: [PATCH 05/20] Change ingress API version to current one (cherry picked from commit f1b92d911827c897cb8e65a060e99b44f9d4ebf5) --- lib/galaxy/jobs/runners/util/pykube_util.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/galaxy/jobs/runners/util/pykube_util.py b/lib/galaxy/jobs/runners/util/pykube_util.py index 7c3f32d87b09..28020a8c8dde 100644 --- a/lib/galaxy/jobs/runners/util/pykube_util.py +++ b/lib/galaxy/jobs/runners/util/pykube_util.py @@ -31,7 +31,7 @@ DEFAULT_JOB_API_VERSION = "batch/v1" DEFAULT_SERVICE_API_VERSION = "v1" -DEFAULT_INGRESS_API_VERSION = "extensions/v1beta1" +DEFAULT_INGRESS_API_VERSION = "networking.k8s.io/v1" DEFAULT_NAMESPACE = "default" INSTANCE_ID_INVALID_MESSAGE = ( "Galaxy instance [%s] is either too long " From c75b8123f7223c3ed9c6bf4ec521f14980ae9743 Mon Sep 17 00:00:00 2001 From: Pablo Moreno Date: Thu, 9 Mar 2023 08:58:03 +0000 Subject: [PATCH 06/20] Missing stdout (cherry picked from commit a201abbb08bd855ecf85fe8250384e972077cb9b) --- lib/galaxy/jobs/runners/kubernetes.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/galaxy/jobs/runners/kubernetes.py b/lib/galaxy/jobs/runners/kubernetes.py index da212d2a68e6..7c05d094635b 100644 --- a/lib/galaxy/jobs/runners/kubernetes.py +++ b/lib/galaxy/jobs/runners/kubernetes.py @@ -1051,7 +1051,8 @@ def fail_job(self, job_state: "JobState", exception=False, message="Job failed", # TODO: These might not exist for running jobs at the upgrade to 19.XX, remove that # assumption in 20.XX. - tool_stderr = None + tool_stderr = "Galaxy issue: Stderr failed to be retrieved from the job working directory." + tool_stdout = "Galaxy issue: Stdout failed to be retrieved from the job working directory." 
if os.path.exists(tool_stdout_path): with open(tool_stdout_path, "rb") as stdout_file: tool_stdout = self._job_io_for_db(stdout_file) @@ -1060,7 +1061,6 @@ def fail_job(self, job_state: "JobState", exception=False, message="Job failed", tool_stdout = job_stdout job_stdout = None - tool_stdout = None if os.path.exists(tool_stderr_path): with open(tool_stderr_path, "rb") as stdout_file: tool_stderr = self._job_io_for_db(stdout_file) From 63360bd5690bbf28d371e96be533da2a22a63e3a Mon Sep 17 00:00:00 2001 From: Pablo Moreno Date: Wed, 15 Mar 2023 09:12:10 +0000 Subject: [PATCH 07/20] Apply suggestions from code review Mostly cleanups from Nuwan and Pablo. Co-authored-by: Nuwan Goonasekera <2070605+nuwang@users.noreply.github.com> --- .../dependencies/conditional-requirements.txt | 1 - lib/galaxy/jobs/runners/kubernetes.py | 32 +++++++------------ 2 files changed, 12 insertions(+), 21 deletions(-) diff --git a/lib/galaxy/dependencies/conditional-requirements.txt b/lib/galaxy/dependencies/conditional-requirements.txt index 392cbb23f7c2..40861584fc09 100644 --- a/lib/galaxy/dependencies/conditional-requirements.txt +++ b/lib/galaxy/dependencies/conditional-requirements.txt @@ -35,7 +35,6 @@ custos-sdk chronos-python==1.2.1 # Kubernetes job runner -# pykube==0.15.0 pykube-ng==22.9.0 # Synnefo / Pithos+ object store client diff --git a/lib/galaxy/jobs/runners/kubernetes.py b/lib/galaxy/jobs/runners/kubernetes.py index 7c05d094635b..6d209a1b5b4d 100644 --- a/lib/galaxy/jobs/runners/kubernetes.py +++ b/lib/galaxy/jobs/runners/kubernetes.py @@ -100,7 +100,7 @@ def __init__(self, app, nworkers, **kwargs): map=str, valid=lambda s: s == "$gid" or isinstance(s, int) or not s or s.isdigit(), default=None ), k8s_cleanup_job=dict(map=str, valid=lambda s: s in {"onsuccess", "always", "never"}, default="always"), - k8s_pod_retries=dict(map=int, valid=lambda x: int(x) >= 0, default=1), # note that if the backOffLimit is lower, this paramer will have not effect. + k8s_pod_retries=dict(map=int, valid=lambda x: int(x) >= 0, default=1), # note that if the backOffLimit is lower, this paramer will have no effect. k8s_job_spec_back_off_limit=dict(map=int, valid=lambda x: int(x) >= 0, default=0), # this means that it will stop retrying after 1 failure. k8s_walltime_limit=dict(map=int, valid=lambda x: int(x) >= 0, default=172800), k8s_unschedulable_walltime_limit=dict(map=int, valid=lambda x: not x or int(x) >= 0, default=None), @@ -220,9 +220,7 @@ def queue_job(self, job_wrapper): def __has_guest_ports(self, job_wrapper): # Check if job has guest ports or interactive tool entry points that would signal that - # this is an interactive tool. - log.debug(f"Checking if job {job_wrapper.get_id_tag()} has guest ports: {job_wrapper.guest_ports}") - log.debug(f"Checking if job {job_wrapper.get_id_tag()} has interactive entry points: {job_wrapper.guest_ports}") + log.debug(f"Checking if job {job_wrapper.get_id_tag()} is an interactive tool. guest ports: {job_wrapper.guest_ports}. interactive entry points: {job_wrapper.interactivetool_entry_points}") return bool(job_wrapper.guest_ports) or bool(job_wrapper.get_job().interactivetool_entry_points) def __configure_port_routing(self, ajs): @@ -245,14 +243,8 @@ def __configure_port_routing(self, ajs): log.debug(f"Kubernetes ingress object: {json.dumps(k8s_ingress_obj, indent=4)}") # We avoid creating service and ingress if they already exist (e.g. 
when Galaxy is restarted or resubmitting a job) service = Service(self._pykube_api, k8s_service_obj) - # if service.exists(): - # log.debug(f"Service {k8s_job_name} already exists, skipping creation") - # else: service.create() ingress = Ingress(self._pykube_api, k8s_ingress_obj) - # if ingress.exists(): - # log.debug(f"Ingress {k8s_job_name} already exists, skipping creation") - # else: ingress.create() def __get_overridable_params(self, job_wrapper, param_key): @@ -766,7 +758,7 @@ def check_watched_item(self, job_state): elif active > 0 and failed < max_pod_retries + 1: if not job_state.running: if self.__job_pending_due_to_unschedulable_pod(job_state): - log.debug("PP Job pending..") + log.debug(f"Job id: {job_state.job_id} with k8s id: {job.name} pending...") if self.runner_params.get("k8s_unschedulable_walltime_limit"): creation_time_str = job.obj["metadata"].get("creationTimestamp") creation_time = datetime.strptime(creation_time_str, "%Y-%m-%dT%H:%M:%SZ") @@ -778,20 +770,20 @@ def check_watched_item(self, job_state): else: pass else: - log.debug("Job set to running..") + log.debug("Job set to running...") job_state.running = True job_state.job_wrapper.change_state(model.Job.states.RUNNING) return job_state elif job_persisted_state == model.Job.states.DELETED: # Job has been deleted via stop_job and job has not been deleted, # remove from watched_jobs by returning `None` - log.debug("Job is DELETED..") + log.debug(f"Job id: {job_state.job_id} has been already deleted...") if job_state.job_wrapper.cleanup_job in ("always", "onsuccess"): job_state.job_wrapper.cleanup() return None else: log.debug(f"Job is failed and not deleted, looking at failure") - log.debug(f"Job state before handle job failure: {job_state.job_wrapper.get_state()}") + log.debug(f"Job id: {job_state.job_id} failed but has not been deleted yet. Current state: {job_state.job_wrapper.get_state()}") self._handle_job_failure(job, job_state) # changes for resubmission (removed self.mark_as_failed from handle_job_failure) self.work_queue.put((self.mark_as_failed, job_state)) @@ -838,19 +830,19 @@ def _handle_unschedulable_job(self, job, job_state): def _handle_job_failure(self, job, job_state): # Figure out why job has failed with open(job_state.error_file, "a") as error_file: - log.debug("PP Trying with error file in _handle_job_failure") + log.debug("Trying with error file in _handle_job_failure") if self.__job_failed_due_to_low_memory(job_state): - log.debug("PP OOM reached!") + log.debug(f"OOM detected for job: {job_state.job_id}") error_file.write("Job killed after running out of memory. Try with more memory.\n") job_state.fail_message = "Tool failed due to insufficient memory. Try with more memory." job_state.runner_state = JobState.runner_states.MEMORY_LIMIT_REACHED elif self.__job_failed_due_to_walltime_limit(job): - log.debug("PP checking for walltime") + log.debug(f"Walltime limit reached for job: {job_state.job_id}") error_file.write("DeadlineExceeded") job_state.fail_message = "Job was active longer than specified deadline" job_state.runner_state = JobState.runner_states.WALLTIME_REACHED else: - log.debug("PP no idea!") + log.debug(f"Unknown error detected in job: {job_state.job_id}") error_file.write("Exceeded max number of job retries allowed for job\n") job_state.fail_message = "More job retries failed than allowed. See standard output within the info section for details." 
# changes for resubmission @@ -1051,8 +1043,8 @@ def fail_job(self, job_state: "JobState", exception=False, message="Job failed", # TODO: These might not exist for running jobs at the upgrade to 19.XX, remove that # assumption in 20.XX. - tool_stderr = "Galaxy issue: Stderr failed to be retrieved from the job working directory." - tool_stdout = "Galaxy issue: Stdout failed to be retrieved from the job working directory." + tool_stderr = "Galaxy issue: stderr could not be retrieved from the job working directory." + tool_stdout = "Galaxy issue: stdout could not be retrieved from the job working directory." if os.path.exists(tool_stdout_path): with open(tool_stdout_path, "rb") as stdout_file: tool_stdout = self._job_io_for_db(stdout_file) From 5c3ccb01191e6dd6378593ec36c79c07ff3decaa Mon Sep 17 00:00:00 2001 From: Pablo Moreno Date: Wed, 15 Mar 2023 09:18:07 +0000 Subject: [PATCH 08/20] Please linter --- lib/galaxy/jobs/runners/kubernetes.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/lib/galaxy/jobs/runners/kubernetes.py b/lib/galaxy/jobs/runners/kubernetes.py index 6d209a1b5b4d..af9d5551ec66 100644 --- a/lib/galaxy/jobs/runners/kubernetes.py +++ b/lib/galaxy/jobs/runners/kubernetes.py @@ -782,8 +782,7 @@ def check_watched_item(self, job_state): job_state.job_wrapper.cleanup() return None else: - log.debug(f"Job is failed and not deleted, looking at failure") - log.debug(f"Job id: {job_state.job_id} failed but has not been deleted yet. Current state: {job_state.job_wrapper.get_state()}") + log.debug(f"Job id: {job_state.job_id} failed and it is not a deletion case. Current state: {job_state.job_wrapper.get_state()}") self._handle_job_failure(job, job_state) # changes for resubmission (removed self.mark_as_failed from handle_job_failure) self.work_queue.put((self.mark_as_failed, job_state)) From bc2ac6b15004f8baa1d680027040f2415160d97d Mon Sep 17 00:00:00 2001 From: Pablo Moreno Date: Wed, 15 Mar 2023 10:03:33 +0000 Subject: [PATCH 09/20] More linter pleasing --- lib/galaxy/jobs/runners/kubernetes.py | 59 +++++++++++++-------------- 1 file changed, 29 insertions(+), 30 deletions(-) diff --git a/lib/galaxy/jobs/runners/kubernetes.py b/lib/galaxy/jobs/runners/kubernetes.py index af9d5551ec66..2f30496432b5 100644 --- a/lib/galaxy/jobs/runners/kubernetes.py +++ b/lib/galaxy/jobs/runners/kubernetes.py @@ -3,7 +3,7 @@ """ import logging -import json # for debugging of API objects +import json # for debugging of API objects import math import os import re @@ -100,8 +100,8 @@ def __init__(self, app, nworkers, **kwargs): map=str, valid=lambda s: s == "$gid" or isinstance(s, int) or not s or s.isdigit(), default=None ), k8s_cleanup_job=dict(map=str, valid=lambda s: s in {"onsuccess", "always", "never"}, default="always"), - k8s_pod_retries=dict(map=int, valid=lambda x: int(x) >= 0, default=1), # note that if the backOffLimit is lower, this paramer will have no effect. - k8s_job_spec_back_off_limit=dict(map=int, valid=lambda x: int(x) >= 0, default=0), # this means that it will stop retrying after 1 failure. + k8s_pod_retries=dict(map=int, valid=lambda x: int(x) >= 0, default=1), # note that if the backOffLimit is lower, this paramer will have no effect. + k8s_job_spec_back_off_limit=dict(map=int, valid=lambda x: int(x) >= 0, default=0), # this means that it will stop retrying after 1 failure. 
k8s_walltime_limit=dict(map=int, valid=lambda x: int(x) >= 0, default=172800), k8s_unschedulable_walltime_limit=dict(map=int, valid=lambda x: not x or int(x) >= 0, default=None), k8s_interactivetools_use_ssl=dict(map=bool, default=False), @@ -468,26 +468,26 @@ def __get_k8s_ingress_spec(self, ajs): }, "spec": { "ingressClassName": "nginx", - "rules":[ { - "host": ep["domain"], - "http": { - "paths": [ { - "backend": { - "service": { - "name": self.__get_k8s_job_name( - self.__produce_k8s_job_prefix(), ajs.job_wrapper - ), - "port": { "number": int(ep["tool_port"])} - } - }, - "path": ep.get("entry_path", "/"), - "pathType": "Prefix" - } - ] - }, + "rules": [ { + "host": ep["domain"], + "http": { + "paths": [ { + "backend": { + "service": { + "name": self.__get_k8s_job_name( + self.__produce_k8s_job_prefix(), ajs.job_wrapper + ), + "port": { "number": int(ep["tool_port"])} + } + }, + "path": ep.get("entry_path", "/"), + "pathType": "Prefix" + } + ] + }, } for ep in entry_points - ] + ] }, } if self.runner_params.get("k8s_interactivetools_use_ssl"): @@ -1007,11 +1007,11 @@ def recover(self, job, job_wrapper): def fail_job(self, job_state: "JobState", exception=False, message="Job failed", full_status=None): log.debug("PP Getting into fail_job in k8s runner") job = job_state.job_wrapper.get_job() - - #### Get STDOUT and STDERR from the job and tool to be stored in the database #### - #### This is needed because when calling finish_job on a failed job, the check_output method - #### overrides the job error state and tries to figure it out from the job output files - #### breaking OOM resubmissions. + + # Get STDOUT and STDERR from the job and tool to be stored in the database # + # This is needed because when calling finish_job on a failed job, the check_output method + # overrides the job error state and tries to figure it out from the job output files + # breaking OOM resubmissions. # To ensure that files below are readable, ownership must be reclaimed first job_state.job_wrapper.reclaim_ownership() @@ -1039,7 +1039,7 @@ def fail_job(self, job_state: "JobState", exception=False, message="Job failed", tool_stdout_path = os.path.join(outputs_directory, "tool_stdout") tool_stderr_path = os.path.join(outputs_directory, "tool_stderr") - + # TODO: These might not exist for running jobs at the upgrade to 19.XX, remove that # assumption in 20.XX. tool_stderr = "Galaxy issue: stderr could not be retrieved from the job working directory." 
@@ -1060,10 +1060,10 @@ def fail_job(self, job_state: "JobState", exception=False, message="Job failed", tool_stderr = job_stderr job_stderr = None - #### END Get STDOUT and STDERR from the job and tool to be stored in the database #### + # END Get STDOUT and STDERR from the job and tool to be stored in the database # # full status empty leaves the UI without stderr/stdout - full_status = { "stderr" : tool_stderr, "stdout" : tool_stdout} + full_status = {"stderr" : tool_stderr, "stdout" : tool_stdout} log.debug(f"({job.id}/{job.job_runner_external_id}) tool_stdout: {tool_stdout}") log.debug(f"({job.id}/{job.job_runner_external_id}) tool_stderr: {tool_stderr}") log.debug(f"({job.id}/{job.job_runner_external_id}) job_stdout: {job_stdout}") @@ -1071,7 +1071,6 @@ def fail_job(self, job_state: "JobState", exception=False, message="Job failed", # run super method super().fail_job(job_state, exception, message, full_status) - def finish_job(self, job_state): self._handle_metadata_externally(job_state.job_wrapper, resolve_requirements=True) From 80e254a4f2b7df22855a764a349804f55cd89ba7 Mon Sep 17 00:00:00 2001 From: Pablo Moreno Date: Wed, 15 Mar 2023 10:25:18 +0000 Subject: [PATCH 10/20] Black + isort --- lib/galaxy/jobs/runners/kubernetes.py | 76 ++++++++++++++++----------- 1 file changed, 45 insertions(+), 31 deletions(-) diff --git a/lib/galaxy/jobs/runners/kubernetes.py b/lib/galaxy/jobs/runners/kubernetes.py index 2f30496432b5..a1281e7cea54 100644 --- a/lib/galaxy/jobs/runners/kubernetes.py +++ b/lib/galaxy/jobs/runners/kubernetes.py @@ -2,8 +2,8 @@ Offload jobs to a Kubernetes cluster. """ -import logging import json # for debugging of API objects +import logging import math import os import re @@ -13,9 +13,6 @@ import yaml from galaxy import model -from galaxy.util import ( - unicodify, -) from galaxy.jobs.runners import ( AsynchronousJobRunner, AsynchronousJobState, @@ -48,6 +45,7 @@ Service, service_object_dict, ) +from galaxy.util import unicodify from galaxy.util.bytesize import ByteSize log = logging.getLogger(__name__) @@ -100,8 +98,12 @@ def __init__(self, app, nworkers, **kwargs): map=str, valid=lambda s: s == "$gid" or isinstance(s, int) or not s or s.isdigit(), default=None ), k8s_cleanup_job=dict(map=str, valid=lambda s: s in {"onsuccess", "always", "never"}, default="always"), - k8s_pod_retries=dict(map=int, valid=lambda x: int(x) >= 0, default=1), # note that if the backOffLimit is lower, this paramer will have no effect. - k8s_job_spec_back_off_limit=dict(map=int, valid=lambda x: int(x) >= 0, default=0), # this means that it will stop retrying after 1 failure. + k8s_pod_retries=dict( + map=int, valid=lambda x: int(x) >= 0, default=1 + ), # note that if the backOffLimit is lower, this paramer will have no effect. + k8s_job_spec_back_off_limit=dict( + map=int, valid=lambda x: int(x) >= 0, default=0 + ), # this means that it will stop retrying after 1 failure. k8s_walltime_limit=dict(map=int, valid=lambda x: int(x) >= 0, default=172800), k8s_unschedulable_walltime_limit=dict(map=int, valid=lambda x: not x or int(x) >= 0, default=None), k8s_interactivetools_use_ssl=dict(map=bool, default=False), @@ -220,7 +222,9 @@ def queue_job(self, job_wrapper): def __has_guest_ports(self, job_wrapper): # Check if job has guest ports or interactive tool entry points that would signal that - log.debug(f"Checking if job {job_wrapper.get_id_tag()} is an interactive tool. guest ports: {job_wrapper.guest_ports}. 
interactive entry points: {job_wrapper.interactivetool_entry_points}") + log.debug( + f"Checking if job {job_wrapper.get_id_tag()} is an interactive tool. guest ports: {job_wrapper.guest_ports}. interactive entry points: {job_wrapper.interactivetool_entry_points}" + ) return bool(job_wrapper.guest_ports) or bool(job_wrapper.get_job().interactivetool_entry_points) def __configure_port_routing(self, ajs): @@ -468,26 +472,28 @@ def __get_k8s_ingress_spec(self, ajs): }, "spec": { "ingressClassName": "nginx", - "rules": [ { - "host": ep["domain"], - "http": { - "paths": [ { - "backend": { - "service": { - "name": self.__get_k8s_job_name( - self.__produce_k8s_job_prefix(), ajs.job_wrapper - ), - "port": { "number": int(ep["tool_port"])} - } - }, - "path": ep.get("entry_path", "/"), - "pathType": "Prefix" - } - ] + "rules": [ + { + "host": ep["domain"], + "http": { + "paths": [ + { + "backend": { + "service": { + "name": self.__get_k8s_job_name( + self.__produce_k8s_job_prefix(), ajs.job_wrapper + ), + "port": {"number": int(ep["tool_port"])}, + } }, - } - for ep in entry_points - ] + "path": ep.get("entry_path", "/"), + "pathType": "Prefix", + } + ] + }, + } + for ep in entry_points + ], }, } if self.runner_params.get("k8s_interactivetools_use_ssl"): @@ -782,7 +788,9 @@ def check_watched_item(self, job_state): job_state.job_wrapper.cleanup() return None else: - log.debug(f"Job id: {job_state.job_id} failed and it is not a deletion case. Current state: {job_state.job_wrapper.get_state()}") + log.debug( + f"Job id: {job_state.job_id} failed and it is not a deletion case. Current state: {job_state.job_wrapper.get_state()}" + ) self._handle_job_failure(job, job_state) # changes for resubmission (removed self.mark_as_failed from handle_job_failure) self.work_queue.put((self.mark_as_failed, job_state)) @@ -794,14 +802,18 @@ def check_watched_item(self, job_state): if job_state.job_wrapper.cleanup_job in ("always", "onsuccess"): job_state.job_wrapper.cleanup() return None - if job_state.job_wrapper.get_job().state == model.Job.states.STOPPED and self.__has_guest_ports(job_state.job_wrapper): + if job_state.job_wrapper.get_job().state == model.Job.states.STOPPED and self.__has_guest_ports( + job_state.job_wrapper + ): # Interactive job has been stopped via stop_job (most likely by the user), # cleanup and remove from watched_jobs by returning `None`. STOPPED jobs are cleaned up elsewhere. # Marking as finished makes sure that the interactive job output is available in the UI. self.mark_as_finished(job_state) return None # there is no job responding to this job_id, it is either lost or something happened. - log.error(f"No Jobs are available under expected selector app={job_state.job_id} and they are not deleted or stopped either.") + log.error( + f"No Jobs are available under expected selector app={job_state.job_id} and they are not deleted or stopped either." + ) self.mark_as_failed(job_state) # job is no longer viable - remove from watched jobs return None @@ -843,7 +855,9 @@ def _handle_job_failure(self, job, job_state): else: log.debug(f"Unknown error detected in job: {job_state.job_id}") error_file.write("Exceeded max number of job retries allowed for job\n") - job_state.fail_message = "More job retries failed than allowed. See standard output within the info section for details." + job_state.fail_message = ( + "More job retries failed than allowed. See standard output within the info section for details." 
+ ) # changes for resubmission # job_state.running = False # self.mark_as_failed(job_state) @@ -1063,7 +1077,7 @@ def fail_job(self, job_state: "JobState", exception=False, message="Job failed", # END Get STDOUT and STDERR from the job and tool to be stored in the database # # full status empty leaves the UI without stderr/stdout - full_status = {"stderr" : tool_stderr, "stdout" : tool_stdout} + full_status = {"stderr": tool_stderr, "stdout": tool_stdout} log.debug(f"({job.id}/{job.job_runner_external_id}) tool_stdout: {tool_stdout}") log.debug(f"({job.id}/{job.job_runner_external_id}) tool_stderr: {tool_stderr}") log.debug(f"({job.id}/{job.job_runner_external_id}) job_stdout: {job_stdout}") From 5dd8751c0579199caa6ce45176123b439edd01f5 Mon Sep 17 00:00:00 2001 From: Pablo Moreno Date: Tue, 28 Mar 2023 14:15:25 +0100 Subject: [PATCH 11/20] Fix its issue on logging (cherry picked from commit d5e73b8130ea29d3da9c073e1d74d295e2c4c03a) --- lib/galaxy/jobs/runners/kubernetes.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/galaxy/jobs/runners/kubernetes.py b/lib/galaxy/jobs/runners/kubernetes.py index a1281e7cea54..dab97fabacf3 100644 --- a/lib/galaxy/jobs/runners/kubernetes.py +++ b/lib/galaxy/jobs/runners/kubernetes.py @@ -223,7 +223,7 @@ def queue_job(self, job_wrapper): def __has_guest_ports(self, job_wrapper): # Check if job has guest ports or interactive tool entry points that would signal that log.debug( - f"Checking if job {job_wrapper.get_id_tag()} is an interactive tool. guest ports: {job_wrapper.guest_ports}. interactive entry points: {job_wrapper.interactivetool_entry_points}" + f"Checking if job {job_wrapper.get_id_tag()} is an interactive tool. guest ports: {job_wrapper.guest_ports}. interactive entry points: {job_wrapper.get_job().interactivetool_entry_points}" ) return bool(job_wrapper.guest_ports) or bool(job_wrapper.get_job().interactivetool_entry_points) From 5ca868703b394f7efbc2e9968cd6a29545d72e2f Mon Sep 17 00:00:00 2001 From: Pablo Moreno Date: Tue, 28 Mar 2023 14:35:01 +0100 Subject: [PATCH 12/20] trace for larger json dump logs --- lib/galaxy/jobs/runners/kubernetes.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/galaxy/jobs/runners/kubernetes.py b/lib/galaxy/jobs/runners/kubernetes.py index dab97fabacf3..7c4a04972142 100644 --- a/lib/galaxy/jobs/runners/kubernetes.py +++ b/lib/galaxy/jobs/runners/kubernetes.py @@ -243,8 +243,8 @@ def __configure_port_routing(self, ajs): k8s_ingress_obj = ingress_object_dict(self.runner_params, k8s_job_name, self.__get_k8s_ingress_spec(ajs)) # pretty print the objects for debugging - log.debug(f"Kubernetes service object: {json.dumps(k8s_service_obj, indent=4)}") - log.debug(f"Kubernetes ingress object: {json.dumps(k8s_ingress_obj, indent=4)}") + log.trace(f"Kubernetes service object: {json.dumps(k8s_service_obj, indent=4)}") + log.trace(f"Kubernetes ingress object: {json.dumps(k8s_ingress_obj, indent=4)}") # We avoid creating service and ingress if they already exist (e.g. 
when Galaxy is restarted or resubmitting a job) service = Service(self._pykube_api, k8s_service_obj) service.create() From a9cc2ff450887dee825fddaec10fda3ce4119f00 Mon Sep 17 00:00:00 2001 From: Pablo Moreno Date: Wed, 29 Mar 2023 21:37:13 +0100 Subject: [PATCH 13/20] Make ingress class configurable for ITs --- lib/galaxy/jobs/runners/kubernetes.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/lib/galaxy/jobs/runners/kubernetes.py b/lib/galaxy/jobs/runners/kubernetes.py index 7c4a04972142..3ee12585d071 100644 --- a/lib/galaxy/jobs/runners/kubernetes.py +++ b/lib/galaxy/jobs/runners/kubernetes.py @@ -108,6 +108,7 @@ def __init__(self, app, nworkers, **kwargs): k8s_unschedulable_walltime_limit=dict(map=int, valid=lambda x: not x or int(x) >= 0, default=None), k8s_interactivetools_use_ssl=dict(map=bool, default=False), k8s_interactivetools_ingress_annotations=dict(map=str), + k8s_interactivetools_ingress_class=dict(map=str, default=None), ) if "runner_param_specs" not in kwargs: @@ -471,7 +472,6 @@ def __get_k8s_ingress_spec(self, ajs): "annotations": {"app.galaxyproject.org/tool_id": ajs.job_wrapper.tool.id}, }, "spec": { - "ingressClassName": "nginx", "rules": [ { "host": ep["domain"], @@ -496,6 +496,9 @@ def __get_k8s_ingress_spec(self, ajs): ], }, } + default_ingress_class = self.runner_params.get("k8s_interactivetools_ingress_class") + if default_ingress_class: + k8s_spec_template["spec"]["ingressClassName"] = default_ingress_class if self.runner_params.get("k8s_interactivetools_use_ssl"): domains = list({e["domain"] for e in entry_points}) k8s_spec_template["spec"]["tls"] = [ From 6ca1a021f45ee9be07dc63c18124f3ae245a132c Mon Sep 17 00:00:00 2001 From: nuwang <2070605+nuwang@users.noreply.github.com> Date: Tue, 6 Jun 2023 00:51:06 +0530 Subject: [PATCH 14/20] Remove extra comments and minor tweaks to debug logs --- lib/galaxy/jobs/runners/kubernetes.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/lib/galaxy/jobs/runners/kubernetes.py b/lib/galaxy/jobs/runners/kubernetes.py index 3ee12585d071..f02608811cae 100644 --- a/lib/galaxy/jobs/runners/kubernetes.py +++ b/lib/galaxy/jobs/runners/kubernetes.py @@ -243,7 +243,6 @@ def __configure_port_routing(self, ajs): k8s_service_obj = service_object_dict(self.runner_params, k8s_job_name, self.__get_k8s_service_spec(ajs)) k8s_ingress_obj = ingress_object_dict(self.runner_params, k8s_job_name, self.__get_k8s_ingress_spec(ajs)) - # pretty print the objects for debugging log.trace(f"Kubernetes service object: {json.dumps(k8s_service_obj, indent=4)}") log.trace(f"Kubernetes ingress object: {json.dumps(k8s_ingress_obj, indent=4)}") # We avoid creating service and ingress if they already exist (e.g. when Galaxy is restarted or resubmitting a job) @@ -549,6 +548,7 @@ def __get_k8s_containers(self, ajs): # command line execution, separated by ;, which is what Galaxy does # to assemble the command. "command": [ajs.job_wrapper.shell], + # Make sure that the exit code is propagated to k8s, so k8s knows why the tool failed (e.g. 
OOM) "args": ["-c", f"{ajs.job_file}; exit $(cat {ajs.exit_code_file})"], "workingDir": ajs.job_wrapper.working_directory, "volumeMounts": deduplicate_entries(mounts), @@ -762,7 +762,7 @@ def check_watched_item(self, job_state): if succeeded > 0 or job_state == model.Job.states.STOPPED: job_state.running = False self.mark_as_finished(job_state) - log.debug("Job succeeded") + log.debug(f"Job id: {job_state.job_id} with k8s id: {job.name} succeeded") return None elif active > 0 and failed < max_pod_retries + 1: if not job_state.running: @@ -1077,8 +1077,6 @@ def fail_job(self, job_state: "JobState", exception=False, message="Job failed", tool_stderr = job_stderr job_stderr = None - # END Get STDOUT and STDERR from the job and tool to be stored in the database # - # full status empty leaves the UI without stderr/stdout full_status = {"stderr": tool_stderr, "stdout": tool_stdout} log.debug(f"({job.id}/{job.job_runner_external_id}) tool_stdout: {tool_stdout}") From 8276216fa8752718ab26517d8ae0ed02016e8cd3 Mon Sep 17 00:00:00 2001 From: nuwang <2070605+nuwang@users.noreply.github.com> Date: Tue, 6 Jun 2023 12:18:55 +0530 Subject: [PATCH 15/20] Change trace to debug --- lib/galaxy/jobs/runners/kubernetes.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/galaxy/jobs/runners/kubernetes.py b/lib/galaxy/jobs/runners/kubernetes.py index f02608811cae..6ef015f0d812 100644 --- a/lib/galaxy/jobs/runners/kubernetes.py +++ b/lib/galaxy/jobs/runners/kubernetes.py @@ -243,8 +243,8 @@ def __configure_port_routing(self, ajs): k8s_service_obj = service_object_dict(self.runner_params, k8s_job_name, self.__get_k8s_service_spec(ajs)) k8s_ingress_obj = ingress_object_dict(self.runner_params, k8s_job_name, self.__get_k8s_ingress_spec(ajs)) - log.trace(f"Kubernetes service object: {json.dumps(k8s_service_obj, indent=4)}") - log.trace(f"Kubernetes ingress object: {json.dumps(k8s_ingress_obj, indent=4)}") + log.debug(f"Kubernetes service object: {json.dumps(k8s_service_obj, indent=4)}") + log.debug(f"Kubernetes ingress object: {json.dumps(k8s_ingress_obj, indent=4)}") # We avoid creating service and ingress if they already exist (e.g. when Galaxy is restarted or resubmitting a job) service = Service(self._pykube_api, k8s_service_obj) service.create() From eeb4d25987a002fef154b66c2663751bfffefd0d Mon Sep 17 00:00:00 2001 From: nuwang <2070605+nuwang@users.noreply.github.com> Date: Wed, 21 Jun 2023 22:17:53 +0530 Subject: [PATCH 16/20] Don't mark job as failed if unknown exit code --- lib/galaxy/authnz/custos_authnz.py | 2 +- lib/galaxy/jobs/runners/kubernetes.py | 46 +++++++++++++++++++++++---- 2 files changed, 40 insertions(+), 8 deletions(-) diff --git a/lib/galaxy/authnz/custos_authnz.py b/lib/galaxy/authnz/custos_authnz.py index 4b4d40ec6860..75d3187cc508 100644 --- a/lib/galaxy/authnz/custos_authnz.py +++ b/lib/galaxy/authnz/custos_authnz.py @@ -204,7 +204,7 @@ def callback(self, state_token, authz_code, trans, login_redirect_url): ): user = existing_user else: - message = f"There already exists a user with email {email}. To associate this external login, you must first be logged in as that existing account." + message = f"There already exists a user with email {email}. To associate this external login, you must first be logged in as that existing account." 
log.info(message) login_redirect_url = ( f"{login_redirect_url}login/start" diff --git a/lib/galaxy/jobs/runners/kubernetes.py b/lib/galaxy/jobs/runners/kubernetes.py index 6ef015f0d812..6f033a1cd57f 100644 --- a/lib/galaxy/jobs/runners/kubernetes.py +++ b/lib/galaxy/jobs/runners/kubernetes.py @@ -794,9 +794,14 @@ def check_watched_item(self, job_state): log.debug( f"Job id: {job_state.job_id} failed and it is not a deletion case. Current state: {job_state.job_wrapper.get_state()}" ) - self._handle_job_failure(job, job_state) - # changes for resubmission (removed self.mark_as_failed from handle_job_failure) - self.work_queue.put((self.mark_as_failed, job_state)) + if self._handle_job_failure(job, job_state): + # changes for resubmission (removed self.mark_as_failed from handle_job_failure) + self.work_queue.put((self.mark_as_failed, job_state)) + else: + # Job failure was not due to a k8s issue or something that k8s can handle, so it's a tool error. + job_state.running = False + self.mark_as_finished(job_state) + return None return None @@ -843,6 +848,7 @@ def _handle_unschedulable_job(self, job, job_state): def _handle_job_failure(self, job, job_state): # Figure out why job has failed + mark_failed = True with open(job_state.error_file, "a") as error_file: log.debug("Trying with error file in _handle_job_failure") if self.__job_failed_due_to_low_memory(job_state): @@ -855,11 +861,18 @@ def _handle_job_failure(self, job, job_state): error_file.write("DeadlineExceeded") job_state.fail_message = "Job was active longer than specified deadline" job_state.runner_state = JobState.runner_states.WALLTIME_REACHED + elif self.__job_failed_due_to_unknown_exit_code(job_state): + msg = f"Job: {job_state.job_id} failed due to an unknown exit code from the tool." + log.debug(msg) + job_state.fail_message = msg + job_state.runner_state = JobState.runner_states.TOOL_DETECT_ERROR + mark_failed = False else: - log.debug(f"Unknown error detected in job: {job_state.job_id}") - error_file.write("Exceeded max number of job retries allowed for job\n") + msg = f"An unknown error occurred in this job and the maximum number of retries have been exceeded for job: {job_state.job_id}." + log.debug(msg) + error_file.write(msg) job_state.fail_message = ( - "More job retries failed than allowed. See standard output within the info section for details." + "An unknown error occurered with this job. See standard output within the info section for details." ) # changes for resubmission # job_state.running = False @@ -870,7 +883,7 @@ def _handle_job_failure(self, job, job_state): self.__cleanup_k8s_job(job) except Exception: log.exception("Could not clean up k8s batch job. Ignoring...") - return None + return mark_failed def __cleanup_k8s_job(self, job): k8s_cleanup_job = self.runner_params["k8s_cleanup_job"] @@ -930,6 +943,25 @@ def __job_pending_due_to_unschedulable_pod(self, job_state): pod = Pod(self._pykube_api, pods.response["items"][0]) return is_pod_unschedulable(self._pykube_api, pod, self.runner_params["k8s_namespace"]) + def __job_failed_due_to_unknown_exit_code(self, job_state): + """ + checks whether the pod exited prematurely due to an unknown exit code (i.e. not an exit code like OOM that + we can handle). This would mean that the tool failed, but the job should be considered to have succeeded. 
+ """ + pods = find_pod_object_by_name(self._pykube_api, job_state.job_id, self.runner_params["k8s_namespace"]) + if not pods.response["items"]: + return False + + pod = pods.response["items"][0] + if ( + pod + and "terminated" in pod["status"]["containerStatuses"][0]["state"] + and pod["status"]["containerStatuses"][0]["state"].get("exitCode") + ): + return True + + return False + def __cleanup_k8s_guest_ports(self, job_wrapper, k8s_job): k8s_job_prefix = self.__produce_k8s_job_prefix() k8s_job_name = f"{k8s_job_prefix}-{self.__force_label_conformity(job_wrapper.get_id_tag())}" From 7724435df87af319351de967074e707290e1c1a0 Mon Sep 17 00:00:00 2001 From: nuwang <2070605+nuwang@users.noreply.github.com> Date: Wed, 21 Jun 2023 23:58:29 +0530 Subject: [PATCH 17/20] Get exitCode from correct dict entry --- lib/galaxy/jobs/runners/kubernetes.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/galaxy/jobs/runners/kubernetes.py b/lib/galaxy/jobs/runners/kubernetes.py index 6f033a1cd57f..7c6c6f153f84 100644 --- a/lib/galaxy/jobs/runners/kubernetes.py +++ b/lib/galaxy/jobs/runners/kubernetes.py @@ -956,7 +956,7 @@ def __job_failed_due_to_unknown_exit_code(self, job_state): if ( pod and "terminated" in pod["status"]["containerStatuses"][0]["state"] - and pod["status"]["containerStatuses"][0]["state"].get("exitCode") + and pod["status"]["containerStatuses"][0]["state"]["terminated"].get("exitCode") ): return True From c1b3b275344feaef63be73f0696b9d5dc4455c15 Mon Sep 17 00:00:00 2001 From: nuwang <2070605+nuwang@users.noreply.github.com> Date: Thu, 22 Jun 2023 02:49:37 +0530 Subject: [PATCH 18/20] Bump pykube version --- lib/galaxy/dependencies/conditional-requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/galaxy/dependencies/conditional-requirements.txt b/lib/galaxy/dependencies/conditional-requirements.txt index 40861584fc09..1bea9abfe5a2 100644 --- a/lib/galaxy/dependencies/conditional-requirements.txt +++ b/lib/galaxy/dependencies/conditional-requirements.txt @@ -35,7 +35,7 @@ custos-sdk chronos-python==1.2.1 # Kubernetes job runner -pykube-ng==22.9.0 +pykube-ng==23.6.0 # Synnefo / Pithos+ object store client kamaki From 452a042d672d11a76ab666eca8b52bb064b3f8a7 Mon Sep 17 00:00:00 2001 From: nuwang <2070605+nuwang@users.noreply.github.com> Date: Thu, 22 Jun 2023 17:41:31 +0530 Subject: [PATCH 19/20] Fix conditional requirement for pykube-ng --- lib/galaxy/dependencies/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/galaxy/dependencies/__init__.py b/lib/galaxy/dependencies/__init__.py index e46c6fee60c9..091fb4da57dc 100644 --- a/lib/galaxy/dependencies/__init__.py +++ b/lib/galaxy/dependencies/__init__.py @@ -203,7 +203,7 @@ def check_total_perspective_vortex(self): def check_pbs_python(self): return "galaxy.jobs.runners.pbs:PBSJobRunner" in self.job_runners - def check_pykube(self): + def check_pykube_ng(self): return "galaxy.jobs.runners.kubernetes:KubernetesJobRunner" in self.job_runners or which("kubectl") def check_chronos_python(self): From 250be5db661ee7f8a6f43ca8086dacbbf6f7e7e1 Mon Sep 17 00:00:00 2001 From: nuwang <2070605+nuwang@users.noreply.github.com> Date: Thu, 22 Jun 2023 17:53:36 +0530 Subject: [PATCH 20/20] Set a pykube version that's available --- lib/galaxy/dependencies/conditional-requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/galaxy/dependencies/conditional-requirements.txt b/lib/galaxy/dependencies/conditional-requirements.txt index 
1bea9abfe5a2..4d6eccc54007 100644 --- a/lib/galaxy/dependencies/conditional-requirements.txt +++ b/lib/galaxy/dependencies/conditional-requirements.txt @@ -35,7 +35,7 @@ custos-sdk chronos-python==1.2.1 # Kubernetes job runner -pykube-ng==23.6.0 +pykube-ng==21.3.0 # Synnefo / Pithos+ object store client kamaki
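
Note on the pod status structure used by the failure checks in these patches: PATCH 01, PATCH 16 and PATCH 17 all read the first entry of containerStatuses from the pod returned by find_pod_object_by_name(...), looking at its state.terminated block for an OOMKilled reason or a non-zero exitCode. The sketch below is illustrative only and not part of the patch series; the helper name and the example pod dict are hypothetical, trimmed to the fields the runner actually inspects.

from typing import Optional


def first_container_terminated_state(pod: dict) -> Optional[dict]:
    """Return the first container's 'terminated' state block, if present."""
    statuses = pod.get("status", {}).get("containerStatuses") or []
    if not statuses:
        return None
    return statuses[0].get("state", {}).get("terminated")


# Hypothetical pod status, reduced to the fields read by __job_failed_due_to_low_memory
# and __job_failed_due_to_unknown_exit_code in the patches above.
example_pod = {
    "status": {
        "phase": "Failed",
        "containerStatuses": [
            {"state": {"terminated": {"reason": "OOMKilled", "exitCode": 137}}}
        ],
    }
}

terminated = first_container_terminated_state(example_pod)
oom_killed = terminated is not None and terminated.get("reason") == "OOMKilled"  # memory-limit case
tool_failed = terminated is not None and bool(terminated.get("exitCode"))  # non-zero tool exit code
print(oom_killed, tool_failed)  # -> True True

In a deployment, the k8s_pod_retries, k8s_job_spec_back_off_limit and k8s_interactivetools_ingress_class parameters introduced above would typically be set per destination in the Galaxy job configuration, alongside resubmission rules keyed on the memory_limit_reached and walltime_reached runner states that these patches populate.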