diff --git a/demo-notebooks/guided-demos/4_training.ipynb b/demo-notebooks/guided-demos/4_training.ipynb
new file mode 100644
index 000000000..b6b049727
--- /dev/null
+++ b/demo-notebooks/guided-demos/4_training.ipynb
@@ -0,0 +1,278 @@
+{
+ "cells": [
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "69f9bce8-b833-4b1e-af1b-a946f41d072f",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# Import pieces from codeflare-sdk\n",
+    "from codeflare_sdk import TokenAuthentication\n",
+    "from codeflare_sdk.kubeflow.client.training_client import TrainingClient\n",
+    "from kubeflow.training.constants import constants"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "bdea2430-8e17-48d9-9e48-42830af4fa5c",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# Create authentication object for user permissions\n",
+    "# If unused, the SDK will automatically check for a default kubeconfig, then in-cluster config\n",
+    "# KubeConfigFileAuthentication can also be used to specify a kubeconfig path manually\n",
+    "auth = TokenAuthentication(\n",
+    "    token = \"XXXXX\",\n",
+    "    server = \"XXXXX\",\n",
+    "    skip_tls=False\n",
+    ")\n",
+    "auth.login()"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "daa18c62-8078-497d-9d78-7bc139f40571",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "tc = TrainingClient(job_kind=constants.TFJOB_KIND)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "e46effb4-96a5-466f-9dfa-5a95ed6cabfe",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "from kubernetes.client import V1Container\n",
+    "from kubernetes.client import V1PodTemplateSpec\n",
+    "from kubernetes.client import V1ObjectMeta\n",
+    "from kubernetes.client import V1PodSpec\n",
+    "from kubeflow.training import KubeflowOrgV1ReplicaSpec, KubeflowOrgV1TFJobSpec, KubeflowOrgV1TFJob, KubeflowOrgV1RunPolicy\n",
+    "\n",
+    "container = V1Container(\n",
+    "    name=\"tensorflow2\",\n",
+    "    image=\"gcr.io/kubeflow-ci/tf-mnist-with-summaries:1.0\",\n",
+    "    command=[\n",
+    "        \"python\",\n",
+    "        \"/var/tf_mnist/mnist_with_summaries.py\",\n",
+    "        \"--learning_rate=0.01\",\n",
+    "        \"--batch_size=150\"\n",
+    "    ]\n",
+    ")\n",
+    "\n",
+    "worker = KubeflowOrgV1ReplicaSpec(\n",
+    "    replicas=1,\n",
+    "    restart_policy=\"Never\",\n",
+    "    template=V1PodTemplateSpec(\n",
+    "        spec=V1PodSpec(\n",
+    "            containers=[container]\n",
+    "        )\n",
+    "    )\n",
+    ")\n",
+    "run_policy = KubeflowOrgV1RunPolicy(\n",
+    "    active_deadline_seconds=None,\n",
+    "    backoff_limit=None,\n",
+    "    clean_pod_policy=None,\n",
+    "    scheduling_policy=None,\n",
+    "    suspend=False,\n",
+    "    ttl_seconds_after_finished=None\n",
+    ")\n",
+    "tfjob = KubeflowOrgV1TFJob(\n",
+    "    api_version=\"kubeflow.org/v1\",\n",
+    "    kind=\"TFJob\",\n",
+    "    metadata=V1ObjectMeta(name=\"mnist-examples\", namespace=\"mark-dsp\"),\n",
+    "    spec=KubeflowOrgV1TFJobSpec(\n",
+    "        run_policy=run_policy,\n",
+    "        tf_replica_specs={\"Worker\": worker}\n",
+    "    )\n",
+    ")"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "12649abc-1169-42aa-af1c-cafeff5e10f8",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "tc.create_job(namespace=\"mark-dsp\", job=tfjob)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "0ea416bc-4907-41ee-8d4c-6b63fd07d68a",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "print(tc.list_jobs(namespace=\"mark-dsp\", job_kind=constants.TFJOB_KIND))"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "68b4ae57-c6ce-482a-8c43-5c3a4bc59268",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "print(tc.get_job_conditions(name=\"mnist-examples\", namespace=\"mark-dsp\", job_kind=constants.TFJOB_KIND))"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "78d89180-9b10-4af7-9a02-1214c51568df",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "tc.is_job_created(name=\"mnist-examples\", namespace=\"mark-dsp\", job_kind=constants.TFJOB_KIND)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "0606647e-6b15-4108-8684-9e525ca92a88",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "tc.is_job_succeeded(name=\"mnist-examples\", namespace=\"mark-dsp\", job_kind=constants.TFJOB_KIND)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "27800f25-6345-4d4a-bf6c-a4a5a33bb7cd",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "tc.is_job_restarting(name=\"mnist-examples\", namespace=\"mark-dsp\", job_kind=constants.TFJOB_KIND)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "174f611e-3647-42fe-be22-09e524d32ce6",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "tc.is_job_running(name=\"mnist-examples\", namespace=\"mark-dsp\", job_kind=constants.TFJOB_KIND)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "b23339f6-1c43-4d22-9694-b2c2374bf610",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "tc.is_job_failed(name=\"mnist-examples\", namespace=\"mark-dsp\", job_kind=constants.TFJOB_KIND)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "df5b3c0b-b3e8-4d2c-9a8f-51b823345586",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "tfjob = tc.get_job(name=\"mnist-examples\")"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "48aa7c94-021a-46a5-9567-aa47719d5a73",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "tc.wait_for_job_conditions(name=\"mnist-examples\")"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "f802b544-fdb4-43c4-b856-69754608373e",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "pods = tc.get_job_pods(name=\"mnist-examples\")\n",
+    "print(pods)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "03824d58-93d1-4043-a711-36aefbce3fa9",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "pod_names = tc.get_job_pod_names(name=\"mnist-examples\")\n",
+    "print(pod_names)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "f4b2fd03-6ced-4eba-803a-71c3ab77e04a",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "print(tc.get_job_logs(name=\"mnist-examples\"))"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "1e591d4b-1ae2-4d48-9412-25a7dcea5a25",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "tc.update_job(tfjob, \"mnist-examples\")"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "2cc73adf-8033-42b7-a776-0e1c27ccfcc0",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "tc.delete_job(name=\"mnist-examples\")"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "3ebf2511-412d-4dba-b2dd-f3f5c3097eb6",
+   "metadata": {},
+   "outputs": [],
+   "source": []
+  }
+ ],
+ "metadata": {
+  "kernelspec": {
+   "display_name": "Python 3.11",
+   "language": "python",
+   "name": "python3"
+  },
+  "language_info": {
+   "codemirror_mode": {
+    "name": "ipython",
+    "version": 3
+   },
+   "file_extension": ".py",
+   "mimetype": "text/x-python",
+   "name": "python",
+   "nbconvert_exporter": "python",
+   "pygments_lexer": "ipython3",
+   "version": "3.11.7"
+  }
+ },
+ "nbformat": 4,
+ "nbformat_minor": 5
+}
diff --git a/poetry.lock b/poetry.lock
index 5c4ce93f8..c2903d715 100644
--- a/poetry.lock
+++ b/poetry.lock
@@ -884,6 +884,17 @@ files = [
     {file = "docutils-0.20.1.tar.gz", hash = "sha256:f08a4e276c3a1583a86dce3e34aba3fe04d02bba2dd51ed16106244e8a923e3b"},
 ]
 
+[[package]]
+name = "durationpy"
+version = "0.9"
+description = "Module for converting between datetime.timedelta and Go's Duration strings."
+optional = false
+python-versions = "*"
+files = [
+    {file = "durationpy-0.9-py3-none-any.whl", hash = "sha256:e65359a7af5cedad07fb77a2dd3f390f8eb0b74cb845589fa6c057086834dd38"},
+    {file = "durationpy-0.9.tar.gz", hash = "sha256:fd3feb0a69a0057d582ef643c355c40d2fa1c942191f914d12203b1a01ac722a"},
+]
+
 [[package]]
 name = "exceptiongroup"
 version = "1.2.2"
@@ -1751,25 +1762,49 @@ files = [
     {file = "jupyterlab_widgets-3.0.13.tar.gz", hash = "sha256:a2966d385328c1942b683a8cd96b89b8dd82c8b8f81dda902bb2bc06d46f5bed"},
 ]
 
+[[package]]
+name = "kubeflow-training"
+version = "1.8.1"
+description = "Training Operator Python SDK"
+optional = false
+python-versions = "*"
+files = [
+    {file = "kubeflow-training-1.8.1.tar.gz", hash = "sha256:137cdeb5843f90e637fd53df988cbf6894501900c8f372e7de75a6a1302fe8f5"},
+    {file = "kubeflow_training-1.8.1-py3-none-any.whl", hash = "sha256:ef36df107b850c6e6587dc1b9b7a92aaa7b6cf28e9d73b5f1eb87dd84e80222d"},
+]
+
+[package.dependencies]
+certifi = ">=14.05.14"
+kubernetes = ">=27.2.0"
+retrying = ">=1.3.3"
+setuptools = ">=21.0.0"
+six = ">=1.10"
+urllib3 = ">=1.15.1"
+
+[package.extras]
+huggingface = ["peft (==0.3.0)", "transformers (==4.38.0)"]
+test = ["black (==24.3.0)", "flake8 (==4.0.1)", "mypy", "pytest", "pytest-tornasync"]
+
 [[package]]
 name = "kubernetes"
-version = "26.1.0"
+version = "31.0.0"
 description = "Kubernetes python client"
 optional = false
 python-versions = ">=3.6"
 files = [
-    {file = "kubernetes-26.1.0-py2.py3-none-any.whl", hash = "sha256:e3db6800abf7e36c38d2629b5cb6b74d10988ee0cba6fba45595a7cbe60c0042"},
-    {file = "kubernetes-26.1.0.tar.gz", hash = "sha256:5854b0c508e8d217ca205591384ab58389abdae608576f9c9afc35a3c76a366c"},
+    {file = "kubernetes-31.0.0-py2.py3-none-any.whl", hash = "sha256:bf141e2d380c8520eada8b351f4e319ffee9636328c137aa432bc486ca1200e1"},
+    {file = "kubernetes-31.0.0.tar.gz", hash = "sha256:28945de906c8c259c1ebe62703b56a03b714049372196f854105afe4e6d014c0"},
 ]
 
 [package.dependencies]
 certifi = ">=14.05.14"
+durationpy = ">=0.7"
 google-auth = ">=1.0.1"
+oauthlib = ">=3.2.2"
 python-dateutil = ">=2.5.3"
 pyyaml = ">=5.4.1"
 requests = "*"
 requests-oauthlib = "*"
-setuptools = ">=21.0.0"
 six = ">=1.9.0"
 urllib3 = ">=1.24.2"
 websocket-client = ">=0.32.0,<0.40.0 || >0.40.0,<0.41.dev0 || >=0.43.dev0"
@@ -3285,6 +3320,20 @@ requests = ">=2.0.0"
 [package.extras]
 rsa = ["oauthlib[signedtoken] (>=3.0.0)"]
 
+[[package]]
+name = "retrying"
+version = "1.3.4"
+description = "Retrying"
+optional = false
+python-versions = "*"
+files = [
+    {file = "retrying-1.3.4-py3-none-any.whl", hash = "sha256:8cc4d43cb8e1125e0ff3344e9de678fefd85db3b750b81b2240dc0183af37b35"},
+    {file = "retrying-1.3.4.tar.gz", hash = "sha256:345da8c5765bd982b1d1915deb9102fd3d1f7ad16bd84a9700b85f64d24e8f3e"},
+]
+
+[package.dependencies]
+six = ">=1.7.0"
+
 [[package]]
 name = "rfc3339-validator"
 version = "0.1.4"
@@ -4180,4 +4229,4 @@ type = ["pytest-mypy"]
 [metadata]
 lock-version = "2.0"
 python-versions = "^3.9"
-content-hash = "4463099e8d145fd823f523b134f18d48766038cc3d2ad466864e5a2debcc3479"
+content-hash = "0f9e5183c4f451aba37e9061689a2c9bf36aedf0242bafebd6b066b24e274c4c"
diff --git a/pyproject.toml b/pyproject.toml
index 17b598804..ee088531e 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -24,11 +24,12 @@ python = "^3.9"
 openshift-client = "1.0.18"
 rich = "^12.5"
 ray = {version = "2.35.0", extras = ["data", "default"]}
-kubernetes = ">= 25.3.0, < 27"
+kubernetes = ">= 27.2.0"
 cryptography = "40.0.2"
 executing = "1.2.0"
 pydantic = "< 2"
 ipywidgets = "8.1.2"
+kubeflow-training = "1.8.1"
 
 [tool.poetry.group.docs]
 optional = true
diff --git a/src/codeflare_sdk/__init__.py b/src/codeflare_sdk/__init__.py
index 9ab5c7450..e34c0f94c 100644
--- a/src/codeflare_sdk/__init__.py
+++ b/src/codeflare_sdk/__init__.py
@@ -12,6 +12,8 @@
     RayJobClient,
 )
 
+from .kubeflow import TrainingClient
+
 from .common.widgets import view_clusters
 
 from .common import (
diff --git a/src/codeflare_sdk/kubeflow/__init__.py b/src/codeflare_sdk/kubeflow/__init__.py
new file mode 100644
index 000000000..b1dd6c8ef
--- /dev/null
+++ b/src/codeflare_sdk/kubeflow/__init__.py
@@ -0,0 +1 @@
+from .client import TrainingClient
diff --git a/src/codeflare_sdk/kubeflow/client/__init__.py b/src/codeflare_sdk/kubeflow/client/__init__.py
new file mode 100644
index 000000000..8e21a0e6c
--- /dev/null
+++ b/src/codeflare_sdk/kubeflow/client/__init__.py
@@ -0,0 +1 @@
+from .training_client import TrainingClient
diff --git a/src/codeflare_sdk/kubeflow/client/training_client.py b/src/codeflare_sdk/kubeflow/client/training_client.py
new file mode 100644
index 000000000..43b3441ea
--- /dev/null
+++ b/src/codeflare_sdk/kubeflow/client/training_client.py
@@ -0,0 +1,277 @@
+from kubeflow.training.api.training_client import TrainingClient as tc
+from kubeflow.training.constants import constants
+from kubeflow.training import models
+
+from ...ray.cluster.cluster import get_current_namespace
+from ...common.kubernetes_cluster.auth import config_check, get_api_client
+from kubernetes import client
+from typing import Optional, Union, Dict, List, Callable, Any, Set, Tuple
+
+
+class TrainingClient:
+    """Thin wrapper around the Kubeflow Training Operator client that
+    reuses codeflare-sdk authentication and namespace detection."""
+
+    def __init__(
+        self,
+        config_file: Optional[str] = None,
+        context: Optional[str] = None,
+        client_configuration: Optional[client.Configuration] = None,
+        namespace: Optional[str] = None,
+        job_kind: str = constants.PYTORCHJOB_KIND,
+    ):
+        config_check()
+        # Resolve the namespace at call time rather than at import time.
+        if namespace is None:
+            namespace = get_current_namespace()
+        # Reuse the SDK's authenticated API client so the user does not
+        # have to pass a client configuration themselves.
+        if get_api_client() is not None:
+            client_configuration = get_api_client().configuration
+
+        self.training_client = tc(
+            client_configuration=client_configuration,
+            config_file=config_file,
+            context=context,
+            namespace=namespace,
+            job_kind=job_kind,
+        )
+
+    def train(
+        self,
+        name: str,
+        namespace: Optional[str] = None,
+        num_workers: int = 1,
+        num_procs_per_worker: int = 1,
+        resources_per_worker: Union[dict, client.V1ResourceRequirements, None] = None,
+        model_provider_parameters=None,
+        dataset_provider_parameters=None,
+        trainer_parameters=None,
+        storage_config: Dict[str, Optional[Union[str, List[str]]]] = {
+            "size": constants.PVC_DEFAULT_SIZE,
+            "storage_class": None,
+            "access_modes": constants.PVC_DEFAULT_ACCESS_MODES,
+        },
+    ):
+        self.training_client.train(
+            name,
+            namespace,
+            num_workers,
+            num_procs_per_worker,
+            resources_per_worker,
+            model_provider_parameters,
+            dataset_provider_parameters,
+            trainer_parameters,
+            storage_config,
+        )
+
+    def create_job(
+        self,
+        job: Optional[constants.JOB_MODELS_TYPE] = None,
+        name: Optional[str] = None,
+        namespace: Optional[str] = None,
+        job_kind: Optional[str] = None,
+        base_image: Optional[str] = None,
+        train_func: Optional[Callable] = None,
+        parameters: Optional[Dict[str, Any]] = None,
+        num_workers: Optional[int] = None,
+        resources_per_worker: Union[dict, models.V1ResourceRequirements, None] = None,
+        num_chief_replicas: Optional[int] = None,
+        num_ps_replicas: Optional[int] = None,
+        packages_to_install: Optional[List[str]] = None,
+        pip_index_url: str = constants.DEFAULT_PIP_INDEX_URL,
+    ):
+        self.training_client.create_job(
+            job,
+            name,
+            namespace,
+            job_kind,
+            base_image,
+            train_func,
+            parameters,
+            num_workers,
+            resources_per_worker,
+            num_chief_replicas,
+            num_ps_replicas,
+            packages_to_install,
+            pip_index_url,
+        )
+
+    def get_job(
+        self,
+        name: str,
+        namespace: Optional[str] = None,
+        job_kind: Optional[str] = None,
+        timeout: int = constants.DEFAULT_TIMEOUT,
+    ) -> constants.JOB_MODELS_TYPE:
+        return self.training_client.get_job(name, namespace, job_kind, timeout)
+
+    def list_jobs(
+        self,
+        namespace: Optional[str] = None,
+        job_kind: Optional[str] = None,
+        timeout: int = constants.DEFAULT_TIMEOUT,
+    ) -> List[constants.JOB_MODELS_TYPE]:
+        return self.training_client.list_jobs(namespace, job_kind, timeout)
+
+    def get_job_conditions(
+        self,
+        name: Optional[str] = None,
+        namespace: Optional[str] = None,
+        job_kind: Optional[str] = None,
+        job: Optional[constants.JOB_MODELS_TYPE] = None,
+        timeout: int = constants.DEFAULT_TIMEOUT,
+    ) -> List[models.V1JobCondition]:
+        return self.training_client.get_job_conditions(
+            name, namespace, job_kind, job, timeout
+        )
+
+    def is_job_created(
+        self,
+        name: Optional[str] = None,
+        namespace: Optional[str] = None,
+        job_kind: Optional[str] = None,
+        job: Optional[constants.JOB_MODELS_TYPE] = None,
+        timeout: int = constants.DEFAULT_TIMEOUT,
+    ) -> bool:
+        return self.training_client.is_job_created(
+            name, namespace, job_kind, job, timeout
+        )
+
+    def is_job_running(
+        self,
+        name: Optional[str] = None,
+        namespace: Optional[str] = None,
+        job_kind: Optional[str] = None,
+        job: Optional[constants.JOB_MODELS_TYPE] = None,
+        timeout: int = constants.DEFAULT_TIMEOUT,
+    ) -> bool:
+        return self.training_client.is_job_running(
+            name, namespace, job_kind, job, timeout
+        )
+
+    def is_job_restarting(
+        self,
+        name: Optional[str] = None,
+        namespace: Optional[str] = None,
+        job_kind: Optional[str] = None,
+        job: Optional[constants.JOB_MODELS_TYPE] = None,
+        timeout: int = constants.DEFAULT_TIMEOUT,
+    ) -> bool:
+        return self.training_client.is_job_restarting(
+            name, namespace, job_kind, job, timeout
+        )
+
+    def is_job_succeeded(
+        self,
+        name: Optional[str] = None,
+        namespace: Optional[str] = None,
+        job_kind: Optional[str] = None,
+        job: Optional[constants.JOB_MODELS_TYPE] = None,
+        timeout: int = constants.DEFAULT_TIMEOUT,
+    ) -> bool:
+        return self.training_client.is_job_succeeded(
+            name, namespace, job_kind, job, timeout
+        )
+
+    def is_job_failed(
+        self,
+        name: Optional[str] = None,
+        namespace: Optional[str] = None,
+        job_kind: Optional[str] = None,
+        job: Optional[constants.JOB_MODELS_TYPE] = None,
+        timeout: int = constants.DEFAULT_TIMEOUT,
+    ) -> bool:
+        return self.training_client.is_job_failed(
+            name, namespace, job_kind, job, timeout
+        )
+
+    def wait_for_job_conditions(
+        self,
+        name: str,
+        namespace: Optional[str] = None,
+        job_kind: Optional[str] = None,
+        expected_conditions: Set = {constants.JOB_CONDITION_SUCCEEDED},
+        wait_timeout: int = 600,
+        polling_interval: int = 15,
+        callback: Optional[Callable] = None,
+        timeout: int = constants.DEFAULT_TIMEOUT,
+    ) -> constants.JOB_MODELS_TYPE:
+        return self.training_client.wait_for_job_conditions(
+            name,
+            namespace,
+            job_kind,
+            expected_conditions,
+            wait_timeout,
+            polling_interval,
+            callback,
+            timeout,
+        )
+
+    def get_job_pods(
+        self,
+        name: str,
+        namespace: Optional[str] = None,
+        is_master: bool = False,
+        replica_type: Optional[str] = None,
+        replica_index: Optional[int] = None,
+        timeout: int = constants.DEFAULT_TIMEOUT,
+    ) -> List[models.V1Pod]:
+        return self.training_client.get_job_pods(
+            name, namespace, is_master, replica_type, replica_index, timeout
+        )
+
+    def get_job_pod_names(
+        self,
+        name: str,
+        namespace: Optional[str] = None,
+        is_master: bool = False,
+        replica_type: Optional[str] = None,
+        replica_index: Optional[int] = None,
+        timeout: int = constants.DEFAULT_TIMEOUT,
+    ) -> List[str]:
+        return self.training_client.get_job_pod_names(
+            name, namespace, is_master, replica_type, replica_index, timeout
+        )
+
+    def get_job_logs(
+        self,
+        name: str,
+        namespace: Optional[str] = None,
+        job_kind: Optional[str] = None,
+        is_master: bool = True,
+        replica_type: Optional[str] = None,
+        replica_index: Optional[int] = None,
+        follow: bool = False,
+        timeout: int = constants.DEFAULT_TIMEOUT,
+        verbose: bool = False,
+    ) -> Tuple[Dict[str, str], Dict[str, List[str]]]:
+        return self.training_client.get_job_logs(
+            name,
+            namespace,
+            job_kind,
+            is_master,
+            replica_type,
+            replica_index,
+            follow,
+            timeout,
+            verbose,
+        )
+
+    def update_job(
+        self,
+        job: constants.JOB_MODELS_TYPE,
+        name: str,
+        namespace: Optional[str] = None,
+        job_kind: Optional[str] = None,
+    ):
+        self.training_client.update_job(job, name, namespace, job_kind)
+
+    def delete_job(
+        self,
+        name: str,
+        namespace: Optional[str] = None,
+        job_kind: Optional[str] = None,
+        delete_options: Optional[models.V1DeleteOptions] = None,
+    ):
+        self.training_client.delete_job(name, namespace, job_kind, delete_options)
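
A minimal end-to-end sketch of the TrainingClient wrapper introduced above, assuming an already-authenticated cluster. The namespace "demo", the job name "sketch-job", and the base image tag are hypothetical placeholders, not values from this change:

from codeflare_sdk import TrainingClient
from kubeflow.training.constants import constants

def train_func():
    # Runs inside each worker pod; the import is local so only the pod image needs torch.
    import torch

    x = torch.rand(4, 4)
    print(f"tensor sum: {x.sum().item()}")

# job_kind defaults to PyTorchJob; if namespace is omitted, the wrapper falls back
# to the current namespace detected by the SDK.
tc = TrainingClient(job_kind=constants.PYTORCHJOB_KIND, namespace="demo")
tc.create_job(
    name="sketch-job",
    train_func=train_func,
    num_workers=1,
    base_image="pytorch/pytorch:2.1.2-cuda11.8-cudnn8-runtime",  # hypothetical tag
)
tc.wait_for_job_conditions(name="sketch-job")  # blocks until Succeeded by default
print(tc.get_job_logs(name="sketch-job"))
tc.delete_job(name="sketch-job")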