class TaskView(str, Enum):
    """Level of detail a client can request for a returned task.

    The enum mixes in ``str``, so every member compares equal to its raw
    string value and can be built directly from that string.
    """

    # Member order is part of the enum's iteration order; keep it stable.
    BASIC = "basic"
    MINIMAL = "minimal"
    FULL = "full"
class GetTesTask(TesTaskRequest):
    """Retrieve a single TES task.

    Arguments:
        task_id: TES task ID.
        task_view: Verbosity of the returned task.
    """

    def __init__(
        self,
        task_id: str,
        task_view: TaskView,
    ):
        """Initialise the GetTesTask class.

        Args:
            task_id: TES task ID.
            task_view: Verbosity of the returned task.
        """
        super().__init__()
        self.task_id = task_id
        self.task_view = task_view

    def handle_request(self):
        """Collect the task's Kubernetes objects and convert them to a TES task.

        Reads the taskmaster job, its executor jobs, the optional output filer
        job and the pods of the taskmaster and executor jobs, assembles them
        with a single-task ``TaskBuilder`` and hands the resulting task object
        to ``_get_task`` together with the requested view.

        Returns:
            Whatever ``_get_task`` returns for the requested view.
        """
        taskmaster_job: V1Job = self.kubernetes_client_wrapper.read_taskmaster_job(
            self.task_id
        )

        taskmaster_job_name: Optional[str] = taskmaster_job.metadata.name
        assert taskmaster_job_name is not None

        execution_job_list: V1JobList = (
            self.kubernetes_client_wrapper.list_single_task_executor_jobs(
                taskmaster_job_name
            )
        )
        output_filer_job: Optional[V1Job] = (
            self.kubernetes_client_wrapper.get_single_task_output_filer_job(
                taskmaster_job_name
            )
        )
        taskmaster_pods: V1PodList = (
            self.kubernetes_client_wrapper.list_single_job_pods(taskmaster_job)
        )

        # Jobs first, pods afterwards: pods are matched against the jobs that
        # are already known to the builder.
        task_builder = (
            TaskBuilder.new_single_task()
            .add_job(taskmaster_job)
            .add_job_list(execution_job_list.items)
        )
        if output_filer_job is not None:
            # The filer job has to be part of the built task; it used to be
            # passed to ``_get_task`` in place of the view argument.
            task_builder.add_job(output_filer_job)

        task_builder.add_pod_list(taskmaster_pods.items)
        for executor_job in execution_job_list.items:
            executor_job_pods: V1PodList = (
                self.kubernetes_client_wrapper.list_single_job_pods(executor_job)
            )
            task_builder.add_pod_list(executor_job_pods.items)

        # ``_get_task`` expects (task object, view, is_list); a single task is
        # never a list response.
        return self._get_task(task_builder.get_task(), self.task_view, False)
