Skip to content

Commit

Permalink
Update after code review
Browse files Browse the repository at this point in the history
  • Loading branch information
mszadkow committed Sep 25, 2024
1 parent 0e35914 commit 6c3e7f2
Show file tree
Hide file tree
Showing 8 changed files with 77 additions and 21 deletions.
6 changes: 3 additions & 3 deletions deploy/v2beta1/mpi-operator.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7759,12 +7759,12 @@ spec:
type: string
managedBy:
description: |-
ManagedBy is used to indicate the controller or entity that manages a job.
ManagedBy is used to indicate the controller or entity that manages a MPIJob.
The value must be either an empty, 'kubeflow.org/mpi-operator' or
'kueue.x-k8s.io/multikueue'.
The mpi-operator reconciles a job which doesn't have this
The mpi-operator reconciles a MPIJob which doesn't have this
field at all or the field value is the reserved string
'kubeflow.org/mpi-operator', but delegates reconciling the job
'kubeflow.org/mpi-operator', but delegates reconciling the MPIJob
with 'kueue.x-k8s.io/multikueue' to the Kueue.
The field is immutable.
type: string
Expand Down
6 changes: 3 additions & 3 deletions manifests/base/kubeflow.org_mpijobs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7736,12 +7736,12 @@ spec:
type: string
managedBy:
description: |-
ManagedBy is used to indicate the controller or entity that manages a job.
ManagedBy is used to indicate the controller or entity that manages a MPIJob.
The value must be either an empty, 'kubeflow.org/mpi-operator' or
'kueue.x-k8s.io/multikueue'.
The mpi-operator reconciles a job which doesn't have this
The mpi-operator reconciles a MPIJob which doesn't have this
field at all or the field value is the reserved string
'kubeflow.org/mpi-operator', but delegates reconciling the job
'kubeflow.org/mpi-operator', but delegates reconciling the MPIJob
with 'kueue.x-k8s.io/multikueue' to the Kueue.
The field is immutable.
type: string
Expand Down
2 changes: 1 addition & 1 deletion pkg/apis/kubeflow/v2beta1/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@
"type": "string"
},
"managedBy": {
"description": "ManagedBy is used to indicate the controller or entity that manages a job. The value must be either an empty, 'kubeflow.org/mpi-operator' or 'kueue.x-k8s.io/multikueue'. The mpi-operator reconciles a job which doesn't have this field at all or the field value is the reserved string 'kubeflow.org/mpi-operator', but delegates reconciling the job with 'kueue.x-k8s.io/multikueue' to the Kueue. The field is immutable.",
"description": "ManagedBy is used to indicate the controller or entity that manages a MPIJob. The value must be either an empty, 'kubeflow.org/mpi-operator' or 'kueue.x-k8s.io/multikueue'. The mpi-operator reconciles a MPIJob which doesn't have this field at all or the field value is the reserved string 'kubeflow.org/mpi-operator', but delegates reconciling the MPIJob with 'kueue.x-k8s.io/multikueue' to the Kueue. The field is immutable.",
"type": "string"
},
"schedulingPolicy": {
Expand Down
6 changes: 3 additions & 3 deletions pkg/apis/kubeflow/v2beta1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,12 +140,12 @@ type RunPolicy struct {
// +kubebuilder:default:=false
Suspend *bool `json:"suspend,omitempty"`

// ManagedBy is used to indicate the controller or entity that manages a job.
// ManagedBy is used to indicate the controller or entity that manages a MPIJob.
// The value must be either an empty, 'kubeflow.org/mpi-operator' or
// 'kueue.x-k8s.io/multikueue'.
// The mpi-operator reconciles a job which doesn't have this
// The mpi-operator reconciles a MPIJob which doesn't have this
// field at all or the field value is the reserved string
// 'kubeflow.org/mpi-operator', but delegates reconciling the job
// 'kubeflow.org/mpi-operator', but delegates reconciling the MPIJob
// with 'kueue.x-k8s.io/multikueue' to the Kueue.
// The field is immutable.
ManagedBy *string `json:"managedBy,omitempty"`
Expand Down
8 changes: 8 additions & 0 deletions pkg/controller/mpi_job_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,11 @@ func (f *fixture) expectCreateSecretAction(d *corev1.Secret) {
f.kubeActions = append(f.kubeActions, core.NewCreateAction(schema.GroupVersionResource{Resource: "secrets"}, d.Namespace, d))
}

// expectNoKubeActions reports whether the fixture's kube client recorded no
// actions once filterInformerActions has been applied to its action list.
// It returns true when the filtered list is empty.
func (f *fixture) expectNoKubeActions() bool {
	recorded := f.kubeClient.Actions()
	remaining := filterInformerActions(recorded)
	if len(remaining) > 0 {
		return false
	}
	return true
}

func (f *fixture) expectUpdateMPIJobStatusAction(mpiJob *kubeflow.MPIJob) {
action := core.NewUpdateAction(schema.GroupVersionResource{Resource: "mpijobs"}, mpiJob.Namespace, mpiJob)
action.Subresource = "status"
Expand Down Expand Up @@ -514,6 +519,9 @@ func TestDoNothingWithMPIJobManagedExternally(t *testing.T) {
mpiJob.Spec.RunPolicy.ManagedBy = ptr.To(kubeflow.MultiKueueController)
f.setUpMPIJob(mpiJob)
f.run(getKey(mpiJob, t))
if !f.expectNoKubeActions() {
t.Fatalf("Expected no kubeActions (secrets, pods, services etc.)")
}
}

func TestAllResourcesCreated(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion sdk/python/v2beta1/docs/V2beta1RunPolicy.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions sdk/python/v2beta1/mpijob/models/v2beta1_run_policy.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

64 changes: 56 additions & 8 deletions test/e2e/mpi_job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,9 +185,23 @@ var _ = ginkgo.Describe("MPIJob", func() {
gomega.Expect(condition).To(gomega.BeNil())
condition = getJobCondition(mpiJob, kubeflow.JobSucceeded)
gomega.Expect(condition).To(gomega.BeNil())
launcherPods, err := getLauncherPods(ctx, mpiJob)
gomega.Expect(err).To(gomega.BeNil())
gomega.Expect(launcherPods.Items).To(gomega.BeZero())
workerPods, err := getWorkerPods(ctx, mpiJob)
gomega.Expect(err).To(gomega.BeNil())
gomega.Expect(workerPods.Items).To(gomega.BeZero())
secret, err := getSecretsForJob(ctx, mpiJob)
gomega.Expect(err).To(gomega.BeNil())
gomega.Expect(secret).To(gomega.BeNil())
})
})

ginkgo.It("should succeed when explicitly managed by mpi-operator", func() {
mpiJob.Spec.RunPolicy.ManagedBy = ptr.To(kubeflow.KubeflowJobController)
mpiJob := createJobAndWaitForCompletion(mpiJob)
expectConditionToBeTrue(mpiJob, kubeflow.JobSucceeded)
})
})
})

