From 1f5c7c003173bc0748a2110477fc32682d437e48 Mon Sep 17 00:00:00 2001 From: Ivan Zubenko Date: Fri, 10 May 2024 11:16:47 +0200 Subject: [PATCH] ENG-29 get_available_jobs_count should take all pods into account (#909) --- .../templates/_helpers.tpl | 4 - .../templates/fluent-bit.yaml | 2 +- charts/platform-monitoring/values.yaml | 1 - minikube.sh | 1 - platform_monitoring/api.py | 1 - platform_monitoring/config.py | 1 - platform_monitoring/config_factory.py | 3 - platform_monitoring/jobs_service.py | 119 ++-- platform_monitoring/kube_client.py | 550 ++++++++++++------ setup.cfg | 4 +- tests/integration/conftest_kube.py | 7 +- tests/integration/test_kube.py | 10 +- tests/k8s/cluster.sh | 1 - tests/unit/test_config.py | 2 - tests/unit/test_jobs_service.py | 114 ++-- tests/unit/test_kube.py | 267 ++++++--- 16 files changed, 711 insertions(+), 376 deletions(-) diff --git a/charts/platform-monitoring/templates/_helpers.tpl b/charts/platform-monitoring/templates/_helpers.tpl index 4bb0a632..37b615da 100644 --- a/charts/platform-monitoring/templates/_helpers.tpl +++ b/charts/platform-monitoring/templates/_helpers.tpl @@ -120,10 +120,6 @@ release: {{ .Release.Name | quote }} - name: SENTRY_SAMPLE_RATE value: {{ .Values.sentry.sampleRate | default 0 | quote }} {{- end }} -{{- if .Values.nodeLabels.job }} -- name: NP_MONITORING_NODE_LABEL_JOB - value: {{ .Values.nodeLabels.job }} -{{- end }} {{- if .Values.nodeLabels.nodePool }} - name: NP_MONITORING_NODE_LABEL_NODE_POOL value: {{ .Values.nodeLabels.nodePool }} diff --git a/charts/platform-monitoring/templates/fluent-bit.yaml b/charts/platform-monitoring/templates/fluent-bit.yaml index 094983bc..cd39ab29 100644 --- a/charts/platform-monitoring/templates/fluent-bit.yaml +++ b/charts/platform-monitoring/templates/fluent-bit.yaml @@ -128,7 +128,7 @@ spec: requiredDuringSchedulingIgnoredDuringExecution: nodeSelectorTerms: - matchExpressions: - - key: {{ .Values.nodeLabels.job | quote }} + - key: {{ .Values.nodeLabels.nodePool | quote }} operator: Exists {{- if .Values.fluentbit.tolerations }} tolerations: {{ toYaml .Values.fluentbit.tolerations | nindent 8 }} diff --git a/charts/platform-monitoring/values.yaml b/charts/platform-monitoring/values.yaml index 74c7f1d1..74f990e5 100644 --- a/charts/platform-monitoring/values.yaml +++ b/charts/platform-monitoring/values.yaml @@ -49,7 +49,6 @@ sentry: sampleRate: 0.002 nodeLabels: - job: platform.neuromation.io/job nodePool: platform.neuromation.io/nodepool fluentbit: diff --git a/minikube.sh b/minikube.sh index a47e3339..6027850e 100755 --- a/minikube.sh +++ b/minikube.sh @@ -15,7 +15,6 @@ function minikube::start { --wait-timeout=5m kubectl config use-context minikube kubectl get nodes -o name | xargs -I {} kubectl label {} --overwrite \ - platform.neuromation.io/job=true \ platform.neuromation.io/nodepool=minikube } diff --git a/platform_monitoring/api.py b/platform_monitoring/api.py index 777f0670..df132431 100644 --- a/platform_monitoring/api.py +++ b/platform_monitoring/api.py @@ -820,7 +820,6 @@ async def _init_app(app: aiohttp.web.Application) -> AsyncIterator[None]: kube_client=kube_client, container_runtime_client_registry=container_runtime_client_registry, cluster_name=config.cluster_name, - kube_job_label=config.kube.job_label, kube_node_pool_label=config.kube.node_pool_label, ) app[MONITORING_APP_KEY][JOBS_SERVICE_KEY] = jobs_service diff --git a/platform_monitoring/config.py b/platform_monitoring/config.py index 426d8142..42913503 100644 --- a/platform_monitoring/config.py +++ 
b/platform_monitoring/config.py @@ -86,7 +86,6 @@ class KubeConfig: kubelet_node_port: int = 10250 nvidia_dcgm_node_port: int | None = None - job_label: str = "platform.neuromation.io/job" node_pool_label: str = "platform.neuromation.io/nodepool" diff --git a/platform_monitoring/config_factory.py b/platform_monitoring/config_factory.py index 90d8ac36..9b2d7631 100644 --- a/platform_monitoring/config_factory.py +++ b/platform_monitoring/config_factory.py @@ -170,9 +170,6 @@ def _create_kube(self) -> KubeConfig: if "NP_MONITORING_K8S_NVIDIA_DCGM_PORT" in self._environ else KubeConfig.nvidia_dcgm_node_port ), - job_label=self._environ.get( - "NP_MONITORING_NODE_LABEL_JOB", KubeConfig.job_label - ), node_pool_label=self._environ.get( "NP_MONITORING_NODE_LABEL_NODE_POOL", KubeConfig.node_pool_label ), diff --git a/platform_monitoring/jobs_service.py b/platform_monitoring/jobs_service.py index 0e98044d..3513b38e 100644 --- a/platform_monitoring/jobs_service.py +++ b/platform_monitoring/jobs_service.py @@ -1,8 +1,8 @@ import asyncio +from collections import defaultdict from collections.abc import AsyncGenerator, AsyncIterator, Mapping, Sequence from contextlib import AbstractAsyncContextManager, asynccontextmanager from dataclasses import dataclass -from functools import reduce import aiohttp from neuro_config_client import ConfigClient, ResourcePoolType @@ -14,7 +14,14 @@ ContainerRuntimeClientRegistry, ContainerRuntimeError, ) -from .kube_client import JobNotFoundException, KubeClient, Pod, Resources +from .kube_client import ( + DEFAULT_MAX_PODS_PER_NODE, + ContainerResources, + JobNotFoundException, + KubeClient, + NodeResources, + Pod, +) from .user import User from .utils import KubeHelper, asyncgeneratorcontextmanager @@ -44,12 +51,12 @@ def __init__(self, name: str) -> None: class JobsService: def __init__( self, + *, config_client: ConfigClient, jobs_client: JobsClient, kube_client: KubeClient, container_runtime_client_registry: ContainerRuntimeClientRegistry, cluster_name: str, - kube_job_label: str = KubeConfig.job_label, kube_node_pool_label: str = KubeConfig.node_pool_label, ) -> None: self._config_client = config_client @@ -58,7 +65,6 @@ def __init__( self._kube_helper = KubeHelper() self._container_runtime_client_registry = container_runtime_client_registry self._cluster_name = cluster_name - self._kube_job_label = kube_job_label self._kube_node_pool_label = kube_node_pool_label async def get(self, job_id: str) -> Job: @@ -91,7 +97,10 @@ async def save( cont_id = pod.get_container_id(pod_name) assert cont_id - runtime_client = await self._container_runtime_client_registry.get(pod.host_ip) + assert pod.status.host_ip + runtime_client = await self._container_runtime_client_registry.get( + pod.status.host_ip + ) try: async with runtime_client.commit( @@ -138,7 +147,10 @@ async def attach( if not pod.tty: tty = False - runtime_client = await self._container_runtime_client_registry.get(pod.host_ip) + assert pod.status.host_ip + runtime_client = await self._container_runtime_client_registry.get( + pod.status.host_ip + ) async with runtime_client.attach( cont_id, tty=tty, stdin=stdin, stdout=stdout, stderr=stderr @@ -161,7 +173,10 @@ async def exec( cont_id = pod.get_container_id(pod_name) assert cont_id - runtime_client = await self._container_runtime_client_registry.get(pod.host_ip) + assert pod.status.host_ip + runtime_client = await self._container_runtime_client_registry.get( + pod.status.host_ip + ) async with runtime_client.exec( cont_id, cmd, tty=tty, stdin=stdin, stdout=stdout, 
stderr=stderr @@ -175,7 +190,10 @@ async def kill(self, job: Job) -> None: cont_id = pod.get_container_id(pod_name) assert cont_id - runtime_client = await self._container_runtime_client_registry.get(pod.host_ip) + assert pod.status.host_ip + runtime_client = await self._container_runtime_client_registry.get( + pod.status.host_ip + ) await runtime_client.kill(cont_id) @@ -184,14 +202,14 @@ async def port_forward( ) -> tuple[asyncio.StreamReader, asyncio.StreamWriter]: pod_name = self._kube_helper.get_job_pod_name(job) pod = await self._get_running_jobs_pod(pod_name) - reader, writer = await asyncio.open_connection(pod.pod_ip, port) + reader, writer = await asyncio.open_connection(pod.status.pod_ip, port) return reader, writer async def _get_running_jobs_pod(self, job_id: str) -> Pod: pod: Pod | None try: pod = await self._kube_client.get_pod(job_id) - if not pod.is_phase_running: + if not pod.status.is_running: pod = None except JobNotFoundException: # job's pod does not exist: it might be already garbage-collected @@ -207,39 +225,47 @@ async def get_available_jobs_counts(self) -> Mapping[str, int]: result: dict[str, int] = {} cluster = await self._config_client.get_cluster(self._cluster_name) assert cluster.orchestrator is not None - resource_requests = await self._get_resource_requests_by_node_pool() + available_resources = await self._get_available_resources_by_node_pool() pool_types = {p.name: p for p in cluster.orchestrator.resource_pool_types} for preset in cluster.orchestrator.resource_presets: available_jobs_count = 0 - preset_resources = Resources( + preset_resources = ContainerResources( cpu_m=int(preset.cpu * 1000), memory=preset.memory, - gpu=preset.gpu or 0, + nvidia_gpu=preset.nvidia_gpu or 0, + amd_gpu=preset.amd_gpu or 0, ) - preset_pool_types = [pool_types[r] for r in preset.resource_affinity] - for node_pool in preset_pool_types: + node_pools = [pool_types[r] for r in preset.available_resource_pool_names] + for node_pool in node_pools: node_resource_limit = self._get_node_resource_limit(node_pool) - node_resource_requests = resource_requests.get(node_pool.name, []) - running_nodes_count = len(node_resource_requests) + node_pool_available_resources = available_resources.get( + node_pool.name, [] + ) + running_nodes_count = len(node_pool_available_resources) free_nodes_count = node_pool.max_size - running_nodes_count # get number of jobs that can be scheduled on running nodes # in the current node pool - for request in node_resource_requests: - available_resources = node_resource_limit.available(request) - available_jobs_count += available_resources.count(preset_resources) + for node_available_resources in node_pool_available_resources: + available_jobs_count += min( + node_available_resources // preset_resources, + node_available_resources.pods, + ) # get number of jobs that can be scheduled on free nodes # in the current node pool if free_nodes_count > 0: - available_jobs_count += ( - free_nodes_count * node_resource_limit.count(preset_resources) + available_jobs_count += free_nodes_count * min( + DEFAULT_MAX_PODS_PER_NODE, + node_resource_limit // preset_resources, ) result[preset.name] = available_jobs_count return result - async def _get_resource_requests_by_node_pool(self) -> dict[str, list[Resources]]: - result: dict[str, list[Resources]] = {} + async def _get_available_resources_by_node_pool( + self, + ) -> dict[str, list[NodeResources]]: + result: dict[str, list[NodeResources]] = defaultdict(list) pods = await self._kube_client.get_pods( - 
label_selector=self._kube_job_label, + all_namespaces=True, field_selector=",".join( ( "status.phase!=Failed", @@ -248,36 +274,41 @@ async def _get_resource_requests_by_node_pool(self) -> dict[str, list[Resources] ), ), ) - nodes = await self._kube_client.get_nodes(label_selector=self._kube_job_label) - for node_name, node_pods in self._group_pods_by_node(pods).items(): - if not node_name: - continue + nodes = await self._kube_client.get_nodes( + label_selector=self._kube_node_pool_label + ) + for node_name, node_pods in self._get_pods_by_node(pods).items(): for node in nodes: - if node.name == node_name: + if node.metadata.name == node_name: break else: raise NodeNotFoundException(node_name) - node_pool_name = node.get_label(self._kube_node_pool_label) + node_pool_name = node.metadata.labels.get(self._kube_node_pool_label) if not node_pool_name: # pragma: no coverage continue - pod_resources = [p.resource_requests for p in node_pods] - node_resources = reduce(Resources.add, pod_resources, Resources()) - result.setdefault(node_pool_name, []).append(node_resources) + resource_requests = sum( + (pod.resource_requests for pod in node_pods), ContainerResources() + ) + available_resources = node.status.allocatable - resource_requests + available_resources = available_resources.with_pods( + available_resources.pods - len(node_pods) + ) + result[node_pool_name].append(available_resources) return result - def _group_pods_by_node(self, pods: Sequence[Pod]) -> dict[str | None, list[Pod]]: - result: dict[str | None, list[Pod]] = {} + def _get_pods_by_node(self, pods: Sequence[Pod]) -> dict[str, list[Pod]]: + result: dict[str, list[Pod]] = defaultdict(list) for pod in pods: - group = result.get(pod.node_name) - if not group: - group = [] - result[pod.node_name] = group - group.append(pod) + if pod.spec.node_name: + result[pod.spec.node_name].append(pod) return result - def _get_node_resource_limit(self, node_pool: ResourcePoolType) -> Resources: - return Resources( + def _get_node_resource_limit( + self, node_pool: ResourcePoolType + ) -> ContainerResources: + return ContainerResources( cpu_m=int(node_pool.available_cpu * 1000), memory=node_pool.available_memory, - gpu=node_pool.gpu or 0, + nvidia_gpu=node_pool.nvidia_gpu or 0, + amd_gpu=node_pool.amd_gpu or 0, ) diff --git a/platform_monitoring/kube_client.py b/platform_monitoring/kube_client.py index a0c9d401..127e6acb 100644 --- a/platform_monitoring/kube_client.py +++ b/platform_monitoring/kube_client.py @@ -7,16 +7,15 @@ import re import ssl import typing as t -from collections.abc import AsyncIterator, Sequence +from collections.abc import AsyncIterator from contextlib import asynccontextmanager, suppress -from dataclasses import dataclass, field +from dataclasses import asdict, dataclass, field, replace from datetime import datetime from pathlib import Path from urllib.parse import quote_plus, urlsplit import aiohttp from aiohttp import ContentTypeError -from yarl import URL from .base import JobStats, Telemetry from .config import KubeClientAuthType, KubeConfig @@ -27,12 +26,26 @@ DEFAULT_MAX_PODS_PER_NODE = 110 +JSON: t.TypeAlias = dict[str, t.Any] + class KubeClientException(Exception): pass -class KubeClientUnauthorized(KubeClientException): +class KubeClientUnauthorizedException(KubeClientException): + pass + + +class ExpiredException(KubeClientException): + pass + + +class ResourceGoneException(KubeClientException): + pass + + +class ConflictException(KubeClientException): pass @@ -48,7 +61,47 @@ class JobNotFoundException(JobException): 
pass -class PodPhase(str, enum.Enum): +class Resource(t.Protocol): + @classmethod + def from_primitive(cls, payload: JSON) -> t.Self: ... + + +TResource = t.TypeVar("TResource", bound=Resource) + + +@dataclass(frozen=True) +class Metadata: + name: str | None = None + resource_version: str | None = None + labels: t.Mapping[str, str] = field(default_factory=dict) + + @classmethod + def from_primitive(cls, payload: JSON) -> Metadata: + return cls( + name=payload.get("name"), + resource_version=payload.get("resourceVersion"), + labels=payload.get("labels", {}), + ) + + +@dataclass(frozen=True) +class ListResult(t.Generic[TResource]): + metadata: Metadata + items: list[TResource] + + @classmethod + def from_primitive( + cls, payload: JSON, *, resource_cls: type[TResource] + ) -> ListResult[TResource]: + return cls( + metadata=Metadata.from_primitive(payload.get("metadata", {})), + items=[ + resource_cls.from_primitive(item) for item in payload.get("items", ()) + ], + ) + + +class PodPhase(enum.StrEnum): PENDING = "Pending" RUNNING = "Running" SUCCEEDED = "Succeeded" @@ -57,209 +110,308 @@ class PodPhase(str, enum.Enum): @dataclass(frozen=True) -class Resources: +class ContainerResources: cpu_m: int = 0 memory: int = 0 - gpu: int = 0 + nvidia_gpu: int = 0 + amd_gpu: int = 0 + + nvidia_gpu_key: t.ClassVar[str] = "nvidia.com/gpu" + amd_gpu_key: t.ClassVar[str] = "amd.com/gpu" + + @classmethod + def from_primitive(cls, payload: JSON) -> t.Self: + return cls( + cpu_m=cls._parse_cpu_m(str(payload.get("cpu", "0"))), + memory=cls._parse_memory(str(payload.get("memory", "0"))), + nvidia_gpu=int(payload.get(cls.nvidia_gpu_key, 0)), + amd_gpu=int(payload.get(cls.amd_gpu_key, 0)), + ) - def add(self, other: Resources) -> Resources: - return self.__class__( + @classmethod + def _parse_cpu_m(cls, value: str) -> int: + if value.endswith("m"): + return int(value[:-1]) + return int(float(value) * 1000) + + @classmethod + def _parse_memory(cls, memory: str) -> int: + try: + return int(memory) + except ValueError: + pass + for suffix, power in (("K", 1), ("M", 2), ("G", 3), ("T", 4), ("P", 5)): + if memory.endswith(suffix): + return int(memory[:-1]) * 1000**power + if memory.endswith(f"{suffix}i"): + return int(memory[:-2]) * 1024**power + msg = f"Memory unit for {memory} is not supported" + raise KubeClientException(msg) + + @property + def has_gpu(self) -> bool: + return self.nvidia_gpu != 0 or self.amd_gpu != 0 + + def __bool__(self) -> bool: + return ( + self.cpu_m != 0 + or self.memory != 0 + or self.nvidia_gpu != 0 + or self.amd_gpu != 0 + ) + + def __add__(self, other: ContainerResources) -> t.Self: + return replace( + self, cpu_m=self.cpu_m + other.cpu_m, memory=self.memory + other.memory, - gpu=self.gpu + other.gpu, + nvidia_gpu=self.nvidia_gpu + other.nvidia_gpu, + amd_gpu=self.amd_gpu + other.amd_gpu, ) - def available(self, used: Resources) -> Resources: - """Get amount of unused resources. - - Returns: - Resources: the difference between resources in {self} and {used} - """ - return self.__class__( + def __sub__(self, used: ContainerResources) -> t.Self: + return replace( + self, cpu_m=max(0, self.cpu_m - used.cpu_m), memory=max(0, self.memory - used.memory), - gpu=max(0, self.gpu - used.gpu), + nvidia_gpu=max(0, self.nvidia_gpu - used.nvidia_gpu), + amd_gpu=max(0, self.amd_gpu - used.amd_gpu), ) - def count(self, resources: Resources) -> int: - """Get the number of times a client can be provided - with the specified resources. 
- - Returns: - int: count - """ - if self.cpu_m == 0 and self.memory == 0 and self.gpu == 0: + def __floordiv__(self, resources: ContainerResources) -> int: + if not self: return 0 result = DEFAULT_MAX_PODS_PER_NODE if resources.cpu_m: result = min(result, self.cpu_m // resources.cpu_m) if resources.memory: result = min(result, self.memory // resources.memory) - if resources.gpu: - result = min(result, self.gpu // resources.gpu) + if resources.nvidia_gpu: + result = min(result, self.nvidia_gpu // resources.nvidia_gpu) + if resources.amd_gpu: + result = min(result, self.amd_gpu // resources.amd_gpu) return result -@dataclass(frozen=True) -class ProxyClient: - url: URL - session: aiohttp.ClientSession +class PodRestartPolicy(enum.StrEnum): + ALWAYS = "Always" + NEVER = "Never" + ON_FAILURE = "OnFailure" -class Pod: - def __init__(self, payload: dict[str, t.Any]) -> None: - self._payload = payload +@dataclass(frozen=True) +class Container: + resource_requests: ContainerResources + stdin: bool | None + stdin_once: bool | None + tty: bool | None - @property - def name(self) -> str: - return self._payload["metadata"]["name"] + @classmethod + def from_primitive(cls, payload: JSON) -> t.Self: + return cls( + resource_requests=ContainerResources.from_primitive( + payload.get("resources", {}).get("requests", {}) + ), + stdin=payload.get("stdin"), + stdin_once=payload.get("stdinOnce"), + tty=payload.get("tty"), + ) - @property - def node_name(self) -> str | None: - return self._payload["spec"].get("nodeName") - @property - def restart_policy(self) -> str: - return self._payload["spec"].get("restartPolicy") or "Never" +@dataclass(frozen=True) +class PodSpec: + node_name: str | None = None + restart_policy: PodRestartPolicy = PodRestartPolicy.ALWAYS + containers: t.Sequence[Container] = field(default_factory=list) - @property - def _status_payload(self) -> dict[str, t.Any]: - payload = self._payload.get("status") - if not payload: - msg = "Missing pod status" - raise ValueError(msg) - return payload + @classmethod + def from_primitive(cls, payload: JSON) -> t.Self: + return cls( + node_name=payload.get("nodeName"), + restart_policy=PodRestartPolicy( + payload.get("restartPolicy", cls.restart_policy) + ), + containers=[ + Container.from_primitive(c) for c in payload.get("containers", ()) + ], + ) - def get_container_status(self, name: str) -> dict[str, t.Any]: - for payload in self._status_payload.get("containerStatuses", []): - if payload["name"] == name: - return payload - return {} - def get_container_id(self, name: str) -> str | None: - id_ = self.get_container_status(name).get("containerID", "") - # NOTE: URL(id_).host is failing because the container id is too long - return id_.replace("docker://", "") or None +@dataclass(frozen=True) +class PodStatus: + phase: PodPhase + pod_ip: str | None + host_ip: str | None + container_statuses: t.Sequence[ContainerStatus] = field(default_factory=list) - @property - def phase(self) -> PodPhase: - return PodPhase(self._status_payload.get("phase", PodPhase.PENDING.value)) + @classmethod + def from_primitive(cls, payload: JSON) -> t.Self: + return cls( + phase=PodPhase(payload.get("phase", PodPhase.PENDING.value)), + pod_ip=payload.get("podIP") or None, + host_ip=payload.get("hostIP") or None, + container_statuses=[ + ContainerStatus.from_primitive(s) + for s in payload.get("containerStatuses", ()) + ], + ) @property - def is_phase_running(self) -> bool: - return self._status_payload.get("phase") == "Running" + def is_running(self) -> bool: + return self.phase 
== PodPhase.RUNNING - @property - def pod_ip(self) -> str: - return self._status_payload["podIP"] - @property - def host_ip(self) -> str: - return self._status_payload["hostIP"] +@dataclass(frozen=True) +class Pod(Resource): + metadata: Metadata + spec: PodSpec + status: PodStatus - @property - def resource_requests(self) -> Resources: - cpu_m = 0 - memory = 0 - gpu = 0 - for container in self._payload["spec"]["containers"]: - requests = container.get("resources", {}).get("requests") - if requests: - cpu_m += self._parse_cpu_m(requests.get("cpu", "0")) - memory += self._parse_memory(requests.get("memory", "0Mi")) - gpu += int(requests.get("nvidia.com/gpu", 0)) - return Resources(cpu_m=cpu_m, memory=memory, gpu=gpu) - - def _parse_cpu_m(self, value: str) -> int: - if value.endswith("m"): - return int(value[:-1]) - return int(float(value) * 1000) + @classmethod + def from_primitive(cls, payload: JSON) -> t.Self: + return cls( + metadata=Metadata.from_primitive(payload["metadata"]), + spec=PodSpec.from_primitive(payload["spec"]), + status=PodStatus.from_primitive(payload.get("status", {})), + ) - def _parse_memory(self, memory: str) -> int: - try: - memory_b = int(memory) - except ValueError as exc: - if memory.endswith("Ki"): - memory_b = int(memory[:-2]) * 1024 - elif memory.endswith("k"): - memory_b = int(memory[:-1]) * 1000 - elif memory.endswith("Mi"): - memory_b = int(memory[:-2]) * 1024**2 - elif memory.endswith("M"): - memory_b = int(memory[:-1]) * 1000**2 - elif memory.endswith("Gi"): - memory_b = int(memory[:-2]) * 1024**3 - elif memory.endswith("G"): - memory_b = int(memory[:-1]) * 1000**3 - elif memory.endswith("Ti"): - memory_b = int(memory[:-2]) * 1024**4 - elif memory.endswith("T"): - memory_b = int(memory[:-1]) * 1000**4 - else: - msg = "Memory unit is not supported" - raise KubeClientException(msg) from exc - return memory_b + def get_container_status(self, name: str) -> ContainerStatus: + for status in self.status.container_statuses: + if status.name == name: + break + else: + status = ContainerStatus(name=name) + return status.with_pod_restart_policy(self.spec.restart_policy) + + def get_container_id(self, name: str) -> str | None: + for status in self.status.container_statuses: + if status.name == name: + return status.container_id + return None + + @property + def resource_requests(self) -> ContainerResources: + return sum( + (c.resource_requests for c in self.spec.containers), ContainerResources() + ) @property def stdin(self) -> bool: - for container in self._payload["spec"]["containers"]: - stdin = container.get("stdin") - if stdin is not None: - return stdin + for container in self.spec.containers: + if container.stdin is not None: + return container.stdin return False @property def stdin_once(self) -> bool: - for container in self._payload["spec"]["containers"]: - stdin_once = container.get("stdinOnce") - if stdin_once is not None: - return stdin_once + for container in self.spec.containers: + if container.stdin_once is not None: + return container.stdin_once return False @property def tty(self) -> bool: - for container in self._payload["spec"]["containers"]: - tty = container.get("tty") - if tty is not None: - return tty + for container in self.spec.containers: + if container.tty is not None: + return container.tty return False -class Node: - def __init__(self, payload: dict[str, t.Any]) -> None: - self._payload = payload +@dataclass(frozen=True) +class NodeResources(ContainerResources): + pods: int = DEFAULT_MAX_PODS_PER_NODE + ephemeral_storage: int = 0 - 
@property - def name(self) -> str: - return self._payload["metadata"]["name"] + @classmethod + def from_primitive(cls, payload: JSON) -> t.Self: + resources = super().from_primitive(payload) + return cls( + **{ + **asdict(resources), + "pods": int(payload.get("pods", cls.pods)), + "ephemeral_storage": cls._parse_memory( + payload.get("ephemeral-storage", cls.ephemeral_storage) + ), + } + ) - @property - def container_runtime_version(self) -> str: - return self._payload["status"]["nodeInfo"]["containerRuntimeVersion"] + def with_pods(self, value: int) -> t.Self: + return replace(self, pods=max(0, value)) - def get_label(self, key: str) -> str | None: - return self._payload["metadata"].get("labels", {}).get(key) +@dataclass(frozen=True) +class NodeStatus: + @dataclass(frozen=True) + class NodeInfo: + container_runtime_version: str + + @classmethod + def from_primitive(cls, payload: JSON) -> t.Self: + return cls( + container_runtime_version=payload["containerRuntimeVersion"], + ) + + capacity: NodeResources + allocatable: NodeResources + node_info: NodeInfo + + @classmethod + def from_primitive(cls, payload: JSON) -> t.Self: + return cls( + capacity=NodeResources.from_primitive(payload.get("capacity", {})), + allocatable=NodeResources.from_primitive(payload.get("allocatable", {})), + node_info=cls.NodeInfo.from_primitive(payload["nodeInfo"]), + ) + + +@dataclass(frozen=True) +class Node(Resource): + metadata: Metadata + status: NodeStatus + @classmethod + def from_primitive(cls, payload: JSON) -> Node: + return cls( + metadata=Metadata.from_primitive(payload["metadata"]), + status=NodeStatus.from_primitive(payload["status"]), + ) + + +@dataclass(frozen=True) class ContainerStatus: - def __init__(self, payload: dict[str, t.Any], restart_policy: str) -> None: - self._payload = payload - self._restart_policy = restart_policy + name: str + container_id: str | None = None + restart_count: int = 0 + state: t.Mapping[str, t.Any] = field(default_factory=dict) + last_state: t.Mapping[str, t.Any] = field(default_factory=dict) + + pod_restart_policy: PodRestartPolicy = PodRestartPolicy.ALWAYS + + @classmethod + def from_primitive(cls, payload: JSON) -> t.Self: + return cls( + name=payload["name"], + container_id=payload.get("containerID", "").replace("docker://", "") + or None, + restart_count=payload.get("restartCount", 0), + state=payload.get("state", {}), + last_state=payload.get("lastState", {}), + ) + + def with_pod_restart_policy(self, value: PodRestartPolicy) -> t.Self: + return replace(self, pod_restart_policy=value) @property def is_waiting(self) -> bool: - state = self._payload.get("state") - return "waiting" in state if state else True + return "waiting" in self.state if self.state else True @property def is_running(self) -> bool: - state = self._payload.get("state") - return "running" in state if state else False + return "running" in self.state @property def is_terminated(self) -> bool: - state = self._payload.get("state") - return "terminated" in state if state else False + return "terminated" in self.state @property def is_pod_terminated(self) -> bool: @@ -267,27 +419,23 @@ def is_pod_terminated(self) -> bool: @property def can_restart(self) -> bool: - if self._restart_policy == "Never": + if self.pod_restart_policy == PodRestartPolicy.NEVER: return False - if self._restart_policy == "Always": + if self.pod_restart_policy == PodRestartPolicy.ALWAYS: return True - assert self._restart_policy == "OnFailure" + assert self.pod_restart_policy == PodRestartPolicy.ON_FAILURE try: - return 
self._payload["state"]["terminated"]["exitCode"] != 0 + return self.state["terminated"]["exitCode"] != 0 except KeyError: return True - @property - def restart_count(self) -> int: - return self._payload.get("restartCount") or 0 - @property def started_at(self) -> datetime | None: try: if self.is_running: - date_str = self._payload["state"]["running"]["startedAt"] + date_str = self.state["running"]["startedAt"] else: - date_str = self._payload["state"]["terminated"]["startedAt"] + date_str = self.state["terminated"]["startedAt"] if not date_str: return None except KeyError: @@ -299,9 +447,9 @@ def started_at(self) -> datetime | None: def finished_at(self) -> datetime | None: try: if self.is_terminated: - date_str = self._payload["state"]["terminated"]["finishedAt"] + date_str = self.state["terminated"]["finishedAt"] else: - date_str = self._payload["lastState"]["terminated"]["finishedAt"] + date_str = self.last_state["terminated"]["finishedAt"] if not date_str: return None except KeyError: @@ -445,10 +593,14 @@ def _namespace_url(self) -> str: @property def _pods_url(self) -> str: + return f"{self._api_v1_url}/pods" + + @property + def _namespaced_pods_url(self) -> str: return f"{self._namespace_url}/pods" def _generate_pod_url(self, pod_name: str) -> str: - return f"{self._pods_url}/{pod_name}" + return f"{self._namespaced_pods_url}/{pod_name}" def _generate_node_proxy_url(self, name: str, port: int) -> str: return f"{self._api_v1_url}/nodes/{name}:{port}/proxy" @@ -493,19 +645,17 @@ async def get_raw_pod(self, pod_name: str) -> dict[str, t.Any]: return payload async def get_pod(self, pod_name: str) -> Pod: - return Pod(await self.get_raw_pod(pod_name)) + payload = await self.get_raw_pod(pod_name) + return Pod.from_primitive(payload) async def _get_raw_container_state(self, pod_name: str) -> dict[str, t.Any]: pod = await self.get_pod(pod_name) container_status = pod.get_container_status(pod_name) - return container_status.get("state", {}) + return {**container_status.state} async def get_container_status(self, name: str) -> ContainerStatus: pod = await self.get_pod(name) - return ContainerStatus( - pod.get_container_status(name), - restart_policy=pod.restart_policy, - ) + return pod.get_container_status(name) async def wait_pod_is_running( self, pod_name: str, *, timeout_s: float = 10.0 * 60, interval_s: float = 1.0 @@ -619,13 +769,22 @@ async def create_pod_container_logs_stream( yield response.content async def get_pods( - self, label_selector: str = "", field_selector: str = "" - ) -> Sequence[Pod]: + self, + *, + all_namespaces: bool = False, + label_selector: str | None = None, + field_selector: str | None = None, + ) -> list[Pod]: + url = self._pods_url if all_namespaces else self._namespaced_pods_url params = {} if label_selector: params["labelSelector"] = label_selector if field_selector: params["fieldSelector"] = field_selector + payload = await self._request(method="get", url=url, params=params) + self._assert_resource_kind("PodList", payload) + pod_list = ListResult.from_primitive(payload, resource_cls=Pod) + return pod_list.items payload = await self._request(method="get", url=self._pods_url, params=params) self._assert_resource_kind("PodList", payload) return [Pod(p) for p in payload["items"]] @@ -633,18 +792,21 @@ async def get_pods( async def get_node(self, name: str) -> Node: payload = await self._request(method="get", url=self._generate_node_url(name)) self._assert_resource_kind("Node", payload) - return Node(payload) + return Node.from_primitive(payload) - async def 
get_nodes(self, label_selector: str = "") -> Sequence[Node]: + async def get_nodes(self, *, label_selector: str | None = None) -> list[Node]: params = None if label_selector: params = {"labelSelector": label_selector} payload = await self._request(method="get", url=self._nodes_url, params=params) self._assert_resource_kind("NodeList", payload) + node_list = ListResult.from_primitive(payload, resource_cls=Node) + return node_list.items + self._assert_resource_kind("NodeList", payload) return [Node(item) for item in payload["items"]] async def _check_response_status( - self, response: aiohttp.ClientResponse, job_id: str = "" + self, response: aiohttp.ClientResponse, job_id: str | None = None ) -> None: if not 200 <= response.status < 300: payload = await response.text() @@ -653,42 +815,50 @@ async def _check_response_status( except ValueError: pod = {"code": response.status, "message": payload} self._raise_for_status(pod, job_id=job_id) - raise KubeClientException(payload) def _assert_resource_kind( - self, expected_kind: str, payload: dict[str, t.Any], job_id: str = "" + self, expected_kind: str, payload: JSON, job_id: str | None = None ) -> None: kind = payload["kind"] if kind == "Status": self._raise_for_status(payload, job_id=job_id) - msg = "unexpected error" - raise JobError(msg) - if kind != expected_kind: + elif kind != expected_kind: msg = f"unknown kind: {kind}" raise ValueError(msg) - def _raise_for_status(self, payload: dict[str, t.Any], job_id: str | None) -> None: - if payload["code"] == 400: + def _raise_for_status(self, payload: JSON, job_id: str | None = None) -> t.NoReturn: # noqa: C901 + code = payload["code"] + reason = payload.get("reason") + if code == 400 and job_id: if "ContainerCreating" in payload["message"]: - msg = f"job '{job_id}' was not created yet" + msg = f"Job '{job_id}' has not been created yet" raise JobNotFoundException(msg) if "is not available" in payload["message"]: - msg = f"job '{job_id}' has not created yet" + msg = f"Job '{job_id}' is not available" raise JobNotFoundException(msg) if "is terminated" in payload["message"]: - msg = f"job '{job_id}' is terminated" + msg = f"Job '{job_id}' is terminated" raise JobNotFoundException(msg) - elif payload["code"] == 401: - raise KubeClientUnauthorized(payload) - elif payload["code"] == 404: - msg = f"job '{job_id}' was not found" + elif code == 401: + raise KubeClientUnauthorizedException(payload) + elif code == 404 and job_id: + msg = f"Job '{job_id}' not found" raise JobNotFoundException(msg) - elif payload["code"] == 409: - msg = f"job '{job_id}' already exist" + elif code == 404: + raise JobNotFoundException(payload) + elif code == 409 and job_id: + msg = f"Job '{job_id}' already exists" raise JobError(msg) - elif payload["code"] == 422: - msg = f"can not create job with id '{job_id}'" + elif code == 409: + raise ConflictException(payload) + elif code == 410: + raise ResourceGoneException(payload) + elif code == 422: + msg = f"Cannot create job with id '{job_id}'" raise JobError(msg) + elif reason == "Expired": + raise ExpiredException(payload) + raise KubeClientException(payload) @dataclass(frozen=True) @@ -827,11 +997,11 @@ def __init__( async def get_latest_stats(self) -> JobStats | None: pod = await self._kube_client.get_pod(self._pod_name) - if not pod.node_name: + if not pod.spec.node_name: return None - if pod.resource_requests.gpu: - return await self._get_latest_gpu_pod_stats(pod.node_name) - return await self._get_latest_cpu_pod_stats(pod.node_name) + if pod.resource_requests.has_gpu: + return 
await self._get_latest_gpu_pod_stats(pod.spec.node_name) + return await self._get_latest_cpu_pod_stats(pod.spec.node_name) async def _get_latest_cpu_pod_stats(self, node_name: str) -> JobStats | None: pod_stats = await self._kube_client.get_pod_container_stats( diff --git a/setup.cfg b/setup.cfg index 81a5081c..6aabf19e 100644 --- a/setup.cfg +++ b/setup.cfg @@ -21,7 +21,7 @@ install_requires = elasticsearch<8.0.0 iso8601==2.1.0 neuro-auth-client==22.6.1 - neuro-config-client==23.9.0 + neuro-config-client==24.4.3 neuro-logging==24.4.0 neuro-sdk==22.7.1 orjson @@ -41,7 +41,7 @@ console_scripts = dev = mypy pre-commit - pytest==7.4.0 + pytest==8.2.0 pytest-asyncio==0.21.2 pytest-cov==5.0.0 python-jose==3.3.0 diff --git a/tests/integration/conftest_kube.py b/tests/integration/conftest_kube.py index 1769fd65..d05ece7f 100644 --- a/tests/integration/conftest_kube.py +++ b/tests/integration/conftest_kube.py @@ -23,7 +23,7 @@ class MyKubeClient(KubeClient): async def create_pod(self, job_pod_descriptor: dict[str, Any]) -> str: payload = await self._request( - method="POST", url=self._pods_url, json=job_pod_descriptor + method="POST", url=self._namespaced_pods_url, json=job_pod_descriptor ) self._assert_resource_kind(expected_kind="Pod", payload=payload) return self._parse_pod_status(payload) @@ -248,11 +248,12 @@ async def _kube_node(kube_client: KubeClient) -> Node: @pytest.fixture() async def kube_node_name(_kube_node: Node) -> str: - return _kube_node.name + assert _kube_node.metadata.name + return _kube_node.metadata.name @pytest.fixture() async def kube_container_runtime(_kube_node: Node) -> str: - version = _kube_node.container_runtime_version + version = _kube_node.status.node_info.container_runtime_version end = version.find("://") return version[0:end] diff --git a/tests/integration/test_kube.py b/tests/integration/test_kube.py index 40a28054..50ee66f8 100644 --- a/tests/integration/test_kube.py +++ b/tests/integration/test_kube.py @@ -572,7 +572,9 @@ async def test_get_nodes(self, kube_client: MyKubeClient) -> None: nodes = await kube_client.get_nodes(label_selector="kubernetes.io/os=linux") assert nodes - assert all(node.get_label("kubernetes.io/os") == "linux" for node in nodes) + assert all( + node.metadata.labels.get("kubernetes.io/os") == "linux" for node in nodes + ) async def test_get_pods( self, kube_client: MyKubeClient, job_pod: MyPodDescriptor @@ -582,11 +584,11 @@ async def test_get_pods( pods = await kube_client.get_pods() assert pods - assert any(pod.name == job_pod.name for pod in pods) + assert any(pod.metadata.name == job_pod.name for pod in pods) pods = await kube_client.get_pods(label_selector=f"job={job_pod.name}") assert len(pods) == 1 - assert pods[0].name == job_pod.name + assert pods[0].metadata.name == job_pod.name pods = await kube_client.get_pods( field_selector=",".join( @@ -599,7 +601,7 @@ async def test_get_pods( ) assert pods assert all( - pod.phase in (PodPhase.PENDING, PodPhase.RUNNING) for pod in pods + pod.status.phase in (PodPhase.PENDING, PodPhase.RUNNING) for pod in pods ) finally: await kube_client.delete_pod(job_pod.name) diff --git a/tests/k8s/cluster.sh b/tests/k8s/cluster.sh index e742121c..62ee8260 100755 --- a/tests/k8s/cluster.sh +++ b/tests/k8s/cluster.sh @@ -32,7 +32,6 @@ function k8s::start { --wait-timeout=5m kubectl config use-context minikube kubectl get nodes -o name | xargs -I {} kubectl label {} --overwrite \ - platform.neuromation.io/job=true \ platform.neuromation.io/nodepool=minikube } diff --git a/tests/unit/test_config.py 
b/tests/unit/test_config.py index b833544e..ea36fd2a 100644 --- a/tests/unit/test_config.py +++ b/tests/unit/test_config.py @@ -116,12 +116,10 @@ def test_create_without_auth_url(environ: dict[str, Any]) -> None: def test_create_with_kubernetes_labels(environ: dict[str, Any]) -> None: - environ["NP_MONITORING_NODE_LABEL_JOB"] = "job" environ["NP_MONITORING_NODE_LABEL_NODE_POOL"] = "node-pool" config = EnvironConfigFactory(environ).create() - assert config.kube.job_label == "job" assert config.kube.node_pool_label == "node-pool" diff --git a/tests/unit/test_jobs_service.py b/tests/unit/test_jobs_service.py index ce9c76c5..77be5662 100644 --- a/tests/unit/test_jobs_service.py +++ b/tests/unit/test_jobs_service.py @@ -21,27 +21,45 @@ from platform_monitoring.kube_client import KubeClient, Node, Pod -def create_node(node_pool_name: str, node_name: str) -> Node: - return Node( +def create_node( + node_pool_name: str, + node_name: str, + cpu: float, + memory: int, + nvidia_gpu: int = 0, + amd_gpu: int = 0, +) -> Node: + return Node.from_primitive( { "metadata": { "name": node_name, "labels": { - "platform.neuromation.io/job": "true", "platform.neuromation.io/nodepool": node_pool_name, }, - } + }, + "status": { + "allocatable": { + "cpu": str(cpu), + "memory": str(memory), + "nvidia.com/gpu": str(nvidia_gpu), + "amd.com/gpu": str(amd_gpu), + }, + "nodeInfo": {"containerRuntimeVersion": "containerd"}, + }, } ) def get_pods_factory( *pods: Pod, -) -> Callable[[str, str], Awaitable[Sequence[Pod]]]: +) -> Callable[[bool, str, str], Awaitable[Sequence[Pod]]]: async def _get_pods( - label_selector: str = "", field_selector: str = "" + all_namespaces: bool = False, # noqa: FBT001 FBT002 + label_selector: str | None = None, + field_selector: str | None = None, ) -> Sequence[Pod]: - assert label_selector == "platform.neuromation.io/job" + assert all_namespaces is True + assert label_selector is None assert field_selector == ( "status.phase!=Failed,status.phase!=Succeeded,status.phase!=Unknown" ) @@ -54,15 +72,18 @@ def create_pod( node_name: str | None, cpu_m: int, memory: int, - gpu: int = 0, + nvidia_gpu: int = 0, + amd_gpu: int = 0, ) -> Pod: job_id = f"job-{uuid.uuid4()}" resources = { "cpu": f"{cpu_m}m", "memory": f"{memory}", } - if gpu: - resources["nvidia.com/gpu"] = str(gpu) + if nvidia_gpu: + resources["nvidia.com/gpu"] = str(nvidia_gpu) + if amd_gpu: + resources["amd.com/gpu"] = str(amd_gpu) payload: dict[str, Any] = { "metadata": { "name": job_id, @@ -73,7 +94,7 @@ def create_pod( } if node_name: payload["spec"]["nodeName"] = node_name - return Pod(payload) + return Pod.from_primitive(payload) @pytest.fixture() @@ -89,19 +110,14 @@ def cluster() -> Cluster: job_schedule_timeout_s=1, job_schedule_scale_up_timeout_s=1, resource_pool_types=[ - ResourcePoolType( - name="minikube-cpu", - max_size=2, - available_cpu=1, - available_memory=2**30, - ), + ResourcePoolType(name="minikube-cpu", max_size=2, cpu=1, memory=2**30), ResourcePoolType( name="minikube-gpu", max_size=2, - available_cpu=1, - available_memory=2**30, - gpu=1, - gpu_model="nvidia-tesla-k80", + cpu=1, + memory=2**30, + nvidia_gpu=1, + amd_gpu=2, ), ], resource_presets=[ @@ -110,7 +126,7 @@ def cluster() -> Cluster: credits_per_hour=Decimal(10), cpu=0.2, memory=100 * 2**20, - resource_affinity=["minikube-cpu"], + available_resource_pool_names=["minikube-cpu"], ), ResourcePreset( name="cpu-p", @@ -121,13 +137,20 @@ def cluster() -> Cluster: preemptible_node=True, ), ResourcePreset( - name="gpu", + name="nvidia-gpu", + 
credits_per_hour=Decimal(10), + cpu=0.2, + memory=100 * 2**20, + nvidia_gpu=1, + available_resource_pool_names=["minikube-gpu"], + ), + ResourcePreset( + name="amd-gpu", credits_per_hour=Decimal(10), cpu=0.2, memory=100 * 2**20, - gpu=1, - gpu_model="nvidia-tesla-k80", - resource_affinity=["minikube-gpu"], + amd_gpu=1, + available_resource_pool_names=["minikube-gpu"], ), ], ), @@ -156,12 +179,26 @@ def jobs_client() -> mock.Mock: @pytest.fixture() def kube_client() -> mock.Mock: async def get_nodes(label_selector: str = "") -> Sequence[Node]: - assert label_selector == "platform.neuromation.io/job" + assert label_selector == "platform.neuromation.io/nodepool" return [ - create_node("minikube-cpu", "minikube-cpu-1"), - create_node("minikube-cpu", "minikube-cpu-2"), - create_node("minikube-gpu", "minikube-gpu-1"), - create_node("minikube-gpu", "minikube-gpu-2"), + create_node("minikube-cpu", "minikube-cpu-1", cpu=1, memory=2**30), + create_node("minikube-cpu", "minikube-cpu-2", cpu=1, memory=2**30), + create_node( + "minikube-gpu", + "minikube-gpu-1", + cpu=1, + memory=2**30, + nvidia_gpu=1, + amd_gpu=2, + ), + create_node( + "minikube-gpu", + "minikube-gpu-2", + cpu=1, + memory=2**30, + nvidia_gpu=1, + amd_gpu=2, + ), ] client = mock.Mock(spec=KubeClient) @@ -191,20 +228,21 @@ async def test_get_available_jobs_count( ) -> None: kube_client.get_pods.side_effect = get_pods_factory( create_pod("minikube-cpu-1", cpu_m=50, memory=128 * 2**20), - create_pod("minikube-gpu-1", cpu_m=100, memory=256 * 2**20, gpu=1), + create_pod("minikube-gpu-1", cpu_m=100, memory=256 * 2**20, nvidia_gpu=1), create_pod("minikube-cpu-2", cpu_m=50, memory=128 * 2**20), + create_pod("minikube-gpu-1", cpu_m=100, memory=256 * 2**20, amd_gpu=1), ) result = await service.get_available_jobs_counts() - assert result == {"cpu": 8, "gpu": 1, "cpu-p": 0} + assert result == {"cpu": 8, "nvidia-gpu": 1, "amd-gpu": 3, "cpu-p": 0} async def test_get_available_jobs_count_free_nodes( self, service: JobsService ) -> None: result = await service.get_available_jobs_counts() - assert result == {"cpu": 10, "gpu": 2, "cpu-p": 0} + assert result == {"cpu": 10, "nvidia-gpu": 2, "amd-gpu": 4, "cpu-p": 0} async def test_get_available_jobs_count_busy_nodes( self, service: JobsService, kube_client: mock.Mock @@ -212,13 +250,13 @@ async def test_get_available_jobs_count_busy_nodes( kube_client.get_pods.side_effect = get_pods_factory( create_pod("minikube-cpu-1", cpu_m=1000, memory=2**30), create_pod("minikube-cpu-2", cpu_m=1000, memory=2**30), - create_pod("minikube-gpu-1", cpu_m=1000, memory=2**30, gpu=1), - create_pod("minikube-gpu-2", cpu_m=1000, memory=2**30, gpu=1), + create_pod("minikube-gpu-1", cpu_m=1000, memory=2**30, nvidia_gpu=1), + create_pod("minikube-gpu-2", cpu_m=1000, memory=2**30, nvidia_gpu=1), ) result = await service.get_available_jobs_counts() - assert result == {"cpu": 0, "gpu": 0, "cpu-p": 0} + assert result == {"cpu": 0, "nvidia-gpu": 0, "amd-gpu": 0, "cpu-p": 0} async def test_get_available_jobs_count_for_pods_without_nodes( self, service: JobsService, kube_client: mock.Mock @@ -229,7 +267,7 @@ async def test_get_available_jobs_count_for_pods_without_nodes( result = await service.get_available_jobs_counts() - assert result == {"cpu": 10, "gpu": 2, "cpu-p": 0} + assert result == {"cpu": 10, "nvidia-gpu": 2, "amd-gpu": 4, "cpu-p": 0} async def test_get_available_jobs_count_node_not_found( self, service: JobsService, kube_client: mock.Mock diff --git a/tests/unit/test_kube.py b/tests/unit/test_kube.py index 00b93307..d731c3f0 
100644 --- a/tests/unit/test_kube.py +++ b/tests/unit/test_kube.py @@ -6,6 +6,8 @@ import pytest from platform_monitoring.kube_client import ( + ContainerResources, + ContainerStatus, GPUCounter, GPUCounters, JobError, @@ -13,7 +15,7 @@ Pod, PodContainerStats, PodPhase, - Resources, + PodRestartPolicy, StatsSummary, ) from platform_monitoring.logs import filter_out_rpc_error @@ -21,45 +23,66 @@ class TestPod: def test_no_node_name(self) -> None: - pod = Pod({"spec": {}}) - assert pod.node_name is None + pod = Pod.from_primitive({"metadata": {"name": "pod"}, "spec": {}}) + assert pod.spec.node_name is None def test_node_name(self) -> None: - pod = Pod({"spec": {"nodeName": "testnode"}}) - assert pod.node_name == "testnode" + pod = Pod.from_primitive( + { + "metadata": {"name": "pod"}, + "spec": {"nodeName": "testnode"}, + } + ) + assert pod.spec.node_name == "testnode" def test_no_status(self) -> None: - pod = Pod({"spec": {}}) - with pytest.raises(ValueError, match="Missing pod status"): - pod.get_container_status("testcontainer") + pod = Pod.from_primitive({"metadata": {"name": "pod"}, "spec": {}}) + assert pod.get_container_status("testcontainer") == ContainerStatus( + name="testcontainer" + ) def test_no_container_status(self) -> None: - pod = Pod({"spec": {}, "status": {"containerStatuses": []}}) - container_status = pod.get_container_status("testcontainer") - assert container_status == {} + pod = Pod.from_primitive( + { + "metadata": {"name": "pod"}, + "spec": {}, + "status": {"containerStatuses": []}, + } + ) + assert pod.get_container_status("testcontainer") == ContainerStatus( + name="testcontainer" + ) def test_container_status(self) -> None: - pod = Pod( + pod = Pod.from_primitive( { - "spec": {}, + "metadata": {"name": "pod"}, + "spec": {"restartPolicy": "Never"}, "status": { "containerStatuses": [{"name": ""}, {"name": "testcontainer"}] }, } ) container_status = pod.get_container_status("testcontainer") - assert container_status == {"name": "testcontainer"} + assert container_status == ContainerStatus( + name="testcontainer", pod_restart_policy=PodRestartPolicy.NEVER + ) def test_no_container_id(self) -> None: - pod = Pod( - {"spec": {}, "status": {"containerStatuses": [{"name": "testcontainer"}]}} + pod = Pod.from_primitive( + { + "metadata": {"name": "pod"}, + "spec": {}, + "status": {"containerStatuses": [{"name": "testcontainer"}]}, + } ) container_id = pod.get_container_id("testcontainer") assert container_id is None def test_container_id(self) -> None: - pod = Pod( + pod = Pod.from_primitive( { + "metadata": {"name": "pod"}, "spec": {}, "status": { "containerStatuses": [ @@ -75,72 +98,129 @@ def test_container_id(self) -> None: assert container_id == "testcontainerid" def test_phase(self) -> None: - pod = Pod({"spec": {}, "status": {"phase": "Running"}}) - assert pod.phase == PodPhase.RUNNING + pod = Pod.from_primitive( + { + "metadata": {"name": "pod"}, + "spec": {}, + "status": {"phase": "Running"}, + } + ) + assert pod.status.phase == PodPhase.RUNNING - def test_is_phase_running_false(self) -> None: - pod = Pod({"spec": {}, "status": {"phase": "Pending"}}) - assert not pod.is_phase_running + def test_phase_is_running_false(self) -> None: + pod = Pod.from_primitive( + { + "metadata": {"name": "pod"}, + "spec": {}, + "status": {"phase": "Pending"}, + } + ) + assert not pod.status.is_running - def test_is_phase_running(self) -> None: - pod = Pod({"spec": {}, "status": {"phase": "Running"}}) - assert pod.is_phase_running + def test_phase_is_running(self) -> None: + pod = 
Pod.from_primitive( + { + "metadata": {"name": "pod"}, + "spec": {}, + "status": {"phase": "Running"}, + } + ) + assert pod.status.is_running def test_no_resource_requests(self) -> None: - pod = Pod({"spec": {"containers": [{"resources": {}}]}}) - assert pod.resource_requests == Resources() + pod = Pod.from_primitive( + { + "metadata": {"name": "pod"}, + "spec": {"containers": [{"resources": {}}]}, + } + ) + assert pod.resource_requests == ContainerResources() def test_resource_requests_cpu_milicores(self) -> None: - pod = Pod( - {"spec": {"containers": [{"resources": {"requests": {"cpu": "100m"}}}]}} + pod = Pod.from_primitive( + { + "metadata": {"name": "pod"}, + "spec": {"containers": [{"resources": {"requests": {"cpu": "100m"}}}]}, + } ) - assert pod.resource_requests == Resources(cpu_m=100) + assert pod.resource_requests == ContainerResources(cpu_m=100) def test_resource_requests_cpu_cores(self) -> None: - pod = Pod({"spec": {"containers": [{"resources": {"requests": {"cpu": "1"}}}]}}) - assert pod.resource_requests == Resources(cpu_m=1000) + pod = Pod.from_primitive( + { + "metadata": {"name": "pod"}, + "spec": {"containers": [{"resources": {"requests": {"cpu": "1"}}}]}, + } + ) + assert pod.resource_requests == ContainerResources(cpu_m=1000) def test_resource_requests_memory_mebibytes(self) -> None: - pod = Pod( + pod = Pod.from_primitive( { + "metadata": {"name": "pod"}, "spec": { "containers": [{"resources": {"requests": {"memory": "1000Mi"}}}] - } + }, } ) - assert pod.resource_requests == Resources(memory=1000 * 2**20) + assert pod.resource_requests == ContainerResources(memory=1000 * 2**20) def test_resource_requests_memory_gibibytes(self) -> None: - pod = Pod( - {"spec": {"containers": [{"resources": {"requests": {"memory": "1Gi"}}}]}} + pod = Pod.from_primitive( + { + "metadata": {"name": "pod"}, + "spec": { + "containers": [{"resources": {"requests": {"memory": "1Gi"}}}] + }, + } ) - assert pod.resource_requests == Resources(memory=1024 * 2**20) + assert pod.resource_requests == ContainerResources(memory=1024 * 2**20) def test_resource_requests_memory_terabytes(self) -> None: - pod = Pod( - {"spec": {"containers": [{"resources": {"requests": {"memory": "4T"}}}]}} + pod = Pod.from_primitive( + { + "metadata": {"name": "pod"}, + "spec": {"containers": [{"resources": {"requests": {"memory": "4T"}}}]}, + } ) - assert pod.resource_requests == Resources(memory=4 * 10**12) + assert pod.resource_requests == ContainerResources(memory=4 * 10**12) def test_resource_requests_memory_tebibytes(self) -> None: - pod = Pod( - {"spec": {"containers": [{"resources": {"requests": {"memory": "4Ti"}}}]}} + pod = Pod.from_primitive( + { + "metadata": {"name": "pod"}, + "spec": { + "containers": [{"resources": {"requests": {"memory": "4Ti"}}}] + }, + } ) - assert pod.resource_requests == Resources(memory=4 * 2**40) + assert pod.resource_requests == ContainerResources(memory=4 * 2**40) def test_resource_requests_gpu(self) -> None: - pod = Pod( + pod = Pod.from_primitive( { + "metadata": {"name": "pod"}, "spec": { - "containers": [{"resources": {"requests": {"nvidia.com/gpu": "1"}}}] - } + "containers": [ + { + "resources": { + "requests": { + "nvidia.com/gpu": "1", + "amd.com/gpu": "2", + } + } + } + ] + }, } ) - assert pod.resource_requests == Resources(gpu=1) + assert pod.resource_requests.has_gpu + assert pod.resource_requests == ContainerResources(nvidia_gpu=1, amd_gpu=2) def test_resource_requests_for_multiple_containers(self) -> None: - pod = Pod( + pod = Pod.from_primitive( { + "metadata": 
{"name": "pod"}, "spec": { "containers": [ {"resources": {"requests": {"cpu": "0.5", "memory": "512Mi"}}}, @@ -150,15 +230,16 @@ def test_resource_requests_for_multiple_containers(self) -> None: "cpu": "1", "memory": "1Gi", "nvidia.com/gpu": "1", + "amd.com/gpu": "2", } } }, ] - } + }, } ) - assert pod.resource_requests == Resources( - cpu_m=1500, memory=1536 * 2**20, gpu=1 + assert pod.resource_requests == ContainerResources( + cpu_m=1500, memory=1536 * 2**20, nvidia_gpu=1, amd_gpu=2 ) @@ -545,35 +626,61 @@ async def _feed_raw_chunk(data: bytes) -> None: class TestNode: def test_name(self) -> None: - node = Node({"metadata": {"name": "default"}}) - assert node.name == "default" - - def test_get_label(self) -> None: - node = Node({"metadata": {"labels": {"hello": "world"}}}) - assert node.get_label("hello") == "world" + node = Node.from_primitive( + { + "metadata": {"name": "default"}, + "status": {"nodeInfo": {"containerRuntimeVersion": "containerd"}}, + } + ) + assert node.metadata.name == "default" - def test_get_label_is_none(self) -> None: - node = Node({"metadata": {}}) - assert node.get_label("hello") is None + def test_labels(self) -> None: + node = Node.from_primitive( + { + "metadata": {"labels": {"hello": "world"}}, + "status": {"nodeInfo": {"containerRuntimeVersion": "containerd"}}, + } + ) + assert node.metadata.labels == {"hello": "world"} -class TestResources: +class TestContainerResources: def test_add(self) -> None: - resources1 = Resources(cpu_m=1, memory=2 * 2**20, gpu=3) - resources2 = Resources(cpu_m=4, memory=5 * 2**20, gpu=6) - assert resources1.add(resources2) == Resources(cpu_m=5, memory=7 * 2**20, gpu=9) - - def test_available(self) -> None: - total = Resources(cpu_m=1000, memory=1024 * 2**20, gpu=2) - used = Resources(cpu_m=100, memory=256 * 2**20, gpu=1) - assert total.available(used) == Resources(cpu_m=900, memory=768 * 2**20, gpu=1) - - def test_count(self) -> None: - total = Resources(cpu_m=1000, memory=1024 * 2**20, gpu=2) - - assert total.count(Resources(cpu_m=100, memory=128 * 2**20, gpu=1)) == 2 - assert total.count(Resources(cpu_m=100, memory=128 * 2**20)) == 8 - assert total.count(Resources(cpu_m=100)) == 10 - assert total.count(Resources(cpu_m=1100)) == 0 - assert total.count(Resources()) == 110 - assert Resources().count(Resources()) == 0 + resources1 = ContainerResources( + cpu_m=1, memory=2 * 2**20, nvidia_gpu=3, amd_gpu=4 + ) + resources2 = ContainerResources( + cpu_m=4, memory=5 * 2**20, nvidia_gpu=6, amd_gpu=7 + ) + assert resources1 + resources2 == ContainerResources( + cpu_m=5, memory=7 * 2**20, nvidia_gpu=9, amd_gpu=11 + ) + + def test_sub(self) -> None: + total = ContainerResources( + cpu_m=1000, memory=1024 * 2**20, nvidia_gpu=2, amd_gpu=4 + ) + used = ContainerResources( + cpu_m=100, memory=256 * 2**20, nvidia_gpu=1, amd_gpu=2 + ) + assert total - used == ContainerResources( + cpu_m=900, memory=768 * 2**20, nvidia_gpu=1, amd_gpu=2 + ) + + def test_floordiv(self) -> None: + total = ContainerResources( + cpu_m=1000, memory=1024 * 2**20, nvidia_gpu=2, amd_gpu=4 + ) + + assert ( + total + // ContainerResources( + cpu_m=100, memory=128 * 2**20, nvidia_gpu=1, amd_gpu=2 + ) + == 2 + ) + assert total // ContainerResources(cpu_m=100, memory=128 * 2**20) == 8 + assert total // ContainerResources(cpu_m=100) == 10 + assert total // ContainerResources(cpu_m=1100) == 0 + assert total // ContainerResources() == 110 + assert ContainerResources() // ContainerResources() == 0