[local] long-pending pods filter improvements
Improve "long pending" pods detection so we're less likely to suffer from an overwhelmed autoscaler for an extended duration:
* Work at workload -not pod- level; keeping all pods sharing an owneref makes multi nodes upscales more efficient
* Don't set a maximum number of workloads having pending pods; that protection never helped (and inhibited long pending filter to help under large backlog)
* Beside pod age, consider the number of times a pod ran through the autoscaler's main loop: this allows to lower the "long pending" cutoff delay.
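
To illustrate the workload-level grouping described in the first bullet, here is a rough, self-contained Go sketch; the groupByWorkload helper and the sample pods are hypothetical and only mirror the idea, the actual implementation is in the diff below.

package main

import (
	"fmt"

	apiv1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
)

// groupByWorkload keys pending pods by their controller's UID (falling back
// to the pod's own UID for bare pods), so a whole workload can be kept or
// skipped together when building the next upscale attempt.
func groupByWorkload(pending []*apiv1.Pod) map[types.UID][]*apiv1.Pod {
	groups := make(map[types.UID][]*apiv1.Pod)
	for _, pod := range pending {
		key := pod.GetUID()
		if ref := metav1.GetControllerOf(pod); ref != nil {
			key = ref.UID
		}
		groups[key] = append(groups[key], pod)
	}
	return groups
}

func main() {
	controller := true
	rsUID := types.UID("replicaset-1234")
	pods := []*apiv1.Pod{
		{ObjectMeta: metav1.ObjectMeta{Name: "web-a", Namespace: "default", UID: "pod-a",
			OwnerReferences: []metav1.OwnerReference{{UID: rsUID, Controller: &controller}}}},
		{ObjectMeta: metav1.ObjectMeta{Name: "web-b", Namespace: "default", UID: "pod-b",
			OwnerReferences: []metav1.OwnerReference{{UID: rsUID, Controller: &controller}}}},
		{ObjectMeta: metav1.ObjectMeta{Name: "standalone", Namespace: "default", UID: "pod-c"}},
	}
	// the two ReplicaSet-owned pods land in one group, the bare pod in its own
	for uid, group := range groupByWorkload(pods) {
		fmt.Printf("workload %s: %d pending pod(s)\n", uid, len(group))
	}
}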
bpineau authored and domenicbozzuto committed Oct 31, 2023
1 parent e3098e4 commit bf4afce
Showing 4 changed files with 306 additions and 157 deletions.
146 changes: 110 additions & 36 deletions cluster-autoscaler/processors/datadog/pods/filter_long_pending.go
@@ -16,40 +16,62 @@ limitations under the License.

/*
Ensures pods pending for a long time are retried at a slower pace:
* We don't want those long pending pods to slow down scale-up for new pods.
* If one of those is causing a runaway infinite upscale, we want to give
  the autoscaler some slack time to recover from cooldown, and reap unused nodes.
* For autoscaling efficiency, we prefer skipping or keeping all pods from
  a given workload (all pods sharing a common controller ref).

We quarantine (remove from the next upscale attempts) for a progressively
longer duration (up to a couple of scale-down cooldown durations + jitter)
all pods from workloads that didn't have new pods recently (all that
workload's pods are "old") and were already considered several times
for upscale.
*/

package pods

import (
	"math/rand"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/autoscaler/cluster-autoscaler/context"

	apiv1 "k8s.io/api/core/v1"
	klog "k8s.io/klog/v2"
)

// we won't quarantine a workload's pods until they all went
// through (were considered for upscales) minAttempts times
const minAttempts = 3

var (
	now              = time.Now // unit tests
	retriesIncrement = 2 * time.Minute
)

type pendingTracker struct {
	// last time we saw a new pod for that workload (or the most recent
	// pod's creation time, whichever is later)
	lastSeen time.Time
	// deadline before which we won't attempt upscales for that workload's pods
	nextTry time.Time
	// how many times that workload's pods went through unfiltered upscale attempts
	attempts int
}

type filterOutLongPending struct {
	seen         map[types.UID]*pendingTracker
	deadlineFunc func(int, time.Duration) time.Time
}

// NewFilterOutLongPending returns a processor slowing down retries on long pending pods
func NewFilterOutLongPending() *filterOutLongPending {
	rand.Seed(time.Now().UnixNano())
	return &filterOutLongPending{
		seen:         make(map[types.UID]*pendingTracker),
		deadlineFunc: buildDeadline,
	}
}

@@ -58,45 +80,97 @@ func (p *filterOutLongPending) CleanUp() {}

// Process slows down upscale retries on long pending pods
func (p *filterOutLongPending) Process(ctx *context.AutoscalingContext, pending []*apiv1.Pod) ([]*apiv1.Pod, error) {
	coolDownDelay := ctx.AutoscalingOptions.ScaleDownDelayAfterAdd
	allowedPods := make([]*apiv1.Pod, 0)

	refsToPods := make(map[types.UID][]*apiv1.Pod)

	for _, pod := range pending {
		refID := pod.GetUID()
		controllerRef := metav1.GetControllerOf(pod)
		if controllerRef != nil {
			refID = controllerRef.UID
		}

		// ephemeral ownerrefs -> pods mapping
		if _, ok := refsToPods[refID]; !ok {
			refsToPods[refID] = make([]*apiv1.Pod, 0)
		}
		refsToPods[refID] = append(refsToPods[refID], pod)

		// track upscale attempts for a given ownerref
		if _, ok := p.seen[refID]; !ok {
			p.seen[refID] = &pendingTracker{
				lastSeen: now(),
			}
		}

		// reset counters whenever a workload has new pods
		if p.seen[refID].lastSeen.Before(pod.GetCreationTimestamp().Time) {
			p.seen[refID] = &pendingTracker{
				lastSeen: pod.GetCreationTimestamp().Time,
			}
		}
	}

	for ref, pods := range refsToPods {
		// skip (don't attempt upscales for) quarantined pods
		if p.seen[ref].nextTry.After(now()) {
			logSkipped(ref, p.seen[ref], pods)
			continue
		}

		// let the autoscaler attempt upscales for non-quarantined pods
		allowedPods = append(allowedPods, pods...)
		p.seen[ref].attempts++

		// don't quarantine pods until they had several upscale opportunities
		if p.seen[ref].attempts < minAttempts {
			continue
		}

		// don't quarantine workloads having recent pods
		if p.seen[ref].lastSeen.Add(longPendingCutoff).After(now()) {
			continue
		}

		// pending pods from that workload already had several upscale
		// opportunities and aren't recent: delay them past the current loop.
		p.seen[ref].nextTry = p.deadlineFunc(p.seen[ref].attempts, coolDownDelay)
	}

	// reap stale entries
	for ref := range p.seen {
		if _, ok := refsToPods[ref]; !ok {
			delete(p.seen, ref)
		}
	}

	return allowedPods, nil
}

func logSkipped(ref types.UID, tracker *pendingTracker, pods []*apiv1.Pod) {
	klog.Warningf(
		"ignoring %d long pending pod(s) (eg. %s/%s) "+
			"from workload uid %q until %s (attempted %d times)",
		len(pods), pods[0].GetNamespace(), pods[0].GetName(),
		ref, tracker.nextTry, tracker.attempts,
	)
}

func buildDeadline(attempts int, coolDownDelay time.Duration) time.Time {
	increment := time.Duration(attempts-minAttempts) * retriesIncrement
	delay := minDuration(increment, 2*coolDownDelay.Abs()) - jitterDuration(coolDownDelay.Abs())
	return now().Add(delay.Abs())
}

func minDuration(a, b time.Duration) time.Duration {
	if a <= b {
		return a
	}
	return b
}

func jitterDuration(duration time.Duration) time.Duration {
	jitter := time.Duration(rand.Int63n(int64(duration.Abs() + 1)))
	return jitter - duration/2
}