Skip to content

Commit

Permalink
test suspend feature (#11)
Browse files Browse the repository at this point in the history
  • Loading branch information
chriskery authored Oct 9, 2023
1 parent 8f68321 commit 1bbadca
Show file tree
Hide file tree
Showing 9 changed files with 57 additions and 17 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@

# Image URL to use all building/pushing image targets
IMG ?= registry.cn-shanghai.aliyuncs.com/eflops-bcp/kubecluster:latest
IMG ?= docker.io/chriskery/kubecluster:latest
# ENVTEST_K8S_VERSION refers to the version of kubebuilder assets to be downloaded by envtest binary.
ENVTEST_K8S_VERSION = 1.28.0

Expand Down
2 changes: 1 addition & 1 deletion apis/kubecluster.org/v1alpha1/cluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,9 @@ type ClusterStatus struct {
// +resource:path=kubeclusters
//+kubebuilder:resource:scope=Namespaced,path=kubeclusters,shortName={"kc","kcluster"}
//+kubebuilder:object:root=true
//+kubebuilder:subresource:status
//+kubebuilder:printcolumn:JSONPath=`.metadata.creationTimestamp`,name="Age",type=date
//+kubebuilder:printcolumn:JSONPath=`.status.conditions[-1:].type`,name="State",type=string
//+kubebuilder:subresource:status

// KubeCluster is the Schema for the clusters API
type KubeCluster struct {
Expand Down
6 changes: 6 additions & 0 deletions apis/kubecluster.org/v1alpha1/common_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,12 @@ type RunPolicy struct {
// Default to None.
CleanKubeNodePolicy *CleanKubeNodePolicy `json:"CleanKubeNodePolicy,omitempty"`

// TTLSecondsAfterFinished is the TTL to clean up clusters.
// It may take extra ReconcilePeriod seconds for the cleanup, since
// reconcile gets called periodically.
// Default to infinite.
TTLSecondsAfterFinished *int32 `json:"ttlSecondsAfterFinished,omitempty"`

// Specifies the duration in seconds relative to the startTime that the KubeCluster may be active
// before the system tries to terminate it; value must be positive integer.
// +optional
Expand Down
6 changes: 6 additions & 0 deletions manifests/crd/bases/kubecluster.org_kubeclusters.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8226,6 +8226,12 @@ spec:
handle this. Suspending a KubeCluster will reset the StartTime
field of the KubeCluster. \n Defaults to false."
type: boolean
ttlSecondsAfterFinished:
description: TTLSecondsAfterFinished is the TTL to clean up clusters.
It may take extra ReconcilePeriod seconds for the cleanup, since
reconcile gets called periodically. Default to infinite.
format: int32
type: integer
type: object
required:
- clusterReplicaSpec
Expand Down
2 changes: 1 addition & 1 deletion manifests/samples/slurm-ubuntu.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ spec:
template:
spec:
containers:
- name: ubuntu
- name: kubenode
# image: registry.cn-hangzhou.aliyuncs.com/eflops/slurm-dev:sysbench
image: registry.cn-hangzhou.aliyuncs.com/eflops/slurm-dev:sysbench
resources:
Expand Down
4 changes: 2 additions & 2 deletions pkg/controller/cluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ func (r *KubeClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request)
schemaReconciler.Default(kcluster)

if err = r.ReconcileKubeCluster(kcluster, schemaReconciler); err != nil {
logrus.Warnf("Reconcile Kube CLuster error %v", err)
logrus.Warnf("Reconcile Kube Cluster error %v", err)
return ctrl.Result{}, err
}

Expand Down Expand Up @@ -414,7 +414,7 @@ func (r *KubeClusterReconciler) DeleteCluster(metaObject metav1.Object) error {
}
if err := r.Delete(context.Background(), kubecluster); err != nil {
r.recorder.Eventf(kubecluster, corev1.EventTypeWarning, control.FailedDeletePodReason, "Error deleting: %v", err)
logrus.Error(err, "failed to delete cluster", "namespace", kubecluster.Namespace, "name", kubecluster.Name)
logrus.Error(err, " failed to delete cluster ", " namespace ", kubecluster.Namespace, " name ", kubecluster.Name)
return err
}
r.recorder.Eventf(kubecluster, corev1.EventTypeNormal, control.SuccessfulDeletePodReason, "Deleted cluster: %v", kubecluster.Name)
Expand Down
48 changes: 38 additions & 10 deletions pkg/controller/ctrlcommon/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ import (
"github.com/chriskery/kubecluster/pkg/controller/expectation"
"github.com/chriskery/kubecluster/pkg/core"
"github.com/chriskery/kubecluster/pkg/util"
"github.com/chriskery/kubecluster/pkg/util/clusterutil"
"github.com/chriskery/kubecluster/pkg/util/k8sutil"
"github.com/chriskery/kubecluster/pkg/util/misc"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
log "github.com/sirupsen/logrus"
Expand All @@ -46,6 +46,7 @@ import (
schedulerpluginsv1alpha1 "sigs.k8s.io/scheduler-plugins/apis/scheduling/v1alpha1"
"strings"
"sync"
"time"
volcanov1beta1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1"
volcanoclient "volcano.sh/apis/pkg/client/clientset/versioned"
)
Expand Down Expand Up @@ -251,7 +252,7 @@ func (cc *ClusterController) ReconcileKubeCluster(kcluster *v1alpha1.KubeCluster
clusterName := metaObject.GetName()
clusterKind := cc.Controller.GetAPIGroupVersionKind().Kind
oldStatus := kcluster.Status.DeepCopy()
if misc.IsClusterSuspended(runPolicy) {
if clusterutil.IsClusterSuspended(runPolicy) {
if err = cc.CleanUpResources(runPolicy, runtimeObject, metaObject, kcluster.Status, pods); err != nil {
return err
}
Expand Down Expand Up @@ -329,7 +330,7 @@ func (cc *ClusterController) ReconcileKubeCluster(kcluster *v1alpha1.KubeCluster
return err
}

if err = cc.CleanupCluster(runtimeObject); err != nil {
if err = cc.CleanupCluster(runPolicy, runtimeObject, kcluster.Status); err != nil {
return err
}

Expand Down Expand Up @@ -531,20 +532,47 @@ func (cc *ClusterController) CleanUpResources(
cc.Recorder.Eventf(runtimeObject, corev1.EventTypeNormal, "SuccessfulDeletePodGroup", "Deleted PodGroup: %v", metaObject.GetName())
}
}
if err := cc.CleanupCluster(runtimeObject); err != nil {

if err := cc.CleanupCluster(runPolicy, runtimeObject, clusterStatus); err != nil {
return err
}
return nil
}

func (cc *ClusterController) CleanupCluster(runtimeObject runtime.Object) error {
func (cc *ClusterController) CleanupCluster(runPolicy *v1alpha1.RunPolicy, runtimeObject runtime.Object, clusterStatus v1alpha1.ClusterStatus) error {
currentTime := time.Now()
ttl := runPolicy.TTLSecondsAfterFinished
if ttl == nil {
return nil
}
duration := time.Second * time.Duration(*ttl)
if clusterStatus.CompletionTime == nil {
return fmt.Errorf("job completion time is nil, cannot cleanup")
}

finishTime := clusterStatus.CompletionTime
expireTime := finishTime.Add(duration)
metaObject, _ := runtimeObject.(metav1.Object)
err := cc.Controller.DeleteCluster(metaObject)
if err != nil {
log.Errorf("FailedDeleteKubeCluster: %s", err)
cc.Recorder.Eventf(runtimeObject, corev1.EventTypeWarning, "FailedDeleteKubeCluster", err.Error())
return err
if currentTime.After(expireTime) {
err := cc.Controller.DeleteCluster(metaObject)
if err != nil {
log.Errorf("FailedDeleteKubeCluster: %s", err)
cc.Recorder.Eventf(runtimeObject, corev1.EventTypeWarning, "FailedDeleteKubeCluster", err.Error())
return err
}
} else {
if finishTime.After(currentTime) {
util.LoggerForCluster(metaObject).Warnf("Found Job finished in the future. This is likely due to time skew in the cluster. Job cleanup will be deferred.")
}
remaining := expireTime.Sub(currentTime)
key, err := KeyFunc(runtimeObject)
if err != nil {
util.LoggerForCluster(metaObject).Warnf("Couldn't get key for cluster object: %v", err)
return err
}
cc.WorkQueue.AddAfter(key, remaining)
}

return nil
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/ctrlcommon/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ import (
"github.com/chriskery/kubecluster/pkg/common"
"github.com/chriskery/kubecluster/pkg/controller/expectation"
"github.com/chriskery/kubecluster/pkg/util"
miscutil "github.com/chriskery/kubecluster/pkg/util/clusterutil"
utillabels "github.com/chriskery/kubecluster/pkg/util/labels"
miscutil "github.com/chriskery/kubecluster/pkg/util/misc"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
corev1 "k8s.io/api/core/v1"
Expand Down
2 changes: 1 addition & 1 deletion pkg/util/misc/util.go → pkg/util/clusterutil/util.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package misc
package clusterutil

import (
kubeclusterorgv1alpha1 "github.com/chriskery/kubecluster/apis/kubecluster.org/v1alpha1"
Expand Down

0 comments on commit 1bbadca

Please sign in to comment.