Skip to content

Commit

Permalink
Adjust KFTO PyTorchJob upgrade tests to be idempotent
Browse files Browse the repository at this point in the history
  • Loading branch information
sutaakar authored and openshift-merge-bot[bot] committed Dec 5, 2024
1 parent 03d5ee3 commit fcc8fc7
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 55 deletions.
8 changes: 8 additions & 0 deletions tests/kfto/core/support.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,14 @@ func PyTorchJob(t Test, namespace, name string) func(g Gomega) *kftov1.PyTorchJo
}
}

// PyTorchJobs returns a function that lists the PyTorchJobs present in the
// given namespace. The returned function takes a Gomega instance, asserts on
// it that the List call succeeded, and yields the listed items, which makes it
// usable as a polled value (for example with Eventually).
func PyTorchJobs(t Test, namespace string) func(g Gomega) []kftov1.PyTorchJob {
	listJobs := func(g Gomega) []kftov1.PyTorchJob {
		jobsClient := t.Client().Kubeflow().KubeflowV1().PyTorchJobs(namespace)
		list, err := jobsClient.List(t.Ctx(), metav1.ListOptions{})
		// A failed List is reported through the supplied Gomega instance.
		g.Expect(err).NotTo(HaveOccurred())
		return list.Items
	}
	return listJobs
}

// PyTorchJobConditionRunning returns the status that PyTorchJobCondition
// reports for the kftov1.JobRunning condition of the given PyTorchJob.
func PyTorchJobConditionRunning(job *kftov1.PyTorchJob) corev1.ConditionStatus {
	runningStatus := PyTorchJobCondition(job, kftov1.JobRunning)
	return runningStatus
}
Expand Down
107 changes: 60 additions & 47 deletions tests/kfto/upgrade/kfto_kueue_sft_upgrade_training_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
kueueacv1beta1 "sigs.k8s.io/kueue/client-go/applyconfiguration/kueue/v1beta1"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

Expand All @@ -36,20 +37,14 @@ var (
namespaceName = "test-kfto-upgrade"
resourceFlavorName = "rf-upgrade"
clusterQueueName = "cq-upgrade"
localQueueName = "lq-upgrade"
pyTorchJobName = "pytorch-upgrade"
)

