diff --git a/pkg/controller/nodes/task/backoff/handler.go b/pkg/controller/nodes/task/backoff/handler.go index 1abb9855d..d27400a17 100644 --- a/pkg/controller/nodes/task/backoff/handler.go +++ b/pkg/controller/nodes/task/backoff/handler.go @@ -9,7 +9,6 @@ import ( "github.com/flyteorg/flyteplugins/go/tasks/errors" stdAtomic "github.com/flyteorg/flytestdlib/atomic" - stdErrors "github.com/flyteorg/flytestdlib/errors" "github.com/flyteorg/flytestdlib/logger" v1 "k8s.io/api/core/v1" apiErrors "k8s.io/apimachinery/pkg/api/errors" @@ -18,7 +17,10 @@ import ( ) var ( - reqRegexp = regexp.MustCompile(`requested: (limits.[a-zA-Z]+=[a-zA-Z0-9]+[,]*)+`) + limitedLimitsRegexp = regexp.MustCompile(`limited: (limits.[a-zA-Z]+=[a-zA-Z0-9]+[,]*)+`) + limitedRequestsRegexp = regexp.MustCompile(`limited: (requests.[a-zA-Z]+=[a-zA-Z0-9]+[,]*)+`) + requestedLimitsRegexp = regexp.MustCompile(`requested: (limits.[a-zA-Z]+=[a-zA-Z0-9]+[,]*)+`) + requestedRequestsRegexp = regexp.MustCompile(`requested: (requests.[a-zA-Z]+=[a-zA-Z0-9]+[,]*)+`) ) // SimpleBackOffBlocker is a simple exponential back-off timer that keeps track of the back-off period @@ -171,7 +173,7 @@ func (h *ComputeResourceAwareBackOffHandler) Handle(ctx context.Context, operati // It is necessary to parse the error message to get the actual constraints // in this case, if the error message indicates constraints on memory only, then we shouldn't be used to lower the CPU ceiling // even if CPU appears in requestedResourceList - newCeiling := GetComputeResourceAndQuantityRequested(err) + newCeiling := GetComputeResourceAndQuantity(err, requestedLimitsRegexp) h.ComputeResourceCeilings.updateAll(&newCeiling) } @@ -196,18 +198,18 @@ func IsBackOffError(err error) bool { return IsResourceQuotaExceeded(err) || apiErrors.IsTooManyRequests(err) || apiErrors.IsServerTimeout(err) } -func GetComputeResourceAndQuantityRequested(err error) v1.ResourceList { +func GetComputeResourceAndQuantity(err error, resourceRegex *regexp.Regexp) v1.ResourceList { // Playground: https://play.golang.org/p/oOr6CMmW7IE // Sample message: // "requested: limits.cpu=7,limits.memory=64Gi, used: limits.cpu=249,limits.memory=2012730Mi, limited: limits.cpu=250,limits.memory=2000Gi" // Extracting "requested: limits.cpu=7,limits.memory=64Gi" - matches := reqRegexp.FindAllStringSubmatch(err.Error(), -1) - requestedComputeResources := v1.ResourceList{} + matches := resourceRegex.FindAllStringSubmatch(err.Error(), -1) + computeResources := v1.ResourceList{} if len(matches) == 0 || len(matches[0]) == 0 { - return requestedComputeResources + return computeResources } // Extracting "limits.cpu=7,limits.memory=64Gi" @@ -226,11 +228,28 @@ func GetComputeResourceAndQuantityRequested(err error) v1.ResourceList { if len(tuple) < 2 { continue } - requestedComputeResources[v1.ResourceName(tuple[0])] = resource.MustParse(tuple[1]) + computeResources[v1.ResourceName(tuple[0])] = resource.MustParse(tuple[1]) } - return requestedComputeResources + return computeResources } -func IsBackoffError(err error) bool { - return stdErrors.IsCausedBy(err, errors.BackOffError) +func IsResourceRequestsEligible(err error) bool { + limitedLimitsResourceList := GetComputeResourceAndQuantity(err, limitedLimitsRegexp) + limitedRequestsResourceList := GetComputeResourceAndQuantity(err, limitedRequestsRegexp) + requestedLimitsResourceList := GetComputeResourceAndQuantity(err, requestedLimitsRegexp) + requestedRequestsResourceList := GetComputeResourceAndQuantity(err, requestedRequestsRegexp) + + return isEligible(requestedLimitsResourceList, limitedLimitsResourceList) && + isEligible(requestedRequestsResourceList, limitedRequestsResourceList) +} + +func isEligible(requestedResourceList, quotaResourceList v1.ResourceList) (eligibility bool) { + for resource, requestedQuantity := range requestedResourceList { + quotaQuantity, exists := quotaResourceList[resource] + if exists && requestedQuantity.Cmp(quotaQuantity) >= 0 { + return false + } + } + + return true } diff --git a/pkg/controller/nodes/task/backoff/handler_test.go b/pkg/controller/nodes/task/backoff/handler_test.go index 525331776..5167becee 100644 --- a/pkg/controller/nodes/task/backoff/handler_test.go +++ b/pkg/controller/nodes/task/backoff/handler_test.go @@ -4,6 +4,7 @@ import ( "context" "errors" "reflect" + "regexp" "testing" "time" @@ -344,32 +345,79 @@ func TestComputeResourceCeilings_updateAll(t *testing.T) { func TestGetComputeResourceAndQuantityRequested(t *testing.T) { type args struct { - err error + err error + regexp *regexp.Regexp } tests := []struct { name string args args want v1.ResourceList }{ - {name: "Memory request", args: args{err: apiErrors.NewForbidden( + {name: "Limited memory limits", args: args{err: apiErrors.NewForbidden( schema.GroupResource{}, "", errors.New("is forbidden: "+ - "exceeded quota: project-quota, requested: limits.memory=3Gi, "+ - "used: limits.memory=7976Gi, limited: limits.memory=8000Gi"))}, + "exceeded quota: project-quota, requested: limits.memory=3Gi, used: limits.memory=7976Gi, limited: limits.memory=8000Gi")), + regexp: limitedLimitsRegexp}, + want: v1.ResourceList{v1.ResourceMemory: resource.MustParse("8000Gi")}}, + {name: "Limited CPU limits", args: args{err: apiErrors.NewForbidden( + schema.GroupResource{}, "", errors.New("is forbidden: "+ + "exceeded quota: project-quota, requested: limits.cpu=3640m, used: limits.cpu=6000m, limited: limits.cpu=8000m")), + regexp: limitedLimitsRegexp}, + want: v1.ResourceList{v1.ResourceCPU: resource.MustParse("8000m")}}, + {name: "Limited multiple limits ", args: args{err: apiErrors.NewForbidden( + schema.GroupResource{}, "", errors.New("is forbidden: "+ + "exceeded quota: project-quota, requested: limits.cpu=7,limits.memory=64Gi, used: limits.cpu=249,limits.memory=2012730Mi, limited: limits.cpu=250,limits.memory=2000Gi")), + regexp: limitedLimitsRegexp}, + want: v1.ResourceList{v1.ResourceCPU: resource.MustParse("250"), v1.ResourceMemory: resource.MustParse("2000Gi")}}, + {name: "Limited memory requests", args: args{err: apiErrors.NewForbidden( + schema.GroupResource{}, "", errors.New("is forbidden: "+ + "exceeded quota: project-quota, requested: requests.memory=3Gi, used: requests.memory=7976Gi, limited: requests.memory=8000Gi")), + regexp: limitedRequestsRegexp}, + want: v1.ResourceList{v1.ResourceMemory: resource.MustParse("8000Gi")}}, + {name: "Limited CPU requests", args: args{err: apiErrors.NewForbidden( + schema.GroupResource{}, "", errors.New("is forbidden: "+ + "exceeded quota: project-quota, requested: requests.cpu=3640m, used: requests.cpu=6000m, limited: requests.cpu=8000m")), + regexp: limitedRequestsRegexp}, + want: v1.ResourceList{v1.ResourceCPU: resource.MustParse("8000m")}}, + {name: "Limited multiple requests ", args: args{err: apiErrors.NewForbidden( + schema.GroupResource{}, "", errors.New("is forbidden: "+ + "exceeded quota: project-quota, requested: requests.cpu=7,requests.memory=64Gi, used: requests.cpu=249,requests.memory=2012730Mi, limited: requests.cpu=250,requests.memory=2000Gi")), + regexp: limitedRequestsRegexp}, + want: v1.ResourceList{v1.ResourceCPU: resource.MustParse("250"), v1.ResourceMemory: resource.MustParse("2000Gi")}}, + {name: "Requested memory limits", args: args{err: apiErrors.NewForbidden( + schema.GroupResource{}, "", errors.New("is forbidden: "+ + "exceeded quota: project-quota, requested: limits.memory=3Gi, used: limits.memory=7976Gi, limited: limits.memory=8000Gi")), + regexp: requestedLimitsRegexp}, + want: v1.ResourceList{v1.ResourceMemory: resource.MustParse("3Gi")}}, + {name: "Requested CPU limits", args: args{err: apiErrors.NewForbidden( + schema.GroupResource{}, "", errors.New("is forbidden: "+ + "exceeded quota: project-quota, requested: limits.cpu=3640m, used: limits.cpu=6000m, limited: limits.cpu=8000m")), + regexp: requestedLimitsRegexp}, + want: v1.ResourceList{v1.ResourceCPU: resource.MustParse("3640m")}}, + {name: "Requested multiple limits ", args: args{err: apiErrors.NewForbidden( + schema.GroupResource{}, "", errors.New("is forbidden: "+ + "exceeded quota: project-quota, requested: limits.cpu=7,limits.memory=64Gi, used: limits.cpu=249,limits.memory=2012730Mi, limited: limits.cpu=250,limits.memory=2000Gi")), + regexp: requestedLimitsRegexp}, + want: v1.ResourceList{v1.ResourceCPU: resource.MustParse("7"), v1.ResourceMemory: resource.MustParse("64Gi")}}, + {name: "Requested memory requests", args: args{err: apiErrors.NewForbidden( + schema.GroupResource{}, "", errors.New("is forbidden: "+ + "exceeded quota: project-quota, requested: requests.memory=3Gi, used: requests.memory=7976Gi, limited: requests.memory=8000Gi")), + regexp: requestedRequestsRegexp}, want: v1.ResourceList{v1.ResourceMemory: resource.MustParse("3Gi")}}, - {name: "CPU request", args: args{err: apiErrors.NewForbidden( + {name: "Requested CPU requests", args: args{err: apiErrors.NewForbidden( schema.GroupResource{}, "", errors.New("is forbidden: "+ - "exceeded quota: project-quota, requested: limits.cpu=3640m, "+ - "used: limits.cpu=6000m, limited: limits.cpu=8000m"))}, + "exceeded quota: project-quota, requested: requests.cpu=3640m, used: requests.cpu=6000m, limited: requests.cpu=8000m")), + regexp: requestedRequestsRegexp}, want: v1.ResourceList{v1.ResourceCPU: resource.MustParse("3640m")}}, - {name: "Multiple resources ", args: args{err: apiErrors.NewForbidden( + {name: "Requested multiple requests ", args: args{err: apiErrors.NewForbidden( schema.GroupResource{}, "", errors.New("is forbidden: "+ - "exceeded quota: project-quota, requested: limits.cpu=7,limits.memory=64Gi, used: limits.cpu=249,limits.memory=2012730Mi, limited: limits.cpu=250,limits.memory=2000Gi"))}, + "exceeded quota: project-quota, requested: requests.cpu=7,requests.memory=64Gi, used: requests.cpu=249,requests.memory=2012730Mi, limited: requests.cpu=250,requests.memory=2000Gi")), + regexp: requestedRequestsRegexp}, want: v1.ResourceList{v1.ResourceCPU: resource.MustParse("7"), v1.ResourceMemory: resource.MustParse("64Gi")}}, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - if got := GetComputeResourceAndQuantityRequested(tt.args.err); !reflect.DeepEqual(got, tt.want) { - t.Errorf("GetComputeResourceAndQuantityRequested() = %v, want %v", got, tt.want) + if got := GetComputeResourceAndQuantity(tt.args.err, tt.args.regexp); !reflect.DeepEqual(got, tt.want) { + t.Errorf("GetComputeResourceAndQuantity() = %v, want %v", got, tt.want) } }) } @@ -390,7 +438,7 @@ func TestIsBackoffError(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - if got := IsBackoffError(tt.args.err); got != tt.want { + if got := stdlibErrors.IsCausedBy(tt.args.err, taskErrors.BackOffError); got != tt.want { t.Errorf("IsBackoffError() = %v, want %v", got, tt.want) } }) @@ -516,3 +564,56 @@ func TestErrorTypes(t *testing.T) { assert.True(t, res) }) } + +func TestIsEligible(t *testing.T) { + type args struct { + requested v1.ResourceList + quota v1.ResourceList + } + tests := []struct { + name string + args args + want bool + }{ + { + name: "CPUElgible", + args: args{ + requested: v1.ResourceList{v1.ResourceCPU: resource.MustParse("1")}, + quota: v1.ResourceList{v1.ResourceCPU: resource.MustParse("2")}, + }, + want: true, + }, + { + name: "CPUInelgible", + args: args{ + requested: v1.ResourceList{v1.ResourceCPU: resource.MustParse("1")}, + quota: v1.ResourceList{v1.ResourceCPU: resource.MustParse("200m")}, + }, + want: false, + }, + { + name: "MemoryElgible", + args: args{ + requested: v1.ResourceList{v1.ResourceMemory: resource.MustParse("32Gi")}, + quota: v1.ResourceList{v1.ResourceMemory: resource.MustParse("64Gi")}, + }, + want: true, + }, + { + name: "MemoryInelgible", + args: args{ + requested: v1.ResourceList{v1.ResourceMemory: resource.MustParse("64Gi")}, + quota: v1.ResourceList{v1.ResourceMemory: resource.MustParse("64Gi")}, + }, + want: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := isEligible(tt.args.requested, tt.args.quota); got != tt.want { + t.Errorf("isEligible() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/pkg/controller/nodes/task/k8s/plugin_manager.go b/pkg/controller/nodes/task/k8s/plugin_manager.go index 67b0356a3..3793102be 100644 --- a/pkg/controller/nodes/task/k8s/plugin_manager.go +++ b/pkg/controller/nodes/task/k8s/plugin_manager.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "reflect" - "strings" "time" "github.com/flyteorg/flyteplugins/go/tasks/errors" @@ -224,14 +223,19 @@ func (e *PluginManager) LaunchResource(ctx context.Context, tCtx pluginsCore.Tas } if err != nil && !k8serrors.IsAlreadyExists(err) { - if backoff.IsBackoffError(err) { + if backoff.IsResourceQuotaExceeded(err) && !backoff.IsResourceRequestsEligible(err) { + // if task resources exceed resource quotas then permanently fail because the task will + // be stuck waiting for resources until the `node-active-deadline` terminates the node. + logger.Errorf(ctx, "task resource requests exceed k8s resource limits. err: %v", err) + return pluginsCore.DoTransition(pluginsCore.PhaseInfoFailure("ResourceRequestsExceedLimits", + fmt.Sprintf("requested resources exceed limits: %v", err.Error()), nil)), nil + } else if stdErrors.IsCausedBy(err, errors.BackOffError) { logger.Warnf(ctx, "Failed to launch job, resource quota exceeded. err: %v", err) return pluginsCore.DoTransition(pluginsCore.PhaseInfoWaitingForResourcesInfo(time.Now(), pluginsCore.DefaultPhaseVersion, fmt.Sprintf("Exceeded resourcequota: %s", err.Error()), nil)), nil + } else if e.backOffController == nil && backoff.IsResourceQuotaExceeded(err) { + logger.Warnf(ctx, "Failed to launch job, resource quota exceeded and the operation is not guarded by back-off. err: %v", err) + return pluginsCore.DoTransition(pluginsCore.PhaseInfoWaitingForResourcesInfo(time.Now(), pluginsCore.DefaultPhaseVersion, fmt.Sprintf("Exceeded resourcequota: %s", err.Error()), nil)), nil } else if k8serrors.IsForbidden(err) { - if e.backOffController == nil && strings.Contains(err.Error(), "exceeded quota") { - logger.Warnf(ctx, "Failed to launch job, resource quota exceeded and the operation is not guarded by back-off. err: %v", err) - return pluginsCore.DoTransition(pluginsCore.PhaseInfoWaitingForResourcesInfo(time.Now(), pluginsCore.DefaultPhaseVersion, fmt.Sprintf("Exceeded resourcequota: %s", err.Error()), nil)), nil - } return pluginsCore.DoTransition(pluginsCore.PhaseInfoRetryableFailure("RuntimeFailure", err.Error(), nil)), nil } else if k8serrors.IsBadRequest(err) || k8serrors.IsInvalid(err) { logger.Errorf(ctx, "Badly formatted resource for plugin [%s], err %s", e.id, err)