[release-1.14] Improve scheduler memory usage and remove scheduler waits to speed up recovery time (#8203)

* Improve scheduler memory usage (#8144)

* Improve scheduler memory usage

- Create a namespace-scoped statefulset lister instead of a cluster-wide one
- Accept a PodLister rather than creating a cluster-wide one (see the sketch
  after this list)
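
As a rough illustration of the narrowed scope, here is a minimal client-go sketch, not the PR's actual wiring; the namespace, resync period, and function name are placeholders:

```go
package main

import (
	"time"

	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
)

// buildNamespacedListers is a hypothetical helper showing namespace-scoped
// listers instead of cluster-wide ones; it is not code from this commit.
func buildNamespacedListers(namespace string) {
	cfg, err := rest.InClusterConfig()
	if err != nil {
		panic(err)
	}
	client := kubernetes.NewForConfigOrDie(cfg)

	// The factory is restricted to one namespace, so its caches only hold
	// objects from that namespace (lower memory than cluster-wide caches).
	factory := informers.NewSharedInformerFactoryWithOptions(
		client, 10*time.Minute, informers.WithNamespace(namespace))

	// PodNamespaceLister, matching the podlister parameter of NewStateBuilder.
	podLister := factory.Core().V1().Pods().Lister().Pods(namespace)

	// Namespace-scoped StatefulSet lister rather than a cluster-wide one.
	sfsLister := factory.Apps().V1().StatefulSets().Lister().StatefulSets(namespace)

	// In real code, start the factory and wait for the caches to sync before
	// using the listers.
	stopCh := make(chan struct{})
	defer close(stopCh)
	factory.Start(stopCh)
	factory.WaitForCacheSync(stopCh)

	_, _ = podLister, sfsLister
}

func main() { buildNamespacedListers("knative-eventing") }
```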

Signed-off-by: Pierangelo Di Pilato <[email protected]>

* Update codegen

Signed-off-by: Pierangelo Di Pilato <[email protected]>

---------

Signed-off-by: Pierangelo Di Pilato <[email protected]>

* Remove scheduler `wait`s to speed up recovery time (#8200)

Currently, the scheduler and autoscaler are single-threaded and use
a lock to prevent multiple scheduling and autoscaling decisions
from happening in parallel. This is not a problem for our use
cases; however, the multiple `wait`s currently present slow
down recovery time.
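
As a rough sketch of that cost (self-contained and illustrative only: `getPod` and the pod name stand in for the namespaced PodLister lookup; the real change is in the state builder hunks below), compare the polling pattern being removed with a single cache lookup:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

var errNotFound = errors.New("pod not found in informer cache")

// getPod stands in for podLister.Get; here it always misses to show the cost.
func getPod(name string) (string, error) { return "", errNotFound }

func main() {
	// Before: a missing pod is retried for up to 5s while the single
	// scheduler/autoscaler lock is held, so every other decision queues up.
	_ = wait.PollUntilContextTimeout(context.Background(), 50*time.Millisecond, 5*time.Second, true,
		func(ctx context.Context) (bool, error) {
			_, err := getPod("my-statefulset-0") // placeholder pod name
			return err == nil, nil
		})

	// After: one lookup; on error the pod is logged and skipped, keeping the
	// critical section short.
	if _, err := getPod("my-statefulset-0"); err != nil {
		fmt.Println("skipping pod:", err)
	}
}
```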

From my testing, if I delete and recreate the Kafka control plane
and data plane, without this patch it takes 1 hour to recover when there
are 400 triggers, or 20 minutes when there are 100 triggers; with the
patch recovery is nearly immediate (only 2-3 minutes with 400 triggers).

- Remove `wait`s from state builder and autoscaler
- Add additional debug logs
- Use the logger provided through the context, as opposed to global loggers
  in each individual component, to preserve `knative/pkg` resource-aware
  log keys (illustrated in the sketch after this list).
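
The logger change follows the usual `knative.dev/pkg/logging` pattern; a minimal sketch (the function name and surrounding code are illustrative, and only the logging calls mirror the state builder hunk below):

```go
package scheduler

import (
	"context"

	"knative.dev/pkg/logging"
)

// buildState is a hypothetical function name; only the logging calls mirror
// the pattern adopted in this commit.
func buildState(ctx context.Context) {
	// Take the logger already attached to the context by knative.dev/pkg
	// (it carries resource-aware keys such as the reconciled object's key),
	// tag it with this subcomponent, and put it back on the context so
	// everything downstream inherits it.
	logger := logging.FromContext(ctx).With("subcomponent", "statebuilder")
	ctx = logging.WithLogger(ctx, logger)

	logger.Debugw("building scheduler state")
	_ = ctx
}
```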

Signed-off-by: Pierangelo Di Pilato <[email protected]>

---------

Signed-off-by: Pierangelo Di Pilato <[email protected]>
pierDipi authored Sep 23, 2024
1 parent 3755819 commit 09cb633
Showing 11 changed files with 209 additions and 229 deletions.
8 changes: 4 additions & 4 deletions pkg/scheduler/scheduler.go
@@ -92,18 +92,18 @@ type Evictor func(pod *corev1.Pod, vpod VPod, from *duckv1alpha1.Placement) erro
// Scheduler is responsible for placing VPods into real Kubernetes pods
type Scheduler interface {
// Schedule computes the new set of placements for vpod.
Schedule(vpod VPod) ([]duckv1alpha1.Placement, error)
Schedule(ctx context.Context, vpod VPod) ([]duckv1alpha1.Placement, error)
}

// SchedulerFunc type is an adapter to allow the use of
// ordinary functions as Schedulers. If f is a function
// with the appropriate signature, SchedulerFunc(f) is a
// Scheduler that calls f.
type SchedulerFunc func(vpod VPod) ([]duckv1alpha1.Placement, error)
type SchedulerFunc func(ctx context.Context, vpod VPod) ([]duckv1alpha1.Placement, error)

// Schedule implements the Scheduler interface.
func (f SchedulerFunc) Schedule(vpod VPod) ([]duckv1alpha1.Placement, error) {
return f(vpod)
func (f SchedulerFunc) Schedule(ctx context.Context, vpod VPod) ([]duckv1alpha1.Placement, error) {
return f(ctx, vpod)
}

// VPod represents virtual replicas placed into real Kubernetes pods
5 changes: 3 additions & 2 deletions pkg/scheduler/scheduler_test.go
@@ -17,6 +17,7 @@ limitations under the License.
package scheduler

import (
"context"
"testing"

"github.com/stretchr/testify/require"
@@ -28,12 +29,12 @@ func TestSchedulerFuncSchedule(t *testing.T) {

called := 0

var s Scheduler = SchedulerFunc(func(vpod VPod) ([]duckv1alpha1.Placement, error) {
var s Scheduler = SchedulerFunc(func(ctx context.Context, vpod VPod) ([]duckv1alpha1.Placement, error) {
called++
return nil, nil
})

_, err := s.Schedule(nil)
_, err := s.Schedule(context.Background(), nil)
require.Nil(t, err)
require.Equal(t, 1, called)
}
9 changes: 5 additions & 4 deletions pkg/scheduler/state/helpers.go
@@ -25,6 +25,7 @@ import (

"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"

"knative.dev/eventing/pkg/scheduler"
)

@@ -55,10 +56,10 @@ func SatisfyZoneAvailability(feasiblePods []int32, states *State) bool {
var zoneName string
var err error
for _, podID := range feasiblePods {
wait.PollUntilContextTimeout(context.Background(), 50*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) {
zoneName, _, err = states.GetPodInfo(PodNameFromOrdinal(states.StatefulSetName, podID))
return err == nil, nil
})
zoneName, _, err = states.GetPodInfo(PodNameFromOrdinal(states.StatefulSetName, podID))
if err != nil {
continue
}
zoneMap[zoneName] = struct{}{}
}
return len(zoneMap) == int(states.NumZones)
95 changes: 44 additions & 51 deletions pkg/scheduler/state/state.go
@@ -22,17 +22,14 @@ import (
"errors"
"math"
"strconv"
"time"

"go.uber.org/zap"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
corev1 "k8s.io/client-go/listers/core/v1"

"knative.dev/pkg/logging"

"knative.dev/eventing/pkg/scheduler"
@@ -42,7 +39,7 @@ type StateAccessor interface {
// State returns the current state (snapshot) about placed vpods
// Take into account reserved vreplicas and update `reserved` to reflect
// the current state.
State(reserved map[types.NamespacedName]map[string]int32) (*State, error)
State(ctx context.Context, reserved map[types.NamespacedName]map[string]int32) (*State, error)
}

// state provides information about the current scheduling of all vpods
@@ -152,8 +149,6 @@ func (s *State) IsSchedulablePod(ordinal int32) bool {

// stateBuilder reconstruct the state from scratch, by listing vpods
type stateBuilder struct {
ctx context.Context
logger *zap.SugaredLogger
vpodLister scheduler.VPodLister
capacity int32
schedulerPolicy scheduler.SchedulerPolicyType
@@ -166,11 +161,9 @@ type stateBuilder struct {
}

// NewStateBuilder returns a StateAccessor recreating the state from scratch each time it is requested
func NewStateBuilder(ctx context.Context, namespace, sfsname string, lister scheduler.VPodLister, podCapacity int32, schedulerPolicy scheduler.SchedulerPolicyType, schedPolicy *scheduler.SchedulerPolicy, deschedPolicy *scheduler.SchedulerPolicy, podlister corev1.PodNamespaceLister, nodeLister corev1.NodeLister, statefulSetCache *scheduler.ScaleCache) StateAccessor {
func NewStateBuilder(sfsname string, lister scheduler.VPodLister, podCapacity int32, schedulerPolicy scheduler.SchedulerPolicyType, schedPolicy, deschedPolicy *scheduler.SchedulerPolicy, podlister corev1.PodNamespaceLister, nodeLister corev1.NodeLister, statefulSetCache *scheduler.ScaleCache) StateAccessor {

return &stateBuilder{
ctx: ctx,
logger: logging.FromContext(ctx),
vpodLister: lister,
capacity: podCapacity,
schedulerPolicy: schedulerPolicy,
@@ -183,15 +176,18 @@ func NewStateBuilder(ctx context.Context, namespace, sfsname string, lister sche
}
}

func (s *stateBuilder) State(reserved map[types.NamespacedName]map[string]int32) (*State, error) {
func (s *stateBuilder) State(ctx context.Context, reserved map[types.NamespacedName]map[string]int32) (*State, error) {
vpods, err := s.vpodLister()
if err != nil {
return nil, err
}

scale, err := s.statefulSetCache.GetScale(s.ctx, s.statefulSetName, metav1.GetOptions{})
logger := logging.FromContext(ctx).With("subcomponent", "statebuilder")
ctx = logging.WithLogger(ctx, logger)

scale, err := s.statefulSetCache.GetScale(ctx, s.statefulSetName, metav1.GetOptions{})
if err != nil {
s.logger.Infow("failed to get statefulset", zap.Error(err))
logger.Infow("failed to get statefulset", zap.Error(err))
return nil, err
}

@@ -235,36 +231,35 @@ func (s *stateBuilder) State(reserved map[types.NamespacedName]map[string]int32)
}

for podId := int32(0); podId < scale.Spec.Replicas && s.podLister != nil; podId++ {
var pod *v1.Pod
wait.PollUntilContextTimeout(context.Background(), 50*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) {
pod, err = s.podLister.Get(PodNameFromOrdinal(s.statefulSetName, podId))
return err == nil, nil
})

if pod != nil {
if isPodUnschedulable(pod) {
// Pod is marked for eviction - CANNOT SCHEDULE VREPS on this pod.
continue
}

node, err := s.nodeLister.Get(pod.Spec.NodeName)
if err != nil {
return nil, err
}
pod, err := s.podLister.Get(PodNameFromOrdinal(s.statefulSetName, podId))
if err != nil {
logger.Warnw("Failed to get pod", zap.Int32("ordinal", podId), zap.Error(err))
continue
}
if isPodUnschedulable(pod) {
// Pod is marked for eviction - CANNOT SCHEDULE VREPS on this pod.
logger.Debugw("Pod is unschedulable", zap.Any("pod", pod))
continue
}

if isNodeUnschedulable(node) {
// Node is marked as Unschedulable - CANNOT SCHEDULE VREPS on a pod running on this node.
continue
}
node, err := s.nodeLister.Get(pod.Spec.NodeName)
if err != nil {
return nil, err
}

// Pod has no annotation or not annotated as unschedulable and
// not on an unschedulable node, so add to feasible
schedulablePods.Insert(podId)
if isNodeUnschedulable(node) {
// Node is marked as Unschedulable - CANNOT SCHEDULE VREPS on a pod running on this node.
logger.Debugw("Pod is on an unschedulable node", zap.Any("pod", node))
continue
}

// Pod has no annotation or not annotated as unschedulable and
// not on an unschedulable node, so add to feasible
schedulablePods.Insert(podId)
}

for _, p := range schedulablePods.List() {
free, last = s.updateFreeCapacity(free, last, PodNameFromOrdinal(s.statefulSetName, p), 0)
free, last = s.updateFreeCapacity(logger, free, last, PodNameFromOrdinal(s.statefulSetName, p), 0)
}

// Getting current state from existing placements for all vpods
@@ -286,15 +281,14 @@ func (s *stateBuilder) State(reserved map[types.NamespacedName]map[string]int32)
// Account for reserved vreplicas
vreplicas = withReserved(vpod.GetKey(), podName, vreplicas, reserved)

free, last = s.updateFreeCapacity(free, last, podName, vreplicas)
free, last = s.updateFreeCapacity(logger, free, last, podName, vreplicas)

withPlacement[vpod.GetKey()][podName] = true

var pod *v1.Pod
wait.PollUntilContextTimeout(context.Background(), 50*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) {
pod, err = s.podLister.Get(podName)
return err == nil, nil
})
pod, err := s.podLister.Get(podName)
if err != nil {
logger.Warnw("Failed to get pod", zap.String("podName", podName), zap.Error(err))
}

if pod != nil && schedulablePods.Has(OrdinalFromPodName(pod.GetName())) {
nodeName := pod.Spec.NodeName //node name for this pod
@@ -315,11 +309,10 @@ func (s *stateBuilder) State(reserved map[types.NamespacedName]map[string]int32)
continue
}

var pod *v1.Pod
wait.PollUntilContextTimeout(context.Background(), 50*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) {
pod, err = s.podLister.Get(podName)
return err == nil, nil
})
pod, err := s.podLister.Get(podName)
if err != nil {
logger.Warnw("Failed to get pod", zap.String("podName", podName), zap.Error(err))
}

if pod != nil && schedulablePods.Has(OrdinalFromPodName(pod.GetName())) {
nodeName := pod.Spec.NodeName //node name for this pod
Expand All @@ -330,15 +323,15 @@ func (s *stateBuilder) State(reserved map[types.NamespacedName]map[string]int32)
}
}

free, last = s.updateFreeCapacity(free, last, podName, rvreplicas)
free, last = s.updateFreeCapacity(logger, free, last, podName, rvreplicas)
}
}

state := &State{FreeCap: free, SchedulablePods: schedulablePods.List(), LastOrdinal: last, Capacity: s.capacity, Replicas: scale.Spec.Replicas, NumZones: int32(len(zoneMap)), NumNodes: int32(len(nodeToZoneMap)),
SchedulerPolicy: s.schedulerPolicy, SchedPolicy: s.schedPolicy, DeschedPolicy: s.deschedPolicy, NodeToZoneMap: nodeToZoneMap, StatefulSetName: s.statefulSetName, PodLister: s.podLister,
PodSpread: podSpread, NodeSpread: nodeSpread, ZoneSpread: zoneSpread, Pending: pending, ExpectedVReplicaByVPod: expectedVReplicasByVPod}

s.logger.Infow("cluster state info", zap.Any("state", state), zap.Any("reserved", toJSONable(reserved)))
logger.Infow("cluster state info", zap.Any("state", state), zap.Any("reserved", toJSONable(reserved)))

return state, nil
}
@@ -350,7 +343,7 @@ func pendingFromVPod(vpod scheduler.VPod) int32 {
return int32(math.Max(float64(0), float64(expected-scheduled)))
}

func (s *stateBuilder) updateFreeCapacity(free []int32, last int32, podName string, vreplicas int32) ([]int32, int32) {
func (s *stateBuilder) updateFreeCapacity(logger *zap.SugaredLogger, free []int32, last int32, podName string, vreplicas int32) ([]int32, int32) {
ordinal := OrdinalFromPodName(podName)
free = grow(free, ordinal, s.capacity)

@@ -359,7 +352,7 @@ func (s *stateBuilder) updateFreeCapacity(free []int32, last int32, podName stri
// Assert the pod is not overcommitted
if free[ordinal] < 0 {
// This should not happen anymore. Log as an error but do not interrupt the current scheduling.
s.logger.Warnw("pod is overcommitted", zap.String("podName", podName), zap.Int32("free", free[ordinal]))
logger.Warnw("pod is overcommitted", zap.String("podName", podName), zap.Int32("free", free[ordinal]))
}

if ordinal > last {
4 changes: 2 additions & 2 deletions pkg/scheduler/state/state_test.go
@@ -645,8 +645,8 @@ func TestStateBuilder(t *testing.T) {

scaleCache := scheduler.NewScaleCache(ctx, testNs, kubeclient.Get(ctx).AppsV1().StatefulSets(testNs), scheduler.ScaleCacheConfig{RefreshPeriod: time.Minute * 5})

stateBuilder := NewStateBuilder(ctx, testNs, sfsName, vpodClient.List, int32(10), tc.schedulerPolicyType, &scheduler.SchedulerPolicy{}, &scheduler.SchedulerPolicy{}, lsp.GetPodLister().Pods(testNs), lsn.GetNodeLister(), scaleCache)
state, err := stateBuilder.State(tc.reserved)
stateBuilder := NewStateBuilder(sfsName, vpodClient.List, int32(10), tc.schedulerPolicyType, &scheduler.SchedulerPolicy{}, &scheduler.SchedulerPolicy{}, lsp.GetPodLister().Pods(testNs), lsn.GetNodeLister(), scaleCache)
state, err := stateBuilder.State(ctx, tc.reserved)
if err != nil {
t.Fatal("unexpected error", err)
}
(Diffs for the remaining 6 changed files are not shown.)