diff --git a/deploy/v2beta1/mpi-operator.yaml b/deploy/v2beta1/mpi-operator.yaml index dc6f32b9..420710d1 100644 --- a/deploy/v2beta1/mpi-operator.yaml +++ b/deploy/v2beta1/mpi-operator.yaml @@ -7759,12 +7759,12 @@ spec: type: string managedBy: description: |- - ManagedBy is used to indicate the controller or entity that manages a job. + ManagedBy is used to indicate the controller or entity that manages a MPIJob. The value must be either an empty, 'kubeflow.org/mpi-operator' or 'kueue.x-k8s.io/multikueue'. - The mpi-operator reconciles a job which doesn't have this + The mpi-operator reconciles a MPIJob which doesn't have this field at all or the field value is the reserved string - 'kubeflow.org/mpi-operator', but delegates reconciling the job + 'kubeflow.org/mpi-operator', but delegates reconciling the MPIJob with 'kueue.x-k8s.io/multikueue' to the Kueue. The field is immutable. type: string diff --git a/manifests/base/kubeflow.org_mpijobs.yaml b/manifests/base/kubeflow.org_mpijobs.yaml index a86fc73e..38169bb1 100644 --- a/manifests/base/kubeflow.org_mpijobs.yaml +++ b/manifests/base/kubeflow.org_mpijobs.yaml @@ -7736,12 +7736,12 @@ spec: type: string managedBy: description: |- - ManagedBy is used to indicate the controller or entity that manages a job. + ManagedBy is used to indicate the controller or entity that manages a MPIJob. The value must be either an empty, 'kubeflow.org/mpi-operator' or 'kueue.x-k8s.io/multikueue'. - The mpi-operator reconciles a job which doesn't have this + The mpi-operator reconciles a MPIJob which doesn't have this field at all or the field value is the reserved string - 'kubeflow.org/mpi-operator', but delegates reconciling the job + 'kubeflow.org/mpi-operator', but delegates reconciling the MPIJob with 'kueue.x-k8s.io/multikueue' to the Kueue. The field is immutable. type: string diff --git a/pkg/apis/kubeflow/v2beta1/swagger.json b/pkg/apis/kubeflow/v2beta1/swagger.json index 80cc8569..61449c43 100644 --- a/pkg/apis/kubeflow/v2beta1/swagger.json +++ b/pkg/apis/kubeflow/v2beta1/swagger.json @@ -242,7 +242,7 @@ "type": "string" }, "managedBy": { - "description": "ManagedBy is used to indicate the controller or entity that manages a job. The value must be either an empty, 'kubeflow.org/mpi-operator' or 'kueue.x-k8s.io/multikueue'. The mpi-operator reconciles a job which doesn't have this field at all or the field value is the reserved string 'kubeflow.org/mpi-operator', but delegates reconciling the job with 'kueue.x-k8s.io/multikueue' to the Kueue. The field is immutable.", + "description": "ManagedBy is used to indicate the controller or entity that manages a MPIJob. The value must be either an empty, 'kubeflow.org/mpi-operator' or 'kueue.x-k8s.io/multikueue'. The mpi-operator reconciles a MPIJob which doesn't have this field at all or the field value is the reserved string 'kubeflow.org/mpi-operator', but delegates reconciling the MPIJob with 'kueue.x-k8s.io/multikueue' to the Kueue. The field is immutable.", "type": "string" }, "schedulingPolicy": { diff --git a/pkg/apis/kubeflow/v2beta1/types.go b/pkg/apis/kubeflow/v2beta1/types.go index 9178c317..187f266a 100644 --- a/pkg/apis/kubeflow/v2beta1/types.go +++ b/pkg/apis/kubeflow/v2beta1/types.go @@ -140,12 +140,12 @@ type RunPolicy struct { // +kubebuilder:default:=false Suspend *bool `json:"suspend,omitempty"` - // ManagedBy is used to indicate the controller or entity that manages a job. + // ManagedBy is used to indicate the controller or entity that manages a MPIJob. // The value must be either an empty, 'kubeflow.org/mpi-operator' or // 'kueue.x-k8s.io/multikueue'. - // The mpi-operator reconciles a job which doesn't have this + // The mpi-operator reconciles a MPIJob which doesn't have this // field at all or the field value is the reserved string - // 'kubeflow.org/mpi-operator', but delegates reconciling the job + // 'kubeflow.org/mpi-operator', but delegates reconciling the MPIJob // with 'kueue.x-k8s.io/multikueue' to the Kueue. // The field is immutable. ManagedBy *string `json:"managedBy,omitempty"` diff --git a/pkg/controller/mpi_job_controller_test.go b/pkg/controller/mpi_job_controller_test.go index 5ed8bc1e..3594d354 100644 --- a/pkg/controller/mpi_job_controller_test.go +++ b/pkg/controller/mpi_job_controller_test.go @@ -418,6 +418,11 @@ func (f *fixture) expectCreateSecretAction(d *corev1.Secret) { f.kubeActions = append(f.kubeActions, core.NewCreateAction(schema.GroupVersionResource{Resource: "secrets"}, d.Namespace, d)) } +func (f *fixture) expectNoKubeActions() bool { + k8sActions := filterInformerActions(f.kubeClient.Actions()) + return len(k8sActions) == 0 +} + func (f *fixture) expectUpdateMPIJobStatusAction(mpiJob *kubeflow.MPIJob) { action := core.NewUpdateAction(schema.GroupVersionResource{Resource: "mpijobs"}, mpiJob.Namespace, mpiJob) action.Subresource = "status" @@ -514,6 +519,9 @@ func TestDoNothingWithMPIJobManagedExternally(t *testing.T) { mpiJob.Spec.RunPolicy.ManagedBy = ptr.To(kubeflow.MultiKueueController) f.setUpMPIJob(mpiJob) f.run(getKey(mpiJob, t)) + if !f.expectNoKubeActions() { + t.Fatalf("Expected no kubeActions (secrets, pods, services etc.)") + } } func TestAllResourcesCreated(t *testing.T) { diff --git a/sdk/python/v2beta1/docs/V2beta1RunPolicy.md b/sdk/python/v2beta1/docs/V2beta1RunPolicy.md index 2c790c23..6aeaf09d 100644 --- a/sdk/python/v2beta1/docs/V2beta1RunPolicy.md +++ b/sdk/python/v2beta1/docs/V2beta1RunPolicy.md @@ -8,7 +8,7 @@ Name | Type | Description | Notes **active_deadline_seconds** | **int** | Specifies the duration in seconds relative to the startTime that the job may be active before the system tries to terminate it; value must be positive integer. | [optional] **backoff_limit** | **int** | Optional number of retries before marking this job failed. | [optional] **clean_pod_policy** | **str** | CleanPodPolicy defines the policy to kill pods after the job completes. Default to Running. | [optional] -**managed_by** | **str** | ManagedBy is used to indicate the controller or entity that manages a job. The value must be either an empty, 'kubeflow.org/mpi-operator' or 'kueue.x-k8s.io/multikueue'. The mpi-operator reconciles a job which doesn't have this field at all or the field value is the reserved string 'kubeflow.org/mpi-operator', but delegates reconciling the job with 'kueue.x-k8s.io/multikueue' to the Kueue. The field is immutable. | [optional] +**managed_by** | **str** | ManagedBy is used to indicate the controller or entity that manages a MPIJob. The value must be either an empty, 'kubeflow.org/mpi-operator' or 'kueue.x-k8s.io/multikueue'. The mpi-operator reconciles a job which doesn't have this field at all or the field value is the reserved string 'kubeflow.org/mpi-operator', but delegates reconciling the job with 'kueue.x-k8s.io/multikueue' to the Kueue. The field is immutable. | [optional] **scheduling_policy** | [**V2beta1SchedulingPolicy**](V2beta1SchedulingPolicy.md) | | [optional] **suspend** | **bool** | suspend specifies whether the MPIJob controller should create Pods or not. If a MPIJob is created with suspend set to true, no Pods are created by the MPIJob controller. If a MPIJob is suspended after creation (i.e. the flag goes from false to true), the MPIJob controller will delete all active Pods and PodGroups associated with this MPIJob. Also, it will suspend the Launcher Job. Users must design their workload to gracefully handle this. Suspending a Job will reset the StartTime field of the MPIJob. Defaults to false. | [optional] **ttl_seconds_after_finished** | **int** | TTLSecondsAfterFinished is the TTL to clean up jobs. It may take extra ReconcilePeriod seconds for the cleanup, since reconcile gets called periodically. Default to infinite. | [optional] diff --git a/sdk/python/v2beta1/mpijob/models/v2beta1_run_policy.py b/sdk/python/v2beta1/mpijob/models/v2beta1_run_policy.py index b039bf66..1860c6dc 100644 --- a/sdk/python/v2beta1/mpijob/models/v2beta1_run_policy.py +++ b/sdk/python/v2beta1/mpijob/models/v2beta1_run_policy.py @@ -155,7 +155,7 @@ def clean_pod_policy(self, clean_pod_policy): def managed_by(self): """Gets the managed_by of this V2beta1RunPolicy. # noqa: E501 - ManagedBy is used to indicate the controller or entity that manages a job. The value must be either an empty, 'kubeflow.org/mpi-operator' or 'kueue.x-k8s.io/multikueue'. The mpi-operator reconciles a job which doesn't have this field at all or the field value is the reserved string 'kubeflow.org/mpi-operator', but delegates reconciling the job with 'kueue.x-k8s.io/multikueue' to the Kueue. The field is immutable. # noqa: E501 + ManagedBy is used to indicate the controller or entity that manages a MPIJob. The value must be either an empty, 'kubeflow.org/mpi-operator' or 'kueue.x-k8s.io/multikueue'. The mpi-operator reconciles a MPIJob which doesn't have this field at all or the field value is the reserved string 'kubeflow.org/mpi-operator', but delegates reconciling the MPIJob with 'kueue.x-k8s.io/multikueue' to the Kueue. The field is immutable. # noqa: E501 :return: The managed_by of this V2beta1RunPolicy. # noqa: E501 :rtype: str @@ -166,7 +166,7 @@ def managed_by(self): def managed_by(self, managed_by): """Sets the managed_by of this V2beta1RunPolicy. - ManagedBy is used to indicate the controller or entity that manages a job. The value must be either an empty, 'kubeflow.org/mpi-operator' or 'kueue.x-k8s.io/multikueue'. The mpi-operator reconciles a job which doesn't have this field at all or the field value is the reserved string 'kubeflow.org/mpi-operator', but delegates reconciling the job with 'kueue.x-k8s.io/multikueue' to the Kueue. The field is immutable. # noqa: E501 + ManagedBy is used to indicate the controller or entity that manages a MPIJob. The value must be either an empty, 'kubeflow.org/mpi-operator' or 'kueue.x-k8s.io/multikueue'. The mpi-operator reconciles a MPIJob which doesn't have this field at all or the field value is the reserved string 'kubeflow.org/mpi-operator', but delegates reconciling the MPIJob with 'kueue.x-k8s.io/multikueue' to the Kueue. The field is immutable. # noqa: E501 :param managed_by: The managed_by of this V2beta1RunPolicy. # noqa: E501 :type managed_by: str diff --git a/test/e2e/mpi_job_test.go b/test/e2e/mpi_job_test.go index 2359b62f..f9ccd41a 100644 --- a/test/e2e/mpi_job_test.go +++ b/test/e2e/mpi_job_test.go @@ -185,9 +185,23 @@ var _ = ginkgo.Describe("MPIJob", func() { gomega.Expect(condition).To(gomega.BeNil()) condition = getJobCondition(mpiJob, kubeflow.JobSucceeded) gomega.Expect(condition).To(gomega.BeNil()) + launcherPods, err := getLauncherPods(ctx, mpiJob) + gomega.Expect(err).To(gomega.BeNil()) + gomega.Expect(launcherPods.Items).To(gomega.BeZero()) + workerPods, err := getWorkerPods(ctx, mpiJob) + gomega.Expect(err).To(gomega.BeNil()) + gomega.Expect(workerPods.Items).To(gomega.BeZero()) + secret, err := getSecretsForJob(ctx, mpiJob) + gomega.Expect(err).To(gomega.BeNil()) + gomega.Expect(secret).To(gomega.BeNil()) }) - }) + ginkgo.It("should succeed when explicitly managed by mpi-operator", func() { + mpiJob.Spec.RunPolicy.ManagedBy = ptr.To(kubeflow.KubeflowJobController) + mpiJob := createJobAndWaitForCompletion(mpiJob) + expectConditionToBeTrue(mpiJob, kubeflow.JobSucceeded) + }) + }) }) ginkgo.Context("with Intel Implementation", func() { @@ -558,7 +572,7 @@ func waitForCompletion(ctx context.Context, mpiJob *kubeflow.MPIJob) *kubeflow.M return mpiJob } -func debugJob(ctx context.Context, mpiJob *kubeflow.MPIJob) error { +func getLauncherPods(ctx context.Context, mpiJob *kubeflow.MPIJob) (*corev1.PodList, error) { selector := metav1.LabelSelector{ MatchLabels: map[string]string{ kubeflow.OperatorNameLabel: kubeflow.OperatorName, @@ -570,7 +584,45 @@ func debugJob(ctx context.Context, mpiJob *kubeflow.MPIJob) error { LabelSelector: metav1.FormatLabelSelector(&selector), }) if err != nil { - return fmt.Errorf("getting launcher Pods: %w", err) + return &corev1.PodList{}, fmt.Errorf("getting launcher Pods: %w", err) + } + return launcherPods, nil +} + +func getWorkerPods(ctx context.Context, mpiJob *kubeflow.MPIJob) (*corev1.PodList, error) { + selector := metav1.LabelSelector{ + MatchLabels: map[string]string{ + kubeflow.OperatorNameLabel: kubeflow.OperatorName, + kubeflow.JobNameLabel: mpiJob.Name, + kubeflow.JobRoleLabel: "worker", + }, + } + workerPods, err := k8sClient.CoreV1().Pods(mpiJob.Namespace).List(ctx, metav1.ListOptions{ + LabelSelector: metav1.FormatLabelSelector(&selector), + }) + if err != nil { + return &corev1.PodList{}, fmt.Errorf("getting worker Pods: %w", err) + } + return workerPods, nil +} + +func getSecretsForJob(ctx context.Context, mpiJob *kubeflow.MPIJob) (*corev1.Secret, error) { + result, err := k8sClient.CoreV1().Secrets(mpiJob.Namespace).List(ctx, metav1.ListOptions{}) + if err != nil { + return nil, err + } + for _, obj := range result.Items { + if metav1.IsControlledBy(&obj, mpiJob) { + return &obj, nil + } + } + return nil, nil +} + +func debugJob(ctx context.Context, mpiJob *kubeflow.MPIJob) error { + launcherPods, err := getLauncherPods(ctx, mpiJob) + if err != nil { + return err } if len(launcherPods.Items) == 0 { return fmt.Errorf("no launcher Pods found") @@ -585,11 +637,7 @@ func debugJob(ctx context.Context, mpiJob *kubeflow.MPIJob) error { if err != nil { return fmt.Errorf("obtaining launcher logs: %w", err) } - - selector.MatchLabels[kubeflow.JobRoleLabel] = "worker" - workerPods, err := k8sClient.CoreV1().Pods(mpiJob.Namespace).List(ctx, metav1.ListOptions{ - LabelSelector: metav1.FormatLabelSelector(&selector), - }) + workerPods, err := getWorkerPods(ctx, mpiJob) if err != nil { return fmt.Errorf("getting worker Pods: %w", err) }