From e8e4873801211e0f0e9de52973246e01dfe741aa Mon Sep 17 00:00:00 2001 From: Jonas Dedden Date: Wed, 3 Apr 2024 12:54:45 +0200 Subject: [PATCH] WIP --- dask_kubernetes/classic/tests/test_async.py | 6 +- dask_kubernetes/conftest.py | 29 +- .../operator/controller/controller.py | 274 +++++++++++------- .../controller/tests/test_controller.py | 189 ++++++++---- .../kubecluster/tests/test_kubecluster.py | 73 ++--- requirements-test.txt | 2 + 6 files changed, 366 insertions(+), 207 deletions(-) diff --git a/dask_kubernetes/classic/tests/test_async.py b/dask_kubernetes/classic/tests/test_async.py index b6c2a6201..e7e9640c1 100644 --- a/dask_kubernetes/classic/tests/test_async.py +++ b/dask_kubernetes/classic/tests/test_async.py @@ -773,7 +773,7 @@ async def test_start_with_workers(k8s_cluster, pod_spec): @pytest.mark.anyio @pytest.mark.xfail(reason="Flaky in CI and classic is deprecated anyway") -async def test_adapt_delete(cluster, ns): +async def test_adapt_delete(cluster, namespace): """ testing whether KubeCluster.adapt will bring back deleted worker pod (issue #244) @@ -782,7 +782,7 @@ async def test_adapt_delete(cluster, ns): async def get_worker_pods(): pods_list = await core_api.list_namespaced_pod( - namespace=ns, + namespace=namespace, label_selector=f"dask.org/component=worker,dask.org/cluster-name={cluster.name}", ) return [x.metadata.name for x in pods_list.items] @@ -797,7 +797,7 @@ async def get_worker_pods(): assert len(worker_pods) == 2 # delete one worker pod to_delete = worker_pods[0] - await core_api.delete_namespaced_pod(name=to_delete, namespace=ns) + await core_api.delete_namespaced_pod(name=to_delete, namespace=namespace) # wait until it is deleted start = time() while True: diff --git a/dask_kubernetes/conftest.py b/dask_kubernetes/conftest.py index aadaa2aac..3569b306a 100644 --- a/dask_kubernetes/conftest.py +++ b/dask_kubernetes/conftest.py @@ -5,6 +5,7 @@ import sys import tempfile import uuid +from typing import Iterator, Final import pytest from kopf.testing import KopfRunner @@ -12,30 +13,30 @@ from dask_kubernetes.common.utils import check_dependency -DIR = pathlib.Path(__file__).parent.absolute() +DIR: Final[pathlib.Path] = pathlib.Path(__file__).parent.absolute() check_dependency("helm") check_dependency("kubectl") check_dependency("docker") -DISABLE_LOGGERS = ["httpcore", "httpx"] +DISABLE_LOGGERS: Final[list[str]] = ["httpcore", "httpx"] -def pytest_configure(): +def pytest_configure() -> None: for logger_name in DISABLE_LOGGERS: logger = logging.getLogger(logger_name) logger.disabled = True @pytest.fixture() -def kopf_runner(k8s_cluster, ns): +def kopf_runner(k8s_cluster: KindCluster, namespace: str) -> KopfRunner: yield KopfRunner( - ["run", "-m", "dask_kubernetes.operator", "--verbose", "--namespace", ns] + ["run", "-m", "dask_kubernetes.operator", "--verbose", "--namespace", namespace] ) @pytest.fixture(scope="session") -def docker_image(): +def docker_image() -> str: image_name = "dask-kubernetes:dev" python_version = f"{sys.version_info.major}.{sys.version_info.minor}" subprocess.run( @@ -54,7 +55,9 @@ def docker_image(): @pytest.fixture(scope="session") -def k8s_cluster(request, docker_image): +def k8s_cluster( + request: pytest.FixtureRequest, docker_image: str +) -> Iterator[KindCluster]: image = None if version := os.environ.get("KUBERNETES_VERSION"): image = f"kindest/node:v{version}" @@ -73,7 +76,7 @@ def k8s_cluster(request, docker_image): @pytest.fixture(scope="session", autouse=True) -def install_istio(k8s_cluster): +def install_istio(k8s_cluster: KindCluster) -> None: if bool(os.environ.get("TEST_ISTIO", False)): check_dependency("istioctl") subprocess.run( @@ -85,7 +88,7 @@ def install_istio(k8s_cluster): @pytest.fixture(autouse=True) -def ns(k8s_cluster): +def namespace(k8s_cluster: KindCluster) -> Iterator[str]: ns = "dask-k8s-pytest-" + uuid.uuid4().hex[:10] k8s_cluster.kubectl("create", "ns", ns) yield ns @@ -93,7 +96,7 @@ def ns(k8s_cluster): @pytest.fixture(scope="session", autouse=True) -def install_gateway(k8s_cluster): +def install_gateway(k8s_cluster: KindCluster) -> Iterator[None]: if bool(os.environ.get("TEST_DASK_GATEWAY", False)): check_dependency("helm") # To ensure the operator can coexist with Gateway @@ -129,11 +132,11 @@ def install_gateway(k8s_cluster): @pytest.fixture(scope="session", autouse=True) -def customresources(k8s_cluster): +def customresources(k8s_cluster: KindCluster) -> Iterator[None]: temp_dir = tempfile.TemporaryDirectory() crd_path = os.path.join(DIR, "operator", "customresources") - def run_generate(crd_path, patch_path, temp_path): + def run_generate(crd_path: str, patch_path: str, temp_path: str) -> None: subprocess.run( ["k8s-crd-resolver", "-r", "-j", patch_path, crd_path, temp_path], check=True, @@ -154,5 +157,5 @@ def run_generate(crd_path, patch_path, temp_path): @pytest.fixture -def anyio_backend(): +def anyio_backend() -> str: return "asyncio" diff --git a/dask_kubernetes/operator/controller/controller.py b/dask_kubernetes/operator/controller/controller.py index 17c641784..8f8cc1c17 100644 --- a/dask_kubernetes/operator/controller/controller.py +++ b/dask_kubernetes/operator/controller/controller.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import asyncio import copy import time @@ -5,16 +7,16 @@ from contextlib import suppress from datetime import datetime from uuid import uuid4 -from typing import Final, Any +from typing import Final, Any, Coroutine, TYPE_CHECKING import aiohttp import dask.config import kopf -import kr8s +import kr8s # type: ignore[import-untyped] from distributed.core import clean_exception, rpc from distributed.protocol.pickle import dumps from importlib_metadata import entry_points -from kr8s.asyncio.objects import Deployment, Pod, Service +from kr8s.asyncio.objects import Deployment, Pod, Service # type: ignore[import-untyped] from dask_kubernetes.common.objects import validate_cluster_name from dask_kubernetes.constants import SCHEDULER_NAME_TEMPLATE @@ -27,6 +29,9 @@ ) from dask_kubernetes.operator.networking import get_scheduler_address +if TYPE_CHECKING: + from distributed import Scheduler + _ANNOTATION_NAMESPACES_TO_IGNORE: Final[tuple[str, ...]] = ( "kopf.zalando.org", "kubectl.kubernetes.io", @@ -35,7 +40,9 @@ KUBERNETES_DATETIME_FORMAT: Final[str] = "%Y-%m-%dT%H:%M:%SZ" -DASK_AUTOSCALER_COOLDOWN_UNTIL_ANNOTATION: Final[str] = "kubernetes.dask.org/cooldown-until" +DASK_AUTOSCALER_COOLDOWN_UNTIL_ANNOTATION: Final[ + str +] = "kubernetes.dask.org/cooldown-until" # Load operator plugins from other packages PLUGINS: list[Any] = [] @@ -48,7 +55,7 @@ class SchedulerCommError(Exception): """Raised when unable to communicate with a scheduler.""" -def _get_annotations(meta) -> dict[str, str]: +def _get_annotations(meta: kopf.Meta) -> dict[str, str]: return { annotation_key: annotation_value for annotation_key, annotation_value in meta.annotations.items() @@ -59,7 +66,7 @@ def _get_annotations(meta) -> dict[str, str]: } -def _get_labels(meta): +def _get_labels(meta: kopf.Meta) -> dict[str, str]: return { label_key: label_value for label_key, label_value in meta.labels.items() @@ -70,15 +77,17 @@ def _get_labels(meta): def build_scheduler_deployment_spec( - cluster_name, namespace, pod_spec, annotations, labels -): - labels.update( - **{ - "dask.org/cluster-name": cluster_name, - "dask.org/component": "scheduler", - "sidecar.istio.io/inject": "false", - } - ) + cluster_name: str, + namespace: str, + pod_spec: kopf.Spec, + annotations: kopf.Annotations, + labels: kopf.Labels, +) -> dict[str, Any]: + labels = dict(labels) | { + "dask.org/cluster-name": cluster_name, + "dask.org/component": "scheduler", + "sidecar.istio.io/inject": "false", + } metadata = { "name": SCHEDULER_NAME_TEMPLATE.format(cluster_name=cluster_name), "labels": labels, @@ -102,13 +111,16 @@ def build_scheduler_deployment_spec( } -def build_scheduler_service_spec(cluster_name, spec, annotations, labels): - labels.update( - **{ - "dask.org/cluster-name": cluster_name, - "dask.org/component": "scheduler", - } - ) +def build_scheduler_service_spec( + cluster_name: str, + spec: kopf.Spec, + annotations: kopf.Annotations, + labels: kopf.Labels, +) -> dict[str, Any]: + labels = dict(labels) | { + "dask.org/cluster-name": cluster_name, + "dask.org/component": "scheduler", + } return { "apiVersion": "v1", "kind": "Service", @@ -122,37 +134,40 @@ def build_scheduler_service_spec(cluster_name, spec, annotations, labels): def build_worker_deployment_spec( - worker_group_name, namespace, cluster_name, uuid, pod_spec, annotations, labels -): - labels.update( - **{ - "dask.org/cluster-name": cluster_name, - "dask.org/workergroup-name": worker_group_name, - "dask.org/component": "worker", - "sidecar.istio.io/inject": "false", - } - ) + worker_group_name: str, + namespace: str, + cluster_name: str, + uuid: str, + pod_spec: kopf.Spec, + annotations: kopf.Annotations, + labels: kopf.Labels, +) -> dict[str, Any]: + labels = dict(labels) | { + "dask.org/cluster-name": cluster_name, + "dask.org/workergroup-name": worker_group_name, + "dask.org/component": "worker", + "sidecar.istio.io/inject": "false", + } worker_name = f"{worker_group_name}-worker-{uuid}" metadata = { "name": worker_name, "labels": labels, "annotations": annotations, } - spec = { - "replicas": 1, - "selector": { - "matchLabels": labels, - }, - "template": { - "metadata": metadata, - "spec": copy.deepcopy(pod_spec), - }, - } - deployment_spec = { + deployment_spec: dict[str, Any] = { "apiVersion": "apps/v1", "kind": "Deployment", "metadata": metadata, - "spec": spec, + "spec": { + "replicas": 1, + "selector": { + "matchLabels": labels, + }, + "template": { + "metadata": metadata, + "spec": copy.deepcopy(pod_spec), + }, + }, } worker_env = { "name": "DASK_WORKER_NAME", @@ -176,19 +191,24 @@ def build_worker_deployment_spec( return deployment_spec -def get_job_runner_pod_name(job_name): +def get_job_runner_pod_name(job_name: str) -> str: return f"{job_name}-runner" -def build_job_pod_spec(job_name, cluster_name, namespace, spec, annotations, labels): - labels.update( - **{ - "dask.org/cluster-name": cluster_name, - "dask.org/component": "job-runner", - "sidecar.istio.io/inject": "false", - } - ) - pod_spec = { +def build_job_pod_spec( + job_name: str, + cluster_name: str, + namespace: str, + spec: kopf.Spec, + annotations: kopf.Annotations, + labels: kopf.Labels, +) -> dict[str, Any]: + labels = dict(labels) | { + "dask.org/cluster-name": cluster_name, + "dask.org/component": "job-runner", + "sidecar.istio.io/inject": "false", + } + pod_spec: dict[str, Any] = { "apiVersion": "v1", "kind": "Pod", "metadata": { @@ -212,19 +232,22 @@ def build_job_pod_spec(job_name, cluster_name, namespace, spec, annotations, lab return pod_spec -def build_default_worker_group_spec(cluster_name, spec, annotations, labels): - labels.update( - **{ - "dask.org/cluster-name": cluster_name, - "dask.org/component": "workergroup", - } - ) +def build_default_worker_group_spec( + cluster_name: str, + spec: kopf.Spec, + annotations: kopf.Annotations, + labels: kopf.Labels, +) -> dict[str, Any]: return { "apiVersion": "kubernetes.dask.org/v1", "kind": "DaskWorkerGroup", "metadata": { "name": f"{cluster_name}-default", - "labels": labels, + "labels": dict(labels) + | { + "dask.org/cluster-name": cluster_name, + "dask.org/component": "workergroup", + }, "annotations": annotations, }, "spec": { @@ -234,18 +257,22 @@ def build_default_worker_group_spec(cluster_name, spec, annotations, labels): } -def build_cluster_spec(name, worker_spec, scheduler_spec, annotations, labels): - labels.update( - **{ - "dask.org/cluster-name": name, - } - ) +def build_cluster_spec( + name: str, + worker_spec: kopf.Spec, + scheduler_spec: kopf.Spec, + annotations: kopf.Annotations, + labels: kopf.Labels, +) -> dict[str, Any]: return { "apiVersion": "kubernetes.dask.org/v1", "kind": "DaskCluster", "metadata": { "name": name, - "labels": labels, + "labels": dict(labels) + | { + "dask.org/cluster-name": name, + }, "annotations": annotations, }, "spec": {"worker": worker_spec, "scheduler": scheduler_spec}, @@ -253,7 +280,7 @@ def build_cluster_spec(name, worker_spec, scheduler_spec, annotations, labels): @kopf.on.startup() -async def startup(settings: kopf.OperatorSettings, **kwargs): +async def startup(settings: kopf.OperatorSettings, **kwargs: Any) -> None: # Set server and client timeouts to reconnect from time to time. # In rare occasions the connection might go idle we will no longer receive any events. # These timeouts should help in those cases. @@ -271,12 +298,14 @@ async def startup(settings: kopf.OperatorSettings, **kwargs): # There may be useful things for us to expose via the liveness probe # https://kopf.readthedocs.io/en/stable/probing/#probe-handlers @kopf.on.probe(id="now") -def get_current_timestamp(**kwargs): +def get_current_timestamp(**kwargs: Any) -> str: return datetime.utcnow().isoformat() @kopf.on.create("daskcluster.kubernetes.dask.org") -async def daskcluster_create(name, namespace, logger, patch, **kwargs): +async def daskcluster_create( + name: str, namespace: str, logger: kopf.Logger, patch: kopf.Patch, **kwargs: Any +) -> None: """When DaskCluster resource is created set the status.phase. This allows us to track that the operator is running. @@ -293,8 +322,14 @@ async def daskcluster_create(name, namespace, logger, patch, **kwargs): @kopf.on.field("daskcluster.kubernetes.dask.org", field="status.phase", new="Created") async def daskcluster_create_components( - spec, name, namespace, logger, patch, meta, **kwargs -): + spec: kopf.Spec, + name: str, + namespace: str, + logger: kopf.Logger, + patch: kopf.Patch, + meta: kopf.Meta, + **kwargs: Any, +) -> None: """When the DaskCluster status.phase goes into Created create the cluster components.""" logger.info("Creating Dask cluster components.") @@ -348,8 +383,13 @@ async def daskcluster_create_components( @kopf.on.field("service", field="status", labels={"dask.org/component": "scheduler"}) async def handle_scheduler_service_status( - spec, labels, status, namespace, logger, **kwargs -): + spec: kopf.Spec, + labels: kopf.Labels, + status: kopf.Status, + namespace: str, + logger: kopf.Logger, + **kwargs: Any, +) -> None: # If the Service is a LoadBalancer with no ingress endpoints mark the cluster as Pending if spec["type"] == "LoadBalancer" and not len( status.get("load_balancer", {}).get("ingress", []) @@ -365,7 +405,9 @@ async def handle_scheduler_service_status( @kopf.on.create("daskworkergroup.kubernetes.dask.org") -async def daskworkergroup_create(body, namespace, logger, **kwargs): +async def daskworkergroup_create( + body: kopf.Body, namespace: str, logger: kopf.Logger, **kwargs: Any +) -> None: wg = await DaskWorkerGroup(body, namespace=namespace) cluster = await wg.cluster() await cluster.adopt(wg) @@ -382,8 +424,12 @@ async def daskworkergroup_create(body, namespace, logger, **kwargs): async def retire_workers( - n_workers, scheduler_service_name, worker_group_name, namespace, logger -): + n_workers: int, + scheduler_service_name: str, + worker_group_name: str, + namespace: str, + logger: kopf.Logger, +) -> list[str]: # Try gracefully retiring via the HTTP API dashboard_address = await get_scheduler_address( scheduler_service_name, @@ -437,7 +483,9 @@ async def retire_workers( return [w.name for w in workers[:-n_workers]] -async def check_scheduler_idle(scheduler_service_name, namespace, logger): +async def check_scheduler_idle( + scheduler_service_name: str, namespace: str, logger: kopf.Logger +) -> str: # Try getting idle time via HTTP API dashboard_address = await get_scheduler_address( scheduler_service_name, @@ -481,7 +529,10 @@ async def check_scheduler_idle(scheduler_service_name, namespace, logger): f"Checking {scheduler_service_name} idleness failed via the Dask RPC, falling back to run_on_scheduler" ) - def idle_since(dask_scheduler=None): + def idle_since_func( + dask_scheduler: Scheduler + | None = None, # TODO: why is None allowed here? It will crash immediately. + ) -> str: if not dask_scheduler.idle_timeout: dask_scheduler.idle_timeout = 300 dask_scheduler.check_idle() @@ -494,7 +545,7 @@ def idle_since(dask_scheduler=None): ) async with rpc(comm_address) as scheduler_comm: response = await scheduler_comm.run_function( - function=dumps(idle_since), + function=dumps(idle_since_func), ) if response["status"] == "error": typ, exc, tb = clean_exception(**response) @@ -506,7 +557,9 @@ def idle_since(dask_scheduler=None): return idle_since -async def get_desired_workers(scheduler_service_name, namespace, logger): +async def get_desired_workers( + scheduler_service_name: str, namespace: str, logger: kopf.Logger +) -> Any: # Try gracefully retiring via the HTTP API dashboard_address = await get_scheduler_address( scheduler_service_name, @@ -537,13 +590,13 @@ async def get_desired_workers(scheduler_service_name, namespace, logger): ) from e -worker_group_scale_locks = defaultdict(lambda: asyncio.Lock()) +worker_group_scale_locks: dict[str, asyncio.Lock] = defaultdict(lambda: asyncio.Lock()) @kopf.on.field("daskcluster.kubernetes.dask.org", field="spec.worker.replicas") async def daskcluster_default_worker_group_replica_update( - name, namespace, old, new, **kwargs -): + name: str, namespace: str, old, new, **kwargs: Any +) -> None: if old is not None: wg = await DaskWorkerGroup.get(f"{name}-default", namespace=namespace) await wg.scale(new) @@ -551,8 +604,15 @@ async def daskcluster_default_worker_group_replica_update( @kopf.on.field("daskworkergroup.kubernetes.dask.org", field="spec.worker.replicas") async def daskworkergroup_replica_update( - name, namespace, meta, spec, new, body, logger, **kwargs -): + name: str, + namespace: str, + meta: kopf.Meta, + spec: kopf.Spec, + new, + body: kopf.Body, + logger: kopf.Logger, + **kwargs: Any, +) -> None: cluster_name = spec["cluster"] wg = await DaskWorkerGroup(body, namespace=namespace) try: @@ -631,14 +691,16 @@ async def daskworkergroup_replica_update( @kopf.on.delete("daskworkergroup.kubernetes.dask.org", optional=True) -async def daskworkergroup_remove(name, namespace, **kwargs): +async def daskworkergroup_remove(name: str, namespace: str, **kwargs: Any) -> None: lock_key = f"{name}/{namespace}" if lock_key in worker_group_scale_locks: del worker_group_scale_locks[lock_key] @kopf.on.create("daskjob.kubernetes.dask.org") -async def daskjob_create(name, namespace, logger, patch, **kwargs): +async def daskjob_create( + name: str, namespace: str, logger: kopf.Logger, patch: kopf.Patch, **kwargs: Any +) -> None: logger.info(f"A DaskJob has been created called {name} in {namespace}.") patch.status["jobStatus"] = "JobCreated" @@ -647,8 +709,14 @@ async def daskjob_create(name, namespace, logger, patch, **kwargs): "daskjob.kubernetes.dask.org", field="status.jobStatus", new="JobCreated" ) async def daskjob_create_components( - spec, name, namespace, logger, patch, meta, **kwargs -): + spec: kopf.Spec, + name: str, + namespace: str, + logger: kopf.Logger, + patch: kopf.Patch, + meta: kopf.Meta, + **kwargs: Any, +) -> None: logger.info("Creating Dask job components.") cluster_name = f"{name}" labels = _get_labels(meta) @@ -703,7 +771,9 @@ async def daskjob_create_components( labels={"dask.org/component": "job-runner"}, new="Running", ) -async def handle_runner_status_change_running(meta, namespace, logger, **kwargs): +async def handle_runner_status_change_running( + meta: kopf.Meta, namespace: str, logger: kopf.Logger, **kwargs: Any +) -> None: logger.info("Job now in running") name = meta["labels"]["dask.org/cluster-name"] job = await DaskJob.get(name, namespace=namespace) @@ -724,7 +794,9 @@ async def handle_runner_status_change_running(meta, namespace, logger, **kwargs) labels={"dask.org/component": "job-runner"}, new="Succeeded", ) -async def handle_runner_status_change_succeeded(meta, namespace, logger, **kwargs): +async def handle_runner_status_change_succeeded( + meta: kopf.Meta, namespace: str, logger: kopf.Logger, **kwargs: Any +) -> None: logger.info("Job succeeded, deleting Dask cluster.") name = meta["labels"]["dask.org/cluster-name"] cluster = await DaskCluster.get(name, namespace=namespace) @@ -747,7 +819,9 @@ async def handle_runner_status_change_succeeded(meta, namespace, logger, **kwarg labels={"dask.org/component": "job-runner"}, new="Failed", ) -async def handle_runner_status_change_succeeded(meta, namespace, logger, **kwargs): +async def handle_runner_status_change_failed( + meta: kopf.Meta, namespace: str, logger: kopf.Logger, **kwargs: Any +) -> None: logger.info("Job failed, deleting Dask cluster.") name = meta["labels"]["dask.org/cluster-name"] cluster = await DaskCluster.get(name, namespace=namespace) @@ -765,7 +839,7 @@ async def handle_runner_status_change_succeeded(meta, namespace, logger, **kwarg @kopf.on.create("daskautoscaler.kubernetes.dask.org") -async def daskautoscaler_create(body, logger, **_): +async def daskautoscaler_create(body: kopf.Body, logger: kopf.Logger, **_: Any) -> None: """When an autoscaler is created make it a child of the associated cluster for cascade deletion.""" autoscaler = await DaskAutoscaler(body) cluster = await autoscaler.cluster() @@ -774,7 +848,9 @@ async def daskautoscaler_create(body, logger, **_): @kopf.timer("daskautoscaler.kubernetes.dask.org", interval=5.0) -async def daskautoscaler_adapt(spec, name, namespace, logger, **kwargs): +async def daskautoscaler_adapt( + spec: kopf.Spec, name: str, namespace: str, logger: kopf.Logger, **kwargs: Any +) -> None: try: scheduler = await Pod.get( label_selector={ @@ -849,7 +925,9 @@ async def daskautoscaler_adapt(spec, name, namespace, logger, **kwargs): @kopf.timer("daskcluster.kubernetes.dask.org", interval=5.0) -async def daskcluster_autoshutdown(spec, name, namespace, logger, **kwargs): +async def daskcluster_autoshutdown( + spec: kopf.Spec, name: str, namespace: str, logger: kopf.Logger, **kwargs: Any +) -> None: if spec["idleTimeout"]: try: idle_since = await check_scheduler_idle( diff --git a/dask_kubernetes/operator/controller/tests/test_controller.py b/dask_kubernetes/operator/controller/tests/test_controller.py index 69ea367bc..c3768f2ea 100644 --- a/dask_kubernetes/operator/controller/tests/test_controller.py +++ b/dask_kubernetes/operator/controller/tests/test_controller.py @@ -1,16 +1,26 @@ +from __future__ import annotations + import asyncio import json import os.path import pathlib from contextlib import asynccontextmanager from datetime import datetime, timedelta -from typing import Final +from typing import ( + Final, + Any, + Callable, + AsyncIterator, + AsyncContextManager, + Iterator, + TYPE_CHECKING, +) import dask.config import pytest import yaml from dask.distributed import Client -from kr8s.asyncio.objects import Deployment, Pod, Service +from kr8s.asyncio.objects import Deployment, Pod, Service # type: ignore[import-untyped] from dask_kubernetes.constants import MAX_CLUSTER_NAME_LEN from dask_kubernetes.operator._objects import DaskCluster, DaskJob, DaskWorkerGroup @@ -19,6 +29,10 @@ get_job_runner_pod_name, ) +if TYPE_CHECKING: + from pytest_kind.cluster import KindCluster # type: ignore[import-untyped] + from kopf.testing import KopfRunner + DIR: Final[pathlib.Path] = pathlib.Path(__file__).parent.absolute() _EXPECTED_ANNOTATIONS: Final[dict[str, str]] = {"test-annotation": "annotation-value"} @@ -27,8 +41,8 @@ @pytest.fixture() -def gen_cluster_manifest(tmp_path): - def factory(cluster_name=DEFAULT_CLUSTER_NAME): +def gen_cluster_manifest(tmp_path: pathlib.Path) -> Callable[..., pathlib.Path]: + def factory(cluster_name: str = DEFAULT_CLUSTER_NAME) -> pathlib.Path: original_manifest_path = os.path.join(DIR, "resources", "simplecluster.yaml") with open(original_manifest_path, "r") as original_manifest_file: manifest = yaml.safe_load(original_manifest_file) @@ -42,52 +56,60 @@ def factory(cluster_name=DEFAULT_CLUSTER_NAME): @pytest.fixture() -def gen_cluster(k8s_cluster, ns, gen_cluster_manifest): +def gen_cluster( + k8s_cluster: KindCluster, + namespace: str, + gen_cluster_manifest: Callable[..., pathlib.Path], +) -> Iterator[Callable[..., AsyncContextManager[tuple[str, str]]]]: """Yields an instantiated context manager for creating/deleting a simple cluster.""" @asynccontextmanager - async def cm(cluster_name=DEFAULT_CLUSTER_NAME): + async def cm( + cluster_name: str = DEFAULT_CLUSTER_NAME, + ) -> AsyncIterator[tuple[str, str]]: cluster_path = gen_cluster_manifest(cluster_name) # Create cluster resource - k8s_cluster.kubectl("apply", "-n", ns, "-f", cluster_path) + k8s_cluster.kubectl("apply", "-n", namespace, "-f", str(cluster_path)) while cluster_name not in k8s_cluster.kubectl( - "get", "daskclusters.kubernetes.dask.org", "-n", ns + "get", "daskclusters.kubernetes.dask.org", "-n", namespace ): await asyncio.sleep(0.1) try: - yield cluster_name, ns + yield cluster_name, namespace finally: # Test: remove the wait=True, because I think this is blocking the operator - k8s_cluster.kubectl("delete", "-n", ns, "-f", cluster_path) + k8s_cluster.kubectl("delete", "-n", namespace, "-f", str(cluster_path)) yield cm @pytest.fixture() -def gen_job(k8s_cluster, ns): +def gen_job( + k8s_cluster: KindCluster, namespace: str +) -> Iterator[Callable[[str], AsyncContextManager[tuple[str, str]]]]: """Yields an instantiated context manager for creating/deleting a simple job.""" @asynccontextmanager - async def cm(job_file): + async def cm(job_file: str) -> AsyncIterator[tuple[str, str]]: job_path = os.path.join(DIR, "resources", job_file) with open(job_path) as f: job_name = yaml.load(f, yaml.Loader)["metadata"]["name"] # Create cluster resource - k8s_cluster.kubectl("apply", "-n", ns, "-f", job_path) + k8s_cluster.kubectl("apply", "-n", namespace, "-f", job_path) while job_name not in k8s_cluster.kubectl( - "get", "daskjobs.kubernetes.dask.org", "-n", ns + "get", "daskjobs.kubernetes.dask.org", "-n", namespace ): await asyncio.sleep(0.1) try: - yield job_name, ns + yield job_name, namespace finally: # Test: remove the wait=True, because I think this is blocking the operator - k8s_cluster.kubectl("delete", "-n", ns, "-f", job_path) + k8s_cluster.kubectl("delete", "-n", namespace, "-f", job_path) while job_name in k8s_cluster.kubectl( - "get", "daskjobs.kubernetes.dask.org", "-n", ns + "get", "daskjobs.kubernetes.dask.org", "-n", namespace ): await asyncio.sleep(0.1) @@ -95,42 +117,44 @@ async def cm(job_file): @pytest.fixture() -def gen_worker_group(k8s_cluster, ns): +def gen_worker_group( + k8s_cluster: KindCluster, namespace: str +) -> Iterator[Callable[[str], AsyncContextManager[tuple[str, str]]]]: """Yields an instantiated context manager for creating/deleting a worker group.""" @asynccontextmanager - async def cm(worker_group_file): + async def cm(worker_group_file: str) -> AsyncIterator[tuple[str, str]]: worker_group_path = os.path.join(DIR, "resources", worker_group_file) with open(worker_group_path) as f: worker_group_name = yaml.load(f, yaml.Loader)["metadata"]["name"] # Create cluster resource - k8s_cluster.kubectl("apply", "-n", ns, "-f", worker_group_path) + k8s_cluster.kubectl("apply", "-n", namespace, "-f", worker_group_path) while worker_group_name not in k8s_cluster.kubectl( - "get", "daskworkergroups.kubernetes.dask.org", "-n", ns + "get", "daskworkergroups.kubernetes.dask.org", "-n", namespace ): await asyncio.sleep(0.1) try: - yield worker_group_name, ns + yield worker_group_name, namespace finally: # Test: remove the wait=True, because I think this is blocking the operator - k8s_cluster.kubectl("delete", "-n", ns, "-f", worker_group_path) + k8s_cluster.kubectl("delete", "-n", namespace, "-f", worker_group_path) while worker_group_name in k8s_cluster.kubectl( - "get", "daskworkergroups.kubernetes.dask.org", "-n", ns + "get", "daskworkergroups.kubernetes.dask.org", "-n", namespace ): await asyncio.sleep(0.1) yield cm -def test_customresources(k8s_cluster): +def test_customresources(k8s_cluster: KindCluster) -> None: assert "daskclusters.kubernetes.dask.org" in k8s_cluster.kubectl("get", "crd") assert "daskworkergroups.kubernetes.dask.org" in k8s_cluster.kubectl("get", "crd") assert "daskjobs.kubernetes.dask.org" in k8s_cluster.kubectl("get", "crd") -def test_operator_runs(kopf_runner): +def test_operator_runs(kopf_runner: KopfRunner) -> None: with kopf_runner as runner: pass @@ -138,7 +162,7 @@ def test_operator_runs(kopf_runner): assert runner.exception is None -def test_operator_plugins(kopf_runner): +def test_operator_plugins(kopf_runner: KopfRunner) -> None: with kopf_runner as runner: pass @@ -149,7 +173,11 @@ def test_operator_plugins(kopf_runner): @pytest.mark.timeout(180) @pytest.mark.anyio -async def test_simplecluster(k8s_cluster, kopf_runner, gen_cluster): +async def test_simplecluster( + k8s_cluster: KindCluster, + kopf_runner: KopfRunner, + gen_cluster: Callable[..., AsyncContextManager[tuple[str, str]]], +) -> None: with kopf_runner: async with gen_cluster() as (cluster_name, ns): scheduler_deployment_name = "simple-scheduler" @@ -168,7 +196,7 @@ async def test_simplecluster(k8s_cluster, kopf_runner, gen_cluster): with k8s_cluster.port_forward( f"service/{service_name}", 8786, "-n", ns ) as port: - async with Client( + async with Client( # type: ignore[no-untyped-call] f"tcp://localhost:{port}", asynchronous=True ) as client: await client.wait_for_workers(2) @@ -293,7 +321,11 @@ async def test_simplecluster(k8s_cluster, kopf_runner, gen_cluster): @pytest.mark.anyio -async def test_scalesimplecluster(k8s_cluster, kopf_runner, gen_cluster): +async def test_scalesimplecluster( + k8s_cluster: KindCluster, + kopf_runner: KopfRunner, + gen_cluster: Callable[..., AsyncContextManager[tuple[str, str]]], +) -> None: with kopf_runner: async with gen_cluster() as (cluster_name, ns): scheduler_deployment_name = "simple-scheduler" @@ -311,7 +343,7 @@ async def test_scalesimplecluster(k8s_cluster, kopf_runner, gen_cluster): with k8s_cluster.port_forward( f"service/{service_name}", 8786, "-n", ns ) as port: - async with Client( + async with Client( # type: ignore[no-untyped-call] f"tcp://localhost:{port}", asynchronous=True ) as client: k8s_cluster.kubectl( @@ -339,8 +371,10 @@ async def test_scalesimplecluster(k8s_cluster, kopf_runner, gen_cluster): @pytest.mark.anyio async def test_scalesimplecluster_from_cluster_spec( - k8s_cluster, kopf_runner, gen_cluster -): + k8s_cluster: KindCluster, + kopf_runner: KopfRunner, + gen_cluster: Callable[..., AsyncContextManager[tuple[str, str]]], +) -> None: with kopf_runner: async with gen_cluster() as (cluster_name, ns): scheduler_deployment_name = "simple-scheduler" @@ -358,7 +392,7 @@ async def test_scalesimplecluster_from_cluster_spec( with k8s_cluster.port_forward( f"service/{service_name}", 8786, "-n", ns ) as port: - async with Client( + async with Client( # type: ignore[no-untyped-call] f"tcp://localhost:{port}", asynchronous=True ) as client: k8s_cluster.kubectl( @@ -385,7 +419,11 @@ async def test_scalesimplecluster_from_cluster_spec( @pytest.mark.anyio -async def test_recreate_scheduler_pod(k8s_cluster, kopf_runner, gen_cluster): +async def test_recreate_scheduler_pod( + k8s_cluster: KindCluster, + kopf_runner: KopfRunner, + gen_cluster: Callable[..., AsyncContextManager[tuple[str, str]]], +) -> None: with kopf_runner: async with gen_cluster() as (cluster_name, ns): scheduler_deployment_name = "simple-scheduler" @@ -424,7 +462,11 @@ async def test_recreate_scheduler_pod(k8s_cluster, kopf_runner, gen_cluster): @pytest.mark.anyio @pytest.mark.skip(reason="Flaky in CI") -async def test_recreate_worker_pods(k8s_cluster, kopf_runner, gen_cluster): +async def test_recreate_worker_pods( + k8s_cluster: KindCluster, + kopf_runner: KopfRunner, + gen_cluster: Callable[..., AsyncContextManager[tuple[str, str]]], +) -> None: with kopf_runner: async with gen_cluster() as (cluster_name, ns): cluster = await DaskCluster.get(cluster_name, namespace=ns) @@ -454,8 +496,10 @@ async def test_recreate_worker_pods(k8s_cluster, kopf_runner, gen_cluster): @pytest.mark.anyio async def test_simplecluster_batched_worker_deployments( - k8s_cluster, kopf_runner, gen_cluster -): + k8s_cluster: KindCluster, + kopf_runner: KopfRunner, + gen_cluster: Callable[..., AsyncContextManager[tuple[str, str]]], +) -> None: with kopf_runner: with dask.config.set( { @@ -481,7 +525,7 @@ async def test_simplecluster_batched_worker_deployments( with k8s_cluster.port_forward( f"service/{service_name}", 8786, "-n", ns ) as port: - async with Client( + async with Client( # type: ignore[no-untyped-call] f"tcp://localhost:{port}", asynchronous=True ) as client: await client.wait_for_workers(2) @@ -490,8 +534,8 @@ async def test_simplecluster_batched_worker_deployments( assert (await total) == sum(map(lambda x: x + 1, range(10))) -def _get_job_status(k8s_cluster, ns): - return json.loads( +def _get_job_status(k8s_cluster: KindCluster, ns: str) -> dict[str, Any]: + return json.loads( # type: ignore[no-any-return] k8s_cluster.kubectl( "get", "-n", @@ -503,17 +547,17 @@ def _get_job_status(k8s_cluster, ns): ) -def _assert_job_status_created(job_status): +def _assert_job_status_created(job_status: dict[str, Any]) -> None: assert "jobStatus" in job_status -def _assert_job_status_cluster_created(job, job_status): +def _assert_job_status_cluster_created(job: str, job_status: dict[str, Any]) -> None: assert "jobStatus" in job_status assert job_status["clusterName"] == job assert job_status["jobRunnerPodName"] == get_job_runner_pod_name(job) -def _assert_job_status_running(job, job_status): +def _assert_job_status_running(job: str, job_status: dict[str, Any]) -> None: assert "jobStatus" in job_status assert job_status["clusterName"] == job assert job_status["jobRunnerPodName"] == get_job_runner_pod_name(job) @@ -521,7 +565,9 @@ def _assert_job_status_running(job, job_status): assert datetime.utcnow() > start_time > (datetime.utcnow() - timedelta(seconds=10)) -def _assert_final_job_status(job, job_status, expected_status): +def _assert_final_job_status( + job: str, job_status: dict[str, Any], expected_status: str +) -> None: assert job_status["jobStatus"] == expected_status assert job_status["clusterName"] == job assert job_status["jobRunnerPodName"] == get_job_runner_pod_name(job) @@ -539,7 +585,11 @@ def _assert_final_job_status(job, job_status, expected_status): @pytest.mark.anyio -async def test_job(k8s_cluster, kopf_runner, gen_job): +async def test_job( + k8s_cluster: KindCluster, + kopf_runner: KopfRunner, + gen_job: Callable[[str], AsyncContextManager[tuple[str, str]]], +) -> None: with kopf_runner as runner: async with gen_job("simplejob.yaml") as (job, ns): assert job @@ -610,7 +660,11 @@ async def test_job(k8s_cluster, kopf_runner, gen_job): @pytest.mark.anyio -async def test_failed_job(k8s_cluster, kopf_runner, gen_job): +async def test_failed_job( + k8s_cluster: KindCluster, + kopf_runner: KopfRunner, + gen_job: Callable[[str], AsyncContextManager[tuple[str, str]]], +) -> None: with kopf_runner as runner: async with gen_job("failedjob.yaml") as (job, ns): assert job @@ -668,12 +722,16 @@ async def test_failed_job(k8s_cluster, kopf_runner, gen_job): @pytest.mark.anyio -async def test_object_dask_cluster(k8s_cluster, kopf_runner, gen_cluster): +async def test_object_dask_cluster( + k8s_cluster: KindCluster, + kopf_runner: KopfRunner, + gen_cluster: Callable[..., AsyncContextManager[tuple[str, str]]], +) -> None: with kopf_runner: async with gen_cluster() as (cluster_name, ns): cluster = await DaskCluster.get(cluster_name, namespace=ns) - worker_groups = [] + worker_groups: list[DaskWorkerGroup] = [] while not worker_groups: worker_groups = await cluster.worker_groups() await asyncio.sleep(0.1) @@ -701,8 +759,11 @@ async def test_object_dask_cluster(k8s_cluster, kopf_runner, gen_cluster): @pytest.mark.anyio async def test_object_dask_worker_group( - k8s_cluster, kopf_runner, gen_cluster, gen_worker_group -): + k8s_cluster: KindCluster, + kopf_runner: KopfRunner, + gen_cluster: Callable[..., AsyncContextManager[tuple[str, str]]], + gen_worker_group: Callable[[str], AsyncContextManager[tuple[str, str]]], +) -> None: with kopf_runner: async with ( gen_cluster() as (cluster_name, ns), @@ -716,7 +777,7 @@ async def test_object_dask_worker_group( additional_workergroup_name, namespace=ns ) - worker_groups = [] + worker_groups: list[DaskWorkerGroup] = [] while not worker_groups: worker_groups = await cluster.worker_groups() await asyncio.sleep(0.1) @@ -726,13 +787,13 @@ async def test_object_dask_worker_group( for wg in worker_groups: assert isinstance(wg, DaskWorkerGroup) - deployments = [] + deployments: list[Deployment] = [] while not deployments: deployments = await wg.deployments() await asyncio.sleep(0.1) assert all([isinstance(d, Deployment) for d in deployments]) - pods = [] + pods: list[Pod] = [] while not pods: pods = await wg.pods() await asyncio.sleep(0.1) @@ -757,7 +818,11 @@ async def test_object_dask_worker_group( @pytest.mark.anyio @pytest.mark.skip(reason="Flaky in CI") -async def test_object_dask_job(k8s_cluster, kopf_runner, gen_job): +async def test_object_dask_job( + k8s_cluster: KindCluster, + kopf_runner: KopfRunner, + gen_job: Callable[[str], AsyncContextManager[tuple[str, str]]], +) -> None: with kopf_runner: async with gen_job("simplejob.yaml") as (job_name, ns): job = await DaskJob.get(job_name, namespace=ns) @@ -769,13 +834,15 @@ async def test_object_dask_job(k8s_cluster, kopf_runner, gen_job): assert isinstance(cluster, DaskCluster) -async def _get_cluster_status(k8s_cluster, ns, cluster_name): +async def _get_cluster_status( + k8s_cluster: KindCluster, ns: str, cluster_name: str +) -> str: """ Will loop infinitely in search of non-falsey cluster status. Make sure there is a timeout on any test which calls this. """ while True: - cluster_status = k8s_cluster.kubectl( + cluster_status: str = k8s_cluster.kubectl( "get", "-n", ns, @@ -800,8 +867,12 @@ async def _get_cluster_status(k8s_cluster, ns, cluster_name): ], ) async def test_create_cluster_validates_name( - cluster_name, expected_status, k8s_cluster, kopf_runner, gen_cluster -): + cluster_name: str, + expected_status: str, + k8s_cluster: KindCluster, + kopf_runner: KopfRunner, + gen_cluster: Callable[..., AsyncContextManager[tuple[str, str]]], +) -> None: with kopf_runner: async with gen_cluster(cluster_name=cluster_name) as (_, ns): actual_status = await _get_cluster_status(k8s_cluster, ns, cluster_name) diff --git a/dask_kubernetes/operator/kubecluster/tests/test_kubecluster.py b/dask_kubernetes/operator/kubecluster/tests/test_kubecluster.py index 662e711fa..917d31c10 100644 --- a/dask_kubernetes/operator/kubecluster/tests/test_kubecluster.py +++ b/dask_kubernetes/operator/kubecluster/tests/test_kubecluster.py @@ -14,10 +14,10 @@ def test_experimental_shim(): assert ExperimentalKubeCluster is KubeCluster -def test_kubecluster(kopf_runner, docker_image, ns): +def test_kubecluster(kopf_runner, docker_image, namespace): with kopf_runner: with KubeCluster( - name="cluster", namespace=ns, image=docker_image, n_workers=1 + name="cluster", namespace=namespace, image=docker_image, n_workers=1 ) as cluster: with Client(cluster) as client: client.scheduler_info() @@ -25,11 +25,11 @@ def test_kubecluster(kopf_runner, docker_image, ns): @pytest.mark.anyio -async def test_kubecluster_async(kopf_runner, docker_image, ns): +async def test_kubecluster_async(kopf_runner, docker_image, namespace): with kopf_runner: async with KubeCluster( name="async", - namespace=ns, + namespace=namespace, image=docker_image, n_workers=1, asynchronous=True, @@ -38,69 +38,71 @@ async def test_kubecluster_async(kopf_runner, docker_image, ns): assert await client.submit(lambda x: x + 1, 10).result() == 11 -def test_custom_worker_command(kopf_runner, docker_image, ns): +def test_custom_worker_command(kopf_runner, docker_image, namespace): with kopf_runner: with KubeCluster( name="customworker", image=docker_image, worker_command=["python", "-m", "distributed.cli.dask_worker"], n_workers=1, - namespace=ns, + namespace=namespace, ) as cluster: with Client(cluster) as client: assert client.submit(lambda x: x + 1, 10).result() == 11 -def test_multiple_clusters(kopf_runner, docker_image, ns): +def test_multiple_clusters(kopf_runner, docker_image, namespace): with kopf_runner: with KubeCluster( - name="bar", image=docker_image, n_workers=1, namespace=ns + name="bar", image=docker_image, n_workers=1, namespace=namespace ) as cluster1: with Client(cluster1) as client1: assert client1.submit(lambda x: x + 1, 10).result() == 11 with KubeCluster( - name="baz", image=docker_image, n_workers=1, namespace=ns + name="baz", image=docker_image, n_workers=1, namespace=namespace ) as cluster2: with Client(cluster2) as client2: assert client2.submit(lambda x: x + 1, 10).result() == 11 -def test_clusters_with_custom_port_forward(kopf_runner, docker_image, ns): +def test_clusters_with_custom_port_forward(kopf_runner, docker_image, namespace): with kopf_runner: with KubeCluster( name="bar", image=docker_image, n_workers=1, scheduler_forward_port=8888, - namespace=ns, + namespace=namespace, ) as cluster1: assert cluster1.forwarded_dashboard_port == "8888" with Client(cluster1) as client1: assert client1.submit(lambda x: x + 1, 10).result() == 11 -def test_multiple_clusters_simultaneously(kopf_runner, docker_image, ns): +def test_multiple_clusters_simultaneously(kopf_runner, docker_image, namespace): with kopf_runner: with KubeCluster( - name="fizz", image=docker_image, n_workers=1, namespace=ns + name="fizz", image=docker_image, n_workers=1, namespace=namespace ) as cluster1, KubeCluster( - name="buzz", image=docker_image, n_workers=1, namespace=ns + name="buzz", image=docker_image, n_workers=1, namespace=namespace ) as cluster2: with Client(cluster1) as client1, Client(cluster2) as client2: assert client1.submit(lambda x: x + 1, 10).result() == 11 assert client2.submit(lambda x: x + 1, 10).result() == 11 -def test_multiple_clusters_simultaneously_same_loop(kopf_runner, docker_image, ns): +def test_multiple_clusters_simultaneously_same_loop( + kopf_runner, docker_image, namespace +): with kopf_runner: with KubeCluster( - name="fizz", image=docker_image, n_workers=1, namespace=ns + name="fizz", image=docker_image, n_workers=1, namespace=namespace ) as cluster1, KubeCluster( name="buzz", image=docker_image, loop=cluster1.loop, n_workers=1, - namespace=ns, + namespace=namespace, ) as cluster2: with Client(cluster1) as client1, Client(cluster2) as client2: assert cluster1.loop is cluster2.loop is client1.loop is client2.loop @@ -109,27 +111,30 @@ def test_multiple_clusters_simultaneously_same_loop(kopf_runner, docker_image, n @pytest.mark.anyio -async def test_cluster_from_name(kopf_runner, docker_image, ns): +async def test_cluster_from_name(kopf_runner, docker_image, namespace): with kopf_runner: async with KubeCluster( name="abc", - namespace=ns, + namespace=namespace, image=docker_image, n_workers=1, asynchronous=True, ) as firstcluster: async with KubeCluster.from_name( - "abc", namespace=ns, asynchronous=True + "abc", namespace=namespace, asynchronous=True ) as secondcluster: assert firstcluster == secondcluster firstcluster.scale(2) assert firstcluster.scheduler_info == secondcluster.scheduler_info -def test_additional_worker_groups(kopf_runner, docker_image, ns): +def test_additional_worker_groups(kopf_runner, docker_image, namespace): with kopf_runner: with KubeCluster( - name="additionalgroups", n_workers=1, image=docker_image, namespace=ns + name="additionalgroups", + n_workers=1, + image=docker_image, + namespace=namespace, ) as cluster: cluster.add_worker_group(name="more", n_workers=1) with Client(cluster) as client: @@ -138,18 +143,18 @@ def test_additional_worker_groups(kopf_runner, docker_image, ns): cluster.delete_worker_group(name="more") -def test_cluster_without_operator(docker_image, ns): +def test_cluster_without_operator(docker_image, namespace): with pytest.raises(TimeoutError, match="is the Dask Operator running"): KubeCluster( name="noop", n_workers=1, image=docker_image, resource_timeout=1, - namespace=ns, + namespace=namespace, ) -def test_cluster_crashloopbackoff(kopf_runner, docker_image, ns): +def test_cluster_crashloopbackoff(kopf_runner, docker_image, namespace): with kopf_runner: with pytest.raises(SchedulerStartupError, match="Scheduler failed to start"): spec = make_cluster_spec(name="crashloopbackoff", n_workers=1) @@ -158,19 +163,19 @@ def test_cluster_crashloopbackoff(kopf_runner, docker_image, ns): ] = "dask-schmeduler" KubeCluster( custom_cluster_spec=spec, - namespace=ns, + namespace=namespace, resource_timeout=1, idle_timeout=2, ) -def test_adapt(kopf_runner, docker_image, ns): +def test_adapt(kopf_runner, docker_image, namespace): with kopf_runner: with KubeCluster( name="adaptive", image=docker_image, n_workers=0, - namespace=ns, + namespace=namespace, ) as cluster: cluster.adapt(minimum=0, maximum=1) with Client(cluster) as client: @@ -182,17 +187,17 @@ def test_adapt(kopf_runner, docker_image, ns): cluster.scale(0) -def test_custom_spec(kopf_runner, docker_image, ns): +def test_custom_spec(kopf_runner, docker_image, namespace): with kopf_runner: spec = make_cluster_spec("customspec", image=docker_image) with KubeCluster( - custom_cluster_spec=spec, n_workers=1, namespace=ns + custom_cluster_spec=spec, n_workers=1, namespace=namespace ) as cluster: with Client(cluster) as client: assert client.submit(lambda x: x + 1, 10).result() == 11 -def test_typo_resource_limits(ns): +def test_typo_resource_limits(namespace): with pytest.raises(ValueError): KubeCluster( name="foo", @@ -201,7 +206,7 @@ def test_typo_resource_limits(ns): "CPU": "1", }, }, - namespace=ns, + namespace=namespace, ) @@ -212,11 +217,11 @@ def test_typo_resource_limits(ns): "invalid.chars.in.name", ], ) -def test_invalid_cluster_name_fails(cluster_name, kopf_runner, docker_image, ns): +def test_invalid_cluster_name_fails(cluster_name, kopf_runner, docker_image, namespace): with kopf_runner: with pytest.raises(ValidationError): KubeCluster( name=cluster_name, - namespace=ns, + namespace=namespace, image=docker_image, ) diff --git a/requirements-test.txt b/requirements-test.txt index c44e8bbba..4d9b61c55 100644 --- a/requirements-test.txt +++ b/requirements-test.txt @@ -9,3 +9,5 @@ git+https://github.com/elemental-lf/k8s-crd-resolver@v0.14.0 jsonschema==4.17.3 dask[complete] anyio +mypy +types-PyYAML