Skip to content

Commit

Permalink
[Feature] Kuberay RayJob MultiKueue adapter (#3892)
Browse files Browse the repository at this point in the history
* Setup e2e tests to support Kuberay

* Introduce Kuberay RayJobs MultiKueue Adapter

* Add the RayJob Multikueue e2e tests
  • Loading branch information
mszadkow authored Jan 17, 2025
1 parent a1c4c4b commit 097262a
Show file tree
Hide file tree
Showing 18 changed files with 581 additions and 33 deletions.
20 changes: 17 additions & 3 deletions Makefile-deps.mk
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,22 @@ kf-training-operator-manifests: ## Copy whole manifests folder from the training
RAY_ROOT = $(shell $(GO_CMD) list -m -mod=readonly -f "{{.Dir}}" github.com/ray-project/kuberay/ray-operator)
.PHONY: ray-operator-crd
ray-operator-crd: ## Copy the CRDs from the ray-operator to the dep-crds directory.
mkdir -p $(EXTERNAL_CRDS_DIR)/ray-operator/
cp -f $(RAY_ROOT)/config/crd/bases/* $(EXTERNAL_CRDS_DIR)/ray-operator/
mkdir -p $(EXTERNAL_CRDS_DIR)/ray-operator-crds/
cp -f $(RAY_ROOT)/config/crd/bases/* $(EXTERNAL_CRDS_DIR)/ray-operator-crds/

.PHONY: ray-operator-manifests
ray-operator-manifests: ## Copy the whole manifests content from the ray-operator to the dep-crds directory.
## Full version of the manifest is required for e2e multikueue tests.
if [ -d "$(EXTERNAL_CRDS_DIR)/ray-operator" ]; then \
chmod -R u+w "$(EXTERNAL_CRDS_DIR)/ray-operator" && \
rm -rf "$(EXTERNAL_CRDS_DIR)/ray-operator"; \
fi
mkdir -p "$(EXTERNAL_CRDS_DIR)/ray-operator"; \
cp -rf "$(RAY_ROOT)/config/crd" "$(EXTERNAL_CRDS_DIR)/ray-operator"
cp -rf "$(RAY_ROOT)/config/default" "$(EXTERNAL_CRDS_DIR)/ray-operator"
cp -rf "$(RAY_ROOT)/config/rbac" "$(EXTERNAL_CRDS_DIR)/ray-operator"
cp -rf "$(RAY_ROOT)/config/manager" "$(EXTERNAL_CRDS_DIR)/ray-operator"


JOBSET_ROOT = $(shell $(GO_CMD) list -m -mod=readonly -f "{{.Dir}}" sigs.k8s.io/jobset)
.PHONY: jobset-operator-crd
Expand Down Expand Up @@ -162,7 +176,7 @@ appwrapper-manifests: kustomize ## Copy whole manifests folder from the appwrapp
cd "$(EXTERNAL_CRDS_DIR)/appwrapper/config/manager" && chmod u+w kustomization.yaml && $(KUSTOMIZE) edit set image controller=quay.io/ibm/appwrapper:${APPWRAPPER_VERSION} && chmod u-w kustomization.yaml

.PHONY: dep-crds
dep-crds: mpi-operator-crd kf-training-operator-crd ray-operator-crd jobset-operator-crd cluster-autoscaler-crd appwrapper-crd appwrapper-manifests kf-training-operator-manifests ## Copy the CRDs from the external operators to the dep-crds directory.
dep-crds: mpi-operator-crd kf-training-operator-crd ray-operator-crd jobset-operator-crd cluster-autoscaler-crd appwrapper-crd appwrapper-manifests kf-training-operator-manifests ray-operator-manifests ## Copy the CRDs from the external operators to the dep-crds directory.
@echo "Copying CRDs from external operators to dep-crds directory"

.PHONY: kueuectl-docs
Expand Down
4 changes: 3 additions & 1 deletion Makefile-test.mk
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ APPWRAPPER_VERSION = $(shell $(GO_CMD) list -m -f "{{.Version}}" github.com/proj
JOBSET_VERSION = $(shell $(GO_CMD) list -m -f "{{.Version}}" sigs.k8s.io/jobset)
KUBEFLOW_VERSION = $(shell $(GO_CMD) list -m -f "{{.Version}}" github.com/kubeflow/training-operator)
KUBEFLOW_MPI_VERSION = $(shell $(GO_CMD) list -m -f "{{.Version}}" github.com/kubeflow/mpi-operator)
KUBERAY_VERSION = $(shell $(GO_CMD) list -m -f "{{.Version}}" github.com/ray-project/kuberay/ray-operator)

##@ Tests

Expand Down Expand Up @@ -120,7 +121,8 @@ run-test-multikueue-e2e-%: FORCE
@echo Running multikueue e2e for k8s ${K8S_VERSION}
E2E_KIND_VERSION="kindest/node:v$(K8S_VERSION)" KIND_CLUSTER_NAME=$(KIND_CLUSTER_NAME) CREATE_KIND_CLUSTER=$(CREATE_KIND_CLUSTER) \
ARTIFACTS="$(ARTIFACTS)/$@" IMAGE_TAG=$(IMAGE_TAG) GINKGO_ARGS="$(GINKGO_ARGS)" \
JOBSET_VERSION=$(JOBSET_VERSION) KUBEFLOW_VERSION=$(KUBEFLOW_VERSION) KUBEFLOW_MPI_VERSION=$(KUBEFLOW_MPI_VERSION) \
JOBSET_VERSION=$(JOBSET_VERSION) KUBEFLOW_VERSION=$(KUBEFLOW_VERSION) \
KUBEFLOW_MPI_VERSION=$(KUBEFLOW_MPI_VERSION) KUBERAY_VERSION=$(KUBERAY_VERSION) \
./hack/multikueue-e2e-test.sh
$(PROJECT_DIR)/bin/ginkgo-top -i $(ARTIFACTS)/$@/e2e.json > $(ARTIFACTS)/$@/e2e-top.yaml

Expand Down
7 changes: 7 additions & 0 deletions charts/kueue/templates/rbac/role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -298,9 +298,16 @@ rules:
- rayclusters/finalizers
- rayclusters/status
- rayjobs/finalizers
verbs:
- get
- update
- apiGroups:
- ray.io
resources:
- rayjobs/status
verbs:
- get
- patch
- update
- apiGroups:
- scheduling.k8s.io
Expand Down
7 changes: 7 additions & 0 deletions config/components/rbac/role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -297,9 +297,16 @@ rules:
- rayclusters/finalizers
- rayclusters/status
- rayjobs/finalizers
verbs:
- get
- update
- apiGroups:
- ray.io
resources:
- rayjobs/status
verbs:
- get
- patch
- update
- apiGroups:
- scheduling.k8s.io
Expand Down
35 changes: 35 additions & 0 deletions hack/e2e-common.sh
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,14 @@ if [[ -n ${KUBEFLOW_MPI_VERSION:-} ]]; then
export KUBEFLOW_MPI_IMAGE=mpioperator/mpi-operator:${KUBEFLOW_MPI_VERSION/#v}
fi

# Kuberay e2e configuration: only exported when the caller provides
# KUBERAY_VERSION (set from the go.mod kuberay module version in Makefile-test.mk).
if [[ -n ${KUBERAY_VERSION:-} ]]; then
    # Full operator kustomization copied by the ray-operator-manifests target.
    export KUBERAY_MANIFEST="${ROOT_DIR}/dep-crds/ray-operator/default/"
    # ${KUBERAY_VERSION/#v} strips a single leading "v" to form the image tag.
    export KUBERAY_IMAGE=bitnami/kuberay-operator:${KUBERAY_VERSION/#v}
    # Ray runtime images used by test jobs; the ARM variant is for Darwin hosts.
    export KUBERAY_RAY_IMAGE=rayproject/ray:2.9.0
    export KUBERAY_RAY_IMAGE_ARM=rayproject/ray:2.9.0-aarch64
    # CRD-only manifests, applied to the manager cluster in multikueue e2e.
    export KUBERAY_CRDS=${ROOT_DIR}/dep-crds/ray-operator/crd/bases
fi

# sleep image to use for testing.
export E2E_TEST_SLEEP_IMAGE_OLD=gcr.io/k8s-staging-perf-tests/sleep:v0.0.3@sha256:00ae8e01dd4439edfb7eb9f1960ac28eba16e952956320cce7f2ac08e3446e6b
E2E_TEST_SLEEP_IMAGE_OLD_WITHOUT_SHA=${E2E_TEST_SLEEP_IMAGE_OLD%%@*}
Expand Down Expand Up @@ -97,6 +105,17 @@ function prepare_docker_images {
if [[ -n ${KUBEFLOW_MPI_VERSION:-} ]]; then
docker pull "${KUBEFLOW_MPI_IMAGE}"
fi
if [[ -n ${KUBERAY_VERSION:-} ]]; then
docker pull "${KUBERAY_IMAGE}"

# Extra e2e images required for Kuberay
unamestr=$(uname)
if [[ "$unamestr" == 'Linux' ]]; then
docker pull "${KUBERAY_RAY_IMAGE}"
elif [[ "$unamestr" == 'Darwin' ]]; then
docker pull "${KUBERAY_RAY_IMAGE_ARM}"
fi
fi
}

# $1 cluster
Expand Down Expand Up @@ -151,6 +170,22 @@ function install_mpi {
kubectl apply --server-side -f "${KUBEFLOW_MPI_MANIFEST}"
}

#$1 - cluster name
# Load the Ray runtime image matching the host OS plus the kuberay operator
# image into the given kind cluster, then install the operator manifests.
function install_kuberay {
    # Pick the Ray image for this host: plain on Linux, aarch64 on Darwin.
    case "$(uname)" in
        'Linux')
            cluster_kind_load_image "${1}" "${KUBERAY_RAY_IMAGE}"
            ;;
        'Darwin')
            cluster_kind_load_image "${1}" "${KUBERAY_RAY_IMAGE_ARM}"
            ;;
    esac

    cluster_kind_load_image "${1}" "${KUBERAY_IMAGE}"
    kubectl config use-context "kind-${1}"
    # create used instead of apply - https://github.com/ray-project/kuberay/issues/504
    kubectl create -k "${KUBERAY_MANIFEST}"
}

INITIAL_IMAGE=$($YQ '.images[] | select(.name == "controller") | [.newName, .newTag] | join(":")' config/components/manager/kustomization.yaml)
export INITIAL_IMAGE

Expand Down
9 changes: 8 additions & 1 deletion hack/multikueue-e2e-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,17 @@ function kind_load {
install_kubeflow "$WORKER1_KIND_CLUSTER_NAME"
install_kubeflow "$WORKER2_KIND_CLUSTER_NAME"

## MPI
## MPI
install_mpi "$MANAGER_KIND_CLUSTER_NAME"
install_mpi "$WORKER1_KIND_CLUSTER_NAME"
install_mpi "$WORKER2_KIND_CLUSTER_NAME"

## KUBERAY
kubectl config use-context "kind-${MANAGER_KIND_CLUSTER_NAME}"
kubectl apply --server-side -f "${KUBERAY_CRDS}"

install_kuberay "$WORKER1_KIND_CLUSTER_NAME"
install_kuberay "$WORKER2_KIND_CLUSTER_NAME"
}

function kueue_deploy {
Expand Down
4 changes: 3 additions & 1 deletion pkg/controller/jobframework/validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

kfmpi "github.com/kubeflow/mpi-operator/pkg/apis/kubeflow/v2beta1"
kftraining "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"
rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
batchv1 "k8s.io/api/batch/v1"
apivalidation "k8s.io/apimachinery/pkg/api/validation"
"k8s.io/apimachinery/pkg/util/sets"
Expand All @@ -47,7 +48,8 @@ var (
kftraining.SchemeGroupVersion.WithKind(kftraining.PaddleJobKind).String(),
kftraining.SchemeGroupVersion.WithKind(kftraining.PyTorchJobKind).String(),
kftraining.SchemeGroupVersion.WithKind(kftraining.XGBoostJobKind).String(),
kfmpi.SchemeGroupVersion.WithKind(kfmpi.Kind).String())
kfmpi.SchemeGroupVersion.WithKind(kfmpi.Kind).String(),
rayv1.SchemeGroupVersion.WithKind("RayJob").String())
)

// ValidateJobOnCreate encapsulates all GenericJob validations that must be performed on a Create operation
Expand Down
11 changes: 9 additions & 2 deletions pkg/controller/jobs/rayjob/rayjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
Expand Down Expand Up @@ -55,12 +56,13 @@ func init() {
JobType: &rayv1.RayJob{},
AddToScheme: rayv1.AddToScheme,
IsManagingObjectsOwner: isRayJob,
MultiKueueAdapter: &multikueueAdapter{},
}))
}

// +kubebuilder:rbac:groups="",resources=events,verbs=create;watch;update
// +kubebuilder:rbac:groups=ray.io,resources=rayjobs,verbs=get;list;watch;update;patch
// +kubebuilder:rbac:groups=ray.io,resources=rayjobs/status,verbs=get;update
// +kubebuilder:rbac:groups=ray.io,resources=rayjobs/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=ray.io,resources=rayjobs/finalizers,verbs=get;update
// +kubebuilder:rbac:groups=kueue.x-k8s.io,resources=workloads,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=kueue.x-k8s.io,resources=workloads/status,verbs=get;update;patch
Expand All @@ -82,12 +84,17 @@ func (j *RayJob) Object() client.Object {
return (*rayv1.RayJob)(j)
}

// fromObject converts a runtime.Object into the local RayJob wrapper type.
// It panics if obj is not a *rayv1.RayJob.
func fromObject(obj runtime.Object) *RayJob {
	return (*RayJob)(obj.(*rayv1.RayJob))
}

// IsSuspended reports whether the RayJob is marked suspended in its spec.
func (j *RayJob) IsSuspended() bool {
	return j.Spec.Suspend
}

// IsActive reports whether the RayJob may have running Pods.
// A deployment status of Suspended or New means no Pods have been
// started, so the job is considered inactive in those states.
func (j *RayJob) IsActive() bool {
	switch j.Status.JobDeploymentStatus {
	case rayv1.JobDeploymentStatusSuspended, rayv1.JobDeploymentStatusNew:
		return false
	default:
		return true
	}
}

func (j *RayJob) Suspend() {
Expand Down
123 changes: 123 additions & 0 deletions pkg/controller/jobs/rayjob/rayjob_multikueue_adapter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package rayjob

import (
"context"
"errors"
"fmt"

rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"

kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
"sigs.k8s.io/kueue/pkg/controller/constants"
"sigs.k8s.io/kueue/pkg/controller/jobframework"
"sigs.k8s.io/kueue/pkg/util/api"
clientutil "sigs.k8s.io/kueue/pkg/util/client"
)

// multikueueAdapter implements the MultiKueue integration for RayJobs:
// it mirrors local RayJobs onto remote worker clusters and copies their
// status back to the manager cluster.
type multikueueAdapter struct{}

var _ jobframework.MultiKueueAdapter = (*multikueueAdapter)(nil)

// SyncJob synchronizes a RayJob between the local (manager) cluster and a
// remote (worker) cluster identified by remoteClient:
//   - if the remote copy exists, its status is copied back onto the local
//     job (skipped while the local job is still suspended);
//   - otherwise, the local job is cloned and created on the remote cluster,
//     labeled with the prebuilt workload name and the MultiKueue origin.
func (b *multikueueAdapter) SyncJob(ctx context.Context, localClient client.Client, remoteClient client.Client, key types.NamespacedName, workloadName, origin string) error {
	log := ctrl.LoggerFrom(ctx)

	localJob := rayv1.RayJob{}
	err := localClient.Get(ctx, key, &localJob)
	if err != nil {
		return err
	}

	remoteJob := rayv1.RayJob{}
	err = remoteClient.Get(ctx, key, &remoteJob)
	// Any error other than NotFound is fatal; NotFound means the remote
	// copy still needs to be created below.
	if client.IgnoreNotFound(err) != nil {
		return err
	}

	// if the remote exists, just copy the status
	if err == nil {
		if fromObject(&localJob).IsSuspended() {
			// Ensure the job is unsuspended before updating its status; otherwise, it will fail when patching the spec.
			log.V(2).Info("Skipping the sync since the local job is still suspended")
			return nil
		}
		return clientutil.PatchStatus(ctx, localClient, &localJob, func() (bool, error) {
			localJob.Status = remoteJob.Status
			return true, nil
		})
	}

	// Remote copy does not exist yet: build it from cloned metadata and a
	// deep copy of the local spec.
	remoteJob = rayv1.RayJob{
		ObjectMeta: api.CloneObjectMetaForCreation(&localJob.ObjectMeta),
		Spec:       *localJob.Spec.DeepCopy(),
	}

	// add the prebuilt workload
	if remoteJob.Labels == nil {
		remoteJob.Labels = make(map[string]string, 2)
	}
	remoteJob.Labels[constants.PrebuiltWorkloadLabel] = workloadName
	remoteJob.Labels[kueue.MultiKueueOriginLabel] = origin

	return remoteClient.Create(ctx, &remoteJob)
}

// DeleteRemoteObject deletes the RayJob identified by key from the remote
// cluster. NotFound errors are swallowed so deletion is idempotent.
func (b *multikueueAdapter) DeleteRemoteObject(ctx context.Context, remoteClient client.Client, key types.NamespacedName) error {
	toDelete := &rayv1.RayJob{}
	toDelete.SetName(key.Name)
	toDelete.SetNamespace(key.Namespace)
	return client.IgnoreNotFound(remoteClient.Delete(ctx, toDelete))
}

// KeepAdmissionCheckPending reports whether the MultiKueue admission check
// should remain pending after the remote job is created. Always false for
// RayJobs.
func (b *multikueueAdapter) KeepAdmissionCheckPending() bool {
	return false
}

// IsJobManagedByKueue reports whether the RayJob identified by key is managed
// by Kueue. RayJobs are always treated as managed, so this unconditionally
// returns true with an empty reason (ctx, c, and key are intentionally unused).
func (b *multikueueAdapter) IsJobManagedByKueue(ctx context.Context, c client.Client, key types.NamespacedName) (bool, string, error) {
	return true, "", nil
}

// GVK returns the GroupVersionKind this adapter handles (the package-level
// RayJob gvk).
func (b *multikueueAdapter) GVK() schema.GroupVersionKind {
	return gvk
}

var _ jobframework.MultiKueueWatcher = (*multikueueAdapter)(nil)

// GetEmptyList returns an empty RayJobList for the MultiKueue watcher to
// decode listed remote objects into.
func (*multikueueAdapter) GetEmptyList() client.ObjectList {
	return &rayv1.RayJobList{}
}

// WorkloadKeyFor derives the namespaced name of the Workload corresponding to
// the given object. The object must be a *rayv1.RayJob carrying the prebuilt
// workload label; otherwise an error is returned.
func (*multikueueAdapter) WorkloadKeyFor(o runtime.Object) (types.NamespacedName, error) {
	rayJob, ok := o.(*rayv1.RayJob)
	if !ok {
		return types.NamespacedName{}, errors.New("not a rayjob")
	}

	wlName, found := rayJob.Labels[constants.PrebuiltWorkloadLabel]
	if !found {
		return types.NamespacedName{}, fmt.Errorf("no prebuilt workload found for rayjob: %s", klog.KObj(rayJob))
	}

	return types.NamespacedName{Namespace: rayJob.Namespace, Name: wlName}, nil
}
Loading

0 comments on commit 097262a

Please sign in to comment.