func TestSetupPytorchjob(t *testing.T) {
test := With(t)

// Create a namespace
namespace := &corev1.Namespace{
ObjectMeta: metav1.ObjectMeta{
Name: namespaceName,
},
}
_, err := test.Client().Core().CoreV1().Namespaces().Create(test.Ctx(), namespace, metav1.CreateOptions{})
test.Expect(err).NotTo(HaveOccurred())
createOrGetUpgradeTestNamespace(test, namespaceName)

// Create a ConfigMap with training dataset and configuration
configData := map[string][]byte{
Expand All @@ -59,50 +54,43 @@ func TestSetupPytorchjob(t *testing.T) {
config := CreateConfigMap(test, namespaceName, configData)

// Create Kueue resources
resourceFlavor := &kueuev1beta1.ResourceFlavor{
ObjectMeta: metav1.ObjectMeta{
Name: resourceFlavorName,
},
}
resourceFlavor, err = test.Client().Kueue().KueueV1beta1().ResourceFlavors().Create(test.Ctx(), resourceFlavor, metav1.CreateOptions{})
resourceFlavor := kueueacv1beta1.ResourceFlavor(resourceFlavorName)
appliedResourceFlavor, err := test.Client().Kueue().KueueV1beta1().ResourceFlavors().Apply(test.Ctx(), resourceFlavor, metav1.ApplyOptions{FieldManager: "setup-PyTorchJob", Force: true})
test.Expect(err).NotTo(HaveOccurred())
test.T().Logf("Applied Kueue ResourceFlavor %s successfully", appliedResourceFlavor.Name)

clusterQueue := &kueuev1beta1.ClusterQueue{
ObjectMeta: metav1.ObjectMeta{
Name: clusterQueueName,
},
Spec: kueuev1beta1.ClusterQueueSpec{
NamespaceSelector: &metav1.LabelSelector{},
ResourceGroups: []kueuev1beta1.ResourceGroup{
{
CoveredResources: []corev1.ResourceName{corev1.ResourceName("cpu"), corev1.ResourceName("memory")},
Flavors: []kueuev1beta1.FlavorQuotas{
{
Name: kueuev1beta1.ResourceFlavorReference(resourceFlavor.Name),
Resources: []kueuev1beta1.ResourceQuota{
{
Name: corev1.ResourceCPU,
NominalQuota: resource.MustParse("8"),
},
{
Name: corev1.ResourceMemory,
NominalQuota: resource.MustParse("12Gi"),
},
},
},
},
},
},
StopPolicy: Ptr(kueuev1beta1.Hold),
},
}
clusterQueue, err = test.Client().Kueue().KueueV1beta1().ClusterQueues().Create(test.Ctx(), clusterQueue, metav1.CreateOptions{})
clusterQueue := kueueacv1beta1.ClusterQueue(clusterQueueName).WithSpec(
kueueacv1beta1.ClusterQueueSpec().
WithNamespaceSelector(metav1.LabelSelector{}).
WithResourceGroups(
kueueacv1beta1.ResourceGroup().WithCoveredResources(
corev1.ResourceName("cpu"), corev1.ResourceName("memory"),
).WithFlavors(
kueueacv1beta1.FlavorQuotas().
WithName(kueuev1beta1.ResourceFlavorReference(resourceFlavorName)).
WithResources(
kueueacv1beta1.ResourceQuota().WithName(corev1.ResourceCPU).WithNominalQuota(resource.MustParse("8")),
kueueacv1beta1.ResourceQuota().WithName(corev1.ResourceMemory).WithNominalQuota(resource.MustParse("12Gi")),
),
),
).
WithStopPolicy(kueuev1beta1.Hold),
)
appliedClusterQueue, err := test.Client().Kueue().KueueV1beta1().ClusterQueues().Apply(test.Ctx(), clusterQueue, metav1.ApplyOptions{FieldManager: "setup-PyTorchJob", Force: true})
test.Expect(err).NotTo(HaveOccurred())
test.T().Logf("Applied Kueue ClusterQueue %s successfully", appliedClusterQueue.Name)

localQueue := CreateKueueLocalQueue(test, namespaceName, clusterQueue.Name, AsDefaultQueue)
localQueue := kueueacv1beta1.LocalQueue(localQueueName, namespaceName).
WithAnnotations(map[string]string{"kueue.x-k8s.io/default-queue": "true"}).
WithSpec(
kueueacv1beta1.LocalQueueSpec().WithClusterQueue(kueuev1beta1.ClusterQueueReference(clusterQueueName)),
)
appliedLocalQueue, err := test.Client().Kueue().KueueV1beta1().LocalQueues(namespaceName).Apply(test.Ctx(), localQueue, metav1.ApplyOptions{FieldManager: "setup-PyTorchJob", Force: true})
test.Expect(err).NotTo(HaveOccurred())
test.T().Logf("Applied Kueue LocalQueue %s/%s successfully", appliedLocalQueue.Namespace, appliedLocalQueue.Name)

// Create training PyTorch job
tuningJob := createPyTorchJob(test, namespaceName, localQueue.Name, *config)
tuningJob := createPyTorchJob(test, namespaceName, appliedLocalQueue.Name, *config)

// Make sure the PyTorch job is suspended, waiting for ClusterQueue to be enabled
test.Eventually(kftocore.PyTorchJob(test, tuningJob.Namespace, pyTorchJobName), TestTimeoutShort).
Expand Down Expand Up @@ -133,6 +121,17 @@ func TestRunPytorchjob(t *testing.T) {
}

func createPyTorchJob(test Test, namespace, localQueueName string, config corev1.ConfigMap) *kftov1.PyTorchJob {
// Does PyTorchJob already exist?
_, err := test.Client().Kubeflow().KubeflowV1().PyTorchJobs(namespace).Get(test.Ctx(), pyTorchJobName, metav1.GetOptions{})
if err == nil {
// If yes then delete it and wait until there are no PyTorchJobs in the namespace
err := test.Client().Kubeflow().KubeflowV1().PyTorchJobs(namespace).Delete(test.Ctx(), pyTorchJobName, metav1.DeleteOptions{})
test.Expect(err).NotTo(HaveOccurred())
test.Eventually(kftocore.PyTorchJobs(test, namespace), TestTimeoutShort).Should(BeEmpty())
} else if !errors.IsNotFound(err) {
test.T().Fatalf("Error retrieving PyTorchJob with name `%s`: %v", pyTorchJobName, err)
}

tuningJob := &kftov1.PyTorchJob{
ObjectMeta: metav1.ObjectMeta{
Name: pyTorchJobName,
Expand Down Expand Up @@ -244,9 +243,23 @@ func createPyTorchJob(test Test, namespace, localQueueName string, config corev1
},
}

tuningJob, err := test.Client().Kubeflow().KubeflowV1().PyTorchJobs(namespace).Create(test.Ctx(), tuningJob, metav1.CreateOptions{})
tuningJob, err = test.Client().Kubeflow().KubeflowV1().PyTorchJobs(namespace).Create(test.Ctx(), tuningJob, metav1.CreateOptions{})
test.Expect(err).NotTo(HaveOccurred())
test.T().Logf("Created PytorchJob %s/%s successfully", tuningJob.Namespace, tuningJob.Name)

return tuningJob
}

// createOrGetUpgradeTestNamespace looks up the namespace with the given name
// and returns it when it already exists. When the lookup reports NotFound, the
// namespace is created through CreateTestNamespaceWithName, forwarding the
// supplied options. Any other lookup error is reported with test.T().Fatalf.
func createOrGetUpgradeTestNamespace(test Test, name string, options ...Option[*corev1.Namespace]) (namespace *corev1.Namespace) {
	existing, err := test.Client().Core().CoreV1().Namespaces().Get(test.Ctx(), name, metav1.GetOptions{})

	switch {
	case err == nil:
		// Namespace is already present; reuse it so repeated setup runs succeed.
		return existing
	case errors.IsNotFound(err):
		test.T().Logf("%s namespace doesn't exists. Creating ...", name)
		return CreateTestNamespaceWithName(test, name, options...)
	default:
		test.T().Fatalf("Error retrieving namespace with name `%s`: %v", name, err)
		// Hand back whatever the failed lookup produced.
		return existing
	}
}
22 changes: 14 additions & 8 deletions tests/kfto/upgrade/kfto_sft_upgrade_sleep_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
. "github.com/project-codeflare/codeflare-common/support"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

kftov1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"
Expand All @@ -38,13 +39,7 @@ func TestSetupSleepPytorchjob(t *testing.T) {
test := With(t)

// Create a namespace
namespace := &corev1.Namespace{
ObjectMeta: metav1.ObjectMeta{
Name: sleepNamespaceName,
},
}
_, err := test.Client().Core().CoreV1().Namespaces().Create(test.Ctx(), namespace, metav1.CreateOptions{})
test.Expect(err).NotTo(HaveOccurred())
createOrGetUpgradeTestNamespace(test, sleepNamespaceName)

// Create training PyTorch job
createSleepPyTorchJob(test, sleepNamespaceName)
Expand Down Expand Up @@ -76,6 +71,17 @@ func TestVerifySleepPytorchjob(t *testing.T) {
}

func createSleepPyTorchJob(test Test, namespace string) *kftov1.PyTorchJob {
// Does PyTorchJob already exist?
_, err := test.Client().Kubeflow().KubeflowV1().PyTorchJobs(namespace).Get(test.Ctx(), sleepPyTorchJobName, metav1.GetOptions{})
if err == nil {
// If yes then delete it and wait until there are no PyTorchJobs in the namespace
err := test.Client().Kubeflow().KubeflowV1().PyTorchJobs(namespace).Delete(test.Ctx(), sleepPyTorchJobName, metav1.DeleteOptions{})
test.Expect(err).NotTo(HaveOccurred())
test.Eventually(kftocore.PyTorchJobs(test, namespace), TestTimeoutShort).Should(BeEmpty())
} else if !errors.IsNotFound(err) {
test.T().Fatalf("Error retrieving PyTorchJob with name `%s`: %v", sleepPyTorchJobName, err)
}

tuningJob := &kftov1.PyTorchJob{
ObjectMeta: metav1.ObjectMeta{
Name: sleepPyTorchJobName,
Expand All @@ -102,7 +108,7 @@ func createSleepPyTorchJob(test Test, namespace string) *kftov1.PyTorchJob {
},
}

tuningJob, err := test.Client().Kubeflow().KubeflowV1().PyTorchJobs(namespace).Create(test.Ctx(), tuningJob, metav1.CreateOptions{})
tuningJob, err = test.Client().Kubeflow().KubeflowV1().PyTorchJobs(namespace).Create(test.Ctx(), tuningJob, metav1.CreateOptions{})
test.Expect(err).NotTo(HaveOccurred())
test.T().Logf("Created PytorchJob %s/%s successfully", tuningJob.Namespace, tuningJob.Name)

Expand Down

0 comments on commit fcc8fc7

Please sign in to comment.