class ListTesTask(TesTaskRequest):
    """Gets a full list of tasks.

    Performs Kubernetes API batch calls (all taskmasters, all executors, all
    filers, all pods), combines the results into task objects and converts each
    of them to its TES representation via ``_get_task``.

    Arguments:
        page_size: The maximum number of tasks to return in a single page.
        page_token: The continuation token, which is used to page through large
            result sets.
        view: Verbosity of the task in the list.
        namePrefix: The prefix of the task name.
    """

    def __init__(
        self,
        page_size: int,
        page_token: str,
        view: TaskView,
        namePrefix: str,
    ):
        """Initialise the ListTesTask class.

        Args:
            page_size: The maximum number of tasks to return in a single page.
            page_token: The continuation token, which is used to page through
                large result sets.
            view: Verbosity of the task in the list.
            namePrefix: The prefix of the task name.
        """
        # NOTE(review): handle_request relies on attributes such as
        # ``kubernetes_client_wrapper`` that this class never sets itself;
        # presumably the base initialiser provides them - confirm.
        super().__init__()
        self.page_size = page_size
        self.page_token = page_token
        self.view = view
        # NOTE(review): stored but not used for filtering in handle_request.
        self.namePrefix = namePrefix

    def handle_request(self) -> TesListTasksResponse:
        """Business logic for the list tasks request.

        Returns:
            TesListTasksResponse: The response containing the list of tasks and
                next page token.
        """
        taskmaster_jobs: V1JobList = (
            self.kubernetes_client_wrapper.list_all_taskmaster_jobs_for_user(
                page_token=self.page_token,
                items_per_page=self.page_size,
            )
        )
        executor_jobs: V1JobList = (
            self.kubernetes_client_wrapper.list_all_task_executor_jobs()
        )
        filer_jobs: V1JobList = self.kubernetes_client_wrapper.list_all_filer_jobs()
        job_pods = self.kubernetes_client_wrapper.list_all_job_pods()

        # ``items`` is a plain list attribute on the Kubernetes list models,
        # not a method.
        task_list_builder: TaskBuilder = (
            TaskBuilder.new_task_list()
            .add_job_list(taskmaster_jobs.items)
            .add_job_list(executor_jobs.items)
            .add_job_list(filer_jobs.items)
            .add_pod_list(job_pods.items)
        )

        tasks: List[TesTask] = [
            self._get_task(task, self.view, True)
            for task in task_list_builder.get_task_list()
        ]

        # ``_continue`` is an attribute of the list metadata; it is absent when
        # there is no further page.
        list_metadata = taskmaster_jobs.metadata
        next_page_token = list_metadata._continue if list_metadata else None

        return TesListTasksResponse(tasks=tasks, next_page_token=next_page_token)
def _get_task(self, task_object: Task, view: TaskView, is_list: bool) -> TesTask:
    """Shared method to process Task objects into TesTask responses.

    For the minimal view the converter's minimal representation is returned
    as is. For the basic view the converted task is returned without pod
    logs. For the full view the first pod's log of every executor is stored
    as that executor's ``stdout`` and the first taskmaster pod's log is
    appended to the task's system logs.

    Args:
        task_object: The Task object containing Kubernetes resources.
        view: Verbosity level for the response.
        is_list: Whether the response is a list of tasks.

    Returns:
        TesTask: The processed TES task (the converter's minimal task object
            when the minimal view is requested).
    """
    if view == TaskView.MINIMAL:
        return self.tes_kubernetes_converter.from_k8s_jobs_to_tes_task_minimal(
            task_object, is_list
        )

    # Input content is only stripped for the basic view.
    task = self.tes_kubernetes_converter.from_k8s_jobs_to_tes_task_basic(
        task_object, view == TaskView.BASIC
    )

    if view == TaskView.BASIC:
        return task

    # Only the full view reaches this point; enrich the task with pod logs.
    task_log = task.logs[0]

    # Pair executors with their logs positionally; zip() avoids an IndexError
    # should fewer executor logs than executors be present.
    for executor_job, executor_log in zip(
        task_object.get_executors(), task_log.logs
    ):
        if not executor_job.has_pods():
            continue
        executor_pod_name = executor_job.get_first_pod().metadata.name
        executor_pod_log = self.kubernetes_client_wrapper.read_pod_log(
            executor_pod_name
        )
        if executor_pod_log:
            executor_log.stdout = executor_pod_log

    if task_object.taskmaster.has_pods():
        taskmaster_pod_name = task_object.taskmaster.get_first_pod().metadata.name
        taskmaster_pod_log = self.kubernetes_client_wrapper.read_pod_log(
            taskmaster_pod_name
        )
        if taskmaster_pod_log:
            # The system log list may not have been initialised yet.
            if task_log.system_logs is None:
                task_log.system_logs = []
            task_log.system_logs.append(taskmaster_pod_log)

    return task
