From 097262a4d0b07af58a6ddc267805953dc7131bb4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Szadkowski?= Date: Fri, 17 Jan 2025 11:28:08 +0100 Subject: [PATCH] [Feature] Kuberay RayJob MultiKueue adapter (#3892) * Setup e2e tests to support Kuberay * Introduce Kuberay RayJobs MultiKueue Adapter * Add the RayJob Multikueue e2e tests --- Makefile-deps.mk | 20 ++- Makefile-test.mk | 4 +- charts/kueue/templates/rbac/role.yaml | 7 + config/components/rbac/role.yaml | 7 + hack/e2e-common.sh | 35 ++++ hack/multikueue-e2e-test.sh | 9 +- pkg/controller/jobframework/validation.go | 4 +- .../jobs/rayjob/rayjob_controller.go | 11 +- .../jobs/rayjob/rayjob_multikueue_adapter.go | 123 ++++++++++++++ .../rayjob/rayjob_multikueue_adapter_test.go | 158 ++++++++++++++++++ pkg/util/testingjobs/rayjob/wrappers.go | 92 +++++++++- .../create-multikueue-kubeconfig.sh | 32 ++++ test/e2e/multikueue/e2e_test.go | 89 ++++++++-- test/e2e/multikueue/suite_test.go | 8 +- .../controller/jobs/raycluster/suite_test.go | 2 +- .../controller/jobs/rayjob/suite_test.go | 2 +- test/integration/webhook/jobs/suite_test.go | 2 +- test/util/e2e.go | 9 + 18 files changed, 581 insertions(+), 33 deletions(-) create mode 100644 pkg/controller/jobs/rayjob/rayjob_multikueue_adapter.go create mode 100644 pkg/controller/jobs/rayjob/rayjob_multikueue_adapter_test.go diff --git a/Makefile-deps.mk b/Makefile-deps.mk index f1e3e76bbd..71de5acaa7 100644 --- a/Makefile-deps.mk +++ b/Makefile-deps.mk @@ -128,8 +128,22 @@ kf-training-operator-manifests: ## Copy whole manifests folder from the training RAY_ROOT = $(shell $(GO_CMD) list -m -mod=readonly -f "{{.Dir}}" github.com/ray-project/kuberay/ray-operator) .PHONY: ray-operator-crd ray-operator-crd: ## Copy the CRDs from the ray-operator to the dep-crds directory. - mkdir -p $(EXTERNAL_CRDS_DIR)/ray-operator/ - cp -f $(RAY_ROOT)/config/crd/bases/* $(EXTERNAL_CRDS_DIR)/ray-operator/ + mkdir -p $(EXTERNAL_CRDS_DIR)/ray-operator-crds/ + cp -f $(RAY_ROOT)/config/crd/bases/* $(EXTERNAL_CRDS_DIR)/ray-operator-crds/ + +.PHONY: ray-operator-manifests +ray-operator-manifests: ## Copy the whole manifests content from the ray-operator to the dep-crds directory. + ## Full version of the manifest is required for e2e multikueue tests. + if [ -d "$(EXTERNAL_CRDS_DIR)/ray-operator" ]; then \ + chmod -R u+w "$(EXTERNAL_CRDS_DIR)/ray-operator" && \ + rm -rf "$(EXTERNAL_CRDS_DIR)/ray-operator"; \ + fi + mkdir -p "$(EXTERNAL_CRDS_DIR)/ray-operator"; \ + cp -rf "$(RAY_ROOT)/config/crd" "$(EXTERNAL_CRDS_DIR)/ray-operator" + cp -rf "$(RAY_ROOT)/config/default" "$(EXTERNAL_CRDS_DIR)/ray-operator" + cp -rf "$(RAY_ROOT)/config/rbac" "$(EXTERNAL_CRDS_DIR)/ray-operator" + cp -rf "$(RAY_ROOT)/config/manager" "$(EXTERNAL_CRDS_DIR)/ray-operator" + JOBSET_ROOT = $(shell $(GO_CMD) list -m -mod=readonly -f "{{.Dir}}" sigs.k8s.io/jobset) .PHONY: jobset-operator-crd @@ -162,7 +176,7 @@ appwrapper-manifests: kustomize ## Copy whole manifests folder from the appwrapp cd "$(EXTERNAL_CRDS_DIR)/appwrapper/config/manager" && chmod u+w kustomization.yaml && $(KUSTOMIZE) edit set image controller=quay.io/ibm/appwrapper:${APPWRAPPER_VERSION} && chmod u-w kustomization.yaml .PHONY: dep-crds -dep-crds: mpi-operator-crd kf-training-operator-crd ray-operator-crd jobset-operator-crd cluster-autoscaler-crd appwrapper-crd appwrapper-manifests kf-training-operator-manifests ## Copy the CRDs from the external operators to the dep-crds directory. 
+dep-crds: mpi-operator-crd kf-training-operator-crd ray-operator-crd jobset-operator-crd cluster-autoscaler-crd appwrapper-crd appwrapper-manifests kf-training-operator-manifests ray-operator-manifests## Copy the CRDs from the external operators to the dep-crds directory. @echo "Copying CRDs from external operators to dep-crds directory" .PHONY: kueuectl-docs diff --git a/Makefile-test.mk b/Makefile-test.mk index 82aac5907b..fd956f8452 100644 --- a/Makefile-test.mk +++ b/Makefile-test.mk @@ -71,6 +71,7 @@ APPWRAPPER_VERSION = $(shell $(GO_CMD) list -m -f "{{.Version}}" github.com/proj JOBSET_VERSION = $(shell $(GO_CMD) list -m -f "{{.Version}}" sigs.k8s.io/jobset) KUBEFLOW_VERSION = $(shell $(GO_CMD) list -m -f "{{.Version}}" github.com/kubeflow/training-operator) KUBEFLOW_MPI_VERSION = $(shell $(GO_CMD) list -m -f "{{.Version}}" github.com/kubeflow/mpi-operator) +KUBERAY_VERSION = $(shell $(GO_CMD) list -m -f "{{.Version}}" github.com/ray-project/kuberay/ray-operator) ##@ Tests @@ -120,7 +121,8 @@ run-test-multikueue-e2e-%: FORCE @echo Running multikueue e2e for k8s ${K8S_VERSION} E2E_KIND_VERSION="kindest/node:v$(K8S_VERSION)" KIND_CLUSTER_NAME=$(KIND_CLUSTER_NAME) CREATE_KIND_CLUSTER=$(CREATE_KIND_CLUSTER) \ ARTIFACTS="$(ARTIFACTS)/$@" IMAGE_TAG=$(IMAGE_TAG) GINKGO_ARGS="$(GINKGO_ARGS)" \ - JOBSET_VERSION=$(JOBSET_VERSION) KUBEFLOW_VERSION=$(KUBEFLOW_VERSION) KUBEFLOW_MPI_VERSION=$(KUBEFLOW_MPI_VERSION) \ + JOBSET_VERSION=$(JOBSET_VERSION) KUBEFLOW_VERSION=$(KUBEFLOW_VERSION) \ + KUBEFLOW_MPI_VERSION=$(KUBEFLOW_MPI_VERSION) KUBERAY_VERSION=$(KUBERAY_VERSION) \ ./hack/multikueue-e2e-test.sh $(PROJECT_DIR)/bin/ginkgo-top -i $(ARTIFACTS)/$@/e2e.json > $(ARTIFACTS)/$@/e2e-top.yaml diff --git a/charts/kueue/templates/rbac/role.yaml b/charts/kueue/templates/rbac/role.yaml index 6d262be414..56cac5cef4 100644 --- a/charts/kueue/templates/rbac/role.yaml +++ b/charts/kueue/templates/rbac/role.yaml @@ -298,9 +298,16 @@ rules: - rayclusters/finalizers - rayclusters/status - rayjobs/finalizers + verbs: + - get + - update + - apiGroups: + - ray.io + resources: - rayjobs/status verbs: - get + - patch - update - apiGroups: - scheduling.k8s.io diff --git a/config/components/rbac/role.yaml b/config/components/rbac/role.yaml index a30483a581..d7a0941a9b 100644 --- a/config/components/rbac/role.yaml +++ b/config/components/rbac/role.yaml @@ -297,9 +297,16 @@ rules: - rayclusters/finalizers - rayclusters/status - rayjobs/finalizers + verbs: + - get + - update +- apiGroups: + - ray.io + resources: - rayjobs/status verbs: - get + - patch - update - apiGroups: - scheduling.k8s.io diff --git a/hack/e2e-common.sh b/hack/e2e-common.sh index 81ca22574f..9e28f34f62 100644 --- a/hack/e2e-common.sh +++ b/hack/e2e-common.sh @@ -45,6 +45,14 @@ if [[ -n ${KUBEFLOW_MPI_VERSION:-} ]]; then export KUBEFLOW_MPI_IMAGE=mpioperator/mpi-operator:${KUBEFLOW_MPI_VERSION/#v} fi +if [[ -n ${KUBERAY_VERSION:-} ]]; then + export KUBERAY_MANIFEST="${ROOT_DIR}/dep-crds/ray-operator/default/" + export KUBERAY_IMAGE=bitnami/kuberay-operator:${KUBERAY_VERSION/#v} + export KUBERAY_RAY_IMAGE=rayproject/ray:2.9.0 + export KUBERAY_RAY_IMAGE_ARM=rayproject/ray:2.9.0-aarch64 + export KUBERAY_CRDS=${ROOT_DIR}/dep-crds/ray-operator/crd/bases +fi + # sleep image to use for testing. 
export E2E_TEST_SLEEP_IMAGE_OLD=gcr.io/k8s-staging-perf-tests/sleep:v0.0.3@sha256:00ae8e01dd4439edfb7eb9f1960ac28eba16e952956320cce7f2ac08e3446e6b E2E_TEST_SLEEP_IMAGE_OLD_WITHOUT_SHA=${E2E_TEST_SLEEP_IMAGE_OLD%%@*} @@ -97,6 +105,17 @@ function prepare_docker_images { if [[ -n ${KUBEFLOW_MPI_VERSION:-} ]]; then docker pull "${KUBEFLOW_MPI_IMAGE}" fi + if [[ -n ${KUBERAY_VERSION:-} ]]; then + docker pull "${KUBERAY_IMAGE}" + + # Extra e2e images required for Kuberay + unamestr=$(uname) + if [[ "$unamestr" == 'Linux' ]]; then + docker pull "${KUBERAY_RAY_IMAGE}" + elif [[ "$unamestr" == 'Darwin' ]]; then + docker pull "${KUBERAY_RAY_IMAGE_ARM}" + fi + fi } # $1 cluster @@ -151,6 +170,22 @@ function install_mpi { kubectl apply --server-side -f "${KUBEFLOW_MPI_MANIFEST}" } +#$1 - cluster name +function install_kuberay { + # Extra e2e images required for Kuberay + unamestr=$(uname) + if [[ "$unamestr" == 'Linux' ]]; then + cluster_kind_load_image "${1}" "${KUBERAY_RAY_IMAGE}" + elif [[ "$unamestr" == 'Darwin' ]]; then + cluster_kind_load_image "${1}" "${KUBERAY_RAY_IMAGE_ARM}" + fi + + cluster_kind_load_image "${1}" "${KUBERAY_IMAGE}" + kubectl config use-context "kind-${1}" + # create used instead of apply - https://github.com/ray-project/kuberay/issues/504 + kubectl create -k "${KUBERAY_MANIFEST}" +} + INITIAL_IMAGE=$($YQ '.images[] | select(.name == "controller") | [.newName, .newTag] | join(":")' config/components/manager/kustomization.yaml) export INITIAL_IMAGE diff --git a/hack/multikueue-e2e-test.sh b/hack/multikueue-e2e-test.sh index 8dd3d87dd1..b626452549 100755 --- a/hack/multikueue-e2e-test.sh +++ b/hack/multikueue-e2e-test.sh @@ -100,10 +100,17 @@ function kind_load { install_kubeflow "$WORKER1_KIND_CLUSTER_NAME" install_kubeflow "$WORKER2_KIND_CLUSTER_NAME" - ## MPI + ## MPI install_mpi "$MANAGER_KIND_CLUSTER_NAME" install_mpi "$WORKER1_KIND_CLUSTER_NAME" install_mpi "$WORKER2_KIND_CLUSTER_NAME" + + ## KUBERAY + kubectl config use-context "kind-${MANAGER_KIND_CLUSTER_NAME}" + kubectl apply --server-side -f "${KUBERAY_CRDS}" + + install_kuberay "$WORKER1_KIND_CLUSTER_NAME" + install_kuberay "$WORKER2_KIND_CLUSTER_NAME" } function kueue_deploy { diff --git a/pkg/controller/jobframework/validation.go b/pkg/controller/jobframework/validation.go index 98be6809da..3e84a532ee 100644 --- a/pkg/controller/jobframework/validation.go +++ b/pkg/controller/jobframework/validation.go @@ -23,6 +23,7 @@ import ( kfmpi "github.com/kubeflow/mpi-operator/pkg/apis/kubeflow/v2beta1" kftraining "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1" + rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1" batchv1 "k8s.io/api/batch/v1" apivalidation "k8s.io/apimachinery/pkg/api/validation" "k8s.io/apimachinery/pkg/util/sets" @@ -47,7 +48,8 @@ var ( kftraining.SchemeGroupVersion.WithKind(kftraining.PaddleJobKind).String(), kftraining.SchemeGroupVersion.WithKind(kftraining.PyTorchJobKind).String(), kftraining.SchemeGroupVersion.WithKind(kftraining.XGBoostJobKind).String(), - kfmpi.SchemeGroupVersion.WithKind(kfmpi.Kind).String()) + kfmpi.SchemeGroupVersion.WithKind(kfmpi.Kind).String(), + rayv1.SchemeGroupVersion.WithKind("RayJob").String()) ) // ValidateJobOnCreate encapsulates all GenericJob validations that must be performed on a Create operation diff --git a/pkg/controller/jobs/rayjob/rayjob_controller.go b/pkg/controller/jobs/rayjob/rayjob_controller.go index 16488da894..d0ad44da2b 100644 --- a/pkg/controller/jobs/rayjob/rayjob_controller.go +++ 
b/pkg/controller/jobs/rayjob/rayjob_controller.go @@ -26,6 +26,7 @@ import ( corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" utilruntime "k8s.io/apimachinery/pkg/util/runtime" @@ -55,12 +56,13 @@ func init() { JobType: &rayv1.RayJob{}, AddToScheme: rayv1.AddToScheme, IsManagingObjectsOwner: isRayJob, + MultiKueueAdapter: &multikueueAdapter{}, })) } // +kubebuilder:rbac:groups="",resources=events,verbs=create;watch;update // +kubebuilder:rbac:groups=ray.io,resources=rayjobs,verbs=get;list;watch;update;patch -// +kubebuilder:rbac:groups=ray.io,resources=rayjobs/status,verbs=get;update +// +kubebuilder:rbac:groups=ray.io,resources=rayjobs/status,verbs=get;update;patch // +kubebuilder:rbac:groups=ray.io,resources=rayjobs/finalizers,verbs=get;update // +kubebuilder:rbac:groups=kueue.x-k8s.io,resources=workloads,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=kueue.x-k8s.io,resources=workloads/status,verbs=get;update;patch @@ -82,12 +84,17 @@ func (j *RayJob) Object() client.Object { return (*rayv1.RayJob)(j) } +func fromObject(obj runtime.Object) *RayJob { + return (*RayJob)(obj.(*rayv1.RayJob)) +} + func (j *RayJob) IsSuspended() bool { return j.Spec.Suspend } func (j *RayJob) IsActive() bool { - return j.Status.JobDeploymentStatus != rayv1.JobDeploymentStatusSuspended + // When the status is Suspended or New there should be no running Pods, and so the Job is not active. + return !(j.Status.JobDeploymentStatus == rayv1.JobDeploymentStatusSuspended || j.Status.JobDeploymentStatus == rayv1.JobDeploymentStatusNew) } func (j *RayJob) Suspend() { diff --git a/pkg/controller/jobs/rayjob/rayjob_multikueue_adapter.go b/pkg/controller/jobs/rayjob/rayjob_multikueue_adapter.go new file mode 100644 index 0000000000..79ae8e0598 --- /dev/null +++ b/pkg/controller/jobs/rayjob/rayjob_multikueue_adapter.go @@ -0,0 +1,123 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package rayjob + +import ( + "context" + "errors" + "fmt" + + rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" + "k8s.io/klog/v2" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + + kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1" + "sigs.k8s.io/kueue/pkg/controller/constants" + "sigs.k8s.io/kueue/pkg/controller/jobframework" + "sigs.k8s.io/kueue/pkg/util/api" + clientutil "sigs.k8s.io/kueue/pkg/util/client" +) + +type multikueueAdapter struct{} + +var _ jobframework.MultiKueueAdapter = (*multikueueAdapter)(nil) + +func (b *multikueueAdapter) SyncJob(ctx context.Context, localClient client.Client, remoteClient client.Client, key types.NamespacedName, workloadName, origin string) error { + log := ctrl.LoggerFrom(ctx) + + localJob := rayv1.RayJob{} + err := localClient.Get(ctx, key, &localJob) + if err != nil { + return err + } + + remoteJob := rayv1.RayJob{} + err = remoteClient.Get(ctx, key, &remoteJob) + if client.IgnoreNotFound(err) != nil { + return err + } + + // if the remote exists, just copy the status + if err == nil { + if fromObject(&localJob).IsSuspended() { + // Ensure the job is unsuspended before updating its status; otherwise, it will fail when patching the spec. + log.V(2).Info("Skipping the sync since the local job is still suspended") + return nil + } + return clientutil.PatchStatus(ctx, localClient, &localJob, func() (bool, error) { + localJob.Status = remoteJob.Status + return true, nil + }) + } + + remoteJob = rayv1.RayJob{ + ObjectMeta: api.CloneObjectMetaForCreation(&localJob.ObjectMeta), + Spec: *localJob.Spec.DeepCopy(), + } + + // add the prebuilt workload + if remoteJob.Labels == nil { + remoteJob.Labels = make(map[string]string, 2) + } + remoteJob.Labels[constants.PrebuiltWorkloadLabel] = workloadName + remoteJob.Labels[kueue.MultiKueueOriginLabel] = origin + + return remoteClient.Create(ctx, &remoteJob) +} + +func (b *multikueueAdapter) DeleteRemoteObject(ctx context.Context, remoteClient client.Client, key types.NamespacedName) error { + job := rayv1.RayJob{} + job.SetName(key.Name) + job.SetNamespace(key.Namespace) + return client.IgnoreNotFound(remoteClient.Delete(ctx, &job)) +} + +func (b *multikueueAdapter) KeepAdmissionCheckPending() bool { + return false +} + +func (b *multikueueAdapter) IsJobManagedByKueue(ctx context.Context, c client.Client, key types.NamespacedName) (bool, string, error) { + return true, "", nil +} + +func (b *multikueueAdapter) GVK() schema.GroupVersionKind { + return gvk +} + +var _ jobframework.MultiKueueWatcher = (*multikueueAdapter)(nil) + +func (*multikueueAdapter) GetEmptyList() client.ObjectList { + return &rayv1.RayJobList{} +} + +func (*multikueueAdapter) WorkloadKeyFor(o runtime.Object) (types.NamespacedName, error) { + job, isJob := o.(*rayv1.RayJob) + if !isJob { + return types.NamespacedName{}, errors.New("not a rayjob") + } + + prebuiltWl, hasPrebuiltWorkload := job.Labels[constants.PrebuiltWorkloadLabel] + if !hasPrebuiltWorkload { + return types.NamespacedName{}, fmt.Errorf("no prebuilt workload found for rayjob: %s", klog.KObj(job)) + } + + return types.NamespacedName{Name: prebuiltWl, Namespace: job.Namespace}, nil +} diff --git a/pkg/controller/jobs/rayjob/rayjob_multikueue_adapter_test.go b/pkg/controller/jobs/rayjob/rayjob_multikueue_adapter_test.go new file mode 100644 index 0000000000..f173edda23 --- /dev/null +++ 
b/pkg/controller/jobs/rayjob/rayjob_multikueue_adapter_test.go @@ -0,0 +1,158 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package rayjob + +import ( + "context" + "testing" + + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/interceptor" + + kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1" + "sigs.k8s.io/kueue/pkg/controller/constants" + "sigs.k8s.io/kueue/pkg/util/slices" + utiltesting "sigs.k8s.io/kueue/pkg/util/testing" + utiltestingrayjob "sigs.k8s.io/kueue/pkg/util/testingjobs/rayjob" +) + +const ( + TestNamespace = "ns" +) + +func TestMultikueueAdapter(t *testing.T) { + objCheckOpts := []cmp.Option{ + cmpopts.IgnoreFields(metav1.ObjectMeta{}, "ResourceVersion"), + cmpopts.EquateEmpty(), + } + + rayJobBuilder := utiltestingrayjob.MakeJob("rayjob1", TestNamespace).Suspend(false) + + cases := map[string]struct { + managersRayJobs []rayv1.RayJob + workerRayJobs []rayv1.RayJob + + operation func(ctx context.Context, adapter *multikueueAdapter, managerClient, workerClient client.Client) error + + wantError error + wantManagersRayJobs []rayv1.RayJob + wantWorkerRayJobs []rayv1.RayJob + }{ + "sync creates missing remote rayjob": { + managersRayJobs: []rayv1.RayJob{ + *rayJobBuilder.DeepCopy(), + }, + operation: func(ctx context.Context, adapter *multikueueAdapter, managerClient, workerClient client.Client) error { + return adapter.SyncJob(ctx, managerClient, workerClient, types.NamespacedName{Name: "rayjob1", Namespace: TestNamespace}, "wl1", "origin1") + }, + + wantManagersRayJobs: []rayv1.RayJob{ + *rayJobBuilder.DeepCopy(), + }, + wantWorkerRayJobs: []rayv1.RayJob{ + *rayJobBuilder.Clone(). + Label(constants.PrebuiltWorkloadLabel, "wl1"). + Label(kueue.MultiKueueOriginLabel, "origin1"). + Obj(), + }, + }, + "sync status from remote rayjob": { + managersRayJobs: []rayv1.RayJob{ + *rayJobBuilder.DeepCopy(), + }, + workerRayJobs: []rayv1.RayJob{ + *rayJobBuilder.Clone(). + Label(constants.PrebuiltWorkloadLabel, "wl1"). + Label(kueue.MultiKueueOriginLabel, "origin1"). + JobDeploymentStatus(rayv1.JobDeploymentStatusComplete). + Obj(), + }, + operation: func(ctx context.Context, adapter *multikueueAdapter, managerClient, workerClient client.Client) error { + return adapter.SyncJob(ctx, managerClient, workerClient, types.NamespacedName{Name: "rayjob1", Namespace: TestNamespace}, "wl1", "origin1") + }, + + wantManagersRayJobs: []rayv1.RayJob{ + *rayJobBuilder.Clone(). + JobDeploymentStatus(rayv1.JobDeploymentStatusComplete). + Obj(), + }, + wantWorkerRayJobs: []rayv1.RayJob{ + *rayJobBuilder.Clone(). + Label(constants.PrebuiltWorkloadLabel, "wl1"). + Label(kueue.MultiKueueOriginLabel, "origin1"). + JobDeploymentStatus(rayv1.JobDeploymentStatusComplete). 
+ Obj(), + }, + }, + "remote rayjob is deleted": { + workerRayJobs: []rayv1.RayJob{ + *rayJobBuilder.Clone(). + Label(constants.PrebuiltWorkloadLabel, "wl1"). + Label(kueue.MultiKueueOriginLabel, "origin1"). + Obj(), + }, + operation: func(ctx context.Context, adapter *multikueueAdapter, managerClient, workerClient client.Client) error { + return adapter.DeleteRemoteObject(ctx, workerClient, types.NamespacedName{Name: "rayjob1", Namespace: TestNamespace}) + }, + }, + } + for name, tc := range cases { + t.Run(name, func(t *testing.T) { + managerBuilder := utiltesting.NewClientBuilder(rayv1.AddToScheme).WithInterceptorFuncs(interceptor.Funcs{SubResourcePatch: utiltesting.TreatSSAAsStrategicMerge}) + managerBuilder = managerBuilder.WithLists(&rayv1.RayJobList{Items: tc.managersRayJobs}) + managerBuilder = managerBuilder.WithStatusSubresource(slices.Map(tc.managersRayJobs, func(w *rayv1.RayJob) client.Object { return w })...) + managerClient := managerBuilder.Build() + + workerBuilder := utiltesting.NewClientBuilder(rayv1.AddToScheme).WithInterceptorFuncs(interceptor.Funcs{SubResourcePatch: utiltesting.TreatSSAAsStrategicMerge}) + workerBuilder = workerBuilder.WithLists(&rayv1.RayJobList{Items: tc.workerRayJobs}) + workerClient := workerBuilder.Build() + + ctx, _ := utiltesting.ContextWithLog(t) + + adapter := &multikueueAdapter{} + + gotErr := tc.operation(ctx, adapter, managerClient, workerClient) + + if diff := cmp.Diff(tc.wantError, gotErr, cmpopts.EquateErrors()); diff != "" { + t.Errorf("unexpected error (-want/+got):\n%s", diff) + } + + gotManagersRayJobs := &rayv1.RayJobList{} + if err := managerClient.List(ctx, gotManagersRayJobs); err != nil { + t.Errorf("unexpected list manager's rayjobs error %s", err) + } else { + if diff := cmp.Diff(tc.wantManagersRayJobs, gotManagersRayJobs.Items, objCheckOpts...); diff != "" { + t.Errorf("unexpected manager's rayjobs (-want/+got):\n%s", diff) + } + } + + gotWorkerRayJobs := &rayv1.RayJobList{} + if err := workerClient.List(ctx, gotWorkerRayJobs); err != nil { + t.Errorf("unexpected list worker's rayjobs error %s", err) + } else { + if diff := cmp.Diff(tc.wantWorkerRayJobs, gotWorkerRayJobs.Items, objCheckOpts...); diff != "" { + t.Errorf("unexpected worker's rayjobs (-want/+got):\n%s", diff) + } + } + }) + } +} diff --git a/pkg/util/testingjobs/rayjob/wrappers.go b/pkg/util/testingjobs/rayjob/wrappers.go index 4718f7cfae..1790078c3c 100644 --- a/pkg/util/testingjobs/rayjob/wrappers.go +++ b/pkg/util/testingjobs/rayjob/wrappers.go @@ -40,15 +40,23 @@ func MakeJob(name, ns string) *JobWrapper { Spec: rayv1.RayJobSpec{ ShutdownAfterJobFinishes: true, RayClusterSpec: &rayv1.RayClusterSpec{ + RayVersion: "2.9.0", HeadGroupSpec: rayv1.HeadGroupSpec{ - RayStartParams: map[string]string{"p1": "v1"}, + RayStartParams: map[string]string{}, Template: corev1.PodTemplateSpec{ Spec: corev1.PodSpec{ + RestartPolicy: "Never", Containers: []corev1.Container{ { - Name: "head-container", + Name: "head-container", + Command: []string{}, + Resources: corev1.ResourceRequirements{ + Requests: corev1.ResourceList{}, + Limits: corev1.ResourceList{}, + }, }, }, + NodeSelector: map[string]string{}, }, }, }, @@ -58,14 +66,21 @@ func MakeJob(name, ns string) *JobWrapper { Replicas: ptr.To[int32](1), MinReplicas: ptr.To[int32](0), MaxReplicas: ptr.To[int32](10), - RayStartParams: map[string]string{"p1": "v1"}, + RayStartParams: map[string]string{}, Template: corev1.PodTemplateSpec{ Spec: corev1.PodSpec{ + RestartPolicy: "Never", Containers: []corev1.Container{ { - Name: 
"worker-container", + Name: "worker-container", + Command: []string{}, + Resources: corev1.ResourceRequirements{ + Requests: corev1.ResourceList{}, + Limits: corev1.ResourceList{}, + }, }, }, + NodeSelector: map[string]string{}, }, }, }, @@ -180,3 +195,72 @@ func (j *JobWrapper) Generation(num int64) *JobWrapper { func (j *JobWrapper) Clone() *JobWrapper { return &JobWrapper{*j.DeepCopy()} } + +// Label sets the label key and value +func (j *JobWrapper) Label(key, value string) *JobWrapper { + if j.Labels == nil { + j.Labels = make(map[string]string) + } + j.Labels[key] = value + return j +} + +// JobDeploymentStatus sets a deployment status of the job +func (j *JobWrapper) JobDeploymentStatus(ds rayv1.JobDeploymentStatus) *JobWrapper { + j.Status.JobDeploymentStatus = ds + return j +} + +// JobStatus sets a status of the job +func (j *JobWrapper) JobStatus(s rayv1.JobStatus) *JobWrapper { + j.Status.JobStatus = s + return j +} + +// Request adds a resource request to the default container. +func (j *JobWrapper) Request(rayType rayv1.RayNodeType, r corev1.ResourceName, v string) *JobWrapper { + if rayType == rayv1.HeadNode { + j.Spec.RayClusterSpec.HeadGroupSpec.Template.Spec.Containers[0].Resources.Requests[r] = resource.MustParse(v) + } else if rayType == rayv1.WorkerNode { + j.Spec.RayClusterSpec.WorkerGroupSpecs[0].Template.Spec.Containers[0].Resources.Requests[r] = resource.MustParse(v) + } + return j +} + +func (j *JobWrapper) Image(rayType rayv1.RayNodeType, image string, args []string) *JobWrapper { + if rayType == rayv1.HeadNode { + j.Spec.RayClusterSpec.HeadGroupSpec.Template.Spec.Containers[0].Image = image + j.Spec.RayClusterSpec.HeadGroupSpec.Template.Spec.Containers[0].Args = args + j.Spec.RayClusterSpec.HeadGroupSpec.Template.Spec.Containers[0].ImagePullPolicy = corev1.PullIfNotPresent + } else if rayType == rayv1.WorkerNode { + j.Spec.RayClusterSpec.WorkerGroupSpecs[0].Template.Spec.Containers[0].Image = image + j.Spec.RayClusterSpec.WorkerGroupSpecs[0].Template.Spec.Containers[0].Args = args + j.Spec.RayClusterSpec.WorkerGroupSpecs[0].Template.Spec.Containers[0].ImagePullPolicy = corev1.PullIfNotPresent + } + return j +} + +func (j *JobWrapper) Entrypoint(e string) *JobWrapper { + j.Spec.Entrypoint = e + return j +} + +func (j *JobWrapper) RayVersion(rv string) *JobWrapper { + j.Spec.RayClusterSpec.RayVersion = rv + return j +} + +func (j *JobWrapper) Env(rayType rayv1.RayNodeType, name, value string) *JobWrapper { + if rayType == rayv1.HeadNode { + if j.Spec.RayClusterSpec.HeadGroupSpec.Template.Spec.Containers[0].Env == nil { + j.Spec.RayClusterSpec.HeadGroupSpec.Template.Spec.Containers[0].Env = make([]corev1.EnvVar, 0) + } + j.Spec.RayClusterSpec.HeadGroupSpec.Template.Spec.Containers[0].Env = append(j.Spec.RayClusterSpec.HeadGroupSpec.Template.Spec.Containers[0].Env, corev1.EnvVar{Name: name, Value: value}) + } else if rayType == rayv1.WorkerNode { + if j.Spec.RayClusterSpec.WorkerGroupSpecs[0].Template.Spec.Containers[0].Env == nil { + j.Spec.RayClusterSpec.WorkerGroupSpecs[0].Template.Spec.Containers[0].Env = make([]corev1.EnvVar, 0) + } + j.Spec.RayClusterSpec.WorkerGroupSpecs[0].Template.Spec.Containers[0].Env = append(j.Spec.RayClusterSpec.WorkerGroupSpecs[0].Template.Spec.Containers[0].Env, corev1.EnvVar{Name: name, Value: value}) + } + return j +} diff --git a/site/static/examples/multikueue/create-multikueue-kubeconfig.sh b/site/static/examples/multikueue/create-multikueue-kubeconfig.sh index 4ad71e36df..0790025642 100644 --- 
a/site/static/examples/multikueue/create-multikueue-kubeconfig.sh +++ b/site/static/examples/multikueue/create-multikueue-kubeconfig.sh @@ -165,6 +165,38 @@ rules: - mpijobs/status verbs: - get +- apiGroups: + - ray.io + resources: + - rayjobs + verbs: + - create + - delete + - get + - list + - watch +- apiGroups: + - ray.io + resources: + - rayjobs/status + verbs: + - get +- apiGroups: + - ray.io + resources: + - rayclusters + verbs: + - create + - delete + - get + - list + - watch +- apiGroups: + - ray.io + resources: + - rayclusters/status + verbs: + - get --- apiVersion: rbac.authorization.k8s.io/v1 kind: ClusterRoleBinding diff --git a/test/e2e/multikueue/e2e_test.go b/test/e2e/multikueue/e2e_test.go index 817ed7e65a..20bc1874c1 100644 --- a/test/e2e/multikueue/e2e_test.go +++ b/test/e2e/multikueue/e2e_test.go @@ -18,13 +18,16 @@ package mke2e import ( "fmt" + "os" "os/exec" + "runtime" "github.com/google/go-cmp/cmp/cmpopts" kfmpi "github.com/kubeflow/mpi-operator/pkg/apis/kubeflow/v2beta1" kftraining "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1" "github.com/onsi/ginkgo/v2" "github.com/onsi/gomega" + rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1" batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" apimeta "k8s.io/apimachinery/pkg/api/meta" @@ -40,11 +43,13 @@ import ( workloadjobset "sigs.k8s.io/kueue/pkg/controller/jobs/jobset" workloadpytorchjob "sigs.k8s.io/kueue/pkg/controller/jobs/kubeflow/jobs/pytorchjob" workloadmpijob "sigs.k8s.io/kueue/pkg/controller/jobs/mpijob" + workloadrayjob "sigs.k8s.io/kueue/pkg/controller/jobs/rayjob" utiltesting "sigs.k8s.io/kueue/pkg/util/testing" testingjob "sigs.k8s.io/kueue/pkg/util/testingjobs/job" testingjobset "sigs.k8s.io/kueue/pkg/util/testingjobs/jobset" testingmpijob "sigs.k8s.io/kueue/pkg/util/testingjobs/mpijob" testingpytorchjob "sigs.k8s.io/kueue/pkg/util/testingjobs/pytorchjob" + testingrayjob "sigs.k8s.io/kueue/pkg/util/testingjobs/rayjob" "sigs.k8s.io/kueue/pkg/workload" "sigs.k8s.io/kueue/test/util" ) @@ -491,6 +496,57 @@ var _ = ginkgo.Describe("MultiKueue", func() { }, util.Timeout, util.Interval).Should(gomega.Succeed()) }) }) + + ginkgo.It("Should run a RayJob on worker if admitted", func() { + var found bool + E2eKuberayTestImage, found := os.LookupEnv("KUBERAY_RAY_IMAGE") + gomega.Expect(found).To(gomega.BeTrue()) + if runtime.GOOS == "darwin" { + E2eKuberayTestImage, found = os.LookupEnv("KUBERAY_RAY_IMAGE_ARM") + gomega.Expect(found).To(gomega.BeTrue()) + } + + // Since it requires 1.5 CPU, this job can only be admitted in worker 1. + rayjob := testingrayjob.MakeJob("rayjob1", managerNs.Name). + Suspend(true). + Queue(managerLq.Name). + WithSubmissionMode(rayv1.K8sJobMode). + Request(rayv1.HeadNode, corev1.ResourceCPU, "1"). + Request(rayv1.WorkerNode, corev1.ResourceCPU, "0.5"). + Entrypoint("python -c \"import ray; ray.init(); print(ray.cluster_resources())\""). + Image(rayv1.HeadNode, E2eKuberayTestImage, []string{}). + Image(rayv1.WorkerNode, E2eKuberayTestImage, []string{}). 
+ Obj() + + ginkgo.By("Creating the RayJob", func() { + gomega.Expect(k8sManagerClient.Create(ctx, rayjob)).Should(gomega.Succeed()) + }) + + wlLookupKey := types.NamespacedName{Name: workloadrayjob.GetWorkloadNameForRayJob(rayjob.Name, rayjob.UID), Namespace: managerNs.Name} + // the execution should be given to the worker1 + waitForJobAdmitted(wlLookupKey, multiKueueAc.Name, "worker1") + + ginkgo.By("Waiting for the RayJob to finish", func() { + gomega.Eventually(func(g gomega.Gomega) { + createdRayJob := &rayv1.RayJob{} + g.Expect(k8sManagerClient.Get(ctx, client.ObjectKeyFromObject(rayjob), createdRayJob)).To(gomega.Succeed()) + g.Expect(createdRayJob.Status.JobDeploymentStatus).To(gomega.Equal(rayv1.JobDeploymentStatusComplete)) + finishReasonMessage := "Job finished successfully." + checkFinishStatusCondition(g, wlLookupKey, finishReasonMessage) + }, 5*util.LongTimeout, util.Interval).Should(gomega.Succeed()) + }) + + ginkgo.By("Checking no objects are left in the worker clusters and the RayJob is completed", func() { + gomega.Eventually(func(g gomega.Gomega) { + workerWl := &kueue.Workload{} + g.Expect(k8sWorker1Client.Get(ctx, wlLookupKey, workerWl)).To(utiltesting.BeNotFoundError()) + g.Expect(k8sWorker2Client.Get(ctx, wlLookupKey, workerWl)).To(utiltesting.BeNotFoundError()) + workerRayJob := &rayv1.RayJob{} + g.Expect(k8sWorker1Client.Get(ctx, client.ObjectKeyFromObject(rayjob), workerRayJob)).To(utiltesting.BeNotFoundError()) + g.Expect(k8sWorker2Client.Get(ctx, client.ObjectKeyFromObject(rayjob), workerRayJob)).To(utiltesting.BeNotFoundError()) + }, util.Timeout, util.Interval).Should(gomega.Succeed()) + }) + }) }) ginkgo.When("The connection to a worker cluster is unreliable", func() { ginkgo.It("Should update the cluster status to reflect the connection state", func() { @@ -571,23 +627,22 @@ var _ = ginkgo.Describe("MultiKueue", func() { }) func waitForJobAdmitted(wlLookupKey types.NamespacedName, acName, workerName string) { - ginkgo.By(fmt.Sprintf("Waiting to be admitted in %s and manager", workerName), func() { - gomega.Eventually(func(g gomega.Gomega) { - createdWorkload := &kueue.Workload{} - g.Expect(k8sManagerClient.Get(ctx, wlLookupKey, createdWorkload)).To(gomega.Succeed()) - g.Expect(apimeta.FindStatusCondition(createdWorkload.Status.Conditions, kueue.WorkloadAdmitted)).To(gomega.BeComparableTo(&metav1.Condition{ - Type: kueue.WorkloadAdmitted, - Status: metav1.ConditionTrue, - Reason: "Admitted", - Message: "The workload is admitted", - }, util.IgnoreConditionTimestampsAndObservedGeneration)) - g.Expect(workload.FindAdmissionCheck(createdWorkload.Status.AdmissionChecks, acName)).To(gomega.BeComparableTo(&kueue.AdmissionCheckState{ - Name: acName, - State: kueue.CheckStateReady, - Message: fmt.Sprintf(`The workload got reservation on "%s"`, workerName), - }, cmpopts.IgnoreFields(kueue.AdmissionCheckState{}, "LastTransitionTime"))) - }, util.Timeout, util.Interval).Should(gomega.Succeed()) - }) + ginkgo.By(fmt.Sprintf("Waiting to be admitted in %s and manager", workerName)) + gomega.EventuallyWithOffset(1, func(g gomega.Gomega) { + createdWorkload := &kueue.Workload{} + g.Expect(k8sManagerClient.Get(ctx, wlLookupKey, createdWorkload)).To(gomega.Succeed()) + g.Expect(apimeta.FindStatusCondition(createdWorkload.Status.Conditions, kueue.WorkloadAdmitted)).To(gomega.BeComparableTo(&metav1.Condition{ + Type: kueue.WorkloadAdmitted, + Status: metav1.ConditionTrue, + Reason: "Admitted", + Message: "The workload is admitted", + }, 
util.IgnoreConditionTimestampsAndObservedGeneration)) + g.Expect(workload.FindAdmissionCheck(createdWorkload.Status.AdmissionChecks, acName)).To(gomega.BeComparableTo(&kueue.AdmissionCheckState{ + Name: acName, + State: kueue.CheckStateReady, + Message: fmt.Sprintf(`The workload got reservation on "%s"`, workerName), + }, cmpopts.IgnoreFields(kueue.AdmissionCheckState{}, "LastTransitionTime"))) + }, util.LongTimeout, util.Interval).Should(gomega.Succeed()) } func checkFinishStatusCondition(g gomega.Gomega, wlLookupKey types.NamespacedName, finishReasonMessage string) { diff --git a/test/e2e/multikueue/suite_test.go b/test/e2e/multikueue/suite_test.go index a6a2d4ac9b..ec6ae2f776 100644 --- a/test/e2e/multikueue/suite_test.go +++ b/test/e2e/multikueue/suite_test.go @@ -27,6 +27,7 @@ import ( kftraining "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1" "github.com/onsi/ginkgo/v2" "github.com/onsi/gomega" + rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1" authenticationv1 "k8s.io/api/authentication/v1" batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" @@ -98,6 +99,8 @@ func kubeconfigForMultiKueueSA(ctx context.Context, c client.Client, restConfig policyRule(kftraining.SchemeGroupVersion.Group, "xgboostjobs/status", "get"), policyRule(kfmpi.SchemeGroupVersion.Group, "mpijobs", resourceVerbs...), policyRule(kfmpi.SchemeGroupVersion.Group, "mpijobs/status", "get"), + policyRule(rayv1.SchemeGroupVersion.Group, "rayjobs", resourceVerbs...), + policyRule(rayv1.SchemeGroupVersion.Group, "rayjobs/status", "get"), }, } err := c.Create(ctx, cr) @@ -273,7 +276,10 @@ var _ = ginkgo.BeforeSuite(func() { util.WaitForKubeFlowMPIOperatorAvailability(ctx, k8sWorker1Client) util.WaitForKubeFlowMPIOperatorAvailability(ctx, k8sWorker2Client) - ginkgo.GinkgoLogr.Info("Kueue and JobSet operators are available in all the clusters", "waitingTime", time.Since(waitForAvailableStart)) + util.WaitForKubeRayOperatorAvailability(ctx, k8sWorker1Client) + util.WaitForKubeRayOperatorAvailability(ctx, k8sWorker2Client) + + ginkgo.GinkgoLogr.Info("Kueue and all integration operators are available in all the clusters", "waitingTime", time.Since(waitForAvailableStart)) discoveryClient, err := discovery.NewDiscoveryClientForConfig(managerCfg) gomega.Expect(err).NotTo(gomega.HaveOccurred()) diff --git a/test/integration/controller/jobs/raycluster/suite_test.go b/test/integration/controller/jobs/raycluster/suite_test.go index 6dd6ab1f7e..8c9d7a0918 100644 --- a/test/integration/controller/jobs/raycluster/suite_test.go +++ b/test/integration/controller/jobs/raycluster/suite_test.go @@ -46,7 +46,7 @@ var ( ctx context.Context fwk *framework.Framework crdPath = filepath.Join("..", "..", "..", "..", "..", "config", "components", "crd", "bases") - rayCrdPath = filepath.Join("..", "..", "..", "..", "..", "dep-crds", "ray-operator") + rayCrdPath = filepath.Join("..", "..", "..", "..", "..", "dep-crds", "ray-operator-crds") ) func TestAPIs(t *testing.T) { diff --git a/test/integration/controller/jobs/rayjob/suite_test.go b/test/integration/controller/jobs/rayjob/suite_test.go index c59ac16141..773965b347 100644 --- a/test/integration/controller/jobs/rayjob/suite_test.go +++ b/test/integration/controller/jobs/rayjob/suite_test.go @@ -45,7 +45,7 @@ var ( ctx context.Context fwk *framework.Framework crdPath = filepath.Join("..", "..", "..", "..", "..", "config", "components", "crd", "bases") - rayCrdPath = filepath.Join("..", "..", "..", "..", "..", "dep-crds", "ray-operator") + rayCrdPath = 
filepath.Join("..", "..", "..", "..", "..", "dep-crds", "ray-operator-crds") ) func TestAPIs(t *testing.T) { diff --git a/test/integration/webhook/jobs/suite_test.go b/test/integration/webhook/jobs/suite_test.go index 8260b74a96..dda6f432d6 100644 --- a/test/integration/webhook/jobs/suite_test.go +++ b/test/integration/webhook/jobs/suite_test.go @@ -46,7 +46,7 @@ var ( webhookPath = filepath.Join("..", "..", "..", "..", "config", "components", "webhook") mpiCrdPath = filepath.Join("..", "..", "..", "..", "dep-crds", "mpi-operator") jobsetCrdPath = filepath.Join("..", "..", "..", "..", "dep-crds", "jobset-operator") - rayCrdPath = filepath.Join("..", "..", "..", "..", "dep-crds", "ray-operator") + rayCrdPath = filepath.Join("..", "..", "..", "..", "dep-crds", "ray-operator-crds") kubeflowCrdPath = filepath.Join("..", "..", "..", "..", "dep-crds", "training-operator-crds") ) diff --git a/test/util/e2e.go b/test/util/e2e.go index a39b469ccd..34f003897b 100644 --- a/test/util/e2e.go +++ b/test/util/e2e.go @@ -10,6 +10,7 @@ import ( kftraining "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1" "github.com/onsi/gomega" awv1beta2 "github.com/project-codeflare/appwrapper/api/v1beta2" + rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" @@ -64,6 +65,9 @@ func CreateClientUsingCluster(kContext string) (client.WithWatch, *rest.Config) err = awv1beta2.AddToScheme(scheme.Scheme) gomega.ExpectWithOffset(1, err).NotTo(gomega.HaveOccurred()) + err = rayv1.AddToScheme(scheme.Scheme) + gomega.ExpectWithOffset(1, err).NotTo(gomega.HaveOccurred()) + client, err := client.NewWithWatch(cfg, client.Options{Scheme: scheme.Scheme}) gomega.ExpectWithOffset(1, err).NotTo(gomega.HaveOccurred()) return client, cfg @@ -139,3 +143,8 @@ func WaitForKubeFlowMPIOperatorAvailability(ctx context.Context, k8sClient clien kftoKey := types.NamespacedName{Namespace: "mpi-operator", Name: "mpi-operator"} waitForOperatorAvailability(ctx, k8sClient, kftoKey) } + +func WaitForKubeRayOperatorAvailability(ctx context.Context, k8sClient client.Client) { + kroKey := types.NamespacedName{Namespace: "ray-system", Name: "kuberay-operator"} + waitForOperatorAvailability(ctx, k8sClient, kroKey) +}
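
For reference, the RayJob that this adapter mirrors to a worker cluster is an ordinary RayJob submitted to the manager cluster with Kueue's queue-name label. A minimal sketch follows; the queue name, namespace, container names, and resource values are illustrative (they loosely mirror the e2e test above, not a required configuration):

apiVersion: ray.io/v1
kind: RayJob
metadata:
  name: rayjob-sample
  namespace: default
  labels:
    kueue.x-k8s.io/queue-name: user-queue   # LocalQueue whose ClusterQueue uses the MultiKueue admission check
spec:
  suspend: true                             # created suspended; Kueue unsuspends it once admitted
  submissionMode: K8sJobMode
  shutdownAfterJobFinishes: true
  entrypoint: python -c "import ray; ray.init(); print(ray.cluster_resources())"
  rayClusterSpec:
    rayVersion: "2.9.0"
    headGroupSpec:
      rayStartParams: {}
      template:
        spec:
          containers:
          - name: ray-head
            image: rayproject/ray:2.9.0
            resources:
              requests:
                cpu: "1"
    workerGroupSpecs:
    - groupName: workers
      replicas: 1
      minReplicas: 0
      maxReplicas: 10
      rayStartParams: {}
      template:
        spec:
          containers:
          - name: ray-worker
            image: rayproject/ray:2.9.0
            resources:
              requests:
                cpu: "500m"

On sync, the adapter's SyncJob creates a copy of this object on the selected worker cluster with the PrebuiltWorkloadLabel and MultiKueueOriginLabel set, and, once the local job is unsuspended, copies the remote RayJob status back onto the manager's object.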