diff --git a/apis/apps/v1alpha1/uniteddeployment_types.go b/apis/apps/v1alpha1/uniteddeployment_types.go index 1e4c18c947..23cb711db8 100644 --- a/apis/apps/v1alpha1/uniteddeployment_types.go +++ b/apis/apps/v1alpha1/uniteddeployment_types.go @@ -17,6 +17,8 @@ limitations under the License. package v1alpha1 import ( + "time" + appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -165,6 +167,10 @@ type Topology struct { // +patchStrategy=merge // +optional Subsets []Subset `json:"subsets,omitempty" patchStrategy:"merge" patchMergeKey:"name"` + + // ScheduleStrategy indicates the strategy the UnitedDeployment used to preform the schedule between each of subsets. + // +optional + ScheduleStrategy UnitedDeploymentScheduleStrategy `json:"scheduleStrategy,omitempty"` } // Subset defines the detail of a subset. @@ -218,6 +224,69 @@ type Subset struct { Patch runtime.RawExtension `json:"patch,omitempty"` } +// UnitedDeploymentScheduleStrategyType is a string enumeration type that enumerates +// all possible schedule strategies for the UnitedDeployment controller. +// +kubebuilder:validation:Enum=Adaptive;Fixed;"" +type UnitedDeploymentScheduleStrategyType string + +const ( + // AdaptiveUnitedDeploymentScheduleStrategyType represents that when a pod is stuck in the pending status and cannot + // be scheduled, allow it to be rescheduled to another subset. + AdaptiveUnitedDeploymentScheduleStrategyType UnitedDeploymentScheduleStrategyType = "Adaptive" + // FixedUnitedDeploymentScheduleStrategyType represents that pods are strictly scheduled to the selected subset + // even if scheduling fail. + FixedUnitedDeploymentScheduleStrategyType UnitedDeploymentScheduleStrategyType = "Fixed" +) + +const ( + DefaultRescheduleCriticalDuration = 30 * time.Second + DefaultUnschedulableStatusLastDuration = 300 * time.Second +) + +// AdaptiveUnitedDeploymentStrategy is used to communicate parameters when Type is AdaptiveUnitedDeploymentScheduleStrategyType. +type AdaptiveUnitedDeploymentStrategy struct { + // RescheduleCriticalSeconds indicates how long controller will reschedule a schedule failed Pod to the subset that has + // redundant capacity after the subset where the Pod lives. If a Pod was scheduled failed and still in an unschedulabe status + // over RescheduleCriticalSeconds duration, the controller will reschedule it to a suitable subset. Default is 30 seconds. + // +optional + RescheduleCriticalSeconds *int32 `json:"rescheduleCriticalSeconds,omitempty"` + + // UnschedulableLastSeconds is used to set the number of seconds for a Subset to recover from an unschedulable state, + // with a default value of 300 seconds. + // +optional + UnschedulableLastSeconds *int32 `json:"unschedulableLastSeconds,omitempty"` +} + +// UnitedDeploymentScheduleStrategy defines the schedule performance of UnitedDeployment. +type UnitedDeploymentScheduleStrategy struct { + // Type indicates the type of the UnitedDeploymentScheduleStrategy. + // Default is Fixed + // +optional + Type UnitedDeploymentScheduleStrategyType `json:"type,omitempty"` + + // Adaptive is used to communicate parameters when Type is AdaptiveUnitedDeploymentScheduleStrategyType. 
+ // +optional + Adaptive *AdaptiveUnitedDeploymentStrategy `json:"adaptive,omitempty"` +} + +func (s *UnitedDeploymentScheduleStrategy) IsAdaptive() bool { + return s.Type == AdaptiveUnitedDeploymentScheduleStrategyType +} + +func (s *UnitedDeploymentScheduleStrategy) GetRescheduleCriticalDuration() time.Duration { + if s.Adaptive == nil || s.Adaptive.RescheduleCriticalSeconds == nil { + return DefaultRescheduleCriticalDuration + } + return time.Duration(*s.Adaptive.RescheduleCriticalSeconds) * time.Second +} + +func (s *UnitedDeploymentScheduleStrategy) GetUnschedulableLastDuration() time.Duration { + if s.Adaptive == nil || s.Adaptive.UnschedulableLastSeconds == nil { + return DefaultUnschedulableStatusLastDuration + } + return time.Duration(*s.Adaptive.UnschedulableLastSeconds) * time.Second +} + // UnitedDeploymentStatus defines the observed state of UnitedDeployment. type UnitedDeploymentStatus struct { // ObservedGeneration is the most recent generation observed for this UnitedDeployment. It corresponds to the @@ -252,6 +321,10 @@ type UnitedDeploymentStatus struct { // +optional SubsetReplicas map[string]int32 `json:"subsetReplicas,omitempty"` + // Record whether each subset is unschedulable. + // +optional + SubsetUnschedulable map[string]UnschedulableStatus `json:"subsetUnschedulable,omitempty"` + // Represents the latest available observations of a UnitedDeployment's current state. // +optional Conditions []UnitedDeploymentCondition `json:"conditions,omitempty"` @@ -278,7 +351,7 @@ type UnitedDeploymentCondition struct { // The reason for the condition's last transition. Reason string `json:"reason,omitempty"` - // A human readable message indicating details about the transition. + // A human-readable message indicating details about the transition. Message string `json:"message,omitempty"` } @@ -293,6 +366,12 @@ type UpdateStatus struct { CurrentPartitions map[string]int32 `json:"currentPartitions,omitempty"` } +type UnschedulableStatus struct { + Unschedulable bool `json:"unschedulable"` + UnschedulableTimestamp metav1.Time `json:"unschedulableTimestamp,omitempty"` + FailedPods int32 `json:"failedPods,omitempty"` +} + // +genclient // +genclient:method=GetScale,verb=get,subresource=scale,result=k8s.io/api/autoscaling/v1.Scale // +genclient:method=UpdateScale,verb=update,subresource=scale,input=k8s.io/api/autoscaling/v1.Scale,result=k8s.io/api/autoscaling/v1.Scale diff --git a/apis/apps/v1alpha1/zz_generated.deepcopy.go b/apis/apps/v1alpha1/zz_generated.deepcopy.go index 9440ee2715..8062ca29cf 100644 --- a/apis/apps/v1alpha1/zz_generated.deepcopy.go +++ b/apis/apps/v1alpha1/zz_generated.deepcopy.go @@ -30,6 +30,31 @@ import ( "k8s.io/apimachinery/pkg/util/intstr" ) +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *AdaptiveUnitedDeploymentStrategy) DeepCopyInto(out *AdaptiveUnitedDeploymentStrategy) { + *out = *in + if in.RescheduleCriticalSeconds != nil { + in, out := &in.RescheduleCriticalSeconds, &out.RescheduleCriticalSeconds + *out = new(int32) + **out = **in + } + if in.UnschedulableLastSeconds != nil { + in, out := &in.UnschedulableLastSeconds, &out.UnschedulableLastSeconds + *out = new(int32) + **out = **in + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new AdaptiveUnitedDeploymentStrategy. 
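Note: the duration getters above are nil-safe, so the Adaptive block is optional. A minimal sketch of the intended usage, assuming it compiles alongside this package; the helper name exampleScheduleStrategyDefaults and the 10s/60s overrides are illustrative, only the 30s/5m defaults come from the constants above.

package v1alpha1

import (
	"fmt"
	"time"

	"k8s.io/utils/ptr"
)

func exampleScheduleStrategyDefaults() {
	// Adaptive type selected, but no tuning block: the getters fall back to the package defaults.
	s := UnitedDeploymentScheduleStrategy{Type: AdaptiveUnitedDeploymentScheduleStrategyType}
	fmt.Println(s.IsAdaptive())                    // true
	fmt.Println(s.GetRescheduleCriticalDuration()) // 30s (DefaultRescheduleCriticalDuration)
	fmt.Println(s.GetUnschedulableLastDuration())  // 5m0s (DefaultUnschedulableStatusLastDuration)

	// Explicit values take precedence over the defaults.
	s.Adaptive = &AdaptiveUnitedDeploymentStrategy{
		RescheduleCriticalSeconds: ptr.To(int32(10)),
		UnschedulableLastSeconds:  ptr.To(int32(60)),
	}
	fmt.Println(s.GetRescheduleCriticalDuration() == 10*time.Second) // true
	fmt.Println(s.GetUnschedulableLastDuration() == 60*time.Second)  // true
}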
+func (in *AdaptiveUnitedDeploymentStrategy) DeepCopy() *AdaptiveUnitedDeploymentStrategy { + if in == nil { + return nil + } + out := new(AdaptiveUnitedDeploymentStrategy) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *AdaptiveWorkloadSpreadStrategy) DeepCopyInto(out *AdaptiveWorkloadSpreadStrategy) { *out = *in @@ -3268,6 +3293,7 @@ func (in *Topology) DeepCopyInto(out *Topology) { (*in)[i].DeepCopyInto(&(*out)[i]) } } + in.ScheduleStrategy.DeepCopyInto(&out.ScheduleStrategy) } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Topology. @@ -3380,6 +3406,26 @@ func (in *UnitedDeploymentList) DeepCopyObject() runtime.Object { return nil } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *UnitedDeploymentScheduleStrategy) DeepCopyInto(out *UnitedDeploymentScheduleStrategy) { + *out = *in + if in.Adaptive != nil { + in, out := &in.Adaptive, &out.Adaptive + *out = new(AdaptiveUnitedDeploymentStrategy) + (*in).DeepCopyInto(*out) + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new UnitedDeploymentScheduleStrategy. +func (in *UnitedDeploymentScheduleStrategy) DeepCopy() *UnitedDeploymentScheduleStrategy { + if in == nil { + return nil + } + out := new(UnitedDeploymentScheduleStrategy) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *UnitedDeploymentSpec) DeepCopyInto(out *UnitedDeploymentSpec) { *out = *in @@ -3428,6 +3474,13 @@ func (in *UnitedDeploymentStatus) DeepCopyInto(out *UnitedDeploymentStatus) { (*out)[key] = val } } + if in.SubsetUnschedulable != nil { + in, out := &in.SubsetUnschedulable, &out.SubsetUnschedulable + *out = make(map[string]UnschedulableStatus, len(*in)) + for key, val := range *in { + (*out)[key] = *val.DeepCopy() + } + } if in.Conditions != nil { in, out := &in.Conditions, &out.Conditions *out = make([]UnitedDeploymentCondition, len(*in)) @@ -3492,6 +3545,22 @@ func (in *UnorderedUpdateStrategy) DeepCopy() *UnorderedUpdateStrategy { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *UnschedulableStatus) DeepCopyInto(out *UnschedulableStatus) { + *out = *in + in.UnschedulableTimestamp.DeepCopyInto(&out.UnschedulableTimestamp) +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new UnschedulableStatus. +func (in *UnschedulableStatus) DeepCopy() *UnschedulableStatus { + if in == nil { + return nil + } + out := new(UnschedulableStatus) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in UpdateScatterStrategy) DeepCopyInto(out *UpdateScatterStrategy) { { diff --git a/config/crd/bases/apps.kruise.io_uniteddeployments.yaml b/config/crd/bases/apps.kruise.io_uniteddeployments.yaml index d15c53767a..badce5b76a 100644 --- a/config/crd/bases/apps.kruise.io_uniteddeployments.yaml +++ b/config/crd/bases/apps.kruise.io_uniteddeployments.yaml @@ -943,6 +943,38 @@ spec: description: Topology describes the pods distribution detail between each of subsets. 
properties: + scheduleStrategy: + description: ScheduleStrategy indicates the strategy the UnitedDeployment + used to preform the schedule between each of subsets. + properties: + adaptive: + description: Adaptive is used to communicate parameters when + Type is AdaptiveUnitedDeploymentScheduleStrategyType. + properties: + rescheduleCriticalSeconds: + description: |- + RescheduleCriticalSeconds indicates how long controller will reschedule a schedule failed Pod to the subset that has + redundant capacity after the subset where the Pod lives. If a Pod was scheduled failed and still in an unschedulabe status + over RescheduleCriticalSeconds duration, the controller will reschedule it to a suitable subset. Default is 30 seconds. + format: int32 + type: integer + unschedulableLastSeconds: + description: |- + UnschedulableLastSeconds is used to set the number of seconds for a Subset to recover from an unschedulable state, + with a default value of 300 seconds. + format: int32 + type: integer + type: object + type: + description: |- + Type indicates the type of the UnitedDeploymentScheduleStrategy. + Default is Fixed + enum: + - Adaptive + - Fixed + - "" + type: string + type: object subsets: description: |- Contains the details of each subset. Each element in this array represents one subset @@ -1161,7 +1193,7 @@ spec: format: date-time type: string message: - description: A human readable message indicating details about + description: A human-readable message indicating details about the transition. type: string reason: @@ -1204,6 +1236,22 @@ spec: description: Records the topology detail information of the replicas of each subset. type: object + subsetUnschedulable: + additionalProperties: + properties: + failedPods: + format: int32 + type: integer + unschedulable: + type: boolean + unschedulableTimestamp: + format: date-time + type: string + required: + - unschedulable + type: object + description: Record whether each subset is unschedulable. + type: object updateStatus: description: Records the information of update progress. properties: diff --git a/pkg/controller/uniteddeployment/adapter/adapter.go b/pkg/controller/uniteddeployment/adapter/adapter.go index d75c8c5e99..e2d66c5bc4 100644 --- a/pkg/controller/uniteddeployment/adapter/adapter.go +++ b/pkg/controller/uniteddeployment/adapter/adapter.go @@ -17,6 +17,7 @@ limitations under the License. package adapter import ( + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "sigs.k8s.io/controller-runtime/pkg/client" @@ -25,14 +26,15 @@ import ( ) type Adapter interface { - // NewResourceObject creates a empty subset object. + // NewResourceObject creates an empty subset object. NewResourceObject() client.Object - // NewResourceListObject creates a empty subset list object. + // NewResourceListObject creates an empty subset list object. NewResourceListObject() client.ObjectList // GetStatusObservedGeneration returns the observed generation of the subset. GetStatusObservedGeneration(subset metav1.Object) int64 // GetReplicaDetails returns the replicas information of the subset status. 
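For reference, the subsetUnschedulable field added to the CRD above is a per-subset map in the Go API; a hypothetical snapshot (subset names, timestamp, and counts are invented, and appsv1alpha1/metav1 refer to the usual Kruise and apimachinery imports) could look like:

var exampleStatus = appsv1alpha1.UnitedDeploymentStatus{
	SubsetUnschedulable: map[string]appsv1alpha1.UnschedulableStatus{
		// subset-a has 2 pods that failed to schedule and was just marked unschedulable
		"subset-a": {Unschedulable: true, UnschedulableTimestamp: metav1.Now(), FailedPods: 2},
		// subset-b is healthy
		"subset-b": {Unschedulable: false},
	},
}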
- GetReplicaDetails(subset metav1.Object, updatedRevision string) (specReplicas, specPartition *int32, statusReplicas, statusReadyReplicas, statusUpdatedReplicas, statusUpdatedReadyReplicas int32, err error) + GetReplicaDetails(subset metav1.Object, updatedRevision string) (specReplicas, specPartition *int32, statusReplicas, + statusReadyReplicas, statusUpdatedReplicas, statusUpdatedReadyReplicas int32, pods []*corev1.Pod, err error) // GetSubsetFailure returns failure information of the subset. GetSubsetFailure() *string // ApplySubsetTemplate updates the subset to the latest revision. diff --git a/pkg/controller/uniteddeployment/adapter/advanced_statefulset_adapter.go b/pkg/controller/uniteddeployment/adapter/advanced_statefulset_adapter.go index 2242d2c751..24cd0b1d03 100644 --- a/pkg/controller/uniteddeployment/adapter/advanced_statefulset_adapter.go +++ b/pkg/controller/uniteddeployment/adapter/advanced_statefulset_adapter.go @@ -63,9 +63,8 @@ func (a *AdvancedStatefulSetAdapter) GetStatusObservedGeneration(obj metav1.Obje } // GetReplicaDetails returns the replicas detail the subset needs. -func (a *AdvancedStatefulSetAdapter) GetReplicaDetails(obj metav1.Object, updatedRevision string) (specReplicas, specPartition *int32, statusReplicas, statusReadyReplicas, statusUpdatedReplicas, statusUpdatedReadyReplicas int32, err error) { +func (a *AdvancedStatefulSetAdapter) GetReplicaDetails(obj metav1.Object, updatedRevision string) (specReplicas, specPartition *int32, statusReplicas, statusReadyReplicas, statusUpdatedReplicas, statusUpdatedReadyReplicas int32, pods []*corev1.Pod, err error) { set := obj.(*v1beta1.StatefulSet) - var pods []*corev1.Pod pods, err = a.getStatefulSetPods(set) if err != nil { return diff --git a/pkg/controller/uniteddeployment/adapter/cloneset_adapter.go b/pkg/controller/uniteddeployment/adapter/cloneset_adapter.go index bc80150858..a570d2b1cc 100644 --- a/pkg/controller/uniteddeployment/adapter/cloneset_adapter.go +++ b/pkg/controller/uniteddeployment/adapter/cloneset_adapter.go @@ -41,12 +41,10 @@ func (a *CloneSetAdapter) GetStatusObservedGeneration(obj metav1.Object) int64 { return obj.(*alpha1.CloneSet).Status.ObservedGeneration } -func (a *CloneSetAdapter) GetReplicaDetails(obj metav1.Object, updatedRevision string) (specReplicas, specPartition *int32, statusReplicas, statusReadyReplicas, statusUpdatedReplicas, statusUpdatedReadyReplicas int32, err error) { +func (a *CloneSetAdapter) GetReplicaDetails(obj metav1.Object, updatedRevision string) (specReplicas, specPartition *int32, statusReplicas, statusReadyReplicas, statusUpdatedReplicas, statusUpdatedReadyReplicas int32, pods []*corev1.Pod, err error) { set := obj.(*alpha1.CloneSet) - var pods []*corev1.Pod - pods, err = a.getCloneSetPods(set) if err != nil { diff --git a/pkg/controller/uniteddeployment/adapter/deployment_adapter.go b/pkg/controller/uniteddeployment/adapter/deployment_adapter.go index d0978952f9..1ed391c5c8 100644 --- a/pkg/controller/uniteddeployment/adapter/deployment_adapter.go +++ b/pkg/controller/uniteddeployment/adapter/deployment_adapter.go @@ -58,12 +58,11 @@ func (a *DeploymentAdapter) GetStatusObservedGeneration(obj metav1.Object) int64 } // GetReplicaDetails returns the replicas detail the subset needs. 
-func (a *DeploymentAdapter) GetReplicaDetails(obj metav1.Object, updatedRevision string) (specReplicas, specPartition *int32, statusReplicas, statusReadyReplicas, statusUpdatedReplicas, statusUpdatedReadyReplicas int32, err error) { +func (a *DeploymentAdapter) GetReplicaDetails(obj metav1.Object, updatedRevision string) (specReplicas, specPartition *int32, statusReplicas, statusReadyReplicas, statusUpdatedReplicas, statusUpdatedReadyReplicas int32, pods []*corev1.Pod, err error) { // Convert to Deployment Object set := obj.(*appsv1.Deployment) // Get all pods belonging to deployment - var pods []*corev1.Pod pods, err = a.getDeploymentPods(set) if err != nil { return diff --git a/pkg/controller/uniteddeployment/adapter/statefulset_adapter.go b/pkg/controller/uniteddeployment/adapter/statefulset_adapter.go index a31e2ad0b0..eb4761d862 100644 --- a/pkg/controller/uniteddeployment/adapter/statefulset_adapter.go +++ b/pkg/controller/uniteddeployment/adapter/statefulset_adapter.go @@ -59,9 +59,8 @@ func (a *StatefulSetAdapter) GetStatusObservedGeneration(obj metav1.Object) int6 } // GetReplicaDetails returns the replicas detail the subset needs. -func (a *StatefulSetAdapter) GetReplicaDetails(obj metav1.Object, updatedRevision string) (specReplicas, specPartition *int32, statusReplicas, statusReadyReplicas, statusUpdatedReplicas, statusUpdatedReadyReplicas int32, err error) { +func (a *StatefulSetAdapter) GetReplicaDetails(obj metav1.Object, updatedRevision string) (specReplicas, specPartition *int32, statusReplicas, statusReadyReplicas, statusUpdatedReplicas, statusUpdatedReadyReplicas int32, pods []*corev1.Pod, err error) { set := obj.(*appsv1.StatefulSet) - var pods []*corev1.Pod pods, err = a.getStatefulSetPods(set) if err != nil { return diff --git a/pkg/controller/uniteddeployment/allocator.go b/pkg/controller/uniteddeployment/allocator.go index cf5a71ac47..1b97afe3a0 100644 --- a/pkg/controller/uniteddeployment/allocator.go +++ b/pkg/controller/uniteddeployment/allocator.go @@ -18,6 +18,7 @@ package uniteddeployment import ( "fmt" + "math" "sort" "strings" @@ -68,6 +69,34 @@ func NewReplicaAllocator(ud *appsv1alpha1.UnitedDeployment) ReplicaAllocator { return &specificAllocator{UnitedDeployment: ud} } +// safeReplicas refers to the number of Pods that an unschedulable subset can safely accommodate. +// Exceeding this number may lead to scheduling failures within that subset. +// This value is only effective in the Adaptive scheduling strategy. +func getSafeReplicasMap(nameToSubset *map[string]*Subset) map[string]int32 { + if nameToSubset == nil { + return nil + } + var result = make(map[string]int32) + for name, subset := range *nameToSubset { + if subset.Status.UnschedulableStatus.Unschedulable { + result[name] = subset.Status.Replicas - subset.Status.UnschedulableStatus.FailedPods + } + } + return result +} + +// get readyReplicas to prevent healthy Pods from being deleted. 
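To make the capacity arithmetic above concrete, a hedged sketch against the types in this package (subset names and counts are made up): a subset that created 5 pods of which 2 are stuck pending can safely hold 3, and schedulable subsets do not appear in the map at all.

func exampleSafeReplicas() map[string]int32 {
	nameToSubset := map[string]*Subset{
		// 5 replicas created, 2 of them failed to schedule -> safe capacity is 3
		"subset-a": {Status: SubsetStatus{
			Replicas:            5,
			UnschedulableStatus: appsv1alpha1.UnschedulableStatus{Unschedulable: true, FailedPods: 2},
		}},
		// schedulable, therefore not limited and not listed in the result
		"subset-b": {Status: SubsetStatus{Replicas: 3}},
	}
	return getSafeReplicasMap(&nameToSubset) // map["subset-a"] = 3
}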
+func getReadyReplicasMap(nameToSubset *map[string]*Subset) map[string]int32 { + if nameToSubset == nil { + return nil + } + var result = make(map[string]int32) + for name, subset := range *nameToSubset { + result[name] = subset.Status.ReadyReplicas + } + return result +} + type specificAllocator struct { *appsv1alpha1.UnitedDeployment subsets *subsetInfos @@ -86,7 +115,7 @@ func (s *specificAllocator) Alloc(nameToSubset *map[string]*Subset) (*map[string expectedReplicas = *s.Spec.Replicas } - specifiedReplicas := getSpecifiedSubsetReplicas(expectedReplicas, s.UnitedDeployment) + specifiedReplicas := getSpecifiedSubsetReplicas(expectedReplicas, s.UnitedDeployment, nameToSubset) klog.V(4).InfoS("UnitedDeployment specifiedReplicas", "unitedDeployment", klog.KObj(s), "specifiedReplicas", specifiedReplicas) return s.AllocateReplicas(expectedReplicas, specifiedReplicas) } @@ -123,19 +152,28 @@ func (s *specificAllocator) validateReplicas(replicas int32, subsetReplicasLimit return nil } -func getSpecifiedSubsetReplicas(replicas int32, ud *appsv1alpha1.UnitedDeployment) *map[string]int32 { +func getSpecifiedSubsetReplicas(replicas int32, ud *appsv1alpha1.UnitedDeployment, nameToSubset *map[string]*Subset) *map[string]int32 { replicaLimits := map[string]int32{} if ud.Spec.Topology.Subsets == nil { return &replicaLimits } + safeReplicasMap := getSafeReplicasMap(nameToSubset) + for _, subsetDef := range ud.Spec.Topology.Subsets { if subsetDef.Replicas == nil { continue } if specifiedReplicas, err := ParseSubsetReplicas(replicas, *subsetDef.Replicas); err == nil { - replicaLimits[subsetDef.Name] = specifiedReplicas + limit := specifiedReplicas + if ud.Spec.Topology.ScheduleStrategy.IsAdaptive() { + // This means that in the Adaptive scheduling strategy, an unschedulable subset can only be scaled down, not scaled up. 
+ if safeReplicas, ok := safeReplicasMap[subsetDef.Name]; ok { + limit = integer.Int32Min(safeReplicas, specifiedReplicas) + } + } + replicaLimits[subsetDef.Name] = limit } else { klog.ErrorS(err, "Failed to consider the replicas of subset when parsing replicaLimits during managing replicas of UnitedDeployment", "subsetName", subsetDef.Name, "unitedDeployment", klog.KObj(ud)) @@ -250,43 +288,56 @@ type elasticAllocator struct { // maxReplicas: nil # will be satisfied with 4th priority // // the results of map will be: {"subset-a": 3, "subset-b": 2} -func (ac *elasticAllocator) Alloc(_ *map[string]*Subset) (*map[string]int32, error) { +func (ac *elasticAllocator) Alloc(nameToSubset *map[string]*Subset) (*map[string]int32, error) { replicas := int32(1) if ac.Spec.Replicas != nil { replicas = *ac.Spec.Replicas } - minReplicasMap, maxReplicasMap, err := ac.validateAndCalculateMinMaxMap(replicas) + minReplicasMap, maxReplicasMap, err := ac.validateAndCalculateMinMaxMap(replicas, nameToSubset) if err != nil { return nil, err } return ac.alloc(replicas, minReplicasMap, maxReplicasMap), nil } -func (ac *elasticAllocator) validateAndCalculateMinMaxMap(replicas int32) (map[string]int32, map[string]int32, error) { - totalMin, totalMax := int64(0), int64(0) +func (ac *elasticAllocator) validateAndCalculateMinMaxMap(replicas int32, nameToSubset *map[string]*Subset) (map[string]int32, map[string]int32, error) { numSubset := len(ac.Spec.Topology.Subsets) minReplicasMap := make(map[string]int32, numSubset) maxReplicasMap := make(map[string]int32, numSubset) + safeReplicasMap := getSafeReplicasMap(nameToSubset) + readyReplicasMap := getReadyReplicasMap(nameToSubset) for index, subset := range ac.Spec.Topology.Subsets { minReplicas := int32(0) + maxReplicas := int32(math.MaxInt32) if subset.MinReplicas != nil { minReplicas, _ = ParseSubsetReplicas(replicas, *subset.MinReplicas) } - totalMin += int64(minReplicas) - minReplicasMap[subset.Name] = minReplicas - - maxReplicas := int32(1000000) if subset.MaxReplicas != nil { maxReplicas, _ = ParseSubsetReplicas(replicas, *subset.MaxReplicas) } - totalMax += int64(maxReplicas) + if ac.Spec.Topology.ScheduleStrategy.IsAdaptive() { + // This means that in the Adaptive scheduling strategy, an unschedulable subset can only be scaled down, not scaled up. 
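Concretely, with invented numbers, a sketch of the two adjustments applied below: an unschedulable subset is clamped to its safe capacity, while a healthy subset keeps at least its already-ready pods.

func exampleAdaptiveBounds() {
	// Unschedulable subset: min=2, max=4 requested, 4 pods created, 3 stuck pending.
	minA, maxA := int32(2), int32(4)
	safeA := int32(4 - 3)                // Status.Replicas - FailedPods
	minA = integer.Int32Min(safeA, minA) // 1
	maxA = integer.Int32Min(safeA, maxA) // 1 -> this subset can only shrink toward its safe capacity

	// Healthy subset: min=0, max=4 requested, but 3 pods are already ready.
	minB, maxB := int32(0), int32(4)
	if readyB := int32(3); readyB > minB {
		minB = integer.Int32Min(readyB, maxB) // 3 -> the running pods are not scaled in
	}
	_, _, _, _ = minA, maxA, minB, maxB
}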
+ if safeReplicas, ok := safeReplicasMap[subset.Name]; ok { + minReplicas = integer.Int32Min(safeReplicas, minReplicas) + maxReplicas = integer.Int32Min(safeReplicas, maxReplicas) + } + // To prevent healthy pod from being deleted + if readyReplicas := readyReplicasMap[subset.Name]; readyReplicas > minReplicas { + klog.InfoS("Assign min(readyReplicas, maxReplicas) to minReplicas to avoid deleting running pods", + "subset", subset.Name, "minReplicas", minReplicas, "readyReplicas", readyReplicas, "maxReplicas", maxReplicas) + minReplicas = integer.Int32Min(readyReplicas, maxReplicas) + } + } + + minReplicasMap[subset.Name] = minReplicas maxReplicasMap[subset.Name] = maxReplicas if minReplicas > maxReplicas { return nil, nil, fmt.Errorf("subset[%d].maxReplicas must be more than or equal to minReplicas", index) } } + klog.InfoS("elastic allocate maps calculated", "minReplicasMap", minReplicasMap, "maxReplicasMap", maxReplicasMap) return minReplicasMap, maxReplicasMap, nil } diff --git a/pkg/controller/uniteddeployment/allocator_test.go b/pkg/controller/uniteddeployment/allocator_test.go index cc50ab411b..91a94cec65 100644 --- a/pkg/controller/uniteddeployment/allocator_test.go +++ b/pkg/controller/uniteddeployment/allocator_test.go @@ -307,6 +307,140 @@ func TestCapacityAllocator(t *testing.T) { } } +func TestAdaptiveSpecificAllocator(t *testing.T) { + five, seven := intstr.FromInt32(5), int32(7) + ud := &appsv1alpha1.UnitedDeployment{ + Spec: appsv1alpha1.UnitedDeploymentSpec{ + Replicas: &seven, + Topology: appsv1alpha1.Topology{ + Subsets: []appsv1alpha1.Subset{ + { + Name: "subset-1", + Replicas: &five, + }, + { + Name: "subset-2", + }, + }, + ScheduleStrategy: appsv1alpha1.UnitedDeploymentScheduleStrategy{ + Type: appsv1alpha1.AdaptiveUnitedDeploymentScheduleStrategyType, + }, + }, + }, + } + subsets := map[string]*Subset{ + "subset-1": { + Status: SubsetStatus{ + UnschedulableStatus: appsv1alpha1.UnschedulableStatus{ + Unschedulable: true, + FailedPods: 2, + }, + Replicas: 5, + }, + Spec: SubsetSpec{Replicas: 5}, + }, + } + if alloc, err := NewReplicaAllocator(ud).Alloc(&subsets); err != nil { + t.Fatalf("unexpected alloc error %v", err) + } else { + subset1Replicas, subset2Replicas := (*alloc)["subset-1"], (*alloc)["subset-2"] + if subset1Replicas != 3 || subset2Replicas != 4 { + t.Fatalf("unexpected alloc result subset1Replicas = %d not 3, subset2Replicas = %d not 4", + subset1Replicas, subset2Replicas) + } + } +} + +func TestAdaptiveElasticAllocator(t *testing.T) { + getUnitedDeploymentAndSubsets := func(totalReplicas, minReplicas, maxReplicas, failedPods, readyReplicas int32) ( + *appsv1alpha1.UnitedDeployment, map[string]*Subset) { + minR, maxR := intstr.FromInt32(minReplicas), intstr.FromInt32(maxReplicas) + return &appsv1alpha1.UnitedDeployment{ + Spec: appsv1alpha1.UnitedDeploymentSpec{ + Replicas: &totalReplicas, + Topology: appsv1alpha1.Topology{ + Subsets: []appsv1alpha1.Subset{ + { + Name: "subset-1", + MinReplicas: &minR, + MaxReplicas: &maxR, + }, + { + Name: "subset-2", + }, + }, + ScheduleStrategy: appsv1alpha1.UnitedDeploymentScheduleStrategy{ + Type: appsv1alpha1.AdaptiveUnitedDeploymentScheduleStrategyType, + }, + }, + }, + }, map[string]*Subset{ + "subset-1": { + Status: SubsetStatus{ + UnschedulableStatus: appsv1alpha1.UnschedulableStatus{ + Unschedulable: true, + FailedPods: failedPods, + }, + Replicas: maxReplicas, + }, + Spec: SubsetSpec{Replicas: minReplicas}, + }, + "subset-2": { + Status: SubsetStatus{ + ReadyReplicas: readyReplicas, + }, + }, + } + } + cases 
:= []struct { + totalReplicas, minReplicas, maxReplicas, failedPods, readyReplicas int32 + subset1Replicas, subset2Replicas int32 + }{ + { + totalReplicas: 10, minReplicas: 2, maxReplicas: 4, failedPods: 5, + subset1Replicas: 0, subset2Replicas: 10, + }, + { + totalReplicas: 10, minReplicas: 2, maxReplicas: 4, failedPods: 4, + subset1Replicas: 0, subset2Replicas: 10, + }, + { + totalReplicas: 10, minReplicas: 2, maxReplicas: 4, failedPods: 3, + subset1Replicas: 1, subset2Replicas: 9, + }, + { + totalReplicas: 10, minReplicas: 2, maxReplicas: 4, failedPods: 2, + subset1Replicas: 2, subset2Replicas: 8, + }, + { + totalReplicas: 10, minReplicas: 2, maxReplicas: 4, failedPods: 1, + subset1Replicas: 3, subset2Replicas: 7, + }, + { + totalReplicas: 10, minReplicas: 2, maxReplicas: 4, failedPods: 0, + subset1Replicas: 4, subset2Replicas: 6, + }, + { + totalReplicas: 10, maxReplicas: 4, readyReplicas: 7, + subset1Replicas: 3, subset2Replicas: 7, + }, + } + for _, testCase := range cases { + ud, subsets := getUnitedDeploymentAndSubsets( + testCase.totalReplicas, testCase.minReplicas, testCase.maxReplicas, testCase.failedPods, testCase.readyReplicas) + alloc, err := NewReplicaAllocator(ud).Alloc(&subsets) + if err != nil { + t.Fatalf("unexpected alloc error %v", err) + } else { + subset1Replicas, subset2Replicas := (*alloc)["subset-1"], (*alloc)["subset-2"] + if subset1Replicas != testCase.subset1Replicas || subset2Replicas != testCase.subset2Replicas { + t.Fatalf("subset1Replicas = %d, subset1Replicas = %d, test case is %v", + subset1Replicas, subset2Replicas, testCase) + } + } + } +} + func createSubset(name string, replicas int32) *nameToReplicas { return &nameToReplicas{ Replicas: replicas, diff --git a/pkg/controller/uniteddeployment/subset.go b/pkg/controller/uniteddeployment/subset.go index 79b8f006c6..478d5e5b28 100644 --- a/pkg/controller/uniteddeployment/subset.go +++ b/pkg/controller/uniteddeployment/subset.go @@ -17,6 +17,7 @@ limitations under the License. package uniteddeployment import ( + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" @@ -36,6 +37,7 @@ type SubsetSpec struct { Replicas int32 UpdateStrategy SubsetUpdateStrategy SubsetRef ResourceRef + SubsetPods []*corev1.Pod } // SubsetStatus stores the observed state of the Subset. @@ -45,6 +47,7 @@ type SubsetStatus struct { ReadyReplicas int32 UpdatedReplicas int32 UpdatedReadyReplicas int32 + UnschedulableStatus appsv1alpha1.UnschedulableStatus } // SubsetUpdateStrategy stores the strategy detail of the Subset. @@ -72,7 +75,7 @@ type ControlInterface interface { CreateSubset(ud *appsv1alpha1.UnitedDeployment, unit string, revision string, replicas, partition int32) error // UpdateSubset updates the target subset with the input information. UpdateSubset(subSet *Subset, ud *appsv1alpha1.UnitedDeployment, revision string, replicas, partition int32) error - // UpdateSubset is used to delete the input subset. + // DeleteSubset is used to delete the input subset. DeleteSubset(*Subset) error // GetSubsetFailure extracts the subset failure message to expose on UnitedDeployment status. 
GetSubsetFailure(*Subset) *string diff --git a/pkg/controller/uniteddeployment/subset_control.go b/pkg/controller/uniteddeployment/subset_control.go index 9073125950..8a85a869e6 100644 --- a/pkg/controller/uniteddeployment/subset_control.go +++ b/pkg/controller/uniteddeployment/subset_control.go @@ -39,7 +39,7 @@ type SubsetControl struct { adapter adapter.Adapter } -// GetAllSubsets returns all of subsets owned by the UnitedDeployment. +// GetAllSubsets returns all subsets owned by the UnitedDeployment. func (m *SubsetControl) GetAllSubsets(ud *alpha1.UnitedDeployment, updatedRevision string) (subSets []*Subset, err error) { selector, err := metav1.LabelSelectorAsSelector(ud.Spec.Selector) if err != nil { @@ -156,7 +156,8 @@ func (m *SubsetControl) convertToSubset(set metav1.Object, updatedRevision strin } subset.Spec.SubsetName = subSetName - specReplicas, specPartition, statusReplicas, statusReadyReplicas, statusUpdatedReplicas, statusUpdatedReadyReplicas, err := m.adapter.GetReplicaDetails(set, updatedRevision) + specReplicas, specPartition, statusReplicas, + statusReadyReplicas, statusUpdatedReplicas, statusUpdatedReadyReplicas, pods, err := m.adapter.GetReplicaDetails(set, updatedRevision) if err != nil { return subset, err } @@ -176,6 +177,7 @@ func (m *SubsetControl) convertToSubset(set metav1.Object, updatedRevision strin subset.Status.UpdatedReadyReplicas = statusUpdatedReadyReplicas subset.Spec.SubsetRef.Resources = append(subset.Spec.SubsetRef.Resources, set) + subset.Spec.SubsetPods = pods return subset, nil } diff --git a/pkg/controller/uniteddeployment/uniteddeployment_controller.go b/pkg/controller/uniteddeployment/uniteddeployment_controller.go index a14f5269cd..41c1609263 100644 --- a/pkg/controller/uniteddeployment/uniteddeployment_controller.go +++ b/pkg/controller/uniteddeployment/uniteddeployment_controller.go @@ -21,10 +21,12 @@ import ( "flag" "fmt" "reflect" + "time" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/tools/record" "k8s.io/klog/v2" @@ -38,10 +40,12 @@ import ( appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" appsv1beta1 "github.com/openkruise/kruise/apis/apps/v1beta1" "github.com/openkruise/kruise/pkg/controller/uniteddeployment/adapter" + utilcontroller "github.com/openkruise/kruise/pkg/controller/util" "github.com/openkruise/kruise/pkg/util" utilclient "github.com/openkruise/kruise/pkg/util/client" utildiscovery "github.com/openkruise/kruise/pkg/util/discovery" "github.com/openkruise/kruise/pkg/util/ratelimiter" + "github.com/openkruise/kruise/pkg/util/requeueduration" ) func init() { @@ -51,16 +55,17 @@ func init() { var ( concurrentReconciles = 3 controllerKind = appsv1alpha1.SchemeGroupVersion.WithKind("UnitedDeployment") + durationStore = requeueduration.DurationStore{} ) const ( controllerName = "uniteddeployment-controller" - eventTypeRevisionProvision = "RevisionProvision" - eventTypeFindSubsets = "FindSubsets" - eventTypeDupSubsetsDelete = "DeleteDuplicatedSubsets" - eventTypeSubsetsUpdate = "UpdateSubset" - eventTypeSpecifySubbsetReplicas = "SpecifySubsetReplicas" + eventTypeRevisionProvision = "RevisionProvision" + eventTypeFindSubsets = "FindSubsets" + eventTypeDupSubsetsDelete = "DeleteDuplicatedSubsets" + eventTypeSubsetsUpdate = "UpdateSubset" + eventTypeSpecifySubsetReplicas = "SpecifySubsetReplicas" slowStartInitialBatchSize = 1 ) @@ -210,7 +215,7 @@ func (r 
*ReconcileUnitedDeployment) Reconcile(_ context.Context, request reconci if err != nil { klog.ErrorS(err, "UnitedDeployment specified subset replicas is ineffective", "unitedDeployment", klog.KObj(instance)) r.recorder.Eventf(instance.DeepCopy(), corev1.EventTypeWarning, fmt.Sprintf("Failed %s", - eventTypeSpecifySubbsetReplicas), "Specified subset replicas is ineffective: %s", err.Error()) + eventTypeSpecifySubsetReplicas), "Specified subset replicas is ineffective: %s", err.Error()) return reconcile.Result{}, err } @@ -233,9 +238,16 @@ func (r *ReconcileUnitedDeployment) Reconcile(_ context.Context, request reconci } newStatus.LabelSelector = selector.String() - return r.updateStatus(instance, newStatus, oldStatus, nameToSubset, nextReplicas, nextPartitions, currentRevision, updatedRevision, collisionCount, control) + requeueAfter := durationStore.Pop(getUnitedDeploymentKey(instance)) + if requeueAfter > 0 { + klog.InfoS("Requeue needed", "afterSeconds", requeueAfter.Seconds()) + } + return reconcile.Result{RequeueAfter: requeueAfter}, + r.updateStatus(instance, newStatus, oldStatus, nameToSubset, nextReplicas, nextPartitions, currentRevision, updatedRevision, collisionCount, control) } +// getNameToSubset fetches all subset workloads in cluster managed by this UnitedDeployment +// if adaptive scheduling strategy is used, existing subset unscheduable status will be set true here (newly created subsets are default false) func (r *ReconcileUnitedDeployment) getNameToSubset(instance *appsv1alpha1.UnitedDeployment, control ControlInterface, expectedRevision string) (*map[string]*Subset, error) { subSets, err := control.GetAllSubsets(instance, expectedRevision) if err != nil { @@ -252,9 +264,58 @@ func (r *ReconcileUnitedDeployment) getNameToSubset(instance *appsv1alpha1.Unite return nil, fmt.Errorf("fail to manage duplicate Subset of UnitedDeployment %s/%s: %s", instance.Namespace, instance.Name, err) } + // If the Fixing scheduling strategy is used, the unschedulable state for all subsets remains false and + // the UnschedulableStatus of Subsets are not managed. + if instance.Spec.Topology.ScheduleStrategy.IsAdaptive() { + for name, subset := range *nameToSubset { + manageUnschedulableStatusForExistingSubset(name, subset, instance) + } + } + return nameToSubset, nil } +// manageUnschedulableStatusForExistingSubset manages subset unscheduable status and store them in the Subset.Status.UnschedulableStatus field. +func manageUnschedulableStatusForExistingSubset(name string, subset *Subset, ud *appsv1alpha1.UnitedDeployment) { + now := time.Now() + unitedDeploymentKey := getUnitedDeploymentKey(ud) + if status, ok := ud.Status.SubsetUnschedulable[name]; ok && status.Unschedulable { + // The unschedulable state of a subset lasts for at least 5 minutes. + // During this period, even if ReadyReplicas == Replicas, the subset is still unschedulable. 
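To summarize the flow this function implements, assuming the defaults of 30s (RescheduleCriticalSeconds) and 300s (UnschedulableLastSeconds) and a pod created at t0 that never schedules:

	t0            pod is Pending with PodScheduled=False/Unschedulable
	t0+30s        PodUnscheduledTimeout reports a timeout -> FailedPods is incremented and the subset is marked
	              Unschedulable, with UnschedulableTimestamp recorded in status
	t0+30s..330s  the allocator caps the subset at Replicas-FailedPods, so the missing replicas land in other subsets
	t0+330s       recoverTime = UnschedulableTimestamp + UnschedulableLastSeconds has passed; the subset is treated
	              as schedulable again and may receive replicas on the next reconcile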
+ recoverTime := status.UnschedulableTimestamp.Add(ud.Spec.Topology.ScheduleStrategy.GetUnschedulableLastDuration()) + klog.InfoS("existing unschedulable subset found", "subset", name, "recoverTime", recoverTime) + if now.Before(recoverTime) { + klog.InfoS("subset is still unschedulable", "subset", name) + subset.Status.UnschedulableStatus.Unschedulable = true + subset.Status.UnschedulableStatus.UnschedulableTimestamp = status.UnschedulableTimestamp + durationStore.Push(unitedDeploymentKey, recoverTime.Sub(now)) + } else { + klog.InfoS("unschedulable subset recovered", "subset", name) + } + } + // Maybe there exist some pending pods because the subset is unschedulable. + if subset.Status.ReadyReplicas < subset.Status.Replicas { + for _, pod := range subset.Spec.SubsetPods { + timeouted, checkAfter := utilcontroller.PodUnscheduledTimeout(pod, ud.Spec.Topology.ScheduleStrategy.GetRescheduleCriticalDuration()) + if timeouted { + subset.Status.UnschedulableStatus.FailedPods++ + } + if checkAfter > 0 { + durationStore.Push(unitedDeploymentKey, checkAfter) + } + } + if subset.Status.UnschedulableStatus.FailedPods > 0 { + klog.InfoS("subset has failed pods", "failedPods", subset.Status.UnschedulableStatus.FailedPods) + if !subset.Status.UnschedulableStatus.Unschedulable { + subset.Status.UnschedulableStatus.Unschedulable = true + subset.Status.UnschedulableStatus.UnschedulableTimestamp = metav1.Time{Time: time.Now()} + durationStore.Push(unitedDeploymentKey, ud.Spec.Topology.ScheduleStrategy.GetUnschedulableLastDuration()) + } + } + } + klog.InfoS("unschedulable status", "subset", name, "unschedulableStatus", subset.Status.UnschedulableStatus) +} + func calcNextPartitions(ud *appsv1alpha1.UnitedDeployment, nextReplicas *map[string]int32) *map[string]int32 { partitions := map[string]int32{} for _, subset := range ud.Spec.Topology.Subsets { @@ -348,10 +409,10 @@ func (r *ReconcileUnitedDeployment) classifySubsetBySubsetName(ud *appsv1alpha1. 
return mapping } -func (r *ReconcileUnitedDeployment) updateStatus(instance *appsv1alpha1.UnitedDeployment, newStatus, oldStatus *appsv1alpha1.UnitedDeploymentStatus, nameToSubset *map[string]*Subset, nextReplicas, nextPartition *map[string]int32, currentRevision, updatedRevision *appsv1.ControllerRevision, collisionCount int32, control ControlInterface) (reconcile.Result, error) { +func (r *ReconcileUnitedDeployment) updateStatus(instance *appsv1alpha1.UnitedDeployment, newStatus, oldStatus *appsv1alpha1.UnitedDeploymentStatus, nameToSubset *map[string]*Subset, nextReplicas, nextPartition *map[string]int32, currentRevision, updatedRevision *appsv1.ControllerRevision, collisionCount int32, control ControlInterface) error { newStatus = r.calculateStatus(newStatus, nameToSubset, nextReplicas, nextPartition, currentRevision, updatedRevision, collisionCount, control) _, err := r.updateUnitedDeployment(instance, oldStatus, newStatus) - return reconcile.Result{}, err + return err } func (r *ReconcileUnitedDeployment) calculateStatus(newStatus *appsv1alpha1.UnitedDeploymentStatus, nameToSubset *map[string]*Subset, nextReplicas, nextPartition *map[string]int32, currentRevision, updatedRevision *appsv1.ControllerRevision, collisionCount int32, control ControlInterface) *appsv1alpha1.UnitedDeploymentStatus { @@ -431,7 +492,8 @@ func (r *ReconcileUnitedDeployment) updateUnitedDeployment(ud *appsv1alpha1.Unit ud.Generation == newStatus.ObservedGeneration && reflect.DeepEqual(oldStatus.SubsetReplicas, newStatus.SubsetReplicas) && reflect.DeepEqual(oldStatus.UpdateStatus, newStatus.UpdateStatus) && - reflect.DeepEqual(oldStatus.Conditions, newStatus.Conditions) { + reflect.DeepEqual(oldStatus.Conditions, newStatus.Conditions) && + reflect.DeepEqual(oldStatus.SubsetUnschedulable, newStatus.SubsetUnschedulable) { return ud, nil } @@ -439,13 +501,14 @@ func (r *ReconcileUnitedDeployment) updateUnitedDeployment(ud *appsv1alpha1.Unit var getErr, updateErr error for i, obj := 0, ud; ; i++ { - klog.V(4).InfoS("Updating status", + klog.V(4).InfoS("updating UnitedDeployment status", "updateCount", i, "unitedDeployment", klog.KObj(obj), "replicasSpec", obj.Spec.Replicas, "oldReplicas", obj.Status.Replicas, "newReplicas", newStatus.Replicas, "readyReplicasSpec", obj.Spec.Replicas, "oldReadyReplicas", obj.Status.ReadyReplicas, "newReadyReplicas", newStatus.ReadyReplicas, "oldUpdatedReplicas", obj.Status.UpdatedReplicas, "newUpdatedReplicas", newStatus.UpdatedReplicas, "oldUpdatedReadyReplicas", obj.Status.UpdatedReadyReplicas, "newUpdatedReadyReplicas", newStatus.UpdatedReadyReplicas, "oldObservedGeneration", obj.Status.ObservedGeneration, "newObservedGeneration", newStatus.ObservedGeneration, + "oldSubsetUnschedulable", obj.Status.SubsetUnschedulable, "newSubsetUnschedulable", newStatus.SubsetUnschedulable, ) obj.Status = *newStatus @@ -467,3 +530,7 @@ func (r *ReconcileUnitedDeployment) updateUnitedDeployment(ud *appsv1alpha1.Unit klog.ErrorS(updateErr, "Failed to update UnitedDeployment status", "unitedDeployment", klog.KObj(ud)) return nil, updateErr } + +func getUnitedDeploymentKey(o *appsv1alpha1.UnitedDeployment) string { + return o.GetNamespace() + "/" + o.GetName() +} diff --git a/pkg/controller/uniteddeployment/uniteddeployment_controller_test.go b/pkg/controller/uniteddeployment/uniteddeployment_controller_test.go index 6d52c50d65..7661bb69f5 100644 --- a/pkg/controller/uniteddeployment/uniteddeployment_controller_test.go +++ b/pkg/controller/uniteddeployment/uniteddeployment_controller_test.go @@ -18,6 
+18,7 @@ package uniteddeployment import ( "testing" + "time" "github.com/onsi/gomega" appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" @@ -95,7 +96,7 @@ func TestReconcile(t *testing.T) { }, } - // Setup the Manager and Controller. Wrap the Controller Reconcile function so it writes each request to a + // Set up the Manager and Controller. Wrap the Controller Reconcile function so it writes each request to a // channel when it is finished. mgr, err := manager.New(cfg, manager.Options{}) g.Expect(err).NotTo(gomega.HaveOccurred()) @@ -124,3 +125,81 @@ func TestReconcile(t *testing.T) { defer c.Delete(context.TODO(), instance) g.Eventually(requests, timeout).Should(gomega.Receive(gomega.Equal(expectedRequest))) } + +func TestUnschedulableStatusManagement(t *testing.T) { + g := gomega.NewGomegaWithT(t) + ud := &appsv1alpha1.UnitedDeployment{ + Spec: appsv1alpha1.UnitedDeploymentSpec{ + Topology: appsv1alpha1.Topology{ + ScheduleStrategy: appsv1alpha1.UnitedDeploymentScheduleStrategy{ + Type: appsv1alpha1.AdaptiveUnitedDeploymentScheduleStrategyType, + }, + }, + }, + } + pod := corev1.Pod{ + Status: corev1.PodStatus{ + Phase: corev1.PodPending, + Conditions: []corev1.PodCondition{ + { + Type: corev1.PodScheduled, + Status: corev1.ConditionFalse, + Reason: corev1.PodReasonUnschedulable, + }, + }, + }, + ObjectMeta: metav1.ObjectMeta{ + CreationTimestamp: metav1.NewTime(time.Now().Add(-15 * time.Second)), + }, + } + subset := &Subset{ + Status: SubsetStatus{ + ReadyReplicas: 0, + Replicas: 1, + }, + Spec: SubsetSpec{ + SubsetPods: []*corev1.Pod{&pod}, + }, + } + + // CASE1: Not timeouted yet + manageUnschedulableStatusForExistingSubset(subset.Name, subset, ud) + g.Expect(subset.Status.UnschedulableStatus.FailedPods).To(gomega.BeEquivalentTo(0)) + g.Expect(durationStore.Pop(getUnitedDeploymentKey(ud))).To(gomega.Satisfy(func(after time.Duration) bool { + return after < 15*time.Second && after > 14*time.Second + })) + + //// CASE2: Timeouted + pod.CreationTimestamp = metav1.NewTime(time.Now().Add(-31 * time.Second)) + manageUnschedulableStatusForExistingSubset(subset.Name, subset, ud) + g.Expect(subset.Status.UnschedulableStatus.FailedPods).To(gomega.BeEquivalentTo(1)) + g.Expect(durationStore.Pop(getUnitedDeploymentKey(ud))).To(gomega.BeEquivalentTo(appsv1alpha1.DefaultUnschedulableStatusLastDuration)) + + // CASE3: Unschedulable status + ud.Status.SubsetUnschedulable = map[string]appsv1alpha1.UnschedulableStatus{ + subset.Name: { + Unschedulable: true, + UnschedulableTimestamp: metav1.NewTime(time.Now().Add(-time.Minute)), + }, + } + subset.Status.ReadyReplicas = 1 + subset.Status.UnschedulableStatus.FailedPods = 0 + manageUnschedulableStatusForExistingSubset(subset.Name, subset, ud) + g.Expect(g.Expect(subset.Status.UnschedulableStatus.FailedPods).To(gomega.BeEquivalentTo(0))) + g.Expect(durationStore.Pop(getUnitedDeploymentKey(ud))).To(gomega.Satisfy(func(after time.Duration) bool { + return after < appsv1alpha1.DefaultUnschedulableStatusLastDuration-time.Minute && + after > 59*time.Second+appsv1alpha1.DefaultUnschedulableStatusLastDuration-2*time.Minute + })) + + // CASE4: Status Reset + ud.Status.SubsetUnschedulable = map[string]appsv1alpha1.UnschedulableStatus{ + subset.Name: { + Unschedulable: true, + UnschedulableTimestamp: metav1.NewTime(time.Now().Add(-time.Minute - appsv1alpha1.DefaultUnschedulableStatusLastDuration)), + }, + } + subset.Status.UnschedulableStatus.Unschedulable = false + 
g.Expect(g.Expect(subset.Status.UnschedulableStatus.FailedPods).To(gomega.BeEquivalentTo(0))) + g.Expect(subset.Status.UnschedulableStatus.Unschedulable).To(gomega.BeFalse()) + g.Expect(durationStore.Pop(getUnitedDeploymentKey(ud))).To(gomega.BeEquivalentTo(0)) +} diff --git a/pkg/controller/uniteddeployment/uniteddeployment_update.go b/pkg/controller/uniteddeployment/uniteddeployment_update.go index 024945ce98..1668147ea4 100644 --- a/pkg/controller/uniteddeployment/uniteddeployment_update.go +++ b/pkg/controller/uniteddeployment/uniteddeployment_update.go @@ -48,8 +48,10 @@ func (r *ReconcileUnitedDeployment) manageSubsets(ud *appsv1alpha1.UnitedDeploym } var needUpdate []string + newStatus.SubsetUnschedulable = make(map[string]appsv1alpha1.UnschedulableStatus) for _, name := range exists.List() { subset := (*nameToSubset)[name] + newStatus.SubsetUnschedulable[name] = subset.Status.UnschedulableStatus if r.subSetControls[subsetType].IsExpected(subset, expectedRevision.Name) || subset.Spec.Replicas != nextUpdate[name].Replicas || subset.Spec.UpdateStrategy.Partition != nextUpdate[name].Partition || @@ -58,6 +60,7 @@ func (r *ReconcileUnitedDeployment) manageSubsets(ud *appsv1alpha1.UnitedDeploym } } + var newPodCreated = false if len(needUpdate) > 0 { _, updateErr = util.SlowStartBatch(len(needUpdate), slowStartInitialBatchSize, func(index int) error { cell := needUpdate[index] @@ -72,6 +75,7 @@ func (r *ReconcileUnitedDeployment) manageSubsets(ud *appsv1alpha1.UnitedDeploym if updateSubsetErr != nil { r.recorder.Event(ud.DeepCopy(), corev1.EventTypeWarning, fmt.Sprintf("Failed%s", eventTypeSubsetsUpdate), fmt.Sprintf("Error updating PodSet (%s) %s when updating: %s", subsetType, subset.Name, updateSubsetErr)) } + newPodCreated = newPodCreated || subset.Spec.Replicas < replicas return updateSubsetErr }) } @@ -79,6 +83,11 @@ func (r *ReconcileUnitedDeployment) manageSubsets(ud *appsv1alpha1.UnitedDeploym if updateErr == nil { SetUnitedDeploymentCondition(newStatus, NewUnitedDeploymentCondition(appsv1alpha1.SubsetUpdated, corev1.ConditionTrue, "", "")) } else { + // If using an Adaptive scheduling strategy, when the subset is scaled out leading to the creation of new Pods, + // future potential scheduling failures need to be checked for rescheduling. + if strategy := ud.Spec.Topology.ScheduleStrategy; strategy.IsAdaptive() && newPodCreated { + durationStore.Push(getUnitedDeploymentKey(ud), strategy.GetRescheduleCriticalDuration()) + } SetUnitedDeploymentCondition(newStatus, NewUnitedDeploymentCondition(appsv1alpha1.SubsetUpdated, corev1.ConditionFalse, "Error", updateErr.Error())) } return @@ -132,6 +141,11 @@ func (r *ReconcileUnitedDeployment) manageSubsetProvision(ud *appsv1alpha1.Unite return nil }) if createdErr == nil { + // When a new subset is created, regardless of whether it contains newly created Pods, + // a requeue is triggered to treat it as an existing subset and update its unschedulable information. 
+ if strategy := ud.Spec.Topology.ScheduleStrategy; strategy.IsAdaptive() { + durationStore.Push(getUnitedDeploymentKey(ud), strategy.GetRescheduleCriticalDuration()) + } r.recorder.Eventf(ud.DeepCopy(), corev1.EventTypeNormal, fmt.Sprintf("Successful%s", eventTypeSubsetsUpdate), "Create %d Subset (%s)", createdNum, subsetType) } else { errs = append(errs, createdErr) diff --git a/pkg/controller/util/pod_condition_utils.go b/pkg/controller/util/pod_condition_utils.go index 44fa65a7aa..4018951f39 100644 --- a/pkg/controller/util/pod_condition_utils.go +++ b/pkg/controller/util/pod_condition_utils.go @@ -2,6 +2,7 @@ package util import ( "encoding/json" + "time" v1 "k8s.io/api/core/v1" ) @@ -22,3 +23,25 @@ func UpdateMessageKvCondition(kv map[string]interface{}, condition *v1.PodCondit message, _ := json.Marshal(kv) condition.Message = string(message) } + +// PodUnscheduledTimeout return true when Pod was scheduled failed and timeout. +// nextCheckAfter > 0 means the pod is failed to schedule but not timeout yet. +func PodUnscheduledTimeout(pod *v1.Pod, timeout time.Duration) (timeouted bool, nextCheckAfter time.Duration) { + if pod.DeletionTimestamp != nil || pod.Status.Phase != v1.PodPending || pod.Spec.NodeName != "" { + return false, -1 + } + for _, condition := range pod.Status.Conditions { + if condition.Type == v1.PodScheduled && condition.Status == v1.ConditionFalse && + condition.Reason == v1.PodReasonUnschedulable { + currentTime := time.Now() + + expectSchedule := pod.CreationTimestamp.Add(timeout) + // schedule timeout + if expectSchedule.Before(currentTime) { + return true, -1 + } + return false, expectSchedule.Sub(currentTime) + } + } + return false, -1 +} diff --git a/pkg/controller/workloadspread/reschedule.go b/pkg/controller/workloadspread/reschedule.go index 810a357c47..b1868d7af3 100644 --- a/pkg/controller/workloadspread/reschedule.go +++ b/pkg/controller/workloadspread/reschedule.go @@ -20,6 +20,7 @@ import ( "context" "time" + "github.com/openkruise/kruise/pkg/controller/util" corev1 "k8s.io/api/core/v1" "k8s.io/klog/v2" @@ -111,26 +112,9 @@ func (r *ReconcileWorkloadSpread) deletePodsForSubset(ws *appsv1alpha1.WorkloadS // PodUnscheduledTimeout return true when Pod was scheduled failed and timeout. 
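A minimal usage sketch for the extracted helper (the wrapper name and the 45s/10s/30s figures are arbitrary; imports assumed are corev1, metav1, time, and github.com/openkruise/kruise/pkg/controller/util): a pod that has been unschedulable for longer than the timeout reports true, while a younger one reports false together with the time left until the next check is due.

func examplePodUnscheduledTimeout() {
	pod := &corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{CreationTimestamp: metav1.NewTime(time.Now().Add(-45 * time.Second))},
		Status: corev1.PodStatus{
			Phase: corev1.PodPending,
			Conditions: []corev1.PodCondition{{
				Type:   corev1.PodScheduled,
				Status: corev1.ConditionFalse,
				Reason: corev1.PodReasonUnschedulable,
			}},
		},
	}
	timeouted, _ := util.PodUnscheduledTimeout(pod, 30*time.Second) // true: pending for 45s > 30s

	pod.CreationTimestamp = metav1.NewTime(time.Now().Add(-10 * time.Second))
	timeouted, nextCheckAfter := util.PodUnscheduledTimeout(pod, 30*time.Second) // false, roughly 20s left
	_, _ = timeouted, nextCheckAfter
}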
func PodUnscheduledTimeout(ws *appsv1alpha1.WorkloadSpread, pod *corev1.Pod) bool { - if pod.DeletionTimestamp != nil || pod.Status.Phase != corev1.PodPending || pod.Spec.NodeName != "" { - return false + timeouted, nextCheckAfter := util.PodUnscheduledTimeout(pod, time.Second*time.Duration(*ws.Spec.ScheduleStrategy.Adaptive.RescheduleCriticalSeconds)) + if nextCheckAfter > 0 { + durationStore.Push(getWorkloadSpreadKey(ws), nextCheckAfter) } - for _, condition := range pod.Status.Conditions { - if condition.Type == corev1.PodScheduled && condition.Status == corev1.ConditionFalse && - condition.Reason == corev1.PodReasonUnschedulable { - currentTime := time.Now() - rescheduleCriticalSeconds := *ws.Spec.ScheduleStrategy.Adaptive.RescheduleCriticalSeconds - - expectSchedule := pod.CreationTimestamp.Add(time.Second * time.Duration(rescheduleCriticalSeconds)) - // schedule timeout - if expectSchedule.Before(currentTime) { - return true - } - - // no timeout, requeue key when expectSchedule is equal to time.Now() - durationStore.Push(getWorkloadSpreadKey(ws), expectSchedule.Sub(currentTime)) - - return false - } - } - return false + return timeouted } diff --git a/test/e2e/apps/uniteddeployment.go b/test/e2e/apps/uniteddeployment.go index 9d41cd9d01..adaeed7f8e 100644 --- a/test/e2e/apps/uniteddeployment.go +++ b/test/e2e/apps/uniteddeployment.go @@ -1,13 +1,20 @@ package apps import ( + "context" "fmt" + "time" "github.com/onsi/ginkgo" + "github.com/onsi/gomega" + appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" kruiseclientset "github.com/openkruise/kruise/pkg/client/clientset/versioned" "github.com/openkruise/kruise/test/e2e/framework" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/intstr" clientset "k8s.io/client-go/kubernetes" + "k8s.io/utils/ptr" ) var _ = SIGDescribe("uniteddeployment", func() { @@ -77,4 +84,284 @@ var _ = SIGDescribe("uniteddeployment", func() { udManager.Scale(1) udManager.CheckSubsets(replicasMap([]int32{0, 0, 1})) }) + + ginkgo.It("adaptive united deployment with elastic allocator", func() { + replicas := func(r int) *intstr.IntOrString { p := intstr.FromInt32(int32(r)); return &p } + replicasMap := func(replicas []int32) map[string]int32 { + replicaMap := make(map[string]int32) + for i, r := range replicas { + replicaMap[fmt.Sprintf("subset-%d", i)] = r + } + return replicaMap + } + unschedulableMap := func(unschedulables []bool) map[string]bool { + resultMap := make(map[string]bool) + for i, r := range unschedulables { + resultMap[fmt.Sprintf("subset-%d", i)] = r + } + return resultMap + } + + udManager := tester.NewUnitedDeploymentManager("adaptive-ud-elastic-test") + // enable adaptive scheduling + udManager.UnitedDeployment.Spec.Topology.ScheduleStrategy = appsv1alpha1.UnitedDeploymentScheduleStrategy{ + Type: appsv1alpha1.AdaptiveUnitedDeploymentScheduleStrategyType, + Adaptive: &appsv1alpha1.AdaptiveUnitedDeploymentStrategy{ + RescheduleCriticalSeconds: ptr.To(int32(10)), + UnschedulableLastSeconds: ptr.To(int32(10)), + }, + } + udManager.AddSubset("subset-0", nil, nil, replicas(2)) + udManager.AddSubset("subset-1", nil, nil, replicas(2)) + udManager.AddSubset("subset-2", nil, nil, nil) + // make subset-1 unschedulable + nodeKey := "ud-e2e/to-make-a-bad-subset-elastic" + udManager.UnitedDeployment.Spec.Topology.Subsets[1].NodeSelectorTerm = corev1.NodeSelectorTerm{ + MatchExpressions: []corev1.NodeSelectorRequirement{ + { + Key: nodeKey, + Operator: corev1.NodeSelectorOpExists, + }, + }, 
+ } + + ginkgo.By("pulling busybox image") + udManager.EnsureImage() + + ginkgo.By("creating united deployment") + udManager.Spec.Replicas = ptr.To(int32(3)) + _, err := f.KruiseClientSet.AppsV1alpha1().UnitedDeployments(udManager.Namespace).Create(context.Background(), + udManager.UnitedDeployment, metav1.CreateOptions{}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + + ginkgo.By("schedule start") + time.Sleep(2 * time.Second) // now 2s, reschedule 10s, recover 20s + udManager.CheckSubsetPods(replicasMap([]int32{2, 1, 0})) + udManager.CheckUnschedulableStatus(unschedulableMap([]bool{false, false, false})) + fmt.Println() + + ginkgo.By("wait for rescheduling") + time.Sleep(12 * time.Second) // now 14s, reschedule 10s, recover 20s + udManager.CheckSubsetPods(replicasMap([]int32{2, 0, 1})) + udManager.CheckUnschedulableStatus(unschedulableMap([]bool{false, true, false})) + fmt.Println() + + ginkgo.By("scale up while unschedulable") + udManager.Scale(4) + time.Sleep(time.Second) // now 15s, reschedule 10s, recover 20s + udManager.CheckSubsetPods(replicasMap([]int32{2, 0, 2})) + udManager.CheckUnschedulableStatus(unschedulableMap([]bool{false, true, false})) + fmt.Println() + + ginkgo.By("scale down while unschedulable") + udManager.Scale(3) + time.Sleep(time.Second) // now 16s, reschedule 10s, recover 20s + udManager.CheckSubsetPods(replicasMap([]int32{2, 0, 1})) + fmt.Println() + + ginkgo.By("wait subset recovery") + time.Sleep(8 * time.Second) // now 24s, reschedule 10s, recover 20s + udManager.CheckSubsetPods(replicasMap([]int32{2, 0, 1})) + udManager.CheckUnschedulableStatus(unschedulableMap([]bool{false, false, false})) + fmt.Println() + + ginkgo.By("scale up after recovery") + udManager.Scale(4) + time.Sleep(time.Second) + udManager.CheckSubsetPods(replicasMap([]int32{2, 1, 1})) + udManager.CheckUnschedulableStatus(unschedulableMap([]bool{false, false, false})) + fmt.Println() + + ginkgo.By("scale down after recovery") + udManager.Scale(3) + time.Sleep(time.Second) + udManager.CheckSubsetPods(replicasMap([]int32{2, 0, 1})) // pods in subset-1 are not ready, deleted first + udManager.CheckUnschedulableStatus(unschedulableMap([]bool{false, false, false})) + fmt.Println() + + ginkgo.By("create new subset") + udManager.AddSubset("subset-3", nil, replicas(2), nil) + udManager.Update() + time.Sleep(time.Second) + udManager.CheckSubsetPods(replicasMap([]int32{2, 0, 1, 0})) + fmt.Println() + + ginkgo.By("scale up to new subset (waiting final status after rescheduling)") + udManager.Scale(6) + time.Sleep(45 * time.Second) + udManager.CheckSubsetPods(replicasMap([]int32{2, 0, 2, 2})) + fmt.Println() + + ginkgo.By("delete new subset (waiting final status after rescheduling)") + udManager.Spec.Topology.Subsets = udManager.Spec.Topology.Subsets[:3] + udManager.Update() + time.Sleep(10 * time.Second) + udManager.CheckSubsetPods(replicasMap([]int32{2, 0, 4})) + fmt.Println() + + ginkgo.By("fix subset-1 and wait recover") + nodeList, err := f.ClientSet.CoreV1().Nodes().List(context.Background(), metav1.ListOptions{}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + someNode := nodeList.Items[1] + someNode.Labels[nodeKey] = "haha" + _, err = f.ClientSet.CoreV1().Nodes().Update(context.Background(), &someNode, metav1.UpdateOptions{}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + time.Sleep(time.Second) + udManager.Scale(3) + time.Sleep(20 * time.Second) + udManager.CheckSubsetPods(replicasMap([]int32{2, 0, 1})) + fmt.Println() + + ginkgo.By("scale up after fixed") + udManager.Scale(5) + 
+        time.Sleep(time.Second)
+        udManager.CheckSubsetPods(replicasMap([]int32{2, 2, 1}))
+        fmt.Println()
+
+        ginkgo.By("scale down after fixed")
+        udManager.Scale(3)
+        time.Sleep(time.Second)
+        udManager.CheckSubsetPods(replicasMap([]int32{2, 1, 0}))
+        fmt.Println()
+    })
+
+    ginkgo.It("adaptive united deployment with specific allocator", func() {
+        replicas := func(p string) *intstr.IntOrString { x := intstr.FromString(p); return &x }
+        replicasMap := func(replicas []int32) map[string]int32 {
+            replicaMap := make(map[string]int32)
+            for i, r := range replicas {
+                replicaMap[fmt.Sprintf("subset-%d", i)] = r
+            }
+            return replicaMap
+        }
+        unschedulableMap := func(unschedulables []bool) map[string]bool {
+            resultMap := make(map[string]bool)
+            for i, r := range unschedulables {
+                resultMap[fmt.Sprintf("subset-%d", i)] = r
+            }
+            return resultMap
+        }
+
+        udManager := tester.NewUnitedDeploymentManager("adaptive-ud-specific-test")
+        // enable adaptive scheduling
+        udManager.UnitedDeployment.Spec.Topology.ScheduleStrategy = appsv1alpha1.UnitedDeploymentScheduleStrategy{
+            Type: appsv1alpha1.AdaptiveUnitedDeploymentScheduleStrategyType,
+            Adaptive: &appsv1alpha1.AdaptiveUnitedDeploymentStrategy{
+                RescheduleCriticalSeconds: ptr.To(int32(10)),
+                UnschedulableLastSeconds:  ptr.To(int32(10)),
+            },
+        }
+        udManager.AddSubset("subset-0", replicas("25%"), nil, nil)
+        udManager.AddSubset("subset-1", replicas("25%"), nil, nil)
+        udManager.AddSubset("subset-2", nil, nil, nil)
+        // make subset-1 unschedulable
+        nodeKey := "ud-e2e/to-make-a-bad-subset-specific"
+        udManager.UnitedDeployment.Spec.Topology.Subsets[1].NodeSelectorTerm = corev1.NodeSelectorTerm{
+            MatchExpressions: []corev1.NodeSelectorRequirement{
+                {
+                    Key:      nodeKey,
+                    Operator: corev1.NodeSelectorOpExists,
+                },
+            },
+        }
+
+        ginkgo.By("pulling busybox image")
+        udManager.EnsureImage()
+
+        ginkgo.By("creating united deployment")
+        udManager.Spec.Replicas = ptr.To(int32(4))
+        _, err := f.KruiseClientSet.AppsV1alpha1().UnitedDeployments(udManager.Namespace).Create(context.Background(),
+            udManager.UnitedDeployment, metav1.CreateOptions{})
+        gomega.Expect(err).NotTo(gomega.HaveOccurred())
+
+        ginkgo.By("schedule start")
+        time.Sleep(2 * time.Second) // now 2s, reschedule 10s, recover 20s
+        udManager.CheckSubsetPods(replicasMap([]int32{1, 1, 2}))
+        udManager.CheckUnschedulableStatus(unschedulableMap([]bool{false, false, false}))
+        fmt.Println()
+
+        ginkgo.By("wait for rescheduling")
+        time.Sleep(12 * time.Second) // now 14s, reschedule 10s, recover 20s
+        udManager.CheckSubsetPods(replicasMap([]int32{1, 0, 3}))
+        udManager.CheckUnschedulableStatus(unschedulableMap([]bool{false, true, false}))
+        fmt.Println()
+
+        ginkgo.By("scale up while unschedulable")
+        udManager.Scale(5)
+        time.Sleep(time.Second) // now 15s, reschedule 10s, recover 20s
+        udManager.CheckSubsetPods(replicasMap([]int32{1, 0, 4}))
+        udManager.CheckUnschedulableStatus(unschedulableMap([]bool{false, true, false}))
+        fmt.Println()
+
+        ginkgo.By("scale down while unschedulable")
+        udManager.Scale(3)
+        time.Sleep(time.Second) // now 16s, reschedule 10s, recover 20s
+        udManager.CheckSubsetPods(replicasMap([]int32{1, 0, 2}))
+        fmt.Println()
+
+        ginkgo.By("wait subset recovery")
+        time.Sleep(8 * time.Second) // now 24s, reschedule 10s, recover 20s
+        udManager.CheckSubsetPods(replicasMap([]int32{1, 0, 2}))
+        udManager.CheckUnschedulableStatus(unschedulableMap([]bool{false, false, false}))
+        fmt.Println()
+
+        ginkgo.By("scale up after recovery")
+        udManager.Scale(4)
+        time.Sleep(time.Second)
+        udManager.CheckSubsetPods(replicasMap([]int32{1, 1, 2}))
+        udManager.CheckUnschedulableStatus(unschedulableMap([]bool{false, false, false}))
+        fmt.Println()
+
+        ginkgo.By("scale down after recovery")
+        udManager.Scale(3)
+        time.Sleep(time.Second)
+        udManager.CheckSubsetPods(replicasMap([]int32{1, 1, 1}))
+        udManager.CheckUnschedulableStatus(unschedulableMap([]bool{false, false, false}))
+        fmt.Println()
+
+        ginkgo.By("create new subset")
+        udManager.AddSubset("subset-3", replicas("25%"), nil, nil)
+        udManager.Update()
+        time.Sleep(time.Second)
+        udManager.CheckSubsetPods(replicasMap([]int32{1, 1, 0, 1}))
+        fmt.Println()
+
+        ginkgo.By("scale up to new subset (waiting final status after rescheduling)")
+        udManager.Scale(8)
+        time.Sleep(45 * time.Second)
+        udManager.CheckSubsetPods(replicasMap([]int32{2, 0, 4, 2}))
+        fmt.Println()
+
+        ginkgo.By("delete new subset (waiting final status after rescheduling)")
+        udManager.Spec.Topology.Subsets = udManager.Spec.Topology.Subsets[:3]
+        udManager.Update()
+        time.Sleep(10 * time.Second)
+        udManager.CheckSubsetPods(replicasMap([]int32{2, 0, 6}))
+        fmt.Println()
+
+        ginkgo.By("fix subset-1 and wait recover")
+        nodeList, err := f.ClientSet.CoreV1().Nodes().List(context.Background(), metav1.ListOptions{})
+        gomega.Expect(err).NotTo(gomega.HaveOccurred())
+        someNode := nodeList.Items[1]
+        someNode.Labels[nodeKey] = "haha"
+        _, err = f.ClientSet.CoreV1().Nodes().Update(context.Background(), &someNode, metav1.UpdateOptions{})
+        gomega.Expect(err).NotTo(gomega.HaveOccurred())
+        time.Sleep(time.Second)
+        udManager.Scale(4)
+        time.Sleep(20 * time.Second)
+        udManager.CheckSubsetPods(replicasMap([]int32{1, 1, 2})) // subset-0 scaled down to 1 for 1.5X overflow: 2 > 1.5
+        fmt.Println()
+
+        ginkgo.By("scale up after fixed")
+        udManager.Scale(8)
+        time.Sleep(time.Second)
+        udManager.CheckSubsetPods(replicasMap([]int32{2, 2, 4}))
+        fmt.Println()
+
+        ginkgo.By("scale down after fixed")
+        udManager.Scale(4)
+        time.Sleep(time.Second)
+        udManager.CheckSubsetPods(replicasMap([]int32{1, 1, 2}))
+        fmt.Println()
+    })
 })
diff --git a/test/e2e/framework/uniteddeployment.go b/test/e2e/framework/uniteddeployment.go
index 02bab7b662..2bdba10eb6 100644
--- a/test/e2e/framework/uniteddeployment.go
+++ b/test/e2e/framework/uniteddeployment.go
@@ -3,6 +3,9 @@ package framework
 import (
     "context"
     "fmt"
+    "reflect"
+    "time"
+
     "github.com/onsi/gomega"
     appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
     kruiseclientset "github.com/openkruise/kruise/pkg/client/clientset/versioned"
@@ -13,8 +16,6 @@ import (
     "k8s.io/apimachinery/pkg/util/intstr"
     clientset "k8s.io/client-go/kubernetes"
     "k8s.io/utils/pointer"
-    "reflect"
-    "time"
 )
 
 type UnitedDeploymentTester struct {
@@ -31,6 +32,8 @@ func NewUnitedDeploymentTester(c clientset.Interface, kc kruiseclientset.Interfa
     }
 }
 
+var zero = int64(0)
+
 func (t *UnitedDeploymentTester) NewUnitedDeploymentManager(name string) *UnitedDeploymentManager {
     return &UnitedDeploymentManager{
         UnitedDeployment: &appsv1alpha1.UnitedDeployment{
@@ -64,6 +67,7 @@ func (t *UnitedDeploymentTester) NewUnitedDeploymentManager(name string) *United
                     },
                 },
                 Spec: v1.PodSpec{
+                    TerminationGracePeriodSeconds: &zero,
                     Containers: []v1.Container{
                         {
                             Name:  "busybox",
@@ -81,12 +85,14 @@ func (t *UnitedDeploymentTester) NewUnitedDeploymentManager(name string) *United
             },
         },
         kc: t.kc,
+        c:  t.c,
     }
 }
 
 type UnitedDeploymentManager struct {
     *appsv1alpha1.UnitedDeployment
     kc kruiseclientset.Interface
+    c  clientset.Interface
 }
 
 func (m *UnitedDeploymentManager) AddSubset(name string, replicas, minReplicas, maxReplicas *intstr.IntOrString) {
@@ -111,6 +117,14 @@ func (m *UnitedDeploymentManager) Scale(replicas int32) {
     }, time.Minute, time.Second).Should(gomega.BeTrue())
 }
 
+func (m *UnitedDeploymentManager) Update() {
+    ud, err := m.kc.AppsV1alpha1().UnitedDeployments(m.Namespace).Get(context.TODO(), m.Name, metav1.GetOptions{})
+    gomega.Expect(err).NotTo(gomega.HaveOccurred())
+    ud.Spec = m.UnitedDeployment.DeepCopy().Spec
+    _, err = m.kc.AppsV1alpha1().UnitedDeployments(m.Namespace).Update(context.Background(), ud, metav1.UpdateOptions{})
+    gomega.Expect(err).NotTo(gomega.HaveOccurred())
+}
+
 func (m *UnitedDeploymentManager) Create(replicas int32) {
     m.Spec.Replicas = pointer.Int32(replicas)
     _, err := m.kc.AppsV1alpha1().UnitedDeployments(m.Namespace).Create(context.TODO(), m.UnitedDeployment, metav1.CreateOptions{})
@@ -131,3 +145,53 @@ func (m *UnitedDeploymentManager) CheckSubsets(replicas map[string]int32) {
         return ud.GetGeneration() == ud.Status.ObservedGeneration && *ud.Spec.Replicas == ud.Status.Replicas && reflect.DeepEqual(replicas, ud.Status.SubsetReplicas)
     }, time.Minute, time.Second).Should(gomega.BeTrue())
 }
+
+func (m *UnitedDeploymentManager) CheckSubsetPods(expect map[string]int32) {
+    fmt.Print("CheckSubsetPods ")
+    ud, err := m.kc.AppsV1alpha1().UnitedDeployments(m.Namespace).Get(context.TODO(), m.Name, metav1.GetOptions{})
+    gomega.Expect(err).NotTo(gomega.HaveOccurred())
+    gomega.Eventually(func(g gomega.Gomega) {
+        actual := map[string]int32{}
+        for _, subset := range ud.Spec.Topology.Subsets {
+            podList, err := m.c.CoreV1().Pods(m.Namespace).List(context.Background(), metav1.ListOptions{
+                LabelSelector: fmt.Sprintf("apps.kruise.io/subset-name=%s", subset.Name),
+            })
+            g.Expect(err).NotTo(gomega.HaveOccurred())
+            actual[subset.Name] = int32(len(podList.Items))
+        }
+        g.Expect(expect).To(gomega.BeEquivalentTo(actual))
+    }, time.Minute, 100*time.Millisecond).Should(gomega.Succeed())
+    fmt.Println("pass")
+}
+
+func (m *UnitedDeploymentManager) CheckUnschedulableStatus(expect map[string]bool) {
+    fmt.Print("CheckUnschedulableStatus ")
+    gomega.Eventually(func(g gomega.Gomega) {
+        ud, err := m.kc.AppsV1alpha1().UnitedDeployments(m.Namespace).Get(context.TODO(), m.Name, metav1.GetOptions{})
+        g.Expect(err).NotTo(gomega.HaveOccurred())
+        actual := map[string]bool{}
+        for name := range expect {
+            actual[name] = ud.Status.SubsetUnschedulable[name].Unschedulable
+        }
+        g.Expect(expect).To(gomega.BeEquivalentTo(actual))
+    }, time.Minute, 100*time.Millisecond).Should(gomega.Succeed())
+    fmt.Println("pass")
+}
+
+func (m *UnitedDeploymentManager) EnsureImage() {
+    job, err := m.kc.AppsV1alpha1().ImagePullJobs(m.Namespace).Create(context.Background(), &appsv1alpha1.ImagePullJob{
+        ObjectMeta: metav1.ObjectMeta{
+            Name:      "ud-test-" + m.Name,
+            Namespace: m.Namespace,
+        },
+        Spec: appsv1alpha1.ImagePullJobSpec{
+            Image: "busybox:1.32",
+        },
+    }, metav1.CreateOptions{})
+    gomega.Expect(err).NotTo(gomega.HaveOccurred())
+    gomega.Eventually(func() bool {
+        job, err := m.kc.AppsV1alpha1().ImagePullJobs(m.Namespace).Get(context.Background(), job.Name, metav1.GetOptions{})
+        gomega.Expect(err).NotTo(gomega.HaveOccurred())
+        return !job.Status.CompletionTime.IsZero() && len(job.Status.FailedNodes) == 0
+    }, time.Minute, time.Second).Should(gomega.BeTrue())
+}
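
Note: the refactored WorkloadSpread helper above delegates the timeout check to util.PodUnscheduledTimeout, whose implementation is not part of this patch. The following is a minimal sketch of what such a helper presumably looks like, reconstructed from the inline logic removed above; the exact signature and package location are assumptions, not the library's confirmed API.

package util

import (
    "time"

    corev1 "k8s.io/api/core/v1"
)

// PodUnscheduledTimeout reports whether a pending pod has stayed unschedulable
// longer than timeout. When the pod is unschedulable but still within the window,
// the second return value is the remaining wait before the caller should re-check
// (the wrapper above pushes that duration into durationStore to requeue the key).
func PodUnscheduledTimeout(pod *corev1.Pod, timeout time.Duration) (bool, time.Duration) {
    // Pods that are being deleted, already bound to a node, or not pending never time out here.
    if pod.DeletionTimestamp != nil || pod.Status.Phase != corev1.PodPending || pod.Spec.NodeName != "" {
        return false, 0
    }
    for _, condition := range pod.Status.Conditions {
        if condition.Type == corev1.PodScheduled && condition.Status == corev1.ConditionFalse &&
            condition.Reason == corev1.PodReasonUnschedulable {
            expectSchedule := pod.CreationTimestamp.Add(timeout)
            currentTime := time.Now()
            if expectSchedule.Before(currentTime) {
                return true, 0 // schedule timeout reached
            }
            // not timed out yet: ask the caller to re-check once expectSchedule passes
            return false, expectSchedule.Sub(currentTime)
        }
    }
    return false, 0
}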