Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Deprecated pointer, use ptr instead #627

Merged
merged 1 commit into from
Feb 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions pkg/controller/mpi_job_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ import (
"k8s.io/client-go/util/workqueue"
"k8s.io/klog"
"k8s.io/utils/clock"
"k8s.io/utils/pointer"
"k8s.io/utils/ptr"
schedclientset "sigs.k8s.io/scheduler-plugins/pkg/generated/clientset/versioned"
volcanoclient "volcano.sh/apis/pkg/client/clientset/versioned"
Expand Down Expand Up @@ -673,7 +672,7 @@ func (c *MPIJobController) syncHandler(key string) error {
if launcher != nil {
if isMPIJobSuspended(mpiJob) != isJobSuspended(launcher) {
// align the suspension state of launcher with the MPIJob
launcher.Spec.Suspend = pointer.Bool(isMPIJobSuspended(mpiJob))
launcher.Spec.Suspend = ptr.To(isMPIJobSuspended(mpiJob))
if _, err := c.kubeClient.BatchV1().Jobs(namespace).Update(context.TODO(), launcher, metav1.UpdateOptions{}); err != nil {
return err
}
Expand Down Expand Up @@ -998,11 +997,11 @@ func (c *MPIJobController) getOrCreateWorker(mpiJob *kubeflow.MPIJob) ([]*corev1
}

func isMPIJobSuspended(mpiJob *kubeflow.MPIJob) bool {
return pointer.BoolDeref(mpiJob.Spec.RunPolicy.Suspend, false)
return ptr.Deref(mpiJob.Spec.RunPolicy.Suspend, false)
}

func isJobSuspended(job *batchv1.Job) bool {
return pointer.BoolDeref(job.Spec.Suspend, false)
return ptr.Deref(job.Spec.Suspend, false)
}

func (c *MPIJobController) deleteWorkerPods(mpiJob *kubeflow.MPIJob) error {
Expand Down Expand Up @@ -1486,7 +1485,7 @@ func (c *MPIJobController) newLauncherJob(mpiJob *kubeflow.MPIJob) *batchv1.Job
},
}
if isMPIJobSuspended(mpiJob) {
job.Spec.Suspend = pointer.Bool(true)
job.Spec.Suspend = ptr.To(true)
}
return job
}
Expand Down
29 changes: 14 additions & 15 deletions pkg/controller/mpi_job_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ import (
"k8s.io/client-go/tools/record"
"k8s.io/utils/clock"
clocktesting "k8s.io/utils/clock/testing"
"k8s.io/utils/pointer"
"k8s.io/utils/ptr"
schedv1alpha1 "sigs.k8s.io/scheduler-plugins/apis/scheduling/v1alpha1"
schedclientset "sigs.k8s.io/scheduler-plugins/pkg/generated/clientset/versioned"
Expand Down Expand Up @@ -804,7 +803,7 @@ func TestCreateSuspendedMPIJob(t *testing.T) {
// create a suspended job
var replicas int32 = 8
mpiJob := newMPIJob("test", &replicas, nil, nil)
mpiJob.Spec.RunPolicy.Suspend = pointer.Bool(true)
mpiJob.Spec.RunPolicy.Suspend = ptr.To(true)
mpiJob.Spec.MPIImplementation = implementation
f.setUpMPIJob(mpiJob)

Expand All @@ -823,7 +822,7 @@ func TestCreateSuspendedMPIJob(t *testing.T) {
// expect creating of the launcher
fmjc := f.newFakeMPIJobController()
launcher := fmjc.newLauncherJob(mpiJob)
launcher.Spec.Suspend = pointer.Bool(true)
launcher.Spec.Suspend = ptr.To(true)
f.expectCreateJobAction(launcher)

// expect an update to add the conditions
Expand Down Expand Up @@ -851,7 +850,7 @@ func TestSuspendedRunningMPIJob(t *testing.T) {
var replicas int32 = 8
startTime := metav1.Now()
mpiJob := newMPIJob("test", &replicas, &startTime, nil)
mpiJob.Spec.RunPolicy.Suspend = pointer.Bool(false)
mpiJob.Spec.RunPolicy.Suspend = ptr.To(false)
msg := fmt.Sprintf("MPIJob %s/%s is created.", mpiJob.Namespace, mpiJob.Name)
updateMPIJobConditions(mpiJob, kubeflow.JobCreated, corev1.ConditionTrue, mpiJobCreatedReason, msg)
msg = fmt.Sprintf("MPIJob %s/%s is running.", mpiJob.Namespace, mpiJob.Name)
Expand Down Expand Up @@ -893,18 +892,18 @@ func TestSuspendedRunningMPIJob(t *testing.T) {

// setup launcher and its pod
launcher := fmjc.newLauncherJob(mpiJob)
launcher.Spec.Suspend = pointer.Bool(false)
launcher.Spec.Suspend = ptr.To(false)
launcherPod := mockJobPod(launcher)
launcherPod.Status.Phase = corev1.PodRunning
f.setUpLauncher(launcher)
f.setUpPod(launcherPod)

// transition the MPIJob into suspended state
mpiJob.Spec.RunPolicy.Suspend = pointer.Bool(true)
mpiJob.Spec.RunPolicy.Suspend = ptr.To(true)

// expect moving the launcher pod into suspended state
launcherCopy := launcher.DeepCopy()
launcherCopy.Spec.Suspend = pointer.Bool(true)
launcherCopy.Spec.Suspend = ptr.To(true)
f.expectUpdateJobAction(launcherCopy)

// expect removal of the pods
Expand Down Expand Up @@ -939,7 +938,7 @@ func TestResumeMPIJob(t *testing.T) {
var replicas int32 = 8
startTime := metav1.Now()
mpiJob := newMPIJob("test", &replicas, &startTime, nil)
mpiJob.Spec.RunPolicy.Suspend = pointer.Bool(true)
mpiJob.Spec.RunPolicy.Suspend = ptr.To(true)
msg := fmt.Sprintf("MPIJob %s/%s is created.", mpiJob.Namespace, mpiJob.Name)
updateMPIJobConditions(mpiJob, kubeflow.JobCreated, corev1.ConditionTrue, mpiJobCreatedReason, msg)
updateMPIJobConditions(mpiJob, kubeflow.JobSuspended, corev1.ConditionTrue, mpiJobSuspendedReason, "MPIJob suspended")
Expand All @@ -966,14 +965,14 @@ func TestResumeMPIJob(t *testing.T) {
// expect creating of the launcher
fmjc := f.newFakeMPIJobController()
launcher := fmjc.newLauncherJob(mpiJob)
launcher.Spec.Suspend = pointer.Bool(true)
launcher.Spec.Suspend = ptr.To(true)
f.setUpLauncher(launcher)

// move the timer by a second so that the StartTime is updated after resume
fakeClock.Sleep(time.Second)

// resume the MPIJob
mpiJob.Spec.RunPolicy.Suspend = pointer.Bool(false)
mpiJob.Spec.RunPolicy.Suspend = ptr.To(false)

// expect creation of the pods
for i := 0; i < int(replicas); i++ {
Expand All @@ -983,7 +982,7 @@ func TestResumeMPIJob(t *testing.T) {

// expect the launcher update to resume it
launcherCopy := launcher.DeepCopy()
launcherCopy.Spec.Suspend = pointer.Bool(false)
launcherCopy.Spec.Suspend = ptr.To(false)
f.expectUpdateJobAction(launcherCopy)

// expect an update to add the conditions
Expand Down Expand Up @@ -1545,7 +1544,7 @@ func TestNewConfigMap(t *testing.T) {
},
Spec: kubeflow.MPIJobSpec{
MPIImplementation: kubeflow.MPIImplementationOpenMPI,
RunLauncherAsWorker: pointer.Bool(true),
RunLauncherAsWorker: ptr.To(true),
},
},
workerReplicas: 2,
Expand All @@ -1570,7 +1569,7 @@ func TestNewConfigMap(t *testing.T) {
},
Spec: kubeflow.MPIJobSpec{
MPIImplementation: kubeflow.MPIImplementationOpenMPI,
RunLauncherAsWorker: pointer.Bool(true),
RunLauncherAsWorker: ptr.To(true),
},
},
workerReplicas: 0,
Expand Down Expand Up @@ -1618,7 +1617,7 @@ func TestNewConfigMap(t *testing.T) {
Namespace: "project-x",
},
Spec: kubeflow.MPIJobSpec{
SlotsPerWorker: pointer.Int32(10),
SlotsPerWorker: ptr.To[int32](10),
MPIImplementation: kubeflow.MPIImplementationIntel,
},
},
Expand All @@ -1643,7 +1642,7 @@ func TestNewConfigMap(t *testing.T) {
Namespace: "project-x",
},
Spec: kubeflow.MPIJobSpec{
SlotsPerWorker: pointer.Int32(10),
SlotsPerWorker: ptr.To[int32](10),
MPIImplementation: kubeflow.MPIImplementationMPICH,
},
},
Expand Down
10 changes: 5 additions & 5 deletions pkg/controller/podgroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
schedulinglisters "k8s.io/client-go/listers/scheduling/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/klog"
"k8s.io/utils/pointer"
"k8s.io/utils/ptr"
schedv1alpha1 "sigs.k8s.io/scheduler-plugins/apis/scheduling/v1alpha1"
schedclientset "sigs.k8s.io/scheduler-plugins/pkg/generated/clientset/versioned"
schedinformers "sigs.k8s.io/scheduler-plugins/pkg/generated/informers/externalversions"
Expand Down Expand Up @@ -242,7 +242,7 @@ func (s *SchedulerPluginsCtrl) newPodGroup(mpiJob *kubeflow.MPIJob) metav1.Objec
if mpiJob == nil {
return nil
}
scheduleTimeoutSec := pointer.Int32(0)
scheduleTimeoutSec := ptr.To[int32](0)
if schedPolicy := mpiJob.Spec.RunPolicy.SchedulingPolicy; schedPolicy != nil && schedPolicy.ScheduleTimeoutSeconds != nil {
scheduleTimeoutSec = schedPolicy.ScheduleTimeoutSeconds
}
Expand Down Expand Up @@ -364,9 +364,9 @@ func calPGMinResource(minMember *int32, mpiJob *kubeflow.MPIJob, pcLister schedu
klog.Warningf("Couldn't find the worker replicas")
return nil
}
order[wIndex].Replicas = pointer.Int32(*minMember - 1)
order[wIndex].Replicas = ptr.To(*minMember - 1)
} else {
order[1].Replicas = pointer.Int32(*minMember - 1)
order[1].Replicas = ptr.To(*minMember - 1)
}
}

Expand All @@ -390,7 +390,7 @@ func calculateMinAvailable(mpiJob *kubeflow.MPIJob) *int32 {
if schedulingPolicy := mpiJob.Spec.RunPolicy.SchedulingPolicy; schedulingPolicy != nil && schedulingPolicy.MinAvailable != nil {
return schedulingPolicy.MinAvailable
}
return pointer.Int32(workerReplicas(mpiJob) + 1)
return ptr.To(workerReplicas(mpiJob) + 1)
}

// calculatePriorityClassName calculates the priorityClass name needed for podGroup according to the following priorities:
Expand Down
Loading
Loading