Merge pull request kubernetes#120871 from pohly/dra-unsuitable-nodes-selected-node

k8s.io/dynamic-resource-allocation: fix potential scheduling deadlock
k8s-ci-robot authored Oct 10, 2023
2 parents 4a92b00 + 0ba37e7 commit 38c6bd8
Showing 2 changed files with 90 additions and 6 deletions.
@@ -122,6 +122,12 @@ type Driver interface {
// can be allocated for it (for example, two GPUs requested but
// the node only has one).
//
// The potentialNodes slice contains all potential nodes selected
// by the scheduler plus the selected node. The response must
// not contain any other nodes. Implementations do not have to
// care about size limits in the PodSchedulingContext status; the
// caller will handle that.
//
// The result of the check is in ClaimAllocation.UnsuitableNodes.
// An error indicates that the entire check must be repeated.
UnsuitableNodes(ctx context.Context, pod *v1.Pod, claims []*ClaimAllocation, potentialNodes []string) error
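
To make the contract above concrete, here is a minimal sketch of a driver-side implementation. It assumes a hypothetical gpuDriver type that tracks free GPUs per node and treats every claim as needing one GPU; only this one method of the Driver interface is shown, and the type, field names, and package name are illustrative rather than part of k8s.io/dynamic-resource-allocation.

package example

import (
	"context"

	v1 "k8s.io/api/core/v1"

	"k8s.io/dynamic-resource-allocation/controller"
)

// gpuDriver is a hypothetical driver that records how many GPUs are still
// unallocated on each node. A real driver would also implement the other
// Driver methods.
type gpuDriver struct {
	freeGPUs map[string]int // node name -> number of free GPUs
}

func (d *gpuDriver) UnsuitableNodes(ctx context.Context, pod *v1.Pod, claims []*controller.ClaimAllocation, potentialNodes []string) error {
	for _, claim := range claims {
		// For this sketch every claim needs exactly one GPU; a real driver
		// would derive the requirement from the claim's parameters.
		var unsuitable []string
		for _, node := range potentialNodes {
			if d.freeGPUs[node] < 1 {
				unsuitable = append(unsuitable, node)
			}
		}
		// Report the full result; the caller truncates it to the
		// PodSchedulingContext size limit if necessary.
		claim.UnsuitableNodes = unsuitable
	}
	return nil
}
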
@@ -757,12 +763,20 @@ func (ctrl *controller) syncPodSchedulingContexts(ctx context.Context, schedulin
// and shouldn't, because those allocations might have to be undone to
// pick a better node. If we don't need to allocate now, then we'll
// simply report back the gathered information.
//
// We shouldn't assume that the scheduler has included the selected node
// in the list of potential nodes. Usually it does, but let's make sure
// that we check it.
selectedNode := schedulingCtx.Spec.SelectedNode
potentialNodes := schedulingCtx.Spec.PotentialNodes
if selectedNode != "" && !hasString(potentialNodes, selectedNode) {
potentialNodes = append(potentialNodes, selectedNode)
}
if len(schedulingCtx.Spec.PotentialNodes) > 0 {
if err := ctrl.driver.UnsuitableNodes(ctx, pod, claims, schedulingCtx.Spec.PotentialNodes); err != nil {
if err := ctrl.driver.UnsuitableNodes(ctx, pod, claims, potentialNodes); err != nil {
return fmt.Errorf("checking potential nodes: %v", err)
}
}
selectedNode := schedulingCtx.Spec.SelectedNode
logger.V(5).Info("pending pod claims", "claims", claims, "selectedNode", selectedNode)
if selectedNode != "" {
unsuitable := false
@@ -816,12 +830,12 @@ func (ctrl *controller) syncPodSchedulingContexts(ctx context.Context, schedulin
schedulingCtx.Status.ResourceClaims = append(schedulingCtx.Status.ResourceClaims,
resourcev1alpha2.ResourceClaimSchedulingStatus{
Name: delayed.PodClaimName,
UnsuitableNodes: delayed.UnsuitableNodes,
UnsuitableNodes: truncateNodes(delayed.UnsuitableNodes, selectedNode),
})
modified = true
} else if stringsDiffer(schedulingCtx.Status.ResourceClaims[i].UnsuitableNodes, delayed.UnsuitableNodes) {
// Update existing entry.
schedulingCtx.Status.ResourceClaims[i].UnsuitableNodes = delayed.UnsuitableNodes
schedulingCtx.Status.ResourceClaims[i].UnsuitableNodes = truncateNodes(delayed.UnsuitableNodes, selectedNode)
modified = true
}
}
@@ -837,6 +851,23 @@ func (ctrl *controller) syncPodSchedulingContexts(ctx context.Context, schedulin
return errPeriodic
}

func truncateNodes(nodes []string, selectedNode string) []string {
// We might have checked "potential nodes + selected node" above, so
// this list might be too long by one element. When truncating it, make
// sure that the selected node is listed.
lenUnsuitable := len(nodes)
if lenUnsuitable > resourcev1alpha2.PodSchedulingNodeListMaxSize {
if nodes[0] == selectedNode {
// Truncate at the end and keep selected node in the first element.
nodes = nodes[0 : lenUnsuitable-1]
} else {
// Truncate at the front; the first element is not the selected node.
nodes = nodes[1:lenUnsuitable]
}
}
return nodes
}
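
As a quick illustration of the behavior above, here is a minimal in-package test sketch (not part of this commit) that exercises truncateNodes when the list is one element longer than the v1alpha2 limit. It assumes it sits next to the controller tests, where testing, fmt, assert, and resourcev1alpha2 are already imported; the node names are made up.

func TestTruncateNodesSketch(t *testing.T) {
	// Build PodSchedulingNodeListMaxSize potential nodes plus the selected
	// node that the caller appended, i.e. one element too many.
	nodes := make([]string, 0, resourcev1alpha2.PodSchedulingNodeListMaxSize+1)
	for i := 0; i < resourcev1alpha2.PodSchedulingNodeListMaxSize; i++ {
		nodes = append(nodes, fmt.Sprintf("node-%d", i))
	}
	nodes = append(nodes, "selected-worker")

	truncated := truncateNodes(nodes, "selected-worker")

	// The first element ("node-0") gets dropped because it is not the
	// selected node, so "selected-worker" survives at the end.
	assert.Len(t, truncated, resourcev1alpha2.PodSchedulingNodeListMaxSize)
	assert.Equal(t, "selected-worker", truncated[len(truncated)-1])
}
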

type claimAllocations []*ClaimAllocation

// MarshalLog replaces the pointers with the actual structs because
@@ -19,6 +19,7 @@ package controller
import (
"context"
"errors"
"fmt"
"testing"

"github.com/stretchr/testify/assert"
@@ -64,6 +65,10 @@ func TestController(t *testing.T) {
otherNodeName := "worker-2"
unsuitableNodes := []string{otherNodeName}
potentialNodes := []string{nodeName, otherNodeName}
maxNodes := make([]string, resourcev1alpha2.PodSchedulingNodeListMaxSize)
for i := range maxNodes {
maxNodes[i] = fmt.Sprintf("node-%d", i)
}
withDeletionTimestamp := func(claim *resourcev1alpha2.ResourceClaim) *resourcev1alpha2.ResourceClaim {
var deleted metav1.Time
claim = claim.DeepCopy()
Expand Down Expand Up @@ -101,18 +106,24 @@ func TestController(t *testing.T) {
podSchedulingCtx.Spec.SelectedNode = nodeName
return podSchedulingCtx
}
withUnsuitableNodes := func(podSchedulingCtx *resourcev1alpha2.PodSchedulingContext) *resourcev1alpha2.PodSchedulingContext {
withSpecificUnsuitableNodes := func(podSchedulingCtx *resourcev1alpha2.PodSchedulingContext, unsuitableNodes []string) *resourcev1alpha2.PodSchedulingContext {
podSchedulingCtx = podSchedulingCtx.DeepCopy()
podSchedulingCtx.Status.ResourceClaims = append(podSchedulingCtx.Status.ResourceClaims,
resourcev1alpha2.ResourceClaimSchedulingStatus{Name: podClaimName, UnsuitableNodes: unsuitableNodes},
)
return podSchedulingCtx
}
withPotentialNodes := func(podSchedulingCtx *resourcev1alpha2.PodSchedulingContext) *resourcev1alpha2.PodSchedulingContext {
withUnsuitableNodes := func(podSchedulingCtx *resourcev1alpha2.PodSchedulingContext) *resourcev1alpha2.PodSchedulingContext {
return withSpecificUnsuitableNodes(podSchedulingCtx, unsuitableNodes)
}
withSpecificPotentialNodes := func(podSchedulingCtx *resourcev1alpha2.PodSchedulingContext, potentialNodes []string) *resourcev1alpha2.PodSchedulingContext {
podSchedulingCtx = podSchedulingCtx.DeepCopy()
podSchedulingCtx.Spec.PotentialNodes = potentialNodes
return podSchedulingCtx
}
withPotentialNodes := func(podSchedulingCtx *resourcev1alpha2.PodSchedulingContext) *resourcev1alpha2.PodSchedulingContext {
return withSpecificPotentialNodes(podSchedulingCtx, potentialNodes)
}

var m mockDriver

@@ -376,6 +387,48 @@ func TestController(t *testing.T) {
expectedSchedulingCtx: withUnsuitableNodes(withSelectedNode(withPotentialNodes(podSchedulingCtx))),
expectedError: errPeriodic.Error(),
},
// pod with delayed allocation, potential nodes, selected node, all unsuitable -> update unsuitable nodes
"pod-selected-is-potential-node": {
key: podKey,
classes: classes,
claim: delayedClaim,
expectedClaim: delayedClaim,
pod: podWithClaim,
schedulingCtx: withPotentialNodes(withSelectedNode(withPotentialNodes(podSchedulingCtx))),
driver: m.expectClassParameters(map[string]interface{}{className: 1}).
expectClaimParameters(map[string]interface{}{claimName: 2}).
expectUnsuitableNodes(map[string][]string{podClaimName: potentialNodes}, nil),
expectedSchedulingCtx: withSpecificUnsuitableNodes(withSelectedNode(withPotentialNodes(podSchedulingCtx)), potentialNodes),
expectedError: errPeriodic.Error(),
},
// pod with delayed allocation, max potential nodes, other selected node, all unsuitable -> update unsuitable nodes with truncation at start
"pod-selected-is-potential-node-truncate-first": {
key: podKey,
classes: classes,
claim: delayedClaim,
expectedClaim: delayedClaim,
pod: podWithClaim,
schedulingCtx: withSpecificPotentialNodes(withSelectedNode(withSpecificPotentialNodes(podSchedulingCtx, maxNodes)), maxNodes),
driver: m.expectClassParameters(map[string]interface{}{className: 1}).
expectClaimParameters(map[string]interface{}{claimName: 2}).
expectUnsuitableNodes(map[string][]string{podClaimName: append(maxNodes, nodeName)}, nil),
expectedSchedulingCtx: withSpecificUnsuitableNodes(withSelectedNode(withSpecificPotentialNodes(podSchedulingCtx, maxNodes)), append(maxNodes[1:], nodeName)),
expectedError: errPeriodic.Error(),
},
// pod with delayed allocation, max potential nodes, other selected node, all unsuitable (but in reverse order) -> update unsuitable nodes with truncation at end
"pod-selected-is-potential-node-truncate-last": {
key: podKey,
classes: classes,
claim: delayedClaim,
expectedClaim: delayedClaim,
pod: podWithClaim,
schedulingCtx: withSpecificPotentialNodes(withSelectedNode(withSpecificPotentialNodes(podSchedulingCtx, maxNodes)), maxNodes),
driver: m.expectClassParameters(map[string]interface{}{className: 1}).
expectClaimParameters(map[string]interface{}{claimName: 2}).
expectUnsuitableNodes(map[string][]string{podClaimName: append([]string{nodeName}, maxNodes...)}, nil),
expectedSchedulingCtx: withSpecificUnsuitableNodes(withSelectedNode(withSpecificPotentialNodes(podSchedulingCtx, maxNodes)), append([]string{nodeName}, maxNodes[:len(maxNodes)-1]...)),
expectedError: errPeriodic.Error(),
},
} {
t.Run(name, func(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
