From 5172183c503e6e8af2158e9986212e73eaea217f Mon Sep 17 00:00:00 2001 From: rigazilla Date: Thu, 23 May 2024 15:13:29 +0200 Subject: [PATCH] Resources spec for Batch. Fixes #2093 --- api/v1/types_util.go | 14 +++--- api/v2alpha1/batch_types.go | 12 +++++ api/v2alpha1/batch_webhook.go | 47 +++++++++++++++++++ api/v2alpha1/batch_webhook_test.go | 24 ++++++++++ api/v2alpha1/types_util.go | 12 +++++ api/v2alpha1/zz_generated.deepcopy.go | 20 ++++++++ config/crd/bases/infinispan.org_batches.yaml | 8 ++++ ...nispan-operator.clusterserviceversion.yaml | 3 ++ controllers/batch_controller.go | 24 ++++++++++ test/e2e/batch/batch_helper.go | 3 +- test/e2e/batch/batch_test.go | 29 ++++++++++-- test/e2e/upgrade/upgrade_test.go | 2 +- test/e2e/utils/kubernetes.go | 13 +++++ 13 files changed, 199 insertions(+), 12 deletions(-) diff --git a/api/v1/types_util.go b/api/v1/types_util.go index a81c7ee30..c3e0a5523 100644 --- a/api/v1/types_util.go +++ b/api/v1/types_util.go @@ -446,32 +446,32 @@ func (ispn *Infinispan) GetTruststoreSecretName() string { // GetCpuResources returns the CPU request and limit values to be used by pods func (spec *InfinispanContainerSpec) GetCpuResources() (requests resource.Quantity, limits resource.Quantity, err error) { - return getRequestLimits(spec.CPU) + return GetRequestLimits(spec.CPU) } // GetMemoryResources returns the Memory request and limit values to be used by pods func (spec *InfinispanContainerSpec) GetMemoryResources() (requests resource.Quantity, limits resource.Quantity, err error) { - return getRequestLimits(spec.Memory) + return GetRequestLimits(spec.Memory) } // CpuResources returns the CPU request and limit values to be used by pods func (spec *ConfigListenerSpec) CpuResources() (requests resource.Quantity, limits resource.Quantity, err error) { - return getRequestLimits(spec.CPU) + return GetRequestLimits(spec.CPU) } // MemoryResources returns the Memory request and limit values to be used by pods func (spec *ConfigListenerSpec) 
MemoryResources() (requests resource.Quantity, limits resource.Quantity, err error) { - return getRequestLimits(spec.Memory) + return GetRequestLimits(spec.Memory) } // CpuResources returns the CPU request and limit values to be used by by Gossip Router pod func (spec *DiscoverySiteSpec) CpuResources() (requests resource.Quantity, limits resource.Quantity, err error) { - return getRequestLimits(spec.CPU) + return GetRequestLimits(spec.CPU) } // MemoryResources returns the Memory request and limit values to be used by Gossip Router pod func (spec *DiscoverySiteSpec) MemoryResources() (requests resource.Quantity, limits resource.Quantity, err error) { - return getRequestLimits(spec.Memory) + return GetRequestLimits(spec.Memory) } // CpuResources returns the CPU request and limit values to be used by by External Dependencies Downloader Init container @@ -484,7 +484,12 @@ func (spec *InitDependenciesContainerSpec) MemoryResources() (requests resource. return getRequestLimits(spec.Memory) } +// GetRequestLimits is the exported entry point to getRequestLimits, so that other API packages (such as v2alpha1) can parse resource strings. The unexported function is kept because existing callers in this file still reference it. +func GetRequestLimits(str string) (requests resource.Quantity, limits resource.Quantity, err error) { + return getRequestLimits(str) +} + func getRequestLimits(str string) (requests resource.Quantity, limits resource.Quantity, err error) { if str == "" { err = fmt.Errorf("resource string cannot be empty") return diff --git a/api/v2alpha1/batch_types.go b/api/v2alpha1/batch_types.go index a3398d8a9..876f7dbd4 100644 --- a/api/v2alpha1/batch_types.go +++ b/api/v2alpha1/batch_types.go @@ -16,6 +16,18 @@ type BatchSpec struct { // Name of the ConfigMap containing the batch and resource files to be executed // +operator-sdk:csv:customresourcedefinitions:type=spec,displayName="ConfigMap Name" ConfigMap *string `json:"configMap,omitempty"` + // +optional + // Specify resource requirements per container + // +operator-sdk:csv:customresourcedefinitions:type=spec,displayName="Container Spec" + Container *BatchContainerSpec `json:"container,omitempty"` +} + +// BatchContainerSpec specify resource requirements per container +type 
BatchContainerSpec struct { + // +optional + Memory string `json:"memory,omitempty"` + // +optional + CPU string `json:"cpu,omitempty"` } type BatchPhase string diff --git a/api/v2alpha1/batch_webhook.go b/api/v2alpha1/batch_webhook.go index 12f96ac21..fe5b8b05b 100644 --- a/api/v2alpha1/batch_webhook.go +++ b/api/v2alpha1/batch_webhook.go @@ -1,6 +1,7 @@ package v2alpha1 import ( + "fmt" "reflect" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -24,6 +25,9 @@ var _ webhook.Validator = &Batch{} // ValidateCreate implements webhook.Validator so a webhook will be registered for the type func (b *Batch) ValidateCreate() error { var allErrs field.ErrorList + if err := b.validate(); err != nil { + return err + } if b.Spec.ConfigMap == nil && b.Spec.Config == nil { allErrs = append(allErrs, field.Required(field.NewPath("spec").Child("configMap"), "'Spec.config' OR 'spec.ConfigMap' must be configured")) } else if b.Spec.ConfigMap != nil && b.Spec.Config != nil { @@ -35,6 +39,9 @@ func (b *Batch) ValidateCreate() error { // ValidateUpdate implements webhook.Validator so a webhook will be registered for the type func (b *Batch) ValidateUpdate(old runtime.Object) error { var allErrs field.ErrorList + if err := b.validate(); err != nil { + return err + } oldBatch := old.(*Batch) if !reflect.DeepEqual(b.Spec, oldBatch.Spec) { allErrs = append(allErrs, field.Forbidden(field.NewPath("spec"), "The Batch spec is immutable and cannot be updated after initial Batch creation")) @@ -42,6 +49,46 @@ func (b *Batch) ValidateUpdate(old runtime.Object) error { return b.StatusError(allErrs) } +func (b *Batch) validate() error { + var allErrs field.ErrorList + if b.Spec.Container == nil { + return nil + } + if b.Spec.Container.CPU != "" { + req, limit, err := b.Spec.Container.CpuResources() + if err != nil { + allErrs = append(allErrs, field.Invalid(field.NewPath("spec").Child("container").Child("cpu"), b.Spec.Container.CPU, err.Error())) + } + + if req.Cmp(limit) > 0 { + msg := 
fmt.Sprintf("CPU request '%s' exceeds limit '%s'", req.String(), limit.String()) + allErrs = append(allErrs, field.Invalid(field.NewPath("spec").Child("container").Child("cpu"), b.Spec.Container.CPU, msg)) + } + } + + memReq, memLimit, err := b.Spec.Container.MemoryResources() + if b.Spec.Container.Memory != "" { + if err != nil { + allErrs = append(allErrs, field.Invalid(field.NewPath("spec").Child("container").Child("memory"), b.Spec.Container.Memory, err.Error())) + } + + if memReq.Cmp(memLimit) > 0 { + msg := fmt.Sprintf("Memory request '%s' exceeds limit '%s'", memReq.String(), memLimit.String()) + allErrs = append(allErrs, field.Invalid(field.NewPath("spec").Child("container").Child("memory"), b.Spec.Container.Memory, msg)) + } + } + return errorListToError(b, allErrs) +} + +func errorListToError(b *Batch, allErrs field.ErrorList) error { + if len(allErrs) != 0 { + return apierrors.NewInvalid( + schema.GroupKind{Group: GroupVersion.Group, Kind: "Batch"}, + b.Name, allErrs) + } + return nil +} + // ValidateDelete implements webhook.Validator so a webhook will be registered for the type func (b *Batch) ValidateDelete() error { // TODO(user): change verbs to "verbs=create;update;delete" if you want to enable deletion validation. 
diff --git a/api/v2alpha1/batch_webhook_test.go b/api/v2alpha1/batch_webhook_test.go index 31a903bce..810030449 100644 --- a/api/v2alpha1/batch_webhook_test.go +++ b/api/v2alpha1/batch_webhook_test.go @@ -135,5 +135,29 @@ var _ = Describe("Batch Webhook", func() { updated.Spec.ConfigMap = pointer.String("New ConfigMap") expectInvalidErrStatus(k8sClient.Update(ctx, updated), cause) }) + + It("Should return error if malformed memory or CPU request is greater than limit", func() { + created := &Batch{ + ObjectMeta: metav1.ObjectMeta{ + Name: key.Name, + Namespace: key.Namespace, + }, + Spec: BatchSpec{ + Cluster: "some-cluster", + Config: pointer.String("create cache --template=org.infinispan.DIST_SYNC batch-cache"), + Container: &BatchContainerSpec{ + Memory: "1Gi:5Gi", + CPU: "1000m:2000m", + }, + }, + } + + err := k8sClient.Create(ctx, created) + expectInvalidErrStatus(err, []statusDetailCause{{ + metav1.CauseTypeFieldValueInvalid, "spec.container.cpu", "exceeds limit", + }, { + metav1.CauseTypeFieldValueInvalid, "spec.container.memory", "exceeds limit", + }}...) 
+ }) }) }) diff --git a/api/v2alpha1/types_util.go b/api/v2alpha1/types_util.go index 8d1129d5f..a75719c29 100644 --- a/api/v2alpha1/types_util.go +++ b/api/v2alpha1/types_util.go @@ -3,6 +3,8 @@ package v2alpha1 import ( "strings" + v1 "github.com/infinispan/infinispan-operator/api/v1" + "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) @@ -57,3 +59,13 @@ func (b *Batch) ConfigMapName() string { func (a CacheConditionType) equals(b CacheConditionType) bool { return strings.EqualFold(strings.ToLower(string(a)), strings.ToLower(string(b))) } + +// CpuResources returns the CPU request and limit values to be used by Batch pod +func (spec *BatchContainerSpec) CpuResources() (requests resource.Quantity, limits resource.Quantity, err error) { + return v1.GetRequestLimits(spec.CPU) +} + +// MemoryResources returns the Memory request and limit values to be used by Batch pod +func (spec *BatchContainerSpec) MemoryResources() (requests resource.Quantity, limits resource.Quantity, err error) { + return v1.GetRequestLimits(spec.Memory) +} diff --git a/api/v2alpha1/zz_generated.deepcopy.go b/api/v2alpha1/zz_generated.deepcopy.go index 75434475e..61d3a1e23 100644 --- a/api/v2alpha1/zz_generated.deepcopy.go +++ b/api/v2alpha1/zz_generated.deepcopy.go @@ -224,6 +224,21 @@ func (in *Batch) DeepCopyObject() runtime.Object { return nil } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *BatchContainerSpec) DeepCopyInto(out *BatchContainerSpec) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new BatchContainerSpec. +func (in *BatchContainerSpec) DeepCopy() *BatchContainerSpec { + if in == nil { + return nil + } + out := new(BatchContainerSpec) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
func (in *BatchList) DeepCopyInto(out *BatchList) { *out = *in @@ -269,6 +284,11 @@ func (in *BatchSpec) DeepCopyInto(out *BatchSpec) { *out = new(string) **out = **in } + if in.Container != nil { + in, out := &in.Container, &out.Container + *out = new(BatchContainerSpec) + **out = **in + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new BatchSpec. diff --git a/config/crd/bases/infinispan.org_batches.yaml b/config/crd/bases/infinispan.org_batches.yaml index 0d61bef03..3eeaa0c13 100644 --- a/config/crd/bases/infinispan.org_batches.yaml +++ b/config/crd/bases/infinispan.org_batches.yaml @@ -49,6 +49,14 @@ spec: description: Name of the ConfigMap containing the batch and resource files to be executed type: string + container: + description: Specify resource requirements per container + properties: + cpu: + type: string + memory: + type: string + type: object required: - cluster type: object diff --git a/config/manifests/bases/infinispan-operator.clusterserviceversion.yaml b/config/manifests/bases/infinispan-operator.clusterserviceversion.yaml index 98e29a12c..ea2e60124 100644 --- a/config/manifests/bases/infinispan-operator.clusterserviceversion.yaml +++ b/config/manifests/bases/infinispan-operator.clusterserviceversion.yaml @@ -62,6 +62,9 @@ spec: to be executed displayName: ConfigMap Name path: configMap + - description: Specify resource requirements per container + displayName: Container Spec + path: container statusDescriptors: - description: The UUID of the Infinispan instance that the Batch is associated with diff --git a/controllers/batch_controller.go b/controllers/batch_controller.go index 56ddf4422..eafc963a5 100644 --- a/controllers/batch_controller.go +++ b/controllers/batch_controller.go @@ -194,6 +194,7 @@ func (r *batchRequest) execute() (reconcile.Result, error) { MountPath: consts.ServerAdminIdentitiesRoot, }, }, + Resources: *BatchResources(batch.Spec.Container), }}, RestartPolicy: corev1.RestartPolicyNever, 
Volumes: []corev1.Volume{ @@ -318,3 +319,30 @@ func batchLabels(name string) map[string]string { "app": "infinispan-batch-pod", } } + +// BatchResources builds the container ResourceRequirements for the Batch Job from the optional container spec. +// A nil spec, or a CPU/Memory value that is empty or cannot be parsed, adds no request/limit for that resource; +// malformed values are expected to have been rejected by the Batch webhook validation already. +func BatchResources(spec *v2.BatchContainerSpec) *corev1.ResourceRequirements { + if spec == nil { + return &corev1.ResourceRequirements{} + } + req := &corev1.ResourceRequirements{ + Requests: corev1.ResourceList{}, + Limits: corev1.ResourceList{}, + } + // Only set memory when it was actually specified, otherwise a zero request/limit would be applied to the container + if spec.Memory != "" { + if memRequests, memLimits, err := spec.MemoryResources(); err == nil { + req.Requests[corev1.ResourceMemory] = memRequests + req.Limits[corev1.ResourceMemory] = memLimits + } + } + if spec.CPU != "" { + if cpuRequests, cpuLimits, err := spec.CpuResources(); err == nil { + req.Requests[corev1.ResourceCPU] = cpuRequests + req.Limits[corev1.ResourceCPU] = cpuLimits + } + } + return req +} diff --git a/test/e2e/batch/batch_helper.go b/test/e2e/batch/batch_helper.go index b863b850f..dca4ddd7d 100644 --- a/test/e2e/batch/batch_helper.go +++ b/test/e2e/batch/batch_helper.go @@ -21,7 +21,7 @@ func NewBatchHelper(testKube *tutils.TestKubernetes) *BatchHelper { } } -func (b BatchHelper) CreateBatch(t *testing.T, name, cluster string, config, configMap *string) *v2.Batch { +func (b BatchHelper) CreateBatch(t *testing.T, name, cluster string, config, configMap *string, containerSpec *v2.BatchContainerSpec) *v2.Batch { testName := tutils.TestName(t) batch := &v2.Batch{ TypeMeta: metav1.TypeMeta{ @@ -37,6 +37,7 @@ func (b BatchHelper) CreateBatch(t *testing.T, name, cluster string, config, con Cluster: cluster, Config: config, ConfigMap: configMap, + Container: containerSpec, }, } b.testKube.Create(batch) diff --git a/test/e2e/batch/batch_test.go b/test/e2e/batch/batch_test.go index 17f3f338d..241731643 100644 --- a/test/e2e/batch/batch_test.go +++ b/test/e2e/batch/batch_test.go @@ -10,6 +10,7 @@ import ( v1 "github.com/infinispan/infinispan-operator/api/v1" v2 "github.com/infinispan/infinispan-operator/api/v2alpha1" + "github.com/infinispan/infinispan-operator/controllers" batchCtrl "github.com/infinispan/infinispan-operator/controllers" ispnClient 
"github.com/infinispan/infinispan-operator/pkg/infinispan/client" "github.com/infinispan/infinispan-operator/pkg/infinispan/client/api" @@ -41,7 +42,7 @@ func TestBatchInlineConfig(t *testing.T) { func testBatchInlineConfig(t *testing.T, infinispan *v1.Infinispan) { name := infinispan.Name batchScript := batchString() - batch := helper.CreateBatch(t, name, name, &batchScript, nil) + batch := helper.CreateBatch(t, name, name, &batchScript, nil, nil) helper.WaitForValidBatchPhase(name, v2.BatchSucceeded) @@ -73,7 +74,7 @@ func TestBatchConfigMap(t *testing.T) { testKube.CreateConfigMap(configMap) defer testKube.DeleteConfigMap(configMap) - batch := helper.CreateBatch(t, infinispan.Name, infinispan.Name, nil, &configMapName) + batch := helper.CreateBatch(t, infinispan.Name, infinispan.Name, nil, &configMapName, nil) helper.WaitForValidBatchPhase(infinispan.Name, v2.BatchSucceeded) testKube.DeleteBatch(batch) @@ -92,13 +93,35 @@ func TestBatchFail(t *testing.T) { infinispan := createCluster(t) batchScript := "SOME INVALID BATCH CMD!" 
- batch := helper.CreateBatch(t, infinispan.Name, infinispan.Name, &batchScript, nil) + batch := helper.CreateBatch(t, infinispan.Name, infinispan.Name, &batchScript, nil, nil) helper.WaitForValidBatchPhase(infinispan.Name, v2.BatchFailed) testKube.DeleteBatch(batch) waitForK8sResourceCleanup(infinispan.Name) } +func TestBatchWithResources(t *testing.T) { + infinispan := createCluster(t) + batchScript := batchString() + bcSpec := &v2.BatchContainerSpec{Memory: "1Gi:1Gi", CPU: "500m:500m"} + podRes := controllers.BatchResources(bcSpec) + batch := helper.CreateBatch(t, infinispan.Name, infinispan.Name, &batchScript, nil, bcSpec) + + helper.WaitForValidBatchPhase(infinispan.Name, v2.BatchRunning) + + job := testKube.GetJob(infinispan.Name, tutils.Namespace) + limits := job.Spec.Template.Spec.Containers[0].Resources.Limits + requests := job.Spec.Template.Spec.Containers[0].Resources.Requests + if !limits.Cpu().Equal(*podRes.Limits.Cpu()) || + !limits.Memory().Equal(*podRes.Limits.Memory()) || + !requests.Cpu().Equal(*podRes.Requests.Cpu()) || + !requests.Memory().Equal(*podRes.Requests.Memory()) { + t.Fatalf("Job container resources do not match Batch spec: limits cpu=%s memory=%s, requests cpu=%s memory=%s", limits.Cpu(), limits.Memory(), requests.Cpu(), requests.Memory()) + } + testKube.DeleteBatch(batch) + waitForK8sResourceCleanup(infinispan.Name) +} + func batchString() string { batchScript := `create cache --template=org.infinispan.DIST_SYNC batch-cache create counter --concurrency-level=1 --initial-value=5 --storage=VOLATILE --type=weak batch-counter` diff --git a/test/e2e/upgrade/upgrade_test.go b/test/e2e/upgrade/upgrade_test.go index 4ef99a438..3b7c74c34 100644 --- a/test/e2e/upgrade/upgrade_test.go +++ b/test/e2e/upgrade/upgrade_test.go @@ -369,6 +369,6 @@ func checkBatch(t *testing.T, name string) { // Run a batch in the migrated cluster batchHelper := batchtest.NewBatchHelper(testKube) config := "create cache --template=org.infinispan.DIST_SYNC batch-cache" - batchHelper.CreateBatch(t, name, name, &config, nil) + batchHelper.CreateBatch(t, name, name, &config, nil, nil) 
batchHelper.WaitForValidBatchPhase(name, v2.BatchSucceeded) } diff --git a/test/e2e/utils/kubernetes.go b/test/e2e/utils/kubernetes.go index b3cf90607..ba34ba266 100644 --- a/test/e2e/utils/kubernetes.go +++ b/test/e2e/utils/kubernetes.go @@ -24,6 +24,7 @@ import ( "go.uber.org/zap/zapcore" "gopkg.in/yaml.v2" appsv1 "k8s.io/api/apps/v1" + batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" rbacv1 "k8s.io/api/rbac/v1" storagev1 "k8s.io/api/storage/v1" @@ -68,6 +69,7 @@ func init() { addToScheme(&ispnv2.SchemeBuilder.SchemeBuilder, Scheme) addToScheme(&appsv1.SchemeBuilder, Scheme) addToScheme(&storagev1.SchemeBuilder, Scheme) + addToScheme(&batchv1.SchemeBuilder, Scheme) ExpectNoError(routev1.AddToScheme(Scheme)) } @@ -1097,3 +1099,14 @@ func (k TestKubernetes) GetUsedNodePorts() map[int32]struct{} { } return usedPorts } + +// GetJob fetches the batch Job with the given name in the given namespace +func (k TestKubernetes) GetJob(name, namespace string) *batchv1.Job { + job := &batchv1.Job{} + key := types.NamespacedName{ + Namespace: namespace, + Name: name, + } + ExpectMaybeNotFound(k.Kubernetes.Client.Get(context.TODO(), key, job)) + return job +}