Skip to content

Commit

Permalink
1. adapter.go: GetReplicaDetails returns pods in the subset
Browse files Browse the repository at this point in the history
2. xxx_adapter.go: return pods implementation ⬆️
3. allocator.go: about safeReplica
4. pod_condition_utils.go: extract PodUnscheduledTimeout function from workloadwpread
5. reschedule.go: PodUnscheduledTimeout function extracted
6. subset.go: add some field to Subset object to carry related information
7. subset_control.go: store subset pods to Subset object
8. uniteddeployment_controller.go
   1. add requeue feature to check failed pods
   2. subset unschedulable status management
9. uniteddeployment_types.go: API change
10. uniteddeployment_update.go: sync unschedulable to CR

Signed-off-by: AiRanthem <[email protected]>
  • Loading branch information
AiRanthem committed Sep 2, 2024
1 parent f5508c5 commit e9fd8d4
Show file tree
Hide file tree
Showing 19 changed files with 965 additions and 64 deletions.
81 changes: 80 additions & 1 deletion apis/apps/v1alpha1/uniteddeployment_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ limitations under the License.
package v1alpha1

import (
"time"

appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -165,6 +167,10 @@ type Topology struct {
// +patchStrategy=merge
// +optional
Subsets []Subset `json:"subsets,omitempty" patchStrategy:"merge" patchMergeKey:"name"`

// ScheduleStrategy indicates the strategy the UnitedDeployment used to preform the schedule between each of subsets.
// +optional
ScheduleStrategy UnitedDeploymentScheduleStrategy `json:"scheduleStrategy,omitempty"`
}

// Subset defines the detail of a subset.
Expand Down Expand Up @@ -218,6 +224,69 @@ type Subset struct {
Patch runtime.RawExtension `json:"patch,omitempty"`
}

// UnitedDeploymentScheduleStrategyType is a string enumeration type that enumerates
// all possible schedule strategies for the UnitedDeployment controller.
// +kubebuilder:validation:Enum=Adaptive;Fixed;""
type UnitedDeploymentScheduleStrategyType string

const (
// AdaptiveUnitedDeploymentScheduleStrategyType represents that when a pod is stuck in the pending status and cannot
// be scheduled, allow it to be rescheduled to another subset.
AdaptiveUnitedDeploymentScheduleStrategyType UnitedDeploymentScheduleStrategyType = "Adaptive"
// FixedUnitedDeploymentScheduleStrategyType represents that pods are strictly scheduled to the selected subset
// even if scheduling fail.
FixedUnitedDeploymentScheduleStrategyType UnitedDeploymentScheduleStrategyType = "Fixed"
)

const (
DefaultRescheduleCriticalDuration = 30 * time.Second
DefaultUnschedulableStatusLastDuration = 300 * time.Second
)

// AdaptiveUnitedDeploymentStrategy is used to communicate parameters when Type is AdaptiveUnitedDeploymentScheduleStrategyType.
type AdaptiveUnitedDeploymentStrategy struct {
// RescheduleCriticalSeconds indicates how long controller will reschedule a schedule failed Pod to the subset that has
// redundant capacity after the subset where the Pod lives. If a Pod was scheduled failed and still in an unschedulabe status
// over RescheduleCriticalSeconds duration, the controller will reschedule it to a suitable subset. Default is 30 seconds.
// +optional
RescheduleCriticalSeconds *int32 `json:"rescheduleCriticalSeconds,omitempty"`

// UnschedulableLastSeconds is used to set the number of seconds for a Subset to recover from an unschedulable state,
// with a default value of 300 seconds.
// +optional
UnschedulableLastSeconds *int32 `json:"unschedulableLastSeconds,omitempty"`
}

// UnitedDeploymentScheduleStrategy defines the schedule performance of UnitedDeployment.
type UnitedDeploymentScheduleStrategy struct {
// Type indicates the type of the UnitedDeploymentScheduleStrategy.
// Default is Fixed
// +optional
Type UnitedDeploymentScheduleStrategyType `json:"type,omitempty"`

// Adaptive is used to communicate parameters when Type is AdaptiveUnitedDeploymentScheduleStrategyType.
// +optional
Adaptive *AdaptiveUnitedDeploymentStrategy `json:"adaptive,omitempty"`
}

func (s *UnitedDeploymentScheduleStrategy) IsAdaptive() bool {
return s.Type == AdaptiveUnitedDeploymentScheduleStrategyType
}

func (s *UnitedDeploymentScheduleStrategy) GetRescheduleCriticalDuration() time.Duration {
if s.Adaptive == nil || s.Adaptive.RescheduleCriticalSeconds == nil {
return DefaultRescheduleCriticalDuration
}
return time.Duration(*s.Adaptive.RescheduleCriticalSeconds) * time.Second
}

func (s *UnitedDeploymentScheduleStrategy) GetUnschedulableLastDuration() time.Duration {
if s.Adaptive == nil || s.Adaptive.UnschedulableLastSeconds == nil {
return DefaultUnschedulableStatusLastDuration
}
return time.Duration(*s.Adaptive.UnschedulableLastSeconds) * time.Second
}

// UnitedDeploymentStatus defines the observed state of UnitedDeployment.
type UnitedDeploymentStatus struct {
// ObservedGeneration is the most recent generation observed for this UnitedDeployment. It corresponds to the
Expand Down Expand Up @@ -252,6 +321,10 @@ type UnitedDeploymentStatus struct {
// +optional
SubsetReplicas map[string]int32 `json:"subsetReplicas,omitempty"`

// Record whether each subset is unschedulable.
// +optional
SubsetUnschedulable map[string]UnschedulableStatus `json:"subsetUnschedulable,omitempty"`

// Represents the latest available observations of a UnitedDeployment's current state.
// +optional
Conditions []UnitedDeploymentCondition `json:"conditions,omitempty"`
Expand All @@ -278,7 +351,7 @@ type UnitedDeploymentCondition struct {
// The reason for the condition's last transition.
Reason string `json:"reason,omitempty"`

// A human readable message indicating details about the transition.
// A human-readable message indicating details about the transition.
Message string `json:"message,omitempty"`
}

Expand All @@ -293,6 +366,12 @@ type UpdateStatus struct {
CurrentPartitions map[string]int32 `json:"currentPartitions,omitempty"`
}

type UnschedulableStatus struct {
Unschedulable bool `json:"unschedulable"`
UnschedulableTimestamp metav1.Time `json:"unschedulableTimestamp,omitempty"`
FailedPods int32 `json:"failedPods,omitempty"`
}

// +genclient
// +genclient:method=GetScale,verb=get,subresource=scale,result=k8s.io/api/autoscaling/v1.Scale
// +genclient:method=UpdateScale,verb=update,subresource=scale,input=k8s.io/api/autoscaling/v1.Scale,result=k8s.io/api/autoscaling/v1.Scale
Expand Down
69 changes: 69 additions & 0 deletions apis/apps/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

50 changes: 49 additions & 1 deletion config/crd/bases/apps.kruise.io_uniteddeployments.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -943,6 +943,38 @@ spec:
description: Topology describes the pods distribution detail between
each of subsets.
properties:
scheduleStrategy:
description: ScheduleStrategy indicates the strategy the UnitedDeployment
used to preform the schedule between each of subsets.
properties:
adaptive:
description: Adaptive is used to communicate parameters when
Type is AdaptiveUnitedDeploymentScheduleStrategyType.
properties:
rescheduleCriticalSeconds:
description: |-
RescheduleCriticalSeconds indicates how long controller will reschedule a schedule failed Pod to the subset that has
redundant capacity after the subset where the Pod lives. If a Pod was scheduled failed and still in an unschedulabe status
over RescheduleCriticalSeconds duration, the controller will reschedule it to a suitable subset. Default is 30 seconds.
format: int32
type: integer
unschedulableLastSeconds:
description: |-
UnschedulableLastSeconds is used to set the number of seconds for a Subset to recover from an unschedulable state,
with a default value of 300 seconds.
format: int32
type: integer
type: object
type:
description: |-
Type indicates the type of the UnitedDeploymentScheduleStrategy.
Default is Fixed
enum:
- Adaptive
- Fixed
- ""
type: string
type: object
subsets:
description: |-
Contains the details of each subset. Each element in this array represents one subset
Expand Down Expand Up @@ -1161,7 +1193,7 @@ spec:
format: date-time
type: string
message:
description: A human readable message indicating details about
description: A human-readable message indicating details about
the transition.
type: string
reason:
Expand Down Expand Up @@ -1204,6 +1236,22 @@ spec:
description: Records the topology detail information of the replicas
of each subset.
type: object
subsetUnschedulable:
additionalProperties:
properties:
failedPods:
format: int32
type: integer
unschedulable:
type: boolean
unschedulableTimestamp:
format: date-time
type: string
required:
- unschedulable
type: object
description: Record whether each subset is unschedulable.
type: object
updateStatus:
description: Records the information of update progress.
properties:
Expand Down
8 changes: 5 additions & 3 deletions pkg/controller/uniteddeployment/adapter/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package adapter

import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand All @@ -25,14 +26,15 @@ import (
)

type Adapter interface {
// NewResourceObject creates a empty subset object.
// NewResourceObject creates an empty subset object.
NewResourceObject() client.Object
// NewResourceListObject creates a empty subset list object.
// NewResourceListObject creates an empty subset list object.
NewResourceListObject() client.ObjectList
// GetStatusObservedGeneration returns the observed generation of the subset.
GetStatusObservedGeneration(subset metav1.Object) int64
// GetReplicaDetails returns the replicas information of the subset status.
GetReplicaDetails(subset metav1.Object, updatedRevision string) (specReplicas, specPartition *int32, statusReplicas, statusReadyReplicas, statusUpdatedReplicas, statusUpdatedReadyReplicas int32, err error)
GetReplicaDetails(subset metav1.Object, updatedRevision string) (specReplicas, specPartition *int32, statusReplicas,
statusReadyReplicas, statusUpdatedReplicas, statusUpdatedReadyReplicas int32, pods []*corev1.Pod, err error)
// GetSubsetFailure returns failure information of the subset.
GetSubsetFailure() *string
// ApplySubsetTemplate updates the subset to the latest revision.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,8 @@ func (a *AdvancedStatefulSetAdapter) GetStatusObservedGeneration(obj metav1.Obje
}

// GetReplicaDetails returns the replicas detail the subset needs.
func (a *AdvancedStatefulSetAdapter) GetReplicaDetails(obj metav1.Object, updatedRevision string) (specReplicas, specPartition *int32, statusReplicas, statusReadyReplicas, statusUpdatedReplicas, statusUpdatedReadyReplicas int32, err error) {
func (a *AdvancedStatefulSetAdapter) GetReplicaDetails(obj metav1.Object, updatedRevision string) (specReplicas, specPartition *int32, statusReplicas, statusReadyReplicas, statusUpdatedReplicas, statusUpdatedReadyReplicas int32, pods []*corev1.Pod, err error) {

Check warning on line 66 in pkg/controller/uniteddeployment/adapter/advanced_statefulset_adapter.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/uniteddeployment/adapter/advanced_statefulset_adapter.go#L66

Added line #L66 was not covered by tests
set := obj.(*v1beta1.StatefulSet)
var pods []*corev1.Pod
pods, err = a.getStatefulSetPods(set)
if err != nil {
return
Expand Down
4 changes: 1 addition & 3 deletions pkg/controller/uniteddeployment/adapter/cloneset_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,10 @@ func (a *CloneSetAdapter) GetStatusObservedGeneration(obj metav1.Object) int64 {
return obj.(*alpha1.CloneSet).Status.ObservedGeneration
}

func (a *CloneSetAdapter) GetReplicaDetails(obj metav1.Object, updatedRevision string) (specReplicas, specPartition *int32, statusReplicas, statusReadyReplicas, statusUpdatedReplicas, statusUpdatedReadyReplicas int32, err error) {
func (a *CloneSetAdapter) GetReplicaDetails(obj metav1.Object, updatedRevision string) (specReplicas, specPartition *int32, statusReplicas, statusReadyReplicas, statusUpdatedReplicas, statusUpdatedReadyReplicas int32, pods []*corev1.Pod, err error) {

Check warning on line 44 in pkg/controller/uniteddeployment/adapter/cloneset_adapter.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/uniteddeployment/adapter/cloneset_adapter.go#L44

Added line #L44 was not covered by tests

set := obj.(*alpha1.CloneSet)

var pods []*corev1.Pod

pods, err = a.getCloneSetPods(set)

if err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,11 @@ func (a *DeploymentAdapter) GetStatusObservedGeneration(obj metav1.Object) int64
}

// GetReplicaDetails returns the replicas detail the subset needs.
func (a *DeploymentAdapter) GetReplicaDetails(obj metav1.Object, updatedRevision string) (specReplicas, specPartition *int32, statusReplicas, statusReadyReplicas, statusUpdatedReplicas, statusUpdatedReadyReplicas int32, err error) {
func (a *DeploymentAdapter) GetReplicaDetails(obj metav1.Object, updatedRevision string) (specReplicas, specPartition *int32, statusReplicas, statusReadyReplicas, statusUpdatedReplicas, statusUpdatedReadyReplicas int32, pods []*corev1.Pod, err error) {
// Convert to Deployment Object

Check warning on line 62 in pkg/controller/uniteddeployment/adapter/deployment_adapter.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/uniteddeployment/adapter/deployment_adapter.go#L61-L62

Added lines #L61 - L62 were not covered by tests
set := obj.(*appsv1.Deployment)

// Get all pods belonging to deployment
var pods []*corev1.Pod
pods, err = a.getDeploymentPods(set)
if err != nil {
return
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,8 @@ func (a *StatefulSetAdapter) GetStatusObservedGeneration(obj metav1.Object) int6
}

// GetReplicaDetails returns the replicas detail the subset needs.
func (a *StatefulSetAdapter) GetReplicaDetails(obj metav1.Object, updatedRevision string) (specReplicas, specPartition *int32, statusReplicas, statusReadyReplicas, statusUpdatedReplicas, statusUpdatedReadyReplicas int32, err error) {
func (a *StatefulSetAdapter) GetReplicaDetails(obj metav1.Object, updatedRevision string) (specReplicas, specPartition *int32, statusReplicas, statusReadyReplicas, statusUpdatedReplicas, statusUpdatedReadyReplicas int32, pods []*corev1.Pod, err error) {
set := obj.(*appsv1.StatefulSet)
var pods []*corev1.Pod
pods, err = a.getStatefulSetPods(set)
if err != nil {
return
Expand Down
Loading

0 comments on commit e9fd8d4

Please sign in to comment.