Skip to content

Commit

Permalink
Resources spec for Batch. Fixes #2093
Browse files Browse the repository at this point in the history
  • Loading branch information
rigazilla authored and ryanemerson committed Sep 27, 2024
1 parent be94e44 commit 5172183
Show file tree
Hide file tree
Showing 13 changed files with 199 additions and 12 deletions.
14 changes: 7 additions & 7 deletions api/v1/types_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -446,32 +446,32 @@ func (ispn *Infinispan) GetTruststoreSecretName() string {

// GetCpuResources returns the CPU request and limit values to be used by pods
func (spec *InfinispanContainerSpec) GetCpuResources() (requests resource.Quantity, limits resource.Quantity, err error) {
return getRequestLimits(spec.CPU)
return GetRequestLimits(spec.CPU)
}

// GetMemoryResources returns the Memory request and limit values to be used by pods
func (spec *InfinispanContainerSpec) GetMemoryResources() (requests resource.Quantity, limits resource.Quantity, err error) {
return getRequestLimits(spec.Memory)
return GetRequestLimits(spec.Memory)
}

// CpuResources returns the CPU request and limit values to be used by pods
func (spec *ConfigListenerSpec) CpuResources() (requests resource.Quantity, limits resource.Quantity, err error) {
return getRequestLimits(spec.CPU)
return GetRequestLimits(spec.CPU)
}

// MemoryResources returns the Memory request and limit values to be used by pods
func (spec *ConfigListenerSpec) MemoryResources() (requests resource.Quantity, limits resource.Quantity, err error) {
return getRequestLimits(spec.Memory)
return GetRequestLimits(spec.Memory)
}

// CpuResources returns the CPU request and limit values to be used by by Gossip Router pod
func (spec *DiscoverySiteSpec) CpuResources() (requests resource.Quantity, limits resource.Quantity, err error) {
return getRequestLimits(spec.CPU)
return GetRequestLimits(spec.CPU)
}

// MemoryResources returns the Memory request and limit values to be used by Gossip Router pod
func (spec *DiscoverySiteSpec) MemoryResources() (requests resource.Quantity, limits resource.Quantity, err error) {
return getRequestLimits(spec.Memory)
return GetRequestLimits(spec.Memory)
}

// CpuResources returns the CPU request and limit values to be used by by External Dependencies Downloader Init container
Expand All @@ -484,7 +484,7 @@ func (spec *InitDependenciesContainerSpec) MemoryResources() (requests resource.
return getRequestLimits(spec.Memory)
}

func getRequestLimits(str string) (requests resource.Quantity, limits resource.Quantity, err error) {
func GetRequestLimits(str string) (requests resource.Quantity, limits resource.Quantity, err error) {
if str == "" {
err = fmt.Errorf("resource string cannot be empty")
return
Expand Down
12 changes: 12 additions & 0 deletions api/v2alpha1/batch_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,18 @@ type BatchSpec struct {
// Name of the ConfigMap containing the batch and resource files to be executed
// +operator-sdk:csv:customresourcedefinitions:type=spec,displayName="ConfigMap Name"
ConfigMap *string `json:"configMap,omitempty"`
// +optional
// Specify resource requirements per container
// +operator-sdk:csv:customresourcedefinitions:type=spec,displayName="Container Spec"
Container *BatchContainerSpec `json:"container,omitempty"`
}

// BatchContainerSpec specify resource requirements per container
type BatchContainerSpec struct {
// +optional
Memory string `json:"memory,omitempty"`
// +optional
CPU string `json:"cpu,omitempty"`
}

type BatchPhase string
Expand Down
47 changes: 47 additions & 0 deletions api/v2alpha1/batch_webhook.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package v2alpha1

import (
"fmt"
"reflect"

apierrors "k8s.io/apimachinery/pkg/api/errors"
Expand All @@ -24,6 +25,9 @@ var _ webhook.Validator = &Batch{}
// ValidateCreate implements webhook.Validator so a webhook will be registered for the type
func (b *Batch) ValidateCreate() error {
var allErrs field.ErrorList
if err := b.validate(); err != nil {
return err
}
if b.Spec.ConfigMap == nil && b.Spec.Config == nil {
allErrs = append(allErrs, field.Required(field.NewPath("spec").Child("configMap"), "'Spec.config' OR 'spec.ConfigMap' must be configured"))
} else if b.Spec.ConfigMap != nil && b.Spec.Config != nil {
Expand All @@ -35,13 +39,56 @@ func (b *Batch) ValidateCreate() error {
// ValidateUpdate implements webhook.Validator so a webhook will be registered for the type
func (b *Batch) ValidateUpdate(old runtime.Object) error {
var allErrs field.ErrorList
if err := b.validate(); err != nil {
return err
}
oldBatch := old.(*Batch)
if !reflect.DeepEqual(b.Spec, oldBatch.Spec) {
allErrs = append(allErrs, field.Forbidden(field.NewPath("spec"), "The Batch spec is immutable and cannot be updated after initial Batch creation"))
}
return b.StatusError(allErrs)
}

func (b *Batch) validate() error {
var allErrs field.ErrorList
if b.Spec.Container == nil {
return nil
}
if b.Spec.Container.CPU != "" {
req, limit, err := b.Spec.Container.CpuResources()
if err != nil {
allErrs = append(allErrs, field.Invalid(field.NewPath("spec").Child("container").Child("cpu"), b.Spec.Container.CPU, err.Error()))
}

if req.Cmp(limit) > 0 {
msg := fmt.Sprintf("CPU request '%s' exceeds limit '%s'", req.String(), limit.String())
allErrs = append(allErrs, field.Invalid(field.NewPath("spec").Child("container").Child("cpu"), b.Spec.Container.CPU, msg))
}
}

memReq, memLimit, err := b.Spec.Container.MemoryResources()
if b.Spec.Container.Memory != "" {
if err != nil {
allErrs = append(allErrs, field.Invalid(field.NewPath("spec").Child("container").Child("memory"), b.Spec.Container.Memory, err.Error()))
}

if memReq.Cmp(memLimit) > 0 {
msg := fmt.Sprintf("Memory request '%s' exceeds limit '%s'", memReq.String(), memLimit.String())
allErrs = append(allErrs, field.Invalid(field.NewPath("spec").Child("container").Child("memory"), b.Spec.Container.Memory, msg))
}
}
return errorListToError(b, allErrs)
}

func errorListToError(b *Batch, allErrs field.ErrorList) error {
if len(allErrs) != 0 {
return apierrors.NewInvalid(
schema.GroupKind{Group: GroupVersion.Group, Kind: "Batch"},
b.Name, allErrs)
}
return nil
}

// ValidateDelete implements webhook.Validator so a webhook will be registered for the type
func (b *Batch) ValidateDelete() error {
// TODO(user): change verbs to "verbs=create;update;delete" if you want to enable deletion validation.
Expand Down
24 changes: 24 additions & 0 deletions api/v2alpha1/batch_webhook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,5 +135,29 @@ var _ = Describe("Batch Webhook", func() {
updated.Spec.ConfigMap = pointer.String("New ConfigMap")
expectInvalidErrStatus(k8sClient.Update(ctx, updated), cause)
})

It("Should return error if malformed memory or CPU request is greater than limit", func() {
created := &Batch{
ObjectMeta: metav1.ObjectMeta{
Name: key.Name,
Namespace: key.Namespace,
},
Spec: BatchSpec{
Cluster: "some-cluster",
Config: pointer.String("create cache --template=org.infinispan.DIST_SYNC batch-cache"),
Container: &BatchContainerSpec{
Memory: "1Gi:5Gi",
CPU: "1000m:2000m",
},
},
}

err := k8sClient.Create(ctx, created)
expectInvalidErrStatus(err, []statusDetailCause{{
metav1.CauseTypeFieldValueInvalid, "spec.container.cpu", "exceeds limit",
}, {
metav1.CauseTypeFieldValueInvalid, "spec.container.memory", "exceeds limit",
}}...)
})
})
})
12 changes: 12 additions & 0 deletions api/v2alpha1/types_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package v2alpha1
import (
"strings"

v1 "github.com/infinispan/infinispan-operator/api/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

Expand Down Expand Up @@ -57,3 +59,13 @@ func (b *Batch) ConfigMapName() string {
func (a CacheConditionType) equals(b CacheConditionType) bool {
return strings.EqualFold(strings.ToLower(string(a)), strings.ToLower(string(b)))
}

// CpuResources returns the CPU request and limit values to be used by Batch pod
func (spec *BatchContainerSpec) CpuResources() (requests resource.Quantity, limits resource.Quantity, err error) {
return v1.GetRequestLimits(spec.CPU)
}

// MemoryResources returns the Memory request and limit values to be used by by Batch pod
func (spec *BatchContainerSpec) MemoryResources() (requests resource.Quantity, limits resource.Quantity, err error) {
return v1.GetRequestLimits(spec.Memory)
}
20 changes: 20 additions & 0 deletions api/v2alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions config/crd/bases/infinispan.org_batches.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,14 @@ spec:
description: Name of the ConfigMap containing the batch and resource
files to be executed
type: string
container:
description: Specify resource requirements per container
properties:
cpu:
type: string
memory:
type: string
type: object
required:
- cluster
type: object
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ spec:
to be executed
displayName: ConfigMap Name
path: configMap
- description: Specify resource requirements per container
displayName: Container Spec
path: container
statusDescriptors:
- description: The UUID of the Infinispan instance that the Batch is associated
with
Expand Down
24 changes: 24 additions & 0 deletions controllers/batch_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ func (r *batchRequest) execute() (reconcile.Result, error) {
MountPath: consts.ServerAdminIdentitiesRoot,
},
},
Resources: *BatchResources(batch.Spec.Container),
}},
RestartPolicy: corev1.RestartPolicyNever,
Volumes: []corev1.Volume{
Expand Down Expand Up @@ -318,3 +319,26 @@ func batchLabels(name string) map[string]string {
"app": "infinispan-batch-pod",
}
}

func BatchResources(spec *v2.BatchContainerSpec) *corev1.ResourceRequirements {
if spec == nil {
return &corev1.ResourceRequirements{}
}
memRequests, memLimits, _ := spec.MemoryResources()

req := &corev1.ResourceRequirements{
Requests: corev1.ResourceList{
corev1.ResourceMemory: memRequests,
},
Limits: corev1.ResourceList{
corev1.ResourceMemory: memLimits,
},
}

if spec.CPU != "" {
cpuRequests, cpuLimits, _ := spec.CpuResources()
req.Requests[corev1.ResourceCPU] = cpuRequests
req.Limits[corev1.ResourceCPU] = cpuLimits
}
return req
}
3 changes: 2 additions & 1 deletion test/e2e/batch/batch_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func NewBatchHelper(testKube *tutils.TestKubernetes) *BatchHelper {
}
}

func (b BatchHelper) CreateBatch(t *testing.T, name, cluster string, config, configMap *string) *v2.Batch {
func (b BatchHelper) CreateBatch(t *testing.T, name, cluster string, config, configMap *string, containerSpec *v2.BatchContainerSpec) *v2.Batch {
testName := tutils.TestName(t)
batch := &v2.Batch{
TypeMeta: metav1.TypeMeta{
Expand All @@ -37,6 +37,7 @@ func (b BatchHelper) CreateBatch(t *testing.T, name, cluster string, config, con
Cluster: cluster,
Config: config,
ConfigMap: configMap,
Container: containerSpec,
},
}
b.testKube.Create(batch)
Expand Down
29 changes: 26 additions & 3 deletions test/e2e/batch/batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

v1 "github.com/infinispan/infinispan-operator/api/v1"
v2 "github.com/infinispan/infinispan-operator/api/v2alpha1"
"github.com/infinispan/infinispan-operator/controllers"
batchCtrl "github.com/infinispan/infinispan-operator/controllers"
ispnClient "github.com/infinispan/infinispan-operator/pkg/infinispan/client"
"github.com/infinispan/infinispan-operator/pkg/infinispan/client/api"
Expand Down Expand Up @@ -41,7 +42,7 @@ func TestBatchInlineConfig(t *testing.T) {
func testBatchInlineConfig(t *testing.T, infinispan *v1.Infinispan) {
name := infinispan.Name
batchScript := batchString()
batch := helper.CreateBatch(t, name, name, &batchScript, nil)
batch := helper.CreateBatch(t, name, name, &batchScript, nil, nil)

helper.WaitForValidBatchPhase(name, v2.BatchSucceeded)

Expand Down Expand Up @@ -73,7 +74,7 @@ func TestBatchConfigMap(t *testing.T) {
testKube.CreateConfigMap(configMap)
defer testKube.DeleteConfigMap(configMap)

batch := helper.CreateBatch(t, infinispan.Name, infinispan.Name, nil, &configMapName)
batch := helper.CreateBatch(t, infinispan.Name, infinispan.Name, nil, &configMapName, nil)

helper.WaitForValidBatchPhase(infinispan.Name, v2.BatchSucceeded)
testKube.DeleteBatch(batch)
Expand All @@ -92,13 +93,35 @@ func TestBatchFail(t *testing.T) {
infinispan := createCluster(t)

batchScript := "SOME INVALID BATCH CMD!"
batch := helper.CreateBatch(t, infinispan.Name, infinispan.Name, &batchScript, nil)
batch := helper.CreateBatch(t, infinispan.Name, infinispan.Name, &batchScript, nil, nil)

helper.WaitForValidBatchPhase(infinispan.Name, v2.BatchFailed)
testKube.DeleteBatch(batch)
waitForK8sResourceCleanup(infinispan.Name)
}

func TestBatchWithResources(t *testing.T) {
infinispan := createCluster(t)
batchScript := batchString()
bcSpec := &v2.BatchContainerSpec{Memory: "1Gi:1Gi", CPU: "500m:500m"}
podRes := controllers.BatchResources(bcSpec)
batch := helper.CreateBatch(t, infinispan.Name, infinispan.Name, &batchScript, nil, bcSpec)

helper.WaitForValidBatchPhase(infinispan.Name, v2.BatchRunning)

job := testKube.GetJob(infinispan.Name, tutils.Namespace)
limits := job.Spec.Template.Spec.Containers[0].Resources.Limits
requests := job.Spec.Template.Spec.Containers[0].Resources.Requests
if !limits.Cpu().Equal(*podRes.Limits.Cpu()) ||
!limits.Memory().Equal(*podRes.Limits.Memory()) ||
!requests.Cpu().Equal(*podRes.Requests.Cpu()) ||
!requests.Memory().Equal(*podRes.Requests.Memory()) {
panic(fmt.Errorf("unexpected error"))
}
testKube.DeleteBatch(batch)
waitForK8sResourceCleanup(infinispan.Name)
}

func batchString() string {
batchScript := `create cache --template=org.infinispan.DIST_SYNC batch-cache
create counter --concurrency-level=1 --initial-value=5 --storage=VOLATILE --type=weak batch-counter`
Expand Down
2 changes: 1 addition & 1 deletion test/e2e/upgrade/upgrade_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,6 @@ func checkBatch(t *testing.T, name string) {
// Run a batch in the migrated cluster
batchHelper := batchtest.NewBatchHelper(testKube)
config := "create cache --template=org.infinispan.DIST_SYNC batch-cache"
batchHelper.CreateBatch(t, name, name, &config, nil)
batchHelper.CreateBatch(t, name, name, &config, nil, nil)
batchHelper.WaitForValidBatchPhase(name, v2.BatchSucceeded)
}
Loading

0 comments on commit 5172183

Please sign in to comment.