diff --git a/go.mod b/go.mod index 3024ed8fb..13edc6b8d 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,6 @@ go 1.19 require ( github.com/google/go-cmp v0.5.9 - github.com/kubeflow/common v0.4.6 github.com/onsi/ginkgo v1.16.5 github.com/onsi/gomega v1.20.1 github.com/prometheus/client_golang v1.12.2 diff --git a/go.sum b/go.sum index ce54f973e..c64e677e4 100644 --- a/go.sum +++ b/go.sum @@ -229,8 +229,6 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= -github.com/kubeflow/common v0.4.6 h1:yzJf/HEdS6ginD0GlVkgbOFie0Sp66VdGjXidAGZIlk= -github.com/kubeflow/common v0.4.6/go.mod h1:43MAof/uhpJA2C0urynqatE3oKFQc7m2HLmJty7waqY= github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0= github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= diff --git a/hack/python-sdk/gen-sdk.sh b/hack/python-sdk/gen-sdk.sh index 032a02d91..95d7f9a80 100755 --- a/hack/python-sdk/gen-sdk.sh +++ b/hack/python-sdk/gen-sdk.sh @@ -34,7 +34,7 @@ mv pkg/apis/kubeflow/v2beta1/openapi_generated.go pkg/apis/kubeflow/v2beta1/open CODEGEN_VERSION=$(grep 'k8s.io/code-generator' go.sum | awk '{print $2}' | sed 's/\/go.mod//g' | head -1) GOBIN="${PWD}/bin" go install "k8s.io/code-generator/cmd/openapi-gen@${CODEGEN_VERSION}" echo "Generating V2 OpenAPI specification ..." -"${PWD}/bin/openapi-gen" --input-dirs github.com/kubeflow/mpi-operator/pkg/apis/kubeflow/v2beta1,github.com/kubeflow/common/pkg/apis/common/v1 --output-package github.com/kubeflow/mpi-operator/pkg/apis/kubeflow/v2beta1 --go-header-file hack/boilerplate/boilerplate.go.txt +"${PWD}/bin/openapi-gen" --input-dirs github.com/kubeflow/mpi-operator/pkg/apis/kubeflow/v2beta1 --output-package github.com/kubeflow/mpi-operator/pkg/apis/kubeflow/v2beta1 --go-header-file hack/boilerplate/boilerplate.go.txt echo "Generating V2 swagger file ..." go run hack/python-sdk/main.go v2beta1 > ${SWAGGER_V2_CODEGEN_FILE} diff --git a/hack/python-sdk/main.go b/hack/python-sdk/main.go index 91bbc6f9a..18541b883 100644 --- a/hack/python-sdk/main.go +++ b/hack/python-sdk/main.go @@ -73,7 +73,6 @@ func main() { func swaggify(name string) string { name = strings.Replace(name, "github.com/kubeflow/mpi-operator/pkg/apis/kubeflow/", "", -1) - name = strings.Replace(name, "github.com/kubeflow/common/pkg/apis/common/", "", -1) name = strings.Replace(name, "github.com/kubernetes-sigs/kube-batch/pkg/client/clientset/", "", -1) name = strings.Replace(name, "k8s.io/api/core/", "", -1) name = strings.Replace(name, "k8s.io/apimachinery/pkg/apis/meta/", "", -1) diff --git a/pkg/apis/kubeflow/v2beta1/constants.go b/pkg/apis/kubeflow/v2beta1/constants.go index 2f4319141..3387176b4 100644 --- a/pkg/apis/kubeflow/v2beta1/constants.go +++ b/pkg/apis/kubeflow/v2beta1/constants.go @@ -14,15 +14,33 @@ package v2beta1 -import common "github.com/kubeflow/common/pkg/apis/common/v1" - const ( // EnvKubeflowNamespace is ENV for kubeflow namespace specified by user. EnvKubeflowNamespace = "KUBEFLOW_NAMESPACE" // DefaultRestartPolicy is default RestartPolicy for ReplicaSpec. - DefaultRestartPolicy = common.RestartPolicyNever + DefaultRestartPolicy = RestartPolicyNever // DefaultLauncherRestartPolicy is default RestartPolicy for Launcher Job. - DefaultLauncherRestartPolicy = common.RestartPolicyOnFailure + DefaultLauncherRestartPolicy = RestartPolicyOnFailure // OperatorName is the name of the operator used as value to the label common.OperatorLabelName OperatorName = "mpi-operator" ) + +// merge from common.v1 +// reference https://github.com/kubeflow/training-operator/blob/master/pkg/apis/kubeflow.org/v1/common_types.go +const ( + + // ReplicaIndexLabel represents the label key for the replica-index, e.g. 0, 1, 2.. etc + ReplicaIndexLabel = "training.kubeflow.org/replica-index" + + // ReplicaTypeLabel represents the label key for the replica-type, e.g. ps, worker etc. + ReplicaTypeLabel = "training.kubeflow.org/replica-type" + + // OperatorNameLabel represents the label key for the operator name, e.g. tf-operator, mpi-operator, etc. + OperatorNameLabel = "training.kubeflow.org/operator-name" + + // JobNameLabel represents the label key for the job name, the value is the job name. + JobNameLabel = "training.kubeflow.org/job-name" + + // JobRoleLabel represents the label key for the job role, e.g. master. + JobRoleLabel = "training.kubeflow.org/job-role" +) diff --git a/pkg/apis/kubeflow/v2beta1/default.go b/pkg/apis/kubeflow/v2beta1/default.go index ee6fb2cfd..f1389463b 100644 --- a/pkg/apis/kubeflow/v2beta1/default.go +++ b/pkg/apis/kubeflow/v2beta1/default.go @@ -15,7 +15,6 @@ package v2beta1 import ( - common "github.com/kubeflow/common/pkg/apis/common/v1" "k8s.io/apimachinery/pkg/runtime" ) @@ -24,7 +23,7 @@ func addDefaultingFuncs(scheme *runtime.Scheme) error { } // setDefaultsTypeLauncher sets the default value to launcher. -func setDefaultsTypeLauncher(spec *common.ReplicaSpec) { +func setDefaultsTypeLauncher(spec *ReplicaSpec) { if spec == nil { return } @@ -37,7 +36,7 @@ func setDefaultsTypeLauncher(spec *common.ReplicaSpec) { } // setDefaultsTypeWorker sets the default value to worker. -func setDefaultsTypeWorker(spec *common.ReplicaSpec) { +func setDefaultsTypeWorker(spec *ReplicaSpec) { if spec == nil { return } diff --git a/pkg/apis/kubeflow/v2beta1/default_test.go b/pkg/apis/kubeflow/v2beta1/default_test.go index b7b506553..e2cac1c12 100644 --- a/pkg/apis/kubeflow/v2beta1/default_test.go +++ b/pkg/apis/kubeflow/v2beta1/default_test.go @@ -18,7 +18,6 @@ import ( "testing" "github.com/google/go-cmp/cmp" - common "github.com/kubeflow/common/pkg/apis/common/v1" ) func TestSetDefaults_MPIJob(t *testing.T) { @@ -102,7 +101,7 @@ func TestSetDefaults_MPIJob(t *testing.T) { "launcher defaults": { job: MPIJob{ Spec: MPIJobSpec{ - MPIReplicaSpecs: map[MPIReplicaType]*common.ReplicaSpec{ + MPIReplicaSpecs: map[MPIReplicaType]*ReplicaSpec{ MPIReplicaTypeLauncher: {}, }, }, @@ -116,7 +115,7 @@ func TestSetDefaults_MPIJob(t *testing.T) { SSHAuthMountPath: "/root/.ssh", MPIImplementation: MPIImplementationOpenMPI, LauncherCreationPolicy: "AtStartup", - MPIReplicaSpecs: map[MPIReplicaType]*common.ReplicaSpec{ + MPIReplicaSpecs: map[MPIReplicaType]*ReplicaSpec{ MPIReplicaTypeLauncher: { Replicas: newInt32(1), RestartPolicy: DefaultLauncherRestartPolicy, @@ -128,7 +127,7 @@ func TestSetDefaults_MPIJob(t *testing.T) { "worker defaults": { job: MPIJob{ Spec: MPIJobSpec{ - MPIReplicaSpecs: map[MPIReplicaType]*common.ReplicaSpec{ + MPIReplicaSpecs: map[MPIReplicaType]*ReplicaSpec{ MPIReplicaTypeWorker: {}, }, }, @@ -142,7 +141,7 @@ func TestSetDefaults_MPIJob(t *testing.T) { SSHAuthMountPath: "/root/.ssh", MPIImplementation: MPIImplementationOpenMPI, LauncherCreationPolicy: "AtStartup", - MPIReplicaSpecs: map[MPIReplicaType]*common.ReplicaSpec{ + MPIReplicaSpecs: map[MPIReplicaType]*ReplicaSpec{ MPIReplicaTypeWorker: { Replicas: newInt32(0), RestartPolicy: DefaultRestartPolicy, diff --git a/pkg/apis/kubeflow/v2beta1/openapi_generated.go b/pkg/apis/kubeflow/v2beta1/openapi_generated.go index 76ccc734c..02b34df05 100644 --- a/pkg/apis/kubeflow/v2beta1/openapi_generated.go +++ b/pkg/apis/kubeflow/v2beta1/openapi_generated.go @@ -28,21 +28,19 @@ import ( func GetOpenAPIDefinitions(ref common.ReferenceCallback) map[string]common.OpenAPIDefinition { return map[string]common.OpenAPIDefinition{ - "github.com/kubeflow/common/pkg/apis/common/v1.JobCondition": schema_pkg_apis_common_v1_JobCondition(ref), - "github.com/kubeflow/common/pkg/apis/common/v1.JobStatus": schema_pkg_apis_common_v1_JobStatus(ref), - "github.com/kubeflow/common/pkg/apis/common/v1.ReplicaSpec": schema_pkg_apis_common_v1_ReplicaSpec(ref), - "github.com/kubeflow/common/pkg/apis/common/v1.ReplicaStatus": schema_pkg_apis_common_v1_ReplicaStatus(ref), - "github.com/kubeflow/common/pkg/apis/common/v1.RunPolicy": schema_pkg_apis_common_v1_RunPolicy(ref), - "github.com/kubeflow/common/pkg/apis/common/v1.SchedulingPolicy": schema_pkg_apis_common_v1_SchedulingPolicy(ref), + "github.com/kubeflow/mpi-operator/pkg/apis/kubeflow/v2beta1.JobCondition": schema_pkg_apis_kubeflow_v2beta1_JobCondition(ref), + "github.com/kubeflow/mpi-operator/pkg/apis/kubeflow/v2beta1.JobStatus": schema_pkg_apis_kubeflow_v2beta1_JobStatus(ref), "github.com/kubeflow/mpi-operator/pkg/apis/kubeflow/v2beta1.MPIJob": schema_pkg_apis_kubeflow_v2beta1_MPIJob(ref), "github.com/kubeflow/mpi-operator/pkg/apis/kubeflow/v2beta1.MPIJobList": schema_pkg_apis_kubeflow_v2beta1_MPIJobList(ref), "github.com/kubeflow/mpi-operator/pkg/apis/kubeflow/v2beta1.MPIJobSpec": schema_pkg_apis_kubeflow_v2beta1_MPIJobSpec(ref), + "github.com/kubeflow/mpi-operator/pkg/apis/kubeflow/v2beta1.ReplicaSpec": schema_pkg_apis_kubeflow_v2beta1_ReplicaSpec(ref), + "github.com/kubeflow/mpi-operator/pkg/apis/kubeflow/v2beta1.ReplicaStatus": schema_pkg_apis_kubeflow_v2beta1_ReplicaStatus(ref), "github.com/kubeflow/mpi-operator/pkg/apis/kubeflow/v2beta1.RunPolicy": schema_pkg_apis_kubeflow_v2beta1_RunPolicy(ref), "github.com/kubeflow/mpi-operator/pkg/apis/kubeflow/v2beta1.SchedulingPolicy": schema_pkg_apis_kubeflow_v2beta1_SchedulingPolicy(ref), } } -func schema_pkg_apis_common_v1_JobCondition(ref common.ReferenceCallback) common.OpenAPIDefinition { +func schema_pkg_apis_kubeflow_v2beta1_JobCondition(ref common.ReferenceCallback) common.OpenAPIDefinition { return common.OpenAPIDefinition{ Schema: spec.Schema{ SchemaProps: spec.SchemaProps{ @@ -51,7 +49,7 @@ func schema_pkg_apis_common_v1_JobCondition(ref common.ReferenceCallback) common Properties: map[string]spec.Schema{ "type": { SchemaProps: spec.SchemaProps{ - Description: "Type of job condition.", + Description: "type of job condition.", Default: "", Type: []string{"string"}, Format: "", @@ -59,7 +57,7 @@ func schema_pkg_apis_common_v1_JobCondition(ref common.ReferenceCallback) common }, "status": { SchemaProps: spec.SchemaProps{ - Description: "Status of the condition, one of True, False, Unknown.", + Description: "status of the condition, one of True, False, Unknown.", Default: "", Type: []string{"string"}, Format: "", @@ -74,7 +72,7 @@ func schema_pkg_apis_common_v1_JobCondition(ref common.ReferenceCallback) common }, "message": { SchemaProps: spec.SchemaProps{ - Description: "A human readable message indicating details about the transition.", + Description: "A human-readable message indicating details about the transition.", Type: []string{"string"}, Format: "", }, @@ -102,7 +100,7 @@ func schema_pkg_apis_common_v1_JobCondition(ref common.ReferenceCallback) common } } -func schema_pkg_apis_common_v1_JobStatus(ref common.ReferenceCallback) common.OpenAPIDefinition { +func schema_pkg_apis_kubeflow_v2beta1_JobStatus(ref common.ReferenceCallback) common.OpenAPIDefinition { return common.OpenAPIDefinition{ Schema: spec.Schema{ SchemaProps: spec.SchemaProps{ @@ -110,14 +108,22 @@ func schema_pkg_apis_common_v1_JobStatus(ref common.ReferenceCallback) common.Op Type: []string{"object"}, Properties: map[string]spec.Schema{ "conditions": { + VendorExtensible: spec.VendorExtensible{ + Extensions: spec.Extensions{ + "x-kubernetes-list-map-keys": []interface{}{ + "type", + }, + "x-kubernetes-list-type": "map", + }, + }, SchemaProps: spec.SchemaProps{ - Description: "Conditions is an array of current observed job conditions.", + Description: "conditions is a list of current observed job conditions.", Type: []string{"array"}, Items: &spec.SchemaOrArray{ Schema: &spec.Schema{ SchemaProps: spec.SchemaProps{ Default: map[string]interface{}{}, - Ref: ref("github.com/kubeflow/common/pkg/apis/common/v1.JobCondition"), + Ref: ref("github.com/kubeflow/mpi-operator/pkg/apis/kubeflow/v2beta1.JobCondition"), }, }, }, @@ -125,13 +131,13 @@ func schema_pkg_apis_common_v1_JobStatus(ref common.ReferenceCallback) common.Op }, "replicaStatuses": { SchemaProps: spec.SchemaProps{ - Description: "ReplicaStatuses is map of ReplicaType and ReplicaStatus, specifies the status of each replica.", + Description: "replicaStatuses is map of ReplicaType and ReplicaStatus, specifies the status of each replica.", Type: []string{"object"}, AdditionalProperties: &spec.SchemaOrBool{ Allows: true, Schema: &spec.Schema{ SchemaProps: spec.SchemaProps{ - Ref: ref("github.com/kubeflow/common/pkg/apis/common/v1.ReplicaStatus"), + Ref: ref("github.com/kubeflow/mpi-operator/pkg/apis/kubeflow/v2beta1.ReplicaStatus"), }, }, }, @@ -156,198 +162,10 @@ func schema_pkg_apis_common_v1_JobStatus(ref common.ReferenceCallback) common.Op }, }, }, - Required: []string{"conditions", "replicaStatuses"}, - }, - }, - Dependencies: []string{ - "github.com/kubeflow/common/pkg/apis/common/v1.JobCondition", "github.com/kubeflow/common/pkg/apis/common/v1.ReplicaStatus", "k8s.io/apimachinery/pkg/apis/meta/v1.Time"}, - } -} - -func schema_pkg_apis_common_v1_ReplicaSpec(ref common.ReferenceCallback) common.OpenAPIDefinition { - return common.OpenAPIDefinition{ - Schema: spec.Schema{ - SchemaProps: spec.SchemaProps{ - Description: "ReplicaSpec is a description of the replica", - Type: []string{"object"}, - Properties: map[string]spec.Schema{ - "replicas": { - SchemaProps: spec.SchemaProps{ - Description: "Replicas is the desired number of replicas of the given template. If unspecified, defaults to 1.", - Type: []string{"integer"}, - Format: "int32", - }, - }, - "template": { - SchemaProps: spec.SchemaProps{ - Description: "Template is the object that describes the pod that will be created for this replica. RestartPolicy in PodTemplateSpec will be overide by RestartPolicy in ReplicaSpec", - Default: map[string]interface{}{}, - Ref: ref("k8s.io/api/core/v1.PodTemplateSpec"), - }, - }, - "restartPolicy": { - SchemaProps: spec.SchemaProps{ - Description: "Restart policy for all replicas within the job. One of Always, OnFailure, Never and ExitCode. Default to Never.", - Type: []string{"string"}, - Format: "", - }, - }, - }, - }, - }, - Dependencies: []string{ - "k8s.io/api/core/v1.PodTemplateSpec"}, - } -} - -func schema_pkg_apis_common_v1_ReplicaStatus(ref common.ReferenceCallback) common.OpenAPIDefinition { - return common.OpenAPIDefinition{ - Schema: spec.Schema{ - SchemaProps: spec.SchemaProps{ - Description: "ReplicaStatus represents the current observed state of the replica.", - Type: []string{"object"}, - Properties: map[string]spec.Schema{ - "active": { - SchemaProps: spec.SchemaProps{ - Description: "The number of actively running pods.", - Type: []string{"integer"}, - Format: "int32", - }, - }, - "succeeded": { - SchemaProps: spec.SchemaProps{ - Description: "The number of pods which reached phase Succeeded.", - Type: []string{"integer"}, - Format: "int32", - }, - }, - "failed": { - SchemaProps: spec.SchemaProps{ - Description: "The number of pods which reached phase Failed.", - Type: []string{"integer"}, - Format: "int32", - }, - }, - "labelSelector": { - SchemaProps: spec.SchemaProps{ - Description: "Deprecated: Use Selector instead", - Ref: ref("k8s.io/apimachinery/pkg/apis/meta/v1.LabelSelector"), - }, - }, - "selector": { - SchemaProps: spec.SchemaProps{ - Description: "A Selector is a label query over a set of resources. The result of matchLabels and matchExpressions are ANDed. An empty Selector matches all objects. A null Selector matches no objects.", - Type: []string{"string"}, - Format: "", - }, - }, - }, - }, - }, - Dependencies: []string{ - "k8s.io/apimachinery/pkg/apis/meta/v1.LabelSelector"}, - } -} - -func schema_pkg_apis_common_v1_RunPolicy(ref common.ReferenceCallback) common.OpenAPIDefinition { - return common.OpenAPIDefinition{ - Schema: spec.Schema{ - SchemaProps: spec.SchemaProps{ - Description: "RunPolicy encapsulates various runtime policies of the distributed training job, for example how to clean up resources and how long the job can stay active.", - Type: []string{"object"}, - Properties: map[string]spec.Schema{ - "cleanPodPolicy": { - SchemaProps: spec.SchemaProps{ - Description: "CleanPodPolicy defines the policy to kill pods after the job completes. Default to Running.", - Type: []string{"string"}, - Format: "", - }, - }, - "ttlSecondsAfterFinished": { - SchemaProps: spec.SchemaProps{ - Description: "TTLSecondsAfterFinished is the TTL to clean up jobs. It may take extra ReconcilePeriod seconds for the cleanup, since reconcile gets called periodically. Default to infinite.", - Type: []string{"integer"}, - Format: "int32", - }, - }, - "activeDeadlineSeconds": { - SchemaProps: spec.SchemaProps{ - Description: "Specifies the duration in seconds relative to the startTime that the job may be active before the system tries to terminate it; value must be positive integer.", - Type: []string{"integer"}, - Format: "int64", - }, - }, - "backoffLimit": { - SchemaProps: spec.SchemaProps{ - Description: "Optional number of retries before marking this job failed.", - Type: []string{"integer"}, - Format: "int32", - }, - }, - "schedulingPolicy": { - SchemaProps: spec.SchemaProps{ - Description: "SchedulingPolicy defines the policy related to scheduling, e.g. gang-scheduling", - Ref: ref("github.com/kubeflow/common/pkg/apis/common/v1.SchedulingPolicy"), - }, - }, - }, - }, - }, - Dependencies: []string{ - "github.com/kubeflow/common/pkg/apis/common/v1.SchedulingPolicy"}, - } -} - -func schema_pkg_apis_common_v1_SchedulingPolicy(ref common.ReferenceCallback) common.OpenAPIDefinition { - return common.OpenAPIDefinition{ - Schema: spec.Schema{ - SchemaProps: spec.SchemaProps{ - Description: "SchedulingPolicy encapsulates various scheduling policies of the distributed training job, for example `minAvailable` for gang-scheduling.", - Type: []string{"object"}, - Properties: map[string]spec.Schema{ - "minAvailable": { - SchemaProps: spec.SchemaProps{ - Type: []string{"integer"}, - Format: "int32", - }, - }, - "queue": { - SchemaProps: spec.SchemaProps{ - Type: []string{"string"}, - Format: "", - }, - }, - "minResources": { - SchemaProps: spec.SchemaProps{ - Type: []string{"object"}, - AdditionalProperties: &spec.SchemaOrBool{ - Allows: true, - Schema: &spec.Schema{ - SchemaProps: spec.SchemaProps{ - Default: map[string]interface{}{}, - Ref: ref("k8s.io/apimachinery/pkg/api/resource.Quantity"), - }, - }, - }, - }, - }, - "priorityClass": { - SchemaProps: spec.SchemaProps{ - Type: []string{"string"}, - Format: "", - }, - }, - "scheduleTimeoutSeconds": { - SchemaProps: spec.SchemaProps{ - Type: []string{"integer"}, - Format: "int32", - }, - }, - }, }, }, Dependencies: []string{ - "k8s.io/apimachinery/pkg/api/resource.Quantity"}, + "github.com/kubeflow/mpi-operator/pkg/apis/kubeflow/v2beta1.JobCondition", "github.com/kubeflow/mpi-operator/pkg/apis/kubeflow/v2beta1.ReplicaStatus", "k8s.io/apimachinery/pkg/apis/meta/v1.Time"}, } } @@ -386,14 +204,14 @@ func schema_pkg_apis_kubeflow_v2beta1_MPIJob(ref common.ReferenceCallback) commo "status": { SchemaProps: spec.SchemaProps{ Default: map[string]interface{}{}, - Ref: ref("github.com/kubeflow/common/pkg/apis/common/v1.JobStatus"), + Ref: ref("github.com/kubeflow/mpi-operator/pkg/apis/kubeflow/v2beta1.JobStatus"), }, }, }, }, }, Dependencies: []string{ - "github.com/kubeflow/common/pkg/apis/common/v1.JobStatus", "github.com/kubeflow/mpi-operator/pkg/apis/kubeflow/v2beta1.MPIJobSpec", "k8s.io/apimachinery/pkg/apis/meta/v1.ObjectMeta"}, + "github.com/kubeflow/mpi-operator/pkg/apis/kubeflow/v2beta1.JobStatus", "github.com/kubeflow/mpi-operator/pkg/apis/kubeflow/v2beta1.MPIJobSpec", "k8s.io/apimachinery/pkg/apis/meta/v1.ObjectMeta"}, } } @@ -458,13 +276,6 @@ func schema_pkg_apis_kubeflow_v2beta1_MPIJobSpec(ref common.ReferenceCallback) c Format: "int32", }, }, - "launcherCreationPolicy": { - SchemaProps: spec.SchemaProps{ - Description: "launcherCreationPolicy if WaitForWorkersReady, the launcher is created only after all workers are in Ready state", - Type: []string{"string"}, - Format: "", - }, - }, "runPolicy": { SchemaProps: spec.SchemaProps{ Description: "RunPolicy encapsulates various runtime policies of the job.", @@ -480,7 +291,7 @@ func schema_pkg_apis_kubeflow_v2beta1_MPIJobSpec(ref common.ReferenceCallback) c Allows: true, Schema: &spec.Schema{ SchemaProps: spec.SchemaProps{ - Ref: ref("github.com/kubeflow/common/pkg/apis/common/v1.ReplicaSpec"), + Ref: ref("github.com/kubeflow/mpi-operator/pkg/apis/kubeflow/v2beta1.ReplicaSpec"), }, }, }, @@ -493,6 +304,13 @@ func schema_pkg_apis_kubeflow_v2beta1_MPIJobSpec(ref common.ReferenceCallback) c Format: "", }, }, + "launcherCreationPolicy": { + SchemaProps: spec.SchemaProps{ + Description: "launcherCreationPolicy if WaitForWorkersReady, the launcher is created only after all workers are in Ready state. Defaults to AtStartup.", + Type: []string{"string"}, + Format: "", + }, + }, "mpiImplementation": { SchemaProps: spec.SchemaProps{ Description: "MPIImplementation is the MPI implementation. Options are \"OpenMPI\" (default), \"Intel\" and \"MPICH\".", @@ -505,7 +323,92 @@ func schema_pkg_apis_kubeflow_v2beta1_MPIJobSpec(ref common.ReferenceCallback) c }, }, Dependencies: []string{ - "github.com/kubeflow/common/pkg/apis/common/v1.ReplicaSpec", "github.com/kubeflow/mpi-operator/pkg/apis/kubeflow/v2beta1.RunPolicy"}, + "github.com/kubeflow/mpi-operator/pkg/apis/kubeflow/v2beta1.ReplicaSpec", "github.com/kubeflow/mpi-operator/pkg/apis/kubeflow/v2beta1.RunPolicy"}, + } +} + +func schema_pkg_apis_kubeflow_v2beta1_ReplicaSpec(ref common.ReferenceCallback) common.OpenAPIDefinition { + return common.OpenAPIDefinition{ + Schema: spec.Schema{ + SchemaProps: spec.SchemaProps{ + Description: "ReplicaSpec is a description of the replica", + Type: []string{"object"}, + Properties: map[string]spec.Schema{ + "replicas": { + SchemaProps: spec.SchemaProps{ + Description: "Replicas is the desired number of replicas of the given template. If unspecified, defaults to 1.", + Type: []string{"integer"}, + Format: "int32", + }, + }, + "template": { + SchemaProps: spec.SchemaProps{ + Description: "Template is the object that describes the pod that will be created for this replica. RestartPolicy in PodTemplateSpec will be overide by RestartPolicy in ReplicaSpec", + Default: map[string]interface{}{}, + Ref: ref("k8s.io/api/core/v1.PodTemplateSpec"), + }, + }, + "restartPolicy": { + SchemaProps: spec.SchemaProps{ + Description: "Restart policy for all replicas within the job. One of Always, OnFailure, Never and ExitCode. Default to Never.", + Type: []string{"string"}, + Format: "", + }, + }, + }, + }, + }, + Dependencies: []string{ + "k8s.io/api/core/v1.PodTemplateSpec"}, + } +} + +func schema_pkg_apis_kubeflow_v2beta1_ReplicaStatus(ref common.ReferenceCallback) common.OpenAPIDefinition { + return common.OpenAPIDefinition{ + Schema: spec.Schema{ + SchemaProps: spec.SchemaProps{ + Description: "ReplicaStatus represents the current observed state of the replica.", + Type: []string{"object"}, + Properties: map[string]spec.Schema{ + "active": { + SchemaProps: spec.SchemaProps{ + Description: "The number of actively running pods.", + Type: []string{"integer"}, + Format: "int32", + }, + }, + "succeeded": { + SchemaProps: spec.SchemaProps{ + Description: "The number of pods which reached phase succeeded.", + Type: []string{"integer"}, + Format: "int32", + }, + }, + "failed": { + SchemaProps: spec.SchemaProps{ + Description: "The number of pods which reached phase failed.", + Type: []string{"integer"}, + Format: "int32", + }, + }, + "labelSelector": { + SchemaProps: spec.SchemaProps{ + Description: "Deprecated: Use selector instead", + Ref: ref("k8s.io/apimachinery/pkg/apis/meta/v1.LabelSelector"), + }, + }, + "selector": { + SchemaProps: spec.SchemaProps{ + Description: "A selector is a label query over a set of resources. The result of matchLabels and matchExpressions are ANDed. An empty selector matches all objects. A null selector matches no objects.", + Type: []string{"string"}, + Format: "", + }, + }, + }, + }, + }, + Dependencies: []string{ + "k8s.io/apimachinery/pkg/apis/meta/v1.LabelSelector"}, } } @@ -569,24 +472,27 @@ func schema_pkg_apis_kubeflow_v2beta1_SchedulingPolicy(ref common.ReferenceCallb return common.OpenAPIDefinition{ Schema: spec.Schema{ SchemaProps: spec.SchemaProps{ - Description: "SchedulingPolicy encapsulates various scheduling policies of the distributed training job, for example `minAvailable` for gang-scheduling.", + Description: "SchedulingPolicy encapsulates various scheduling policies of the distributed training job, for example `minAvailable` for gang-scheduling. Now, it supports only for volcano and scheduler-plugins.", Type: []string{"object"}, Properties: map[string]spec.Schema{ "minAvailable": { SchemaProps: spec.SchemaProps{ - Type: []string{"integer"}, - Format: "int32", + Description: "MinAvailable defines the minimal number of member to run the PodGroup. If the gang-scheduling isn't empty, input is passed to `.spec.minMember` in PodGroup. Note that, when using this field, you need to make sure the application supports resizing (e.g., Elastic Horovod).\n\nIf not set, it defaults to the number of workers.", + Type: []string{"integer"}, + Format: "int32", }, }, "queue": { SchemaProps: spec.SchemaProps{ - Type: []string{"string"}, - Format: "", + Description: "Queue defines the queue name to allocate resource for PodGroup. If the gang-scheduling is set to the volcano, input is passed to `.spec.queue` in PodGroup for the volcano, and if it is set to the scheduler-plugins, input isn't passed to PodGroup.", + Type: []string{"string"}, + Format: "", }, }, "minResources": { SchemaProps: spec.SchemaProps{ - Type: []string{"object"}, + Description: "MinResources defines the minimal resources of members to run the PodGroup. If the gang-scheduling isn't empty, input is passed to `.spec.minResources` in PodGroup for scheduler-plugins.", + Type: []string{"object"}, AdditionalProperties: &spec.SchemaOrBool{ Allows: true, Schema: &spec.Schema{ @@ -600,14 +506,16 @@ func schema_pkg_apis_kubeflow_v2beta1_SchedulingPolicy(ref common.ReferenceCallb }, "priorityClass": { SchemaProps: spec.SchemaProps{ - Type: []string{"string"}, - Format: "", + Description: "PriorityClass defines the PodGroup's PriorityClass. If the gang-scheduling is set to the volcano, input is passed to `.spec.priorityClassName` in PodGroup for volcano, and if it is set to the scheduler-plugins, input isn't passed to PodGroup for scheduler-plugins.", + Type: []string{"string"}, + Format: "", }, }, "scheduleTimeoutSeconds": { SchemaProps: spec.SchemaProps{ - Type: []string{"integer"}, - Format: "int32", + Description: "SchedulerTimeoutSeconds defines the maximal time of members to wait before run the PodGroup. If the gang-scheduling is set to the scheduler-plugins, input is passed to `.spec.scheduleTimeoutSeconds` in PodGroup for the scheduler-plugins, and if it is set to the volcano, input isn't passed to PodGroup.", + Type: []string{"integer"}, + Format: "int32", }, }, }, diff --git a/pkg/apis/kubeflow/v2beta1/swagger.json b/pkg/apis/kubeflow/v2beta1/swagger.json index b53eb88bd..283f2125e 100644 --- a/pkg/apis/kubeflow/v2beta1/swagger.json +++ b/pkg/apis/kubeflow/v2beta1/swagger.json @@ -7,186 +7,6 @@ }, "paths": {}, "definitions": { - "v1.JobCondition": { - "description": "JobCondition describes the state of the job at a certain point.", - "type": "object", - "required": [ - "type", - "status" - ], - "properties": { - "lastTransitionTime": { - "description": "Last time the condition transitioned from one status to another.", - "default": {}, - "$ref": "#/definitions/v1.Time" - }, - "lastUpdateTime": { - "description": "The last time this condition was updated.", - "default": {}, - "$ref": "#/definitions/v1.Time" - }, - "message": { - "description": "A human readable message indicating details about the transition.", - "type": "string" - }, - "reason": { - "description": "The reason for the condition's last transition.", - "type": "string" - }, - "status": { - "description": "Status of the condition, one of True, False, Unknown.", - "type": "string", - "default": "" - }, - "type": { - "description": "Type of job condition.", - "type": "string", - "default": "" - } - } - }, - "v1.JobStatus": { - "description": "JobStatus represents the current observed state of the training Job.", - "type": "object", - "required": [ - "conditions", - "replicaStatuses" - ], - "properties": { - "completionTime": { - "description": "Represents time when the job was completed. It is not guaranteed to be set in happens-before order across separate operations. It is represented in RFC3339 form and is in UTC.", - "$ref": "#/definitions/v1.Time" - }, - "conditions": { - "description": "Conditions is an array of current observed job conditions.", - "type": "array", - "items": { - "default": {}, - "$ref": "#/definitions/v1.JobCondition" - } - }, - "lastReconcileTime": { - "description": "Represents last time when the job was reconciled. It is not guaranteed to be set in happens-before order across separate operations. It is represented in RFC3339 form and is in UTC.", - "$ref": "#/definitions/v1.Time" - }, - "replicaStatuses": { - "description": "ReplicaStatuses is map of ReplicaType and ReplicaStatus, specifies the status of each replica.", - "type": "object", - "additionalProperties": { - "$ref": "#/definitions/v1.ReplicaStatus" - } - }, - "startTime": { - "description": "Represents time when the job was acknowledged by the job controller. It is not guaranteed to be set in happens-before order across separate operations. It is represented in RFC3339 form and is in UTC.", - "$ref": "#/definitions/v1.Time" - } - } - }, - "v1.ReplicaSpec": { - "description": "ReplicaSpec is a description of the replica", - "type": "object", - "properties": { - "replicas": { - "description": "Replicas is the desired number of replicas of the given template. If unspecified, defaults to 1.", - "type": "integer", - "format": "int32" - }, - "restartPolicy": { - "description": "Restart policy for all replicas within the job. One of Always, OnFailure, Never and ExitCode. Default to Never.", - "type": "string" - }, - "template": { - "description": "Template is the object that describes the pod that will be created for this replica. RestartPolicy in PodTemplateSpec will be overide by RestartPolicy in ReplicaSpec", - "default": {}, - "$ref": "#/definitions/v1.PodTemplateSpec" - } - } - }, - "v1.ReplicaStatus": { - "description": "ReplicaStatus represents the current observed state of the replica.", - "type": "object", - "properties": { - "active": { - "description": "The number of actively running pods.", - "type": "integer", - "format": "int32" - }, - "failed": { - "description": "The number of pods which reached phase Failed.", - "type": "integer", - "format": "int32" - }, - "labelSelector": { - "description": "Deprecated: Use Selector instead", - "$ref": "#/definitions/v1.LabelSelector" - }, - "selector": { - "description": "A Selector is a label query over a set of resources. The result of matchLabels and matchExpressions are ANDed. An empty Selector matches all objects. A null Selector matches no objects.", - "type": "string" - }, - "succeeded": { - "description": "The number of pods which reached phase Succeeded.", - "type": "integer", - "format": "int32" - } - } - }, - "v1.RunPolicy": { - "description": "RunPolicy encapsulates various runtime policies of the distributed training job, for example how to clean up resources and how long the job can stay active.", - "type": "object", - "properties": { - "activeDeadlineSeconds": { - "description": "Specifies the duration in seconds relative to the startTime that the job may be active before the system tries to terminate it; value must be positive integer.", - "type": "integer", - "format": "int64" - }, - "backoffLimit": { - "description": "Optional number of retries before marking this job failed.", - "type": "integer", - "format": "int32" - }, - "cleanPodPolicy": { - "description": "CleanPodPolicy defines the policy to kill pods after the job completes. Default to Running.", - "type": "string" - }, - "schedulingPolicy": { - "description": "SchedulingPolicy defines the policy related to scheduling, e.g. gang-scheduling", - "$ref": "#/definitions/v1.SchedulingPolicy" - }, - "ttlSecondsAfterFinished": { - "description": "TTLSecondsAfterFinished is the TTL to clean up jobs. It may take extra ReconcilePeriod seconds for the cleanup, since reconcile gets called periodically. Default to infinite.", - "type": "integer", - "format": "int32" - } - } - }, - "v1.SchedulingPolicy": { - "description": "SchedulingPolicy encapsulates various scheduling policies of the distributed training job, for example `minAvailable` for gang-scheduling.", - "type": "object", - "properties": { - "minAvailable": { - "type": "integer", - "format": "int32" - }, - "minResources": { - "type": "object", - "additionalProperties": { - "default": {}, - "$ref": "#/definitions/resource.Quantity" - } - }, - "priorityClass": { - "type": "string" - }, - "queue": { - "type": "string" - }, - "scheduleTimeoutSeconds": { - "type": "integer", - "format": "int32" - } - } - }, "v2beta1.JobCondition": { "description": "JobCondition describes the state of the job at a certain point.", "type": "object", @@ -333,7 +153,7 @@ "description": "MPIReplicaSpecs contains maps from `MPIReplicaType` to `ReplicaSpec` that specify the MPI replicas to run.", "type": "object", "additionalProperties": { - "$ref": "#/definitions/v1.ReplicaSpec" + "$ref": "#/definitions/v2beta1.ReplicaSpec" } }, "runPolicy": { @@ -352,6 +172,26 @@ } } }, + "v2beta1.ReplicaSpec": { + "description": "ReplicaSpec is a description of the replica", + "type": "object", + "properties": { + "replicas": { + "description": "Replicas is the desired number of replicas of the given template. If unspecified, defaults to 1.", + "type": "integer", + "format": "int32" + }, + "restartPolicy": { + "description": "Restart policy for all replicas within the job. One of Always, OnFailure, Never and ExitCode. Default to Never.", + "type": "string" + }, + "template": { + "description": "Template is the object that describes the pod that will be created for this replica. RestartPolicy in PodTemplateSpec will be overide by RestartPolicy in ReplicaSpec", + "default": {}, + "$ref": "#/definitions/v1.PodTemplateSpec" + } + } + }, "v2beta1.ReplicaStatus": { "description": "ReplicaStatus represents the current observed state of the replica.", "type": "object", diff --git a/pkg/apis/kubeflow/v2beta1/types.go b/pkg/apis/kubeflow/v2beta1/types.go index 4bbf1b6ff..7525a053e 100644 --- a/pkg/apis/kubeflow/v2beta1/types.go +++ b/pkg/apis/kubeflow/v2beta1/types.go @@ -15,7 +15,6 @@ package v2beta1 import ( - common "github.com/kubeflow/common/pkg/apis/common/v1" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) @@ -160,7 +159,7 @@ type MPIJobSpec struct { // MPIReplicaSpecs contains maps from `MPIReplicaType` to `ReplicaSpec` that // specify the MPI replicas to run. - MPIReplicaSpecs map[MPIReplicaType]*common.ReplicaSpec `json:"mpiReplicaSpecs"` + MPIReplicaSpecs map[MPIReplicaType]*ReplicaSpec `json:"mpiReplicaSpecs"` // SSHAuthMountPath is the directory where SSH keys are mounted. // Defaults to "/root/.ssh". @@ -314,3 +313,45 @@ const ( // The training has failed its execution. JobFailed JobConditionType = "Failed" ) + +// Following is merge from common.v1 +// reference https://github.com/kubeflow/training-operator/blob/master/pkg/apis/kubeflow.org/v1/common_types.go + +// +k8s:openapi-gen=true +// +k8s:deepcopy-gen=true +// ReplicaSpec is a description of the replica +type ReplicaSpec struct { + // Replicas is the desired number of replicas of the given template. + // If unspecified, defaults to 1. + Replicas *int32 `json:"replicas,omitempty"` + + // Template is the object that describes the pod that + // will be created for this replica. RestartPolicy in PodTemplateSpec + // will be overide by RestartPolicy in ReplicaSpec + Template v1.PodTemplateSpec `json:"template,omitempty"` + + // Restart policy for all replicas within the job. + // One of Always, OnFailure, Never and ExitCode. + // Default to Never. + RestartPolicy RestartPolicy `json:"restartPolicy,omitempty"` +} + +// +k8s:openapi-gen=true +// RestartPolicy describes how the replicas should be restarted. +// Only one of the following restart policies may be specified. +// If none of the following policies is specified, the default one +// is RestartPolicyAlways. +type RestartPolicy string + +const ( + RestartPolicyAlways RestartPolicy = "Always" + RestartPolicyOnFailure RestartPolicy = "OnFailure" + RestartPolicyNever RestartPolicy = "Never" + + // RestartPolicyExitCode policy means that user should add exit code by themselves, + // The job operator will check these exit codes to + // determine the behavior when an error occurs: + // - 1-127: permanent error, do not restart. + // - 128-255: retryable error, will restart the pod. + RestartPolicyExitCode RestartPolicy = "ExitCode" +) diff --git a/pkg/apis/kubeflow/v2beta1/zz_generated.deepcopy.go b/pkg/apis/kubeflow/v2beta1/zz_generated.deepcopy.go index 55aca2ac6..058f0b536 100644 --- a/pkg/apis/kubeflow/v2beta1/zz_generated.deepcopy.go +++ b/pkg/apis/kubeflow/v2beta1/zz_generated.deepcopy.go @@ -20,10 +20,9 @@ package v2beta1 import ( - v1 "github.com/kubeflow/common/pkg/apis/common/v1" corev1 "k8s.io/api/core/v1" resource "k8s.io/apimachinery/pkg/api/resource" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" runtime "k8s.io/apimachinery/pkg/runtime" ) @@ -167,14 +166,14 @@ func (in *MPIJobSpec) DeepCopyInto(out *MPIJobSpec) { in.RunPolicy.DeepCopyInto(&out.RunPolicy) if in.MPIReplicaSpecs != nil { in, out := &in.MPIReplicaSpecs, &out.MPIReplicaSpecs - *out = make(map[MPIReplicaType]*v1.ReplicaSpec, len(*in)) + *out = make(map[MPIReplicaType]*ReplicaSpec, len(*in)) for key, val := range *in { - var outVal *v1.ReplicaSpec + var outVal *ReplicaSpec if val == nil { (*out)[key] = nil } else { in, out := &val, &outVal - *out = new(v1.ReplicaSpec) + *out = new(ReplicaSpec) (*in).DeepCopyInto(*out) } (*out)[key] = outVal @@ -193,12 +192,34 @@ func (in *MPIJobSpec) DeepCopy() *MPIJobSpec { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ReplicaSpec) DeepCopyInto(out *ReplicaSpec) { + *out = *in + if in.Replicas != nil { + in, out := &in.Replicas, &out.Replicas + *out = new(int32) + **out = **in + } + in.Template.DeepCopyInto(&out.Template) + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ReplicaSpec. +func (in *ReplicaSpec) DeepCopy() *ReplicaSpec { + if in == nil { + return nil + } + out := new(ReplicaSpec) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *ReplicaStatus) DeepCopyInto(out *ReplicaStatus) { *out = *in if in.LabelSelector != nil { in, out := &in.LabelSelector, &out.LabelSelector - *out = new(metav1.LabelSelector) + *out = new(v1.LabelSelector) (*in).DeepCopyInto(*out) } return diff --git a/pkg/apis/kubeflow/validation/validation.go b/pkg/apis/kubeflow/validation/validation.go index f39f1b417..8658683bd 100644 --- a/pkg/apis/kubeflow/validation/validation.go +++ b/pkg/apis/kubeflow/validation/validation.go @@ -23,7 +23,6 @@ import ( apimachineryvalidation "k8s.io/apimachinery/pkg/util/validation" "k8s.io/apimachinery/pkg/util/validation/field" - common "github.com/kubeflow/common/pkg/apis/common/v1" kubeflow "github.com/kubeflow/mpi-operator/pkg/apis/kubeflow/v2beta1" ) @@ -39,8 +38,8 @@ var ( string(kubeflow.MPIImplementationMPICH)) validRestartPolicies = sets.NewString( - string(common.RestartPolicyNever), - string(common.RestartPolicyOnFailure), + string(kubeflow.RestartPolicyNever), + string(kubeflow.RestartPolicyOnFailure), ) ) @@ -102,7 +101,7 @@ func validateRunPolicy(policy *kubeflow.RunPolicy, path *field.Path) field.Error return errs } -func validateMPIReplicaSpecs(replicaSpecs map[kubeflow.MPIReplicaType]*common.ReplicaSpec, path *field.Path) field.ErrorList { +func validateMPIReplicaSpecs(replicaSpecs map[kubeflow.MPIReplicaType]*kubeflow.ReplicaSpec, path *field.Path) field.ErrorList { var errs field.ErrorList if replicaSpecs == nil { errs = append(errs, field.Required(path, "must have replica specs")) @@ -113,7 +112,7 @@ func validateMPIReplicaSpecs(replicaSpecs map[kubeflow.MPIReplicaType]*common.Re return errs } -func validateLauncherReplicaSpec(spec *common.ReplicaSpec, path *field.Path) field.ErrorList { +func validateLauncherReplicaSpec(spec *kubeflow.ReplicaSpec, path *field.Path) field.ErrorList { var errs field.ErrorList if spec == nil { errs = append(errs, field.Required(path, fmt.Sprintf("must have %s replica spec", kubeflow.MPIReplicaTypeLauncher))) @@ -126,7 +125,7 @@ func validateLauncherReplicaSpec(spec *common.ReplicaSpec, path *field.Path) fie return errs } -func validateWorkerReplicaSpec(spec *common.ReplicaSpec, path *field.Path) field.ErrorList { +func validateWorkerReplicaSpec(spec *kubeflow.ReplicaSpec, path *field.Path) field.ErrorList { var errs field.ErrorList if spec == nil { return errs @@ -138,7 +137,7 @@ func validateWorkerReplicaSpec(spec *common.ReplicaSpec, path *field.Path) field return errs } -func validateReplicaSpec(spec *common.ReplicaSpec, path *field.Path) field.ErrorList { +func validateReplicaSpec(spec *kubeflow.ReplicaSpec, path *field.Path) field.ErrorList { var errs field.ErrorList if spec.Replicas == nil { errs = append(errs, field.Required(path.Child("replicas"), "must define number of replicas")) diff --git a/pkg/apis/kubeflow/validation/validation_test.go b/pkg/apis/kubeflow/validation/validation_test.go index b5f2d6d79..c0b45b5c8 100644 --- a/pkg/apis/kubeflow/validation/validation_test.go +++ b/pkg/apis/kubeflow/validation/validation_test.go @@ -19,7 +19,6 @@ import ( "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" - common "github.com/kubeflow/common/pkg/apis/common/v1" kubeflow "github.com/kubeflow/mpi-operator/pkg/apis/kubeflow/v2beta1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -43,10 +42,10 @@ func TestValidateMPIJob(t *testing.T) { }, SSHAuthMountPath: "/home/mpiuser/.ssh", MPIImplementation: kubeflow.MPIImplementationIntel, - MPIReplicaSpecs: map[kubeflow.MPIReplicaType]*common.ReplicaSpec{ + MPIReplicaSpecs: map[kubeflow.MPIReplicaType]*kubeflow.ReplicaSpec{ kubeflow.MPIReplicaTypeLauncher: { Replicas: newInt32(1), - RestartPolicy: common.RestartPolicyNever, + RestartPolicy: kubeflow.RestartPolicyNever, Template: corev1.PodTemplateSpec{ Spec: corev1.PodSpec{ Containers: []corev1.Container{{}}, @@ -69,10 +68,10 @@ func TestValidateMPIJob(t *testing.T) { }, SSHAuthMountPath: "/home/mpiuser/.ssh", MPIImplementation: kubeflow.MPIImplementationIntel, - MPIReplicaSpecs: map[kubeflow.MPIReplicaType]*common.ReplicaSpec{ + MPIReplicaSpecs: map[kubeflow.MPIReplicaType]*kubeflow.ReplicaSpec{ kubeflow.MPIReplicaTypeLauncher: { Replicas: newInt32(1), - RestartPolicy: common.RestartPolicyOnFailure, + RestartPolicy: kubeflow.RestartPolicyOnFailure, Template: corev1.PodTemplateSpec{ Spec: corev1.PodSpec{ Containers: []corev1.Container{{}}, @@ -81,7 +80,7 @@ func TestValidateMPIJob(t *testing.T) { }, kubeflow.MPIReplicaTypeWorker: { Replicas: newInt32(3), - RestartPolicy: common.RestartPolicyNever, + RestartPolicy: kubeflow.RestartPolicyNever, Template: corev1.PodTemplateSpec{ Spec: corev1.PodSpec{ Containers: []corev1.Container{{}}, @@ -104,10 +103,10 @@ func TestValidateMPIJob(t *testing.T) { }, SSHAuthMountPath: "/home/mpiuser/.ssh", MPIImplementation: kubeflow.MPIImplementationMPICH, - MPIReplicaSpecs: map[kubeflow.MPIReplicaType]*common.ReplicaSpec{ + MPIReplicaSpecs: map[kubeflow.MPIReplicaType]*kubeflow.ReplicaSpec{ kubeflow.MPIReplicaTypeLauncher: { Replicas: newInt32(1), - RestartPolicy: common.RestartPolicyNever, + RestartPolicy: kubeflow.RestartPolicyNever, Template: corev1.PodTemplateSpec{ Spec: corev1.PodSpec{ Containers: []corev1.Container{{}}, @@ -130,10 +129,10 @@ func TestValidateMPIJob(t *testing.T) { }, SSHAuthMountPath: "/home/mpiuser/.ssh", MPIImplementation: kubeflow.MPIImplementationMPICH, - MPIReplicaSpecs: map[kubeflow.MPIReplicaType]*common.ReplicaSpec{ + MPIReplicaSpecs: map[kubeflow.MPIReplicaType]*kubeflow.ReplicaSpec{ kubeflow.MPIReplicaTypeLauncher: { Replicas: newInt32(1), - RestartPolicy: common.RestartPolicyOnFailure, + RestartPolicy: kubeflow.RestartPolicyOnFailure, Template: corev1.PodTemplateSpec{ Spec: corev1.PodSpec{ Containers: []corev1.Container{{}}, @@ -142,7 +141,7 @@ func TestValidateMPIJob(t *testing.T) { }, kubeflow.MPIReplicaTypeWorker: { Replicas: newInt32(3), - RestartPolicy: common.RestartPolicyNever, + RestartPolicy: kubeflow.RestartPolicyNever, Template: corev1.PodTemplateSpec{ Spec: corev1.PodSpec{ Containers: []corev1.Container{{}}, @@ -196,10 +195,10 @@ func TestValidateMPIJob(t *testing.T) { }, SSHAuthMountPath: "/root/.ssh", MPIImplementation: kubeflow.MPIImplementation("Unknown"), - MPIReplicaSpecs: map[kubeflow.MPIReplicaType]*common.ReplicaSpec{ + MPIReplicaSpecs: map[kubeflow.MPIReplicaType]*kubeflow.ReplicaSpec{ kubeflow.MPIReplicaTypeLauncher: { Replicas: newInt32(1), - RestartPolicy: common.RestartPolicyNever, + RestartPolicy: kubeflow.RestartPolicyNever, Template: corev1.PodTemplateSpec{ Spec: corev1.PodSpec{ Containers: []corev1.Container{{}}, @@ -208,7 +207,7 @@ func TestValidateMPIJob(t *testing.T) { }, kubeflow.MPIReplicaTypeWorker: { Replicas: newInt32(1000), - RestartPolicy: common.RestartPolicyNever, + RestartPolicy: kubeflow.RestartPolicyNever, Template: corev1.PodTemplateSpec{ Spec: corev1.PodSpec{ Containers: []corev1.Container{{}}, @@ -257,7 +256,7 @@ func TestValidateMPIJob(t *testing.T) { }, SSHAuthMountPath: "/root/.ssh", MPIImplementation: kubeflow.MPIImplementationOpenMPI, - MPIReplicaSpecs: map[kubeflow.MPIReplicaType]*common.ReplicaSpec{}, + MPIReplicaSpecs: map[kubeflow.MPIReplicaType]*kubeflow.ReplicaSpec{}, }, }, wantErrs: field.ErrorList{ @@ -279,7 +278,7 @@ func TestValidateMPIJob(t *testing.T) { }, SSHAuthMountPath: "/root/.ssh", MPIImplementation: kubeflow.MPIImplementationOpenMPI, - MPIReplicaSpecs: map[kubeflow.MPIReplicaType]*common.ReplicaSpec{ + MPIReplicaSpecs: map[kubeflow.MPIReplicaType]*kubeflow.ReplicaSpec{ kubeflow.MPIReplicaTypeLauncher: {}, kubeflow.MPIReplicaTypeWorker: {}, }, @@ -324,10 +323,10 @@ func TestValidateMPIJob(t *testing.T) { }, SSHAuthMountPath: "/root/.ssh", MPIImplementation: kubeflow.MPIImplementationOpenMPI, - MPIReplicaSpecs: map[kubeflow.MPIReplicaType]*common.ReplicaSpec{ + MPIReplicaSpecs: map[kubeflow.MPIReplicaType]*kubeflow.ReplicaSpec{ kubeflow.MPIReplicaTypeLauncher: { Replicas: newInt32(2), - RestartPolicy: common.RestartPolicyAlways, + RestartPolicy: kubeflow.RestartPolicyAlways, Template: corev1.PodTemplateSpec{ Spec: corev1.PodSpec{ Containers: []corev1.Container{{}}, @@ -377,10 +376,10 @@ func TestValidateMPIJob(t *testing.T) { }, SSHAuthMountPath: "/home/mpiuser/.ssh", MPIImplementation: kubeflow.MPIImplementationIntel, - MPIReplicaSpecs: map[kubeflow.MPIReplicaType]*common.ReplicaSpec{ + MPIReplicaSpecs: map[kubeflow.MPIReplicaType]*kubeflow.ReplicaSpec{ kubeflow.MPIReplicaTypeLauncher: { Replicas: newInt32(1), - RestartPolicy: common.RestartPolicyNever, + RestartPolicy: kubeflow.RestartPolicyNever, Template: corev1.PodTemplateSpec{ Spec: corev1.PodSpec{ Containers: []corev1.Container{{}}, diff --git a/pkg/controller/mpi_job_controller.go b/pkg/controller/mpi_job_controller.go index 54de73dae..ba2e16d9f 100644 --- a/pkg/controller/mpi_job_controller.go +++ b/pkg/controller/mpi_job_controller.go @@ -28,7 +28,6 @@ import ( "strconv" "time" - common "github.com/kubeflow/common/pkg/apis/common/v1" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "golang.org/x/crypto/ssh" @@ -919,7 +918,7 @@ func (c *MPIJobController) getOrCreateWorker(mpiJob *kubeflow.MPIJob) ([]*corev1 } if len(podFullList) > int(*worker.Replicas) { for _, pod := range podFullList { - indexStr, ok := pod.Labels[common.ReplicaIndexLabel] + indexStr, ok := pod.Labels[kubeflow.ReplicaIndexLabel] if !ok { return nil, err } @@ -1371,7 +1370,7 @@ func (c *MPIJobController) newWorker(mpiJob *kubeflow.MPIJob, index int) *corev1 for key, value := range defaultLabels(mpiJob.Name, worker) { podTemplate.Labels[key] = value } - podTemplate.Labels[common.ReplicaIndexLabel] = strconv.Itoa(index) + podTemplate.Labels[kubeflow.ReplicaIndexLabel] = strconv.Itoa(index) podTemplate.Spec.Hostname = name podTemplate.Spec.Subdomain = mpiJob.Name + workerSuffix // Matches workers' Service name. if podTemplate.Spec.HostNetwork { @@ -1550,8 +1549,8 @@ func countRunningPods(pods []*corev1.Pod) int { return running } -func setRestartPolicy(podTemplateSpec *corev1.PodTemplateSpec, spec *common.ReplicaSpec) { - if spec.RestartPolicy == common.RestartPolicyExitCode { +func setRestartPolicy(podTemplateSpec *corev1.PodTemplateSpec, spec *kubeflow.ReplicaSpec) { + if spec.RestartPolicy == kubeflow.RestartPolicyExitCode { podTemplateSpec.Spec.RestartPolicy = corev1.RestartPolicyNever } else { podTemplateSpec.Spec.RestartPolicy = corev1.RestartPolicy(spec.RestartPolicy) @@ -1602,9 +1601,9 @@ func isCleanUpPods(cleanPodPolicy *kubeflow.CleanPodPolicy) bool { func defaultLabels(jobName, role string) map[string]string { return map[string]string{ - common.OperatorNameLabel: kubeflow.OperatorName, - common.JobNameLabel: jobName, - common.JobRoleLabel: role, + kubeflow.OperatorNameLabel: kubeflow.OperatorName, + kubeflow.JobNameLabel: jobName, + kubeflow.JobRoleLabel: role, } } diff --git a/pkg/controller/mpi_job_controller_test.go b/pkg/controller/mpi_job_controller_test.go index 5aaa4789f..c1c245446 100644 --- a/pkg/controller/mpi_job_controller_test.go +++ b/pkg/controller/mpi_job_controller_test.go @@ -22,7 +22,6 @@ import ( "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" - common "github.com/kubeflow/common/pkg/apis/common/v1" batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" schedulingv1 "k8s.io/api/scheduling/v1" @@ -110,7 +109,7 @@ func newMPIJobCommon(name string, startTime, completionTime *metav1.Time) *kubef RunPolicy: kubeflow.RunPolicy{ CleanPodPolicy: &cleanPodPolicyAll, }, - MPIReplicaSpecs: map[kubeflow.MPIReplicaType]*common.ReplicaSpec{ + MPIReplicaSpecs: map[kubeflow.MPIReplicaType]*kubeflow.ReplicaSpec{ kubeflow.MPIReplicaTypeWorker: { Template: corev1.PodTemplateSpec{ Spec: corev1.PodSpec{ @@ -1212,7 +1211,7 @@ func TestNewLauncherAndWorker(t *testing.T) { Namespace: "bar", }, Spec: kubeflow.MPIJobSpec{ - MPIReplicaSpecs: map[kubeflow.MPIReplicaType]*common.ReplicaSpec{ + MPIReplicaSpecs: map[kubeflow.MPIReplicaType]*kubeflow.ReplicaSpec{ kubeflow.MPIReplicaTypeLauncher: { Template: corev1.PodTemplateSpec{ Spec: corev1.PodSpec{ @@ -1242,9 +1241,9 @@ func TestNewLauncherAndWorker(t *testing.T) { Template: corev1.PodTemplateSpec{ ObjectMeta: metav1.ObjectMeta{ Labels: map[string]string{ - common.OperatorNameLabel: kubeflow.OperatorName, - common.JobNameLabel: "foo", - common.JobRoleLabel: "launcher", + kubeflow.OperatorNameLabel: kubeflow.OperatorName, + kubeflow.JobNameLabel: "foo", + kubeflow.JobRoleLabel: "launcher", }, }, Spec: corev1.PodSpec{ @@ -1296,10 +1295,10 @@ func TestNewLauncherAndWorker(t *testing.T) { Name: "foo-worker-0", Namespace: "bar", Labels: map[string]string{ - common.OperatorNameLabel: kubeflow.OperatorName, - common.JobNameLabel: "foo", - common.JobRoleLabel: "worker", - common.ReplicaIndexLabel: "0", + kubeflow.OperatorNameLabel: kubeflow.OperatorName, + kubeflow.JobNameLabel: "foo", + kubeflow.JobRoleLabel: "worker", + kubeflow.ReplicaIndexLabel: "0", }, }, Spec: corev1.PodSpec{ @@ -1345,9 +1344,9 @@ func TestNewLauncherAndWorker(t *testing.T) { ActiveDeadlineSeconds: newInt64(2), BackoffLimit: newInt32(3), }, - MPIReplicaSpecs: map[kubeflow.MPIReplicaType]*common.ReplicaSpec{ + MPIReplicaSpecs: map[kubeflow.MPIReplicaType]*kubeflow.ReplicaSpec{ kubeflow.MPIReplicaTypeLauncher: { - RestartPolicy: common.RestartPolicyOnFailure, + RestartPolicy: kubeflow.RestartPolicyOnFailure, Template: corev1.PodTemplateSpec{ ObjectMeta: metav1.ObjectMeta{ Labels: map[string]string{"foo": "bar"}, @@ -1408,10 +1407,10 @@ func TestNewLauncherAndWorker(t *testing.T) { Template: corev1.PodTemplateSpec{ ObjectMeta: metav1.ObjectMeta{ Labels: map[string]string{ - "foo": "bar", - common.OperatorNameLabel: kubeflow.OperatorName, - common.JobNameLabel: "bar", - common.JobRoleLabel: "launcher", + "foo": "bar", + kubeflow.OperatorNameLabel: kubeflow.OperatorName, + kubeflow.JobNameLabel: "bar", + kubeflow.JobRoleLabel: "launcher", }, }, Spec: corev1.PodSpec{ @@ -1471,10 +1470,10 @@ func TestNewLauncherAndWorker(t *testing.T) { Name: "bar-worker-12", Namespace: "foo", Labels: map[string]string{ - common.OperatorNameLabel: kubeflow.OperatorName, - common.JobNameLabel: "bar", - common.JobRoleLabel: "worker", - common.ReplicaIndexLabel: "12", + kubeflow.OperatorNameLabel: kubeflow.OperatorName, + kubeflow.JobNameLabel: "bar", + kubeflow.JobRoleLabel: "worker", + kubeflow.ReplicaIndexLabel: "12", }, }, Spec: corev1.PodSpec{ diff --git a/pkg/controller/podgroup.go b/pkg/controller/podgroup.go index 90d8e6a7a..c01d691b8 100644 --- a/pkg/controller/podgroup.go +++ b/pkg/controller/podgroup.go @@ -19,7 +19,6 @@ import ( "sort" "github.com/google/go-cmp/cmp" - common "github.com/kubeflow/common/pkg/apis/common/v1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/equality" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -399,7 +398,7 @@ func calculateMinAvailable(mpiJob *kubeflow.MPIJob) *int32 { // 2. .spec.mpiReplicaSecs[Launcher].template.spec.priorityClassName // 3. .spec.mpiReplicaSecs[Worker].template.spec.priorityClassName func calculatePriorityClassName( - replicas map[kubeflow.MPIReplicaType]*common.ReplicaSpec, + replicas map[kubeflow.MPIReplicaType]*kubeflow.ReplicaSpec, schedulingPolicy *kubeflow.SchedulingPolicy, ) string { if schedulingPolicy != nil && len(schedulingPolicy.PriorityClass) != 0 { @@ -443,7 +442,7 @@ type replicaPriority struct { priority int32 replicaType kubeflow.MPIReplicaType - common.ReplicaSpec + kubeflow.ReplicaSpec } type replicasOrder []replicaPriority diff --git a/pkg/controller/podgroup_test.go b/pkg/controller/podgroup_test.go index b1282f966..2301b8aaa 100644 --- a/pkg/controller/podgroup_test.go +++ b/pkg/controller/podgroup_test.go @@ -20,7 +20,6 @@ import ( "testing" "github.com/google/go-cmp/cmp" - common "github.com/kubeflow/common/pkg/apis/common/v1" corev1 "k8s.io/api/core/v1" schedulingv1 "k8s.io/api/scheduling/v1" "k8s.io/apimachinery/pkg/api/resource" @@ -65,7 +64,7 @@ func TestNewPodGroup(t *testing.T) { ScheduleTimeoutSeconds: pointer.Int32(100), }, }, - MPIReplicaSpecs: map[kubeflow.MPIReplicaType]*common.ReplicaSpec{ + MPIReplicaSpecs: map[kubeflow.MPIReplicaType]*kubeflow.ReplicaSpec{ kubeflow.MPIReplicaTypeLauncher: { Replicas: pointer.Int32(1), Template: corev1.PodTemplateSpec{ @@ -138,7 +137,7 @@ func TestNewPodGroup(t *testing.T) { }, }, Spec: kubeflow.MPIJobSpec{ - MPIReplicaSpecs: map[kubeflow.MPIReplicaType]*common.ReplicaSpec{ + MPIReplicaSpecs: map[kubeflow.MPIReplicaType]*kubeflow.ReplicaSpec{ kubeflow.MPIReplicaTypeLauncher: { Replicas: pointer.Int32(1), Template: corev1.PodTemplateSpec{ @@ -238,19 +237,19 @@ func TestNewPodGroup(t *testing.T) { func TestCalcPriorityClassName(t *testing.T) { testCases := map[string]struct { - replicas map[kubeflow.MPIReplicaType]*common.ReplicaSpec + replicas map[kubeflow.MPIReplicaType]*kubeflow.ReplicaSpec sp *kubeflow.SchedulingPolicy wantPCName string }{ "use schedulingPolicy": { - replicas: map[kubeflow.MPIReplicaType]*common.ReplicaSpec{}, + replicas: map[kubeflow.MPIReplicaType]*kubeflow.ReplicaSpec{}, sp: &kubeflow.SchedulingPolicy{ PriorityClass: "high", }, wantPCName: "high", }, "use launcher": { - replicas: map[kubeflow.MPIReplicaType]*common.ReplicaSpec{ + replicas: map[kubeflow.MPIReplicaType]*kubeflow.ReplicaSpec{ kubeflow.MPIReplicaTypeLauncher: { Template: corev1.PodTemplateSpec{ Spec: corev1.PodSpec{ @@ -269,7 +268,7 @@ func TestCalcPriorityClassName(t *testing.T) { wantPCName: "high", }, "use worker": { - replicas: map[kubeflow.MPIReplicaType]*common.ReplicaSpec{ + replicas: map[kubeflow.MPIReplicaType]*kubeflow.ReplicaSpec{ kubeflow.MPIReplicaTypeLauncher: { Template: corev1.PodTemplateSpec{ Spec: corev1.PodSpec{}, @@ -286,7 +285,7 @@ func TestCalcPriorityClassName(t *testing.T) { wantPCName: "low", }, "nothing": { - replicas: map[kubeflow.MPIReplicaType]*common.ReplicaSpec{ + replicas: map[kubeflow.MPIReplicaType]*kubeflow.ReplicaSpec{ kubeflow.MPIReplicaTypeLauncher: { Template: corev1.PodTemplateSpec{ Spec: corev1.PodSpec{}, @@ -405,7 +404,7 @@ func TestCalculatePGMinResources(t *testing.T) { Name: "test", }, Spec: kubeflow.MPIJobSpec{ - MPIReplicaSpecs: map[kubeflow.MPIReplicaType]*common.ReplicaSpec{ + MPIReplicaSpecs: map[kubeflow.MPIReplicaType]*kubeflow.ReplicaSpec{ kubeflow.MPIReplicaTypeLauncher: { Replicas: pointer.Int32(1), Template: corev1.PodTemplateSpec{ @@ -502,7 +501,7 @@ func TestCalculatePGMinResources(t *testing.T) { Name: "test", }, Spec: kubeflow.MPIJobSpec{ - MPIReplicaSpecs: map[kubeflow.MPIReplicaType]*common.ReplicaSpec{ + MPIReplicaSpecs: map[kubeflow.MPIReplicaType]*kubeflow.ReplicaSpec{ kubeflow.MPIReplicaTypeLauncher: { Replicas: pointer.Int32(1), Template: corev1.PodTemplateSpec{ @@ -560,7 +559,7 @@ func TestCalculatePGMinResources(t *testing.T) { Name: "test", }, Spec: kubeflow.MPIJobSpec{ - MPIReplicaSpecs: map[kubeflow.MPIReplicaType]*common.ReplicaSpec{ + MPIReplicaSpecs: map[kubeflow.MPIReplicaType]*kubeflow.ReplicaSpec{ kubeflow.MPIReplicaTypeLauncher: { Replicas: pointer.Int32(1), Template: corev1.PodTemplateSpec{ @@ -626,7 +625,7 @@ func TestCalculatePGMinResources(t *testing.T) { Name: "test", }, Spec: kubeflow.MPIJobSpec{ - MPIReplicaSpecs: map[kubeflow.MPIReplicaType]*common.ReplicaSpec{ + MPIReplicaSpecs: map[kubeflow.MPIReplicaType]*kubeflow.ReplicaSpec{ kubeflow.MPIReplicaTypeLauncher: { Replicas: pointer.Int32(1), Template: corev1.PodTemplateSpec{ @@ -778,7 +777,7 @@ func TestCalculateMinAvailable(t *testing.T) { MinAvailable: pointer.Int32(2), }, }, - MPIReplicaSpecs: map[kubeflow.MPIReplicaType]*common.ReplicaSpec{ + MPIReplicaSpecs: map[kubeflow.MPIReplicaType]*kubeflow.ReplicaSpec{ kubeflow.MPIReplicaTypeLauncher: { Replicas: pointer.Int32(1), }, @@ -796,7 +795,7 @@ func TestCalculateMinAvailable(t *testing.T) { Name: "test", }, Spec: kubeflow.MPIJobSpec{ - MPIReplicaSpecs: map[kubeflow.MPIReplicaType]*common.ReplicaSpec{ + MPIReplicaSpecs: map[kubeflow.MPIReplicaType]*kubeflow.ReplicaSpec{ kubeflow.MPIReplicaTypeLauncher: { Replicas: pointer.Int32(1), }, @@ -827,22 +826,22 @@ func TestReplicasOrder(t *testing.T) { }{ "1-lancher, 2-worker, lancher higher priority": { original: replicasOrder{ - {priority: 1, replicaType: kubeflow.MPIReplicaTypeLauncher, ReplicaSpec: common.ReplicaSpec{Replicas: &lancherReplic}}, - {priority: 0, replicaType: kubeflow.MPIReplicaTypeWorker, ReplicaSpec: common.ReplicaSpec{Replicas: &wokerReplic}}, + {priority: 1, replicaType: kubeflow.MPIReplicaTypeLauncher, ReplicaSpec: kubeflow.ReplicaSpec{Replicas: &lancherReplic}}, + {priority: 0, replicaType: kubeflow.MPIReplicaTypeWorker, ReplicaSpec: kubeflow.ReplicaSpec{Replicas: &wokerReplic}}, }, expected: replicasOrder{ - {priority: 1, replicaType: kubeflow.MPIReplicaTypeLauncher, ReplicaSpec: common.ReplicaSpec{Replicas: &lancherReplic}}, - {priority: 0, replicaType: kubeflow.MPIReplicaTypeWorker, ReplicaSpec: common.ReplicaSpec{Replicas: &wokerReplic}}, + {priority: 1, replicaType: kubeflow.MPIReplicaTypeLauncher, ReplicaSpec: kubeflow.ReplicaSpec{Replicas: &lancherReplic}}, + {priority: 0, replicaType: kubeflow.MPIReplicaTypeWorker, ReplicaSpec: kubeflow.ReplicaSpec{Replicas: &wokerReplic}}, }, }, "1-lancher, 2-worker, equal priority": { original: replicasOrder{ - {priority: 0, replicaType: kubeflow.MPIReplicaTypeWorker, ReplicaSpec: common.ReplicaSpec{Replicas: &wokerReplic}}, - {priority: 0, replicaType: kubeflow.MPIReplicaTypeLauncher, ReplicaSpec: common.ReplicaSpec{Replicas: &lancherReplic}}, + {priority: 0, replicaType: kubeflow.MPIReplicaTypeWorker, ReplicaSpec: kubeflow.ReplicaSpec{Replicas: &wokerReplic}}, + {priority: 0, replicaType: kubeflow.MPIReplicaTypeLauncher, ReplicaSpec: kubeflow.ReplicaSpec{Replicas: &lancherReplic}}, }, expected: replicasOrder{ - {priority: 0, replicaType: kubeflow.MPIReplicaTypeWorker, ReplicaSpec: common.ReplicaSpec{Replicas: &wokerReplic}}, - {priority: 0, replicaType: kubeflow.MPIReplicaTypeLauncher, ReplicaSpec: common.ReplicaSpec{Replicas: &lancherReplic}}, + {priority: 0, replicaType: kubeflow.MPIReplicaTypeWorker, ReplicaSpec: kubeflow.ReplicaSpec{Replicas: &wokerReplic}}, + {priority: 0, replicaType: kubeflow.MPIReplicaTypeLauncher, ReplicaSpec: kubeflow.ReplicaSpec{Replicas: &lancherReplic}}, }, }, } diff --git a/sdk/python/v2beta1/README.md b/sdk/python/v2beta1/README.md index a2e817fd1..8e53720f8 100644 --- a/sdk/python/v2beta1/README.md +++ b/sdk/python/v2beta1/README.md @@ -64,17 +64,12 @@ Class | Method | HTTP request | Description ## Documentation For Models - - [V1JobCondition](docs/V1JobCondition.md) - - [V1JobStatus](docs/V1JobStatus.md) - - [V1ReplicaSpec](docs/V1ReplicaSpec.md) - - [V1ReplicaStatus](docs/V1ReplicaStatus.md) - - [V1RunPolicy](docs/V1RunPolicy.md) - - [V1SchedulingPolicy](docs/V1SchedulingPolicy.md) - [V2beta1JobCondition](docs/V2beta1JobCondition.md) - [V2beta1JobStatus](docs/V2beta1JobStatus.md) - [V2beta1MPIJob](docs/V2beta1MPIJob.md) - [V2beta1MPIJobList](docs/V2beta1MPIJobList.md) - [V2beta1MPIJobSpec](docs/V2beta1MPIJobSpec.md) + - [V2beta1ReplicaSpec](docs/V2beta1ReplicaSpec.md) - [V2beta1ReplicaStatus](docs/V2beta1ReplicaStatus.md) - [V2beta1RunPolicy](docs/V2beta1RunPolicy.md) - [V2beta1SchedulingPolicy](docs/V2beta1SchedulingPolicy.md) diff --git a/sdk/python/v2beta1/docs/V2beta1MPIJobSpec.md b/sdk/python/v2beta1/docs/V2beta1MPIJobSpec.md index 324d7a245..44d2ef04e 100644 --- a/sdk/python/v2beta1/docs/V2beta1MPIJobSpec.md +++ b/sdk/python/v2beta1/docs/V2beta1MPIJobSpec.md @@ -6,7 +6,7 @@ Name | Type | Description | Notes ------------ | ------------- | ------------- | ------------- **launcher_creation_policy** | **str** | launcherCreationPolicy if WaitForWorkersReady, the launcher is created only after all workers are in Ready state. Defaults to AtStartup. | [optional] **mpi_implementation** | **str** | MPIImplementation is the MPI implementation. Options are \"OpenMPI\" (default), \"Intel\" and \"MPICH\". | [optional] -**mpi_replica_specs** | [**dict(str, V1ReplicaSpec)**](V1ReplicaSpec.md) | MPIReplicaSpecs contains maps from `MPIReplicaType` to `ReplicaSpec` that specify the MPI replicas to run. | +**mpi_replica_specs** | [**dict(str, V2beta1ReplicaSpec)**](V2beta1ReplicaSpec.md) | MPIReplicaSpecs contains maps from `MPIReplicaType` to `ReplicaSpec` that specify the MPI replicas to run. | **run_policy** | [**V2beta1RunPolicy**](V2beta1RunPolicy.md) | | [optional] **slots_per_worker** | **int** | Specifies the number of slots per worker used in hostfile. Defaults to 1. | [optional] **ssh_auth_mount_path** | **str** | SSHAuthMountPath is the directory where SSH keys are mounted. Defaults to \"/root/.ssh\". | [optional] diff --git a/sdk/python/v2beta1/docs/V2beta1ReplicaSpec.md b/sdk/python/v2beta1/docs/V2beta1ReplicaSpec.md new file mode 100644 index 000000000..a863d43c3 --- /dev/null +++ b/sdk/python/v2beta1/docs/V2beta1ReplicaSpec.md @@ -0,0 +1,14 @@ +# V2beta1ReplicaSpec + +ReplicaSpec is a description of the replica + +## Properties +Name | Type | Description | Notes +------------ | ------------- | ------------- | ------------- +**replicas** | **int** | Replicas is the desired number of replicas of the given template. If unspecified, defaults to 1. | [optional] +**restart_policy** | **str** | Restart policy for all replicas within the job. One of Always, OnFailure, Never and ExitCode. Default to Never. | [optional] +**template** | [**V1PodTemplateSpec**](V1PodTemplateSpec.md) | | [optional] + +[[Back to Model list]](../README.md#documentation-for-models) [[Back to API list]](../README.md#documentation-for-api-endpoints) [[Back to README]](../README.md) + + diff --git a/sdk/python/v2beta1/mpijob/__init__.py b/sdk/python/v2beta1/mpijob/__init__.py index 1e7c84993..4d6a7cc36 100644 --- a/sdk/python/v2beta1/mpijob/__init__.py +++ b/sdk/python/v2beta1/mpijob/__init__.py @@ -28,17 +28,12 @@ from mpijob.exceptions import ApiAttributeError from mpijob.exceptions import ApiException # import models into sdk package -from mpijob.models.v1_job_condition import V1JobCondition -from mpijob.models.v1_job_status import V1JobStatus -from mpijob.models.v1_replica_spec import V1ReplicaSpec -from mpijob.models.v1_replica_status import V1ReplicaStatus -from mpijob.models.v1_run_policy import V1RunPolicy -from mpijob.models.v1_scheduling_policy import V1SchedulingPolicy from mpijob.models.v2beta1_job_condition import V2beta1JobCondition from mpijob.models.v2beta1_job_status import V2beta1JobStatus from mpijob.models.v2beta1_mpi_job import V2beta1MPIJob from mpijob.models.v2beta1_mpi_job_list import V2beta1MPIJobList from mpijob.models.v2beta1_mpi_job_spec import V2beta1MPIJobSpec +from mpijob.models.v2beta1_replica_spec import V2beta1ReplicaSpec from mpijob.models.v2beta1_replica_status import V2beta1ReplicaStatus from mpijob.models.v2beta1_run_policy import V2beta1RunPolicy from mpijob.models.v2beta1_scheduling_policy import V2beta1SchedulingPolicy diff --git a/sdk/python/v2beta1/mpijob/models/__init__.py b/sdk/python/v2beta1/mpijob/models/__init__.py index f1d1b4dbb..468e474bb 100644 --- a/sdk/python/v2beta1/mpijob/models/__init__.py +++ b/sdk/python/v2beta1/mpijob/models/__init__.py @@ -14,17 +14,12 @@ from __future__ import absolute_import # import models into model package -from mpijob.models.v1_job_condition import V1JobCondition -from mpijob.models.v1_job_status import V1JobStatus -from mpijob.models.v1_replica_spec import V1ReplicaSpec -from mpijob.models.v1_replica_status import V1ReplicaStatus -from mpijob.models.v1_run_policy import V1RunPolicy -from mpijob.models.v1_scheduling_policy import V1SchedulingPolicy from mpijob.models.v2beta1_job_condition import V2beta1JobCondition from mpijob.models.v2beta1_job_status import V2beta1JobStatus from mpijob.models.v2beta1_mpi_job import V2beta1MPIJob from mpijob.models.v2beta1_mpi_job_list import V2beta1MPIJobList from mpijob.models.v2beta1_mpi_job_spec import V2beta1MPIJobSpec +from mpijob.models.v2beta1_replica_spec import V2beta1ReplicaSpec from mpijob.models.v2beta1_replica_status import V2beta1ReplicaStatus from mpijob.models.v2beta1_run_policy import V2beta1RunPolicy from mpijob.models.v2beta1_scheduling_policy import V2beta1SchedulingPolicy diff --git a/sdk/python/v2beta1/mpijob/models/v2beta1_mpi_job_spec.py b/sdk/python/v2beta1/mpijob/models/v2beta1_mpi_job_spec.py index 081ba64d8..68656d763 100644 --- a/sdk/python/v2beta1/mpijob/models/v2beta1_mpi_job_spec.py +++ b/sdk/python/v2beta1/mpijob/models/v2beta1_mpi_job_spec.py @@ -35,7 +35,7 @@ class V2beta1MPIJobSpec(object): openapi_types = { 'launcher_creation_policy': 'str', 'mpi_implementation': 'str', - 'mpi_replica_specs': 'dict(str, V1ReplicaSpec)', + 'mpi_replica_specs': 'dict(str, V2beta1ReplicaSpec)', 'run_policy': 'V2beta1RunPolicy', 'slots_per_worker': 'int', 'ssh_auth_mount_path': 'str' @@ -129,7 +129,7 @@ def mpi_replica_specs(self): MPIReplicaSpecs contains maps from `MPIReplicaType` to `ReplicaSpec` that specify the MPI replicas to run. # noqa: E501 :return: The mpi_replica_specs of this V2beta1MPIJobSpec. # noqa: E501 - :rtype: dict(str, V1ReplicaSpec) + :rtype: dict(str, V2beta1ReplicaSpec) """ return self._mpi_replica_specs @@ -140,7 +140,7 @@ def mpi_replica_specs(self, mpi_replica_specs): MPIReplicaSpecs contains maps from `MPIReplicaType` to `ReplicaSpec` that specify the MPI replicas to run. # noqa: E501 :param mpi_replica_specs: The mpi_replica_specs of this V2beta1MPIJobSpec. # noqa: E501 - :type mpi_replica_specs: dict(str, V1ReplicaSpec) + :type mpi_replica_specs: dict(str, V2beta1ReplicaSpec) """ if self.local_vars_configuration.client_side_validation and mpi_replica_specs is None: # noqa: E501 raise ValueError("Invalid value for `mpi_replica_specs`, must not be `None`") # noqa: E501 diff --git a/sdk/python/v2beta1/mpijob/models/v2beta1_replica_spec.py b/sdk/python/v2beta1/mpijob/models/v2beta1_replica_spec.py new file mode 100644 index 000000000..f055b3a02 --- /dev/null +++ b/sdk/python/v2beta1/mpijob/models/v2beta1_replica_spec.py @@ -0,0 +1,184 @@ +# coding: utf-8 + +""" + mpijob + + Python SDK for MPI-Operator # noqa: E501 + + The version of the OpenAPI document: v2beta1 + Generated by: https://openapi-generator.tech +""" + + +import inspect +import pprint +import re # noqa: F401 +import six + +from mpijob.configuration import Configuration + + +class V2beta1ReplicaSpec(object): + """NOTE: This class is auto generated by OpenAPI Generator. + Ref: https://openapi-generator.tech + + Do not edit the class manually. + """ + + """ + Attributes: + openapi_types (dict): The key is attribute name + and the value is attribute type. + attribute_map (dict): The key is attribute name + and the value is json key in definition. + """ + openapi_types = { + 'replicas': 'int', + 'restart_policy': 'str', + 'template': 'V1PodTemplateSpec' + } + + attribute_map = { + 'replicas': 'replicas', + 'restart_policy': 'restartPolicy', + 'template': 'template' + } + + def __init__(self, replicas=None, restart_policy=None, template=None, local_vars_configuration=None): # noqa: E501 + """V2beta1ReplicaSpec - a model defined in OpenAPI""" # noqa: E501 + if local_vars_configuration is None: + local_vars_configuration = Configuration.get_default_copy() + self.local_vars_configuration = local_vars_configuration + + self._replicas = None + self._restart_policy = None + self._template = None + self.discriminator = None + + if replicas is not None: + self.replicas = replicas + if restart_policy is not None: + self.restart_policy = restart_policy + if template is not None: + self.template = template + + @property + def replicas(self): + """Gets the replicas of this V2beta1ReplicaSpec. # noqa: E501 + + Replicas is the desired number of replicas of the given template. If unspecified, defaults to 1. # noqa: E501 + + :return: The replicas of this V2beta1ReplicaSpec. # noqa: E501 + :rtype: int + """ + return self._replicas + + @replicas.setter + def replicas(self, replicas): + """Sets the replicas of this V2beta1ReplicaSpec. + + Replicas is the desired number of replicas of the given template. If unspecified, defaults to 1. # noqa: E501 + + :param replicas: The replicas of this V2beta1ReplicaSpec. # noqa: E501 + :type replicas: int + """ + + self._replicas = replicas + + @property + def restart_policy(self): + """Gets the restart_policy of this V2beta1ReplicaSpec. # noqa: E501 + + Restart policy for all replicas within the job. One of Always, OnFailure, Never and ExitCode. Default to Never. # noqa: E501 + + :return: The restart_policy of this V2beta1ReplicaSpec. # noqa: E501 + :rtype: str + """ + return self._restart_policy + + @restart_policy.setter + def restart_policy(self, restart_policy): + """Sets the restart_policy of this V2beta1ReplicaSpec. + + Restart policy for all replicas within the job. One of Always, OnFailure, Never and ExitCode. Default to Never. # noqa: E501 + + :param restart_policy: The restart_policy of this V2beta1ReplicaSpec. # noqa: E501 + :type restart_policy: str + """ + + self._restart_policy = restart_policy + + @property + def template(self): + """Gets the template of this V2beta1ReplicaSpec. # noqa: E501 + + + :return: The template of this V2beta1ReplicaSpec. # noqa: E501 + :rtype: V1PodTemplateSpec + """ + return self._template + + @template.setter + def template(self, template): + """Sets the template of this V2beta1ReplicaSpec. + + + :param template: The template of this V2beta1ReplicaSpec. # noqa: E501 + :type template: V1PodTemplateSpec + """ + + self._template = template + + def to_dict(self, serialize=False): + """Returns the model properties as a dict""" + result = {} + + def convert(x): + if hasattr(x, "to_dict"): + args = inspect.getargspec(x.to_dict).args + if len(args) == 1: + return x.to_dict() + else: + return x.to_dict(serialize) + else: + return x + + for attr, _ in six.iteritems(self.openapi_types): + value = getattr(self, attr) + attr = self.attribute_map.get(attr, attr) if serialize else attr + if isinstance(value, list): + result[attr] = list(map( + lambda x: convert(x), + value + )) + elif isinstance(value, dict): + result[attr] = dict(map( + lambda item: (item[0], convert(item[1])), + value.items() + )) + else: + result[attr] = convert(value) + + return result + + def to_str(self): + """Returns the string representation of the model""" + return pprint.pformat(self.to_dict()) + + def __repr__(self): + """For `print` and `pprint`""" + return self.to_str() + + def __eq__(self, other): + """Returns true if both objects are equal""" + if not isinstance(other, V2beta1ReplicaSpec): + return False + + return self.to_dict() == other.to_dict() + + def __ne__(self, other): + """Returns true if both objects are not equal""" + if not isinstance(other, V2beta1ReplicaSpec): + return True + + return self.to_dict() != other.to_dict() diff --git a/sdk/python/v2beta1/test/test_v2beta1_replica_spec.py b/sdk/python/v2beta1/test/test_v2beta1_replica_spec.py new file mode 100644 index 000000000..7ba4cd54a --- /dev/null +++ b/sdk/python/v2beta1/test/test_v2beta1_replica_spec.py @@ -0,0 +1,53 @@ +# coding: utf-8 + +""" + mpijob + + Python SDK for MPI-Operator # noqa: E501 + + The version of the OpenAPI document: v2beta1 + Generated by: https://openapi-generator.tech +""" + + +from __future__ import absolute_import + +import unittest +import datetime + +import mpijob +from mpijob.models.v2beta1_replica_spec import V2beta1ReplicaSpec # noqa: E501 +from mpijob.rest import ApiException + +class TestV2beta1ReplicaSpec(unittest.TestCase): + """V2beta1ReplicaSpec unit test stubs""" + + def setUp(self): + pass + + def tearDown(self): + pass + + def make_instance(self, include_optional): + """Test V2beta1ReplicaSpec + include_option is a boolean, when False only required + params are included, when True both required and + optional params are included """ + # model = mpijob.models.v2beta1_replica_spec.V2beta1ReplicaSpec() # noqa: E501 + if include_optional : + return V2beta1ReplicaSpec( + replicas = 56, + restart_policy = '', + template = None + ) + else : + return V2beta1ReplicaSpec( + ) + + def testV2beta1ReplicaSpec(self): + """Test V2beta1ReplicaSpec""" + inst_req_only = self.make_instance(include_optional=False) + inst_req_and_optional = self.make_instance(include_optional=True) + +if __name__ == '__main__': + unittest.main() diff --git a/test/e2e/mpi_job_test.go b/test/e2e/mpi_job_test.go index 55cdc19c4..26183ab89 100644 --- a/test/e2e/mpi_job_test.go +++ b/test/e2e/mpi_job_test.go @@ -20,7 +20,6 @@ import ( "io" "github.com/google/go-cmp/cmp" - common "github.com/kubeflow/common/pkg/apis/common/v1" "github.com/onsi/ginkgo" "github.com/onsi/gomega" corev1 "k8s.io/api/core/v1" @@ -70,9 +69,9 @@ var _ = ginkgo.Describe("MPIJob", func() { Namespace: namespace, }, Spec: kubeflow.MPIJobSpec{ - MPIReplicaSpecs: map[kubeflow.MPIReplicaType]*common.ReplicaSpec{ + MPIReplicaSpecs: map[kubeflow.MPIReplicaType]*kubeflow.ReplicaSpec{ kubeflow.MPIReplicaTypeLauncher: { - RestartPolicy: common.RestartPolicyOnFailure, + RestartPolicy: kubeflow.RestartPolicyOnFailure, }, kubeflow.MPIReplicaTypeWorker: { Replicas: newInt32(2), @@ -458,7 +457,7 @@ var _ = ginkgo.Describe("MPIJob", func() { var err error pods, err = k8sClient.CoreV1().Pods(mpiJob.Namespace).List(ctx, metav1.ListOptions{ LabelSelector: labels.FormatLabels(map[string]string{ - common.JobNameLabel: mpiJob.Name, + kubeflow.JobNameLabel: mpiJob.Name, }), }) return err @@ -540,9 +539,9 @@ func waitForCompletion(ctx context.Context, mpiJob *kubeflow.MPIJob) *kubeflow.M func debugJob(ctx context.Context, mpiJob *kubeflow.MPIJob) error { selector := metav1.LabelSelector{ MatchLabels: map[string]string{ - common.OperatorNameLabel: kubeflow.OperatorName, - common.JobNameLabel: mpiJob.Name, - common.JobRoleLabel: "launcher", + kubeflow.OperatorNameLabel: kubeflow.OperatorName, + kubeflow.JobNameLabel: mpiJob.Name, + kubeflow.JobRoleLabel: "launcher", }, } launcherPods, err := k8sClient.CoreV1().Pods(mpiJob.Namespace).List(ctx, metav1.ListOptions{ @@ -565,7 +564,7 @@ func debugJob(ctx context.Context, mpiJob *kubeflow.MPIJob) error { return fmt.Errorf("obtaining launcher logs: %w", err) } - selector.MatchLabels[common.JobRoleLabel] = "worker" + selector.MatchLabels[kubeflow.JobRoleLabel] = "worker" workerPods, err := k8sClient.CoreV1().Pods(mpiJob.Namespace).List(ctx, metav1.ListOptions{ LabelSelector: metav1.FormatLabelSelector(&selector), }) diff --git a/test/integration/mpi_job_controller_test.go b/test/integration/mpi_job_controller_test.go index c62915ff7..60e593849 100644 --- a/test/integration/mpi_job_controller_test.go +++ b/test/integration/mpi_job_controller_test.go @@ -21,7 +21,6 @@ import ( "time" "github.com/google/go-cmp/cmp" - common "github.com/kubeflow/common/pkg/apis/common/v1" batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" schedulingv1 "k8s.io/api/scheduling/v1" @@ -66,7 +65,7 @@ func TestMPIJobSuccess(t *testing.T) { RunPolicy: kubeflow.RunPolicy{ CleanPodPolicy: kubeflow.NewCleanPodPolicy(kubeflow.CleanPodPolicyRunning), }, - MPIReplicaSpecs: map[kubeflow.MPIReplicaType]*common.ReplicaSpec{ + MPIReplicaSpecs: map[kubeflow.MPIReplicaType]*kubeflow.ReplicaSpec{ kubeflow.MPIReplicaTypeLauncher: { Template: corev1.PodTemplateSpec{ Spec: corev1.PodSpec{ @@ -193,7 +192,7 @@ func TestMPIJobWaitWorkers(t *testing.T) { RunPolicy: kubeflow.RunPolicy{ CleanPodPolicy: kubeflow.NewCleanPodPolicy(kubeflow.CleanPodPolicyRunning), }, - MPIReplicaSpecs: map[kubeflow.MPIReplicaType]*common.ReplicaSpec{ + MPIReplicaSpecs: map[kubeflow.MPIReplicaType]*kubeflow.ReplicaSpec{ kubeflow.MPIReplicaTypeLauncher: { Template: corev1.PodTemplateSpec{ Spec: corev1.PodSpec{ @@ -324,7 +323,7 @@ func TestMPIJobResumingAndSuspending(t *testing.T) { CleanPodPolicy: kubeflow.NewCleanPodPolicy(kubeflow.CleanPodPolicyRunning), Suspend: pointer.Bool(true), }, - MPIReplicaSpecs: map[kubeflow.MPIReplicaType]*common.ReplicaSpec{ + MPIReplicaSpecs: map[kubeflow.MPIReplicaType]*kubeflow.ReplicaSpec{ kubeflow.MPIReplicaTypeLauncher: { Template: corev1.PodTemplateSpec{ Spec: corev1.PodSpec{ @@ -487,7 +486,7 @@ func TestMPIJobFailure(t *testing.T) { RunPolicy: kubeflow.RunPolicy{ CleanPodPolicy: kubeflow.NewCleanPodPolicy(kubeflow.CleanPodPolicyRunning), }, - MPIReplicaSpecs: map[kubeflow.MPIReplicaType]*common.ReplicaSpec{ + MPIReplicaSpecs: map[kubeflow.MPIReplicaType]*kubeflow.ReplicaSpec{ kubeflow.MPIReplicaTypeLauncher: { Template: corev1.PodTemplateSpec{ Spec: corev1.PodSpec{ @@ -618,7 +617,7 @@ func TestMPIJobWithSchedulerPlugins(t *testing.T) { ScheduleTimeoutSeconds: pointer.Int32(900), }, }, - MPIReplicaSpecs: map[kubeflow.MPIReplicaType]*common.ReplicaSpec{ + MPIReplicaSpecs: map[kubeflow.MPIReplicaType]*kubeflow.ReplicaSpec{ kubeflow.MPIReplicaTypeLauncher: { Template: corev1.PodTemplateSpec{ Spec: corev1.PodSpec{ @@ -732,7 +731,7 @@ func TestMPIJobWithVolcanoScheduler(t *testing.T) { PriorityClass: prioClass, }, }, - MPIReplicaSpecs: map[kubeflow.MPIReplicaType]*common.ReplicaSpec{ + MPIReplicaSpecs: map[kubeflow.MPIReplicaType]*kubeflow.ReplicaSpec{ kubeflow.MPIReplicaTypeLauncher: { Template: corev1.PodTemplateSpec{ Spec: corev1.PodSpec{