b/tesk/k8s/converter/converter.py index 6ab704c..b72c4b3 100644 --- a/tesk/k8s/converter/converter.py +++ b/tesk/k8s/converter/converter.py @@ -15,7 +15,9 @@ V1Container, V1EnvVar, V1JobSpec, + V1JobStatus, V1ObjectMeta, + V1PodList, V1PodSpec, V1PodTemplateSpec, V1ResourceRequirements, @@ -23,20 +25,26 @@ ) from kubernetes.client.models import V1Job from kubernetes.utils.quantity import parse_quantity # type: ignore +from pydantic import ValidationError from tesk.api.ga4gh.tes.models import ( TesExecutor, + TesExecutorLog, TesResources, + TesState, TesTask, + TesTaskLog, + TesTaskMinimal, ) from tesk.custom_config import Taskmaster from tesk.k8s.constants import tesk_k8s_constants -from tesk.k8s.converter.data.job import Job +from tesk.k8s.converter.data.job import Job, JobStatus from tesk.k8s.converter.data.task import Task from tesk.k8s.converter.executor_command_wrapper import ExecutorCommandWrapper from tesk.k8s.converter.template import KubernetesTemplateSupplier from tesk.k8s.wrapper import KubernetesClientWrapper from tesk.utils import ( + format_datetime, get_taskmaster_env_property, pydantic_model_list_dict, ) @@ -350,3 +358,258 @@ def from_tes_executor_to_k8s_job( # noqa: PLR0913, PLR0912 executor_job.spec.template.spec.volumes = [] return executor_job + + def is_job_in_status( + self, tested_object: V1JobStatus, test_objective: JobStatus + ) -> bool: + """Check if the job is in the given status. 
+ + Args: + tested_object: tested object, V1JobStatus of a job + test_objective: test object, status to be checked against + + Returns: + bool: True if the job is in the given status, False otherwise + """ + no_of_pods_in_state: Optional[int] = None + + match test_objective: + case JobStatus.ACTIVE: + no_of_pods_in_state = tested_object.active() + case JobStatus.SUCCEEDED: + no_of_pods_in_state = tested_object.succeeded() + case JobStatus.FAILED: + succeeded_pods = tested_object.succeeded() + if succeeded_pods and succeeded_pods > 0: + # if there are any successful, the job has not FAILED + return False + no_of_pods_in_state = tested_object.failed() + + return no_of_pods_in_state is not None and no_of_pods_in_state > 0 + + def extract_state_from_k8s_jobs(self, taskmaster_with_executors: Task) -> TesState: + """Derives TES task's status from task's object graph. + + Uses status of taskmaster's and executor's jobs and taskmaster's and executor's + pods. + + Args: + taskmaster_with_executors: taskMaster's full object graph + + Returns: + TesState: TES task's state + """ + taskmaster_job = taskmaster_with_executors.get_taskmaster().get_job() + last_executor: Optional[Job] = taskmaster_with_executors.get_last_executor() + output_filer: Optional[Job] = taskmaster_with_executors.get_output_filer() + + taskmaster_cancelled: bool = ( + taskmaster_job.metadata.labels.get( + self.tesk_k8s_constants.label_constants.LABEL_TASKSTATE_KEY + ) + == self.tesk_k8s_constants.label_constants.LABEL_TASKSTATE_VALUE_CANC + ) + taskmaster_running = self.is_job_in_status( + taskmaster_job.status, JobStatus.ACTIVE + ) + taskmaster_completed = self.is_job_in_status( + taskmaster_job.status, JobStatus.SUCCEEDED + ) + + executor_present = last_executor is not None + last_executor_failed = executor_present and self.is_job_in_status( + last_executor.get_job().status, JobStatus.FAILED + ) + last_executor_completed = executor_present and self.is_job_in_status( + last_executor.get_job().status, 
def executor_logs_from_k8s_job_and_pod(self, executor: Job) -> TesExecutorLog:
    """Extracts TesExecutorLog from executor job and pod objects.

    Args:
        executor: executor job object

    Note:
        - Does not contain stdout (which needs access to pod log).
        - If exit code is not available, it is set to -1.

    Returns:
        TesExecutorLog: executor log object (part of basic and full output)
    """
    # Sentinel used whenever no terminated container state is available.
    exit_code: int = -1

    if executor.has_pods():
        pod_status = executor.get_first_pod().status
        container_statuses = pod_status.container_statuses if pod_status else None
        if container_statuses:
            # Only the first container of the first pod is inspected.
            state = container_statuses[0].state
            if state and state.terminated:
                exit_code = state.terminated.exit_code

    log = TesExecutorLog(exit_code=exit_code)

    # Timestamps come from the job status; either one may still be unset.
    job_status = executor.get_job().status
    start_time = job_status.start_time if job_status else None
    completion_time = job_status.completion_time if job_status else None
    log.start_time = format_datetime(start_time) if start_time else None
    log.end_time = format_datetime(completion_time) if completion_time else None

    return log


# Same method under the name `from_k8s_jobs_to_tes_task_basic` calls.
extract_executor_log_from_k8s_job_and_pod = executor_logs_from_k8s_job_and_pod
def from_k8s_jobs_to_tes_task_basic(
    self, taskmaster_with_executors: Task, nullify_input_content: bool
) -> TesTask:
    """Extracts a basic view of TesTask from taskmaster's, executors' job and pod.

    The task is restored from the JSON stored in the taskmaster job's input
    annotation. Id, state, creation time and one task log (user/group
    metadata, start/end time and one executor log per executor job) are then
    filled in from the Kubernetes objects.

    Args:
        taskmaster_with_executors: The Task object containing k8s resources.
        nullify_input_content: Flag to remove input content in the result.

    Returns:
        TesTask: The basic TesTask object.
    """
    task_master_job = taskmaster_with_executors.get_taskmaster().get_job()
    task_master_job_metadata = task_master_job.metadata
    annotations = task_master_job_metadata.annotations or {}
    labels = task_master_job_metadata.labels or {}
    label_constants = self.tesk_k8s_constants.label_constants
    input_json = annotations.get(
        self.tesk_k8s_constants.annotation_constants.ANN_JSON_INPUT_KEY, ""
    )

    # ``task`` must be bound on every path: the annotation may be missing
    # or empty, or may fail to deserialize.
    task: Optional[TesTask] = None
    if input_json:
        try:
            task = TesTask.parse_raw(input_json)
        except ValidationError as ex:
            logger.info(
                f"Deserializing task {task_master_job_metadata.name} from JSON failed;"
                f"{ex}"
            )

    if task is None:
        # NOTE(review): assumes TesTask can be built without arguments, as the
        # original fallback did - confirm against the model.
        task = TesTask()
    elif nullify_input_content and task.inputs:
        for input_item in task.inputs:
            input_item.content = None

    task.id = task_master_job_metadata.name
    task.state = self.extract_state_from_k8s_jobs(taskmaster_with_executors)
    task.creation_time = (
        format_datetime(task_master_job_metadata.creation_timestamp)
        if task_master_job_metadata.creation_timestamp
        else None
    )

    # The containers are passed explicitly so they exist whatever the model's
    # defaults are; they are filled in below.
    log = TesTaskLog(logs=[], metadata={})

    user_id = labels.get(label_constants.LABEL_USERID_KEY)
    if user_id:
        log.metadata[label_constants.LABEL_USERID_KEY] = user_id

    group_name = labels.get(label_constants.LABEL_GROUPNAME_KEY)
    if group_name:
        log.metadata[label_constants.LABEL_GROUPNAME_KEY] = group_name

    # Set start_time and end_time
    job_status = task_master_job.status
    start_time = job_status.start_time if job_status else None
    completion_time = job_status.completion_time if job_status else None
    log.start_time = format_datetime(start_time) if start_time else None
    log.end_time = format_datetime(completion_time) if completion_time else None

    # One executor log per executor job, in executor order.
    for executor_job in taskmaster_with_executors.get_executors():
        log.logs.append(self.executor_logs_from_k8s_job_and_pod(executor_job))

    if task.logs is None:
        task.logs = []
    task.logs.append(log)

    return task
class BuildStrategy(ABC):
    """Baseclass of strategy for building Kubernetes structure of task or list of tasks.

    A strategy receives the job objects returned by calls to the Kubernetes API
    one by one and arranges them into a Task object or a list of Task objects.
    Creating, storing and maintaining that structure is entirely up to the
    implementing class.
    """

    @abstractmethod
    def add_taskmaster_job(self, taskmaster_job: Job) -> None:
        """Add taskmaster job to the strategy.

        Implementations may filter the job before placing it in the resulting
        structure.

        Args:
            taskmaster_job: Job object representing the taskmaster's job.
        """

    @abstractmethod
    def add_executor_job(self, executor_job: Job) -> None:
        """Add executor job to the strategy.

        Implementations may filter the job before placing it in the resulting
        structure and are expected to match it to the appropriate taskmaster.

        Args:
            executor_job: Job object representing the executor's job.
        """

    @abstractmethod
    def add_output_filer_job(self, filer_job: Job) -> None:
        """Add output filer job to the strategy.

        Implementations should filter the job, place it in the resulting
        structure and match it to the appropriate taskmaster.

        Args:
            filer_job: Job object representing the filer's job.
        """

    @abstractmethod
    def get_task(self) -> Task:
        """Get the task.

        Returns:
            Task: A single Task composite object.
        """

    @abstractmethod
    def get_task_list(self) -> List[Task]:
        """Get the task list.

        Returns:
            List[Task]: List of Task composite objects.
        """
class SingleTaskStrategy(BuildStrategy):
    """Single task strategy for building Kubernetes object structure of a single task.

    Job objects passed to its methods must be prefiltered and belong to a
    single task (the class does not perform job objects filtering itself).
    Executor and output filer jobs are dropped when no taskmaster job has been
    added yet. Pods must be added, when all jobs have already been added.
    Thus, correct order of calls:
        1) taskmaster (`TaskBuilder.add_job_list` or `TaskBuilder.add_job`)
        2) executors and outputFiler (`TaskBuilder.add_job_list` or
            `TaskBuilder.add_job`)
        3) pods by (`TaskBuilder.add_pod_list`)

    Arguments:
        task: Task object, None until a taskmaster job has been added.
    """

    def __init__(self):
        """Initialise the SingleTaskStrategy."""
        # No task exists until the taskmaster job arrives.
        self.task: Task | None = None

    def add_taskmaster_job(self, taskmaster_job: Job) -> None:
        """Add taskmaster job to the strategy.

        Creates the task around the given taskmaster job, replacing any task
        created by an earlier call.

        Args:
            taskmaster_job: Taskmaster job.

        Returns:
            None
        """
        self.task = Task(taskmaster=taskmaster_job)

    def add_executor_job(self, executor_job: Job) -> None:
        """Add executor job to the strategy.

        Args:
            executor_job: Executor job.

        Returns:
            None
        """
        if self.task:
            self.task.add_executor(executor_job)

    def add_output_filer_job(self, executor_job: Job) -> None:
        """Add output filer job to the strategy.

        Args:
            executor_job: Output filer job.

        Returns:
            None
        """
        if self.task:
            self.task.set_output_filer(executor_job)

    def get_task(self) -> Task:
        """Get the task.

        Returns:
            Task: Task object; None when no taskmaster job has been added.
        """
        return self.task

    def get_task_list(self) -> List[Task]:
        """Get the task list.

        Returns:
            List[Task]: List holding the single task, or an empty list when no
                taskmaster job has been added.
        """
        # Never hand out ``[None]``: consumers iterate the list and treat
        # every element as a Task.
        return [self.task] if self.task is not None else []
Accepts a BuildStrategy, + which implementing classes are responsible of creating, storing and maintaining the + actual Task object or Task object's list by implementing + BuildStrategy.addTaskMasterJob(Job)and BuildStrategy.addExecutorJob(Job). + + Arguments: + build_strategy: BuildStrategy object responsible for creating the actual Task + object. + all_jobs_by_name: Dictionary of all jobs added to the builder, mapped by job name. + """ + + def __init__(self, build_strategy): + """Initialise the TaskBuilder. + + Args: + build_strategy: BuildStrategy object responsible for creating the actual + Task object. + """ + self.build_strategy: BuildStrategy = build_strategy + self.all_jobs_by_name: Dict[str, Job] = {} + + @classmethod + def new_single_task(cls): + """Use SingleTaskStrategy for building a single task.""" + return cls(SingleTaskStrategy()) + + @classmethod + def new_task_list(cls): + """Use TaskListStrategy for building a list of tasks.""" + return cls(TaskListStrategy()) + + def add_job(self, job: V1Job) -> "TaskBuilder": + """Add a job to the composite. + + Args: + job: K8s V1Job object for any of the job, ie taskmaster + executor or output filer. 
+ """ + wrapped_job = Job(job) + labels: Optional[Dict[str, str]] = job.metadata.labels + assert labels, "Job metadata labels must be present" + job_type_label: str = labels.get( + tesk_k8s_constants.label_constants.LABEL_JOB_TYPE_KEY + ) + + # Add job to the strategy based on its type + match job_type_label: + case tesk_k8s_constants.label_constants.LABEL_JOBTYPE_VALUE_TASKM: + self.build_strategy.add_taskmaster_job(wrapped_job) + case tesk_k8s_constants.label_constants.LABEL_JOBTYPE_VALUE_EXEC: + self.build_strategy.add_executor_job(wrapped_job) + case _: + self.build_strategy.add_output_filer_job(wrapped_job) + + # If job is not already in the collection, add it + name: Optional[str] = job.metadata.name + assert name, "Job metadata name must be present" + if name not in self.all_jobs_by_name: + self.all_jobs_by_name[job.metadata.name] = wrapped_job + + return self + + def add_job_list(self, jobs: List[V1Job]) -> "TaskBuilder": + """Adds a list of jobs to the composite. + + Args: + jobs: List of K8s V1Job objects. + """ + for job in jobs: + self.add_job(job) + return self + + def add_pod_list(self, pods: List[V1PodList]) -> "TaskBuilder": + """Adds a list of pods to the composite. + + Tries to find a matching job for each pod, If there is a match, a pod is placed + in the Job object. Will accept a collection of any pods, the ones that don't + match get discarded. + + Args: + pods: List of K8s V1PodList objects. + """ + for pod in pods: + self.add_pod(pod) + return self + + def add_pod(self, pod: V1Pod) -> None: + """Adds a pod to the composite. + + Tries to find a matching job for it. If there is a match, a pod is placed in + the matching Job object. Match is done by comparing match labels + (only match expressions are ignored) of job's selector and comparing them with + labels of the pod. If all selectors of the job are present in pod's label set, + match is detected (first job-pod match stops search).Will accept also unmatching + pod, which won't get stored. 
+ + Args: + pod: K8s V1Pod object. + """ + for job in self.all_jobs_by_name.values(): + selectors = job.get_job().spec.selector.match_labels or {} + pod_labels = pod.metadata.labels or {} + # Check if all selector key-value pairs are present in the pod's labels + if selectors.items() <= pod_labels.items(): + # Found a matching job; add the pod to this job + job.add_pod(pod) + break + + def get_task(self) -> Task: + """Get the task. + + Returns: + Task: Task object according to the strategy. + """ + return self.build_strategy.get_task() + + def get_task_list(self) -> List[Task]: + """Get the task list. + + Returns: + List[Task]: List of Task objects according to the strategy. + """ + return self.build_strategy.get_task_list() + + def get_all_jobs_by_name(self) -> Dict[str, Job]: + """Get all jobs. + + Returns: + Dict[str, Job]: Dictionary of all jobs added to the builder, + mapped by job name. + """ + return self.all_jobs_by_name diff --git a/tesk/k8s/converter/data/task_list_strategy.py b/tesk/k8s/converter/data/task_list_strategy.py new file mode 100644 index 0000000..96b9d76 --- /dev/null +++ b/tesk/k8s/converter/data/task_list_strategy.py @@ -0,0 +1,110 @@ +"""Strategy aimed at building Kubernetes object structure of a list of tasks.""" + +import logging +from typing import Dict, Optional + +from kubernetes.client.models import V1Job + +from tesk.k8s.constants import tesk_k8s_constants +from tesk.k8s.converter.data.build_strategy import BuildStrategy +from tesk.k8s.converter.data.job import Job +from tesk.k8s.converter.data.task import Task + +logger = logging.getLogger(__name__) + + +class TaskListStrategy(BuildStrategy): + """Strategy aimed at building Kubernetes object structure of a list of tasks. + + All taskmaster jobs with unique names passed to it will get stored. + Only those executor jobs that match already stored taskmaster jobs will be stored + (filtering done by taskmaster's name and corresponding executor's label). 
class TaskListStrategy(BuildStrategy):
    """Strategy aimed at building Kubernetes object structure of a list of tasks.

    All taskmaster jobs with unique task IDs passed to it will get stored.
    Only those executor jobs that match already stored taskmaster jobs will be
    stored (filtering done by the task ID label). Output filer jobs are matched
    by the part of their name preceding the filer suffix.
    Pods must be added, when all jobs have already been added.
    Thus, correct order of calls:
        1) taskmasters (`TaskBuilder.add_job_list`)
        2) executors and outputFilers (`TaskBuilder.add_job_list`)
        3) pods by (`TaskBuilder.add_pod_list`)

    Arguments:
        tasks_by_id: Dictionary of tasks by their IDs
    """

    def __init__(self):
        """Initialise the TaskListStrategy."""
        self.tasks_by_id: Dict[str, Task] = {}

    def add_taskmaster_job(self, taskmaster_job: Job) -> None:
        """Add taskmaster job as task to the list.

        A taskmaster whose task ID is already known is ignored.

        Args:
            taskmaster_job: Taskmaster job.

        Returns:
            None
        """
        job: V1Job = taskmaster_job.get_job()
        # Missing labels are treated like a missing task ID label.
        name: Optional[str] = (job.metadata.labels or {}).get(
            tesk_k8s_constants.label_constants.LABEL_TESTASK_ID_KEY
        )
        assert (
            name is not None
        ), "LABEL_TESTASK_ID_KEY must be present in job metadata labels"

        if name not in self.tasks_by_id:
            self.tasks_by_id[name] = Task(taskmaster=taskmaster_job)

    def add_executor_job(self, executor_job: Job) -> None:
        """Add executor job to its corresponding taskmaster.

        Executors whose task ID does not belong to a stored taskmaster are
        discarded.

        Args:
            executor_job: Executor job.

        Returns:
            None
        """
        job: V1Job = executor_job.get_job()
        name: Optional[str] = (job.metadata.labels or {}).get(
            tesk_k8s_constants.label_constants.LABEL_TESTASK_ID_KEY
        )
        assert (
            name is not None
        ), "LABEL_TESTASK_ID_KEY must be present in job metadata labels"

        if name in self.tasks_by_id:
            self.tasks_by_id[name].add_executor(executor_job)

    def add_output_filer_job(self, executor_job: Job) -> None:
        """Add output filer job to its corresponding taskmaster.

        The taskmaster is looked up by the part of the filer job's name that
        precedes the output filer suffix. Jobs whose name lacks the suffix, or
        whose taskmaster is not stored, are discarded.

        Args:
            executor_job: Output filer job.

        Returns:
            None
        """
        job_name = executor_job.get_job_name()
        assert job_name is not None, "Job name should not be None"

        # str.find signals "not found" with -1 instead of raising; slicing
        # with -1 would silently cut off the last character of the name.
        output_filer_suffix: int = job_name.find(
            tesk_k8s_constants.job_constants.JOB_NAME_FILER_SUF
        )
        if output_filer_suffix == -1:
            logger.debug(
                f"Output filer job name does not contain suffix: {job_name}"
            )
            return

        taskmaster_name: str = job_name[:output_filer_suffix]
        taskmaster: Optional[Task] = self.tasks_by_id.get(taskmaster_name)
        if taskmaster:
            taskmaster.set_output_filer(executor_job)

    def get_task(self):
        """Unsupported method for TaskListStrategy.

        Raises:
            NotImplementedError: Always; this strategy only builds task lists.
        """
        raise NotImplementedError("Method is not supported for TaskListStrategy/")

    def get_task_list(self):
        """Get the task list.

        Returns:
            List[Task]: List of Task objects.
        """
        return list(self.tasks_by_id.values())