Skip to content

Commit

Permalink
update: realloc strategy
Browse files Browse the repository at this point in the history
Signed-off-by: AiRanthem <[email protected]>
  • Loading branch information
AiRanthem committed Sep 4, 2024
1 parent 68e8262 commit 4a03b42
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 13 deletions.
4 changes: 3 additions & 1 deletion apis/apps/v1alpha1/well_know_annotations.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package v1alpha1
const (
// AnnotationUsingEnhancedLiveness indicates that the enhanced liveness probe of pod is enabled.
AnnotationUsingEnhancedLiveness = "apps.kruise.io/using-enhanced-liveness"
// AnnotationNativeContainerProbeContext indicates the backup probe (JSON-encoded) of the pod's native container liveness probe configuration.
AnnotationNativeContainerProbeContext = "apps.kruise.io/container-probe-context"
// AnnotationReallocate indicates whether the controller should reallocate replicas between subsets; it is set by the webhook.
AnnotationReallocate = "apps.kruise.io/subset-reallocate"
)
2 changes: 1 addition & 1 deletion pkg/controller/uniteddeployment/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ func (ac *elasticAllocator) validateAndCalculateMinMaxMap(replicas int32, nameTo
if ac.Spec.Topology.ScheduleStrategy.IsAdaptive() {
unschedulable := getSubSetUnschedulable(subset.Name, nameToSubset)
// This means that in the Adaptive scheduling strategy, an unschedulable subset can only be scaled down, not scaled up.
if notPendingReplicas, ok := notPendingReplicasMap[subset.Name]; unschedulable && ok {
if notPendingReplicas, ok := notPendingReplicasMap[subset.Name]; getSubSetUnschedulable(subset.Name, nameToSubset) && ok {
klog.InfoS("Assign min(notPendingReplicas, minReplicas/maxReplicas) for unschedulable subset",
"subset", subset.Name)
minReplicas = integer.Int32Min(notPendingReplicas, minReplicas)
Expand Down
54 changes: 43 additions & 11 deletions pkg/controller/uniteddeployment/uniteddeployment_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,14 +210,33 @@ func (r *ReconcileUnitedDeployment) Reconcile(_ context.Context, request reconci
return reconcile.Result{}, err
}

nextReplicas, err := NewReplicaAllocator(instance).Alloc(nameToSubset)
klog.V(4).InfoS("Got UnitedDeployment next replicas", "unitedDeployment", klog.KObj(instance), "nextReplicas", nextReplicas)
if err != nil {
klog.ErrorS(err, "UnitedDeployment specified subset replicas is ineffective", "unitedDeployment", klog.KObj(instance))
r.recorder.Eventf(instance.DeepCopy(), corev1.EventTypeWarning, fmt.Sprintf("Failed %s",
eventTypeSpecifySubsetReplicas), "Specified subset replicas is ineffective: %s", err.Error())
return reconcile.Result{}, err
// A reallocation of replicas between subsets is triggered if and only if one of the following conditions is met:
// 1. The user manually modified the Spec, in which case the webhook sets the annotation
// "apps.kruise.io/subset-reallocate" = "true";
// 2. During this Reconcile, a previously schedulable subset is found to have
// just become unschedulable, i.e. it is marked Unschedulable while its UnschedulableTimestamp is still nil.
nextReplicas := &map[string]int32{}

Check failure on line 218 in pkg/controller/uniteddeployment/uniteddeployment_controller.go

View workflow job for this annotation

GitHub Actions / golangci-lint

ineffectual assignment to nextReplicas (ineffassign)
reallocate := instance.Annotations[appsv1alpha1.AnnotationReallocate] == "true"
for name, subset := range *nameToSubset {
if status := subset.Status.UnschedulableStatus; status.Unschedulable && status.UnschedulableTimestamp == nil {
klog.InfoS("new unschedulable subset discovered, should reallocate", "name", name)
reallocate = true
}
}
if reallocate {
klog.Info("reallocate subset replicas for spec is changed")
nextReplicas, err = NewReplicaAllocator(instance).Alloc(nameToSubset)
if err != nil {
klog.ErrorS(err, "UnitedDeployment specified subset replicas is ineffective", "unitedDeployment", klog.KObj(instance))
r.recorder.Eventf(instance.DeepCopy(), corev1.EventTypeWarning, fmt.Sprintf("Failed %s",
eventTypeSpecifySubsetReplicas), "Specified subset replicas is ineffective: %s", err.Error())
return reconcile.Result{}, err
}
} else {
klog.Info("usd old subset replicas for spec is not changed")
nextReplicas = &oldStatus.SubsetReplicas
}
klog.V(4).InfoS("Got UnitedDeployment next replicas", "unitedDeployment", klog.KObj(instance), "nextReplicas", nextReplicas)

nextPartitions := calcNextPartitions(instance, nextReplicas)
nextUpdate := getNextUpdate(instance, nextReplicas, nextPartitions)
Expand Down Expand Up @@ -312,7 +331,9 @@ func manageUnschedulableStatusForExistingSubset(name string, subset *Subset, ud
"subset", subset.Name, "pendingPods", subset.Status.UnschedulableStatus.PendingPods)
if !subset.Status.UnschedulableStatus.Unschedulable {
subset.Status.UnschedulableStatus.Unschedulable = true
subset.Status.UnschedulableStatus.UnschedulableTimestamp = &metav1.Time{Time: time.Now()}
// UnschedulableTimestamp is intentionally left nil here in order to:
// 1. mark this unschedulable status as newly discovered;
// 2. let the timestamp be set after all processing is done, making it more accurate.
durationStore.Push(unitedDeploymentKey, ud.Spec.Topology.ScheduleStrategy.GetUnschedulableLastDuration())
}
}
Expand Down Expand Up @@ -432,7 +453,7 @@ func (r *ReconcileUnitedDeployment) calculateStatus(newStatus *appsv1alpha1.Unit

// sync from status
for _, subset := range *nameToSubset {
subsetReplicas, subsetReadyReplicas, subsetUpdatedReplicas, subsetUpdatedReadyReplicas := replicasStatusFn(subset)
subsetReplicas, subsetReadyReplicas, subsetUpdatedReplicas, subsetUpdatedReadyReplicas := replicasStatus(subset)
newStatus.Replicas += subsetReplicas
newStatus.ReadyReplicas += subsetReadyReplicas
newStatus.UpdatedReplicas += subsetUpdatedReplicas
Expand Down Expand Up @@ -472,11 +493,17 @@ func (r *ReconcileUnitedDeployment) calculateStatus(newStatus *appsv1alpha1.Unit
SetUnitedDeploymentCondition(newStatus, NewUnitedDeploymentCondition(appsv1alpha1.SubsetFailure, corev1.ConditionTrue, "Error", *subsetFailure))
}

// set new unschedulable timestamp
for name, status := range newStatus.SubsetUnschedulable {
if status.Unschedulable && status.UnschedulableTimestamp == nil {
status.UnschedulableTimestamp = &metav1.Time{Time: time.Now()}
newStatus.SubsetUnschedulable[name] = status
}
}

return newStatus
}

var replicasStatusFn = replicasStatus

func replicasStatus(subset *Subset) (replicas, readyReplicas, updatedReplicas, updatedReadyReplicas int32) {
replicas = subset.Status.Replicas
readyReplicas = subset.Status.ReadyReplicas
Expand Down Expand Up @@ -518,6 +545,11 @@ func (r *ReconcileUnitedDeployment) updateUnitedDeployment(ud *appsv1alpha1.Unit
obj.Status = *newStatus

updateErr = r.Client.Status().Update(context.TODO(), obj)
if annotations := ud.GetAnnotations(); updateErr == nil && annotations[appsv1alpha1.AnnotationReallocate] == "true" {
// set annotation to false because it is a one-shot operation.
annotations[appsv1alpha1.AnnotationReallocate] = "false"
updateErr = r.Client.Update(context.TODO(), ud)
}
if updateErr == nil {
return obj, nil
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package mutating
import (
"context"
"encoding/json"
"fmt"
"net/http"
"reflect"

Expand Down Expand Up @@ -73,6 +74,16 @@ func (h *UnitedDeploymentCreateUpdateHandler) Handle(ctx context.Context, req ad
}
}
defaults.SetDefaultsUnitedDeployment(obj, injectTemplateDefaults)

oldObj := &appsv1alpha1.UnitedDeployment{}
_ = h.Decoder.DecodeRaw(req.OldObject, oldObj)
reallocate := !reflect.DeepEqual(oldObj.Spec, obj.Spec)
klog.InfoS("comparing spec", "reallocate", reallocate, "oldSpec", oldObj.Spec, "newSpec", obj.Spec)
if obj.Annotations == nil {
obj.Annotations = make(map[string]string)
}
obj.Annotations[appsv1alpha1.AnnotationReallocate] = fmt.Sprintf("%t", reallocate)

obj.Status = appsv1alpha1.UnitedDeploymentStatus{}
if reflect.DeepEqual(obj, copy) {
return admission.Allowed("")
Expand Down

0 comments on commit 4a03b42

Please sign in to comment.