From 62745d8db39d070a92a063b7e8886f4e55c9644b Mon Sep 17 00:00:00 2001
From: Jacob Tomlinson
Date: Tue, 30 Apr 2024 13:50:08 +0100
Subject: [PATCH] Remove classic KubeCluster and HelmCluster

---
 .github/workflows/helmcluster.yaml | 57 --
 .github/workflows/kubecluster.yaml | 57 --
 dask_kubernetes/__init__.py | 24 +-
 dask_kubernetes/classic/__init__.py | 2 -
 dask_kubernetes/classic/kubecluster.py | 781 ---------------
 .../classic/tests/config-demo.yaml | 38 -
 dask_kubernetes/classic/tests/fake-ca-file | 0
 dask_kubernetes/classic/tests/fake-cert-file | 0
 dask_kubernetes/classic/tests/fake-key-file | 0
 .../classic/tests/fake_gcp_auth.py | 25 -
 dask_kubernetes/classic/tests/test_async.py | 886 ------------------
 dask_kubernetes/classic/tests/test_sync.py | 497 ----------
 dask_kubernetes/common/auth.py | 493 ----------
 dask_kubernetes/common/networking.py | 240 -----
 dask_kubernetes/common/objects.py | 386 --------
 dask_kubernetes/common/tests/test_kind.py | 25 -
 dask_kubernetes/common/tests/test_objects.py | 88 --
 dask_kubernetes/common/utils.py | 47 -
 dask_kubernetes/conftest.py | 12 +-
 dask_kubernetes/experimental/__init__.py | 23 -
 dask_kubernetes/helm/__init__.py | 1 -
 dask_kubernetes/helm/helmcluster.py | 335 -------
 .../helm/tests/resources/values.yaml | 20 -
 dask_kubernetes/helm/tests/test_helm.py | 241 -----
 .../operator/controller/controller.py | 2 +-
 .../operator/kubecluster/kubecluster.py | 2 +-
 dask_kubernetes/operator/validation.py | 17 +
 doc/source/helmcluster.rst | 69 --
 doc/source/index.rst | 26 +-
 doc/source/kubecluster.rst | 342 -------
 doc/source/kubecluster_migrating.rst | 31 +-
 doc/source/operator_kubecluster.rst | 6 +-
 32 files changed, 36 insertions(+), 4737 deletions(-)
 delete mode 100644 .github/workflows/helmcluster.yaml
 delete mode 100644 .github/workflows/kubecluster.yaml
 delete mode 100644 dask_kubernetes/classic/__init__.py
 delete mode 100644 dask_kubernetes/classic/kubecluster.py
 delete mode 100644 dask_kubernetes/classic/tests/config-demo.yaml
 delete mode 100644 dask_kubernetes/classic/tests/fake-ca-file
 delete mode 100644 dask_kubernetes/classic/tests/fake-cert-file
 delete mode 100644 dask_kubernetes/classic/tests/fake-key-file
 delete mode 100644 dask_kubernetes/classic/tests/fake_gcp_auth.py
 delete mode 100644 dask_kubernetes/classic/tests/test_async.py
 delete mode 100644 dask_kubernetes/classic/tests/test_sync.py
 delete mode 100644 dask_kubernetes/common/auth.py
 delete mode 100644 dask_kubernetes/common/networking.py
 delete mode 100644 dask_kubernetes/common/objects.py
 delete mode 100644 dask_kubernetes/common/tests/test_kind.py
 delete mode 100644 dask_kubernetes/common/tests/test_objects.py
 delete mode 100644 dask_kubernetes/common/utils.py
 delete mode 100644 dask_kubernetes/experimental/__init__.py
 delete mode 100644 dask_kubernetes/helm/__init__.py
 delete mode 100644 dask_kubernetes/helm/helmcluster.py
 delete mode 100644 dask_kubernetes/helm/tests/resources/values.yaml
 delete mode 100644 dask_kubernetes/helm/tests/test_helm.py
 create mode 100644 dask_kubernetes/operator/validation.py
 delete mode 100644 doc/source/helmcluster.rst
 delete mode 100644 doc/source/kubecluster.rst

diff --git a/.github/workflows/helmcluster.yaml b/.github/workflows/helmcluster.yaml
deleted file mode 100644
index 5e2f0ca70..000000000
--- a/.github/workflows/helmcluster.yaml
+++ /dev/null
@@ -1,57 +0,0 @@
-name: "HelmCluster"
-on:
-  pull_request:
-    paths:
-      - ".github/workflows/helmcluster.yaml"
-      - "requirements*"
-      - "ci/**"
-      - "dask_kubernetes/helm/**"
-      -
"dask_kubernetes/common/**" - - "dask_kubernetes/*" - push: - paths: - - ".github/workflows/helmcluster.yaml" - - "requirements*" - - "ci/**" - - "dask_kubernetes/helm/**" - - "dask_kubernetes/common/**" - - "dask_kubernetes/*" - -concurrency: - group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }} - cancel-in-progress: true - -jobs: - test: - runs-on: ubuntu-latest - timeout-minutes: 15 - strategy: - fail-fast: false - matrix: - python-version: ["3.9", "3.10", "3.11", "3.12"] - kubernetes-version: ["1.29.2"] - include: - - python-version: '3.10' - kubernetes-version: 1.28.7 - - python-version: '3.10' - kubernetes-version: 1.27.11 - - python-version: '3.10' - kubernetes-version: 1.26.14 - - env: - KUBECONFIG: .pytest-kind/pytest-kind/kubeconfig - - steps: - - uses: actions/checkout@v2 - - uses: actions/setup-python@v2 - with: - python-version: ${{ matrix.python-version }} - - name: Install deps - run: ./ci/install-deps.sh - - name: Run tests - env: - KUBERNETES_VERSION: ${{ matrix.kubernetes-version }} - run: pytest --reruns=5 dask_kubernetes/common/tests dask_kubernetes/helm/tests - - name: Debug k8s resources - if: success() || failure() - run: kubectl get all -A diff --git a/.github/workflows/kubecluster.yaml b/.github/workflows/kubecluster.yaml deleted file mode 100644 index f2e6d81dd..000000000 --- a/.github/workflows/kubecluster.yaml +++ /dev/null @@ -1,57 +0,0 @@ -name: "KubeCluster" -on: - pull_request: - paths: - - ".github/workflows/kubecluster.yaml" - - "requirements*" - - "ci/**" - - "dask_kubernetes/classic/**" - - "dask_kubernetes/common/**" - - "dask_kubernetes/*" - push: - paths: - - ".github/workflows/kubecluster.yaml" - - "requirements*" - - "ci/**" - - "dask_kubernetes/classic/**" - - "dask_kubernetes/common/**" - - "dask_kubernetes/*" - -concurrency: - group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }} - cancel-in-progress: true - -jobs: - test: - runs-on: ubuntu-latest - timeout-minutes: 45 - strategy: - fail-fast: false - matrix: - python-version: ["3.9", "3.10", "3.11", "3.12"] - kubernetes-version: ["1.29.2"] - include: - - python-version: '3.10' - kubernetes-version: 1.28.7 - - python-version: '3.10' - kubernetes-version: 1.27.11 - - python-version: '3.10' - kubernetes-version: 1.26.14 - - env: - KUBECONFIG: .pytest-kind/pytest-kind/kubeconfig - - steps: - - uses: actions/checkout@v2 - - uses: actions/setup-python@v2 - with: - python-version: ${{ matrix.python-version }} - - name: Install deps - run: ./ci/install-deps.sh - - name: Run tests - env: - KUBERNETES_VERSION: ${{ matrix.kubernetes-version }} - run: pytest --reruns=5 dask_kubernetes/common/tests dask_kubernetes/classic/tests - - name: Debug k8s resources - if: success() || failure() - run: kubectl get all -A diff --git a/dask_kubernetes/__init__.py b/dask_kubernetes/__init__.py index 09c37910c..e88ac7c69 100644 --- a/dask_kubernetes/__init__.py +++ b/dask_kubernetes/__init__.py @@ -1,20 +1,6 @@ -from importlib import import_module -from warnings import warn - from . 
import config -from .common.auth import ( - AutoRefreshConfiguration, - AutoRefreshKubeConfigLoader, - ClusterAuth, - InCluster, - KubeAuth, - KubeConfig, -) -from .common.objects import clean_pod_template, make_pod_from_dict, make_pod_spec -from .helm import HelmCluster - -__all__ = ["HelmCluster", "KubeCluster"] +__all__ = [] try: from ._version import version as __version__ # noqa @@ -22,11 +8,3 @@ except ImportError: __version__ = "0.0.0" __version_tuple__ = (0, 0, 0) - - -def __getattr__(name): - if name == "KubeCluster": - new_module = import_module("dask_kubernetes.classic") - return getattr(new_module, name) - - raise AttributeError(f"module {__name__} has no attribute {name}") diff --git a/dask_kubernetes/classic/__init__.py b/dask_kubernetes/classic/__init__.py deleted file mode 100644 index d1aaad827..000000000 --- a/dask_kubernetes/classic/__init__.py +++ /dev/null @@ -1,2 +0,0 @@ -from ..common.objects import make_pod_spec -from .kubecluster import KubeCluster diff --git a/dask_kubernetes/classic/kubecluster.py b/dask_kubernetes/classic/kubecluster.py deleted file mode 100644 index 6f5b8a299..000000000 --- a/dask_kubernetes/classic/kubecluster.py +++ /dev/null @@ -1,781 +0,0 @@ -import asyncio -import copy -import getpass -import logging -import os -import time -import uuid -import warnings - -import aiohttp -import dask -import dask.distributed -import distributed.security -import kubernetes_asyncio as kubernetes -import yaml -from distributed.deploy import ProcessInterface, SpecCluster -from distributed.utils import Log, Logs, format_dashboard_link -from kubernetes_asyncio.client.rest import ApiException - -from ..common.auth import ClusterAuth -from ..common.networking import ( - get_external_address_for_scheduler_service, - get_scheduler_address, -) -from ..common.objects import ( - clean_pdb_template, - clean_pod_template, - clean_service_template, - make_pdb_from_dict, - make_pod_from_dict, - make_service_from_dict, -) -from ..common.utils import ( - escape, - get_current_namespace, -) - -logger = logging.getLogger(__name__) - -SCHEDULER_PORT = 8786 - - -class Pod(ProcessInterface): - """A superclass for Kubernetes Pods - - See Also - -------- - Worker - Scheduler - """ - - def __init__( - self, - cluster, - core_api, - policy_api, - pod_template, - namespace, - loop=None, - **kwargs - ): - self._pod = None - self.cluster = cluster - self.core_api = core_api - self.policy_api = policy_api - self.pod_template = copy.deepcopy(pod_template) - self.base_labels = self.pod_template.metadata.labels - self.namespace = namespace - self.name = None - self.loop = loop - self.kwargs = kwargs - super().__init__() - - @property - def cluster_name(self): - return self.pod_template.metadata.labels["dask.org/cluster-name"] - - async def start(self, **kwargs): - retry_count = 0 # Retry 10 times - while True: - try: - self._pod = await self.core_api.create_namespaced_pod( - self.namespace, self.pod_template - ) - return await super().start(**kwargs) - except ApiException as e: - if retry_count < 10: - logger.debug("Error when creating pod, retrying... 
- %s", str(e)) - await asyncio.sleep(0.1) - retry_count += 1 - else: - raise e - - async def close(self, **kwargs): - if self._pod: - retry_count = 0 # Retry 10 times - while True: - name, namespace = self._pod.metadata.name, self.namespace - try: - await self.core_api.delete_namespaced_pod(name, namespace) - return await super().close(**kwargs) - except ApiException as e: - if e.reason == "Not Found": - logger.debug( - "Pod %s in namespace %s has been deleted already.", - name, - namespace, - ) - return await super().close(**kwargs) - else: - raise - except aiohttp.client_exceptions.ClientConnectorError as e: - if retry_count < 10: - logger.debug("Connection error, retrying... - %s", str(e)) - await asyncio.sleep(0.1) - retry_count += 1 - else: - raise e - - async def logs(self): - try: - log = await self.core_api.read_namespaced_pod_log( - self._pod.metadata.name, - self.namespace, - container=self.pod_template.spec.containers[0].name, - ) - except ApiException as e: - if "waiting to start" in str(e): - log = "" - else: - raise e - return Log(log) - - async def describe_pod(self): - self._pod = await self.core_api.read_namespaced_pod( - self._pod.metadata.name, self.namespace - ) - return self._pod - - def __repr__(self): - return "" % (type(self).__name__, self.status) - - -class Worker(Pod): - """A Remote Dask Worker controled by Kubernetes - Parameters - ---------- - scheduler: str - The address of the scheduler - name (optional): - The name passed to the dask-worker CLI at creation time. - """ - - def __init__(self, scheduler: str, name=None, **kwargs): - super().__init__(**kwargs) - - self.scheduler = scheduler - - self.pod_template.metadata.labels["dask.org/component"] = "worker" - self.pod_template.spec.containers[0].env.append( - kubernetes.client.V1EnvVar( - name="DASK_SCHEDULER_ADDRESS", value=self.scheduler - ) - ) - if name is not None: - worker_name_args = ["--name", str(name)] - self.pod_template.spec.containers[0].args += worker_name_args - - -class Scheduler(Pod): - """A Remote Dask Scheduler controled by Kubernetes - Parameters - ---------- - idle_timeout: str, optional - The scheduler task will exit after this amount of time - if there are no requests from the client. Default is to - never timeout. - service_wait_timeout_s: int (optional) - Timeout, in seconds, to wait for the remote scheduler service to be ready. - Defaults to 30 seconds. - Set to 0 to disable the timeout (not recommended). - """ - - def __init__( - self, - idle_timeout: str, - service_wait_timeout_s: int = None, - service_name_retries: int = None, - **kwargs - ): - super().__init__(**kwargs) - self.cluster._log("Creating scheduler pod on cluster. 
This may take some time.") - self.service = None - self._idle_timeout = idle_timeout - self._service_wait_timeout_s = service_wait_timeout_s - self._service_name_retries = service_name_retries - if self._idle_timeout is not None: - self.pod_template.spec.containers[0].args += [ - "--idle-timeout", - self._idle_timeout, - ] - self.pdb = None - - async def start(self, **kwargs): - await super().start(**kwargs) - - while (await self.describe_pod()).status.phase == "Pending": - await asyncio.sleep(0.1) - - while self.address is None: - logs = await self.logs() - for line in logs.splitlines(): - if "Scheduler at:" in line: - self.address = line.split("Scheduler at:")[1].strip() - await asyncio.sleep(0.1) - - self.service = await self._create_service() - self.address = "tcp://{name}.{namespace}:{port}".format( - name=self.service.metadata.name, - namespace=self.namespace, - port=SCHEDULER_PORT, - ) - self.external_address = await get_external_address_for_scheduler_service( - self.core_api, - self.service, - service_name_resolution_retries=self._service_name_retries, - ) - - self.pdb = await self._create_pdb() - - async def close(self, **kwargs): - if self.service: - await self.core_api.delete_namespaced_service( - self.cluster_name, self.namespace - ) - if self.pdb: - await self.policy_api.delete_namespaced_pod_disruption_budget( - self.cluster_name, self.namespace - ) - await super().close(**kwargs) - - async def _create_service(self): - service_template_dict = dask.config.get("kubernetes.scheduler-service-template") - self.service_template = clean_service_template( - make_service_from_dict(service_template_dict) - ) - self.service_template.metadata.name = self.cluster_name - self.service_template.metadata.labels = copy.deepcopy(self.base_labels) - - self.service_template.spec.selector["dask.org/cluster-name"] = self.cluster_name - if self.service_template.spec.type is None: - self.service_template.spec.type = dask.config.get( - "kubernetes.scheduler-service-type" - ) - await self.core_api.create_namespaced_service( - self.namespace, self.service_template - ) - service = await self.core_api.read_namespaced_service( - self.cluster_name, self.namespace - ) - if service.spec.type == "LoadBalancer": - # Wait for load balancer to be assigned - start = time.time() - while service.status.load_balancer.ingress is None: - if ( - self._service_wait_timeout_s > 0 - and time.time() > start + self._service_wait_timeout_s - ): - raise asyncio.TimeoutError( - "Timed out waiting for Load Balancer to be provisioned." - ) - service = await self.core_api.read_namespaced_service( - self.cluster_name, self.namespace - ) - await asyncio.sleep(0.2) - return service - - async def _create_pdb(self): - pdb_template_dict = dask.config.get("kubernetes.scheduler-pdb-template") - self.pdb_template = clean_pdb_template(make_pdb_from_dict(pdb_template_dict)) - self.pdb_template.metadata.name = self.cluster_name - self.pdb_template.metadata.labels = copy.deepcopy(self.base_labels) - self.pdb_template.spec.selector.match_labels[ - "dask.org/cluster-name" - ] = self.cluster_name - await self.policy_api.create_namespaced_pod_disruption_budget( - self.namespace, self.pdb_template - ) - return await self.policy_api.read_namespaced_pod_disruption_budget( - self.cluster_name, self.namespace - ) - - -class KubeCluster(SpecCluster): - """Launch a Dask cluster on Kubernetes - - This starts a local Dask scheduler and then dynamically launches - Dask workers on a Kubernetes cluster. 
The Kubernetes cluster is taken - to be either the current one on which this code is running, or as a - fallback, the default one configured in a kubeconfig file. - - **Environments** - - Your worker pod image should have a similar environment to your local - environment, including versions of Python, dask, cloudpickle, and any - libraries that you may wish to use (like NumPy, Pandas, or Scikit-Learn). - See examples below for suggestions on how to manage and check for this. - - **Network** - - Since the Dask scheduler is launched locally, for it to work, we need to - be able to open network connections between this local node and all the - workers nodes on the Kubernetes cluster. If the current process is not - already on a Kubernetes node, some network configuration will likely be - required to make this work. - - **Resources** - - Your Kubernetes resource limits and requests should match the - ``--memory-limit`` and ``--nthreads`` parameters given to the - ``dask-worker`` command. - - Parameters - ---------- - pod_template: (kubernetes.client.V1Pod, dict, str) - A Kubernetes specification for a Pod for a dask worker. Can be either a - ``V1Pod``, a dict representation of a pod, or a path to a yaml file - containing a pod specification. - scheduler_pod_template: kubernetes.client.V1Pod (optional) - A Kubernetes specification for a Pod for a dask scheduler. - Defaults to the pod_template. - name: str (optional) - Name given to the pods. Defaults to ``dask-$USER-random`` - namespace: str (optional) - Namespace in which to launch the workers. - Defaults to current namespace if available or "default" - n_workers: int - Number of workers on initial launch. - Use ``scale`` to change this number in the future - env: Dict[str, str] - Dictionary of environment variables to pass to worker pod - host: str - Listen address for local scheduler. Defaults to 0.0.0.0 - port: int - Port of local scheduler - auth: List[ClusterAuth] (optional) - Configuration methods to attempt in order. Defaults to - ``[InCluster(), KubeConfig()]``. - idle_timeout: str (optional) - The scheduler task will exit after this amount of time - if there are no requests from the client. Default is to - never timeout. - scheduler_service_wait_timeout: int (optional) - Timeout, in seconds, to wait for the remote scheduler service to be ready. - Defaults to 30 seconds. - Set to 0 to disable the timeout (not recommended). - scheduler_service_name_resolution_retries: int (optional) - Number of retries to resolve scheduler service name when running - from within the Kubernetes cluster. - Defaults to 20. - Must be set to 1 or greater. - deploy_mode: str (optional) - Run the scheduler as "local" or "remote". - Defaults to ``"remote"``. - apply_default_affinity: str (optional) - Apply a default affinity to pods: "required", "preferred" or "none" - Defaults to ``"preferred"``. - **kwargs: dict - Additional keyword arguments to pass to SpecCluster - - Examples - -------- - >>> from dask_kubernetes.classic import KubeCluster, make_pod_spec - >>> pod_spec = make_pod_spec(image='ghcr.io/dask/dask:latest', - ... memory_limit='4G', memory_request='4G', - ... cpu_limit=1, cpu_request=1, - ... 
env={'EXTRA_PIP_PACKAGES': 'fastparquet git+https://github.com/dask/distributed'}) - >>> cluster = KubeCluster(pod_spec) - >>> cluster.scale(10) - - You can also create clusters with worker pod specifications as dictionaries - or stored in YAML files - - >>> cluster = KubeCluster('worker-template.yml') - >>> cluster = KubeCluster({...}) - - Rather than explicitly setting a number of workers you can also ask the - cluster to allocate workers dynamically based on current workload - - >>> cluster.adapt() - - You can pass this cluster directly to a Dask client - - >>> from dask.distributed import Client - >>> client = Client(cluster) - - You can verify that your local environment matches your worker environments - by calling ``client.get_versions(check=True)``. This will raise an - informative error if versions do not match. - - >>> client.get_versions(check=True) - - The ``ghcr.io/dask/dask`` docker images support ``EXTRA_PIP_PACKAGES``, - ``EXTRA_APT_PACKAGES`` and ``EXTRA_CONDA_PACKAGES`` environment variables - to help with small adjustments to the worker environments. We recommend - the use of pip over conda in this case due to a much shorter startup time. - These environment variables can be modified directly from the KubeCluster - constructor methods using the ``env=`` keyword. You may list as many - packages as you like in a single string like the following: - - >>> pip = 'pyarrow gcsfs git+https://github.com/dask/distributed' - >>> conda = '-c conda-forge scikit-learn' - >>> KubeCluster(..., env={'EXTRA_PIP_PACKAGES': pip, - ... 'EXTRA_CONDA_PACKAGES': conda}) - - You can also start a KubeCluster with no arguments *if* the worker template - is specified in the Dask config files, either as a full template in - ``kubernetes.worker-template`` or a path to a YAML file in - ``kubernetes.worker-template-path``. - - See https://docs.dask.org/en/latest/configuration.html for more - information about setting configuration values.:: - - $ export DASK_KUBERNETES__WORKER_TEMPLATE_PATH=worker_template.yaml - - >>> cluster = KubeCluster() # automatically finds 'worker_template.yaml' - - See Also - -------- - KubeCluster.adapt - """ - - def __init__( - self, - pod_template=None, - name=None, - namespace=None, - n_workers=None, - host=None, - port=None, - env=None, - auth=ClusterAuth.DEFAULT, - idle_timeout=None, - deploy_mode=None, - interface=None, - protocol=None, - dashboard_address=None, - security=None, - scheduler_service_wait_timeout=None, - scheduler_service_name_resolution_retries=None, - scheduler_pod_template=None, - apply_default_affinity="preferred", - **kwargs - ): - warnings.warn( - "The classic KubeCluster is going away. " - "Please migrate to the new operator based implementation " - "https://kubernetes.dask.org/en/latest/kubecluster_migrating.html. 
", - DeprecationWarning, - stacklevel=2, - ) - if isinstance(pod_template, str): - with open(pod_template) as f: - pod_template = dask.config.expand_environment_variables( - yaml.safe_load(f) - ) - if isinstance(pod_template, dict): - pod_template = make_pod_from_dict(pod_template) - - if isinstance(scheduler_pod_template, str): - with open(scheduler_pod_template) as f: - scheduler_pod_template = dask.config.expand_environment_variables( - yaml.safe_load(f) - ) - if isinstance(scheduler_pod_template, dict): - scheduler_pod_template = make_pod_from_dict(scheduler_pod_template) - - self.pod_template = copy.deepcopy(pod_template) - self.scheduler_pod_template = copy.deepcopy(scheduler_pod_template) - self.apply_default_affinity = apply_default_affinity - self._generate_name = dask.config.get("kubernetes.name", override_with=name) - self.namespace = dask.config.get( - "kubernetes.namespace", override_with=namespace - ) - self._n_workers = dask.config.get( - "kubernetes.count.start", override_with=n_workers - ) - self._idle_timeout = dask.config.get( - "kubernetes.idle-timeout", override_with=idle_timeout - ) - self._deploy_mode = dask.config.get( - "kubernetes.deploy-mode", override_with=deploy_mode - ) - self._protocol = dask.config.get("kubernetes.protocol", override_with=protocol) - self._interface = dask.config.get( - "kubernetes.interface", override_with=interface - ) - self._dashboard_address = dask.config.get( - "kubernetes.dashboard_address", override_with=dashboard_address - ) - self._scheduler_service_wait_timeout = dask.config.get( - "kubernetes.scheduler-service-wait-timeout", - override_with=scheduler_service_wait_timeout, - ) - self._scheduler_service_name_resolution_retries = dask.config.get( - "kubernetes.scheduler-service-name-resolution-retries", - override_with=scheduler_service_name_resolution_retries, - ) - self.security = security - if self.security and not isinstance( - self.security, distributed.security.Security - ): - raise RuntimeError( - "Security object is not a valid distributed.security.Security object" - ) - self.host = dask.config.get("kubernetes.host", override_with=host) - self.port = dask.config.get("kubernetes.port", override_with=port) - self.env = dask.config.get("kubernetes.env", override_with=env) - self.auth = auth - self.kwargs = kwargs - super().__init__(**self.kwargs) - - @property - def dashboard_link(self): - host = self.scheduler_address.split("://")[1].split("/")[0].split(":")[0] - return format_dashboard_link(host, self.forwarded_dashboard_port) - - def _get_pod_template(self, pod_template, pod_type): - if not pod_template and dask.config.get( - "kubernetes.{}-template".format(pod_type), None - ): - d = dask.config.get("kubernetes.{}-template".format(pod_type)) - d = dask.config.expand_environment_variables(d) - pod_template = make_pod_from_dict(d) - - if not pod_template and dask.config.get( - "kubernetes.{}-template-path".format(pod_type), None - ): - import yaml - - fn = dask.config.get("kubernetes.{}-template-path".format(pod_type)) - fn = fn.format(**os.environ) - with open(fn) as f: - d = yaml.safe_load(f) - d = dask.config.expand_environment_variables(d) - pod_template = make_pod_from_dict(d) - return pod_template - - def _fill_pod_templates(self, pod_template, pod_type): - pod_template = copy.deepcopy(pod_template) - - # Default labels that can't be overwritten - pod_template.metadata.labels["dask.org/cluster-name"] = self._generate_name - pod_template.metadata.labels["dask.org/component"] = pod_type - 
pod_template.metadata.labels["user"] = escape(getpass.getuser()) - pod_template.metadata.labels["app"] = "dask" - pod_template.metadata.namespace = self.namespace - - if self.env: - pod_template.spec.containers[0].env.extend( - [ - kubernetes.client.V1EnvVar(name=k, value=str(v)) - for k, v in self.env.items() - ] - ) - pod_template.metadata.generate_name = self._generate_name - - return pod_template - - async def _start(self): - self.pod_template = self._get_pod_template(self.pod_template, pod_type="worker") - self.scheduler_pod_template = self._get_pod_template( - self.scheduler_pod_template, pod_type="scheduler" - ) - if not self.pod_template: - msg = ( - "Worker pod specification not provided. See KubeCluster " - "docstring for ways to specify workers" - ) - raise ValueError(msg) - - base_pod_template = self.pod_template - self.pod_template = clean_pod_template( - self.pod_template, - apply_default_affinity=self.apply_default_affinity, - pod_type="worker", - ) - - if not self.scheduler_pod_template: - self.scheduler_pod_template = base_pod_template - self.scheduler_pod_template.spec.containers[0].args = ["dask-scheduler"] - - self.scheduler_pod_template = clean_pod_template( - self.scheduler_pod_template, - apply_default_affinity=self.apply_default_affinity, - pod_type="scheduler", - ) - - await ClusterAuth.load_first(self.auth) - - self.core_api = kubernetes.client.CoreV1Api() - self.policy_api = kubernetes.client.PolicyV1Api() - - if self.namespace is None: - self.namespace = get_current_namespace() - - environ = {k: v for k, v in os.environ.items() if k not in ["user", "uuid"]} - self._generate_name = self._generate_name.format( - user=getpass.getuser(), uuid=str(uuid.uuid4())[:10], **environ - ) - self._generate_name = escape(self._generate_name) - - self.pod_template = self._fill_pod_templates( - self.pod_template, pod_type="worker" - ) - self.scheduler_pod_template = self._fill_pod_templates( - self.scheduler_pod_template, pod_type="scheduler" - ) - - common_options = { - "cluster": self, - "core_api": self.core_api, - "policy_api": self.policy_api, - "namespace": self.namespace, - "loop": self.loop, - } - - if self._deploy_mode == "local": - self.scheduler_spec = { - "cls": dask.distributed.Scheduler, - "options": { - "protocol": self._protocol, - "interface": self._interface, - "host": self.host, - "port": self.port, - "dashboard_address": self._dashboard_address, - "security": self.security, - }, - } - elif self._deploy_mode == "remote": - self.scheduler_spec = { - "cls": Scheduler, - "options": { - "idle_timeout": self._idle_timeout, - "service_wait_timeout_s": self._scheduler_service_wait_timeout, - "service_name_retries": self._scheduler_service_name_resolution_retries, - "pod_template": self.scheduler_pod_template, - **common_options, - }, - } - else: - raise RuntimeError("Unknown deploy mode %s" % self._deploy_mode) - - self.new_spec = { - "cls": Worker, - "options": {"pod_template": self.pod_template, **common_options}, - } - self.worker_spec = {i: self.new_spec for i in range(self._n_workers)} - - self.name = self.pod_template.metadata.generate_name - - await super()._start() - - if self._deploy_mode == "local": - self.forwarded_dashboard_port = self.scheduler.services["dashboard"].port - else: - dashboard_address = await get_scheduler_address( - self.scheduler.service.metadata.name, - self.namespace, - port_name="http-dashboard", - ) - self.forwarded_dashboard_port = dashboard_address.split(":")[-1] - - @classmethod - def from_dict(cls, pod_spec, **kwargs): - """Create 
cluster with worker pod spec defined by Python dictionary - - Deprecated, please use the `KubeCluster` constructor directly. - - Examples - -------- - >>> spec = { - ... 'metadata': {}, - ... 'spec': { - ... 'containers': [{ - ... 'args': ['dask-worker', '$(DASK_SCHEDULER_ADDRESS)', - ... '--nthreads', '1', - ... '--death-timeout', '60'], - ... 'command': None, - ... 'image': 'ghcr.io/dask/dask:latest', - ... 'name': 'dask-worker', - ... }], - ... 'restartPolicy': 'Never', - ... } - ... } - >>> cluster = KubeCluster.from_dict(spec, namespace='my-ns') # doctest: +SKIP - - See Also - -------- - KubeCluster.from_yaml - """ - warnings.warn( - "KubeCluster.from_dict is deprecated, use the constructor directly" - ) - return cls(pod_spec, **kwargs) - - @classmethod - def from_yaml(cls, yaml_path, **kwargs): - """Create cluster with worker pod spec defined by a YAML file - - Deprecated, please use the `KubeCluster` constructor directly. - - We can start a cluster with pods defined in an accompanying YAML file - like the following: - - .. code-block:: yaml - - kind: Pod - metadata: - labels: - foo: bar - baz: quux - spec: - containers: - - image: ghcr.io/dask/dask:latest - name: dask-worker - args: [dask-worker, $(DASK_SCHEDULER_ADDRESS), --nthreads, '2', --memory-limit, 8GB] - restartPolicy: Never - - Examples - -------- - >>> cluster = KubeCluster.from_yaml('pod.yaml', namespace='my-ns') # doctest: +SKIP - - See Also - -------- - KubeCluster.from_dict - """ - warnings.warn( - "KubeCluster.from_yaml is deprecated, use the constructor directly" - ) - return cls(yaml_path, **kwargs) - - def scale(self, n): - # A shim to maintain backward compatibility - # https://github.com/dask/distributed/issues/3054 - maximum = dask.config.get("kubernetes.count.max") - if maximum is not None and maximum < n: - logger.info( - "Tried to scale beyond maximum number of workers %d > %d", n, maximum - ) - n = maximum - return super().scale(n) - - async def _logs(self, scheduler=True, workers=True): - """Return logs for the scheduler and workers - Parameters - ---------- - scheduler : boolean - Whether or not to collect logs for the scheduler - workers : boolean or Iterable[str], optional - A list of worker addresses to select. 
- Defaults to all workers if `True` or no workers if `False` - Returns - ------- - logs: Dict[str] - A dictionary of logs, with one item for the scheduler and one for - each worker - """ - logs = Logs() - - if scheduler: - logs["Scheduler"] = await self.scheduler.logs() - - if workers: - worker_logs = await asyncio.gather( - *[w.logs() for w in self.workers.values()] - ) - for key, log in zip(self.workers, worker_logs): - logs[key] = log - - return logs diff --git a/dask_kubernetes/classic/tests/config-demo.yaml b/dask_kubernetes/classic/tests/config-demo.yaml deleted file mode 100644 index 262984b21..000000000 --- a/dask_kubernetes/classic/tests/config-demo.yaml +++ /dev/null @@ -1,38 +0,0 @@ -apiVersion: v1 -clusters: -- cluster: - certificate-authority: fake-ca-file - server: https://1.2.3.4 - name: development -- cluster: - insecure-skip-tls-verify: true - server: https://5.6.7.8 - name: scratch -contexts: -- context: - cluster: development - namespace: frontend - user: developer - name: dev-frontend -- context: - cluster: development - namespace: storage - user: developer - name: dev-storage -- context: - cluster: scratch - namespace: default - user: experimenter - name: exp-scratch -current-context: dev-frontend -kind: Config -preferences: {} -users: -- name: developer - user: - client-certificate: fake-cert-file - client-key: fake-key-file -- name: experimenter - user: - password: some-password - username: exp diff --git a/dask_kubernetes/classic/tests/fake-ca-file b/dask_kubernetes/classic/tests/fake-ca-file deleted file mode 100644 index e69de29bb..000000000 diff --git a/dask_kubernetes/classic/tests/fake-cert-file b/dask_kubernetes/classic/tests/fake-cert-file deleted file mode 100644 index e69de29bb..000000000 diff --git a/dask_kubernetes/classic/tests/fake-key-file b/dask_kubernetes/classic/tests/fake-key-file deleted file mode 100644 index e69de29bb..000000000 diff --git a/dask_kubernetes/classic/tests/fake_gcp_auth.py b/dask_kubernetes/classic/tests/fake_gcp_auth.py deleted file mode 100644 index bab976588..000000000 --- a/dask_kubernetes/classic/tests/fake_gcp_auth.py +++ /dev/null @@ -1,25 +0,0 @@ -import datetime -import json - -expiry = datetime.datetime.utcnow() + datetime.timedelta(seconds=5) -expiry.replace(tzinfo=datetime.timezone.utc) -expiry_str = expiry.isoformat("T") + "Z" - -fake_token = "0" * 137 -fake_id = "abcdefghijklmnopqrstuvwxyz.1234567890" * 37 + "." 
* 32 - -data = """ -{ - "credential": { - "access_token": "%s", - "id_token": "%s", - "token_expiry": "%s" - } -} -""" % ( - fake_token, - fake_id, - expiry_str, -) - -print(json.dumps(json.loads(data), indent=4)) diff --git a/dask_kubernetes/classic/tests/test_async.py b/dask_kubernetes/classic/tests/test_async.py deleted file mode 100644 index b6c2a6201..000000000 --- a/dask_kubernetes/classic/tests/test_async.py +++ /dev/null @@ -1,886 +0,0 @@ -import asyncio -import base64 -import getpass -import os -import random -import sys -from time import time - -import dask -import kubernetes_asyncio as kubernetes -import pytest -import yaml -from dask.distributed import Client, wait -from dask.utils import tmpfile -from distributed.utils_test import captured_logger - -import dask_kubernetes -from dask_kubernetes import ( - ClusterAuth, - KubeAuth, - KubeCluster, - KubeConfig, - clean_pod_template, - make_pod_spec, -) -from dask_kubernetes.common.utils import get_current_namespace -from dask_kubernetes.constants import KUBECLUSTER_CONTAINER_NAME - -TEST_DIR = os.path.abspath(os.path.join(__file__, "..")) -CONFIG_DEMO = os.path.join(TEST_DIR, "config-demo.yaml") -FAKE_CERT = os.path.join(TEST_DIR, "fake-cert-file") -FAKE_KEY = os.path.join(TEST_DIR, "fake-key-file") -FAKE_CA = os.path.join(TEST_DIR, "fake-ca-file") - - -@pytest.fixture -def pod_spec(docker_image): - yield clean_pod_template( - make_pod_spec( - image=docker_image, - extra_container_config={"imagePullPolicy": "IfNotPresent"}, - ) - ) - - -@pytest.fixture -def user_env(): - """The env var USER is not always set on non-linux systems.""" - if "USER" not in os.environ: - os.environ["USER"] = getpass.getuser() - yield - del os.environ["USER"] - else: - yield - - -cluster_kwargs = {"asynchronous": True} - - -@pytest.fixture -async def cluster(k8s_cluster, pod_spec): - async with KubeCluster(pod_spec, **cluster_kwargs) as cluster: - yield cluster - - -@pytest.fixture -async def remote_cluster(k8s_cluster, pod_spec): - async with KubeCluster(pod_spec, deploy_mode="remote", **cluster_kwargs) as cluster: - yield cluster - - -@pytest.fixture -async def client(cluster): - async with Client(cluster, asynchronous=True) as client: - yield client - - -@pytest.mark.anyio -async def test_fixtures(client): - """An initial test to get all the fixtures to run and check the cluster is usable.""" - assert client - - -@pytest.mark.anyio -async def test_versions(client): - await client.get_versions(check=True) - - -@pytest.mark.anyio -async def test_cluster_create(cluster): - cluster.scale(1) - await cluster - async with Client(cluster, asynchronous=True) as client: - result = await client.submit(lambda x: x + 1, 10) - assert result == 11 - - -@pytest.mark.anyio -async def test_basic(cluster, client): - cluster.scale(2) - future = client.submit(lambda x: x + 1, 10) - result = await future - assert result == 11 - - await client.wait_for_workers(2) - - # Ensure that inter-worker communication works well - futures = client.map(lambda x: x + 1, range(10)) - total = client.submit(sum, futures) - assert (await total) == sum(map(lambda x: x + 1, range(10))) - assert all((await client.has_what()).values()) - - -@pytest.mark.anyio -async def test_logs(remote_cluster): - cluster = remote_cluster - cluster.scale(2) - await cluster - - async with Client(cluster, asynchronous=True) as client: - await client.wait_for_workers(2) - - logs = await cluster.get_logs() - assert len(logs) == 4 - for _, log in logs.items(): - assert ( - "distributed.scheduler" in log - or 
"distributed.worker" in log - or "Creating scheduler pod" in log - ) - - -@pytest.mark.anyio -async def test_dask_worker_name_env_variable(k8s_cluster, pod_spec, user_env): - with dask.config.set({"kubernetes.name": "foo-{USER}-{uuid}"}): - async with KubeCluster(pod_spec, **cluster_kwargs) as cluster: - assert "foo-" + getpass.getuser() in cluster.name - - -@pytest.mark.anyio -async def test_diagnostics_link_env_variable(k8s_cluster, pod_spec, user_env): - pytest.importorskip("bokeh") - with dask.config.set({"distributed.dashboard.link": "foo-{USER}-{port}"}): - async with KubeCluster(pod_spec, asynchronous=True) as cluster: - port = cluster.forwarded_dashboard_port - - assert ( - "foo-" + getpass.getuser() + "-" + str(port) in cluster.dashboard_link - ) - - -@pytest.mark.skip(reason="Cannot run two closers locally as loadbalancer ports collide") -@pytest.mark.anyio -async def test_namespace(k8s_cluster, pod_spec): - async with KubeCluster(pod_spec, **cluster_kwargs) as cluster: - assert "dask" in cluster.name - assert getpass.getuser() in cluster.name - async with KubeCluster(pod_spec, **cluster_kwargs) as cluster2: - assert cluster.name != cluster2.name - - cluster2.scale(1) - while len(await cluster2.pods()) != 1: - await asyncio.sleep(0.1) - - -@pytest.mark.anyio -async def test_adapt(cluster): - cluster.adapt() - async with Client(cluster, asynchronous=True) as client: - future = client.submit(lambda x: x + 1, 10) - result = await future - assert result == 11 - - -@pytest.mark.xfail(reason="The widget has changed upstream") -@pytest.mark.anyio -async def test_ipython_display(cluster): - ipywidgets = pytest.importorskip("ipywidgets") - cluster.scale(1) - await cluster - cluster._ipython_display_() - box = cluster._cached_widget - assert isinstance(box, ipywidgets.Widget) - cluster._ipython_display_() - assert cluster._cached_widget is box - - start = time() - while "1" not in str(box): # one worker in a table - assert time() < start + 20 - await asyncio.sleep(0.5) - - -@pytest.mark.anyio -async def test_env(k8s_cluster, pod_spec): - async with KubeCluster(pod_spec, env={"ABC": "DEF"}, **cluster_kwargs) as cluster: - cluster.scale(1) - await cluster - async with Client(cluster, asynchronous=True) as client: - await client.wait_for_workers(1) - env = await client.run(lambda: dict(os.environ)) - assert all(v["ABC"] == "DEF" for v in env.values()) - - -@pytest.mark.anyio -async def test_pod_from_yaml(k8s_cluster, docker_image): - test_yaml = { - "kind": "Pod", - "metadata": {"labels": {"app": "dask", "component": "dask-worker"}}, - "spec": { - "containers": [ - { - "args": [ - "dask-worker", - "$(DASK_SCHEDULER_ADDRESS)", - "--nthreads", - "1", - ], - "image": docker_image, - "imagePullPolicy": "IfNotPresent", - "name": KUBECLUSTER_CONTAINER_NAME, - } - ] - }, - } - - with tmpfile(extension="yaml") as fn: - with open(fn, mode="w") as f: - yaml.dump(test_yaml, f) - async with KubeCluster(f.name, **cluster_kwargs) as cluster: - cluster.scale(2) - await cluster - async with Client(cluster, asynchronous=True) as client: - future = client.submit(lambda x: x + 1, 10) - result = await future.result(timeout=30) - assert result == 11 - - await client.wait_for_workers(2) - - # Ensure that inter-worker communication works well - futures = client.map(lambda x: x + 1, range(10)) - total = client.submit(sum, futures) - assert (await total) == sum(map(lambda x: x + 1, range(10))) - assert all((await client.has_what()).values()) - - -@pytest.mark.anyio -async def test_pod_expand_env_vars(k8s_cluster, 
docker_image): - try: - os.environ["FOO_IMAGE"] = docker_image - - test_yaml = { - "kind": "Pod", - "metadata": {"labels": {"app": "dask", "component": "dask-worker"}}, - "spec": { - "containers": [ - { - "args": [ - "dask-worker", - "$(DASK_SCHEDULER_ADDRESS)", - "--nthreads", - "1", - ], - "image": "${FOO_IMAGE}", - "imagePullPolicy": "IfNotPresent", - "name": KUBECLUSTER_CONTAINER_NAME, - } - ] - }, - } - - with tmpfile(extension="yaml") as fn: - with open(fn, mode="w") as f: - yaml.dump(test_yaml, f) - async with KubeCluster(f.name, **cluster_kwargs) as cluster: - assert cluster.pod_template.spec.containers[0].image == docker_image - finally: - del os.environ["FOO_IMAGE"] - - -@pytest.mark.anyio -async def test_pod_template_dict(docker_image): - spec = { - "metadata": {}, - "restartPolicy": "Never", - "spec": { - "containers": [ - { - "args": [ - "dask-worker", - "$(DASK_SCHEDULER_ADDRESS)", - "--nthreads", - "1", - "--death-timeout", - "60", - ], - "command": None, - "image": docker_image, - "imagePullPolicy": "IfNotPresent", - "name": KUBECLUSTER_CONTAINER_NAME, - } - ] - }, - } - - async with KubeCluster(spec, port=32000, **cluster_kwargs) as cluster: - cluster.scale(2) - await cluster - async with Client(cluster, asynchronous=True) as client: - future = client.submit(lambda x: x + 1, 10) - result = await future - assert result == 11 - - await client.wait_for_workers(2) - - # Ensure that inter-worker communication works well - futures = client.map(lambda x: x + 1, range(10)) - total = client.submit(sum, futures) - assert (await total) == sum(map(lambda x: x + 1, range(10))) - assert all((await client.has_what()).values()) - - -@pytest.mark.anyio -async def test_pod_template_minimal_dict(k8s_cluster, docker_image): - spec = { - "spec": { - "containers": [ - { - "args": [ - "dask-worker", - "$(DASK_SCHEDULER_ADDRESS)", - "--nthreads", - "1", - "--death-timeout", - "60", - ], - "command": None, - "image": docker_image, - "imagePullPolicy": "IfNotPresent", - "name": KUBECLUSTER_CONTAINER_NAME, - } - ] - } - } - - async with KubeCluster(spec, **cluster_kwargs) as cluster: - cluster.adapt() - async with Client(cluster, asynchronous=True) as client: - future = client.submit(lambda x: x + 1, 10) - result = await future - assert result == 11 - - -@pytest.mark.anyio -async def test_pod_template_from_conf(docker_image): - spec = { - "spec": { - "containers": [{"name": KUBECLUSTER_CONTAINER_NAME, "image": docker_image}] - } - } - - with dask.config.set({"kubernetes.worker-template": spec}): - async with KubeCluster(**cluster_kwargs) as cluster: - assert ( - cluster.pod_template.spec.containers[0].name - == KUBECLUSTER_CONTAINER_NAME - ) - - -@pytest.mark.anyio -async def test_pod_template_with_custom_container_name(docker_image): - container_name = "my-custom-container" - spec = {"spec": {"containers": [{"name": container_name, "image": docker_image}]}} - - with dask.config.set({"kubernetes.worker-template": spec}): - async with KubeCluster(**cluster_kwargs) as cluster: - assert cluster.pod_template.spec.containers[0].name == container_name - - -@pytest.mark.anyio -async def test_constructor_parameters(k8s_cluster, pod_spec): - env = {"FOO": "BAR", "A": 1} - async with KubeCluster( - pod_spec, name="myname", env=env, **cluster_kwargs - ) as cluster: - pod = cluster.pod_template - assert pod.metadata.namespace == get_current_namespace() - - var = [v for v in pod.spec.containers[0].env if v.name == "FOO"] - assert var and var[0].value == "BAR" - - var = [v for v in pod.spec.containers[0].env if 
v.name == "A"] - assert var and var[0].value == "1" - - assert pod.metadata.generate_name == "myname" - - -@pytest.mark.anyio -async def test_reject_evicted_workers(cluster): - cluster.scale(1) - await cluster - - start = time() - while len(cluster.scheduler_info["workers"]) != 1: - await asyncio.sleep(0.1) - assert time() < start + 60 - - # Evict worker - [worker] = cluster.workers.values() - await cluster.core_api.create_namespaced_pod_eviction( - (await worker.describe_pod()).metadata.name, - (await worker.describe_pod()).metadata.namespace, - kubernetes.client.V1Eviction( - delete_options=kubernetes.client.V1DeleteOptions(grace_period_seconds=300), - metadata=(await worker.describe_pod()).metadata, - ), - ) - - # Wait until worker removal has been picked up by scheduler - start = time() - while len(cluster.scheduler_info["workers"]) != 0: - delta = time() - start - assert delta < 60, f"Scheduler failed to remove worker in {delta:.0f}s" - await asyncio.sleep(0.1) - - # Wait until worker removal has been handled by cluster - while len(cluster.workers) != 0: - delta = time() - start - assert delta < 60, f"Cluster failed to remove worker in {delta:.0f}s" - await asyncio.sleep(0.1) - - -@pytest.mark.anyio -async def test_scale_up_down(cluster, client): - np = pytest.importorskip("numpy") - cluster.scale(2) - await cluster - - start = time() - while len(cluster.scheduler_info["workers"]) != 2: - await asyncio.sleep(0.1) - assert time() < start + 60 - - a, b = list(cluster.scheduler_info["workers"]) - x = client.submit(np.ones, 1, workers=a) - y = client.submit(np.ones, 50_000, workers=b) - - await wait([x, y]) - - cluster.scale(1) - await cluster - - start = time() - while len(cluster.scheduler_info["workers"]) != 1: - await asyncio.sleep(0.1) - assert time() < start + 60 - - # assert set(cluster.scheduler_info["workers"]) == {b} - - -@pytest.mark.xfail( - reason="The delay between scaling up, starting a worker, and then scale down causes issues" -) -@pytest.mark.anyio -async def test_scale_up_down_fast(cluster, client): - cluster.scale(1) - await cluster - - start = time() - await client.wait_for_workers(1) - - worker = next(iter(cluster.scheduler_info["workers"].values())) - - # Put some data on this worker - future = client.submit(lambda: b"\x00" * int(1e6)) - await wait(future) - assert worker in cluster.scheduler.tasks[future.key].who_has - - # Rescale the cluster many times without waiting: this should put some - # pressure on kubernetes but this should never fail nor delete our worker - # with the temporary result. - for i in range(10): - await cluster._scale_up(4) - await asyncio.sleep(random.random() / 2) - cluster.scale(1) - await asyncio.sleep(random.random() / 2) - - start = time() - while len(cluster.scheduler_info["workers"]) != 1: - await asyncio.sleep(0.1) - assert time() < start + 20 - - # The original task result is still stored on the original worker: this pod - # has never been deleted when rescaling the cluster and the result can - # still be fetched back. 
- assert worker in cluster.scheduler.tasks[future.key].who_has - assert len(await future) == int(1e6) - - -@pytest.mark.xfail(reason="scaling has some unfortunate state") -@pytest.mark.anyio -async def test_scale_down_pending(cluster, client, cleanup_namespaces): - # Try to scale the cluster to use more pods than available - nodes = (await cluster.core_api.list_node()).items - max_pods = sum(int(node.status.allocatable["pods"]) for node in nodes) - if max_pods > 50: - # It's probably not reasonable to run this test against a large - # kubernetes cluster. - pytest.skip("Require a small test kubernetes cluster (maxpod <= 50)") - extra_pods = 5 - requested_pods = max_pods + extra_pods - cluster.scale(requested_pods) - - start = time() - while len(cluster.scheduler_info["workers"]) < 2: - await asyncio.sleep(0.1) - # Wait a bit because the kubernetes cluster can take time to provision - # the requested pods as we requested a large number of pods. - assert time() < start + 60 - - pending_pods = [p for p in (await cluster.pods()) if p.status.phase == "Pending"] - assert len(pending_pods) >= extra_pods - - running_workers = list(cluster.scheduler_info["workers"].keys()) - assert len(running_workers) >= 2 - - # Put some data on those workers to make them important to keep as long - # as possible. - def load_data(i): - return b"\x00" * (i * int(1e6)) - - futures = [ - client.submit(load_data, i, workers=w) for i, w in enumerate(running_workers) - ] - await wait(futures) - - # Reduce the cluster size down to the actually useful nodes: pending pods - # and running pods without results should be shutdown and removed first: - cluster.scale(len(running_workers)) - - start = time() - pod_statuses = [p.status.phase for p in await cluster.pods()] - while len(pod_statuses) != len(running_workers): - if time() - start > 60: - raise AssertionError( - "Expected %d running pods but got %r" - % (len(running_workers), pod_statuses) - ) - await asyncio.sleep(0.1) - pod_statuses = [p.status.phase for p in await cluster.pods()] - - assert pod_statuses == ["Running"] * len(running_workers) - assert list(cluster.scheduler_info["workers"].keys()) == running_workers - - # Terminate everything - cluster.scale(0) - - start = time() - while len(cluster.scheduler_info["workers"]) > 0: - await asyncio.sleep(0.1) - assert time() < start + 60 - - -@pytest.mark.anyio -async def test_automatic_startup(k8s_cluster, docker_image): - test_yaml = { - "kind": "Pod", - "metadata": {"labels": {"foo": "bar"}}, - "spec": { - "containers": [ - { - "args": [ - "dask-worker", - "$(DASK_SCHEDULER_ADDRESS)", - "--nthreads", - "1", - ], - "image": docker_image, - "name": KUBECLUSTER_CONTAINER_NAME, - } - ] - }, - } - - with tmpfile(extension="yaml") as fn: - with open(fn, mode="w") as f: - yaml.dump(test_yaml, f) - with dask.config.set({"kubernetes.worker-template-path": fn}): - async with KubeCluster(**cluster_kwargs) as cluster: - assert cluster.pod_template.metadata.labels["foo"] == "bar" - - -@pytest.mark.anyio -async def test_repr(cluster): - for text in [repr(cluster), str(cluster)]: - assert "Box" not in text - assert ( - cluster.scheduler.address in text - or cluster.scheduler.external_address in text - ) - - -@pytest.mark.anyio -async def test_escape_username(k8s_cluster, pod_spec, monkeypatch): - monkeypatch.setenv("LOGNAME", "Foo!._") - - async with KubeCluster(pod_spec, **cluster_kwargs) as cluster: - assert "foo" in cluster.name - assert "!" not in cluster.name - assert "." 
not in cluster.name - assert "_" not in cluster.name - assert "foo" in cluster.pod_template.metadata.labels["user"] - - -@pytest.mark.anyio -async def test_escape_name(k8s_cluster, pod_spec): - async with KubeCluster(pod_spec, name="foo@bar", **cluster_kwargs) as cluster: - assert "@" not in str(cluster.pod_template) - - -@pytest.mark.anyio -async def test_maximum(cluster): - with dask.config.set({"kubernetes.count.max": 1}): - with captured_logger("dask_kubernetes") as logger: - cluster.scale(10) - await cluster - - start = time() - while len(cluster.scheduler_info["workers"]) <= 0: - await asyncio.sleep(0.1) - assert time() < start + 60 - await asyncio.sleep(0.5) - while len(cluster.scheduler_info["workers"]) != 1: - await asyncio.sleep(0.1) - assert time() < start + 60 - - result = logger.getvalue() - assert "scale beyond maximum number of workers" in result.lower() - - -def test_default_toleration(pod_spec): - tolerations = pod_spec.to_dict()["spec"]["tolerations"] - assert { - "key": "k8s.dask.org/dedicated", - "operator": "Equal", - "value": "worker", - "effect": "NoSchedule", - "toleration_seconds": None, - } in tolerations - assert { - "key": "k8s.dask.org_dedicated", - "operator": "Equal", - "value": "worker", - "effect": "NoSchedule", - "toleration_seconds": None, - } in tolerations - - -def test_default_toleration_preserved(docker_image): - pod_spec = clean_pod_template( - make_pod_spec( - image=docker_image, - extra_pod_config={ - "tolerations": [ - { - "key": "example.org/toleration", - "operator": "Exists", - "effect": "NoSchedule", - } - ] - }, - ) - ) - tolerations = pod_spec.to_dict()["spec"]["tolerations"] - assert { - "key": "k8s.dask.org/dedicated", - "operator": "Equal", - "value": "worker", - "effect": "NoSchedule", - "toleration_seconds": None, - } in tolerations - assert { - "key": "k8s.dask.org_dedicated", - "operator": "Equal", - "value": "worker", - "effect": "NoSchedule", - "toleration_seconds": None, - } in tolerations - assert { - "key": "example.org/toleration", - "operator": "Exists", - "effect": "NoSchedule", - } in tolerations - - -@pytest.mark.anyio -async def test_auth_missing(k8s_cluster, pod_spec): - with pytest.raises(kubernetes.config.ConfigException) as info: - await KubeCluster(pod_spec, auth=[], **cluster_kwargs) - - assert "No authorization methods were provided" in str(info.value) - - -@pytest.mark.anyio -async def test_auth_tries_all_methods(k8s_cluster, pod_spec): - fails = {"count": 0} - - class FailAuth(ClusterAuth): - def load(self): - fails["count"] += 1 - raise kubernetes.config.ConfigException("Fail #{count}".format(**fails)) - - with pytest.raises(kubernetes.config.ConfigException) as info: - await KubeCluster(pod_spec, auth=[FailAuth()] * 3, **cluster_kwargs) - - assert "Fail #3" in str(info.value) - assert fails["count"] == 3 - - -@pytest.mark.xfail( - reason="Updating the default client configuration is broken in kubernetes" -) -@pytest.mark.anyio -async def test_auth_kubeconfig_with_filename(): - await KubeConfig(config_file=CONFIG_DEMO).load() - - # we've set the default configuration, so check that it is default - config = kubernetes.client.Configuration() - assert config.host == "https://1.2.3.4" - assert config.cert_file == FAKE_CERT - assert config.key_file == FAKE_KEY - assert config.ssl_ca_cert == FAKE_CA - - -@pytest.mark.xfail( - reason="Updating the default client configuration is broken in kubernetes" -) -@pytest.mark.anyio -async def test_auth_kubeconfig_with_context(): - await KubeConfig(config_file=CONFIG_DEMO, 
context="exp-scratch").load() - - # we've set the default configuration, so check that it is default - config = kubernetes.client.Configuration() - assert config.host == "https://5.6.7.8" - assert config.api_key["authorization"] == "Basic {}".format( - base64.b64encode(b"exp:some-password").decode("ascii") - ) - - -@pytest.mark.xfail( - reason="Updating the default client configuration is broken in async kubernetes" -) -@pytest.mark.anyio -async def test_auth_explicit(): - await KubeAuth( - host="https://9.8.7.6", username="abc", password="some-password" - ).load() - - config = kubernetes.client.Configuration() - assert config.host == "https://9.8.7.6" - assert config.username == "abc" - assert config.password == "some-password" - assert config.get_basic_auth_token() == "Basic {}".format( - base64.b64encode(b"abc:some-password").decode("ascii") - ) - - -@pytest.mark.anyio -async def test_start_with_workers(k8s_cluster, pod_spec): - async with KubeCluster(pod_spec, n_workers=2, **cluster_kwargs) as cluster: - async with Client(cluster, asynchronous=True) as client: - await client.wait_for_workers(2) - - -@pytest.mark.anyio -@pytest.mark.xfail(reason="Flaky in CI and classic is deprecated anyway") -async def test_adapt_delete(cluster, ns): - """ - testing whether KubeCluster.adapt will bring - back deleted worker pod (issue #244) - """ - core_api = cluster.core_api - - async def get_worker_pods(): - pods_list = await core_api.list_namespaced_pod( - namespace=ns, - label_selector=f"dask.org/component=worker,dask.org/cluster-name={cluster.name}", - ) - return [x.metadata.name for x in pods_list.items] - - cluster.adapt(maximum=2, minimum=2) - start = time() - while len(cluster.scheduler_info["workers"]) != 2: - await asyncio.sleep(0.1) - assert time() < start + 60 - - worker_pods = await get_worker_pods() - assert len(worker_pods) == 2 - # delete one worker pod - to_delete = worker_pods[0] - await core_api.delete_namespaced_pod(name=to_delete, namespace=ns) - # wait until it is deleted - start = time() - while True: - worker_pods = await get_worker_pods() - if to_delete not in worker_pods: - break - await asyncio.sleep(0.1) - assert time() < start + 60 - # test whether adapt will bring it back - start = time() - while len(cluster.scheduler_info["workers"]) != 2: - await asyncio.sleep(0.1) - assert time() < start + 60 - assert len(cluster.scheduler_info["workers"]) == 2 - - -@pytest.mark.anyio -@pytest.mark.xfail(reason="Failing in CI with FileNotFoundError") -async def test_auto_refresh(cluster): - config = { - "apiVersion": "v1", - "clusters": [ - { - "cluster": {"certificate-authority-data": "", "server": ""}, - "name": "mock_gcp_config", - } - ], - "contexts": [ - { - "context": { - "cluster": "mock_gcp_config", - "user": "mock_gcp_config", - }, - "name": "mock_gcp_config", - } - ], - "current-context": "mock_gcp_config", - "kind": "config", - "preferences": {}, - "users": [ - { - "name": "mock_gcp_config", - "user": { - "auth-provider": { - "config": { - "access-token": "", - "cmd-args": "--fake-arg arg", - "cmd-path": f"{sys.executable} {TEST_DIR}/fake_gcp_auth.py", - "expiry": "", - "expiry-key": "{.credential.token_expiry}", - "toekn-key": "{.credential.access_token}", - }, - "name": "gcp", - } - }, - } - ], - } - config_persister = False - - loader = dask_kubernetes.AutoRefreshKubeConfigLoader( - config_dict=config, - config_base_path=None, - config_persister=config_persister, - ) - - await loader.load_gcp_token() - # Check that we get back a token - assert loader.token == f"Bearer 
{'0' * 137}" - - next_expire = loader.token_expire_ts - for task in asyncio.all_tasks(): - if task.get_name() == "dask_auth_auto_refresh": - await asyncio.wait_for(task, 10) - - # Ensure that our token expiration timer was refreshed - assert loader.token_expire_ts > next_expire - - # Ensure refresh task was re-created - for task in asyncio.all_tasks(): - if task.get_name() == "dask_auth_auto_refresh": - loader.auto_refresh = False - await asyncio.wait_for(task, 60) - break - else: - assert False diff --git a/dask_kubernetes/classic/tests/test_sync.py b/dask_kubernetes/classic/tests/test_sync.py deleted file mode 100644 index ce7fed9d1..000000000 --- a/dask_kubernetes/classic/tests/test_sync.py +++ /dev/null @@ -1,497 +0,0 @@ -import os -from time import sleep, time - -import dask -import pytest -import yaml -from dask.distributed import Client, wait -from dask.utils import tmpfile -from distributed.utils_test import captured_logger - -from dask_kubernetes.classic import KubeCluster, make_pod_spec -from dask_kubernetes.constants import KUBECLUSTER_CONTAINER_NAME - -TEST_DIR = os.path.abspath(os.path.join(__file__, "..")) -CONFIG_DEMO = os.path.join(TEST_DIR, "config-demo.yaml") -FAKE_CERT = os.path.join(TEST_DIR, "fake-cert-file") -FAKE_KEY = os.path.join(TEST_DIR, "fake-key-file") -FAKE_CA = os.path.join(TEST_DIR, "fake-ca-file") - - -@pytest.fixture -def pod_spec(docker_image): - yield make_pod_spec( - image=docker_image, extra_container_config={"imagePullPolicy": "IfNotPresent"} - ) - - -@pytest.fixture -def cluster(pod_spec): - with KubeCluster(pod_spec) as cluster: - yield cluster - - -@pytest.fixture -def client(cluster): - with Client(cluster) as client: - yield client - - -def test_fixtures(client, cluster): - client.scheduler_info() - cluster.scale(1) - assert client.submit(lambda x: x + 1, 10).result() == 11 - - -def test_basic(cluster, client): - cluster.scale(2) - future = client.submit(lambda x: x + 1, 10) - result = future.result() - assert result == 11 - - while len(cluster.scheduler_info["workers"]) < 2: - sleep(0.1) - - # Ensure that inter-worker communication works well - futures = client.map(lambda x: x + 1, range(10)) - total = client.submit(sum, futures) - assert total.result() == sum(map(lambda x: x + 1, range(10))) - assert all(client.has_what().values()) - - -@pytest.mark.xfail(reason="The widget has changed upstream") -def test_ipython_display(cluster): - ipywidgets = pytest.importorskip("ipywidgets") - cluster.scale(1) - cluster._ipython_display_() - box = cluster._cached_widget - assert isinstance(box, ipywidgets.Widget) - cluster._ipython_display_() - assert cluster._cached_widget is box - - start = time() - while "1" not in str(box): # one worker in a table - assert time() < start + 20 - sleep(0.5) - - -def test_env(pod_spec): - with KubeCluster(pod_spec, env={"ABC": "DEF"}) as cluster: - cluster.scale(1) - with Client(cluster) as client: - while not cluster.scheduler_info["workers"]: - sleep(0.1) - env = client.run(lambda: dict(os.environ)) - assert all(v["ABC"] == "DEF" for v in env.values()) - - -def dont_test_pod_template_yaml(docker_image): - test_yaml = { - "kind": "Pod", - "metadata": {"labels": {"app": "dask", "component": "dask-worker"}}, - "spec": { - "containers": [ - { - "args": [ - "dask-worker", - "$(DASK_SCHEDULER_ADDRESS)", - "--nthreads", - "1", - ], - "image": docker_image, - "imagePullPolicy": "IfNotPresent", - "name": KUBECLUSTER_CONTAINER_NAME, - } - ] - }, - } - - with tmpfile(extension="yaml") as fn: - with open(fn, mode="w") as f: - 
yaml.dump(test_yaml, f) - with KubeCluster(f.name) as cluster: - cluster.scale(2) - with Client(cluster) as client: - future = client.submit(lambda x: x + 1, 10) - result = future.result(timeout=10) - assert result == 11 - - start = time() - while len(cluster.scheduler_info["workers"]) < 2: - sleep(0.1) - assert time() < start + 20, "timeout" - - # Ensure that inter-worker communication works well - futures = client.map(lambda x: x + 1, range(10)) - total = client.submit(sum, futures) - assert total.result() == sum(map(lambda x: x + 1, range(10))) - assert all(client.has_what().values()) - - -def test_pod_template_yaml_expand_env_vars(docker_image): - try: - os.environ["FOO_IMAGE"] = docker_image - - test_yaml = { - "kind": "Pod", - "metadata": {"labels": {"app": "dask", "component": "dask-worker"}}, - "spec": { - "containers": [ - { - "args": [ - "dask-worker", - "$(DASK_SCHEDULER_ADDRESS)", - "--nthreads", - "1", - ], - "image": "${FOO_IMAGE}", - "imagePullPolicy": "IfNotPresent", - "name": KUBECLUSTER_CONTAINER_NAME, - } - ] - }, - } - - with tmpfile(extension="yaml") as fn: - with open(fn, mode="w") as f: - yaml.dump(test_yaml, f) - with KubeCluster(f.name) as cluster: - assert cluster.pod_template.spec.containers[0].image == docker_image - finally: - del os.environ["FOO_IMAGE"] - - -def test_pod_template_dict(docker_image): - spec = { - "metadata": {}, - "restartPolicy": "Never", - "spec": { - "containers": [ - { - "args": [ - "dask-worker", - "$(DASK_SCHEDULER_ADDRESS)", - "--nthreads", - "1", - "--death-timeout", - "60", - ], - "command": None, - "image": docker_image, - "imagePullPolicy": "IfNotPresent", - "name": KUBECLUSTER_CONTAINER_NAME, - } - ] - }, - } - - with KubeCluster(spec) as cluster: - cluster.scale(2) - with Client(cluster) as client: - future = client.submit(lambda x: x + 1, 10) - result = future.result() - assert result == 11 - - while len(cluster.scheduler_info["workers"]) < 2: - sleep(0.1) - - # Ensure that inter-worker communication works well - futures = client.map(lambda x: x + 1, range(10)) - total = client.submit(sum, futures) - assert total.result() == sum(map(lambda x: x + 1, range(10))) - assert all(client.has_what().values()) - - -def test_pod_template_minimal_dict(docker_image): - spec = { - "spec": { - "containers": [ - { - "args": [ - "dask-worker", - "$(DASK_SCHEDULER_ADDRESS)", - "--nthreads", - "1", - "--death-timeout", - "60", - ], - "command": None, - "image": docker_image, - "imagePullPolicy": "IfNotPresent", - "name": KUBECLUSTER_CONTAINER_NAME, - } - ] - } - } - - with KubeCluster(spec) as cluster: - cluster.adapt() - with Client(cluster) as client: - future = client.submit(lambda x: x + 1, 10) - result = future.result() - assert result == 11 - - -def test_worker_pod_template_spec_are_copied(docker_image): - worker_spec = make_pod_spec(docker_image) - worker_spec.spec.containers[0].args[0] = "fake-worker-cmd" - - with KubeCluster(pod_template=worker_spec): - assert worker_spec.spec.containers[0].args[0] == "fake-worker-cmd" - - -def test_scheduler_pod_template_spec_are_copied(docker_image): - scheduler_spec = make_pod_spec(docker_image) - scheduler_spec.spec.containers[0].args[0] = "fake-scheduler-cmd" - - with KubeCluster( - pod_template=make_pod_spec(docker_image), scheduler_pod_template=scheduler_spec - ): - assert scheduler_spec.spec.containers[0].args[0] == "fake-scheduler-cmd" - - -def test_pod_template_from_conf(docker_image): - spec = { - "spec": { - "containers": [{"name": KUBECLUSTER_CONTAINER_NAME, "image": docker_image}] - } - } - - 
with dask.config.set({"kubernetes.worker-template": spec}): - with KubeCluster() as cluster: - assert ( - cluster.pod_template.spec.containers[0].name - == KUBECLUSTER_CONTAINER_NAME - ) - - -def test_pod_template_with_custom_container_name(docker_image): - container_name = "my-custom-container" - spec = {"spec": {"containers": [{"name": container_name, "image": docker_image}]}} - - with dask.config.set({"kubernetes.worker-template": spec}): - with KubeCluster() as cluster: - assert cluster.pod_template.spec.containers[0].name == container_name - - -def test_bad_args(): - with pytest.raises(FileNotFoundError): - KubeCluster("myfile.yaml") - - with pytest.raises((ValueError, TypeError, AttributeError)): - KubeCluster({"kind": "Pod"}) - - -def test_constructor_parameters(pod_spec): - env = {"FOO": "BAR", "A": 1} - with KubeCluster(pod_spec, name="myname", env=env) as cluster: - pod = cluster.pod_template - - var = [v for v in pod.spec.containers[0].env if v.name == "FOO"] - assert var and var[0].value == "BAR" - - var = [v for v in pod.spec.containers[0].env if v.name == "A"] - assert var and var[0].value == "1" - - assert pod.metadata.generate_name == "myname" - - -def test_scale_up_down(cluster, client): - np = pytest.importorskip("numpy") - cluster.scale(2) - - start = time() - while len(cluster.scheduler_info["workers"]) != 2: - sleep(0.1) - assert time() < start + 30 - - a, b = list(cluster.scheduler_info["workers"]) - x = client.submit(np.ones, 1, workers=a) - y = client.submit(np.ones, 50_000, workers=b) - - wait([x, y]) - - # start = time() - # while ( - # cluster.scheduler_info["workers"][a].metrics["memory"] - # > cluster.scheduler_info["workers"][b].metrics["memory"] - # ): - # sleep(0.1) - # assert time() < start + 1 - - cluster.scale(1) - - start = time() - while len(cluster.scheduler_info["workers"]) != 1: - sleep(0.1) - assert time() < start + 60 - - # assert set(cluster.scheduler_info["workers"]) == {b} - - -def test_automatic_startup(docker_image): - test_yaml = { - "kind": "Pod", - "metadata": {"labels": {"foo": "bar"}}, - "spec": { - "containers": [ - { - "args": [ - "dask-worker", - "$(DASK_SCHEDULER_ADDRESS)", - "--nthreads", - "1", - ], - "image": docker_image, - "name": KUBECLUSTER_CONTAINER_NAME, - } - ] - }, - } - - with tmpfile(extension="yaml") as fn: - with open(fn, mode="w") as f: - yaml.dump(test_yaml, f) - with dask.config.set({"kubernetes.worker-template-path": fn}): - with KubeCluster() as cluster: - assert cluster.pod_template.metadata.labels["foo"] == "bar" - - -def test_repr(cluster): - for text in [repr(cluster), str(cluster)]: - assert "Box" not in text - assert ( - cluster.scheduler.address in text - or cluster.scheduler.external_address in text - ) - assert "workers=0" in text - - -def test_escape_username(pod_spec, monkeypatch): - monkeypatch.setenv("LOGNAME", "Foo!") - - with KubeCluster(pod_spec) as cluster: - assert "foo" in cluster.name - assert "!" 
not in cluster.name - assert "foo" in cluster.pod_template.metadata.labels["user"] - - -def test_escape_name(pod_spec): - with KubeCluster(pod_spec, name="foo@bar") as cluster: - assert "@" not in str(cluster.pod_template) - - -def test_maximum(cluster): - with dask.config.set({"kubernetes.count.max": 1}): - with captured_logger("dask_kubernetes") as logger: - cluster.scale(10) - - start = time() - while len(cluster.scheduler_info["workers"]) <= 0: - sleep(0.1) - assert time() < start + 60 - - sleep(0.5) - assert len(cluster.scheduler_info["workers"]) == 1 - - result = logger.getvalue() - assert "scale beyond maximum number of workers" in result.lower() - - -def test_extra_pod_config(docker_image): - """ - Test that our pod config merging process works fine - """ - with KubeCluster( - make_pod_spec( - docker_image, extra_pod_config={"automountServiceAccountToken": False} - ), - n_workers=0, - ) as cluster: - - pod = cluster.pod_template - - assert pod.spec.automount_service_account_token is False - - -def test_extra_container_config(docker_image): - """ - Test that our container config merging process works fine - """ - with KubeCluster( - make_pod_spec( - docker_image, - extra_container_config={ - "imagePullPolicy": "IfNotPresent", - "securityContext": {"runAsUser": 0}, - }, - ), - n_workers=0, - ) as cluster: - - pod = cluster.pod_template - - assert pod.spec.containers[0].image_pull_policy == "IfNotPresent" - assert pod.spec.containers[0].security_context == {"runAsUser": 0} - - -def test_container_resources_config(docker_image): - """ - Test container resource requests / limits being set properly - """ - with KubeCluster( - make_pod_spec( - docker_image, memory_request="0.5G", memory_limit="1G", cpu_limit="1" - ), - n_workers=0, - ) as cluster: - - pod = cluster.pod_template - - assert pod.spec.containers[0].resources.requests["memory"] == "0.5G" - assert pod.spec.containers[0].resources.limits["memory"] == "1G" - assert pod.spec.containers[0].resources.limits["cpu"] == "1" - assert "cpu" not in pod.spec.containers[0].resources.requests - - -def test_extra_container_config_merge(docker_image): - """ - Test that our container config merging process works recursively fine - """ - with KubeCluster( - make_pod_spec( - docker_image, - env={"TEST": "HI"}, - extra_container_config={ - "env": [{"name": "BOO", "value": "FOO"}], - "args": ["last-item"], - }, - ), - n_workers=0, - ) as cluster: - - pod = cluster.pod_template - - for e in [{"name": "TEST", "value": "HI"}, {"name": "BOO", "value": "FOO"}]: - assert e in pod.spec.containers[0].env - - assert pod.spec.containers[0].args[-1] == "last-item" - - -def test_worker_args(docker_image): - """ - Test that dask-worker arguments are added to the container args - """ - with KubeCluster( - make_pod_spec( - docker_image, - memory_limit="5000M", - resources="FOO=1 BAR=2", - ), - n_workers=0, - ) as cluster: - - pod = cluster.pod_template - - for arg in ["--memory-limit", "5000M", "--resources", "FOO=1 BAR=2"]: - assert arg in pod.spec.containers[0].args diff --git a/dask_kubernetes/common/auth.py b/dask_kubernetes/common/auth.py deleted file mode 100644 index 1ebdff89f..000000000 --- a/dask_kubernetes/common/auth.py +++ /dev/null @@ -1,493 +0,0 @@ -""" -Defines different methods to configure a connection to a Kubernetes cluster. 
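As context for this removal, a minimal sketch of how the authentication helpers defined below (InCluster, KubeConfig, KubeAuth) were typically chained; the "minikube" context is illustrative only:

import asyncio

from dask_kubernetes.common.auth import ClusterAuth, InCluster, KubeConfig

async def configure_auth():
    # Try in-cluster credentials first, then fall back to a kubeconfig context,
    # mirroring ClusterAuth.DEFAULT = [InCluster(), KubeConfig()].
    await ClusterAuth.load_first([InCluster(), KubeConfig(context="minikube")])

asyncio.run(configure_auth())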
-""" -import asyncio -import base64 -import contextlib -import copy -import datetime -import json -import logging -import os - -import kubernetes -import kubernetes_asyncio -from kubernetes_asyncio.client import Configuration -from kubernetes_asyncio.config.dateutil import parse_rfc3339 -from kubernetes_asyncio.config.google_auth import google_auth_credentials -from kubernetes_asyncio.config.kube_config import KubeConfigLoader, KubeConfigMerger - -logger = logging.getLogger(__name__) - -tzUTC = datetime.timezone.utc - - -class AutoRefreshKubeConfigLoader(KubeConfigLoader): - """ - Extends KubeConfigLoader, automatically attempts to refresh authentication - credentials before they expire. - """ - - def __init__(self, *args, **kwargs): - super(AutoRefreshKubeConfigLoader, self).__init__(*args, **kwargs) - - self._retry_count = 0 - self._max_retries = float("Inf") - self.auto_refresh = True - self.refresh_task = None - self.last_refreshed = None - self.token_expire_ts = None - - def __del__(self): - self.auto_refresh = False - - def extract_oid_expiration_from_provider(self, provider): - """ - Extracts the expiration datestamp for the provider token - Parameters - ---------- - provider : authentication provider dictionary. - - Returns - ------- - expires : expiration timestamp - """ - parts = provider["config"]["id-token"].split(".") - - if len(parts) != 3: - raise ValueError("oidc: JWT tokens should contain 3 period-delimited parts") - - id_token = parts[1] - # Re-pad the unpadded JWT token - id_token += (4 - len(id_token) % 4) * "=" - jwt_attributes = json.loads(base64.b64decode(id_token).decode("utf8")) - expires = jwt_attributes.get("exp") - - return expires - - async def create_refresh_task_from_expiration_timestamp(self, expiration_timestamp): - """ - Takes an expiration timestamp, and creates a refresh task to ensure that the token - does not expire. - - Parameters - ---------- - expiration_timestamp : time at which the current authentication token will expire - - Returns - ------- - N/A - """ - # Set our token expiry to be actual expiry - 20% - expiry = parse_rfc3339(expiration_timestamp) - expiry_delta = datetime.timedelta( - seconds=(expiry - datetime.datetime.now(tz=tzUTC)).total_seconds() - ) - scaled_expiry_delta = datetime.timedelta( - seconds=0.8 * expiry_delta.total_seconds() - ) - - self.refresh_task = asyncio.create_task( - self.refresh_after( - when=scaled_expiry_delta.total_seconds(), reschedule_on_failure=True - ), - name="dask_auth_auto_refresh", - ) - - self.last_refreshed = datetime.datetime.now(tz=tzUTC) - self.token_expire_ts = self.last_refreshed + scaled_expiry_delta - - async def refresh_after(self, when, reschedule_on_failure=False): - """ - Refresh kuberenetes authentication - Parameters - ---------- - when : Seconds before we should refresh. This should be set to some delta before - the actual token expiration time, or you will likely see authentication race - / failure conditions. - - reschedule_on_failure : If the refresh task fails, re-try in 30 seconds, until - _max_retries is exceeded, then raise an exception. - """ - - if not self.auto_refresh: - return - - logger.debug( - msg=f"Refresh_at coroutine sleeping for " - f"{int(when // 60)} minutes {(when % 60):0.2f} seconds." 
- ) - try: - await asyncio.sleep(when) - if self.provider == "gcp": - await self.refresh_gcp_token() - elif self.provider == "oidc": - await self.refresh_oid_token() - return - elif "exec" in self._user: - logger.warning(msg="Auto-refresh doesn't support generic ExecProvider") - return - - except Exception as e: - logger.warning( - msg=f"Authentication refresh failed for provider '{self.provider}.'", - exc_info=e, - ) - if not reschedule_on_failure or self._retry_count > self._max_retries: - raise - - logger.warning(msg=f"Retrying '{self.provider}' in 30 seconds.") - self._retry_count += 1 - self.refresh_task = asyncio.create_task(self.refresh_after(30)) - - async def refresh_oid_token(self): - """ - Adapted from kubernetes_asyncio/config/kube_config:_load_oid_token - - Refreshes the existing oid token, if necessary, and creates a refresh task - that will keep the token from expiring. - - Returns - ------- - """ - provider = self._user["auth-provider"] - - logger.debug("Refreshing OID token.") - - if "config" not in provider: - raise ValueError("oidc: missing configuration") - - if (not self.token_expire_ts) or ( - self.token_expire_ts <= datetime.datetime.now(tz=tzUTC) - ): - await self._refresh_oidc(provider) - expires = datetime.datetime.fromtimestamp( - self.extract_oid_expiration_from_provider(provider=provider) - ) - - await self.create_refresh_task_from_expiration_timestamp( - expiration_timestamp=expires - ) - - self.token = "Bearer {}".format(provider["config"]["id-token"]) - - async def refresh_gcp_token(self): - """ - Adapted from kubernetes_asyncio/config/kube_config:load_gcp_token - - Refreshes the existing gcp token, if necessary, and creates a refresh task - that will keep the token from expiring. - - Returns - ------- - """ - if "config" not in self._user["auth-provider"]: - self._user["auth-provider"].value["config"] = {} - - config = self._user["auth-provider"]["config"] - - if (not self.token_expire_ts) or ( - self.token_expire_ts <= datetime.datetime.now(tz=tzUTC) - ): - - logger.debug("Refreshing GCP token.") - if self._get_google_credentials is not None: - if asyncio.iscoroutinefunction(self._get_google_credentials): - credentials = await self._get_google_credentials() - else: - credentials = self._get_google_credentials() - else: - # config is read-only. - extra_args = " --force-auth-refresh" - _config = { - "cmd-args": config["cmd-args"] + extra_args, - "cmd-path": config["cmd-path"], - } - credentials = await google_auth_credentials(_config) - - config.value["access-token"] = credentials.token - config.value["expiry"] = credentials.expiry - - # Set our token expiry to be actual expiry - 20% - await self.create_refresh_task_from_expiration_timestamp( - expiration_timestamp=config.value["expiry"] - ) - - if self._config_persister: - self._config_persister(self._config.value) - - self.token = "Bearer %s" % config["access-token"] - - async def _load_oid_token(self): - """ - Overrides KubeConfigLoader implementation. - Returns - ------- - Auth token - """ - await self.refresh_oid_token() - - return self.token - - async def load_gcp_token(self): - """ - Override KubeConfigLoader implementation so that we can keep track of the expiration timestamp - and automatically refresh auth tokens. - - Returns - ------- - GCP access token - """ - await self.refresh_gcp_token() - - return self.token - - -class AutoRefreshConfiguration(Configuration): - """ - Extends kubernetes_async Configuration to support automatic token refresh. 
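Stripped of the Kubernetes details, the refresh scheduling above boils down to sleeping for 80% of the token's remaining lifetime and then refreshing; a minimal standalone sketch:

import asyncio
import datetime

async def schedule_refresh(expiry: datetime.datetime, refresh):
    # Sleep for 80% of the remaining token lifetime, then call the refresh
    # coroutine, mirroring the scaled expiry delta computed above.
    remaining = (expiry - datetime.datetime.now(tz=datetime.timezone.utc)).total_seconds()
    await asyncio.sleep(0.8 * remaining)
    await refresh()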
- Lets us keep track of the original loader object, which can be used - to regenerate the authentication token. - """ - - def __init__(self, loader, refresh_frequency=None, *args, **kwargs): - super(AutoRefreshConfiguration, self).__init__(*args, **kwargs) - - # Set refresh api callback - self.refresh_api_key_hook = self.refresh_api_key - self.last_refreshed = datetime.datetime.now(tz=tzUTC) - self.loader = loader - - # Adapted from kubernetes_asyncio/client/configuration.py:__deepcopy__ - def __deepcopy__(self, memo): - """ - Modified so that we don't try to deep copy the loader off the config - """ - cls = self.__class__ - result = cls.__new__(cls) - memo[id(self)] = result - for k, v in self.__dict__.items(): - if k not in ("logger", "logger_file_handler", "loader"): - setattr(result, k, copy.deepcopy(v, memo)) - - # shallow copy loader object - result.loader = self.loader - # shallow copy of loggers - result.logger = copy.copy(self.logger) - # use setters to configure loggers - result.logger_file = self.logger_file - result.debug = self.debug - - return result - - def refresh_api_key(self, client_configuration): - """ - Checks to see if the loader has updated the authentication token. If it - has, the token is copied from the loader into the current configuration. - - This function is assigned to Configuration.refresh_api_key_hook, and will - fire when entering get_api_key_with_prefix, before the api_key is retrieved. - """ - if self.loader.last_refreshed is not None: - if ( - self.last_refreshed is None - or self.last_refreshed < self.loader.last_refreshed - ): - logger.debug("Entering refresh_api_key_hook") - client_configuration.api_key[ - "authorization" - ] = client_configuration.loader.token - self.last_refreshed = datetime.datetime.now(tz=tzUTC) - - -class ClusterAuth(object): - """ - An abstract base class for methods for configuring a connection to a - Kubernetes API server. - - Examples - -------- - >>> from dask_kubernetes import KubeConfig - >>> auth = KubeConfig(context='minikube') - - >>> from dask_kubernetes import KubeAuth - >>> auth = KubeAuth(host='https://localhost', username='superuser', password='pass') - - """ - - async def load(self): - """ - Load Kubernetes configuration and set as default - - Raises - ------ - - kubernetes.client.KubeConfigException - """ - raise NotImplementedError() - - @staticmethod - async def load_first(auth=None): - """ - Load the first valid configuration in the list *auth*. A single - configuration method can be passed. - - Parameters - ---------- - auth: List[ClusterAuth] (optional) - Configuration methods to attempt in order. Defaults to - ``[InCluster(), KubeConfig()]``. - - """ - if auth is None: - auth = ClusterAuth.DEFAULT - elif isinstance(auth, ClusterAuth): - auth = [auth] - elif isinstance(auth, list): - if not auth: - raise kubernetes_asyncio.config.ConfigException( - "No authorization methods were provided" - ) - else: - msg = ( - "Invalid authorization method provided. See ClusterAuth " - "docstring for ways to specify authentication methods" - ) - raise ValueError(msg) - - auth_exc = None - for auth_instance in auth: - try: - await auth_instance.load() - except ( - kubernetes_asyncio.config.ConfigException, - kubernetes.config.ConfigException, - ) as exc: - logger.debug( - "Failed to load configuration with %s method: %s", - auth_instance.__class__, - exc, - ) - auth_exc = exc - else: - break - else: - raise auth_exc - - -class InCluster(ClusterAuth): - """Configure the Kubernetes connection from a container's environment. 
- - This authentication method is intended for use when the client is running - in a container started by Kubernetes with an authorized service account. - This loads the mounted service account token and discovers the Kubernetes - API via Kubernetes service discovery. - """ - - async def load(self): - kubernetes.config.load_incluster_config() - kubernetes_asyncio.config.load_incluster_config() - - -class KubeConfig(ClusterAuth): - """Configure the Kubernetes connection from a kubeconfig file. - - Parameters - ---------- - config_file: str (optional) - The path of the kubeconfig file to load. Defaults to the value of the - ``KUBECONFIG`` environment variable, or the string ``~/.kube/config``. - context: str (optional) - The kubeconfig context to use. Defaults to the value of ``current-context`` - in the configuration file. - persist_config: bool (optional) - Whether changes to the configuration will be saved back to disk (e.g. - GCP token refresh). Defaults to ``True``. - - """ - - def __init__(self, config_file=None, context=None, persist_config=True): - self.config_file = config_file - self.context = context - self.persist_config = persist_config - - async def load(self): - with contextlib.suppress(KeyError): - if self.config_file is None: - self.config_file = os.path.abspath( - os.path.expanduser(os.environ.get("KUBECONFIG", "~/.kube/config")) - ) - - await self.load_kube_config() - - # Adapted from from kubernetes_asyncio/config/kube_config.py:get_kube_config_loader_for_yaml_file - def get_kube_config_loader_for_yaml_file(self): - kcfg = KubeConfigMerger(self.config_file) - config_persister = None - if self.persist_config: - config_persister = kcfg.save_changes() - - return AutoRefreshKubeConfigLoader( - config_dict=kcfg.config, - config_base_path=None, - config_persister=config_persister, - ) - - # Adapted from kubernetes_asyncio/config/kube_config.py:load_kube_config - async def load_kube_config(self): - # Create a config loader, this will automatically refresh our credentials before they expire - loader = self.get_kube_config_loader_for_yaml_file() - - # Grab our async + callback aware configuration - config = AutoRefreshConfiguration(loader) - - await loader.load_and_set(config) - Configuration.set_default(config) - - -class KubeAuth(ClusterAuth): - """Configure the Kubernetes connection explicitly. - - Parameters - ---------- - host: str - The base URL of the Kubernetes host to connect - username: str (optional) - Username for HTTP basic authentication - password: str (optional) - Password for HTTP basic authentication - debug: bool (optional) - Debug switch - verify_ssl: bool (optional) - Set this to false to skip verifying SSL certificate when calling API - from https server. Defaults to ``True``. - ssl_ca_cert: str (optional) - Set this to customize the certificate file to verify the peer. - cert_file: str (optional) - Client certificate file - key_file: str (optional) - Client key file - assert_hostname: bool (optional) - Set this to True/False to enable/disable SSL hostname verification. - Defaults to True. - proxy: str (optional) - URL for a proxy to connect through - """ - - def __init__(self, host, **kwargs): - # We need to create a new configuration in this way, because if we just - # instantiate a new Configuration object we will get the default - # values. 
- config = type.__call__(kubernetes.client.Configuration) - config.host = host - - for key, value in kwargs.items(): - setattr(config, key, value) - self.config = config - - async def load(self): - kubernetes.client.Configuration.set_default(self.config) - await kubernetes_asyncio.client.Configuration.set_default(self.config) - - -ClusterAuth.DEFAULT = [InCluster(), KubeConfig()] diff --git a/dask_kubernetes/common/networking.py b/dask_kubernetes/common/networking.py deleted file mode 100644 index b789c2c06..000000000 --- a/dask_kubernetes/common/networking.py +++ /dev/null @@ -1,240 +0,0 @@ -import asyncio -import random -import socket -import subprocess -import time -from contextlib import suppress -from weakref import finalize - -import kr8s -import kubernetes_asyncio as kubernetes -from distributed.core import rpc -from kr8s.asyncio.objects import Pod -from tornado.iostream import StreamClosedError - -from dask_kubernetes.common.utils import check_dependency -from dask_kubernetes.exceptions import CrashLoopBackOffError - - -async def get_internal_address_for_scheduler_service( - service, - port_forward_cluster_ip=None, - service_name_resolution_retries=20, - port_name="tcp-comm", - local_port=None, -): - """Take a service object and return the scheduler address.""" - port = _get_port(service, port_name) - if not port_forward_cluster_ip: - with suppress(socket.gaierror): - # Try to resolve the service name. If we are inside the cluster this should succeed. - host = f"{service.metadata.name}.{service.metadata.namespace}" - if await _is_service_available( - host=host, port=port, retries=service_name_resolution_retries - ): - return f"tcp://{host}:{port}" - - # If the service name is unresolvable, we are outside the cluster and we need to port forward the service. - host = "localhost" - - port = await port_forward_service( - service.metadata.name, service.metadata.namespace, port, local_port - ) - return f"tcp://{host}:{port}" - - -async def get_external_address_for_scheduler_service( - core_api, - service, - port_forward_cluster_ip=None, - service_name_resolution_retries=20, - port_name="tcp-comm", - local_port=None, -): - """Take a service object and return the scheduler address.""" - if service.spec.type == "LoadBalancer": - port = _get_port(service, port_name) - lb = service.status.load_balancer.ingress[0] - host = lb.hostname or lb.ip - elif service.spec.type == "NodePort": - port = _get_port(service, port_name, is_node_port=True) - nodes = await core_api.list_node() - host = nodes.items[0].status.addresses[0].address - elif service.spec.type == "ClusterIP": - port = _get_port(service, port_name) - if not port_forward_cluster_ip: - with suppress(socket.gaierror): - # Try to resolve the service name. If we are inside the cluster this should succeed. - host = f"{service.metadata.name}.{service.metadata.namespace}" - if await _is_service_available( - host=host, port=port, retries=service_name_resolution_retries - ): - return f"tcp://{host}:{port}" - - # If the service name is unresolvable, we are outside the cluster and we need to port forward the service. 
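For reference, a minimal standalone sketch of the port-forward fallback that port_forward_service implements further down in this module; it assumes kubectl is on PATH and lets the OS pick the free local port instead of retrying random ports as the removed code does:

import socket
import subprocess

def forward_service(namespace: str, service: str, remote_port: int) -> int:
    # Bind to port 0 so the OS assigns a free local port (simplification:
    # there is a small race before kubectl re-binds it).
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.bind(("", 0))
        local_port = sock.getsockname()[1]
    # Start kubectl port-forward in the background; the caller then connects
    # to tcp://localhost:<local_port>.
    subprocess.Popen(
        [
            "kubectl", "port-forward", "--namespace", namespace,
            f"service/{service}", f"{local_port}:{remote_port}",
        ],
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
    )
    return local_port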
- host = "localhost" - - port = await port_forward_service( - service.metadata.name, service.metadata.namespace, port, local_port - ) - return f"tcp://{host}:{port}" - - -def _get_port(service, port_name, is_node_port=False): - """NodePort is a special case when we have to use node_port instead of node""" - [port] = [ - port.port if not is_node_port else port.nodePort - for port in service.spec.ports - if port.name == service.metadata.name or port.name == port_name - ] - return port - - -async def _is_service_available(host, port, retries=20): - for i in range(retries): - try: - return await asyncio.get_event_loop().getaddrinfo(host, port) - except socket.gaierror as e: - if i >= retries - 1: - raise e - await asyncio.sleep(0.5) - - -def _port_in_use(port): - if port is None: - return True - conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - try: - conn.bind(("", port)) - conn.close() - return False - except OSError: - return True - - -def _random_free_port(low, high, retries=20): - conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - while retries: - guess = random.randint(low, high) - try: - conn.bind(("", guess)) - conn.close() - return guess - except OSError: - retries -= 1 - raise ConnectionError("Not able to find a free port.") - - -async def port_forward_service(service_name, namespace, remote_port, local_port=None): - check_dependency("kubectl") - if not local_port: - local_port = _random_free_port(49152, 65535) # IANA suggested range - elif _port_in_use(local_port): - raise ConnectionError("Specified Port already in use.") - kproc = subprocess.Popen( - [ - "kubectl", - "port-forward", - "--address", - "0.0.0.0", - "--namespace", - f"{namespace}", - f"service/{service_name}", - f"{local_port}:{remote_port}", - ], - stdout=subprocess.DEVNULL, - stderr=subprocess.DEVNULL, - ) - finalize(kproc, kproc.kill) - - if await is_comm_open("localhost", local_port, retries=2000): - return local_port - raise ConnectionError("kubectl port forward failed") - - -async def is_comm_open(ip, port, retries=200): - while retries > 0: - with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: - result = sock.connect_ex((ip, port)) - if result == 0: - return True - else: - time.sleep(0.1) - retries -= 1 - return False - - -async def port_forward_dashboard(service_name, namespace): - port = await port_forward_service(service_name, namespace, 8787) - return port - - -async def get_scheduler_address( - service_name, - namespace, - port_name="tcp-comm", - port_forward_cluster_ip=None, - local_port=None, - allow_external=True, -): - async with kubernetes.client.api_client.ApiClient() as api_client: - api = kubernetes.client.CoreV1Api(api_client) - service = await api.read_namespaced_service(service_name, namespace) - if allow_external: - address = await get_external_address_for_scheduler_service( - api, - service, - port_forward_cluster_ip=port_forward_cluster_ip, - port_name=port_name, - local_port=local_port, - ) - else: - address = await get_internal_address_for_scheduler_service( - service, - port_forward_cluster_ip=port_forward_cluster_ip, - port_name=port_name, - local_port=local_port, - ) - return address - - -async def wait_for_scheduler(cluster_name, namespace, timeout=None): - pod_start_time = None - while True: - try: - pod = await Pod.get( - label_selector=f"dask.org/component=scheduler,dask.org/cluster-name={cluster_name}", - field_selector="status.phase=Running", - namespace=namespace, - ) - except kr8s.NotFoundError: - await asyncio.sleep(0.25) - continue - if 
pod.status.phase == "Running": - if not pod_start_time: - pod_start_time = time.time() - if await pod.ready(): - return - if "containerStatuses" in pod.status: - for container in pod.status.containerStatuses: - if ( - "waiting" in container.state - and container.state.waiting.reason == "CrashLoopBackOff" - and timeout - and pod_start_time + timeout < time.time() - ): - raise CrashLoopBackOffError( - f"Scheduler in CrashLoopBackOff for more than {timeout} seconds." - ) - await asyncio.sleep(0.25) - - -async def wait_for_scheduler_comm(address): - while True: - try: - async with rpc(address) as scheduler_comm: - await scheduler_comm.versions() - except (StreamClosedError, OSError): - await asyncio.sleep(0.1) - continue - break diff --git a/dask_kubernetes/common/objects.py b/dask_kubernetes/common/objects.py deleted file mode 100644 index 1eedeebd7..000000000 --- a/dask_kubernetes/common/objects.py +++ /dev/null @@ -1,386 +0,0 @@ -""" -Convenience functions for creating pod templates. -""" - -import copy -import json -from collections import namedtuple - -from kubernetes import client -from kubernetes.client.configuration import Configuration - -from dask_kubernetes.constants import ( - KUBECLUSTER_CONTAINER_NAME, - MAX_CLUSTER_NAME_LEN, - VALID_CLUSTER_NAME, -) -from dask_kubernetes.exceptions import ValidationError - -_FakeResponse = namedtuple("_FakeResponse", ["data"]) - - -class DummyApiClient(client.ApiClient): - """A Dummy API client that is to be used solely for serialization/deserialization. - - This is to avoid starting a threadpool at initialization and for adapting the - deserialize method to accept a python dictionary instead of a Response-like - interface. - """ - - def __init__(self): - self.configuration = Configuration.get_default_copy() - - def deserialize(self, dict_, klass): - return super().deserialize(_FakeResponse(json.dumps(dict_)), klass) - - -SERIALIZATION_API_CLIENT = DummyApiClient() - - -def _set_k8s_attribute(obj, attribute, value): - """ - Set a specific value on a kubernetes object's attribute - - obj - an object from Kubernetes Python API client - attribute - Should be a Kubernetes API style attribute (with camelCase) - value - Can be anything (string, list, dict, k8s objects) that can be - accepted by the k8s python client - """ - current_value = None - attribute_name = None - # All k8s python client objects have an 'attribute_map' property - # which has as keys python style attribute names (api_client) - # and as values the kubernetes JSON API style attribute names - # (apiClient). We want to allow users to use the JSON API style attribute - # names only. - for python_attribute, json_attribute in obj.attribute_map.items(): - if json_attribute == attribute: - attribute_name = python_attribute - break - else: - raise ValueError( - "Attribute must be one of {}".format(obj.attribute_map.values()) - ) - - if hasattr(obj, attribute_name): - current_value = getattr(obj, attribute_name) - - if current_value is not None: - # This will ensure that current_value is something JSONable, - # so a dict, list, or scalar - current_value = SERIALIZATION_API_CLIENT.sanitize_for_serialization( - current_value - ) - - if isinstance(current_value, dict): - # Deep merge our dictionaries! 
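A small worked illustration of the deep merge referred to here (merge_dictionaries is defined just below in this now-removed module; the dictionaries are illustrative):

from dask_kubernetes.common.objects import merge_dictionaries

base = {"resources": {"limits": {"memory": "2G"}}}
extra = {"resources": {"limits": {"cpu": "1"}}, "nodeSelector": {"disktype": "ssd"}}
merge_dictionaries(base, extra)
# base is mutated in place and returned:
# {"resources": {"limits": {"memory": "2G", "cpu": "1"}}, "nodeSelector": {"disktype": "ssd"}}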
- setattr(obj, attribute_name, merge_dictionaries(current_value, value)) - elif isinstance(current_value, list): - # Just append lists - setattr(obj, attribute_name, current_value + value) - else: - # Replace everything else - setattr(obj, attribute_name, value) - - -def merge_dictionaries(a, b, path=None, update=True): - """ - Merge two dictionaries recursively. - - From https://stackoverflow.com/a/25270947 - """ - if path is None: - path = [] - for key in b: - if key in a: - if isinstance(a[key], dict) and isinstance(b[key], dict): - merge_dictionaries(a[key], b[key], path + [str(key)]) - elif a[key] == b[key]: - pass # same leaf value - elif isinstance(a[key], list) and isinstance(b[key], list): - for idx, _ in enumerate(b[key]): - a[key][idx] = merge_dictionaries( - a[key][idx], - b[key][idx], - path + [str(key), str(idx)], - update=update, - ) - elif update: - a[key] = b[key] - else: - raise Exception("Conflict at %s" % ".".join(path + [str(key)])) - else: - a[key] = b[key] - return a - - -def make_pod_spec( - image, - labels={}, - threads_per_worker=1, - env={}, - extra_container_config={}, - extra_pod_config={}, - resources=None, - memory_limit=None, - memory_request=None, - cpu_limit=None, - cpu_request=None, - gpu_limit=None, - annotations={}, -): - """ - Create generic pod template from input parameters - - Parameters - ---------- - image : str - Docker image name - labels : dict - Dict of labels to pass to ``V1ObjectMeta`` - threads_per_worker : int - Number of threads per each worker - env : dict - Dict of environment variables to pass to ``V1Container`` - extra_container_config : dict - Extra config attributes to set on the container object - extra_pod_config : dict - Extra config attributes to set on the pod object - resources : str - Resources for task constraints like "GPU=2 MEM=10e9". Resources are applied - separately to each worker process (only relevant when starting multiple - worker processes. Passed to the `--resources` option in ``dask-worker``. - memory_limit : int, float, or str - Bytes of memory per process that the worker can use (applied to both - ``dask-worker --memory-limit`` and ``spec.containers[].resources.limits.memory``). - This can be: - - an integer (bytes), note 0 is a special case for no memory management. - - a float (bytes). Note: fraction of total system memory is not supported by k8s. - - a string (like 5GiB or 5000M). Note: 'GB' is not supported by k8s. - - 'auto' for automatically computing the memory limit. [default: auto] - memory_request : int, float, or str - Like ``memory_limit`` (applied only to ``spec.containers[].resources.requests.memory`` - and ignored by ``dask-worker``). - cpu_limit : float or str - CPU resource limits (applied to ``spec.containers[].resources.limits.cpu``). - cpu_request : float or str - CPU resource requests (applied to ``spec.containers[].resources.requests.cpu``). - gpu_limit : int - GPU resource limits (applied to ``spec.containers[].resources.limits."nvidia.com/gpu"``). 
- annotations : dict - Dict of annotations passed to ``V1ObjectMeta`` - - Returns - ------- - pod : V1PodSpec - - Examples - -------- - >>> make_pod_spec(image='ghcr.io/dask/dask:latest', memory_limit='4G', memory_request='4G') - """ - args = [ - "dask-worker", - "$(DASK_SCHEDULER_ADDRESS)", - "--nthreads", - str(threads_per_worker), - "--death-timeout", - "60", - ] - if memory_limit: - args.extend(["--memory-limit", str(memory_limit)]) - if resources: - args.extend(["--resources", str(resources)]) - pod = client.V1Pod( - metadata=client.V1ObjectMeta(labels=labels, annotations=annotations), - spec=client.V1PodSpec( - restart_policy="Never", - containers=[ - client.V1Container( - name=KUBECLUSTER_CONTAINER_NAME, - image=image, - args=args, - env=[client.V1EnvVar(name=k, value=v) for k, v in env.items()], - ) - ], - ), - ) - - resources = client.V1ResourceRequirements(limits={}, requests={}) - - if cpu_request: - resources.requests["cpu"] = cpu_request - if memory_request: - resources.requests["memory"] = memory_request - - if cpu_limit: - resources.limits["cpu"] = cpu_limit - if gpu_limit: - resources.limits["nvidia.com/gpu"] = gpu_limit - if memory_limit: - resources.limits["memory"] = memory_limit - - pod.spec.containers[0].resources = resources - - for key, value in extra_container_config.items(): - _set_k8s_attribute(pod.spec.containers[0], key, value) - - for key, value in extra_pod_config.items(): - _set_k8s_attribute(pod.spec, key, value) - return pod - - -def make_pod_from_dict(dict_): - containers = dict_.get("spec", {}).get("containers", []) - for i, container in enumerate(containers): - container.setdefault("name", f"dask-{i}") - return SERIALIZATION_API_CLIENT.deserialize(dict_, client.V1Pod) - - -def make_service_from_dict(dict_): - return SERIALIZATION_API_CLIENT.deserialize(dict_, client.V1Service) - - -def make_pdb_from_dict(dict_): - return SERIALIZATION_API_CLIENT.deserialize(dict_, client.V1PodDisruptionBudget) - - -def clean_pod_template( - pod_template, apply_default_affinity="preferred", pod_type="worker" -): - """Normalize pod template""" - pod_template = copy.deepcopy(pod_template) - - # Make sure metadata / labels / env objects exist, so they can be modified - # later without a lot of `is None` checks - if pod_template.metadata is None: - pod_template.metadata = client.V1ObjectMeta() - if pod_template.metadata.labels is None: - pod_template.metadata.labels = {} - - if pod_template.spec.containers[0].env is None: - pod_template.spec.containers[0].env = [] - - # add default tolerations - tolerations = [ - client.V1Toleration( - key="k8s.dask.org/dedicated", - operator="Equal", - value=pod_type, - effect="NoSchedule", - ), - # GKE currently does not permit creating taints on a node pool - # with a `/` in the key field - client.V1Toleration( - key="k8s.dask.org_dedicated", - operator="Equal", - value=pod_type, - effect="NoSchedule", - ), - ] - - if pod_template.spec.tolerations is None: - pod_template.spec.tolerations = tolerations - else: - pod_template.spec.tolerations.extend(tolerations) - - # add default node affinity to k8s.dask.org/node-purpose=worker - if apply_default_affinity != "none": - # for readability - affinity = pod_template.spec.affinity - - if affinity is None: - affinity = client.V1Affinity() - if affinity.node_affinity is None: - affinity.node_affinity = client.V1NodeAffinity() - - # a common object for both a preferred and a required node affinity - node_selector_term = client.V1NodeSelectorTerm( - match_expressions=[ - 
client.V1NodeSelectorRequirement( - key="k8s.dask.org/node-purpose", operator="In", values=[pod_type] - ) - ] - ) - - if apply_default_affinity == "required": - if ( - affinity.node_affinity.required_during_scheduling_ignored_during_execution - is None - ): - affinity.node_affinity.required_during_scheduling_ignored_during_execution = client.V1NodeSelector( - node_selector_terms=[] - ) - affinity.node_affinity.required_during_scheduling_ignored_during_execution.node_selector_terms.append( - node_selector_term - ) - elif apply_default_affinity == "preferred": - if ( - affinity.node_affinity.preferred_during_scheduling_ignored_during_execution - is None - ): - affinity.node_affinity.preferred_during_scheduling_ignored_during_execution = ( - [] - ) - preferred_scheduling_terms = [ - client.V1PreferredSchedulingTerm( - preference=node_selector_term, weight=100 - ) - ] - affinity.node_affinity.preferred_during_scheduling_ignored_during_execution.extend( - preferred_scheduling_terms - ) - else: - raise ValueError( - 'Attribute apply_default_affinity must be one of "none", "preferred", or "required".' - ) - pod_template.spec.affinity = affinity - - return pod_template - - -def clean_service_template(service_template): - """Normalize service template and check for type errors""" - - service_template = copy.deepcopy(service_template) - - # Make sure metadata / labels objects exist, so they can be modified - # later without a lot of `is None` checks - if service_template.metadata is None: - service_template.metadata = client.V1ObjectMeta() - if service_template.metadata.labels is None: - service_template.metadata.labels = {} - - return service_template - - -def clean_pdb_template(pdb_template): - """Normalize pdb template and check for type errors""" - - pdb_template = copy.deepcopy(pdb_template) - - # Make sure metadata / labels objects exist, so they can be modified - # later without a lot of `is None` checks - if pdb_template.metadata is None: - pdb_template.metadata = client.V1ObjectMeta() - if pdb_template.metadata.labels is None: - pdb_template.metadata.labels = {} - if pdb_template.spec.selector is None: - pdb_template.spec.selector = client.V1LabelSelector() - - return pdb_template - - -def validate_cluster_name(cluster_name: str) -> None: - """Raise exception if cluster name is too long and/or has invalid characters""" - if not VALID_CLUSTER_NAME.match(cluster_name): - raise ValidationError( - message=( - f"The DaskCluster {cluster_name} is invalid: a lowercase RFC 1123 subdomain must " - "consist of lower case alphanumeric characters, '-' or '.', and must start " - "and end with an alphanumeric character. DaskCluster name must also be under " - f"{MAX_CLUSTER_NAME_LEN} characters." 
- ) - ) diff --git a/dask_kubernetes/common/tests/test_kind.py b/dask_kubernetes/common/tests/test_kind.py deleted file mode 100644 index 2af77bfe6..000000000 --- a/dask_kubernetes/common/tests/test_kind.py +++ /dev/null @@ -1,25 +0,0 @@ -from subprocess import check_output - -import kubernetes_asyncio as kubernetes -import pytest - -from dask_kubernetes.common.auth import ClusterAuth -from dask_kubernetes.common.utils import get_current_namespace - - -def test_config_detection(k8s_cluster): - assert b"pytest-kind" in check_output(["kubectl", "config", "current-context"]) - - -@pytest.mark.anyio -@pytest.mark.xfail(reason="Has asyncio issues on CI") -async def test_auth(k8s_cluster): - await ClusterAuth.load_first(ClusterAuth.DEFAULT) - core_v1_api = kubernetes.client.CoreV1Api() - request = await core_v1_api.list_namespace() - assert get_current_namespace() in [ - namespace.metadata.name for namespace in request.items - ] - - request = await core_v1_api.list_node() - assert "pytest-kind-control-plane" in [node.metadata.name for node in request.items] diff --git a/dask_kubernetes/common/tests/test_objects.py b/dask_kubernetes/common/tests/test_objects.py deleted file mode 100644 index 23e318961..000000000 --- a/dask_kubernetes/common/tests/test_objects.py +++ /dev/null @@ -1,88 +0,0 @@ -import pytest - -from dask_kubernetes.common.objects import make_pod_from_dict, validate_cluster_name -from dask_kubernetes.constants import KUBECLUSTER_CONTAINER_NAME, MAX_CLUSTER_NAME_LEN -from dask_kubernetes.exceptions import ValidationError - - -def test_make_pod_from_dict(): - d = { - "kind": "Pod", - "metadata": {"labels": {"app": "dask", "dask.org/component": "dask-worker"}}, - "spec": { - "containers": [ - { - "args": [ - "dask-worker", - "$(DASK_SCHEDULER_ADDRESS)", - "--nthreads", - "1", - ], - "image": "image-name", - "name": KUBECLUSTER_CONTAINER_NAME, - "securityContext": { - "capabilities": {"add": ["SYS_ADMIN"]}, - "privileged": True, - }, - } - ], - "restartPolicy": "Never", - }, - } - - pod = make_pod_from_dict(d) - - assert pod.spec.restart_policy == "Never" - assert pod.spec.containers[0].security_context.privileged - assert pod.spec.containers[0].security_context.capabilities.add == ["SYS_ADMIN"] - - -def test_make_pod_from_dict_default_container_name(): - d = { - "kind": "Pod", - "metadata": {"labels": {"app": "dask", "dask.org/component": "dask-worker"}}, - "spec": { - "containers": [ - { - "args": [ - "dask-worker", - "$(DASK_SCHEDULER_ADDRESS)", - "--nthreads", - "1", - ], - "image": "image-name", - "securityContext": { - "capabilities": {"add": ["SYS_ADMIN"]}, - "privileged": True, - }, - }, - {"image": "image-name2", "name": "sidecar"}, - {"image": "image-name3"}, - ], - "restartPolicy": "Never", - }, - } - - pod = make_pod_from_dict(d) - assert pod.spec.containers[0].name == "dask-0" - assert pod.spec.containers[1].name == "sidecar" - assert pod.spec.containers[2].name == "dask-2" - - -@pytest.mark.parametrize( - "cluster_name", - [ - (MAX_CLUSTER_NAME_LEN + 1) * "a", - "invalid.chars.in.name", - ], -) -def test_validate_cluster_name_raises_on_invalid_name( - cluster_name, -): - - with pytest.raises(ValidationError): - validate_cluster_name(cluster_name) - - -def test_validate_cluster_name_success_on_valid_name(): - assert validate_cluster_name("valid-cluster-name-123") is None diff --git a/dask_kubernetes/common/utils.py b/dask_kubernetes/common/utils.py deleted file mode 100644 index 8ad64f0bd..000000000 --- a/dask_kubernetes/common/utils.py +++ /dev/null @@ -1,47 +0,0 @@ 
-"""Utility functions.""" -import os -import shutil -import string - -import kubernetes_asyncio as kubernetes - - -def format_labels(labels): - """Convert a dictionary of labels into a comma separated string""" - if labels: - return ",".join(["{}={}".format(k, v) for k, v in labels.items()]) - else: - return "" - - -def escape(s): - valid_characters = string.ascii_letters + string.digits + "-" - return "".join(c for c in s if c in valid_characters).lower() - - -def get_current_namespace(): - """ - Get current namespace if running in a k8s cluster - - If not in a k8s cluster with service accounts enabled, default to - 'default' - - Taken from https://github.com/jupyterhub/kubespawner/blob/master/kubespawner/spawner.py#L125 - """ - ns_path = "/var/run/secrets/kubernetes.io/serviceaccount/namespace" - if os.path.exists(ns_path): - with open(ns_path) as f: - return f.read().strip() - try: - _, active_context = kubernetes.config.list_kube_config_contexts() - return active_context["context"]["namespace"] - except KeyError: - return "default" - - -def check_dependency(dependency): - if shutil.which(dependency) is None: - raise RuntimeError( - f"Missing dependency {dependency}. " - f"Please install {dependency} following the instructions for your OS. " - ) diff --git a/dask_kubernetes/conftest.py b/dask_kubernetes/conftest.py index aadaa2aac..52a036f59 100644 --- a/dask_kubernetes/conftest.py +++ b/dask_kubernetes/conftest.py @@ -1,6 +1,7 @@ import logging import os import pathlib +import shutil import subprocess import sys import tempfile @@ -10,10 +11,17 @@ from kopf.testing import KopfRunner from pytest_kind.cluster import KindCluster -from dask_kubernetes.common.utils import check_dependency - DIR = pathlib.Path(__file__).parent.absolute() + +def check_dependency(dependency): + if shutil.which(dependency) is None: + raise RuntimeError( + f"Missing dependency {dependency}. " + f"Please install {dependency} following the instructions for your OS. " + ) + + check_dependency("helm") check_dependency("kubectl") check_dependency("docker") diff --git a/dask_kubernetes/experimental/__init__.py b/dask_kubernetes/experimental/__init__.py deleted file mode 100644 index 43aafdb0e..000000000 --- a/dask_kubernetes/experimental/__init__.py +++ /dev/null @@ -1,23 +0,0 @@ -from importlib import import_module -from warnings import warn - - -def __getattr__(name): - no_longer_experimental = [ - "KubeCluster", - "make_cluster_spec", - "make_scheduler_spec", - "make_worker_spec", - "discover", - ] - if name in no_longer_experimental: - warn( - f"Yay {name} is no longer experimental 🎉. 
" - "You can import it directly from dask_kubernetes or explicitly from dask_kubernetes.operator", - DeprecationWarning, - stacklevel=2, - ) - new_module = import_module("dask_kubernetes.operator") - return getattr(new_module, name) - - raise AttributeError(f"module {__name__} has no attribute {name}") diff --git a/dask_kubernetes/helm/__init__.py b/dask_kubernetes/helm/__init__.py deleted file mode 100644 index 761caccaa..000000000 --- a/dask_kubernetes/helm/__init__.py +++ /dev/null @@ -1 +0,0 @@ -from .helmcluster import HelmCluster, discover diff --git a/dask_kubernetes/helm/helmcluster.py b/dask_kubernetes/helm/helmcluster.py deleted file mode 100644 index b3390b664..000000000 --- a/dask_kubernetes/helm/helmcluster.py +++ /dev/null @@ -1,335 +0,0 @@ -import asyncio -import json -import subprocess -import warnings -from contextlib import suppress - -import aiohttp -import kubernetes_asyncio as kubernetes -from distributed.core import Status, rpc -from distributed.deploy import Cluster -from distributed.utils import Log, Logs - -from ..common.auth import ClusterAuth -from ..common.networking import get_external_address_for_scheduler_service -from ..common.utils import ( - check_dependency, - get_current_namespace, -) - - -class HelmCluster(Cluster): - """Connect to a Dask cluster deployed via the Helm Chart. - - This cluster manager connects to an existing Dask deployment that was - created by the Dask Helm Chart. Enabling you to perform basic cluster actions - such as scaling and log retrieval. - - Parameters - ---------- - release_name: str - Name of the helm release to connect to. - namespace: str (optional) - Namespace in which to launch the workers. - Defaults to current namespace if available or "default" - port_forward_cluster_ip: bool (optional) - If the chart uses ClusterIP type services, forward the ports locally. - If you are using ``HelmCluster`` from the Jupyter session that was installed - by the helm chart this should be ``False``. If you are running it locally it should - be the port you are forwarding to ````. - auth: List[ClusterAuth] (optional) - Configuration methods to attempt in order. Defaults to - ``[InCluster(), KubeConfig()]``. - scheduler_name: str (optional) - Name of the Dask scheduler deployment in the current release. - Defaults to "scheduler". - worker_name: str (optional) - Name of the Dask worker deployment in the current release. - Defaults to "worker". - node_host: str (optional) - A node address. Can be provided in case scheduler service type is - ``NodePort`` and you want to manually specify which node to connect to. - node_port: int (optional) - A node address. Can be provided in case scheduler service type is - ``NodePort`` and you want to manually specify which port to connect to. - **kwargs: dict - Additional keyword arguments to pass to Cluster. 
- - Examples - -------- - >>> from dask_kubernetes import HelmCluster - >>> cluster = HelmCluster(release_name="myhelmrelease") - - You can then resize the cluster with the scale method - - >>> cluster.scale(10) - - You can pass this cluster directly to a Dask client - - >>> from dask.distributed import Client - >>> client = Client(cluster) - - You can also access cluster logs - - >>> cluster.get_logs() - - See Also - -------- - HelmCluster.scale - HelmCluster.logs - """ - - def __init__( - self, - release_name=None, - auth=ClusterAuth.DEFAULT, - namespace=None, - port_forward_cluster_ip=False, - scheduler_name="scheduler", - worker_name="worker", - node_host=None, - node_port=None, - name=None, - **kwargs, - ): - warnings.warn( - "HelmCluster is going away. " - "Please migrate to the new operator based implementation " - "https://kubernetes.dask.org/en/latest/kubecluster_migrating.html. ", - DeprecationWarning, - stacklevel=2, - ) - self.release_name = release_name - self.namespace = namespace or get_current_namespace() - if name is None: - name = self.release_name + "." + self.namespace - check_dependency("helm") - check_dependency("kubectl") - status = subprocess.run( - ["helm", "-n", self.namespace, "status", self.release_name], - capture_output=True, - encoding="utf-8", - ) - if status.returncode != 0: - raise RuntimeError(f"No such helm release {self.release_name}.") - self.auth = auth - self.core_api = None - self.scheduler_comm = None - self.port_forward_cluster_ip = port_forward_cluster_ip - self._supports_scaling = True - self.scheduler_name = scheduler_name - self.worker_name = worker_name - self.node_host = node_host - self.node_port = node_port - - super().__init__(name=name, **kwargs) - if not self.asynchronous: - self._loop_runner.start() - self.sync(self._start) - - async def _start(self): - await ClusterAuth.load_first(self.auth) - self.core_api = kubernetes.client.CoreV1Api() - self.apps_api = kubernetes.client.AppsV1Api() - self.scheduler_comm = rpc(await self._get_scheduler_address()) - await super()._start() - - async def _get_scheduler_address(self): - # Get the chart name - chart = subprocess.check_output( - [ - "helm", - "-n", - self.namespace, - "list", - "-f", - self.release_name, - "--output", - "json", - ], - encoding="utf-8", - ) - chart = json.loads(chart)[0]["chart"] - # extract name from {{.Chart.Name }}-{{ .Chart.Version }} - chart_name = "-".join(chart.split("-")[:-1]) - # Follow the spec in the dask/dask helm chart - self.chart_name = ( - f"{chart_name}-" if chart_name not in self.release_name else "" - ) - - service_name = f"{self.release_name}-{self.chart_name}{self.scheduler_name}" - service = await self.core_api.read_namespaced_service( - service_name, self.namespace - ) - address = await get_external_address_for_scheduler_service( - self.core_api, service, port_forward_cluster_ip=self.port_forward_cluster_ip - ) - if address is None: - raise RuntimeError("Unable to determine scheduler address.") - return address - - async def _wait_for_workers(self): - while True: - n_workers = len(self.scheduler_info["workers"]) - deployments = await self.apps_api.list_namespaced_deployment( - namespace=self.namespace - ) - deployment_replicas = 0 - for deployment in deployments.items: - if ( - f"{self.release_name}-{self.chart_name}{self.worker_name}" - in deployment.metadata.name - ): - deployment_replicas += deployment.spec.replicas - if n_workers == deployment_replicas: - return - else: - await asyncio.sleep(0.2) - - def get_logs(self): - """Get logs for Dask 
scheduler and workers. - - Examples - -------- - >>> cluster.get_logs() - {'testdask-scheduler-5c8ffb6b7b-sjgrg': ..., - 'testdask-worker-64c8b78cc-992z8': ..., - 'testdask-worker-64c8b78cc-hzpdc': ..., - 'testdask-worker-64c8b78cc-wbk4f': ...} - - Each log will be a string of all logs for that container. To view - it is recommeded that you print each log. - - >>> print(cluster.get_logs()["testdask-scheduler-5c8ffb6b7b-sjgrg"]) - ... - distributed.scheduler - INFO - ----------------------------------------------- - distributed.scheduler - INFO - Clear task state - distributed.scheduler - INFO - Scheduler at: tcp://10.1.6.131:8786 - distributed.scheduler - INFO - dashboard at: :8787 - ... - """ - return self.sync(self._get_logs) - - async def _get_logs(self): - logs = Logs() - - pods = await self.core_api.list_namespaced_pod( - namespace=self.namespace, - label_selector=f"release={self.release_name},app=dask", - ) - - for pod in pods.items: - if "scheduler" in pod.metadata.name or "worker" in pod.metadata.name: - try: - if pod.status.phase != "Running": - raise ValueError( - f"Cannot get logs for pod with status {pod.status.phase}.", - ) - log = Log( - await self.core_api.read_namespaced_pod_log( - pod.metadata.name, pod.metadata.namespace - ) - ) - except (ValueError, kubernetes.client.exceptions.ApiException): - log = Log(f"Cannot find logs. Pod is {pod.status.phase}.") - logs[pod.metadata.name] = log - - return logs - - def __await__(self): - async def _(): - if self.status == Status.created: - await self._start() - elif self.status == Status.running: - await self._wait_for_workers() - return self - - return _().__await__() - - def scale(self, n_workers, worker_group=None): - """Scale cluster to n workers. - - This sets the Dask worker deployment size to the requested number. - It also allows you to set the worker deployment size of another worker group. - Workers will not be terminated gracefull so be sure to only scale down - when all futures have been retrieved by the client and the cluster is idle. - - Examples - -------- - - >>> cluster - HelmCluster(my-dask.default, 'tcp://localhost:51481', workers=4, threads=241, memory=2.95 TiB) - >>> cluster.scale(4) - >>> cluster - HelmCluster(my-dask.default, 'tcp://localhost:51481', workers=5, threads=321, memory=3.94 TiB) - >>> cluster.scale(5, worker_group="high-mem-workers") - >>> cluster - HelmCluster(my-dask.default, 'tcp://localhost:51481', workers=9, threads=325, memory=3.94 TiB) - """ - return self.sync(self._scale, n_workers, worker_group=worker_group) - - async def _scale(self, n_workers, worker_group=None): - deployment = f"{self.release_name}-{self.chart_name}{self.worker_name}" - if worker_group: - deployment += f"-{worker_group}" - try: - await self.apps_api.patch_namespaced_deployment( - name=deployment, - namespace=self.namespace, - body={ - "spec": { - "replicas": n_workers, - } - }, - ) - except kubernetes.client.exceptions.ApiException as e: - if worker_group: - raise ValueError(f"No such worker group {worker_group}") from e - raise e - - def adapt(self, *args, **kwargs): - """Turn on adaptivity (Not recommended).""" - raise NotImplementedError( - "It is not recommended to run ``HelmCluster`` in adaptive mode. " - "When scaling down workers the decision on which worker to remove is left to Kubernetes, which " - "will not necessarily remove the same worker that Dask would choose. This may result in lost futures and " - "recalculation. It is recommended to manage scaling yourself with the ``HelmCluster.scale`` method." 
- ) - - async def _adapt(self, *args, **kwargs): - return super().adapt(*args, **kwargs) - - async def _close(self, *args, **kwargs): - """Close the cluster.""" - warnings.warn( - "It is not possible to close a HelmCluster object. \n" - "Please delete the cluster via the helm CLI: \n\n" - f" $ helm delete --namespace {self.namespace} {self.release_name}" - ) - - @classmethod - def from_name(cls, name): - release_name, namespace = name.split(".") - return cls(release_name=release_name, namespace=namespace) - - -async def discover( - auth=ClusterAuth.DEFAULT, - namespace=None, -): - await ClusterAuth.load_first(auth) - async with kubernetes.client.api_client.ApiClient() as api: - core_api = kubernetes.client.CoreV1Api(api) - namespace = namespace or get_current_namespace() - try: - pods = await core_api.list_pod_for_all_namespaces( - label_selector="app=dask,component=scheduler", - ) - for pod in pods.items: - with suppress(KeyError): - yield ( - pod.metadata.labels["release"] + "." + pod.metadata.namespace, - HelmCluster, - ) - except aiohttp.client_exceptions.ClientConnectorError: - warnings.warn("Unable to connect to Kubernetes cluster") diff --git a/dask_kubernetes/helm/tests/resources/values.yaml b/dask_kubernetes/helm/tests/resources/values.yaml deleted file mode 100644 index b742b060e..000000000 --- a/dask_kubernetes/helm/tests/resources/values.yaml +++ /dev/null @@ -1,20 +0,0 @@ -webUI: - servicePort: 8087 - -jupyter: - enabled: false - -scheduler: - serviceType: "ClusterIP" - image: - repository: "dask-kubernetes" # Container image repository. - tag: "dev" # Container image tag. - -worker: - image: - repository: "dask-kubernetes" # Container image repository. - tag: "dev" # Container image tag. - -additional_worker_groups: - - name: foo - replicas: 1 diff --git a/dask_kubernetes/helm/tests/test_helm.py b/dask_kubernetes/helm/tests/test_helm.py deleted file mode 100644 index 0ac9e824a..000000000 --- a/dask_kubernetes/helm/tests/test_helm.py +++ /dev/null @@ -1,241 +0,0 @@ -import os.path -import subprocess - -import dask.config -import pytest -from dask_ctl.discovery import ( - discover_cluster_names, - discover_clusters, - list_discovery_methods, -) -from distributed import Client -from distributed.core import Status - -############### -# Fixtures -## - - -@pytest.fixture(scope="session") -def chart_repo(): - repo_name = "dask" - repo_url = "https://helm.dask.org/" - output = subprocess.run(["helm", "repo", "list"], capture_output=True) - repo_lines = output.stdout.decode().splitlines()[1:] # First line is header - dask_repo_present = False - for repo_line in repo_lines: - repo, url = repo_line.replace(" ", "").split("\t") - if repo == repo_name: - if url.rstrip("/") != repo_url.rstrip("/"): - raise ValueError(f"Dask repo already present with different URL {url}") - dask_repo_present = True - if not dask_repo_present: - subprocess.run( - ["helm", "repo", "add", repo_name, repo_url], - check=True, - ) - subprocess.run(["helm", "repo", "update"], check=True) - return repo_name - - -@pytest.fixture(scope="session") -def chart_name(chart_repo): - chart = "dask" - return f"{chart_repo}/{chart}" - - -@pytest.fixture(scope="session") -def config_path(): - return os.path.join(os.path.dirname(__file__), "resources", "values.yaml") - - -@pytest.fixture(scope="session") -def release_name(): - return "testrelease" - - -@pytest.fixture(scope="session") -def test_namespace(): - return "testdaskns" - - -@pytest.fixture(scope="session") # Creating this fixture is slow so we should reuse it. 
-def release(k8s_cluster, chart_name, test_namespace, release_name, config_path): - subprocess.run( - [ - "helm", - "install", - "--create-namespace", - "-n", - test_namespace, - release_name, - chart_name, - "--wait", - "-f", - config_path, - ], - check=True, - ) - # Scale back the additional workers group for now - subprocess.run( - [ - "kubectl", - "scale", - "-n", - test_namespace, - "deployment", - f"{release_name}-dask-worker-foo", - "--replicas=0", - ], - check=True, - ) - yield release_name - subprocess.run(["helm", "delete", "-n", test_namespace, release_name], check=True) - - -@pytest.fixture -async def cluster(k8s_cluster, release, test_namespace): - from dask_kubernetes import HelmCluster - - tries = 5 - while True: - try: - cluster = await HelmCluster( - release_name=release, namespace=test_namespace, asynchronous=True - ) - break - except ConnectionError as e: - if tries > 0: - tries -= 1 - else: - raise e - - yield cluster - await cluster.close() - - -@pytest.fixture -def sync_cluster(k8s_cluster, release, test_namespace): - from dask_kubernetes import HelmCluster - - with HelmCluster( - release_name=release, namespace=test_namespace, asynchronous=False - ) as cluster: - yield cluster - - -############### -# Tests -## - - -def test_import(): - from distributed.deploy import Cluster - - from dask_kubernetes import HelmCluster - - assert issubclass(HelmCluster, Cluster) - - -def test_loop(k8s_cluster, release, test_namespace): - from dask_kubernetes import HelmCluster - - with Client(nthreads=[]) as client, HelmCluster( - release_name=release, namespace=test_namespace, loop=client.loop - ) as cluster: - assert cluster.loop is client.loop - - -def test_raises_on_non_existant_release(k8s_cluster): - from dask_kubernetes import HelmCluster - - with pytest.raises(RuntimeError): - HelmCluster(release_name="nosuchrelease", namespace="default") - - -@pytest.mark.anyio -async def test_create_helm_cluster(cluster, release_name): - assert cluster.status == Status.running - assert cluster.release_name == release_name - assert "id" in cluster.scheduler_info - - -def test_create_sync_helm_cluster(sync_cluster, release_name): - cluster = sync_cluster - assert cluster.status == Status.running - assert cluster.release_name == release_name - assert "id" in cluster.scheduler_info - - -@pytest.mark.anyio -async def test_scale_cluster(cluster): - # Scale up - await cluster.scale(4) - await cluster # Wait for workers - assert len(cluster.scheduler_info["workers"]) == 4 - - # Scale down - await cluster.scale(3) - await cluster # Wait for workers - assert len(cluster.scheduler_info["workers"]) == 3 - - # Scale up an additional worker group 'foo' - await cluster.scale(2, worker_group="foo") - await cluster # Wait for workers - assert len(cluster.scheduler_info["workers"]) == 5 - - # Scale down an additional worker group 'foo' - await cluster.scale(0, worker_group="foo") - await cluster # Wait for workers - assert len(cluster.scheduler_info["workers"]) == 3 - - # Scaling a non-existent eorker group 'bar' raises a ValueError - import kubernetes_asyncio as kubernetes - - with pytest.raises((ValueError, kubernetes.client.exceptions.ApiException)): - await cluster.scale(2, worker_group="bar") - - -@pytest.mark.anyio -async def test_logs(cluster): - from distributed.utils import Logs - - logs = await cluster.get_logs() - - assert isinstance(logs, Logs) - assert any(["scheduler" in log for log in logs]) - assert any(["worker" in log for log in logs]) - - [scheduler_logs] = [logs[log] for log in logs if 
"scheduler" in log] - assert "Scheduler at:" in scheduler_logs - - -@pytest.mark.anyio -async def test_adaptivity_warning(cluster): - with pytest.raises(NotImplementedError): - await cluster.adapt(minimum=3, maximum=3) - - -@pytest.mark.anyio -@pytest.mark.xfail(reason="Has asyncio issues on CI") -async def test_discovery(release, release_name): - discovery = "helmcluster" - methods = list_discovery_methods() - - assert discovery in methods - - methods.pop(discovery) - dask.config.set({"ctl.disable-discovery": methods}) - - clusters_names = [ - cluster async for cluster in discover_cluster_names(discovery=discovery) - ] - assert len(clusters_names) == 1 - - clusters = [cluster async for cluster in discover_clusters(discovery=discovery)] - assert len(clusters) == 1 - - [cluster] = clusters - assert cluster.status == Status.running - assert cluster.release_name == release_name - assert "id" in cluster.scheduler_info diff --git a/dask_kubernetes/operator/controller/controller.py b/dask_kubernetes/operator/controller/controller.py index 07f33cbda..320ea7579 100644 --- a/dask_kubernetes/operator/controller/controller.py +++ b/dask_kubernetes/operator/controller/controller.py @@ -15,7 +15,6 @@ from distributed.protocol.pickle import dumps from kr8s.asyncio.objects import Deployment, Pod, Service -from dask_kubernetes.common.objects import validate_cluster_name from dask_kubernetes.constants import SCHEDULER_NAME_TEMPLATE from dask_kubernetes.exceptions import ValidationError from dask_kubernetes.operator._objects import ( @@ -25,6 +24,7 @@ DaskWorkerGroup, ) from dask_kubernetes.operator.networking import get_scheduler_address +from dask_kubernetes.operator.validation import validate_cluster_name _ANNOTATION_NAMESPACES_TO_IGNORE = ( "kopf.zalando.org", diff --git a/dask_kubernetes/operator/kubecluster/kubecluster.py b/dask_kubernetes/operator/kubecluster/kubecluster.py index 5b2b540d0..80aa22249 100644 --- a/dask_kubernetes/operator/kubecluster/kubecluster.py +++ b/dask_kubernetes/operator/kubecluster/kubecluster.py @@ -29,7 +29,6 @@ from rich.table import Table from tornado.ioloop import IOLoop -from dask_kubernetes.common.objects import validate_cluster_name from dask_kubernetes.exceptions import CrashLoopBackOffError, SchedulerStartupError from dask_kubernetes.operator._objects import ( DaskAutoscaler, @@ -41,6 +40,7 @@ wait_for_scheduler, wait_for_scheduler_comm, ) +from dask_kubernetes.operator.validation import validate_cluster_name logger = logging.getLogger(__name__) diff --git a/dask_kubernetes/operator/validation.py b/dask_kubernetes/operator/validation.py new file mode 100644 index 000000000..efa5ac06e --- /dev/null +++ b/dask_kubernetes/operator/validation.py @@ -0,0 +1,17 @@ +"""Validate resources""" + +from dask_kubernetes.constants import MAX_CLUSTER_NAME_LEN, VALID_CLUSTER_NAME +from dask_kubernetes.exceptions import ValidationError + + +def validate_cluster_name(cluster_name: str) -> None: + """Raise exception if cluster name is too long and/or has invalid characters""" + if not VALID_CLUSTER_NAME.match(cluster_name): + raise ValidationError( + message=( + f"The DaskCluster {cluster_name} is invalid: a lowercase RFC 1123 subdomain must " + "consist of lower case alphanumeric characters, '-' or '.', and must start " + "and end with an alphanumeric character. DaskCluster name must also be under " + f"{MAX_CLUSTER_NAME_LEN} characters." 
+ ) + ) diff --git a/doc/source/helmcluster.rst b/doc/source/helmcluster.rst deleted file mode 100644 index d44ea1c13..000000000 --- a/doc/source/helmcluster.rst +++ /dev/null @@ -1,69 +0,0 @@ -.. _helmcluster: - -HelmCluster -=========== - -:doc:`helmcluster` is for managing an existing Dask cluster which has been deployed using -`Helm `_. - -Quickstart ----------- - -.. currentmodule:: dask_kubernetes - -First you must install the `Dask Helm chart `_ with ``helm`` -and have the cluster running. - -.. code-block:: bash - - helm repo add dask https://helm.dask.org - helm repo update - - helm install myrelease dask/dask - -You can then create a :class:`HelmCluster` object in Python to manage scaling the cluster and retrieve logs. - -.. code-block:: python - - from dask_kubernetes import HelmCluster - - cluster = HelmCluster(release_name="myrelease") - cluster.scale(10) # specify number of workers explicitly - -With this cluster object you can conveniently connect a Dask :class:`dask.distributed.Client` object to the cluster -and perform your work. Provided you have API access to Kubernetes and can run the ``kubectl`` command then -connectivity to the Dask cluster is handled automatically for you via services or port forwarding. - -.. code-block:: python - - # Example usage - from dask.distributed import Client - import dask.array as da - - # Connect Dask to the cluster - client = Client(cluster) - - # Create a large array and calculate the mean - array = da.ones((1000, 1000, 1000)) - print(array.mean().compute()) # Should print 1.0 - -For more information see the :class:`HelmCluster` API reference. - -.. warning:: - It is not possible to use ``HelmCluster`` from the Jupyter session - which is deployed as part of the Helm Chart without first copying your - ``~/.kube/config`` file to that Jupyter session. - -API ---- - -.. currentmodule:: dask_kubernetes - -.. autosummary:: - HelmCluster - HelmCluster.scale - HelmCluster.adapt - HelmCluster.logs - -.. autoclass:: HelmCluster - :members: diff --git a/doc/source/index.rst b/doc/source/index.rst index 3de09e815..3e99bcdab 100644 --- a/doc/source/index.rst +++ b/doc/source/index.rst @@ -28,7 +28,7 @@ Welcome to the documentation for ``dask-kubernetes``. Dask on Kubernetes new users should head to the `Dask documentation page on Kubernetes `_. -The package ``dask-kubernetes`` provides cluster managers for Kubernetes. ``dask-kubernetes`` is one of many options for deploying Dask clusters, see `Deploying Dask `_ in the Dask documentation for an overview of additional options. +The package ``dask-kubernetes`` provides a Dask operator for Kubernetes. ``dask-kubernetes`` is one of many options for deploying Dask clusters, see `Deploying Dask `_ in the Dask documentation for an overview of additional options. KubeCluster ----------- @@ -47,20 +47,6 @@ Kubernetes resources. It is designed to dynamically launch ad-hoc deployments. cluster = KubeCluster(name="my-dask-cluster", image='ghcr.io/dask/dask:latest') cluster.scale(10) -HelmCluster ------------ - -:class:`HelmCluster` is for managing an existing Dask cluster which has been deployed using -`Helm `_. You must have already installed the `Dask Helm chart `_ -and have the cluster running. You can then use it to manage scaling and retrieve logs. - -.. code-block:: python - - from dask_kubernetes import HelmCluster - - cluster = HelmCluster(release_name="myrelease") - cluster.scale(10) - .. toctree:: :maxdepth: 2 :hidden: @@ -68,14 +54,6 @@ and have the cluster running. 
You can then use it to manage scaling and retrieve installing -.. toctree:: - :maxdepth: 2 - :hidden: - :caption: Cluster Managers - - operator_kubecluster - helmcluster - .. toctree:: :maxdepth: 2 :hidden: @@ -83,6 +61,7 @@ and have the cluster running. You can then use it to manage scaling and retrieve operator operator_installation + operator_kubecluster operator_resources operator_extending operator_troubleshooting @@ -92,7 +71,6 @@ and have the cluster running. You can then use it to manage scaling and retrieve :hidden: :caption: Classic - kubecluster kubecluster_migrating .. toctree:: diff --git a/doc/source/kubecluster.rst b/doc/source/kubecluster.rst deleted file mode 100644 index 3495c55d9..000000000 --- a/doc/source/kubecluster.rst +++ /dev/null @@ -1,342 +0,0 @@ -.. _kubecluster: - -KubeCluster (classic) -===================== - -.. Warning:: - - This implementation of ``KubeCluster`` is being retired and we recommend :doc:`migrating to the operator based implementation `. - - -:class:`KubeCluster` deploys Dask clusters on Kubernetes clusters using native -Kubernetes APIs. It is designed to dynamically launch ad-hoc deployments. - -Quickstart ----------- - -.. currentmodule:: dask_kubernetes - -To launch a Dask cluster on Kubernetes with :class:`KubeCluster` you need to first configure your worker -pod specification. Then create a cluster with that spec. - -.. code-block:: python - - from dask_kubernetes.classic import KubeCluster, make_pod_spec - - pod_spec = make_pod_spec(image='ghcr.io/dask/dask:latest', - memory_limit='4G', memory_request='4G', - cpu_limit=1, cpu_request=1) - - cluster = KubeCluster(pod_spec) - - cluster.scale(10) # specify number of workers explicitly - cluster.adapt(minimum=1, maximum=100) # or dynamically scale based on current workload - -You can then connect a Dask :class:`dask.distributed.Client` object to the cluster and perform your work. - -.. code-block:: python - - # Example usage - from dask.distributed import Client - import dask.array as da - - # Connect Dask to the cluster - client = Client(cluster) - - # Create a large array and calculate the mean - array = da.ones((1000, 1000, 1000)) - print(array.mean().compute()) # Should print 1.0 - -You can alternatively define your worker specification via YAML by creating a `pod manifest `_ -that will be used as a template. - -.. code-block:: yaml - - # worker-spec.yml - - kind: Pod - metadata: - labels: - foo: bar - spec: - restartPolicy: Never - containers: - - image: ghcr.io/dask/dask:latest - imagePullPolicy: IfNotPresent - args: [dask-worker, --nthreads, '2', --no-dashboard, --memory-limit, 6GB, --death-timeout, '60'] - name: dask-worker - env: - - name: EXTRA_PIP_PACKAGES - value: git+https://github.com/dask/distributed - resources: - limits: - cpu: "2" - memory: 6G - requests: - cpu: "2" - memory: 6G - -.. code-block:: python - - from dask_kubernetes.classic import KubeCluster - - cluster = KubeCluster('worker-spec.yml') - cluster.scale(10) - -For more information see the :class:`KubeCluster` API reference. - -Best Practices --------------- - -1. Your worker pod image should have a similar environment to your local - environment, including versions of Python, dask, cloudpickle, and any - libraries that you may wish to use (like NumPy, Pandas, or Scikit-Learn). - See :py:class:`dask_kubernetes.classic.KubeCluster` docstring for guidance on how - to check and modify this. - -2. 
Your Kubernetes resource limits and requests should match the - ``--memory-limit`` and ``--nthreads`` parameters given to the - ``dask-worker`` command. Otherwise your workers may get killed by - Kubernetes as they pack into the same node and overwhelm that nodes' - available memory, leading to ``KilledWorker`` errors. - -3. We recommend adding the ``--death-timeout, '60'`` arguments and the - ``restartPolicy: Never`` attribute to your worker specification. - This ensures that these pods will clean themselves up if your Python - process disappears unexpectedly. - -GPUs ----- - -Because ``dask-kubernetes`` uses standard kubernetes pod specifications, we can -use `kubernetes device plugins -`_ -and add resource limits defining the number of GPUs per pod/worker. -Additionally, we can also use tools like `dask-cuda -`_ for optimized Dask/GPU interactions. - -.. code-block:: yaml - - kind: Pod - metadata: - labels: - foo: bar - spec: - restartPolicy: Never - containers: - - image: nvcr.io/nvidia/rapidsai/rapidsai-core:23.04-cuda11.8-runtime-ubuntu22.04-py3.10 - imagePullPolicy: IfNotPresent - args: [dask-cuda-worker, $(DASK_SCHEDULER_ADDRESS), --rmm-pool-size, 10GB] - name: dask-cuda - resources: - limits: - cpu: "2" - memory: 6G - nvidia.com/gpu: 1 # requesting 1 GPU - requests: - cpu: "2" - memory: 6G - nvidia.com/gpu: 1 # requesting 1 GPU - -.. _configuration: -Configuration -------------- - -You can use `Dask's configuration `_ -to control the behavior of Dask-kubernetes. You can see a full set of -configuration options -`here `_. -Some notable ones are described below: - -1. ``kubernetes.worker-template-path``: a path to a YAML file that holds a - Pod spec for the worker. If provided then this will be used when - :py:class:`dask_kubernetes.classic.KubeCluster` is called with no arguments:: - - cluster = KubeCluster() # reads provided yaml file - -2. ``distributed.dashboard.link``: a Python pre-formatted string that shows - the location of Dask's dashboard. This string will receive values for - ``host``, ``port``, and all environment variables. - - For example this is useful when using dask-kubernetes with JupyterHub and - `nbserverproxy `_ to route the - dashboard link to a proxied address as follows:: - - "{JUPYTERHUB_SERVICE_PREFIX}proxy/{port}/status" - -3. ``kubernetes.worker-name``: a Python pre-formatted string to use - when naming dask worker pods. This string will receive values for ``user``, - ``uuid``, and all environment variables. This is useful when you want to have - control over the naming convention for your pods and use other tokens from - the environment. For example when using zero-to-jupyterhub every user is - called ``jovyan`` and so you may wish to use ``dask-{JUPYTERHUB_USER}-{uuid}`` - instead of ``dask-{user}-{uuid}``. **Ensure you keep the ``uuid`` somewhere in - the template.** - -Role-Based Access Control (RBAC) --------------------------------- - -In order to spawn a Dask cluster, the service account creating those pods will require -a set of RBAC permissions. Create a service account you will use for Dask, and then attach the -following Role to that ServiceAccount via a RoleBinding: - -.. 
code-block:: yaml - - kind: Role - apiVersion: rbac.authorization.k8s.io/v1beta1 - metadata: - name: daskKubernetes - rules: - - apiGroups: - - "" # indicates the core API group - resources: - - "pods" - verbs: - - "get" - - "list" - - "watch" - - "create" - - "delete" - - apiGroups: - - "" # indicates the core API group - resources: - - "pods/log" - verbs: - - "get" - - "list" - - apiGroups: - - "" # indicates the core API group - resources: - - "services" - verbs: - - "get" - - "list" - - "watch" - - "create" - - "delete" - - apiGroups: - - "policy" # indicates the policy API group - resources: - - "poddisruptionbudgets" - verbs: - - "get" - - "list" - - "watch" - - "create" - - "delete" - - -Docker Images -------------- - -Example Dask docker images ghcr.io/dask/dask and ghcr.io/dask/dask-notebook -are available on https://github.com/orgs/dask/packages . -More information about these images is available at the -`Dask documentation `_. - -Note that these images can be further customized with extra packages using -``EXTRA_PIP_PACKAGES``, ``EXTRA_APT_PACKAGES``, and ``EXTRA_CONDA_PACKAGES`` -as described in the -`Extensibility section `_. - -Deployment Details ------------------- - -Scheduler -~~~~~~~~~ - -Before workers are created a scheduler will be deployed with the following resources: - -- A pod with a scheduler running -- A service (svc) to expose scheduler and dashboard ports -- A PodDisruptionBudget avoid voluntary disruptions of the scheduler pod - -By default the Dask configuration option ``kubernetes.scheduler-service-type`` is -set to ``ClusterIp``. In order to connect to the scheduler the ``KubeCluster`` will first attempt to connect directly, -but this will only be successful if ``dask-kubernetes`` is being run from within the Kubernetes cluster. -If it is unsuccessful it will attempt to port forward the service locally using the ``kubectl`` utility. - -If you update the service type to ``NodePort``. The scheduler will be exposed on the same random high port on all -nodes in the cluster. In this case ``KubeCluster`` will attempt to list nodes in order to get an IP to connect on -and requires additional permissions to do so. - -.. code-block:: yaml - - - apiGroups: - - "" # indicates the core API group - resources: - - "nodes" - verbs: - - "get" - - "list" - - -If you set the service type to ``LoadBalancer`` then ``KubeCluster`` will connect to the external address of the assigned -loadbalancer, but this does require that your Kubernetes cluster has the appropriate operator to assign loadbalancers. - -Legacy mode -^^^^^^^^^^^ - -For backward compatibility with previous versions of ``dask-kubernetes`` it is also possible to run the scheduler locally. -A ``local`` scheduler is created where the Dask client will be created. - -.. code-block:: python - - from dask_kubernetes.classic import KubeCluster - from dask.distributed import Client - - cluster = KubeCluster.from_yaml('worker-spec.yml', deploy_mode='local') - cluster.scale(10) - client = Client(cluster) - -In this mode the Dask workers will attempt to connect to the machine where you are running ``dask-kubernetes``. -Generally this will need to be within the Kubernetes cluster in order for the workers to make a successful connection. - -Workers -~~~~~~~ - -Workers are created directly as simple pods. These worker pods are configured -to shutdown if they are unable to connect to the scheduler for 60 seconds. 
-The pods are cleaned up when :meth:`~dask_kubernetes.classic.KubeCluster.close` is called, -or the scheduler process exits. - -The pods are created with two default `tolerations `_: - -* ``k8s.dask.org/dedicated=worker:NoSchedule`` -* ``k8s.dask.org_dedicated=worker:NoSchedule`` - -If you have nodes with the corresponding taints, then the worker pods will -schedule to those nodes (and no other pods will be able to schedule to those -nodes). - -API ---- - -.. currentmodule:: dask_kubernetes - -.. autosummary:: - KubeCluster - KubeCluster.adapt - KubeCluster.from_dict - KubeCluster.from_yaml - KubeCluster.get_logs - KubeCluster.pods - KubeCluster.scale - InCluster - KubeConfig - KubeAuth - make_pod_spec - -.. autoclass:: KubeCluster - :members: - -.. autoclass:: ClusterAuth - :members: - -.. autoclass:: InCluster - -.. autoclass:: KubeConfig - -.. autoclass:: KubeAuth - -.. autofunction:: make_pod_spec diff --git a/doc/source/kubecluster_migrating.rst b/doc/source/kubecluster_migrating.rst index b3cd28371..699865341 100644 --- a/doc/source/kubecluster_migrating.rst +++ b/doc/source/kubecluster_migrating.rst @@ -1,32 +1,7 @@ -Migrating to the operator -========================= +Migrating from classic +====================== -The ``KubeCluster`` class is being replaced with a new version that is built using the Kubernetes Operator pattern. -We encourage all users of the classic implementation to migrate to the new version as it is generally unmaintained and will be removed in a future release. - -Why should you migrate? ------------------------ - -You may be thinking "why do I have to do this?" and "the old version works just fine" so let's take a moment to unpack why we have rebuilt ``KubeCluster`` and to hopefully convince you that this is a good decision and worth the effort. - -The :doc:`original implementation ` of ``KubeCluster`` was created shortly after Kubernetes went ``1.0`` and best practice design patterns were still emerging. -While it has been updated over time it has been more and more difficult to maintain due to the way it was designed. - -We decided to completely rebuild ``dask-kubernetes`` with the `operator pattern in mind `_ which is now an established design pattern for building composable Kubernetes applications. - -Here are some reasons why we decided to make this change: - -- Top level ``DaskCluster`` resource - - Cascade deletion -- Simpler Python API -- More powerful YAML API - - Create, scale and delete clusters with ``kubectl`` -- Detatch and reattactch from running clusters -- New resource types like ``DaskJob`` -- Multiple worker groups -- Autoscaling handled by the controller and not the cluster manager - -For more information watch the Dask blog for the Dask Kubernetes Operator announcement blog post. +The classic ``KubeCluster`` class has been replaced with a new version that is built using the Kubernetes Operator pattern. Installing the operator diff --git a/doc/source/operator_kubecluster.rst b/doc/source/operator_kubecluster.rst index a15ad517b..e2e801f5a 100644 --- a/doc/source/operator_kubecluster.rst +++ b/doc/source/operator_kubecluster.rst @@ -3,14 +3,10 @@ KubeCluster .. currentmodule:: dask_kubernetes.operator -.. note:: - - As of ``2022.10.0`` the default ``KubeCluster`` class requires the :doc:`Dask Kubernetes Operator `. For documentation on the classic KubeCluster implementation :doc:`see here `. 
-
-
 Cluster manager
 ---------------
 
-The operator has a new cluster manager called :class:`dask_kubernetes.operator.KubeCluster` that you can use to conveniently create and manage a Dask cluster in Python. Then connect a Dask :class:`distributed.Client` object to it directly and perform your work.
+The operator has a cluster manager called :class:`dask_kubernetes.operator.KubeCluster` that you can use to conveniently create and manage a Dask cluster in Python. Then connect a Dask :class:`distributed.Client` object to it directly and perform your work.
 
 The goal of the cluster manager is to abstract away the complexity of the Kubernetes resources and provide a clean and simple Python API to manager clusters while still getting all the benefits of the operator.
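
As a quick illustration of the pieces this patch leaves in place, the sketch below shows the operator-based ``KubeCluster`` together with the shared ``validate_cluster_name`` helper added in ``dask_kubernetes/operator/validation.py``. It is a minimal, hedged example rather than part of the changeset: it assumes a Kubernetes cluster that already has the Dask operator controller installed and reachable kubeconfig credentials, and the cluster name used here is purely illustrative.

.. code-block:: python

    from dask.distributed import Client
    from dask_kubernetes.operator import KubeCluster
    from dask_kubernetes.operator.validation import validate_cluster_name

    name = "my-dask-cluster"

    # Raises dask_kubernetes.exceptions.ValidationError if the name is not a
    # lowercase RFC 1123 subdomain or exceeds the maximum allowed length.
    # The controller and the cluster manager import this same helper, so
    # calling it here only gives earlier feedback.
    validate_cluster_name(name)

    # Create an operator-backed cluster, scale it, and connect a client.
    # Assumes the Dask operator is installed in the target Kubernetes cluster.
    cluster = KubeCluster(name=name, image="ghcr.io/dask/dask:latest")
    cluster.scale(10)

    client = Client(cluster)
    # ... run work ...
    client.close()
    cluster.close()

Keeping the check in ``dask_kubernetes.operator.validation`` gives the controller and the cluster manager a single shared definition now that ``dask_kubernetes/common/objects.py`` is removed.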