ginkgo.Context("with Intel Implementation", func() {
Expand Down Expand Up @@ -558,7 +572,7 @@ func waitForCompletion(ctx context.Context, mpiJob *kubeflow.MPIJob) *kubeflow.M
return mpiJob
}

func debugJob(ctx context.Context, mpiJob *kubeflow.MPIJob) error {
func getLauncherPods(ctx context.Context, mpiJob *kubeflow.MPIJob) (*corev1.PodList, error) {
selector := metav1.LabelSelector{
MatchLabels: map[string]string{
kubeflow.OperatorNameLabel: kubeflow.OperatorName,
Expand All @@ -570,7 +584,45 @@ func debugJob(ctx context.Context, mpiJob *kubeflow.MPIJob) error {
LabelSelector: metav1.FormatLabelSelector(&selector),
})
if err != nil {
return fmt.Errorf("getting launcher Pods: %w", err)
return &corev1.PodList{}, fmt.Errorf("getting launcher Pods: %w", err)
}
return launcherPods, nil
}

// getWorkerPods lists the Pods in mpiJob's namespace whose labels match the
// operator name, mpiJob's name, and the "worker" job role.
//
// On a List failure it returns an empty (non-nil) PodList together with the
// wrapped error; on success it returns the list as received from the client.
func getWorkerPods(ctx context.Context, mpiJob *kubeflow.MPIJob) (*corev1.PodList, error) {
	workerLabels := map[string]string{
		kubeflow.OperatorNameLabel: kubeflow.OperatorName,
		kubeflow.JobNameLabel:      mpiJob.Name,
		kubeflow.JobRoleLabel:      "worker",
	}
	listOpts := metav1.ListOptions{
		LabelSelector: metav1.FormatLabelSelector(&metav1.LabelSelector{MatchLabels: workerLabels}),
	}

	pods, listErr := k8sClient.CoreV1().Pods(mpiJob.Namespace).List(ctx, listOpts)
	if listErr != nil {
		return &corev1.PodList{}, fmt.Errorf("getting worker Pods: %w", listErr)
	}
	return pods, nil
}

// getSecretsForJob lists every Secret in mpiJob's namespace and returns the
// first one that metav1.IsControlledBy reports as controlled by mpiJob.
//
// It returns (nil, nil) when no such Secret exists, and (nil, err) with the
// List error wrapped when the Secrets cannot be listed.
func getSecretsForJob(ctx context.Context, mpiJob *kubeflow.MPIJob) (*corev1.Secret, error) {
	result, err := k8sClient.CoreV1().Secrets(mpiJob.Namespace).List(ctx, metav1.ListOptions{})
	if err != nil {
		// Wrap with context, matching the sibling Pod-listing helpers.
		return nil, fmt.Errorf("getting Secrets: %w", err)
	}
	// Iterate by index so each Secret is not copied per iteration and the
	// returned pointer refers to the list element rather than to the range
	// variable (which is shared across iterations before Go 1.22).
	for i := range result.Items {
		secret := &result.Items[i]
		if metav1.IsControlledBy(secret, mpiJob) {
			return secret, nil
		}
	}
	return nil, nil
}

func debugJob(ctx context.Context, mpiJob *kubeflow.MPIJob) error {
launcherPods, err := getLauncherPods(ctx, mpiJob)
if err != nil {
return err
}
if len(launcherPods.Items) == 0 {
return fmt.Errorf("no launcher Pods found")
Expand All @@ -585,11 +637,7 @@ func debugJob(ctx context.Context, mpiJob *kubeflow.MPIJob) error {
if err != nil {
return fmt.Errorf("obtaining launcher logs: %w", err)
}

selector.MatchLabels[kubeflow.JobRoleLabel] = "worker"
workerPods, err := k8sClient.CoreV1().Pods(mpiJob.Namespace).List(ctx, metav1.ListOptions{
LabelSelector: metav1.FormatLabelSelector(&selector),
})
workerPods, err := getWorkerPods(ctx, mpiJob)
if err != nil {
return fmt.Errorf("getting worker Pods: %w", err)
}
Expand Down

0 comments on commit 6c3e7f2

Please sign in to comment.