Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: list and get tasks #208

Open
wants to merge 1 commit into
base: createT
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions tesk/api/ga4gh/tes/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -478,6 +478,13 @@ class TesServiceInfo(Service):
type: Optional[TesServiceType] = None # type: ignore


class TesTaskMinimal(BaseModel):
"""TaskMinimal describes a minimal task object."""

id: str = Field(..., description="Task identifier assigned by the server.")
state: TesState = TesState.UNKNOWN


class TesTask(BaseModel):
"""Task describes a task to be run."""

Expand Down Expand Up @@ -568,3 +575,11 @@ class TesListTasksResponse(BaseModel):
description="Token used to return the next page of results. This value can be "
"used\nin the `page_token` field of the next ListTasks request.",
)


class TaskView(str, Enum):
"""Task view type."""

BASIC = "basic"
MINIMAL = "minimal"
FULL = "full"
77 changes: 77 additions & 0 deletions tesk/api/ga4gh/tes/task/get_task.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
"""TESK API module to retrieve a task."""

import logging
from typing import Optional

from kubernetes.client.models import V1Job, V1JobList, V1PodList

from tesk.api.ga4gh.tes.models import (
TaskView,
)
from tesk.api.ga4gh.tes.task.task_request import TesTaskRequest
from tesk.k8s.converter.data.task_builder import TaskBuilder

logger = logging.getLogger(__name__)


class GetTesTask(TesTaskRequest):
"""Retrieve a TES task.

Arguments:
task_id: TES task ID.
task_view: Verbosity of the task in the list.
"""

def __init__(
self,
task_id: str,
task_view: TaskView,
):
"""Initialize the CreateTask class.

Args:
task_id: TES task ID.
task_view: Verbosity of the task in the list.
"""
super().__init__()
self.task_id = task_id
self.task_view = task_view

def handle_request(self):
"""Return the response."""
taskmaster_job: V1Job = self.kubernetes_client_wrapper.read_taskmaster_job(
self.task_id
)

taskmaster_job_name: Optional[str] = taskmaster_job.metadata.name
assert taskmaster_job_name is not None

execution_job_list: V1JobList = (
self.kubernetes_client_wrapper.list_single_task_executor_jobs(
taskmaster_job_name
)
)
taskmaster_pods: V1PodList = (
self.kubernetes_client_wrapper.list_single_job_pods(taskmaster_job)
)

task_builder = (
TaskBuilder.new_single_task()
.add_job(taskmaster_job)
.add_job_list(execution_job_list.items)
.add_pod_list(taskmaster_pods.items)
)

for executor_job in execution_job_list.items:
executor_job_pods: V1PodList = (
self.kubernetes_client_wrapper.list_single_job_pods(executor_job)
)
task_builder.add_pod_list(executor_job_pods.items)

output_filer_job_list: Optional[V1Job] = (
self.kubernetes_client_wrapper.get_single_task_output_filer_job(
taskmaster_job_name
)
)

return self._get_task(task_builder, output_filer_job_list, self.task_view)
81 changes: 81 additions & 0 deletions tesk/api/ga4gh/tes/task/list_tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
"""TESK API modeule to list tasks."""

from typing import List

from kubernetes.client.models import V1JobList

from tesk.api.ga4gh.tes.models import TaskView, TesListTasksResponse, TesTask
from tesk.api.ga4gh.tes.task.task_request import TesTaskRequest
from tesk.k8s.converter.data.task_builder import TaskBuilder


class ListTesTask(TesTaskRequest):
"""Gets a full list of tasks.

Performs Kubernetes API batch calls (all taskmasters, all executors, all pods),
combines them together into valid Task objects and converts to result with
means of the converter.

Arguments:
page_size: The maximum number of tasks to return in a single page.
page_token: The continuation token, which is used to page through large result
sets.
view: Verbosity of the task in the list.
namePrefix: The prefix of the task name.
"""

def __init__(
self,
page_size: int,
page_token: str,
view: TaskView,
namePrefix: str,
):
"""Initialise the ListTesTask class.

Args:
page_size: The maximum number of tasks to return in a single page.
page_token: The continuation token, which is used to page through large
result sets.
view: Verbosity of the task in the list.
namePrefix: The prefix of the task name.
"""
self.page_size = page_size
self.page_token = page_token
self.view = view
self.namePrefix = namePrefix

def handle_request(self) -> TesListTasksResponse:
"""Business logic for the list tasks request.

Returns:
TesListTasksResponse: The response containing the list of tasks and next
page token.
"""
taskmaster_jobs: V1JobList = (
self.kubernetes_client_wrapper.list_all_taskmaster_jobs_for_user(
page_token=self.page_token,
items_per_page=self.page_size,
)
)
executor_jobs: V1JobList = (
self.kubernetes_client_wrapper.list_all_task_executor_jobs()
)
filer_jobs: V1JobList = self.kubernetes_client_wrapper.list_all_filer_jobs()
job_pods = self.kubernetes_client_wrapper.list_all_job_pods()
task_list_builder: TaskBuilder = (
TaskBuilder.new_task_list()
.add_job_list(taskmaster_jobs.items())
.add_job_list(executor_jobs.items())
.add_job_list(filer_jobs.items())
.add_pod_list(job_pods.items())
)

tasks: List[TesTask] = [
self._get_task(task, self.view, True)
for task in task_list_builder.get_task_list()
]

next_page_token = taskmaster_jobs.metadata._continue()

return TesListTasksResponse(tasks=tasks, next_page_token=next_page_token)
47 changes: 47 additions & 0 deletions tesk/api/ga4gh/tes/task/task_request.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@
from tesk.k8s.constants import tesk_k8s_constants
from tesk.k8s.converter.converter import TesKubernetesConverter
from tesk.k8s.wrapper import KubernetesClientWrapper
from tesk.k8s.converter.data.task import Task
from tesk.api.ga4gh.tes.models import TaskView
from tesk.api.ga4gh.tes.models import TesTask

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -43,3 +46,47 @@ def response(self) -> dict:
except (TypeError, ValueError) as e:
logger.info(e)
return response.dict()

def _get_task(self, task_object: Task, view: TaskView, is_list: bool) -> TesTask:
"""Shared method to process Task objects into TesTask responses.

Args:
task_object: The Task object containing Kubernetes resources.
view: Verbosity level for the response.
is_list: Whether the response is a list of tasks.

Returns:
TesTask: The processed TES task.
"""
if view == TaskView.MINIMAL:
return self.tes_kubernetes_converter.from_k8s_jobs_to_tes_task_minimal(
task_object, is_list
)

task = self.tes_kubernetes_converter.from_k8s_jobs_to_tes_task_basic(
task_object, view == TaskView.BASIC
)

if view == TaskView.BASIC:
return task

for i, executor_job in enumerate(task_object.get_executors()):
if executor_job.has_pods():
executor_log = task.logs[0].logs[i]
if view == TaskView.FULL:
executor_pod_name = executor_job.get_first_pod().metadata.name
executor_pod_log = self.kubernetes_client_wrapper.read_pod_log(
executor_pod_name
)
if executor_pod_log:
executor_log.stdout = executor_pod_log

if task_object.taskmaster.has_pods():
taskmaster_pod_name = task_object.taskmaster.get_first_pod().metadata.name
taskmaster_pod_log = self.kubernetes_client_wrapper.read_pod_log(
taskmaster_pod_name
)
if taskmaster_pod_log:
task.logs[0].system_logs.append(taskmaster_pod_log)

return task
Loading