diff --git a/apis/apps/v1alpha1/well_know_annotations.go b/apis/apps/v1alpha1/well_know_annotations.go index 68f6b212ba..caf740c77c 100644 --- a/apis/apps/v1alpha1/well_know_annotations.go +++ b/apis/apps/v1alpha1/well_know_annotations.go @@ -3,6 +3,8 @@ package v1alpha1 const ( // AnnotationUsingEnhancedLiveness indicates that the enhanced liveness probe of pod is enabled. AnnotationUsingEnhancedLiveness = "apps.kruise.io/using-enhanced-liveness" - // AnnotationUsingEnhancedLiveness indicates the backup probe (json types) of the pod native container livnessprobe configuration. + // AnnotationNativeContainerProbeContext indicates the backup probe (json types) of the pod native container liveness probe configuration. AnnotationNativeContainerProbeContext = "apps.kruise.io/container-probe-context" + // AnnotationReallocate indicates whether the controller should reallocate replicas between subsets, which is set by the webhook. + AnnotationReallocate = "apps.kruise.io/subset-reallocate" ) diff --git a/pkg/controller/uniteddeployment/allocator.go b/pkg/controller/uniteddeployment/allocator.go index ac560fc9a6..65b1da3f03 100644 --- a/pkg/controller/uniteddeployment/allocator.go +++ b/pkg/controller/uniteddeployment/allocator.go @@ -313,7 +313,7 @@ func (ac *elasticAllocator) validateAndCalculateMinMaxMap(replicas int32, nameTo if ac.Spec.Topology.ScheduleStrategy.IsAdaptive() { unschedulable := getSubSetUnschedulable(subset.Name, nameToSubset) // This means that in the Adaptive scheduling strategy, an unschedulable subset can only be scaled down, not scaled up. 
- if notPendingReplicas, ok := notPendingReplicasMap[subset.Name]; unschedulable && ok { + if notPendingReplicas, ok := notPendingReplicasMap[subset.Name]; getSubSetUnschedulable(subset.Name, nameToSubset) && ok { klog.InfoS("Assign min(notPendingReplicas, minReplicas/maxReplicas) for unschedulable subset", "subset", subset.Name) minReplicas = integer.Int32Min(notPendingReplicas, minReplicas) diff --git a/pkg/controller/uniteddeployment/uniteddeployment_controller.go b/pkg/controller/uniteddeployment/uniteddeployment_controller.go index 888f975da8..29f36e0aa5 100644 --- a/pkg/controller/uniteddeployment/uniteddeployment_controller.go +++ b/pkg/controller/uniteddeployment/uniteddeployment_controller.go @@ -210,14 +210,33 @@ func (r *ReconcileUnitedDeployment) Reconcile(_ context.Context, request reconci return reconcile.Result{}, err } - nextReplicas, err := NewReplicaAllocator(instance).Alloc(nameToSubset) - klog.V(4).InfoS("Got UnitedDeployment next replicas", "unitedDeployment", klog.KObj(instance), "nextReplicas", nextReplicas) - if err != nil { - klog.ErrorS(err, "UnitedDeployment specified subset replicas is ineffective", "unitedDeployment", klog.KObj(instance)) - r.recorder.Eventf(instance.DeepCopy(), corev1.EventTypeWarning, fmt.Sprintf("Failed %s", - eventTypeSpecifySubsetReplicas), "Specified subset replicas is ineffective: %s", err.Error()) - return reconcile.Result{}, err + // A Reallocation of replicas between Subsets is triggered if and only if one of the following conditions is met: + // 1. The user manually modified the Spec, when webhook will set annotation + // "apps.kruise.io/subset-reallocate" = "true"; + // 2. 
During this Reconcile, a previously schedulable subset is found to have + // just become unschedulable, which means UnschedulableTimestamp is nil + nextReplicas := &map[string]int32{} + reallocate := instance.Annotations[appsv1alpha1.AnnotationReallocate] == "true" + for name, subset := range *nameToSubset { + if status := subset.Status.UnschedulableStatus; status.Unschedulable && status.UnschedulableTimestamp == nil { + klog.InfoS("new unschedulable subset discovered, should reallocate", "name", name) + reallocate = true + } + } + if reallocate { + klog.Info("reallocate subset replicas for spec is changed") + nextReplicas, err = NewReplicaAllocator(instance).Alloc(nameToSubset) + if err != nil { + klog.ErrorS(err, "UnitedDeployment specified subset replicas is ineffective", "unitedDeployment", klog.KObj(instance)) + r.recorder.Eventf(instance.DeepCopy(), corev1.EventTypeWarning, fmt.Sprintf("Failed %s", + eventTypeSpecifySubsetReplicas), "Specified subset replicas is ineffective: %s", err.Error()) + return reconcile.Result{}, err + } + } else { + klog.Info("usd old subset replicas for spec is not changed") + nextReplicas = &oldStatus.SubsetReplicas } + klog.V(4).InfoS("Got UnitedDeployment next replicas", "unitedDeployment", klog.KObj(instance), "nextReplicas", nextReplicas) nextPartitions := calcNextPartitions(instance, nextReplicas) nextUpdate := getNextUpdate(instance, nextReplicas, nextPartitions) @@ -312,7 +331,9 @@ func manageUnschedulableStatusForExistingSubset(name string, subset *Subset, ud "subset", subset.Name, "pendingPods", subset.Status.UnschedulableStatus.PendingPods) if !subset.Status.UnschedulableStatus.Unschedulable { subset.Status.UnschedulableStatus.Unschedulable = true - subset.Status.UnschedulableStatus.UnschedulableTimestamp = &metav1.Time{Time: time.Now()} + // UnschedulableTimestamp is nil to + // 1. specify that this unschedulable status is just discovered. + // 2. set time after all process done, making it more accurate. 
durationStore.Push(unitedDeploymentKey, ud.Spec.Topology.ScheduleStrategy.GetUnschedulableLastDuration()) } } @@ -432,7 +453,7 @@ func (r *ReconcileUnitedDeployment) calculateStatus(newStatus *appsv1alpha1.Unit // sync from status for _, subset := range *nameToSubset { - subsetReplicas, subsetReadyReplicas, subsetUpdatedReplicas, subsetUpdatedReadyReplicas := replicasStatusFn(subset) + subsetReplicas, subsetReadyReplicas, subsetUpdatedReplicas, subsetUpdatedReadyReplicas := replicasStatus(subset) newStatus.Replicas += subsetReplicas newStatus.ReadyReplicas += subsetReadyReplicas newStatus.UpdatedReplicas += subsetUpdatedReplicas @@ -472,11 +493,17 @@ func (r *ReconcileUnitedDeployment) calculateStatus(newStatus *appsv1alpha1.Unit SetUnitedDeploymentCondition(newStatus, NewUnitedDeploymentCondition(appsv1alpha1.SubsetFailure, corev1.ConditionTrue, "Error", *subsetFailure)) } + // set new unschedulable timestamp + for name, status := range newStatus.SubsetUnschedulable { + if status.Unschedulable && status.UnschedulableTimestamp == nil { + status.UnschedulableTimestamp = &metav1.Time{Time: time.Now()} + newStatus.SubsetUnschedulable[name] = status + } + } + return newStatus } -var replicasStatusFn = replicasStatus - func replicasStatus(subset *Subset) (replicas, readyReplicas, updatedReplicas, updatedReadyReplicas int32) { replicas = subset.Status.Replicas readyReplicas = subset.Status.ReadyReplicas @@ -518,6 +545,11 @@ func (r *ReconcileUnitedDeployment) updateUnitedDeployment(ud *appsv1alpha1.Unit obj.Status = *newStatus updateErr = r.Client.Status().Update(context.TODO(), obj) + if annotations := ud.GetAnnotations(); updateErr == nil && annotations[appsv1alpha1.AnnotationReallocate] == "true" { + // set annotation to false because it is a one-shot operation. + annotations[appsv1alpha1.AnnotationReallocate] = "false" + updateErr = r.Client.Update(context.TODO(), ud) + } if updateErr == nil { return obj, nil } diff --git 
a/pkg/webhook/uniteddeployment/mutating/uniteddeployment_create_update_handler.go b/pkg/webhook/uniteddeployment/mutating/uniteddeployment_create_update_handler.go index d1ad3e69ef..0cdd7ee885 100644 --- a/pkg/webhook/uniteddeployment/mutating/uniteddeployment_create_update_handler.go +++ b/pkg/webhook/uniteddeployment/mutating/uniteddeployment_create_update_handler.go @@ -19,6 +19,7 @@ package mutating import ( "context" "encoding/json" + "fmt" "net/http" "reflect" @@ -73,6 +74,16 @@ func (h *UnitedDeploymentCreateUpdateHandler) Handle(ctx context.Context, req ad } } defaults.SetDefaultsUnitedDeployment(obj, injectTemplateDefaults) + + oldObj := &appsv1alpha1.UnitedDeployment{} + _ = h.Decoder.DecodeRaw(req.OldObject, oldObj) + reallocate := !reflect.DeepEqual(oldObj.Spec, obj.Spec) + klog.InfoS("comparing spec", "reallocate", reallocate, "oldSpec", oldObj.Spec, "newSpec", obj.Spec) + if obj.Annotations == nil { + obj.Annotations = make(map[string]string) + } + obj.Annotations[appsv1alpha1.AnnotationReallocate] = fmt.Sprintf("%t", reallocate) + obj.Status = appsv1alpha1.UnitedDeploymentStatus{} if reflect.DeepEqual(obj, copy) { return admission.Allowed("")