diff --git a/lib/galaxy/dependencies/__init__.py b/lib/galaxy/dependencies/__init__.py
index aca4c3fd8e84..0cfa3f7148ea 100644
--- a/lib/galaxy/dependencies/__init__.py
+++ b/lib/galaxy/dependencies/__init__.py
@@ -207,7 +207,7 @@ def check_total_perspective_vortex(self):
     def check_pbs_python(self):
         return "galaxy.jobs.runners.pbs:PBSJobRunner" in self.job_runners

-    def check_pykube(self):
+    def check_pykube_ng(self):
         return "galaxy.jobs.runners.kubernetes:KubernetesJobRunner" in self.job_runners or which("kubectl")

     def check_chronos_python(self):
diff --git a/lib/galaxy/dependencies/conditional-requirements.txt b/lib/galaxy/dependencies/conditional-requirements.txt
index 52278d07cbd3..0f5f82b4efcc 100644
--- a/lib/galaxy/dependencies/conditional-requirements.txt
+++ b/lib/galaxy/dependencies/conditional-requirements.txt
@@ -35,7 +35,7 @@ custos-sdk
 chronos-python==1.2.1

 # Kubernetes job runner
-pykube==0.15.0
+pykube-ng==21.3.0

 # Synnefo / Pithos+ object store client
 kamaki
diff --git a/lib/galaxy/jobs/runners/kubernetes.py b/lib/galaxy/jobs/runners/kubernetes.py
index 5709e12bfce9..ffac91490a2f 100644
--- a/lib/galaxy/jobs/runners/kubernetes.py
+++ b/lib/galaxy/jobs/runners/kubernetes.py
@@ -2,10 +2,12 @@
 Offload jobs to a Kubernetes cluster.
 """

+import json  # for debugging of API objects
 import logging
 import math
 import os
 import re
+import time
 from datetime import datetime

 import yaml
@@ -43,6 +45,7 @@
     Service,
     service_object_dict,
 )
+from galaxy.util import unicodify
 from galaxy.util.bytesize import ByteSize

 log = logging.getLogger(__name__)
@@ -95,11 +98,17 @@ def __init__(self, app, nworkers, **kwargs):
                 map=str, valid=lambda s: s == "$gid" or isinstance(s, int) or not s or s.isdigit(), default=None
             ),
             k8s_cleanup_job=dict(map=str, valid=lambda s: s in {"onsuccess", "always", "never"}, default="always"),
-            k8s_pod_retries=dict(map=int, valid=lambda x: int(x) >= 0, default=3),
+            k8s_pod_retries=dict(
+                map=int, valid=lambda x: int(x) >= 0, default=1
+            ),  # note that if the backOffLimit is lower, this parameter will have no effect.
+            k8s_job_spec_back_off_limit=dict(
+                map=int, valid=lambda x: int(x) >= 0, default=0
+            ),  # the default of 0 means the job is not retried after the first pod failure.
             k8s_walltime_limit=dict(map=int, valid=lambda x: int(x) >= 0, default=172800),
             k8s_unschedulable_walltime_limit=dict(map=int, valid=lambda x: not x or int(x) >= 0, default=None),
             k8s_interactivetools_use_ssl=dict(map=bool, default=False),
             k8s_interactivetools_ingress_annotations=dict(map=str),
+            k8s_interactivetools_ingress_class=dict(map=str, default=None),
         )

         if "runner_param_specs" not in kwargs:
@@ -213,7 +222,11 @@ def queue_job(self, job_wrapper):
         self.monitor_queue.put(ajs)

     def __has_guest_ports(self, job_wrapper):
-        return bool(job_wrapper.guest_ports)
+        # Check if the job has guest ports or interactive tool entry points that would signal that it is an interactive tool.
+        log.debug(
+            f"Checking if job {job_wrapper.get_id_tag()} is an interactive tool. guest ports: {job_wrapper.guest_ports}. interactive entry points: {job_wrapper.get_job().interactivetool_entry_points}"
+        )
+        return bool(job_wrapper.guest_ports) or bool(job_wrapper.get_job().interactivetool_entry_points)

     def __configure_port_routing(self, ajs):
         # Configure interactive tool entry points first
@@ -230,6 +243,9 @@ def __configure_port_routing(self, ajs):
         k8s_service_obj = service_object_dict(self.runner_params, k8s_job_name, self.__get_k8s_service_spec(ajs))
         k8s_ingress_obj = ingress_object_dict(self.runner_params, k8s_job_name, self.__get_k8s_ingress_spec(ajs))
+        log.debug(f"Kubernetes service object: {json.dumps(k8s_service_obj, indent=4)}")
+        log.debug(f"Kubernetes ingress object: {json.dumps(k8s_ingress_obj, indent=4)}")

+        # We avoid creating service and ingress if they already exist (e.g. when Galaxy is restarted or resubmitting a job)
         service = Service(self._pykube_api, k8s_service_obj)
         service.create()
         ingress = Ingress(self._pykube_api, k8s_ingress_obj)
@@ -319,6 +335,7 @@ def __get_k8s_job_spec(self, ajs):
         job_ttl = self.runner_params["k8s_job_ttl_secs_after_finished"]
         if self.runner_params["k8s_cleanup_job"] != "never" and job_ttl is not None:
             k8s_job_spec["ttlSecondsAfterFinished"] = job_ttl
+        k8s_job_spec["backoffLimit"] = self.runner_params["k8s_job_spec_back_off_limit"]
         return k8s_job_spec

     def __force_label_conformity(self, value):
@@ -457,10 +474,12 @@ def __get_k8s_ingress_spec(self, ajs):
                             "paths": [
                                 {
                                     "backend": {
-                                        "serviceName": self.__get_k8s_job_name(
-                                            self.__produce_k8s_job_prefix(), ajs.job_wrapper
-                                        ),
-                                        "servicePort": int(ep["tool_port"]),
+                                        "service": {
+                                            "name": self.__get_k8s_job_name(
+                                                self.__produce_k8s_job_prefix(), ajs.job_wrapper
+                                            ),
+                                            "port": {"number": int(ep["tool_port"])},
+                                        }
                                     },
                                     "path": ep.get("entry_path", "/"),
                                     "pathType": "Prefix",
@@ -469,9 +488,12 @@
                         },
                     }
                     for ep in entry_points
-                ]
+                ],
             },
         }
+        default_ingress_class = self.runner_params.get("k8s_interactivetools_ingress_class")
+        if default_ingress_class:
+            k8s_spec_template["spec"]["ingressClassName"] = default_ingress_class
         if self.runner_params.get("k8s_interactivetools_use_ssl"):
             domains = list({e["domain"] for e in entry_points})
             k8s_spec_template["spec"]["tls"] = [
@@ -522,7 +544,8 @@ def __get_k8s_containers(self, ajs):
             # command line execution, separated by ;, which is what Galaxy does
             # to assemble the command.
             "command": [ajs.job_wrapper.shell],
-            "args": ["-c", ajs.job_file],
+            # Make sure that the exit code is propagated to k8s, so k8s knows why the tool failed (e.g. OOM)
+            "args": ["-c", f"{ajs.job_file}; exit $(cat {ajs.exit_code_file})"],
             "workingDir": ajs.job_wrapper.working_directory,
             "volumeMounts": deduplicate_entries(mounts),
         }
@@ -704,6 +727,10 @@ def check_watched_item(self, job_state):
         else:
             max_pod_retries = 1

+        # make sure that we don't have any conditions by which the runner
+        # would wait forever for a pod that never gets sent.
+        max_pod_retries = min(max_pod_retries, self.runner_params["k8s_job_spec_back_off_limit"])
+
         # Check if job.obj['status'] is empty,
         # return job_state unchanged if this is the case
         # as probably this means that the k8s API server hasn't
@@ -724,10 +751,12 @@
         if succeeded > 0 or job_state == model.Job.states.STOPPED:
             job_state.running = False
             self.mark_as_finished(job_state)
+            log.debug(f"Job id: {job_state.job_id} with k8s id: {job.name} succeeded")
             return None
-        elif active > 0 and failed <= max_pod_retries:
+        elif active > 0 and failed < max_pod_retries + 1:
             if not job_state.running:
                 if self.__job_pending_due_to_unschedulable_pod(job_state):
+                    log.debug(f"Job id: {job_state.job_id} with k8s id: {job.name} pending...")
                     if self.runner_params.get("k8s_unschedulable_walltime_limit"):
                         creation_time_str = job.obj["metadata"].get("creationTimestamp")
                         creation_time = datetime.strptime(creation_time_str, "%Y-%m-%dT%H:%M:%SZ")
@@ -739,27 +768,49 @@
                         else:
                             pass
                 else:
+                    log.debug("Job set to running...")
                     job_state.running = True
                     job_state.job_wrapper.change_state(model.Job.states.RUNNING)
             return job_state
         elif job_persisted_state == model.Job.states.DELETED:
             # Job has been deleted via stop_job and job has not been deleted,
             # remove from watched_jobs by returning `None`
+            log.debug(f"Job id: {job_state.job_id} has been already deleted...")
             if job_state.job_wrapper.cleanup_job in ("always", "onsuccess"):
                 job_state.job_wrapper.cleanup()
             return None
         else:
-            return self._handle_job_failure(job, job_state)
+            log.debug(
+                f"Job id: {job_state.job_id} failed and it is not a deletion case. Current state: {job_state.job_wrapper.get_state()}"
+            )
+            if self._handle_job_failure(job, job_state):
+                # changes for resubmission (removed self.mark_as_failed from handle_job_failure)
+                self.work_queue.put((self.mark_as_failed, job_state))
+            else:
+                # Job failure was not due to a k8s issue or something that k8s can handle, so it's a tool error.
+                job_state.running = False
+                self.mark_as_finished(job_state)
+                return None
+
+            return None

     elif len(jobs.response["items"]) == 0:
         if job_state.job_wrapper.get_job().state == model.Job.states.DELETED:
-            # Job has been deleted via stop_job and job has been deleted,
-            # cleanup and remove from watched_jobs by returning `None`
             if job_state.job_wrapper.cleanup_job in ("always", "onsuccess"):
                 job_state.job_wrapper.cleanup()
             return None
+        if job_state.job_wrapper.get_job().state == model.Job.states.STOPPED and self.__has_guest_ports(
+            job_state.job_wrapper
+        ):
+            # Interactive job has been stopped via stop_job (most likely by the user),
+            # cleanup and remove from watched_jobs by returning `None`. STOPPED jobs are cleaned up elsewhere.
+            # Marking as finished makes sure that the interactive job output is available in the UI.
+            self.mark_as_finished(job_state)
+            return None
         # there is no job responding to this job_id, it is either lost or something happened.
-        log.error("No Jobs are available under expected selector app=%s", job_state.job_id)
+        log.error(
+            f"No Jobs are available under expected selector app={job_state.job_id} and they are not deleted or stopped either."
+        )
         self.mark_as_failed(job_state)
         # job is no longer viable - remove from watched jobs
         return None
@@ -786,37 +837,52 @@ def _handle_unschedulable_job(self, job, job_state):

     def _handle_job_failure(self, job, job_state):
         # Figure out why job has failed
+        mark_failed = True
         with open(job_state.error_file, "a") as error_file:
+            log.debug("Trying with error file in _handle_job_failure")
             if self.__job_failed_due_to_low_memory(job_state):
+                log.debug(f"OOM detected for job: {job_state.job_id}")
                 error_file.write("Job killed after running out of memory. Try with more memory.\n")
                 job_state.fail_message = "Tool failed due to insufficient memory. Try with more memory."
                 job_state.runner_state = JobState.runner_states.MEMORY_LIMIT_REACHED
             elif self.__job_failed_due_to_walltime_limit(job):
+                log.debug(f"Walltime limit reached for job: {job_state.job_id}")
                 error_file.write("DeadlineExceeded")
                 job_state.fail_message = "Job was active longer than specified deadline"
                 job_state.runner_state = JobState.runner_states.WALLTIME_REACHED
+            elif self.__job_failed_due_to_unknown_exit_code(job_state):
+                msg = f"Job: {job_state.job_id} failed due to an unknown exit code from the tool."
+                log.debug(msg)
+                job_state.fail_message = msg
+                job_state.runner_state = JobState.runner_states.TOOL_DETECT_ERROR
+                mark_failed = False
             else:
-                error_file.write("Exceeded max number of Kubernetes pod retries allowed for job\n")
-                job_state.fail_message = "More pods failed than allowed. See stdout for pods details."
-        job_state.running = False
-        self.mark_as_failed(job_state)
+                msg = f"An unknown error occurred in this job and the maximum number of retries has been exceeded for job: {job_state.job_id}."
+                log.debug(msg)
+                error_file.write(msg)
+                job_state.fail_message = (
+                    "An unknown error occurred with this job. See standard output within the info section for details."
+                )
+        # changes for resubmission
+        # job_state.running = False
+        # self.mark_as_failed(job_state)
         try:
             if self.__has_guest_ports(job_state.job_wrapper):
                 self.__cleanup_k8s_guest_ports(job_state.job_wrapper, job)
             self.__cleanup_k8s_job(job)
         except Exception:
             log.exception("Could not clean up k8s batch job. Ignoring...")
-        return None
+        return mark_failed

     def __cleanup_k8s_job(self, job):
         k8s_cleanup_job = self.runner_params["k8s_cleanup_job"]
         delete_job(job, k8s_cleanup_job)

-    def __cleanup_k8s_ingress(self, ingress, job_failed):
+    def __cleanup_k8s_ingress(self, ingress, job_failed=False):
         k8s_cleanup_job = self.runner_params["k8s_cleanup_job"]
         delete_ingress(ingress, k8s_cleanup_job, job_failed)

-    def __cleanup_k8s_service(self, service, job_failed):
+    def __cleanup_k8s_service(self, service, job_failed=False):
         k8s_cleanup_job = self.runner_params["k8s_cleanup_job"]
         delete_service(service, k8s_cleanup_job, job_failed)

@@ -844,11 +910,12 @@ def __job_failed_due_to_low_memory(self, job_state):
         if not pods.response["items"]:
             return False

-        pod = self._get_pod_for_job(job_state)
+        # pod = self._get_pod_for_job(job_state)  # this was always None
+        pod = pods.response["items"][0]
         if (
             pod
-            and pod.obj["status"]["phase"] == "Failed"
-            and pod.obj["status"]["containerStatuses"][0]["state"]["terminated"]["reason"] == "OOMKilled"
+            and "terminated" in pod["status"]["containerStatuses"][0]["state"]
+            and pod["status"]["containerStatuses"][0]["state"]["terminated"]["reason"] == "OOMKilled"
         ):
             return True

@@ -865,36 +932,71 @@ def __job_pending_due_to_unschedulable_pod(self, job_state):
         pod = Pod(self._pykube_api, pods.response["items"][0])
         return is_pod_unschedulable(self._pykube_api, pod, self.runner_params["k8s_namespace"])

+    def __job_failed_due_to_unknown_exit_code(self, job_state):
+        """
+        Checks whether the pod exited prematurely due to an unknown exit code (i.e. not an exit code like OOM that
+        we can handle). In that case the tool itself failed, but from the runner's perspective the job succeeded.
+        """
+        pods = find_pod_object_by_name(self._pykube_api, job_state.job_id, self.runner_params["k8s_namespace"])
+        if not pods.response["items"]:
+            return False
+
+        pod = pods.response["items"][0]
+        if (
+            pod
+            and "terminated" in pod["status"]["containerStatuses"][0]["state"]
+            and pod["status"]["containerStatuses"][0]["state"]["terminated"].get("exitCode")
+        ):
+            return True
+
+        return False
+
     def __cleanup_k8s_guest_ports(self, job_wrapper, k8s_job):
         k8s_job_prefix = self.__produce_k8s_job_prefix()
         k8s_job_name = f"{k8s_job_prefix}-{self.__force_label_conformity(job_wrapper.get_id_tag())}"
         log.debug(f"Deleting service/ingress for job with ID {job_wrapper.get_id_tag()}")
-        job_failed = k8s_job.obj["status"]["failed"] > 0 if "failed" in k8s_job.obj["status"] else False
         ingress_to_delete = find_ingress_object_by_name(
             self._pykube_api, k8s_job_name, self.runner_params["k8s_namespace"]
         )
         if ingress_to_delete and len(ingress_to_delete.response["items"]) > 0:
             k8s_ingress = Ingress(self._pykube_api, ingress_to_delete.response["items"][0])
-            self.__cleanup_k8s_ingress(k8s_ingress, job_failed)
+            self.__cleanup_k8s_ingress(k8s_ingress)
+        else:
+            log.debug(f"No ingress found for job with k8s_job_name {k8s_job_name}")
         service_to_delete = find_service_object_by_name(
             self._pykube_api, k8s_job_name, self.runner_params["k8s_namespace"]
         )
         if service_to_delete and len(service_to_delete.response["items"]) > 0:
             k8s_service = Service(self._pykube_api, service_to_delete.response["items"][0])
-            self.__cleanup_k8s_service(k8s_service, job_failed)
+            self.__cleanup_k8s_service(k8s_service)
+        else:
+            log.debug(f"No service found for job with k8s_job_name {k8s_job_name}")
+        # remove the interactive environment entry points
+        eps = job_wrapper.get_job().interactivetool_entry_points
+        if eps:
+            log.debug(f"Removing entry points for job with ID {job_wrapper.get_id_tag()}")
+            self.app.interactivetool_manager.remove_entry_points(eps)

     def stop_job(self, job_wrapper):
         """Attempts to delete a dispatched job to the k8s cluster"""
         job = job_wrapper.get_job()
         try:
+            log.debug(f"Attempting to stop job {job.id} ({job.job_runner_external_id})")
             job_to_delete = find_job_object_by_name(
                 self._pykube_api, job.get_job_runner_external_id(), self.runner_params["k8s_namespace"]
             )
             if job_to_delete and len(job_to_delete.response["items"]) > 0:
                 k8s_job = Job(self._pykube_api, job_to_delete.response["items"][0])
+                log.debug(f"Found job with id {job.get_job_runner_external_id()} to delete")
+                # For interactive jobs, at this point because the job stopping has been partly handled by the
+                # interactive tool manager, the job wrapper no longer shows any guest ports. We need another way
+                # to check if the job is an interactive job.
                 if self.__has_guest_ports(job_wrapper):
+                    log.debug(f"Job {job.id} ({job.job_runner_external_id}) has guest ports, cleaning them up")
                     self.__cleanup_k8s_guest_ports(job_wrapper, k8s_job)
                 self.__cleanup_k8s_job(k8s_job)
+            else:
+                log.debug(f"Could not find job with id {job.get_job_runner_external_id()} to delete")
             # TODO assert whether job parallelism == 0
             # assert not job_to_delete.exists(), "Could not delete job,"+job.job_runner_external_id+" it still exists"
             log.debug(f"({job.id}/{job.job_runner_external_id}) Terminated at user's request")
@@ -939,6 +1041,73 @@ def recover(self, job, job_wrapper):
         ajs.running = False
         self.monitor_queue.put(ajs)

+    # added to make sure that stdout and stderr are captured for Kubernetes
+    def fail_job(self, job_state: "JobState", exception=False, message="Job failed", full_status=None):
+        log.debug("Getting into fail_job in k8s runner")
+        job = job_state.job_wrapper.get_job()
+
+        # Get STDOUT and STDERR from the job and tool to be stored in the database.
+        # This is needed because when calling finish_job on a failed job, the check_output method
+        # overrides the job error state and tries to figure it out from the job output files,
+        # breaking OOM resubmissions.
+        # To ensure that files below are readable, ownership must be reclaimed first
+        job_state.job_wrapper.reclaim_ownership()
+
+        # wait for the files to appear
+        which_try = 0
+        while which_try < self.app.config.retry_job_output_collection + 1:
+            try:
+                with open(job_state.output_file, "rb") as stdout_file, open(job_state.error_file, "rb") as stderr_file:
+                    job_stdout = self._job_io_for_db(stdout_file)
+                    job_stderr = self._job_io_for_db(stderr_file)
+                break
+            except Exception as e:
+                if which_try == self.app.config.retry_job_output_collection:
+                    job_stdout = ""
+                    job_stderr = job_state.runner_states.JOB_OUTPUT_NOT_RETURNED_FROM_CLUSTER
+                    log.error(f"{job.id}/{job.job_runner_external_id} {job_stderr}: {unicodify(e)}")
+                else:
+                    time.sleep(1)
+                which_try += 1
+
+        # get stderr and stdout to database
+        outputs_directory = os.path.join(job_state.job_wrapper.working_directory, "outputs")
+        if not os.path.exists(outputs_directory):
+            outputs_directory = job_state.job_wrapper.working_directory
+
+        tool_stdout_path = os.path.join(outputs_directory, "tool_stdout")
+        tool_stderr_path = os.path.join(outputs_directory, "tool_stderr")
+
+        # TODO: These might not exist for running jobs at the upgrade to 19.XX, remove that
+        # assumption in 20.XX.
+        tool_stderr = "Galaxy issue: stderr could not be retrieved from the job working directory."
+        tool_stdout = "Galaxy issue: stdout could not be retrieved from the job working directory."
+        if os.path.exists(tool_stdout_path):
+            with open(tool_stdout_path, "rb") as stdout_file:
+                tool_stdout = self._job_io_for_db(stdout_file)
+        else:
+            # Legacy job, we're getting a merged output - assume it is mostly tool output.
+            tool_stdout = job_stdout
+            job_stdout = None
+
+        if os.path.exists(tool_stderr_path):
+            with open(tool_stderr_path, "rb") as stdout_file:
+                tool_stderr = self._job_io_for_db(stdout_file)
+        else:
+            # Legacy job, we're getting a merged output - assume it is mostly tool output.
+            tool_stderr = job_stderr
+            job_stderr = None
+
+        # full status empty leaves the UI without stderr/stdout
+        full_status = {"stderr": tool_stderr, "stdout": tool_stdout}
+        log.debug(f"({job.id}/{job.job_runner_external_id}) tool_stdout: {tool_stdout}")
+        log.debug(f"({job.id}/{job.job_runner_external_id}) tool_stderr: {tool_stderr}")
+        log.debug(f"({job.id}/{job.job_runner_external_id}) job_stdout: {job_stdout}")
+        log.debug(f"({job.id}/{job.job_runner_external_id}) job_stderr: {job_stderr}")
+
+        # run super method
+        super().fail_job(job_state, exception, message, full_status)
+
     def finish_job(self, job_state):
         self._handle_metadata_externally(job_state.job_wrapper, resolve_requirements=True)
         super().finish_job(job_state)
diff --git a/lib/galaxy/jobs/runners/util/pykube_util.py b/lib/galaxy/jobs/runners/util/pykube_util.py
index 7c3f32d87b09..28020a8c8dde 100644
--- a/lib/galaxy/jobs/runners/util/pykube_util.py
+++ b/lib/galaxy/jobs/runners/util/pykube_util.py
@@ -31,7 +31,7 @@
 DEFAULT_JOB_API_VERSION = "batch/v1"
 DEFAULT_SERVICE_API_VERSION = "v1"
-DEFAULT_INGRESS_API_VERSION = "extensions/v1beta1"
+DEFAULT_INGRESS_API_VERSION = "networking.k8s.io/v1"
 DEFAULT_NAMESPACE = "default"

 INSTANCE_ID_INVALID_MESSAGE = (
     "Galaxy instance [%s] is either too long "
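
For context on the ingress changes above: networking.k8s.io/v1 (now DEFAULT_INGRESS_API_VERSION) replaces the flat serviceName/servicePort backend of extensions/v1beta1 with a nested service object. A minimal sketch of one path entry as __get_k8s_ingress_spec now emits it; the job name and port below are made-up illustration values, not taken from a real job:

    # Illustrative only: shape of a single entry under spec.rules[].http.paths
    # in the networking.k8s.io/v1 schema. "gxy-example-123" and 8888 are placeholders
    # for self.__get_k8s_job_name(...) and int(ep["tool_port"]).
    path_entry = {
        "backend": {
            "service": {
                "name": "gxy-example-123",
                "port": {"number": 8888},
            }
        },
        "path": "/",  # ep.get("entry_path", "/")
        "pathType": "Prefix",
    }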
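
Similarly, the container args change is easier to read with concrete values substituted. Roughly, the generated container now runs something like the sketch below (the script and exit-code file paths are placeholders for ajs.job_file and ajs.exit_code_file), so the pod's exit status mirrors the exit code the tool wrote to Galaxy's exit-code file instead of always reporting success:

    # Illustrative only: effective container command/args after the change.
    # The shell, script path and exit-code file are placeholders, not real Galaxy paths.
    command = ["/bin/sh"]
    args = ["-c", "/jobs_dir/000/123/galaxy_123.sh; exit $(cat /jobs_dir/000/123/galaxy_123.ec)"]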