diff --git a/apis/apps/v1alpha1/uniteddeployment_types.go b/apis/apps/v1alpha1/uniteddeployment_types.go
index 1e4c18c947..51dddea478 100644
--- a/apis/apps/v1alpha1/uniteddeployment_types.go
+++ b/apis/apps/v1alpha1/uniteddeployment_types.go
@@ -17,13 +17,14 @@ limitations under the License.
 package v1alpha1
 
 import (
+	"time"
+
+	"github.com/openkruise/kruise/apis/apps/v1beta1"
 	appsv1 "k8s.io/api/apps/v1"
 	corev1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/util/intstr"
-
-	"github.com/openkruise/kruise/apis/apps/v1beta1"
 )
 
 // UpdateStrategyType is a string enumeration type that enumerates
@@ -165,6 +166,10 @@ type Topology struct {
 	// +patchStrategy=merge
 	// +optional
 	Subsets []Subset `json:"subsets,omitempty" patchStrategy:"merge" patchMergeKey:"name"`
+
+	// ScheduleStrategy indicates the strategy the UnitedDeployment uses to schedule replicas among its subsets.
+	// +optional
+	ScheduleStrategy UnitedDeploymentScheduleStrategy `json:"scheduleStrategy,omitempty"`
 }
 
 // Subset defines the detail of a subset.
@@ -218,6 +223,69 @@ type Subset struct {
 	Patch runtime.RawExtension `json:"patch,omitempty"`
 }
 
+// UnitedDeploymentScheduleStrategyType is a string enumeration type that enumerates
+// all possible schedule strategies for the UnitedDeployment controller.
+// +kubebuilder:validation:Enum=Adaptive;Fixed;""
+type UnitedDeploymentScheduleStrategyType string
+
+const (
+	// AdaptiveUnitedDeploymentScheduleStrategyType represents that when a pod is stuck in the pending status and cannot
+	// be scheduled, it is allowed to be rescheduled to another subset.
+	AdaptiveUnitedDeploymentScheduleStrategyType UnitedDeploymentScheduleStrategyType = "Adaptive"
+	// FixedUnitedDeploymentScheduleStrategyType represents that pods are strictly scheduled to the selected subset
+	// even if scheduling fails.
+	FixedUnitedDeploymentScheduleStrategyType UnitedDeploymentScheduleStrategyType = "Fixed"
+)
+
+const (
+	DefaultRescheduleCriticalDuration      = 30 * time.Second
+	DefaultUnschedulableStatusLastDuration = 300 * time.Second
+)
+
+// AdaptiveUnitedDeploymentStrategy is used to communicate parameters when Type is AdaptiveUnitedDeploymentScheduleStrategyType.
+type AdaptiveUnitedDeploymentStrategy struct {
+	// RescheduleCriticalSeconds indicates how long the controller waits before rescheduling a Pod that failed scheduling
+	// in its subset to another subset with redundant capacity. If a Pod failed to be scheduled and stays in an unschedulable
+	// status longer than RescheduleCriticalSeconds, the controller will reschedule it to a suitable subset. Default is 30 seconds.
+	// +optional
+	RescheduleCriticalSeconds *int32 `json:"rescheduleCriticalSeconds,omitempty"`
+
+	// UnschedulableLastSeconds is used to set the number of seconds for a Subset to recover from an unschedulable state,
+	// with a default value of 300 seconds.
+	// +optional
+	UnschedulableLastSeconds *int32 `json:"unschedulableLastSeconds,omitempty"`
+}
+
+// UnitedDeploymentScheduleStrategy defines the schedule performance of UnitedDeployment.
+type UnitedDeploymentScheduleStrategy struct {
+	// Type indicates the type of the UnitedDeploymentScheduleStrategy.
+	// Default is Fixed
+	// +optional
+	Type UnitedDeploymentScheduleStrategyType `json:"type,omitempty"`
+
+	// Adaptive is used to communicate parameters when Type is AdaptiveUnitedDeploymentScheduleStrategyType.
+	// +optional
+	Adaptive *AdaptiveUnitedDeploymentStrategy `json:"adaptive,omitempty"`
+}
+
+func (s *UnitedDeploymentScheduleStrategy) IsAdaptive() bool {
+	return s.Type == AdaptiveUnitedDeploymentScheduleStrategyType
+}
+
+func (s *UnitedDeploymentScheduleStrategy) GetRescheduleCriticalDuration() time.Duration {
+	if s.Adaptive == nil || s.Adaptive.RescheduleCriticalSeconds == nil {
+		return DefaultRescheduleCriticalDuration
+	}
+	return time.Duration(*s.Adaptive.RescheduleCriticalSeconds) * time.Second
+}
+
+func (s *UnitedDeploymentScheduleStrategy) GetUnschedulableLastDuration() time.Duration {
+	if s.Adaptive == nil || s.Adaptive.UnschedulableLastSeconds == nil {
+		return DefaultUnschedulableStatusLastDuration
+	}
+	return time.Duration(*s.Adaptive.UnschedulableLastSeconds) * time.Second
+}
+
 // UnitedDeploymentStatus defines the observed state of UnitedDeployment.
 type UnitedDeploymentStatus struct {
 	// ObservedGeneration is the most recent generation observed for this UnitedDeployment. It corresponds to the
@@ -252,6 +320,8 @@ type UnitedDeploymentStatus struct {
 	// +optional
 	SubsetReplicas map[string]int32 `json:"subsetReplicas,omitempty"`
+	// Records the conditions of each subset.
+	SubsetStatuses []UnitedDeploymentSubsetStatus `json:"subsetStatuses,omitempty"`
 	// Represents the latest available observations of a UnitedDeployment's current state.
 	// +optional
 	Conditions []UnitedDeploymentCondition `json:"conditions,omitempty"`
@@ -264,6 +334,16 @@ type UnitedDeploymentStatus struct {
 	LabelSelector string `json:"labelSelector,omitempty"`
 }
 
+func (s *UnitedDeploymentStatus) GetSubsetStatus(subset string) *UnitedDeploymentSubsetStatus {
+	for i, subsetStatus := range s.SubsetStatuses {
+		if subsetStatus.Name == subset {
+			return &s.SubsetStatuses[i]
+		}
+	}
+	s.SubsetStatuses = append(s.SubsetStatuses, UnitedDeploymentSubsetStatus{Name: subset})
+	return &s.SubsetStatuses[len(s.SubsetStatuses)-1]
+}
+
 // UnitedDeploymentCondition describes current state of a UnitedDeployment.
 type UnitedDeploymentCondition struct {
 	// Type of in place set condition.
@@ -278,7 +358,7 @@ type UnitedDeploymentCondition struct {
 	// The reason for the condition's last transition.
 	Reason string `json:"reason,omitempty"`
 
-	// A human readable message indicating details about the transition.
+	// A human-readable message indicating details about the transition.
 	Message string `json:"message,omitempty"`
 }
 
@@ -293,6 +373,62 @@ type UpdateStatus struct {
 	CurrentPartitions map[string]int32 `json:"currentPartitions,omitempty"`
 }
 
+type UnitedDeploymentSubsetStatus struct {
+	// Subset name specified in Topology.Subsets
+	Name string `json:"name,omitempty"`
+	// Records the current replicas. Currently unused.
+	Replicas int32 `json:"replicas,omitempty"`
+	// Records the current partition. Currently unused.
+	Partition int32 `json:"partition,omitempty"`
+	// Conditions is an array of current observed subset conditions.
+ Conditions []UnitedDeploymentSubsetCondition `json:"conditions,omitempty"` +} + +func (s *UnitedDeploymentSubsetStatus) GetCondition(condType UnitedDeploymentSubsetConditionType) *UnitedDeploymentSubsetCondition { + for _, condition := range s.Conditions { + if condition.Type == condType { + return &condition + } + } + return nil +} + +func (s *UnitedDeploymentSubsetStatus) SetCondition(condType UnitedDeploymentSubsetConditionType, status corev1.ConditionStatus, reason, message string) { + var currentCond *UnitedDeploymentSubsetCondition + for i, c := range s.Conditions { + if c.Type == condType { + currentCond = &s.Conditions[i] + break + } + } + if currentCond != nil && currentCond.Status == status && currentCond.Reason == reason { + return + } + if currentCond == nil { + s.Conditions = append(s.Conditions, UnitedDeploymentSubsetCondition{Type: condType}) + currentCond = &s.Conditions[len(s.Conditions)-1] + } + currentCond.LastTransitionTime = metav1.Now() + currentCond.Status = status + currentCond.Reason = reason + currentCond.Message = message +} + +type UnitedDeploymentSubsetConditionType string + +const ( + // UnitedDeploymentSubsetSchedulable means new pods allocated into the subset will keep pending. + UnitedDeploymentSubsetSchedulable UnitedDeploymentSubsetConditionType = "Schedulable" +) + +type UnitedDeploymentSubsetCondition struct { + Type UnitedDeploymentSubsetConditionType `json:"type"` + Status corev1.ConditionStatus `json:"status"` + LastTransitionTime metav1.Time `json:"lastTransitionTime,omitempty"` + Reason string `json:"reason,omitempty"` + Message string `json:"message,omitempty"` +} + // +genclient // +genclient:method=GetScale,verb=get,subresource=scale,result=k8s.io/api/autoscaling/v1.Scale // +genclient:method=UpdateScale,verb=update,subresource=scale,input=k8s.io/api/autoscaling/v1.Scale,result=k8s.io/api/autoscaling/v1.Scale diff --git a/apis/apps/v1alpha1/zz_generated.deepcopy.go b/apis/apps/v1alpha1/zz_generated.deepcopy.go index 9440ee2715..cd443d3fc1 100644 --- a/apis/apps/v1alpha1/zz_generated.deepcopy.go +++ b/apis/apps/v1alpha1/zz_generated.deepcopy.go @@ -30,6 +30,31 @@ import ( "k8s.io/apimachinery/pkg/util/intstr" ) +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *AdaptiveUnitedDeploymentStrategy) DeepCopyInto(out *AdaptiveUnitedDeploymentStrategy) { + *out = *in + if in.RescheduleCriticalSeconds != nil { + in, out := &in.RescheduleCriticalSeconds, &out.RescheduleCriticalSeconds + *out = new(int32) + **out = **in + } + if in.UnschedulableLastSeconds != nil { + in, out := &in.UnschedulableLastSeconds, &out.UnschedulableLastSeconds + *out = new(int32) + **out = **in + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new AdaptiveUnitedDeploymentStrategy. +func (in *AdaptiveUnitedDeploymentStrategy) DeepCopy() *AdaptiveUnitedDeploymentStrategy { + if in == nil { + return nil + } + out := new(AdaptiveUnitedDeploymentStrategy) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
func (in *AdaptiveWorkloadSpreadStrategy) DeepCopyInto(out *AdaptiveWorkloadSpreadStrategy) { *out = *in @@ -3268,6 +3293,7 @@ func (in *Topology) DeepCopyInto(out *Topology) { (*in)[i].DeepCopyInto(&(*out)[i]) } } + in.ScheduleStrategy.DeepCopyInto(&out.ScheduleStrategy) } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Topology. @@ -3380,6 +3406,26 @@ func (in *UnitedDeploymentList) DeepCopyObject() runtime.Object { return nil } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *UnitedDeploymentScheduleStrategy) DeepCopyInto(out *UnitedDeploymentScheduleStrategy) { + *out = *in + if in.Adaptive != nil { + in, out := &in.Adaptive, &out.Adaptive + *out = new(AdaptiveUnitedDeploymentStrategy) + (*in).DeepCopyInto(*out) + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new UnitedDeploymentScheduleStrategy. +func (in *UnitedDeploymentScheduleStrategy) DeepCopy() *UnitedDeploymentScheduleStrategy { + if in == nil { + return nil + } + out := new(UnitedDeploymentScheduleStrategy) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *UnitedDeploymentSpec) DeepCopyInto(out *UnitedDeploymentSpec) { *out = *in @@ -3428,6 +3474,13 @@ func (in *UnitedDeploymentStatus) DeepCopyInto(out *UnitedDeploymentStatus) { (*out)[key] = val } } + if in.SubsetStatuses != nil { + in, out := &in.SubsetStatuses, &out.SubsetStatuses + *out = make([]UnitedDeploymentSubsetStatus, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } if in.Conditions != nil { in, out := &in.Conditions, &out.Conditions *out = make([]UnitedDeploymentCondition, len(*in)) @@ -3452,6 +3505,44 @@ func (in *UnitedDeploymentStatus) DeepCopy() *UnitedDeploymentStatus { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *UnitedDeploymentSubsetCondition) DeepCopyInto(out *UnitedDeploymentSubsetCondition) { + *out = *in + in.LastTransitionTime.DeepCopyInto(&out.LastTransitionTime) +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new UnitedDeploymentSubsetCondition. +func (in *UnitedDeploymentSubsetCondition) DeepCopy() *UnitedDeploymentSubsetCondition { + if in == nil { + return nil + } + out := new(UnitedDeploymentSubsetCondition) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *UnitedDeploymentSubsetStatus) DeepCopyInto(out *UnitedDeploymentSubsetStatus) { + *out = *in + if in.Conditions != nil { + in, out := &in.Conditions, &out.Conditions + *out = make([]UnitedDeploymentSubsetCondition, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new UnitedDeploymentSubsetStatus. +func (in *UnitedDeploymentSubsetStatus) DeepCopy() *UnitedDeploymentSubsetStatus { + if in == nil { + return nil + } + out := new(UnitedDeploymentSubsetStatus) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
func (in *UnitedDeploymentUpdateStrategy) DeepCopyInto(out *UnitedDeploymentUpdateStrategy) { *out = *in diff --git a/config/crd/bases/apps.kruise.io_uniteddeployments.yaml b/config/crd/bases/apps.kruise.io_uniteddeployments.yaml index ccff1c0ea8..3595596280 100644 --- a/config/crd/bases/apps.kruise.io_uniteddeployments.yaml +++ b/config/crd/bases/apps.kruise.io_uniteddeployments.yaml @@ -955,6 +955,38 @@ spec: description: Topology describes the pods distribution detail between each of subsets. properties: + scheduleStrategy: + description: ScheduleStrategy indicates the strategy the UnitedDeployment + used to preform the schedule between each of subsets. + properties: + adaptive: + description: Adaptive is used to communicate parameters when + Type is AdaptiveUnitedDeploymentScheduleStrategyType. + properties: + rescheduleCriticalSeconds: + description: |- + RescheduleCriticalSeconds indicates how long controller will reschedule a schedule failed Pod to the subset that has + redundant capacity after the subset where the Pod lives. If a Pod was scheduled failed and still in an unschedulabe status + over RescheduleCriticalSeconds duration, the controller will reschedule it to a suitable subset. Default is 30 seconds. + format: int32 + type: integer + unschedulableLastSeconds: + description: |- + UnschedulableLastSeconds is used to set the number of seconds for a Subset to recover from an unschedulable state, + with a default value of 300 seconds. + format: int32 + type: integer + type: object + type: + description: |- + Type indicates the type of the UnitedDeploymentScheduleStrategy. + Default is Fixed + enum: + - Adaptive + - Fixed + - "" + type: string + type: object subsets: description: |- Contains the details of each subset. Each element in this array represents one subset @@ -1173,7 +1205,7 @@ spec: format: date-time type: string message: - description: A human readable message indicating details about + description: A human-readable message indicating details about the transition. type: string reason: @@ -1216,6 +1248,44 @@ spec: description: Records the topology detail information of the replicas of each subset. type: object + subsetStatuses: + description: Record the conditions of each subset. + items: + properties: + conditions: + description: Conditions is an array of current observed subset + conditions. + items: + properties: + lastTransitionTime: + format: date-time + type: string + message: + type: string + reason: + type: string + status: + type: string + type: + type: string + required: + - status + - type + type: object + type: array + name: + description: Subset name specified in Topology.Subsets + type: string + partition: + description: Records the current partition. Currently unused. + format: int32 + type: integer + replicas: + description: Recores the current replicas. Currently unused. + format: int32 + type: integer + type: object + type: array updateStatus: description: Records the information of update progress. properties: diff --git a/pkg/controller/uniteddeployment/adapter/adapter.go b/pkg/controller/uniteddeployment/adapter/adapter.go index d75c8c5e99..12e3a44207 100644 --- a/pkg/controller/uniteddeployment/adapter/adapter.go +++ b/pkg/controller/uniteddeployment/adapter/adapter.go @@ -17,6 +17,7 @@ limitations under the License. 
package adapter import ( + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "sigs.k8s.io/controller-runtime/pkg/client" @@ -25,14 +26,22 @@ import ( ) type Adapter interface { - // NewResourceObject creates a empty subset object. + // NewResourceObject creates an empty subset object. NewResourceObject() client.Object - // NewResourceListObject creates a empty subset list object. + // NewResourceListObject creates an empty subset list object. NewResourceListObject() client.ObjectList // GetStatusObservedGeneration returns the observed generation of the subset. GetStatusObservedGeneration(subset metav1.Object) int64 - // GetReplicaDetails returns the replicas information of the subset status. - GetReplicaDetails(subset metav1.Object, updatedRevision string) (specReplicas, specPartition *int32, statusReplicas, statusReadyReplicas, statusUpdatedReplicas, statusUpdatedReadyReplicas int32, err error) + // GetSubsetPods returns all pods of the subset workload. + GetSubsetPods(obj metav1.Object) ([]*corev1.Pod, error) + // GetSpecReplicas returns the replicas information of the subset workload. + GetSpecReplicas(obj metav1.Object) *int32 + // GetSpecPartition returns the partition information of the subset workload if possible. + GetSpecPartition(obj metav1.Object, pods []*corev1.Pod) *int32 + // GetStatusReplicas returns the replicas from the subset workload status. + GetStatusReplicas(obj metav1.Object) int32 + // GetStatusReadyReplicas returns the ready replicas information from the subset workload status. + GetStatusReadyReplicas(obj metav1.Object) int32 // GetSubsetFailure returns failure information of the subset. GetSubsetFailure() *string // ApplySubsetTemplate updates the subset to the latest revision. diff --git a/pkg/controller/uniteddeployment/adapter/adapter_util.go b/pkg/controller/uniteddeployment/adapter/adapter_util.go index 3f761a02a1..30b4faefd2 100644 --- a/pkg/controller/uniteddeployment/adapter/adapter_util.go +++ b/pkg/controller/uniteddeployment/adapter/adapter_util.go @@ -22,6 +22,7 @@ import ( corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/validation" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + podutil "k8s.io/kubernetes/pkg/api/v1/pod" appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" ) @@ -97,3 +98,19 @@ func getCurrentPartition(pods []*corev1.Pod, revision string) *int32 { return &partition } + +func CalculateUpdatedReplicas(podList []*corev1.Pod, updatedRevision string) (updatedReplicas, updatedReadyReplicas int32) { + for _, pod := range podList { + revision := getRevision(&pod.ObjectMeta) + + // Only count pods that are updated and are not terminating + if revision == updatedRevision && pod.GetDeletionTimestamp() == nil { + updatedReplicas++ + if podutil.IsPodReady(pod) { + updatedReadyReplicas++ + } + } + } + + return +} diff --git a/pkg/controller/uniteddeployment/adapter/advanced_statefulset_adapter.go b/pkg/controller/uniteddeployment/adapter/advanced_statefulset_adapter.go index 2242d2c751..6cc42f4cfb 100644 --- a/pkg/controller/uniteddeployment/adapter/advanced_statefulset_adapter.go +++ b/pkg/controller/uniteddeployment/adapter/advanced_statefulset_adapter.go @@ -62,29 +62,32 @@ func (a *AdvancedStatefulSetAdapter) GetStatusObservedGeneration(obj metav1.Obje return obj.(*v1beta1.StatefulSet).Status.ObservedGeneration } -// GetReplicaDetails returns the replicas detail the subset needs. 
-func (a *AdvancedStatefulSetAdapter) GetReplicaDetails(obj metav1.Object, updatedRevision string) (specReplicas, specPartition *int32, statusReplicas, statusReadyReplicas, statusUpdatedReplicas, statusUpdatedReadyReplicas int32, err error) { - set := obj.(*v1beta1.StatefulSet) - var pods []*corev1.Pod - pods, err = a.getStatefulSetPods(set) - if err != nil { - return - } +func (a *AdvancedStatefulSetAdapter) GetSubsetPods(obj metav1.Object) ([]*corev1.Pod, error) { + return a.getStatefulSetPods(obj.(*v1beta1.StatefulSet)) +} + +func (a *AdvancedStatefulSetAdapter) GetSpecReplicas(obj metav1.Object) *int32 { + return obj.(*v1beta1.StatefulSet).Spec.Replicas +} - specReplicas = set.Spec.Replicas +func (a *AdvancedStatefulSetAdapter) GetSpecPartition(obj metav1.Object, pods []*corev1.Pod) *int32 { + set := obj.(*v1beta1.StatefulSet) if set.Spec.UpdateStrategy.Type == appsv1.OnDeleteStatefulSetStrategyType { revision := getRevision(&set.ObjectMeta) - specPartition = getCurrentPartition(pods, revision) + return getCurrentPartition(pods, revision) } else if set.Spec.UpdateStrategy.RollingUpdate != nil && set.Spec.UpdateStrategy.RollingUpdate.Partition != nil { - specPartition = set.Spec.UpdateStrategy.RollingUpdate.Partition + return set.Spec.UpdateStrategy.RollingUpdate.Partition } + return nil +} - statusReplicas = set.Status.Replicas - statusReadyReplicas = set.Status.ReadyReplicas - statusUpdatedReplicas, statusUpdatedReadyReplicas = calculateUpdatedReplicas(pods, updatedRevision) +func (a *AdvancedStatefulSetAdapter) GetStatusReplicas(obj metav1.Object) int32 { + return obj.(*v1beta1.StatefulSet).Status.Replicas +} - return +func (a *AdvancedStatefulSetAdapter) GetStatusReadyReplicas(obj metav1.Object) int32 { + return obj.(*v1beta1.StatefulSet).Status.ReadyReplicas } // GetSubsetFailure returns the failure information of the subset. 
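The interface change above replaces the single GetReplicaDetails method with fine-grained getters (GetSubsetPods, GetSpecReplicas, GetSpecPartition, GetStatusReplicas, GetStatusReadyReplicas) plus the exported helper CalculateUpdatedReplicas from adapter_util.go. For orientation, here is a minimal sketch of how a caller can recombine these getters into the old aggregate result; the replicaDetails struct and collectReplicaDetails helper are illustrative only and not part of the patch (subset_control.go below does the equivalent inline):

package example

import (
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	"github.com/openkruise/kruise/pkg/controller/uniteddeployment/adapter"
)

// replicaDetails is a hypothetical aggregate of the values that the removed
// GetReplicaDetails method used to return in one call.
type replicaDetails struct {
	SpecReplicas         *int32
	SpecPartition        *int32
	Replicas             int32
	ReadyReplicas        int32
	UpdatedReplicas      int32
	UpdatedReadyReplicas int32
	Pods                 []*corev1.Pod
}

// collectReplicaDetails composes the fine-grained Adapter getters in the same
// order convertToSubset uses them: the pods are fetched once and reused both
// for the partition lookup and for counting updated replicas.
func collectReplicaDetails(a adapter.Adapter, obj metav1.Object, updatedRevision string) (*replicaDetails, error) {
	pods, err := a.GetSubsetPods(obj)
	if err != nil {
		return nil, err
	}
	d := &replicaDetails{
		SpecReplicas:  a.GetSpecReplicas(obj),
		SpecPartition: a.GetSpecPartition(obj, pods),
		Replicas:      a.GetStatusReplicas(obj),
		ReadyReplicas: a.GetStatusReadyReplicas(obj),
		Pods:          pods,
	}
	d.UpdatedReplicas, d.UpdatedReadyReplicas = adapter.CalculateUpdatedReplicas(pods, updatedRevision)
	return d, nil
}

The split lets each workload adapter expose only what it actually knows (for example, the Deployment adapter returns nil from GetSpecPartition), while the pod list becomes reusable by the new adaptive-scheduling logic.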
diff --git a/pkg/controller/uniteddeployment/adapter/cloneset_adapter.go b/pkg/controller/uniteddeployment/adapter/cloneset_adapter.go index bc80150858..7b25f4bd21 100644 --- a/pkg/controller/uniteddeployment/adapter/cloneset_adapter.go +++ b/pkg/controller/uniteddeployment/adapter/cloneset_adapter.go @@ -41,30 +41,29 @@ func (a *CloneSetAdapter) GetStatusObservedGeneration(obj metav1.Object) int64 { return obj.(*alpha1.CloneSet).Status.ObservedGeneration } -func (a *CloneSetAdapter) GetReplicaDetails(obj metav1.Object, updatedRevision string) (specReplicas, specPartition *int32, statusReplicas, statusReadyReplicas, statusUpdatedReplicas, statusUpdatedReadyReplicas int32, err error) { - - set := obj.(*alpha1.CloneSet) - - var pods []*corev1.Pod - - pods, err = a.getCloneSetPods(set) - - if err != nil { - return - } +func (a *CloneSetAdapter) GetSubsetPods(obj metav1.Object) ([]*corev1.Pod, error) { + return a.getCloneSetPods(obj.(*alpha1.CloneSet)) +} - specReplicas = set.Spec.Replicas +func (a *CloneSetAdapter) GetSpecReplicas(obj metav1.Object) *int32 { + return obj.(*alpha1.CloneSet).Spec.Replicas +} +func (a *CloneSetAdapter) GetSpecPartition(obj metav1.Object, _ []*corev1.Pod) *int32 { + set := obj.(*alpha1.CloneSet) if set.Spec.UpdateStrategy.Partition != nil { - partition, _ := intstr.GetValueFromIntOrPercent(set.Spec.UpdateStrategy.Partition, int(*set.Spec.Replicas), true) - specPartition = utilpointer.Int32Ptr(int32(partition)) + partition, _ := intstr.GetScaledValueFromIntOrPercent(set.Spec.UpdateStrategy.Partition, int(*set.Spec.Replicas), true) + return utilpointer.Int32Ptr(int32(partition)) } + return nil +} - statusReplicas = set.Status.Replicas - statusReadyReplicas = set.Status.ReadyReplicas - statusUpdatedReplicas, statusUpdatedReadyReplicas = calculateUpdatedReplicas(pods, updatedRevision) +func (a *CloneSetAdapter) GetStatusReplicas(obj metav1.Object) int32 { + return obj.(*alpha1.CloneSet).Status.Replicas +} - return +func (a *CloneSetAdapter) GetStatusReadyReplicas(obj metav1.Object) int32 { + return obj.(*alpha1.CloneSet).Status.ReadyReplicas } func (a *CloneSetAdapter) GetSubsetFailure() *string { diff --git a/pkg/controller/uniteddeployment/adapter/deployment_adapter.go b/pkg/controller/uniteddeployment/adapter/deployment_adapter.go index d0978952f9..e1fe7024dc 100644 --- a/pkg/controller/uniteddeployment/adapter/deployment_adapter.go +++ b/pkg/controller/uniteddeployment/adapter/deployment_adapter.go @@ -57,25 +57,28 @@ func (a *DeploymentAdapter) GetStatusObservedGeneration(obj metav1.Object) int64 return obj.(*appsv1.Deployment).Status.ObservedGeneration } -// GetReplicaDetails returns the replicas detail the subset needs. 
-func (a *DeploymentAdapter) GetReplicaDetails(obj metav1.Object, updatedRevision string) (specReplicas, specPartition *int32, statusReplicas, statusReadyReplicas, statusUpdatedReplicas, statusUpdatedReadyReplicas int32, err error) { - // Convert to Deployment Object +func (a *DeploymentAdapter) GetSubsetPods(obj metav1.Object) ([]*corev1.Pod, error) { set := obj.(*appsv1.Deployment) + return a.getDeploymentPods(set) +} - // Get all pods belonging to deployment - var pods []*corev1.Pod - pods, err = a.getDeploymentPods(set) - if err != nil { - return - } +func (a *DeploymentAdapter) GetSpecReplicas(obj metav1.Object) *int32 { + set := obj.(*appsv1.Deployment) + return set.Spec.Replicas +} - // Set according replica counts - specReplicas = set.Spec.Replicas - statusReplicas = set.Status.Replicas - statusReadyReplicas = set.Status.ReadyReplicas - statusUpdatedReplicas, statusUpdatedReadyReplicas = calculateUpdatedReplicas(pods, updatedRevision) +func (a *DeploymentAdapter) GetSpecPartition(obj metav1.Object, pods []*corev1.Pod) *int32 { + return nil +} + +func (a *DeploymentAdapter) GetStatusReplicas(obj metav1.Object) int32 { + set := obj.(*appsv1.Deployment) + return set.Status.Replicas +} - return +func (a *DeploymentAdapter) GetStatusReadyReplicas(obj metav1.Object) int32 { + set := obj.(*appsv1.Deployment) + return set.Status.ReadyReplicas } // GetSubsetFailure returns the failure information of the subset. diff --git a/pkg/controller/uniteddeployment/adapter/statefulset_adapter.go b/pkg/controller/uniteddeployment/adapter/statefulset_adapter.go index a31e2ad0b0..1952af5bb5 100644 --- a/pkg/controller/uniteddeployment/adapter/statefulset_adapter.go +++ b/pkg/controller/uniteddeployment/adapter/statefulset_adapter.go @@ -58,29 +58,32 @@ func (a *StatefulSetAdapter) GetStatusObservedGeneration(obj metav1.Object) int6 return obj.(*appsv1.StatefulSet).Status.ObservedGeneration } -// GetReplicaDetails returns the replicas detail the subset needs. 
-func (a *StatefulSetAdapter) GetReplicaDetails(obj metav1.Object, updatedRevision string) (specReplicas, specPartition *int32, statusReplicas, statusReadyReplicas, statusUpdatedReplicas, statusUpdatedReadyReplicas int32, err error) { - set := obj.(*appsv1.StatefulSet) - var pods []*corev1.Pod - pods, err = a.getStatefulSetPods(set) - if err != nil { - return - } +func (a *StatefulSetAdapter) GetSubsetPods(obj metav1.Object) ([]*corev1.Pod, error) { + return a.getStatefulSetPods(obj.(*appsv1.StatefulSet)) +} - specReplicas = set.Spec.Replicas +func (a *StatefulSetAdapter) GetSpecReplicas(obj metav1.Object) *int32 { + return obj.(*appsv1.StatefulSet).Spec.Replicas +} + +func (a *StatefulSetAdapter) GetSpecPartition(obj metav1.Object, pods []*corev1.Pod) *int32 { + set := obj.(*appsv1.StatefulSet) if set.Spec.UpdateStrategy.Type == appsv1.OnDeleteStatefulSetStrategyType { revision := getRevision(&set.ObjectMeta) - specPartition = getCurrentPartition(pods, revision) + return getCurrentPartition(pods, revision) } else if set.Spec.UpdateStrategy.RollingUpdate != nil && set.Spec.UpdateStrategy.RollingUpdate.Partition != nil { - specPartition = set.Spec.UpdateStrategy.RollingUpdate.Partition + return set.Spec.UpdateStrategy.RollingUpdate.Partition } + return nil +} - statusReplicas = set.Status.Replicas - statusReadyReplicas = set.Status.ReadyReplicas - statusUpdatedReplicas, statusUpdatedReadyReplicas = calculateUpdatedReplicas(pods, updatedRevision) +func (a *StatefulSetAdapter) GetStatusReplicas(obj metav1.Object) int32 { + return obj.(*appsv1.StatefulSet).Status.Replicas +} - return +func (a *StatefulSetAdapter) GetStatusReadyReplicas(obj metav1.Object) int32 { + return obj.(*appsv1.StatefulSet).Status.ReadyReplicas } // GetSubsetFailure returns the failure information of the subset. @@ -232,22 +235,6 @@ func (a *StatefulSetAdapter) getStatefulSetPods(set *appsv1.StatefulSet) ([]*cor return claimedPods, nil } -func calculateUpdatedReplicas(podList []*corev1.Pod, updatedRevision string) (updatedReplicas, updatedReadyReplicas int32) { - for _, pod := range podList { - revision := getRevision(&pod.ObjectMeta) - - // Only count pods that are updated and are not terminating - if revision == updatedRevision && pod.GetDeletionTimestamp() == nil { - updatedReplicas++ - if podutil.IsPodReady(pod) { - updatedReadyReplicas++ - } - } - } - - return -} - // deleteStuckPods tries to work around the blocking issue https://github.com/kubernetes/kubernetes/issues/67250 func (a *StatefulSetAdapter) deleteStuckPods(set *appsv1.StatefulSet, revision string, partition int32) error { pods, err := a.getStatefulSetPods(set) diff --git a/pkg/controller/uniteddeployment/allocator.go b/pkg/controller/uniteddeployment/allocator.go index cf5a71ac47..8c0192dfa3 100644 --- a/pkg/controller/uniteddeployment/allocator.go +++ b/pkg/controller/uniteddeployment/allocator.go @@ -18,6 +18,7 @@ package uniteddeployment import ( "fmt" + "math" "sort" "strings" @@ -68,6 +69,30 @@ func NewReplicaAllocator(ud *appsv1alpha1.UnitedDeployment) ReplicaAllocator { return &specificAllocator{UnitedDeployment: ud} } +// NotPendingReplicas refers to the number of Pods that an unschedulable subset can safely accommodate. +// Exceeding this number may lead to scheduling failures within that subset. +// This value is only effective in the Adaptive scheduling strategy. 
+func getNotPendingReplicasMap(nameToSubset *map[string]*Subset) map[string]int32 { + if nameToSubset == nil { + return nil + } + var result = make(map[string]int32) + for name, subset := range *nameToSubset { + result[name] = subset.Status.Replicas - subset.Status.UnschedulableStatus.PendingPods + } + return result +} + +func isSubSetUnschedulable(name string, nameToSubset *map[string]*Subset) (unschedulable bool) { + if subsetObj, ok := (*nameToSubset)[name]; ok { + unschedulable = subsetObj.Status.UnschedulableStatus.Unschedulable + } else { + // newly created subsets are all schedulable + unschedulable = false + } + return +} + type specificAllocator struct { *appsv1alpha1.UnitedDeployment subsets *subsetInfos @@ -250,43 +275,58 @@ type elasticAllocator struct { // maxReplicas: nil # will be satisfied with 4th priority // // the results of map will be: {"subset-a": 3, "subset-b": 2} -func (ac *elasticAllocator) Alloc(_ *map[string]*Subset) (*map[string]int32, error) { +func (ac *elasticAllocator) Alloc(nameToSubset *map[string]*Subset) (*map[string]int32, error) { replicas := int32(1) if ac.Spec.Replicas != nil { replicas = *ac.Spec.Replicas } - minReplicasMap, maxReplicasMap, err := ac.validateAndCalculateMinMaxMap(replicas) + minReplicasMap, maxReplicasMap, err := ac.validateAndCalculateMinMaxMap(replicas, nameToSubset) if err != nil { return nil, err } return ac.alloc(replicas, minReplicasMap, maxReplicasMap), nil } -func (ac *elasticAllocator) validateAndCalculateMinMaxMap(replicas int32) (map[string]int32, map[string]int32, error) { - totalMin, totalMax := int64(0), int64(0) +func (ac *elasticAllocator) validateAndCalculateMinMaxMap(replicas int32, nameToSubset *map[string]*Subset) (map[string]int32, map[string]int32, error) { numSubset := len(ac.Spec.Topology.Subsets) minReplicasMap := make(map[string]int32, numSubset) maxReplicasMap := make(map[string]int32, numSubset) + notPendingReplicasMap := getNotPendingReplicasMap(nameToSubset) for index, subset := range ac.Spec.Topology.Subsets { minReplicas := int32(0) + maxReplicas := int32(math.MaxInt32) if subset.MinReplicas != nil { minReplicas, _ = ParseSubsetReplicas(replicas, *subset.MinReplicas) } - totalMin += int64(minReplicas) - minReplicasMap[subset.Name] = minReplicas - - maxReplicas := int32(1000000) if subset.MaxReplicas != nil { maxReplicas, _ = ParseSubsetReplicas(replicas, *subset.MaxReplicas) } - totalMax += int64(maxReplicas) + if ac.Spec.Topology.ScheduleStrategy.IsAdaptive() { + unschedulable := isSubSetUnschedulable(subset.Name, nameToSubset) + // This means that in the Adaptive scheduling strategy, an unschedulable subset can only be scaled down, not scaled up. 
+ if notPendingReplicas, ok := notPendingReplicasMap[subset.Name]; unschedulable && ok { + klog.InfoS("Assign min(notPendingReplicas, minReplicas/maxReplicas) for unschedulable subset", + "subset", subset.Name) + minReplicas = integer.Int32Min(notPendingReplicas, minReplicas) + maxReplicas = integer.Int32Min(notPendingReplicas, maxReplicas) + } + // To prevent healthy pod from being deleted + if notPendingReplicas := notPendingReplicasMap[subset.Name]; !unschedulable && notPendingReplicas > minReplicas { + klog.InfoS("Assign min(notPendingReplicas, maxReplicas) to minReplicas to avoid deleting running pods", + "subset", subset.Name, "minReplicas", minReplicas, "notPendingReplicas", notPendingReplicas, "maxReplicas", maxReplicas) + minReplicas = integer.Int32Min(notPendingReplicas, maxReplicas) + } + } + + minReplicasMap[subset.Name] = minReplicas maxReplicasMap[subset.Name] = maxReplicas if minReplicas > maxReplicas { return nil, nil, fmt.Errorf("subset[%d].maxReplicas must be more than or equal to minReplicas", index) } } + klog.InfoS("elastic allocate maps calculated", "minReplicasMap", minReplicasMap, "maxReplicasMap", maxReplicasMap) return minReplicasMap, maxReplicasMap, nil } diff --git a/pkg/controller/uniteddeployment/allocator_test.go b/pkg/controller/uniteddeployment/allocator_test.go index cc50ab411b..1b933b7fc8 100644 --- a/pkg/controller/uniteddeployment/allocator_test.go +++ b/pkg/controller/uniteddeployment/allocator_test.go @@ -19,6 +19,7 @@ package uniteddeployment import ( "fmt" "sort" + "strconv" "testing" appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" @@ -307,6 +308,92 @@ func TestCapacityAllocator(t *testing.T) { } } +func TestAdaptiveElasticAllocator(t *testing.T) { + getUnitedDeploymentAndSubsets := func(totalReplicas, minReplicas, maxReplicas, failedPods int32) ( + *appsv1alpha1.UnitedDeployment, map[string]*Subset) { + minR, maxR := intstr.FromInt32(minReplicas), intstr.FromInt32(maxReplicas) + return &appsv1alpha1.UnitedDeployment{ + Spec: appsv1alpha1.UnitedDeploymentSpec{ + Replicas: &totalReplicas, + Topology: appsv1alpha1.Topology{ + Subsets: []appsv1alpha1.Subset{ + { + Name: "subset-1", + MinReplicas: &minR, + MaxReplicas: &maxR, + }, + { + Name: "subset-2", + }, + }, + ScheduleStrategy: appsv1alpha1.UnitedDeploymentScheduleStrategy{ + Type: appsv1alpha1.AdaptiveUnitedDeploymentScheduleStrategyType, + }, + }, + }, + }, map[string]*Subset{ + "subset-1": { + Status: SubsetStatus{ + UnschedulableStatus: SubsetUnschedulableStatus{ + Unschedulable: true, + PendingPods: failedPods, + }, + Replicas: maxReplicas, + }, + Spec: SubsetSpec{Replicas: minReplicas}, + }, + "subset-2": { + Status: SubsetStatus{}, + }, + } + } + cases := []struct { + totalReplicas, minReplicas, maxReplicas, pendingPods int32 + subset1Replicas, subset2Replicas int32 + }{ + { + totalReplicas: 10, minReplicas: 2, maxReplicas: 4, pendingPods: 5, + subset1Replicas: 0, subset2Replicas: 10, + }, + { + totalReplicas: 10, minReplicas: 2, maxReplicas: 4, pendingPods: 4, + subset1Replicas: 0, subset2Replicas: 10, + }, + { + totalReplicas: 10, minReplicas: 2, maxReplicas: 4, pendingPods: 3, + subset1Replicas: 1, subset2Replicas: 9, + }, + { + totalReplicas: 10, minReplicas: 2, maxReplicas: 4, pendingPods: 2, + subset1Replicas: 2, subset2Replicas: 8, + }, + { + totalReplicas: 10, minReplicas: 2, maxReplicas: 4, pendingPods: 1, + subset1Replicas: 3, subset2Replicas: 7, + }, + { + totalReplicas: 10, minReplicas: 2, maxReplicas: 4, pendingPods: 0, + subset1Replicas: 4, subset2Replicas: 6, + 
}, + } + for i, testCase := range cases { + t.Run(strconv.Itoa(i), func(t *testing.T) { + ud, subsets := getUnitedDeploymentAndSubsets( + testCase.totalReplicas, testCase.minReplicas, testCase.maxReplicas, testCase.pendingPods) + alloc, err := NewReplicaAllocator(ud).Alloc(&subsets) + if err != nil { + t.Fatalf("unexpected alloc error %v", err) + } else { + subset1Replicas, subset2Replicas := (*alloc)["subset-1"], (*alloc)["subset-2"] + if subset1Replicas != testCase.subset1Replicas || subset2Replicas != testCase.subset2Replicas { + t.Fatalf("subset1Replicas = %d, subset1Replicas = %d, test case is %+v", + subset1Replicas, subset2Replicas, testCase) + } + } + }) + } +} + func createSubset(name string, replicas int32) *nameToReplicas { return &nameToReplicas{ Replicas: replicas, diff --git a/pkg/controller/uniteddeployment/revision_test.go b/pkg/controller/uniteddeployment/revision_test.go index 894ce7910c..101b9683fe 100644 --- a/pkg/controller/uniteddeployment/revision_test.go +++ b/pkg/controller/uniteddeployment/revision_test.go @@ -41,8 +41,9 @@ func TestRevisionManage(t *testing.T) { instance := &appsv1alpha1.UnitedDeployment{ ObjectMeta: metav1.ObjectMeta{ - Name: "foo", - Namespace: "default", + Name: "foo", + Namespace: "default", + Finalizers: []string{UnitedDeploymentFinalizer}, }, Spec: appsv1alpha1.UnitedDeploymentSpec{ Replicas: &one, diff --git a/pkg/controller/uniteddeployment/subset.go b/pkg/controller/uniteddeployment/subset.go index 79b8f006c6..673c40a314 100644 --- a/pkg/controller/uniteddeployment/subset.go +++ b/pkg/controller/uniteddeployment/subset.go @@ -17,9 +17,9 @@ limitations under the License. package uniteddeployment import ( - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) // Subset stores the details of a subset resource owned by one UnitedDeployment. @@ -36,6 +36,7 @@ type SubsetSpec struct { Replicas int32 UpdateStrategy SubsetUpdateStrategy SubsetRef ResourceRef + SubsetPods []*corev1.Pod } // SubsetStatus stores the observed state of the Subset. @@ -45,6 +46,12 @@ type SubsetStatus struct { ReadyReplicas int32 UpdatedReplicas int32 UpdatedReadyReplicas int32 + UnschedulableStatus SubsetUnschedulableStatus +} + +type SubsetUnschedulableStatus struct { + Unschedulable bool + PendingPods int32 } // SubsetUpdateStrategy stores the strategy detail of the Subset. @@ -72,7 +79,7 @@ type ControlInterface interface { CreateSubset(ud *appsv1alpha1.UnitedDeployment, unit string, revision string, replicas, partition int32) error // UpdateSubset updates the target subset with the input information. UpdateSubset(subSet *Subset, ud *appsv1alpha1.UnitedDeployment, revision string, replicas, partition int32) error - // UpdateSubset is used to delete the input subset. + // DeleteSubset is used to delete the input subset. DeleteSubset(*Subset) error // GetSubsetFailure extracts the subset failure message to expose on UnitedDeployment status. GetSubsetFailure(*Subset) *string diff --git a/pkg/controller/uniteddeployment/subset_control.go b/pkg/controller/uniteddeployment/subset_control.go index 9073125950..c8945359ac 100644 --- a/pkg/controller/uniteddeployment/subset_control.go +++ b/pkg/controller/uniteddeployment/subset_control.go @@ -39,7 +39,7 @@ type SubsetControl struct { adapter adapter.Adapter } -// GetAllSubsets returns all of subsets owned by the UnitedDeployment. 
+// GetAllSubsets returns all subsets owned by the UnitedDeployment. func (m *SubsetControl) GetAllSubsets(ud *alpha1.UnitedDeployment, updatedRevision string) (subSets []*Subset, err error) { selector, err := metav1.LabelSelectorAsSelector(ud.Spec.Selector) if err != nil { @@ -132,11 +132,6 @@ func (m *SubsetControl) IsExpected(subSet *Subset, revision string) bool { } func (m *SubsetControl) convertToSubset(set metav1.Object, updatedRevision string) (*Subset, error) { - subSetName, err := getSubsetNameFrom(set) - if err != nil { - return nil, err - } - subset := &Subset{} subset.ObjectMeta = metav1.ObjectMeta{ Name: set.GetName(), @@ -154,28 +149,32 @@ func (m *SubsetControl) convertToSubset(set metav1.Object, updatedRevision strin OwnerReferences: set.GetOwnerReferences(), Finalizers: set.GetFinalizers(), } - subset.Spec.SubsetName = subSetName - specReplicas, specPartition, statusReplicas, statusReadyReplicas, statusUpdatedReplicas, statusUpdatedReadyReplicas, err := m.adapter.GetReplicaDetails(set, updatedRevision) + pods, err := m.adapter.GetSubsetPods(set) + if err != nil { + return nil, err + } + subset.Spec.SubsetPods = pods + + subSetName, err := getSubsetNameFrom(set) if err != nil { - return subset, err + return nil, err } + subset.Spec.SubsetName = subSetName - if specReplicas != nil { + if specReplicas := m.adapter.GetSpecReplicas(set); specReplicas != nil { subset.Spec.Replicas = *specReplicas } - if specPartition != nil { + if specPartition := m.adapter.GetSpecPartition(set, pods); specPartition != nil { subset.Spec.UpdateStrategy.Partition = *specPartition } + subset.Spec.SubsetRef.Resources = append(subset.Spec.SubsetRef.Resources, set) subset.Status.ObservedGeneration = m.adapter.GetStatusObservedGeneration(set) - subset.Status.Replicas = statusReplicas - subset.Status.ReadyReplicas = statusReadyReplicas - subset.Status.UpdatedReplicas = statusUpdatedReplicas - subset.Status.UpdatedReadyReplicas = statusUpdatedReadyReplicas - - subset.Spec.SubsetRef.Resources = append(subset.Spec.SubsetRef.Resources, set) + subset.Status.Replicas = m.adapter.GetStatusReplicas(set) + subset.Status.ReadyReplicas = m.adapter.GetStatusReadyReplicas(set) + subset.Status.UpdatedReplicas, subset.Status.UpdatedReadyReplicas = adapter.CalculateUpdatedReplicas(pods, updatedRevision) return subset, nil } diff --git a/pkg/controller/uniteddeployment/uniteddeployment_controller.go b/pkg/controller/uniteddeployment/uniteddeployment_controller.go index a14f5269cd..50f4abbc2c 100644 --- a/pkg/controller/uniteddeployment/uniteddeployment_controller.go +++ b/pkg/controller/uniteddeployment/uniteddeployment_controller.go @@ -21,6 +21,7 @@ import ( "flag" "fmt" "reflect" + "time" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" @@ -30,6 +31,7 @@ import ( "k8s.io/klog/v2" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/reconcile" @@ -38,10 +40,12 @@ import ( appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" appsv1beta1 "github.com/openkruise/kruise/apis/apps/v1beta1" "github.com/openkruise/kruise/pkg/controller/uniteddeployment/adapter" + utilcontroller "github.com/openkruise/kruise/pkg/controller/util" "github.com/openkruise/kruise/pkg/util" utilclient "github.com/openkruise/kruise/pkg/util/client" utildiscovery 
"github.com/openkruise/kruise/pkg/util/discovery" "github.com/openkruise/kruise/pkg/util/ratelimiter" + "github.com/openkruise/kruise/pkg/util/requeueduration" ) func init() { @@ -51,16 +55,17 @@ func init() { var ( concurrentReconciles = 3 controllerKind = appsv1alpha1.SchemeGroupVersion.WithKind("UnitedDeployment") + durationStore = requeueduration.DurationStore{} ) const ( controllerName = "uniteddeployment-controller" - eventTypeRevisionProvision = "RevisionProvision" - eventTypeFindSubsets = "FindSubsets" - eventTypeDupSubsetsDelete = "DeleteDuplicatedSubsets" - eventTypeSubsetsUpdate = "UpdateSubset" - eventTypeSpecifySubbsetReplicas = "SpecifySubsetReplicas" + eventTypeRevisionProvision = "RevisionProvision" + eventTypeFindSubsets = "FindSubsets" + eventTypeDupSubsetsDelete = "DeleteDuplicatedSubsets" + eventTypeSubsetsUpdate = "UpdateSubset" + eventTypeSpecifySubsetReplicas = "SpecifySubsetReplicas" slowStartInitialBatchSize = 1 ) @@ -145,6 +150,8 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error { var _ reconcile.Reconciler = &ReconcileUnitedDeployment{} +const UnitedDeploymentFinalizer = "apps.kruise.io/uniteddeployment-cleanup" + // ReconcileUnitedDeployment reconciles a UnitedDeployment object type ReconcileUnitedDeployment struct { client.Client @@ -179,10 +186,36 @@ func (r *ReconcileUnitedDeployment) Reconcile(_ context.Context, request reconci } if instance.DeletionTimestamp != nil { - return reconcile.Result{}, nil + if controllerutil.RemoveFinalizer(instance, UnitedDeploymentFinalizer) { + // to avoid memory leak + klog.InfoS("cleaning up UnitedDeployment", "unitedDeployment", request) + ResourceVersionExpectation.Delete(instance) + if err = r.updateUnitedDeploymentInstance(instance); err != nil { + klog.ErrorS(err, "Failed to remove UnitedDeploymentFinalizer", "unitedDeployment", request) + } + } + return reconcile.Result{}, err + } + + if controllerutil.AddFinalizer(instance, UnitedDeploymentFinalizer) { + klog.InfoS("adding UnitedDeploymentFinalizer") + if err = r.updateUnitedDeploymentInstance(instance); err != nil { + klog.ErrorS(err, "Failed to add UnitedDeploymentFinalizer", "unitedDeployment", request) + } + return reconcile.Result{}, err + } + + // make sure latest version is observed + ResourceVersionExpectation.Observe(instance) + if satisfied, _ := ResourceVersionExpectation.IsSatisfied(instance); !satisfied { + klog.InfoS("resource version not up-to-date, requeue in 1s", "resourceVersion", instance.GetResourceVersion()) + return reconcile.Result{RequeueAfter: time.Second}, nil } - oldStatus := instance.Status.DeepCopy() + oldStatus := instance.Status.DeepCopy() + for _, subset := range instance.Spec.Topology.Subsets { + instance.Status.GetSubsetStatus(subset.Name) // ensure subset status exists + } currentRevision, updatedRevision, _, collisionCount, err := r.constructUnitedDeploymentRevisions(instance) if err != nil { klog.ErrorS(err, "Failed to construct controller revision of UnitedDeployment", "unitedDeployment", klog.KObj(instance)) @@ -210,7 +243,7 @@ func (r *ReconcileUnitedDeployment) Reconcile(_ context.Context, request reconci if err != nil { klog.ErrorS(err, "UnitedDeployment specified subset replicas is ineffective", "unitedDeployment", klog.KObj(instance)) r.recorder.Eventf(instance.DeepCopy(), corev1.EventTypeWarning, fmt.Sprintf("Failed %s", - eventTypeSpecifySubbsetReplicas), "Specified subset replicas is ineffective: %s", err.Error()) + eventTypeSpecifySubsetReplicas), "Specified subset replicas is ineffective: %s", err.Error()) 
return reconcile.Result{}, err } @@ -218,8 +251,10 @@ func (r *ReconcileUnitedDeployment) Reconcile(_ context.Context, request reconci nextUpdate := getNextUpdate(instance, nextReplicas, nextPartitions) klog.V(4).InfoS("Got UnitedDeployment next update", "unitedDeployment", klog.KObj(instance), "nextUpdate", nextUpdate) + ResourceVersionExpectation.Expect(instance) newStatus, err := r.manageSubsets(instance, nameToSubset, nextUpdate, currentRevision, updatedRevision, subsetType) if err != nil { + ResourceVersionExpectation.Delete(instance) klog.ErrorS(err, "Failed to update UnitedDeployment", "unitedDeployment", klog.KObj(instance)) r.recorder.Event(instance.DeepCopy(), corev1.EventTypeWarning, fmt.Sprintf("Failed%s", eventTypeSubsetsUpdate), err.Error()) return reconcile.Result{}, err @@ -233,10 +268,27 @@ func (r *ReconcileUnitedDeployment) Reconcile(_ context.Context, request reconci } newStatus.LabelSelector = selector.String() - return r.updateStatus(instance, newStatus, oldStatus, nameToSubset, nextReplicas, nextPartitions, currentRevision, updatedRevision, collisionCount, control) + requeueAfter := durationStore.Pop(getUnitedDeploymentKey(instance)) + if requeueAfter > 0 { + klog.InfoS("Requeue needed", "afterSeconds", requeueAfter.Seconds()) + } + return reconcile.Result{RequeueAfter: requeueAfter}, + r.updateStatus(instance, newStatus, oldStatus, nameToSubset, nextReplicas, nextPartitions, currentRevision, updatedRevision, collisionCount, control) } -func (r *ReconcileUnitedDeployment) getNameToSubset(instance *appsv1alpha1.UnitedDeployment, control ControlInterface, expectedRevision string) (*map[string]*Subset, error) { +func (r *ReconcileUnitedDeployment) updateUnitedDeploymentInstance(instance *appsv1alpha1.UnitedDeployment) error { + var err error + for i := 0; i < updateRetries; i++ { + if err = r.Update(context.Background(), instance); err == nil { + return nil + } + } + return err +} + +// getNameToSubset fetches all subset workloads in cluster managed by this UnitedDeployment +// if adaptive scheduling strategy is used, existing subset unscheduable status will be set true here (newly created subsets are default false) +func (r *ReconcileUnitedDeployment) getNameToSubset(instance *appsv1alpha1.UnitedDeployment, control ControlInterface, expectedRevision string) (name2Subset *map[string]*Subset, err error) { subSets, err := control.GetAllSubsets(instance, expectedRevision) if err != nil { r.recorder.Event(instance.DeepCopy(), corev1.EventTypeWarning, fmt.Sprintf("Failed%s", eventTypeFindSubsets), err.Error()) @@ -246,15 +298,67 @@ func (r *ReconcileUnitedDeployment) getNameToSubset(instance *appsv1alpha1.Unite klog.V(4).InfoS("Classify UnitedDeployment by subSet name", "unitedDeployment", klog.KObj(instance)) nameToSubsets := r.classifySubsetBySubsetName(instance, subSets) - nameToSubset, err := r.deleteDupSubset(instance, nameToSubsets, control) + nameToSubset, err := r.deleteDupSubset(nameToSubsets, control) if err != nil { r.recorder.Event(instance.DeepCopy(), corev1.EventTypeWarning, fmt.Sprintf("Failed%s", eventTypeDupSubsetsDelete), err.Error()) return nil, fmt.Errorf("fail to manage duplicate Subset of UnitedDeployment %s/%s: %s", instance.Namespace, instance.Name, err) } + // If the Fixing scheduling strategy is used, the unschedulable state for all subsets remains false and + // the UnschedulableStatus of Subsets are not managed. 
+ if instance.Spec.Topology.ScheduleStrategy.IsAdaptive() { + for name, subset := range *nameToSubset { + manageUnschedulableStatusForExistingSubset(name, subset, instance) + } + } + return nameToSubset, nil } +// manageUnschedulableStatusForExistingSubset manages subset unscheduable status and store them in the Subset.Status.UnschedulableStatus field. +func manageUnschedulableStatusForExistingSubset(name string, subset *Subset, ud *appsv1alpha1.UnitedDeployment) { + now := time.Now() + unitedDeploymentKey := getUnitedDeploymentKey(ud) + status := ud.Status.GetSubsetStatus(name) + condition := status.GetCondition(appsv1alpha1.UnitedDeploymentSubsetSchedulable) + // process with existing condition + if condition != nil && condition.Status == corev1.ConditionFalse { + // The unschedulable state of a subset lasts for at least 5 minutes. + recoverTime := condition.LastTransitionTime.Add(ud.Spec.Topology.ScheduleStrategy.GetUnschedulableLastDuration()) + klog.InfoS("existing unschedulable subset found", "subset", name, "recoverTime", recoverTime) + if now.Before(recoverTime) { + klog.InfoS("subset is still unschedulable", "subset", name) + durationStore.Push(unitedDeploymentKey, recoverTime.Sub(now)) + subset.Status.UnschedulableStatus.Unschedulable = true + } else { + klog.InfoS("unschedulable subset recovered", "subset", name) + status.SetCondition(appsv1alpha1.UnitedDeploymentSubsetSchedulable, corev1.ConditionTrue, "recover", + fmt.Sprintf("unschedulable subset recovered after %f seconds", ud.Spec.Topology.ScheduleStrategy.GetUnschedulableLastDuration().Seconds())) + } + } + // Maybe there exist some pending pods because the subset is unschedulable. + if subset.Status.ReadyReplicas < subset.Status.Replicas { + for _, pod := range subset.Spec.SubsetPods { + timeouted, checkAfter := utilcontroller.GetTimeBeforePendingTimeout(pod, ud.Spec.Topology.ScheduleStrategy.GetRescheduleCriticalDuration()) + if timeouted { + subset.Status.UnschedulableStatus.PendingPods++ + } + if checkAfter > 0 { + durationStore.Push(unitedDeploymentKey, checkAfter) + } + } + if subset.Status.UnschedulableStatus.PendingPods > 0 { + klog.InfoS("subset has pending pods", + "subset", subset.Name, "pendingPods", subset.Status.UnschedulableStatus.PendingPods) + subset.Status.UnschedulableStatus.Unschedulable = true + status.SetCondition(appsv1alpha1.UnitedDeploymentSubsetSchedulable, corev1.ConditionFalse, "reschedule", + "timeout pending pods found") + durationStore.Push(unitedDeploymentKey, ud.Spec.Topology.ScheduleStrategy.GetUnschedulableLastDuration()) + } + } + klog.InfoS("subset status", "status", status) +} + func calcNextPartitions(ud *appsv1alpha1.UnitedDeployment, nextReplicas *map[string]int32) *map[string]int32 { partitions := map[string]int32{} for _, subset := range ud.Spec.Topology.Subsets { @@ -288,7 +392,7 @@ func getNextUpdate(ud *appsv1alpha1.UnitedDeployment, nextReplicas *map[string]i return next } -func (r *ReconcileUnitedDeployment) deleteDupSubset(ud *appsv1alpha1.UnitedDeployment, nameToSubsets map[string][]*Subset, control ControlInterface) (*map[string]*Subset, error) { +func (r *ReconcileUnitedDeployment) deleteDupSubset(nameToSubsets map[string][]*Subset, control ControlInterface) (*map[string]*Subset, error) { nameToSubset := map[string]*Subset{} for name, subsets := range nameToSubsets { if len(subsets) > 1 { @@ -348,10 +452,10 @@ func (r *ReconcileUnitedDeployment) classifySubsetBySubsetName(ud *appsv1alpha1. 
return mapping } -func (r *ReconcileUnitedDeployment) updateStatus(instance *appsv1alpha1.UnitedDeployment, newStatus, oldStatus *appsv1alpha1.UnitedDeploymentStatus, nameToSubset *map[string]*Subset, nextReplicas, nextPartition *map[string]int32, currentRevision, updatedRevision *appsv1.ControllerRevision, collisionCount int32, control ControlInterface) (reconcile.Result, error) { +func (r *ReconcileUnitedDeployment) updateStatus(instance *appsv1alpha1.UnitedDeployment, newStatus, oldStatus *appsv1alpha1.UnitedDeploymentStatus, nameToSubset *map[string]*Subset, nextReplicas, nextPartition *map[string]int32, currentRevision, updatedRevision *appsv1.ControllerRevision, collisionCount int32, control ControlInterface) error { newStatus = r.calculateStatus(newStatus, nameToSubset, nextReplicas, nextPartition, currentRevision, updatedRevision, collisionCount, control) _, err := r.updateUnitedDeployment(instance, oldStatus, newStatus) - return reconcile.Result{}, err + return err } func (r *ReconcileUnitedDeployment) calculateStatus(newStatus *appsv1alpha1.UnitedDeploymentStatus, nameToSubset *map[string]*Subset, nextReplicas, nextPartition *map[string]int32, currentRevision, updatedRevision *appsv1.ControllerRevision, collisionCount int32, control ControlInterface) *appsv1alpha1.UnitedDeploymentStatus { @@ -431,7 +535,8 @@ func (r *ReconcileUnitedDeployment) updateUnitedDeployment(ud *appsv1alpha1.Unit ud.Generation == newStatus.ObservedGeneration && reflect.DeepEqual(oldStatus.SubsetReplicas, newStatus.SubsetReplicas) && reflect.DeepEqual(oldStatus.UpdateStatus, newStatus.UpdateStatus) && - reflect.DeepEqual(oldStatus.Conditions, newStatus.Conditions) { + reflect.DeepEqual(oldStatus.Conditions, newStatus.Conditions) && + reflect.DeepEqual(oldStatus.SubsetStatuses, newStatus.SubsetStatuses) { return ud, nil } @@ -439,13 +544,14 @@ func (r *ReconcileUnitedDeployment) updateUnitedDeployment(ud *appsv1alpha1.Unit var getErr, updateErr error for i, obj := 0, ud; ; i++ { - klog.V(4).InfoS("Updating status", + klog.V(4).InfoS("updating UnitedDeployment status", "updateCount", i, "unitedDeployment", klog.KObj(obj), "replicasSpec", obj.Spec.Replicas, "oldReplicas", obj.Status.Replicas, "newReplicas", newStatus.Replicas, "readyReplicasSpec", obj.Spec.Replicas, "oldReadyReplicas", obj.Status.ReadyReplicas, "newReadyReplicas", newStatus.ReadyReplicas, "oldUpdatedReplicas", obj.Status.UpdatedReplicas, "newUpdatedReplicas", newStatus.UpdatedReplicas, "oldUpdatedReadyReplicas", obj.Status.UpdatedReadyReplicas, "newUpdatedReadyReplicas", newStatus.UpdatedReadyReplicas, "oldObservedGeneration", obj.Status.ObservedGeneration, "newObservedGeneration", newStatus.ObservedGeneration, + "SubsetStatuses", obj.Status.SubsetStatuses, "newSubsetStatuses", newStatus.SubsetStatuses, ) obj.Status = *newStatus diff --git a/pkg/controller/uniteddeployment/uniteddeployment_controller_test.go b/pkg/controller/uniteddeployment/uniteddeployment_controller_test.go index 6d52c50d65..e664f3ba58 100644 --- a/pkg/controller/uniteddeployment/uniteddeployment_controller_test.go +++ b/pkg/controller/uniteddeployment/uniteddeployment_controller_test.go @@ -18,6 +18,7 @@ package uniteddeployment import ( "testing" + "time" "github.com/onsi/gomega" appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" @@ -95,7 +96,7 @@ func TestReconcile(t *testing.T) { }, } - // Setup the Manager and Controller. Wrap the Controller Reconcile function so it writes each request to a + // Set up the Manager and Controller. 
Wrap the Controller Reconcile function so it writes each request to a // channel when it is finished. mgr, err := manager.New(cfg, manager.Options{}) g.Expect(err).NotTo(gomega.HaveOccurred()) @@ -124,3 +125,95 @@ func TestReconcile(t *testing.T) { defer c.Delete(context.TODO(), instance) g.Eventually(requests, timeout).Should(gomega.Receive(gomega.Equal(expectedRequest))) } + +func TestUnschedulableStatusManagement(t *testing.T) { + g := gomega.NewGomegaWithT(t) + ud := &appsv1alpha1.UnitedDeployment{ + Spec: appsv1alpha1.UnitedDeploymentSpec{ + Topology: appsv1alpha1.Topology{ + ScheduleStrategy: appsv1alpha1.UnitedDeploymentScheduleStrategy{ + Type: appsv1alpha1.AdaptiveUnitedDeploymentScheduleStrategyType, + }, + }, + }, + } + pod := corev1.Pod{ + Status: corev1.PodStatus{ + Phase: corev1.PodPending, + Conditions: []corev1.PodCondition{ + { + Type: corev1.PodScheduled, + Status: corev1.ConditionFalse, + Reason: corev1.PodReasonUnschedulable, + }, + }, + }, + ObjectMeta: metav1.ObjectMeta{ + CreationTimestamp: metav1.NewTime(time.Now().Add(-15 * time.Second)), + }, + } + subset := &Subset{ + Status: SubsetStatus{ + ReadyReplicas: 0, + Replicas: 1, + }, + Spec: SubsetSpec{ + SubsetPods: []*corev1.Pod{&pod}, + }, + } + + // CASE1: Not timeouted yet + manageUnschedulableStatusForExistingSubset(subset.Name, subset, ud) + g.Expect(subset.Status.UnschedulableStatus.PendingPods).To(gomega.BeEquivalentTo(0)) + g.Expect(durationStore.Pop(getUnitedDeploymentKey(ud))).To(gomega.Satisfy(func(after time.Duration) bool { + return after < 15*time.Second && after > 14*time.Second + })) + + //// CASE2: Timeouted + pod.CreationTimestamp = metav1.NewTime(time.Now().Add(-31 * time.Second)) + manageUnschedulableStatusForExistingSubset(subset.Name, subset, ud) + g.Expect(subset.Status.UnschedulableStatus.PendingPods).To(gomega.BeEquivalentTo(1)) + g.Expect(durationStore.Pop(getUnitedDeploymentKey(ud))).To(gomega.BeEquivalentTo(appsv1alpha1.DefaultUnschedulableStatusLastDuration)) + + // CASE3: Unschedulable status + ud.Status.SubsetStatuses = []appsv1alpha1.UnitedDeploymentSubsetStatus{ + { + Name: subset.Name, + Conditions: []appsv1alpha1.UnitedDeploymentSubsetCondition{ + { + Type: appsv1alpha1.UnitedDeploymentSubsetSchedulable, + Status: corev1.ConditionFalse, + LastTransitionTime: metav1.Time{Time: time.Now().Add(-time.Minute)}, + }, + }, + }, + } + + subset.Status.ReadyReplicas = 1 + subset.Status.UnschedulableStatus.PendingPods = 0 + manageUnschedulableStatusForExistingSubset(subset.Name, subset, ud) + g.Expect(g.Expect(subset.Status.UnschedulableStatus.PendingPods).To(gomega.BeEquivalentTo(0))) + g.Expect(durationStore.Pop(getUnitedDeploymentKey(ud))).To(gomega.Satisfy(func(after time.Duration) bool { + return after < appsv1alpha1.DefaultUnschedulableStatusLastDuration-time.Minute && + after > 59*time.Second+appsv1alpha1.DefaultUnschedulableStatusLastDuration-2*time.Minute + })) + + // CASE4: Status Reset + ud.Status.SubsetStatuses = []appsv1alpha1.UnitedDeploymentSubsetStatus{ + { + Name: subset.Name, + Conditions: []appsv1alpha1.UnitedDeploymentSubsetCondition{ + { + Type: appsv1alpha1.UnitedDeploymentSubsetSchedulable, + Status: corev1.ConditionFalse, + LastTransitionTime: metav1.Time{Time: time.Now().Add(-time.Minute - appsv1alpha1.DefaultUnschedulableStatusLastDuration)}, + }, + }, + }, + } + subset.Status.UnschedulableStatus.Unschedulable = false // reset to zero-value + manageUnschedulableStatusForExistingSubset(subset.Name, subset, ud) + 
g.Expect(subset.Status.UnschedulableStatus.PendingPods).To(gomega.BeEquivalentTo(0))
+	g.Expect(subset.Status.UnschedulableStatus.Unschedulable).To(gomega.BeFalse())
+	g.Expect(durationStore.Pop(getUnitedDeploymentKey(ud))).To(gomega.BeEquivalentTo(0))
+}
diff --git a/pkg/controller/uniteddeployment/uniteddeployment_controller_utils.go b/pkg/controller/uniteddeployment/uniteddeployment_controller_utils.go
index e26dea22fc..cd13d9b08d 100644
--- a/pkg/controller/uniteddeployment/uniteddeployment_controller_utils.go
+++ b/pkg/controller/uniteddeployment/uniteddeployment_controller_utils.go
@@ -23,11 +23,11 @@ import (
 	"strconv"
 	"strings"

+	appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
+	"github.com/openkruise/kruise/pkg/util/expectations"
 	corev1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/util/intstr"
-
-	appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
 )

 const updateRetries = 5
@@ -132,3 +132,9 @@ func filterOutCondition(conditions []appsv1alpha1.UnitedDeploymentCondition, con
 	}
 	return newConditions
 }
+
+func getUnitedDeploymentKey(ud *appsv1alpha1.UnitedDeployment) string {
+	return ud.GetNamespace() + "/" + ud.GetName()
+}
+
+var ResourceVersionExpectation = expectations.NewReallyNewerResourceVersionExpectation()
diff --git a/pkg/controller/uniteddeployment/uniteddeployment_update.go b/pkg/controller/uniteddeployment/uniteddeployment_update.go
index 024945ce98..c3c8f41a07 100644
--- a/pkg/controller/uniteddeployment/uniteddeployment_update.go
+++ b/pkg/controller/uniteddeployment/uniteddeployment_update.go
@@ -32,6 +32,7 @@ import (
 func (r *ReconcileUnitedDeployment) manageSubsets(ud *appsv1alpha1.UnitedDeployment, nameToSubset *map[string]*Subset, nextUpdate map[string]SubsetUpdate, currentRevision, updatedRevision *appsv1.ControllerRevision, subsetType subSetType) (newStatus *appsv1alpha1.UnitedDeploymentStatus, updateErr error) {
 	newStatus = ud.Status.DeepCopy()
+
 	exists, provisioned, err := r.manageSubsetProvision(ud, nameToSubset, nextUpdate, currentRevision, updatedRevision, subsetType)
 	if err != nil {
 		SetUnitedDeploymentCondition(newStatus, NewUnitedDeploymentCondition(appsv1alpha1.SubsetProvisioned, corev1.ConditionFalse, "Error", err.Error()))
@@ -79,6 +80,17 @@ func (r *ReconcileUnitedDeployment) manageSubsets(ud *appsv1alpha1.UnitedDeploym
 	if updateErr == nil {
 		SetUnitedDeploymentCondition(newStatus, NewUnitedDeploymentCondition(appsv1alpha1.SubsetUpdated, corev1.ConditionTrue, "", ""))
 	} else {
+		// With the Adaptive scheduling strategy, scaling out a subset creates new Pods whose
+		// potential scheduling failures must be checked later so that they can be rescheduled.
+		var newPodCreated = false
+		for _, cell := range needUpdate {
+			subset := (*nameToSubset)[cell]
+			replicas := nextUpdate[cell].Replicas
+			newPodCreated = newPodCreated || subset.Spec.Replicas < replicas
+		}
+		if strategy := ud.Spec.Topology.ScheduleStrategy; strategy.IsAdaptive() && newPodCreated {
+			durationStore.Push(getUnitedDeploymentKey(ud), strategy.GetRescheduleCriticalDuration())
+		}
 		SetUnitedDeploymentCondition(newStatus, NewUnitedDeploymentCondition(appsv1alpha1.SubsetUpdated, corev1.ConditionFalse, "Error", updateErr.Error()))
 	}
 	return
@@ -132,6 +144,11 @@ func (r *ReconcileUnitedDeployment) manageSubsetProvision(ud *appsv1alpha1.Unite
 		return nil
 	})
 	if createdErr == nil {
+		// When a new subset is created, regardless of whether it contains newly created Pods,
+		// a requeue is triggered so that it is treated as an existing subset and its unschedulable information is updated.
+		if strategy := ud.Spec.Topology.ScheduleStrategy; strategy.IsAdaptive() {
+			durationStore.Push(getUnitedDeploymentKey(ud), strategy.GetRescheduleCriticalDuration())
+		}
 		r.recorder.Eventf(ud.DeepCopy(), corev1.EventTypeNormal, fmt.Sprintf("Successful%s", eventTypeSubsetsUpdate), "Create %d Subset (%s)", createdNum, subsetType)
 	} else {
 		errs = append(errs, createdErr)
diff --git a/pkg/controller/util/pod_condition_utils.go b/pkg/controller/util/pod_condition_utils.go
index 44fa65a7aa..c2a69d339e 100644
--- a/pkg/controller/util/pod_condition_utils.go
+++ b/pkg/controller/util/pod_condition_utils.go
@@ -2,6 +2,7 @@ package util

 import (
 	"encoding/json"
+	"time"

 	v1 "k8s.io/api/core/v1"
 )
@@ -22,3 +23,24 @@ func UpdateMessageKvCondition(kv map[string]interface{}, condition *v1.PodCondit
 	message, _ := json.Marshal(kv)
 	condition.Message = string(message)
 }
+
+// GetTimeBeforePendingTimeout returns timeouted=true when the Pod failed to be scheduled and has exceeded the timeout.
+// nextCheckAfter > 0 means the Pod failed to be scheduled but has not timed out yet.
+func GetTimeBeforePendingTimeout(pod *v1.Pod, timeout time.Duration) (timeouted bool, nextCheckAfter time.Duration) {
+	if pod.DeletionTimestamp != nil || pod.Status.Phase != v1.PodPending || pod.Spec.NodeName != "" {
+		return false, -1
+	}
+	for _, condition := range pod.Status.Conditions {
+		if condition.Type == v1.PodScheduled && condition.Status == v1.ConditionFalse &&
+			condition.Reason == v1.PodReasonUnschedulable {
+			currentTime := time.Now()
+			expectSchedule := pod.CreationTimestamp.Add(timeout)
+			// schedule timeout
+			if expectSchedule.Before(currentTime) {
+				return true, -1
+			}
+			return false, expectSchedule.Sub(currentTime)
+		}
+	}
+	return false, -1
+}
diff --git a/pkg/controller/workloadspread/reschedule.go b/pkg/controller/workloadspread/reschedule.go
index 810a357c47..a429053676 100644
--- a/pkg/controller/workloadspread/reschedule.go
+++ b/pkg/controller/workloadspread/reschedule.go
@@ -20,6 +20,7 @@ import (
 	"context"
 	"time"

+	"github.com/openkruise/kruise/pkg/controller/util"
 	corev1 "k8s.io/api/core/v1"
 	"k8s.io/klog/v2"
@@ -111,26 +112,9 @@ func (r *ReconcileWorkloadSpread) deletePodsForSubset(ws *appsv1alpha1.WorkloadS

 // PodUnscheduledTimeout returns true when the Pod failed to be scheduled and has timed out.
func PodUnscheduledTimeout(ws *appsv1alpha1.WorkloadSpread, pod *corev1.Pod) bool { - if pod.DeletionTimestamp != nil || pod.Status.Phase != corev1.PodPending || pod.Spec.NodeName != "" { - return false + timeouted, nextCheckAfter := util.GetTimeBeforePendingTimeout(pod, time.Second*time.Duration(*ws.Spec.ScheduleStrategy.Adaptive.RescheduleCriticalSeconds)) + if nextCheckAfter > 0 { + durationStore.Push(getWorkloadSpreadKey(ws), nextCheckAfter) } - for _, condition := range pod.Status.Conditions { - if condition.Type == corev1.PodScheduled && condition.Status == corev1.ConditionFalse && - condition.Reason == corev1.PodReasonUnschedulable { - currentTime := time.Now() - rescheduleCriticalSeconds := *ws.Spec.ScheduleStrategy.Adaptive.RescheduleCriticalSeconds - - expectSchedule := pod.CreationTimestamp.Add(time.Second * time.Duration(rescheduleCriticalSeconds)) - // schedule timeout - if expectSchedule.Before(currentTime) { - return true - } - - // no timeout, requeue key when expectSchedule is equal to time.Now() - durationStore.Push(getWorkloadSpreadKey(ws), expectSchedule.Sub(currentTime)) - - return false - } - } - return false + return timeouted } diff --git a/pkg/util/expectations/resource_version_expectation.go b/pkg/util/expectations/resource_version_expectation.go index fc11c21fc2..4d5e753870 100644 --- a/pkg/util/expectations/resource_version_expectation.go +++ b/pkg/util/expectations/resource_version_expectation.go @@ -36,6 +36,13 @@ func NewResourceVersionExpectation() ResourceVersionExpectation { return &realResourceVersionExpectation{objectVersions: make(map[types.UID]*objectCacheVersions, 100)} } +func NewReallyNewerResourceVersionExpectation() ResourceVersionExpectation { + return &reallyNewerResourceVersionExpectation{ + realResourceVersionExpectation: realResourceVersionExpectation{ + objectVersions: make(map[types.UID]*objectCacheVersions, 100), + }} +} + type realResourceVersionExpectation struct { sync.Mutex objectVersions map[types.UID]*objectCacheVersions @@ -119,3 +126,38 @@ func isResourceVersionNewer(old, new string) bool { return newCount >= oldCount } + +type reallyNewerResourceVersionExpectation struct { + realResourceVersionExpectation +} + +func (r *reallyNewerResourceVersionExpectation) Observe(obj metav1.Object) { + r.Lock() + defer r.Unlock() + + expectations := r.objectVersions[obj.GetUID()] + if expectations == nil { + return + } + if isResourceVersionReallyNewer(r.objectVersions[obj.GetUID()].version, obj.GetResourceVersion()) { + delete(r.objectVersions, obj.GetUID()) + } +} + +func isResourceVersionReallyNewer(old, new string) bool { + if len(old) == 0 { + return true + } + + oldCount, err := strconv.ParseUint(old, 10, 64) + if err != nil { + return true + } + + newCount, err := strconv.ParseUint(new, 10, 64) + if err != nil { + return false + } + + return newCount > oldCount +} diff --git a/pkg/util/workloadspread/workloadspread.go b/pkg/util/workloadspread/workloadspread.go index 77b3c3aace..5d8d9fe141 100644 --- a/pkg/util/workloadspread/workloadspread.go +++ b/pkg/util/workloadspread/workloadspread.go @@ -566,7 +566,7 @@ func (h *Handler) updateSubsetForPod(ws *appsv1alpha1.WorkloadSpread, } // return two parameters -// 1. isRecord(bool) 2. SubsetStatus +// 1. isRecord(bool) 2. 
SubsetStatuses func isPodRecordedInSubset(subsetStatuses []appsv1alpha1.WorkloadSpreadSubsetStatus, podName string) (bool, *appsv1alpha1.WorkloadSpreadSubsetStatus) { for _, subset := range subsetStatuses { if _, ok := subset.CreatingPods[podName]; ok { diff --git a/test/e2e/apps/uniteddeployment.go b/test/e2e/apps/uniteddeployment.go index 9d41cd9d01..96ac43798a 100644 --- a/test/e2e/apps/uniteddeployment.go +++ b/test/e2e/apps/uniteddeployment.go @@ -1,13 +1,19 @@ package apps import ( + "context" "fmt" "github.com/onsi/ginkgo" + "github.com/onsi/gomega" + appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" kruiseclientset "github.com/openkruise/kruise/pkg/client/clientset/versioned" "github.com/openkruise/kruise/test/e2e/framework" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/intstr" clientset "k8s.io/client-go/kubernetes" + "k8s.io/utils/ptr" ) var _ = SIGDescribe("uniteddeployment", func() { @@ -77,4 +83,118 @@ var _ = SIGDescribe("uniteddeployment", func() { udManager.Scale(1) udManager.CheckSubsets(replicasMap([]int32{0, 0, 1})) }) + + ginkgo.It("adaptive united deployment with elastic allocator", func() { + replicas := func(r int) *intstr.IntOrString { p := intstr.FromInt32(int32(r)); return &p } + replicasMap := func(replicas []int32) map[string]int32 { + replicaMap := make(map[string]int32) + for i, r := range replicas { + replicaMap[fmt.Sprintf("subset-%d", i)] = r + } + return replicaMap + } + unschedulableMap := func(unschedulables []bool) map[string]bool { + resultMap := make(map[string]bool) + for i, r := range unschedulables { + resultMap[fmt.Sprintf("subset-%d", i)] = r + } + return resultMap + } + + udManager := tester.NewUnitedDeploymentManager("adaptive-ud-elastic-test") + // enable adaptive scheduling + udManager.UnitedDeployment.Spec.Topology.ScheduleStrategy = appsv1alpha1.UnitedDeploymentScheduleStrategy{ + Type: appsv1alpha1.AdaptiveUnitedDeploymentScheduleStrategyType, + Adaptive: &appsv1alpha1.AdaptiveUnitedDeploymentStrategy{ + RescheduleCriticalSeconds: ptr.To(int32(20)), + UnschedulableLastSeconds: ptr.To(int32(15)), + }, + } + udManager.AddSubset("subset-0", nil, nil, replicas(2)) + udManager.AddSubset("subset-1", nil, nil, replicas(2)) + udManager.AddSubset("subset-2", nil, nil, nil) + // make subset-1 unschedulable + nodeKey := "ud-e2e/to-make-a-bad-subset-elastic" + udManager.UnitedDeployment.Spec.Topology.Subsets[1].NodeSelectorTerm = corev1.NodeSelectorTerm{ + MatchExpressions: []corev1.NodeSelectorRequirement{ + { + Key: nodeKey, + Operator: corev1.NodeSelectorOpExists, + }, + }, + } + + ginkgo.By("creating united deployment") + udManager.Spec.Replicas = ptr.To(int32(3)) + _, err := f.KruiseClientSet.AppsV1alpha1().UnitedDeployments(udManager.Namespace).Create(context.Background(), + udManager.UnitedDeployment, metav1.CreateOptions{}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + + ginkgo.By("wait for rescheduling, will take long") + udManager.CheckUnschedulableStatus(unschedulableMap([]bool{false, true, false})) + udManager.CheckSubsetPods(replicasMap([]int32{2, 0, 1})) + fmt.Println() + + ginkgo.By("scale up while unschedulable") + udManager.Scale(4) + udManager.CheckSubsetPods(replicasMap([]int32{2, 0, 2})) + fmt.Println() + + ginkgo.By("scale down while unschedulable") + udManager.Scale(3) + udManager.CheckSubsetPods(replicasMap([]int32{2, 0, 1})) + fmt.Println() + + ginkgo.By("wait subset recovery, will take long") + 
udManager.CheckUnschedulableStatus(unschedulableMap([]bool{false, false, false})) + fmt.Println() + + ginkgo.By("scale up after recovery") + udManager.Scale(4) + udManager.CheckSubsetPods(replicasMap([]int32{2, 1, 1})) + fmt.Println() + + ginkgo.By("scale down after recovery") + udManager.Scale(3) + udManager.CheckSubsetPods(replicasMap([]int32{2, 1, 0})) // even pods in subset-1 are not ready + fmt.Println() + + ginkgo.By("create new subset") + udManager.AddSubset("subset-3", nil, replicas(2), nil) + udManager.Update() + fmt.Println() + + ginkgo.By("waiting final status after scaling up to new subset, will take long") + udManager.Scale(6) + udManager.CheckUnschedulableStatus(unschedulableMap([]bool{false, false, false, false})) + udManager.CheckSubsetPods(replicasMap([]int32{2, 0, 2, 2})) + fmt.Println() + + ginkgo.By("fix subset-1 and wait recover") + nodeList, err := f.ClientSet.CoreV1().Nodes().List(context.Background(), metav1.ListOptions{}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + someNode := nodeList.Items[1] + someNode.Labels[nodeKey] = "haha" + _, err = f.ClientSet.CoreV1().Nodes().Update(context.Background(), &someNode, metav1.UpdateOptions{}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + udManager.CheckUnschedulableStatus(unschedulableMap([]bool{false, false, false})) + + ginkgo.By("waiting final status after deleting new subset") + udManager.Spec.Topology.Subsets = udManager.Spec.Topology.Subsets[:3] + udManager.Update() + udManager.CheckSubsetPods(replicasMap([]int32{2, 2, 2})) + fmt.Println() + + ginkgo.By("scale down after fixed") + udManager.Scale(3) + udManager.CheckUnschedulableStatus(unschedulableMap([]bool{false, false, false})) + udManager.CheckSubsetPods(replicasMap([]int32{2, 1, 0})) + fmt.Println() + + ginkgo.By("scale up after fixed") + udManager.Scale(5) + udManager.CheckUnschedulableStatus(unschedulableMap([]bool{false, false, false})) + udManager.CheckSubsetPods(replicasMap([]int32{2, 2, 1})) + fmt.Println() + }) }) diff --git a/test/e2e/framework/uniteddeployment.go b/test/e2e/framework/uniteddeployment.go index 02bab7b662..1247dc411d 100644 --- a/test/e2e/framework/uniteddeployment.go +++ b/test/e2e/framework/uniteddeployment.go @@ -3,6 +3,9 @@ package framework import ( "context" "fmt" + "reflect" + "time" + "github.com/onsi/gomega" appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" kruiseclientset "github.com/openkruise/kruise/pkg/client/clientset/versioned" @@ -13,8 +16,6 @@ import ( "k8s.io/apimachinery/pkg/util/intstr" clientset "k8s.io/client-go/kubernetes" "k8s.io/utils/pointer" - "reflect" - "time" ) type UnitedDeploymentTester struct { @@ -31,6 +32,8 @@ func NewUnitedDeploymentTester(c clientset.Interface, kc kruiseclientset.Interfa } } +var zero = int64(0) + func (t *UnitedDeploymentTester) NewUnitedDeploymentManager(name string) *UnitedDeploymentManager { return &UnitedDeploymentManager{ UnitedDeployment: &appsv1alpha1.UnitedDeployment{ @@ -64,6 +67,7 @@ func (t *UnitedDeploymentTester) NewUnitedDeploymentManager(name string) *United }, }, Spec: v1.PodSpec{ + TerminationGracePeriodSeconds: &zero, Containers: []v1.Container{ { Name: "busybox", @@ -81,12 +85,14 @@ func (t *UnitedDeploymentTester) NewUnitedDeploymentManager(name string) *United }, }, kc: t.kc, + c: t.c, } } type UnitedDeploymentManager struct { *appsv1alpha1.UnitedDeployment kc kruiseclientset.Interface + c clientset.Interface } func (m *UnitedDeploymentManager) AddSubset(name string, replicas, minReplicas, maxReplicas *intstr.IntOrString) { @@ -120,7 
+126,12 @@ func (m *UnitedDeploymentManager) Create(replicas int32) { gomega.Eventually(func() bool { ud, err := m.kc.AppsV1alpha1().UnitedDeployments(m.Namespace).Get(context.TODO(), m.Name, metav1.GetOptions{}) gomega.Expect(err).NotTo(gomega.HaveOccurred()) - return ud.Status.Replicas == replicas && ud.Generation == ud.Status.ObservedGeneration + ok := ud.Status.Replicas == replicas && ud.Generation == ud.Status.ObservedGeneration + if !ok { + fmt.Printf("UnitedDeploymentManager.Create failed\nud.Status.Replicas: %d, ud.Generation: %d, ud.Status.ObservedGeneration: %d\n", + ud.Status.Replicas, ud.Generation, ud.Status.ObservedGeneration) + } + return ok }, time.Minute, time.Second).Should(gomega.BeTrue()) } @@ -128,6 +139,57 @@ func (m *UnitedDeploymentManager) CheckSubsets(replicas map[string]int32) { gomega.Eventually(func() bool { ud, err := m.kc.AppsV1alpha1().UnitedDeployments(m.Namespace).Get(context.TODO(), m.Name, metav1.GetOptions{}) gomega.Expect(err).NotTo(gomega.HaveOccurred()) - return ud.GetGeneration() == ud.Status.ObservedGeneration && *ud.Spec.Replicas == ud.Status.Replicas && reflect.DeepEqual(replicas, ud.Status.SubsetReplicas) - }, time.Minute, time.Second).Should(gomega.BeTrue()) + ok := ud.GetGeneration() == ud.Status.ObservedGeneration && *ud.Spec.Replicas == ud.Status.Replicas && reflect.DeepEqual(replicas, ud.Status.SubsetReplicas) + if !ok { + fmt.Printf("UnitedDeploymentManager.CheckSubsets failed\nud.GetGeneration(): %d, ud.Status.ObservedGeneration: %d, *ud.Spec.Replicas: %d, ud.Status.Replicas: %d, ud.Status.SubsetReplicas: %v\n", ud.GetGeneration(), + ud.Status.ObservedGeneration, *ud.Spec.Replicas, ud.Status.Replicas, ud.Status.SubsetReplicas) + } + return ok + }, 3*time.Minute, time.Second).Should(gomega.BeTrue()) +} + +func (m *UnitedDeploymentManager) Update() { + gomega.Eventually(func(g gomega.Gomega) { + ud, err := m.kc.AppsV1alpha1().UnitedDeployments(m.Namespace).Get(context.Background(), m.Name, metav1.GetOptions{}) + g.Expect(err).NotTo(gomega.HaveOccurred()) + ud.Spec = m.UnitedDeployment.DeepCopy().Spec + _, err = m.kc.AppsV1alpha1().UnitedDeployments(m.Namespace).Update(context.Background(), ud, metav1.UpdateOptions{}) + g.Expect(err).NotTo(gomega.HaveOccurred()) + }, time.Minute, time.Second).Should(gomega.Succeed()) +} + +func (m *UnitedDeploymentManager) CheckSubsetPods(expect map[string]int32) { + fmt.Print("CheckSubsetPods ") + ud, err := m.kc.AppsV1alpha1().UnitedDeployments(m.Namespace).Get(context.TODO(), m.Name, metav1.GetOptions{}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + gomega.Eventually(func(g gomega.Gomega) { + actual := map[string]int32{} + for _, subset := range ud.Spec.Topology.Subsets { + podList, err := m.c.CoreV1().Pods(m.Namespace).List(context.Background(), metav1.ListOptions{ + LabelSelector: fmt.Sprintf("apps.kruise.io/subset-name=%s", subset.Name), + }) + g.Expect(err).NotTo(gomega.HaveOccurred()) + actual[subset.Name] = int32(len(podList.Items)) + } + g.Expect(expect).To(gomega.BeEquivalentTo(actual)) + }, time.Minute, 500*time.Millisecond).Should(gomega.Succeed()) + fmt.Println("pass") +} + +func (m *UnitedDeploymentManager) CheckUnschedulableStatus(expect map[string]bool) { + fmt.Print("CheckUnschedulableStatus ") + gomega.Eventually(func(g gomega.Gomega) { + ud, err := m.kc.AppsV1alpha1().UnitedDeployments(m.Namespace).Get(context.TODO(), m.Name, metav1.GetOptions{}) + g.Expect(err).NotTo(gomega.HaveOccurred()) + g.Expect(ud.Status.SubsetStatuses != nil).To(gomega.BeTrue()) + actual := map[string]bool{} 
+ for name := range expect { + status := ud.Status.GetSubsetStatus(name) + g.Expect(status != nil).To(gomega.BeTrue()) + condition := status.GetCondition(appsv1alpha1.UnitedDeploymentSubsetSchedulable) + actual[name] = condition != nil && condition.Status == v1.ConditionFalse + } + g.Expect(expect).To(gomega.BeEquivalentTo(actual)) + }, time.Minute, 500*time.Millisecond).Should(gomega.Succeed()) + fmt.Println("pass") }
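Note (illustrative, not part of the patch): the adaptive flow above hinges on GetTimeBeforePendingTimeout deciding, per pending Pod, whether the reschedule-critical window has elapsed or how long to wait before checking again. The following is a minimal standalone sketch of that decision; the package, the pendingTimeout helper, and the fabricated Pod are assumptions for illustration only.

// Sketch: mirrors the pending-timeout check used by the adaptive strategy.
package main

import (
	"fmt"
	"time"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// pendingTimeout reports whether the pod has been unschedulable for longer than timeout,
// and, if not yet, how long to wait before checking again.
func pendingTimeout(pod *corev1.Pod, timeout time.Duration) (bool, time.Duration) {
	if pod.DeletionTimestamp != nil || pod.Status.Phase != corev1.PodPending || pod.Spec.NodeName != "" {
		return false, -1
	}
	for _, c := range pod.Status.Conditions {
		if c.Type == corev1.PodScheduled && c.Status == corev1.ConditionFalse && c.Reason == corev1.PodReasonUnschedulable {
			deadline := pod.CreationTimestamp.Add(timeout)
			if deadline.Before(time.Now()) {
				return true, -1 // timed out: counted as a pending pod of the subset
			}
			return false, time.Until(deadline) // not yet: requeue after the remaining duration
		}
	}
	return false, -1
}

func main() {
	// A pod that has been unschedulable for 10 seconds.
	pod := &corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{CreationTimestamp: metav1.NewTime(time.Now().Add(-10 * time.Second))},
		Status: corev1.PodStatus{
			Phase: corev1.PodPending,
			Conditions: []corev1.PodCondition{{
				Type:   corev1.PodScheduled,
				Status: corev1.ConditionFalse,
				Reason: corev1.PodReasonUnschedulable,
			}},
		},
	}
	timedOut, wait := pendingTimeout(pod, 30*time.Second)
	fmt.Println(timedOut, wait) // false, roughly 20s until the next check
}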
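Note (illustrative, not part of the patch): NewReallyNewerResourceVersionExpectation differs from the existing expectation only in its comparison; observing an object with the same resourceVersion no longer satisfies the expectation, only a strictly larger one does. A small self-contained sketch of that comparison, with a hypothetical reallyNewer helper:

package main

import (
	"fmt"
	"strconv"
)

// reallyNewer mirrors the strict resourceVersion comparison: only a strictly larger
// version counts as newer, unlike the original >= semantics.
func reallyNewer(old, new string) bool {
	if len(old) == 0 {
		return true
	}
	oldCount, err := strconv.ParseUint(old, 10, 64)
	if err != nil {
		return true
	}
	newCount, err := strconv.ParseUint(new, 10, 64)
	if err != nil {
		return false
	}
	return newCount > oldCount
}

func main() {
	fmt.Println(reallyNewer("100", "100")) // false: re-observing the cached object is not enough
	fmt.Println(reallyNewer("100", "101")) // true: a genuinely newer write has been observed
}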