From 61f3db22e4da80c7d38169491ed7a6cd6cb7d8e6 Mon Sep 17 00:00:00 2001 From: Nuwan Goonasekera <2070605+nuwang@users.noreply.github.com> Date: Thu, 18 Jan 2024 17:17:32 +0530 Subject: [PATCH] Revert "[23.1] Enable job resubmissions in k8s runner" --- lib/galaxy/authnz/custos_authnz.py | 2 +- lib/galaxy/dependencies/__init__.py | 2 +- .../dependencies/conditional-requirements.txt | 2 +- lib/galaxy/jobs/runners/kubernetes.py | 221 +++--------------- lib/galaxy/jobs/runners/util/pykube_util.py | 2 +- 5 files changed, 30 insertions(+), 199 deletions(-) diff --git a/lib/galaxy/authnz/custos_authnz.py b/lib/galaxy/authnz/custos_authnz.py index 3c19ab60a682..c066fb0d1e59 100644 --- a/lib/galaxy/authnz/custos_authnz.py +++ b/lib/galaxy/authnz/custos_authnz.py @@ -194,7 +194,7 @@ def callback(self, state_token, authz_code, trans, login_redirect_url): if trans.app.config.fixed_delegated_auth: user = existing_user else: - message = f"There already exists a user with email {email}. To associate this external login, you must first be logged in as that existing account." + message = f"There already exists a user with email {email}. To associate this external login, you must first be logged in as that existing account." log.info(message) login_redirect_url = ( f"{login_redirect_url}login/start" diff --git a/lib/galaxy/dependencies/__init__.py b/lib/galaxy/dependencies/__init__.py index c7b9f7db0602..eb484b9dcbdf 100644 --- a/lib/galaxy/dependencies/__init__.py +++ b/lib/galaxy/dependencies/__init__.py @@ -205,7 +205,7 @@ def check_total_perspective_vortex(self): def check_pbs_python(self): return "galaxy.jobs.runners.pbs:PBSJobRunner" in self.job_runners - def check_pykube_ng(self): + def check_pykube(self): return "galaxy.jobs.runners.kubernetes:KubernetesJobRunner" in self.job_runners or which("kubectl") def check_chronos_python(self): diff --git a/lib/galaxy/dependencies/conditional-requirements.txt b/lib/galaxy/dependencies/conditional-requirements.txt index ef6307f9a946..0806672bfc16 100644 --- a/lib/galaxy/dependencies/conditional-requirements.txt +++ b/lib/galaxy/dependencies/conditional-requirements.txt @@ -35,7 +35,7 @@ custos-sdk chronos-python==1.2.1 # Kubernetes job runner -pykube-ng==21.3.0 +pykube==0.15.0 # Synnefo / Pithos+ object store client kamaki diff --git a/lib/galaxy/jobs/runners/kubernetes.py b/lib/galaxy/jobs/runners/kubernetes.py index 7c6c6f153f84..1f9e984605c3 100644 --- a/lib/galaxy/jobs/runners/kubernetes.py +++ b/lib/galaxy/jobs/runners/kubernetes.py @@ -2,12 +2,10 @@ Offload jobs to a Kubernetes cluster. """ -import json # for debugging of API objects import logging import math import os import re -import time from datetime import datetime import yaml @@ -45,7 +43,6 @@ Service, service_object_dict, ) -from galaxy.util import unicodify from galaxy.util.bytesize import ByteSize log = logging.getLogger(__name__) @@ -98,17 +95,11 @@ def __init__(self, app, nworkers, **kwargs): map=str, valid=lambda s: s == "$gid" or isinstance(s, int) or not s or s.isdigit(), default=None ), k8s_cleanup_job=dict(map=str, valid=lambda s: s in {"onsuccess", "always", "never"}, default="always"), - k8s_pod_retries=dict( - map=int, valid=lambda x: int(x) >= 0, default=1 - ), # note that if the backOffLimit is lower, this paramer will have no effect. - k8s_job_spec_back_off_limit=dict( - map=int, valid=lambda x: int(x) >= 0, default=0 - ), # this means that it will stop retrying after 1 failure. 
+ k8s_pod_retries=dict(map=int, valid=lambda x: int(x) >= 0, default=3), k8s_walltime_limit=dict(map=int, valid=lambda x: int(x) >= 0, default=172800), k8s_unschedulable_walltime_limit=dict(map=int, valid=lambda x: not x or int(x) >= 0, default=None), k8s_interactivetools_use_ssl=dict(map=bool, default=False), k8s_interactivetools_ingress_annotations=dict(map=str), - k8s_interactivetools_ingress_class=dict(map=str, default=None), ) if "runner_param_specs" not in kwargs: @@ -222,11 +213,7 @@ def queue_job(self, job_wrapper): self.monitor_queue.put(ajs) def __has_guest_ports(self, job_wrapper): - # Check if job has guest ports or interactive tool entry points that would signal that - log.debug( - f"Checking if job {job_wrapper.get_id_tag()} is an interactive tool. guest ports: {job_wrapper.guest_ports}. interactive entry points: {job_wrapper.get_job().interactivetool_entry_points}" - ) - return bool(job_wrapper.guest_ports) or bool(job_wrapper.get_job().interactivetool_entry_points) + return bool(job_wrapper.guest_ports) def __configure_port_routing(self, ajs): # Configure interactive tool entry points first @@ -243,9 +230,6 @@ def __configure_port_routing(self, ajs): k8s_service_obj = service_object_dict(self.runner_params, k8s_job_name, self.__get_k8s_service_spec(ajs)) k8s_ingress_obj = ingress_object_dict(self.runner_params, k8s_job_name, self.__get_k8s_ingress_spec(ajs)) - log.debug(f"Kubernetes service object: {json.dumps(k8s_service_obj, indent=4)}") - log.debug(f"Kubernetes ingress object: {json.dumps(k8s_ingress_obj, indent=4)}") - # We avoid creating service and ingress if they already exist (e.g. when Galaxy is restarted or resubmitting a job) service = Service(self._pykube_api, k8s_service_obj) service.create() ingress = Ingress(self._pykube_api, k8s_ingress_obj) @@ -339,7 +323,6 @@ def __get_k8s_job_spec(self, ajs): job_ttl = self.runner_params["k8s_job_ttl_secs_after_finished"] if self.runner_params["k8s_cleanup_job"] != "never" and job_ttl is not None: k8s_job_spec["ttlSecondsAfterFinished"] = job_ttl - k8s_job_spec["backoffLimit"] = self.runner_params["k8s_job_spec_back_off_limit"] return k8s_job_spec def __force_label_conformity(self, value): @@ -478,12 +461,10 @@ def __get_k8s_ingress_spec(self, ajs): "paths": [ { "backend": { - "service": { - "name": self.__get_k8s_job_name( - self.__produce_k8s_job_prefix(), ajs.job_wrapper - ), - "port": {"number": int(ep["tool_port"])}, - } + "serviceName": self.__get_k8s_job_name( + self.__produce_k8s_job_prefix(), ajs.job_wrapper + ), + "servicePort": int(ep["tool_port"]), }, "path": ep.get("entry_path", "/"), "pathType": "Prefix", @@ -492,12 +473,9 @@ def __get_k8s_ingress_spec(self, ajs): }, } for ep in entry_points - ], + ] }, } - default_ingress_class = self.runner_params.get("k8s_interactivetools_ingress_class") - if default_ingress_class: - k8s_spec_template["spec"]["ingressClassName"] = default_ingress_class if self.runner_params.get("k8s_interactivetools_use_ssl"): domains = list({e["domain"] for e in entry_points}) k8s_spec_template["spec"]["tls"] = [ @@ -548,8 +526,7 @@ def __get_k8s_containers(self, ajs): # command line execution, separated by ;, which is what Galaxy does # to assemble the command. "command": [ajs.job_wrapper.shell], - # Make sure that the exit code is propagated to k8s, so k8s knows why the tool failed (e.g. 
OOM) - "args": ["-c", f"{ajs.job_file}; exit $(cat {ajs.exit_code_file})"], + "args": ["-c", ajs.job_file], "workingDir": ajs.job_wrapper.working_directory, "volumeMounts": deduplicate_entries(mounts), } @@ -738,10 +715,6 @@ def check_watched_item(self, job_state): else: max_pod_retries = 1 - # make sure that we don't have any conditions by which the runner - # would wait forever for a pod that never gets sent. - max_pod_retries = min(max_pod_retries, self.runner_params["k8s_job_spec_back_off_limit"]) - # Check if job.obj['status'] is empty, # return job_state unchanged if this is the case # as probably this means that the k8s API server hasn't @@ -762,12 +735,10 @@ def check_watched_item(self, job_state): if succeeded > 0 or job_state == model.Job.states.STOPPED: job_state.running = False self.mark_as_finished(job_state) - log.debug(f"Job id: {job_state.job_id} with k8s id: {job.name} succeeded") return None - elif active > 0 and failed < max_pod_retries + 1: + elif active > 0 and failed <= max_pod_retries: if not job_state.running: if self.__job_pending_due_to_unschedulable_pod(job_state): - log.debug(f"Job id: {job_state.job_id} with k8s id: {job.name} pending...") if self.runner_params.get("k8s_unschedulable_walltime_limit"): creation_time_str = job.obj["metadata"].get("creationTimestamp") creation_time = datetime.strptime(creation_time_str, "%Y-%m-%dT%H:%M:%SZ") @@ -779,49 +750,27 @@ def check_watched_item(self, job_state): else: pass else: - log.debug("Job set to running...") job_state.running = True job_state.job_wrapper.change_state(model.Job.states.RUNNING) return job_state elif job_persisted_state == model.Job.states.DELETED: # Job has been deleted via stop_job and job has not been deleted, # remove from watched_jobs by returning `None` - log.debug(f"Job id: {job_state.job_id} has been already deleted...") if job_state.job_wrapper.cleanup_job in ("always", "onsuccess"): job_state.job_wrapper.cleanup() return None else: - log.debug( - f"Job id: {job_state.job_id} failed and it is not a deletion case. Current state: {job_state.job_wrapper.get_state()}" - ) - if self._handle_job_failure(job, job_state): - # changes for resubmission (removed self.mark_as_failed from handle_job_failure) - self.work_queue.put((self.mark_as_failed, job_state)) - else: - # Job failure was not due to a k8s issue or something that k8s can handle, so it's a tool error. - job_state.running = False - self.mark_as_finished(job_state) - return None - - return None + return self._handle_job_failure(job, job_state) elif len(jobs.response["items"]) == 0: if job_state.job_wrapper.get_job().state == model.Job.states.DELETED: + # Job has been deleted via stop_job and job has been deleted, + # cleanup and remove from watched_jobs by returning `None` if job_state.job_wrapper.cleanup_job in ("always", "onsuccess"): job_state.job_wrapper.cleanup() return None - if job_state.job_wrapper.get_job().state == model.Job.states.STOPPED and self.__has_guest_ports( - job_state.job_wrapper - ): - # Interactive job has been stopped via stop_job (most likely by the user), - # cleanup and remove from watched_jobs by returning `None`. STOPPED jobs are cleaned up elsewhere. - # Marking as finished makes sure that the interactive job output is available in the UI. - self.mark_as_finished(job_state) - return None # there is no job responding to this job_id, it is either lost or something happened. - log.error( - f"No Jobs are available under expected selector app={job_state.job_id} and they are not deleted or stopped either." 
- ) + log.error("No Jobs are available under expected selector app=%s", job_state.job_id) self.mark_as_failed(job_state) # job is no longer viable - remove from watched jobs return None @@ -848,52 +797,37 @@ def _handle_unschedulable_job(self, job, job_state): def _handle_job_failure(self, job, job_state): # Figure out why job has failed - mark_failed = True with open(job_state.error_file, "a") as error_file: - log.debug("Trying with error file in _handle_job_failure") if self.__job_failed_due_to_low_memory(job_state): - log.debug(f"OOM detected for job: {job_state.job_id}") error_file.write("Job killed after running out of memory. Try with more memory.\n") job_state.fail_message = "Tool failed due to insufficient memory. Try with more memory." job_state.runner_state = JobState.runner_states.MEMORY_LIMIT_REACHED elif self.__job_failed_due_to_walltime_limit(job): - log.debug(f"Walltime limit reached for job: {job_state.job_id}") error_file.write("DeadlineExceeded") job_state.fail_message = "Job was active longer than specified deadline" job_state.runner_state = JobState.runner_states.WALLTIME_REACHED - elif self.__job_failed_due_to_unknown_exit_code(job_state): - msg = f"Job: {job_state.job_id} failed due to an unknown exit code from the tool." - log.debug(msg) - job_state.fail_message = msg - job_state.runner_state = JobState.runner_states.TOOL_DETECT_ERROR - mark_failed = False else: - msg = f"An unknown error occurred in this job and the maximum number of retries have been exceeded for job: {job_state.job_id}." - log.debug(msg) - error_file.write(msg) - job_state.fail_message = ( - "An unknown error occurered with this job. See standard output within the info section for details." - ) - # changes for resubmission - # job_state.running = False - # self.mark_as_failed(job_state) + error_file.write("Exceeded max number of Kubernetes pod retries allowed for job\n") + job_state.fail_message = "More pods failed than allowed. See stdout for pods details." + job_state.running = False + self.mark_as_failed(job_state) try: if self.__has_guest_ports(job_state.job_wrapper): self.__cleanup_k8s_guest_ports(job_state.job_wrapper, job) self.__cleanup_k8s_job(job) except Exception: log.exception("Could not clean up k8s batch job. 
Ignoring...") - return mark_failed + return None def __cleanup_k8s_job(self, job): k8s_cleanup_job = self.runner_params["k8s_cleanup_job"] delete_job(job, k8s_cleanup_job) - def __cleanup_k8s_ingress(self, ingress, job_failed=False): + def __cleanup_k8s_ingress(self, ingress, job_failed): k8s_cleanup_job = self.runner_params["k8s_cleanup_job"] delete_ingress(ingress, k8s_cleanup_job, job_failed) - def __cleanup_k8s_service(self, service, job_failed=False): + def __cleanup_k8s_service(self, service, job_failed): k8s_cleanup_job = self.runner_params["k8s_cleanup_job"] delete_service(service, k8s_cleanup_job, job_failed) @@ -921,12 +855,11 @@ def __job_failed_due_to_low_memory(self, job_state): if not pods.response["items"]: return False - # pod = self._get_pod_for_job(job_state) # this was always None - pod = pods.response["items"][0] + pod = self._get_pod_for_job(job_state) if ( pod - and "terminated" in pod["status"]["containerStatuses"][0]["state"] - and pod["status"]["containerStatuses"][0]["state"]["terminated"]["reason"] == "OOMKilled" + and pod.obj["status"]["phase"] == "Failed" + and pod.obj["status"]["containerStatuses"][0]["state"]["terminated"]["reason"] == "OOMKilled" ): return True @@ -943,71 +876,36 @@ def __job_pending_due_to_unschedulable_pod(self, job_state): pod = Pod(self._pykube_api, pods.response["items"][0]) return is_pod_unschedulable(self._pykube_api, pod, self.runner_params["k8s_namespace"]) - def __job_failed_due_to_unknown_exit_code(self, job_state): - """ - checks whether the pod exited prematurely due to an unknown exit code (i.e. not an exit code like OOM that - we can handle). This would mean that the tool failed, but the job should be considered to have succeeded. - """ - pods = find_pod_object_by_name(self._pykube_api, job_state.job_id, self.runner_params["k8s_namespace"]) - if not pods.response["items"]: - return False - - pod = pods.response["items"][0] - if ( - pod - and "terminated" in pod["status"]["containerStatuses"][0]["state"] - and pod["status"]["containerStatuses"][0]["state"]["terminated"].get("exitCode") - ): - return True - - return False - def __cleanup_k8s_guest_ports(self, job_wrapper, k8s_job): k8s_job_prefix = self.__produce_k8s_job_prefix() k8s_job_name = f"{k8s_job_prefix}-{self.__force_label_conformity(job_wrapper.get_id_tag())}" log.debug(f"Deleting service/ingress for job with ID {job_wrapper.get_id_tag()}") + job_failed = k8s_job.obj["status"]["failed"] > 0 if "failed" in k8s_job.obj["status"] else False ingress_to_delete = find_ingress_object_by_name( self._pykube_api, k8s_job_name, self.runner_params["k8s_namespace"] ) if ingress_to_delete and len(ingress_to_delete.response["items"]) > 0: k8s_ingress = Ingress(self._pykube_api, ingress_to_delete.response["items"][0]) - self.__cleanup_k8s_ingress(k8s_ingress) - else: - log.debug(f"No ingress found for job with k8s_job_name {k8s_job_name}") + self.__cleanup_k8s_ingress(k8s_ingress, job_failed) service_to_delete = find_service_object_by_name( self._pykube_api, k8s_job_name, self.runner_params["k8s_namespace"] ) if service_to_delete and len(service_to_delete.response["items"]) > 0: k8s_service = Service(self._pykube_api, service_to_delete.response["items"][0]) - self.__cleanup_k8s_service(k8s_service) - else: - log.debug(f"No service found for job with k8s_job_name {k8s_job_name}") - # remove the interactive environment entrypoints - eps = job_wrapper.get_job().interactivetool_entry_points - if eps: - log.debug(f"Removing entry points for job with ID {job_wrapper.get_id_tag()}") - 
self.app.interactivetool_manager.remove_entry_points(eps) + self.__cleanup_k8s_service(k8s_service, job_failed) def stop_job(self, job_wrapper): """Attempts to delete a dispatched job to the k8s cluster""" job = job_wrapper.get_job() try: - log.debug(f"Attempting to stop job {job.id} ({job.job_runner_external_id})") job_to_delete = find_job_object_by_name( self._pykube_api, job.get_job_runner_external_id(), self.runner_params["k8s_namespace"] ) if job_to_delete and len(job_to_delete.response["items"]) > 0: k8s_job = Job(self._pykube_api, job_to_delete.response["items"][0]) - log.debug(f"Found job with id {job.get_job_runner_external_id()} to delete") - # For interactive jobs, at this point because the job stopping has been partly handled by the - # interactive tool manager, the job wrapper no longer shows any guest ports. We need another way - # to check if the job is an interactive job. if self.__has_guest_ports(job_wrapper): - log.debug(f"Job {job.id} ({job.job_runner_external_id}) has guest ports, cleaning them up") self.__cleanup_k8s_guest_ports(job_wrapper, k8s_job) self.__cleanup_k8s_job(k8s_job) - else: - log.debug(f"Could not find job with id {job.get_job_runner_external_id()} to delete") # TODO assert whether job parallelism == 0 # assert not job_to_delete.exists(), "Could not delete job,"+job.job_runner_external_id+" it still exists" log.debug(f"({job.id}/{job.job_runner_external_id}) Terminated at user's request") @@ -1052,73 +950,6 @@ def recover(self, job, job_wrapper): ajs.running = False self.monitor_queue.put(ajs) - # added to make sure that stdout and stderr is captured for Kubernetes - def fail_job(self, job_state: "JobState", exception=False, message="Job failed", full_status=None): - log.debug("PP Getting into fail_job in k8s runner") - job = job_state.job_wrapper.get_job() - - # Get STDOUT and STDERR from the job and tool to be stored in the database # - # This is needed because when calling finish_job on a failed job, the check_output method - # overrides the job error state and tries to figure it out from the job output files - # breaking OOM resubmissions. - # To ensure that files below are readable, ownership must be reclaimed first - job_state.job_wrapper.reclaim_ownership() - - # wait for the files to appear - which_try = 0 - while which_try < self.app.config.retry_job_output_collection + 1: - try: - with open(job_state.output_file, "rb") as stdout_file, open(job_state.error_file, "rb") as stderr_file: - job_stdout = self._job_io_for_db(stdout_file) - job_stderr = self._job_io_for_db(stderr_file) - break - except Exception as e: - if which_try == self.app.config.retry_job_output_collection: - job_stdout = "" - job_stderr = job_state.runner_states.JOB_OUTPUT_NOT_RETURNED_FROM_CLUSTER - log.error(f"{job.id}/{job.job_runner_external_id} {job_stderr}: {unicodify(e)}") - else: - time.sleep(1) - which_try += 1 - - # get stderr and stdout to database - outputs_directory = os.path.join(job_state.job_wrapper.working_directory, "outputs") - if not os.path.exists(outputs_directory): - outputs_directory = job_state.job_wrapper.working_directory - - tool_stdout_path = os.path.join(outputs_directory, "tool_stdout") - tool_stderr_path = os.path.join(outputs_directory, "tool_stderr") - - # TODO: These might not exist for running jobs at the upgrade to 19.XX, remove that - # assumption in 20.XX. - tool_stderr = "Galaxy issue: stderr could not be retrieved from the job working directory." - tool_stdout = "Galaxy issue: stdout could not be retrieved from the job working directory." 
- if os.path.exists(tool_stdout_path): - with open(tool_stdout_path, "rb") as stdout_file: - tool_stdout = self._job_io_for_db(stdout_file) - else: - # Legacy job, were getting a merged output - assume it is mostly tool output. - tool_stdout = job_stdout - job_stdout = None - - if os.path.exists(tool_stderr_path): - with open(tool_stderr_path, "rb") as stdout_file: - tool_stderr = self._job_io_for_db(stdout_file) - else: - # Legacy job, were getting a merged output - assume it is mostly tool output. - tool_stderr = job_stderr - job_stderr = None - - # full status empty leaves the UI without stderr/stdout - full_status = {"stderr": tool_stderr, "stdout": tool_stdout} - log.debug(f"({job.id}/{job.job_runner_external_id}) tool_stdout: {tool_stdout}") - log.debug(f"({job.id}/{job.job_runner_external_id}) tool_stderr: {tool_stderr}") - log.debug(f"({job.id}/{job.job_runner_external_id}) job_stdout: {job_stdout}") - log.debug(f"({job.id}/{job.job_runner_external_id}) job_stderr: {job_stderr}") - - # run super method - super().fail_job(job_state, exception, message, full_status) - def finish_job(self, job_state): self._handle_metadata_externally(job_state.job_wrapper, resolve_requirements=True) super().finish_job(job_state) diff --git a/lib/galaxy/jobs/runners/util/pykube_util.py b/lib/galaxy/jobs/runners/util/pykube_util.py index 28020a8c8dde..7c3f32d87b09 100644 --- a/lib/galaxy/jobs/runners/util/pykube_util.py +++ b/lib/galaxy/jobs/runners/util/pykube_util.py @@ -31,7 +31,7 @@ DEFAULT_JOB_API_VERSION = "batch/v1" DEFAULT_SERVICE_API_VERSION = "v1" -DEFAULT_INGRESS_API_VERSION = "networking.k8s.io/v1" +DEFAULT_INGRESS_API_VERSION = "extensions/v1beta1" DEFAULT_NAMESPACE = "default" INSTANCE_ID_INVALID_MESSAGE = ( "Galaxy instance [%s] is either too long "
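
--
For context on the DEFAULT_INGRESS_API_VERSION change above: networking.k8s.io/v1 nests the ingress backend under a "service" object, while the extensions/v1beta1 form restored by this revert uses flat serviceName/servicePort fields, matching the __get_k8s_ingress_spec hunk earlier in this patch. Below is a minimal, illustrative sketch of the two backend shapes only; the helper names and the example service/port are made up and are not part of Galaxy or this patch.

    def v1_backend(service_name, port):
        """Backend dict as built for networking.k8s.io/v1 (removed by this revert)."""
        return {"service": {"name": service_name, "port": {"number": int(port)}}}

    def v1beta1_backend(service_name, port):
        """Backend dict as built for extensions/v1beta1 (restored by this revert)."""
        return {"serviceName": service_name, "servicePort": int(port)}

    if __name__ == "__main__":
        # Both describe the same routing target; only the API schema differs.
        print(v1_backend("gxy-svc-example", 8080))
        print(v1beta1_backend("gxy-svc-example", 8080))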