ENG-29 get_available_jobs_count should take all pods into account (#909)
zubenkoivan authored May 10, 2024
1 parent 7ff9c34 commit 1f5c7c0
Showing 16 changed files with 711 additions and 376 deletions.
4 changes: 0 additions & 4 deletions charts/platform-monitoring/templates/_helpers.tpl
@@ -120,10 +120,6 @@ release: {{ .Release.Name | quote }}
- name: SENTRY_SAMPLE_RATE
value: {{ .Values.sentry.sampleRate | default 0 | quote }}
{{- end }}
{{- if .Values.nodeLabels.job }}
- name: NP_MONITORING_NODE_LABEL_JOB
value: {{ .Values.nodeLabels.job }}
{{- end }}
{{- if .Values.nodeLabels.nodePool }}
- name: NP_MONITORING_NODE_LABEL_NODE_POOL
value: {{ .Values.nodeLabels.nodePool }}
2 changes: 1 addition & 1 deletion charts/platform-monitoring/templates/fluent-bit.yaml
@@ -128,7 +128,7 @@ spec:
requiredDuringSchedulingIgnoredDuringExecution:
nodeSelectorTerms:
- matchExpressions:
- key: {{ .Values.nodeLabels.job | quote }}
- key: {{ .Values.nodeLabels.nodePool | quote }}
operator: Exists
{{- if .Values.fluentbit.tolerations }}
tolerations: {{ toYaml .Values.fluentbit.tolerations | nindent 8 }}
1 change: 0 additions & 1 deletion charts/platform-monitoring/values.yaml
@@ -49,7 +49,6 @@ sentry:
sampleRate: 0.002

nodeLabels:
job: platform.neuromation.io/job
nodePool: platform.neuromation.io/nodepool

fluentbit:
1 change: 0 additions & 1 deletion minikube.sh
@@ -15,7 +15,6 @@ function minikube::start {
--wait-timeout=5m
kubectl config use-context minikube
kubectl get nodes -o name | xargs -I {} kubectl label {} --overwrite \
platform.neuromation.io/job=true \
platform.neuromation.io/nodepool=minikube
}

1 change: 0 additions & 1 deletion platform_monitoring/api.py
@@ -820,7 +820,6 @@ async def _init_app(app: aiohttp.web.Application) -> AsyncIterator[None]:
kube_client=kube_client,
container_runtime_client_registry=container_runtime_client_registry,
cluster_name=config.cluster_name,
kube_job_label=config.kube.job_label,
kube_node_pool_label=config.kube.node_pool_label,
)
app[MONITORING_APP_KEY][JOBS_SERVICE_KEY] = jobs_service
1 change: 0 additions & 1 deletion platform_monitoring/config.py
@@ -86,7 +86,6 @@ class KubeConfig:
kubelet_node_port: int = 10250
nvidia_dcgm_node_port: int | None = None

job_label: str = "platform.neuromation.io/job"
node_pool_label: str = "platform.neuromation.io/nodepool"


3 changes: 0 additions & 3 deletions platform_monitoring/config_factory.py
@@ -170,9 +170,6 @@ def _create_kube(self) -> KubeConfig:
if "NP_MONITORING_K8S_NVIDIA_DCGM_PORT" in self._environ
else KubeConfig.nvidia_dcgm_node_port
),
job_label=self._environ.get(
"NP_MONITORING_NODE_LABEL_JOB", KubeConfig.job_label
),
node_pool_label=self._environ.get(
"NP_MONITORING_NODE_LABEL_NODE_POOL", KubeConfig.node_pool_label
),
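(Illustrative note, not part of the diff: with job_label removed, _create_kube now consults only NP_MONITORING_NODE_LABEL_NODE_POOL and falls back to the default kept on KubeConfig. A minimal sketch of that fallback, reusing the default value shown in config.py above:)

    # Sketch only: mirrors the remaining env fallback after this change.
    import os

    NODE_POOL_LABEL_DEFAULT = "platform.neuromation.io/nodepool"  # KubeConfig.node_pool_label

    node_pool_label = os.environ.get(
        "NP_MONITORING_NODE_LABEL_NODE_POOL", NODE_POOL_LABEL_DEFAULT
    )
    # NP_MONITORING_NODE_LABEL_JOB is no longer read, even if it is still set.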
119 changes: 75 additions & 44 deletions platform_monitoring/jobs_service.py
@@ -1,8 +1,8 @@
import asyncio
from collections import defaultdict
from collections.abc import AsyncGenerator, AsyncIterator, Mapping, Sequence
from contextlib import AbstractAsyncContextManager, asynccontextmanager
from dataclasses import dataclass
from functools import reduce

import aiohttp
from neuro_config_client import ConfigClient, ResourcePoolType
@@ -14,7 +14,14 @@
ContainerRuntimeClientRegistry,
ContainerRuntimeError,
)
from .kube_client import JobNotFoundException, KubeClient, Pod, Resources
from .kube_client import (
DEFAULT_MAX_PODS_PER_NODE,
ContainerResources,
JobNotFoundException,
KubeClient,
NodeResources,
Pod,
)
from .user import User
from .utils import KubeHelper, asyncgeneratorcontextmanager

@@ -44,12 +51,12 @@ def __init__(self, name: str) -> None:
class JobsService:
def __init__(
self,
*,
config_client: ConfigClient,
jobs_client: JobsClient,
kube_client: KubeClient,
container_runtime_client_registry: ContainerRuntimeClientRegistry,
cluster_name: str,
kube_job_label: str = KubeConfig.job_label,
kube_node_pool_label: str = KubeConfig.node_pool_label,
) -> None:
self._config_client = config_client
@@ -58,7 +65,6 @@ def __init__(
self._kube_helper = KubeHelper()
self._container_runtime_client_registry = container_runtime_client_registry
self._cluster_name = cluster_name
self._kube_job_label = kube_job_label
self._kube_node_pool_label = kube_node_pool_label

async def get(self, job_id: str) -> Job:
@@ -91,7 +97,10 @@ async def save(
cont_id = pod.get_container_id(pod_name)
assert cont_id

runtime_client = await self._container_runtime_client_registry.get(pod.host_ip)
assert pod.status.host_ip
runtime_client = await self._container_runtime_client_registry.get(
pod.status.host_ip
)

try:
async with runtime_client.commit(
@@ -138,7 +147,10 @@ async def attach(
if not pod.tty:
tty = False

runtime_client = await self._container_runtime_client_registry.get(pod.host_ip)
assert pod.status.host_ip
runtime_client = await self._container_runtime_client_registry.get(
pod.status.host_ip
)

async with runtime_client.attach(
cont_id, tty=tty, stdin=stdin, stdout=stdout, stderr=stderr
@@ -161,7 +173,10 @@ async def exec(
cont_id = pod.get_container_id(pod_name)
assert cont_id

runtime_client = await self._container_runtime_client_registry.get(pod.host_ip)
assert pod.status.host_ip
runtime_client = await self._container_runtime_client_registry.get(
pod.status.host_ip
)

async with runtime_client.exec(
cont_id, cmd, tty=tty, stdin=stdin, stdout=stdout, stderr=stderr
@@ -175,7 +190,10 @@ async def kill(self, job: Job) -> None:
cont_id = pod.get_container_id(pod_name)
assert cont_id

runtime_client = await self._container_runtime_client_registry.get(pod.host_ip)
assert pod.status.host_ip
runtime_client = await self._container_runtime_client_registry.get(
pod.status.host_ip
)

await runtime_client.kill(cont_id)

@@ -184,14 +202,14 @@ async def port_forward(
) -> tuple[asyncio.StreamReader, asyncio.StreamWriter]:
pod_name = self._kube_helper.get_job_pod_name(job)
pod = await self._get_running_jobs_pod(pod_name)
reader, writer = await asyncio.open_connection(pod.pod_ip, port)
reader, writer = await asyncio.open_connection(pod.status.pod_ip, port)
return reader, writer

async def _get_running_jobs_pod(self, job_id: str) -> Pod:
pod: Pod | None
try:
pod = await self._kube_client.get_pod(job_id)
if not pod.is_phase_running:
if not pod.status.is_running:
pod = None
except JobNotFoundException:
# job's pod does not exist: it might be already garbage-collected
@@ -207,39 +225,47 @@ async def get_available_jobs_counts(self) -> Mapping[str, int]:
result: dict[str, int] = {}
cluster = await self._config_client.get_cluster(self._cluster_name)
assert cluster.orchestrator is not None
resource_requests = await self._get_resource_requests_by_node_pool()
available_resources = await self._get_available_resources_by_node_pool()
pool_types = {p.name: p for p in cluster.orchestrator.resource_pool_types}
for preset in cluster.orchestrator.resource_presets:
available_jobs_count = 0
preset_resources = Resources(
preset_resources = ContainerResources(
cpu_m=int(preset.cpu * 1000),
memory=preset.memory,
gpu=preset.gpu or 0,
nvidia_gpu=preset.nvidia_gpu or 0,
amd_gpu=preset.amd_gpu or 0,
)
preset_pool_types = [pool_types[r] for r in preset.resource_affinity]
for node_pool in preset_pool_types:
node_pools = [pool_types[r] for r in preset.available_resource_pool_names]
for node_pool in node_pools:
node_resource_limit = self._get_node_resource_limit(node_pool)
node_resource_requests = resource_requests.get(node_pool.name, [])
running_nodes_count = len(node_resource_requests)
node_pool_available_resources = available_resources.get(
node_pool.name, []
)
running_nodes_count = len(node_pool_available_resources)
free_nodes_count = node_pool.max_size - running_nodes_count
# get number of jobs that can be scheduled on running nodes
# in the current node pool
for request in node_resource_requests:
available_resources = node_resource_limit.available(request)
available_jobs_count += available_resources.count(preset_resources)
for node_available_resources in node_pool_available_resources:
available_jobs_count += min(
node_available_resources // preset_resources,
node_available_resources.pods,
)
# get number of jobs that can be scheduled on free nodes
# in the current node pool
if free_nodes_count > 0:
available_jobs_count += (
free_nodes_count * node_resource_limit.count(preset_resources)
available_jobs_count += free_nodes_count * min(
DEFAULT_MAX_PODS_PER_NODE,
node_resource_limit // preset_resources,
)
result[preset.name] = available_jobs_count
return result

async def _get_resource_requests_by_node_pool(self) -> dict[str, list[Resources]]:
result: dict[str, list[Resources]] = {}
async def _get_available_resources_by_node_pool(
self,
) -> dict[str, list[NodeResources]]:
result: dict[str, list[NodeResources]] = defaultdict(list)
pods = await self._kube_client.get_pods(
label_selector=self._kube_job_label,
all_namespaces=True,
field_selector=",".join(
(
"status.phase!=Failed",
@@ -248,36 +274,41 @@ async def _get_resource_requests_by_node_pool(self) -> dict[str, list[Resources]]:
),
),
)
nodes = await self._kube_client.get_nodes(label_selector=self._kube_job_label)
for node_name, node_pods in self._group_pods_by_node(pods).items():
if not node_name:
continue
nodes = await self._kube_client.get_nodes(
label_selector=self._kube_node_pool_label
)
for node_name, node_pods in self._get_pods_by_node(pods).items():
for node in nodes:
if node.name == node_name:
if node.metadata.name == node_name:
break
else:
raise NodeNotFoundException(node_name)
node_pool_name = node.get_label(self._kube_node_pool_label)
node_pool_name = node.metadata.labels.get(self._kube_node_pool_label)
if not node_pool_name: # pragma: no coverage
continue
pod_resources = [p.resource_requests for p in node_pods]
node_resources = reduce(Resources.add, pod_resources, Resources())
result.setdefault(node_pool_name, []).append(node_resources)
resource_requests = sum(
(pod.resource_requests for pod in node_pods), ContainerResources()
)
available_resources = node.status.allocatable - resource_requests
available_resources = available_resources.with_pods(
available_resources.pods - len(node_pods)
)
result[node_pool_name].append(available_resources)
return result

def _group_pods_by_node(self, pods: Sequence[Pod]) -> dict[str | None, list[Pod]]:
result: dict[str | None, list[Pod]] = {}
def _get_pods_by_node(self, pods: Sequence[Pod]) -> dict[str, list[Pod]]:
result: dict[str, list[Pod]] = defaultdict(list)
for pod in pods:
group = result.get(pod.node_name)
if not group:
group = []
result[pod.node_name] = group
group.append(pod)
if pod.spec.node_name:
result[pod.spec.node_name].append(pod)
return result

def _get_node_resource_limit(self, node_pool: ResourcePoolType) -> Resources:
return Resources(
def _get_node_resource_limit(
self, node_pool: ResourcePoolType
) -> ContainerResources:
return ContainerResources(
cpu_m=int(node_pool.available_cpu * 1000),
memory=node_pool.available_memory,
gpu=node_pool.gpu or 0,
nvidia_gpu=node_pool.nvidia_gpu or 0,
amd_gpu=node_pool.amd_gpu or 0,
)
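For orientation, the sketch below illustrates the counting rule the reworked get_available_jobs_counts applies per node: available capacity is now computed from all pods on a node rather than only job-labeled ones, and the per-node job count is additionally capped by the node's remaining pod slots. The Res dataclass, its operators, and the value 110 for DEFAULT_MAX_PODS_PER_NODE are simplified stand-ins for ContainerResources/NodeResources in platform_monitoring/kube_client.py (not shown in this diff), so names and numbers are illustrative only.

    # Sketch only: simplified stand-ins for ContainerResources / NodeResources.
    from dataclasses import dataclass

    DEFAULT_MAX_PODS_PER_NODE = 110  # assumed value; the real constant lives in kube_client.py

    @dataclass(frozen=True)
    class Res:
        cpu_m: int = 0
        memory: int = 0
        pods: int = DEFAULT_MAX_PODS_PER_NODE

        def __sub__(self, other: "Res") -> "Res":
            # subtract resource requests; remaining pod slots are handled separately below
            return Res(self.cpu_m - other.cpu_m, self.memory - other.memory, self.pods)

        def __floordiv__(self, preset: "Res") -> int:
            # how many copies of the preset fit into these resources
            fits = []
            if preset.cpu_m:
                fits.append(self.cpu_m // preset.cpu_m)
            if preset.memory:
                fits.append(self.memory // preset.memory)
            return max(0, min(fits)) if fits else 0

    # One running node: 8 CPU / 32Gi allocatable, with 2 pods already requesting 2 CPU / 4Gi in total.
    allocatable = Res(cpu_m=8_000, memory=32 * 2**30)
    requested = Res(cpu_m=2_000, memory=4 * 2**30)
    available = allocatable - requested
    available = Res(available.cpu_m, available.memory, pods=DEFAULT_MAX_PODS_PER_NODE - 2)

    preset = Res(cpu_m=1_000, memory=4 * 2**30)  # a hypothetical 1 CPU / 4Gi preset

    jobs_on_running_node = min(available // preset, available.pods)
    jobs_per_free_node = min(DEFAULT_MAX_PODS_PER_NODE, allocatable // preset)
    print(jobs_on_running_node, jobs_per_free_node)  # 6 8 (CPU is the binding limit on the busy node)

In the actual service, the per-free-node figure is then multiplied by the node pool's remaining capacity (max_size minus running nodes), and the totals are summed across all node pools listed in preset.available_resource_pool_names.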