From b711963055d464f658ad0fb293a47efb35034822 Mon Sep 17 00:00:00 2001 From: Safoine El Khabich <34200873+safoinme@users.noreply.github.com> Date: Wed, 25 Sep 2024 08:24:45 +0100 Subject: [PATCH] Skypilot with Kubernetes (#3033) * Add Skypilot Kubernetes integration for remote orchestration This commit adds the Skypilot Kubernetes integration to enable remote orchestration of ZenML pipelines on VMs. The integration includes the necessary initialization, configuration, and flavor classes. It also adds the Skypilot Kubernetes orchestrator and its settings. This integration provides an alternative to the local orchestrator for running pipelines on Kubernetes. * Add Skypilot Kubernetes integration for remote orchestration * Refactor SkypilotKubernetesOrchestrator prepare_environment_variable method * Refactor SkypilotKubernetesOrchestrator prepare_environment_variable method * Refactor SkypilotKubernetesOrchestrator prepare_environment_variable method * Refactor SkypilotBaseOrchestrator setup_credentials method * Refactor SkypilotBaseOrchestrator setup_credentials method and SkypilotKubernetesOrchestrator prepare_environment_variable method * Refactor SkypilotBaseOrchestrator setup_credentials method and SkypilotKubernetesOrchestrator prepare_environment_variable method * Update src/zenml/integrations/skypilot/orchestrators/skypilot_base_vm_orchestrator.py Co-authored-by: Stefan Nica * Refactor SkypilotKubernetesOrchestrator to handle missing service connector * Refactor SkypilotBaseOrchestrator to use the correct virtual environment path for Kubernetes * specll check * fix run command * Refactor SkypilotKubernetesVMOrchestrator to remove unnecessary code * Refactor SkypilotKubernetesVMOrchestrator to remove unnecessary code --------- Co-authored-by: Stefan Nica --- .../orchestrators/skypilot-vm.md | 77 +++++++++++ src/zenml/integrations/__init__.py | 1 + src/zenml/integrations/constants.py | 1 + .../skypilot_base_vm_orchestrator.py | 64 +++++---- 
.../skypilot_kubernetes/__init__.py | 52 ++++++++ .../skypilot_kubernetes/flavors/__init__.py | 26 ++++ ...pilot_orchestrator_kubernetes_vm_flavor.py | 125 ++++++++++++++++++ .../orchestrators/__init__.py | 25 ++++ .../skypilot_kubernetes_vm_orchestrator.py | 74 +++++++++++ 9 files changed, 419 insertions(+), 26 deletions(-) create mode 100644 src/zenml/integrations/skypilot_kubernetes/__init__.py create mode 100644 src/zenml/integrations/skypilot_kubernetes/flavors/__init__.py create mode 100644 src/zenml/integrations/skypilot_kubernetes/flavors/skypilot_orchestrator_kubernetes_vm_flavor.py create mode 100644 src/zenml/integrations/skypilot_kubernetes/orchestrators/__init__.py create mode 100644 src/zenml/integrations/skypilot_kubernetes/orchestrators/skypilot_kubernetes_vm_orchestrator.py diff --git a/docs/book/component-guide/orchestrators/skypilot-vm.md b/docs/book/component-guide/orchestrators/skypilot-vm.md index 96d05c64fd4..61d489223a5 100644 --- a/docs/book/component-guide/orchestrators/skypilot-vm.md +++ b/docs/book/component-guide/orchestrators/skypilot-vm.md @@ -242,6 +242,55 @@ The Lambda Labs orchestrator does not support some of the features like `job_rec While testing the orchestrator, we noticed that the Lambda Labs orchestrator does not support the `down` flag. This means the orchestrator will not automatically tear down the cluster after all jobs finish. We recommend manually tearing down the cluster after all jobs finish to avoid unnecessary costs. {% endhint %} {% endtab %} + +{% tab title="Kubernetes" %} +We first need to install the SkyPilot integration for Kubernetes, using the following command: + +```shell + zenml integration install skypilot_kubernetes +``` + +To provision SkyPilot on a Kubernetes cluster, your orchestrator stack component needs to be configured to authenticate with a +[Service Connector](../../how-to/auth-management/service-connectors-guide.md). 
To configure the Service Connector, you need to register a new service connector configured with the appropriate credentials and permissions to access the K8s cluster. You can then use the service connector to configure your registered Orchestrator stack component using the following commands: + +First, check that the Kubernetes service connector type is available using the following command: + +```shell +zenml service-connector list-types --type kubernetes +``` +```shell +┏━━━━━━━━━━━━┯━━━━━━━━━━━━┯━━━━━━━━━━━━┯━━━━━━━━━━━┯━━━━━━━┯━━━━━━━━┓ +┃ │ │ RESOURCE │ AUTH │ │ ┃ +┃ NAME │ TYPE │ TYPES │ METHODS │ LOCAL │ REMOTE ┃ +┠────────────┼────────────┼────────────┼───────────┼───────┼────────┨ +┃ Kubernetes │ 🌀 │ 🌀 │ password │ ✅ │ ✅ ┃ +┃ Service │ kubernetes │ kubernetes │ token │ │ ┃ +┃ Connector │ │ -cluster │ │ │ ┃ +┗━━━━━━━━━━━━┷━━━━━━━━━━━━┷━━━━━━━━━━━━┷━━━━━━━━━━━┷━━━━━━━┷━━━━━━━━┛ +``` + +Next, configure a service connector using the CLI or the dashboard with the credentials of your Kubernetes cluster. For example, the following command walks you through the configuration of the service connector interactively: + +```shell +zenml service-connector register kubernetes-skypilot --type kubernetes -i +``` + +This will configure the service connector with the appropriate credentials and permissions to run workloads on the Kubernetes cluster. You can then use the service connector to configure your registered Orchestrator stack component using the following commands: + +```shell +# Register the orchestrator +zenml orchestrator register <ORCHESTRATOR_NAME> --flavor vm_kubernetes +# Connect the orchestrator to the service connector +zenml orchestrator connect <ORCHESTRATOR_NAME> --connector kubernetes-skypilot + +# Register and activate a stack with the new orchestrator +zenml stack register <STACK_NAME> -o <ORCHESTRATOR_NAME> ... --set +``` + +{% hint style="warning" %} +Some of the features like `job_recovery`, `disk_tier`, `image_id`, `zone`, `idle_minutes_to_autostop`, `disk_size`, `use_spot` are not supported by the Kubernetes orchestrator. 
It is recommended not to use these features with the Kubernetes orchestrator and not to use [step-specific settings](skypilot-vm.md#configuring-step-specific-resources). +{% endhint %} +{% endtab %} {% endtabs %} #### Additional Configuration @@ -392,6 +441,34 @@ skypilot_settings = SkypilotLambdaOrchestratorSettings( ) +@pipeline( + settings={ + "orchestrator": skypilot_settings + } +) +``` +{% endtab %} + +{% tab title="Kubernetes" %} + +**Code Example:** + +```python +from zenml.integrations.skypilot_kubernetes.flavors.skypilot_orchestrator_kubernetes_vm_flavor import SkypilotKubernetesOrchestratorSettings + +skypilot_settings = SkypilotKubernetesOrchestratorSettings( + cpus="2", + memory="16", + accelerators="V100:2", + image_id="ami-1234567890abcdef0", + disk_size=100, + cluster_name="my_cluster", + retry_until_up=True, + stream_logs=True, + docker_run_args=["--gpus=all"] +) + + @pipeline( settings={ "orchestrator": skypilot_settings diff --git a/src/zenml/integrations/__init__.py b/src/zenml/integrations/__init__.py index f6273ba2928..da08157e416 100644 --- a/src/zenml/integrations/__init__.py +++ b/src/zenml/integrations/__init__.py @@ -69,6 +69,7 @@ from zenml.integrations.skypilot_gcp import SkypilotGCPIntegration # noqa from zenml.integrations.skypilot_azure import SkypilotAzureIntegration # noqa from zenml.integrations.skypilot_lambda import SkypilotLambdaIntegration # noqa +from zenml.integrations.skypilot_kubernetes import SkypilotKubernetesIntegration # noqa from zenml.integrations.slack import SlackIntegration # noqa from zenml.integrations.spark import SparkIntegration # noqa from zenml.integrations.tekton import TektonIntegration # noqa diff --git a/src/zenml/integrations/constants.py b/src/zenml/integrations/constants.py index 5c780712237..3f06dbb452e 100644 --- a/src/zenml/integrations/constants.py +++ b/src/zenml/integrations/constants.py @@ -64,6 +64,7 @@ SKYPILOT_GCP = "skypilot_gcp" SKYPILOT_AZURE = "skypilot_azure" SKYPILOT_LAMBDA = 
"skypilot_lambda" +SKYPILOT_KUBERNETES = "skypilot_kubernetes" SLACK = "slack" SPARK = "spark" TEKTON = "tekton" diff --git a/src/zenml/integrations/skypilot/orchestrators/skypilot_base_vm_orchestrator.py b/src/zenml/integrations/skypilot/orchestrators/skypilot_base_vm_orchestrator.py index e6765c4c85a..3f643f2ae66 100644 --- a/src/zenml/integrations/skypilot/orchestrators/skypilot_base_vm_orchestrator.py +++ b/src/zenml/integrations/skypilot/orchestrators/skypilot_base_vm_orchestrator.py @@ -250,6 +250,7 @@ def prepare_or_run_pipeline( entrypoint_str = " ".join(command) arguments_str = " ".join(args) + task_envs = environment docker_environment_str = " ".join( f"-e {k}={v}" for k, v in environment.items() ) @@ -271,13 +272,10 @@ def prepare_or_run_pipeline( f"sudo docker login --username $DOCKER_USERNAME --password " f"$DOCKER_PASSWORD {stack.container_registry.config.uri}" ) - task_envs = { - "DOCKER_USERNAME": docker_username, - "DOCKER_PASSWORD": docker_password, - } + task_envs["DOCKER_USERNAME"] = docker_username + task_envs["DOCKER_PASSWORD"] = docker_password else: setup = None - task_envs = None # Run the entire pipeline @@ -285,15 +283,22 @@ def prepare_or_run_pipeline( self.prepare_environment_variable(set=True) try: + if isinstance(self.cloud, sky.clouds.Kubernetes): + run_command = f"${{VIRTUAL_ENV:+$VIRTUAL_ENV/bin/}}{entrypoint_str} {arguments_str}" + setup = None + down = False + idle_minutes_to_autostop = None + else: + run_command = f"sudo docker run --rm {custom_run_args}{docker_environment_str} {image} {entrypoint_str} {arguments_str}" + down = settings.down + idle_minutes_to_autostop = settings.idle_minutes_to_autostop task = sky.Task( - run=f"sudo docker run --rm {custom_run_args}{docker_environment_str} {image} {entrypoint_str} {arguments_str}", + run=run_command, setup=setup, envs=task_envs, ) - logger.debug( - f"Running run: sudo docker run --rm {custom_run_args}{docker_environment_str} {image} {entrypoint_str} {arguments_str}" - ) - 
logger.debug(f"Running run: {setup}") + logger.debug(f"Running run: {run_command}") + task = task.set_resources( sky.Resources( cloud=self.cloud, @@ -306,15 +311,24 @@ def prepare_or_run_pipeline( job_recovery=settings.job_recovery, region=settings.region, zone=settings.zone, - image_id=settings.image_id, + image_id=image + if isinstance(self.cloud, sky.clouds.Kubernetes) + else settings.image_id, disk_size=settings.disk_size, disk_tier=settings.disk_tier, ) ) - # Set the cluster name - cluster_name = settings.cluster_name - if cluster_name is None: + if settings.cluster_name: + sky.exec( + task, + settings.cluster_name, + down=down, + stream_logs=settings.stream_logs, + backend=None, + detach_run=True, + ) + else: # Find existing cluster for i in sky.status(refresh=True): if isinstance( @@ -324,21 +338,19 @@ def prepare_or_run_pipeline( logger.info( f"Found existing cluster {cluster_name}. Reusing..." ) - if cluster_name is None: cluster_name = self.sanitize_cluster_name( f"{orchestrator_run_name}" ) - - # Launch the cluster - sky.launch( - task, - cluster_name, - retry_until_up=settings.retry_until_up, - idle_minutes_to_autostop=settings.idle_minutes_to_autostop, - down=settings.down, - stream_logs=settings.stream_logs, - detach_setup=True, - ) + # Launch the cluster + sky.launch( + task, + cluster_name, + retry_until_up=settings.retry_until_up, + idle_minutes_to_autostop=idle_minutes_to_autostop, + down=down, + stream_logs=settings.stream_logs, + detach_setup=True, + ) except Exception as e: logger.error(f"Pipeline run failed: {e}") diff --git a/src/zenml/integrations/skypilot_kubernetes/__init__.py b/src/zenml/integrations/skypilot_kubernetes/__init__.py new file mode 100644 index 00000000000..0ba82327688 --- /dev/null +++ b/src/zenml/integrations/skypilot_kubernetes/__init__.py @@ -0,0 +1,52 @@ +# Copyright (c) ZenML GmbH 2024. All Rights Reserved. 
+# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +# or implied. See the License for the specific language governing +# permissions and limitations under the License. +"""Initialization of the Skypilot Kubernetes integration for ZenML. + +The Skypilot integration sub-module powers an alternative to the local +orchestrator for a remote orchestration of ZenML pipelines on VMs. +""" +from typing import List, Type + +from zenml.integrations.constants import ( + SKYPILOT_KUBERNETES, +) +from zenml.integrations.integration import Integration +from zenml.stack import Flavor + +SKYPILOT_KUBERNETES_ORCHESTRATOR_FLAVOR = "vm_kubernetes" + + +class SkypilotKubernetesIntegration(Integration): + """Definition of Skypilot Kubernetes Integration for ZenML.""" + + NAME = SKYPILOT_KUBERNETES + # all 0.6.x versions of skypilot[kubernetes] are compatible + REQUIREMENTS = ["skypilot[kubernetes]~=0.6.1"] + APT_PACKAGES = ["openssh-client", "rsync"] + + @classmethod + def flavors(cls) -> List[Type[Flavor]]: + """Declare the stack component flavors for the Skypilot Kubernetes integration. + + Returns: + List of stack component flavors for this integration. 
+ """ + from zenml.integrations.skypilot_kubernetes.flavors import ( + SkypilotKubernetesOrchestratorFlavor, + ) + + return [SkypilotKubernetesOrchestratorFlavor] + + +SkypilotKubernetesIntegration.check_installation() \ No newline at end of file diff --git a/src/zenml/integrations/skypilot_kubernetes/flavors/__init__.py b/src/zenml/integrations/skypilot_kubernetes/flavors/__init__.py new file mode 100644 index 00000000000..8b2e352d4a4 --- /dev/null +++ b/src/zenml/integrations/skypilot_kubernetes/flavors/__init__.py @@ -0,0 +1,26 @@ +# Copyright (c) ZenML GmbH 2024. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +# or implied. See the License for the specific language governing +# permissions and limitations under the License. 
+"""Skypilot integration flavor for Skypilot Kubernetes orchestrator.""" + +from zenml.integrations.skypilot_kubernetes.flavors.skypilot_orchestrator_kubernetes_vm_flavor import ( + SkypilotKubernetesOrchestratorConfig, + SkypilotKubernetesOrchestratorFlavor, + SkypilotKubernetesOrchestratorSettings, +) + +__all__ = [ + "SkypilotKubernetesOrchestratorConfig", + "SkypilotKubernetesOrchestratorFlavor", + "SkypilotKubernetesOrchestratorSettings", +] diff --git a/src/zenml/integrations/skypilot_kubernetes/flavors/skypilot_orchestrator_kubernetes_vm_flavor.py b/src/zenml/integrations/skypilot_kubernetes/flavors/skypilot_orchestrator_kubernetes_vm_flavor.py new file mode 100644 index 00000000000..7334750f6fd --- /dev/null +++ b/src/zenml/integrations/skypilot_kubernetes/flavors/skypilot_orchestrator_kubernetes_vm_flavor.py @@ -0,0 +1,125 @@ +# Copyright (c) ZenML GmbH 2023. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +# or implied. See the License for the specific language governing +# permissions and limitations under the License. 
+"""Skypilot orchestrator Kubernetes flavor.""" + +from typing import TYPE_CHECKING, Optional, Type + +from zenml.constants import KUBERNETES_CLUSTER_RESOURCE_TYPE +from zenml.integrations.skypilot.flavors.skypilot_orchestrator_base_vm_config import ( + SkypilotBaseOrchestratorConfig, + SkypilotBaseOrchestratorSettings, +) +from zenml.integrations.skypilot_kubernetes import ( + SKYPILOT_KUBERNETES_ORCHESTRATOR_FLAVOR, +) +from zenml.logger import get_logger +from zenml.models import ServiceConnectorRequirements +from zenml.orchestrators import BaseOrchestratorConfig, BaseOrchestratorFlavor + +if TYPE_CHECKING: + from zenml.integrations.skypilot_kubernetes.orchestrators import ( + SkypilotKubernetesOrchestrator, + ) + + +logger = get_logger(__name__) + + +class SkypilotKubernetesOrchestratorSettings(SkypilotBaseOrchestratorSettings): + """Skypilot orchestrator settings.""" + + +class SkypilotKubernetesOrchestratorConfig( + SkypilotBaseOrchestratorConfig, SkypilotKubernetesOrchestratorSettings +): + """Skypilot orchestrator config.""" + + +class SkypilotKubernetesOrchestratorFlavor(BaseOrchestratorFlavor): + """Flavor for the Skypilot Kubernetes orchestrator.""" + + @property + def name(self) -> str: + """Name of the orchestrator flavor. + + Returns: + Name of the orchestrator flavor. + """ + return SKYPILOT_KUBERNETES_ORCHESTRATOR_FLAVOR + + @property + def service_connector_requirements( + self, + ) -> Optional[ServiceConnectorRequirements]: + """Service connector resource requirements for service connectors. + + Specifies resource requirements that are used to filter the available + service connector types that are compatible with this flavor. + + Returns: + Requirements for compatible service connectors, if a service + connector is required for this flavor. + """ + return ServiceConnectorRequirements( + resource_type=KUBERNETES_CLUSTER_RESOURCE_TYPE, + ) + + @property + def docs_url(self) -> Optional[str]: + """A url to point at docs explaining this flavor. 
+ + Returns: + A flavor docs url. + """ + return self.generate_default_docs_url() + + @property + def sdk_docs_url(self) -> Optional[str]: + """A url to point at SDK docs explaining this flavor. + + Returns: + A flavor SDK docs url. + """ + return self.generate_default_sdk_docs_url() + + @property + def logo_url(self) -> str: + """A url to represent the flavor in the dashboard. + + Returns: + The flavor logo. + """ + return "https://public-flavor-logos.s3.eu-central-1.amazonaws.com/orchestrator/kubernetes-skypilot.png" + + @property + def config_class(self) -> Type[BaseOrchestratorConfig]: + """Config class for the base orchestrator flavor. + + Returns: + The config class. + """ + return SkypilotKubernetesOrchestratorConfig + + @property + def implementation_class(self) -> Type["SkypilotKubernetesOrchestrator"]: + """Implementation class for this flavor. + + Returns: + Implementation class for this flavor. + """ + from zenml.integrations.skypilot_kubernetes.orchestrators import ( + SkypilotKubernetesOrchestrator, + ) + + return SkypilotKubernetesOrchestrator diff --git a/src/zenml/integrations/skypilot_kubernetes/orchestrators/__init__.py b/src/zenml/integrations/skypilot_kubernetes/orchestrators/__init__.py new file mode 100644 index 00000000000..c0ab27f1b4d --- /dev/null +++ b/src/zenml/integrations/skypilot_kubernetes/orchestrators/__init__.py @@ -0,0 +1,25 @@ +# Copyright (c) ZenML GmbH 2024. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +# or implied. See the License for the specific language governing +# permissions and limitations under the License. 
+"""Initialization of the Skypilot Kubernetes ZenML orchestrator.""" + +from zenml.integrations.skypilot.orchestrators.skypilot_base_vm_orchestrator import ( # noqa + SkypilotBaseOrchestrator, +) +from zenml.integrations.skypilot_kubernetes.orchestrators.skypilot_kubernetes_vm_orchestrator import ( # noqa + SkypilotKubernetesOrchestrator, +) +__all__ = [ + "SkypilotBaseOrchestrator", + "SkypilotKubernetesOrchestrator", +] diff --git a/src/zenml/integrations/skypilot_kubernetes/orchestrators/skypilot_kubernetes_vm_orchestrator.py b/src/zenml/integrations/skypilot_kubernetes/orchestrators/skypilot_kubernetes_vm_orchestrator.py new file mode 100644 index 00000000000..e1ce34cd6de --- /dev/null +++ b/src/zenml/integrations/skypilot_kubernetes/orchestrators/skypilot_kubernetes_vm_orchestrator.py @@ -0,0 +1,74 @@ +# Copyright (c) ZenML GmbH 2023. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +# or implied. See the License for the specific language governing +# permissions and limitations under the License. 
+"""Implementation of a Skypilot-based Kubernetes VM orchestrator.""" + +from typing import TYPE_CHECKING, Optional, Type, cast + +import sky + +from zenml.integrations.skypilot.orchestrators.skypilot_base_vm_orchestrator import ( + SkypilotBaseOrchestrator, +) +from zenml.integrations.skypilot_kubernetes.flavors.skypilot_orchestrator_kubernetes_vm_flavor import ( + SkypilotKubernetesOrchestratorConfig, + SkypilotKubernetesOrchestratorSettings, +) +from zenml.logger import get_logger + +if TYPE_CHECKING: + from zenml.config.base_settings import BaseSettings + +logger = get_logger(__name__) + + +class SkypilotKubernetesOrchestrator(SkypilotBaseOrchestrator): + """Orchestrator responsible for running pipelines remotely in a VM on Kubernetes. + + This orchestrator does not support running on a schedule. + """ + + @property + def cloud(self) -> sky.clouds.Cloud: + """The type of sky cloud to use. + + Returns: + A `sky.clouds.Cloud` instance. + """ + return sky.clouds.Kubernetes() + + @property + def config(self) -> SkypilotKubernetesOrchestratorConfig: + """Returns the `SkypilotKubernetesOrchestratorConfig` config. + + Returns: + The configuration. + """ + return cast(SkypilotKubernetesOrchestratorConfig, self._config) + + @property + def settings_class(self) -> Optional[Type["BaseSettings"]]: + """Settings class for the Skypilot orchestrator. + + Returns: + The settings class. + """ + return SkypilotKubernetesOrchestratorSettings + + def prepare_environment_variable(self, set: bool = True) -> None: + """Set up Environment variables that are required for the orchestrator. + + Args: + set: Whether to set the environment variables or not. + """ + pass