diff --git a/controllers/elfcluster_controller.go b/controllers/elfcluster_controller.go index 4d322f82..a69312a5 100644 --- a/controllers/elfcluster_controller.go +++ b/controllers/elfcluster_controller.go @@ -279,6 +279,32 @@ func (r *ElfClusterReconciler) reconcileDeleteLabel(ctx *context.ClusterContext, return nil } +// cleanLabels cleans unused labels for Tower every day. +// If an error is encountered during the cleanup process, +// it will not be retried and will be started again in the next reconcile. +func (r *ElfClusterReconciler) cleanLabels(ctx *context.ClusterContext) { + // Locking ensures that only one coroutine cleans at the same time, + if ok := acquireTicketForGCTowerLabels(ctx.ElfCluster.Spec.Tower.Server); ok { + defer releaseTicketForForGCTowerLabels(ctx.ElfCluster.Spec.Tower.Server) + } else { + return + } + + ctx.Logger.V(1).Info(fmt.Sprintf("Cleaning labels for Tower %s", ctx.ElfCluster.Spec.Tower.Server)) + + keys := []string{towerresources.GetVMLabelClusterName(), towerresources.GetVMLabelVIP(), towerresources.GetVMLabelNamespace(), towerresources.GetVMLabelManaged()} + labelIDs, err := ctx.VMService.CleanLabels(keys) + if err != nil { + ctx.Logger.Error(err, fmt.Sprintf("failed to clean labels for Tower %s", ctx.ElfCluster.Spec.Tower.Server)) + + return + } + + recordGCTimeForTowerLabels(ctx.ElfCluster.Spec.Tower.Server) + + ctx.Logger.V(1).Info(fmt.Sprintf("Labels of Tower %s are cleaned successfully", ctx.ElfCluster.Spec.Tower.Server), "labelCount", len(labelIDs)) +} + func (r *ElfClusterReconciler) reconcileNormal(ctx *context.ClusterContext) (reconcile.Result, error) { //nolint:unparam ctx.Logger.Info("Reconciling ElfCluster") @@ -298,6 +324,8 @@ func (r *ElfClusterReconciler) reconcileNormal(ctx *context.ClusterContext) (rec return reconcile.Result{}, nil } + r.cleanLabels(ctx) + // Wait until the API server is online and accessible. if !r.isAPIServerOnline(ctx) { return reconcile.Result{}, nil diff --git a/controllers/elfcluster_controller_test.go b/controllers/elfcluster_controller_test.go index b9aa5158..5ec4750c 100644 --- a/controllers/elfcluster_controller_test.go +++ b/controllers/elfcluster_controller_test.go @@ -135,6 +135,9 @@ var _ = Describe("ElfClusterReconciler", func() { } fake.InitClusterOwnerReferences(ctrlContext, elfCluster, cluster) + keys := []string{towerresources.GetVMLabelClusterName(), towerresources.GetVMLabelVIP(), towerresources.GetVMLabelNamespace(), towerresources.GetVMLabelManaged()} + mockVMService.EXPECT().CleanLabels(keys).Return(nil, nil) + elfClusterKey := capiutil.ObjectKey(elfCluster) reconciler := &ElfClusterReconciler{ControllerContext: ctrlContext, NewVMService: mockNewVMService} _, _ = reconciler.Reconcile(ctx, ctrl.Request{NamespacedName: elfClusterKey}) diff --git a/controllers/vm_limiter.go b/controllers/vm_limiter.go index 663583c4..99556ac2 100644 --- a/controllers/vm_limiter.go +++ b/controllers/vm_limiter.go @@ -144,6 +144,61 @@ func getKeyForVMDuplicate(name string) string { return fmt.Sprintf("vm:duplicate:%s", name) } +/* Label */ + +var labelOperationLock sync.Mutex + +func getKeyForGCLabel(tower string) string { + return fmt.Sprintf("label:gc:%s", tower) +} + +func getKeyForGCLabelTime(tower string) string { + return fmt.Sprintf("label:gc:time:%s", tower) +} + +// acquireTicketForGCTowerLabels returns whether label gc operation can be performed. +func acquireTicketForGCTowerLabels(tower string) bool { + labelOperationLock.Lock() + defer labelOperationLock.Unlock() + + if _, found := inMemoryCache.Get(getKeyForGCLabel(tower)); found { + return false + } + + key := getKeyForGCLabelTime(tower) + if val, found := inMemoryCache.Get(key); found { + lastGCTime, ok := val.(time.Time) + if ok { + if time.Now().Before(lastGCTime.Add(24 * time.Hour)) { + return false + } + } else { + // Delete unexpected data. + inMemoryCache.Delete(key) + } + } + + inMemoryCache.Set(getKeyForGCLabel(tower), nil, cache.NoExpiration) + + return true +} + +// releaseTicketForForGCTowerLabels releases the Tower whose labels are being cleared. +func releaseTicketForForGCTowerLabels(tower string) { + labelOperationLock.Lock() + defer labelOperationLock.Unlock() + + inMemoryCache.Delete(getKeyForGCLabel(tower)) +} + +// recordGCTimeForTowerLabels records the last GC label time of the specified Tower. +func recordGCTimeForTowerLabels(tower string) { + labelOperationLock.Lock() + defer labelOperationLock.Unlock() + + inMemoryCache.Set(getKeyForGCLabelTime(tower), time.Now(), cache.NoExpiration) +} + /* GPU */ type lockedGPUDevice struct { diff --git a/pkg/service/mock_services/vm_mock.go b/pkg/service/mock_services/vm_mock.go index fd096d8d..1411d174 100644 --- a/pkg/service/mock_services/vm_mock.go +++ b/pkg/service/mock_services/vm_mock.go @@ -83,6 +83,21 @@ func (mr *MockVMServiceMockRecorder) AddVMsToPlacementGroup(placementGroup, vmID return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddVMsToPlacementGroup", reflect.TypeOf((*MockVMService)(nil).AddVMsToPlacementGroup), placementGroup, vmIDs) } +// CleanLabels mocks base method. +func (m *MockVMService) CleanLabels(keys []string) ([]string, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CleanLabels", keys) + ret0, _ := ret[0].([]string) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// CleanLabels indicates an expected call of CleanLabels. +func (mr *MockVMServiceMockRecorder) CleanLabels(keys interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CleanLabels", reflect.TypeOf((*MockVMService)(nil).CleanLabels), keys) +} + // Clone mocks base method. func (m *MockVMService) Clone(elfCluster *v1beta1.ElfCluster, elfMachine *v1beta1.ElfMachine, bootstrapData, host string, machineGPUDevices []*service.GPUDeviceInfo) (*models.WithTaskVM, error) { m.ctrl.T.Helper() diff --git a/pkg/service/vm.go b/pkg/service/vm.go index 0f382672..9fde9b94 100644 --- a/pkg/service/vm.go +++ b/pkg/service/vm.go @@ -67,6 +67,7 @@ type VMService interface { GetVlan(id string) (*models.Vlan, error) UpsertLabel(key, value string) (*models.Label, error) DeleteLabel(key, value string, strict bool) (string, error) + CleanLabels(keys []string) ([]string, error) AddLabelsToVM(vmID string, labels []string) (*models.Task, error) CreateVMPlacementGroup(name, clusterID string, vmPolicy models.VMVMPolicy) (*models.WithTaskVMPlacementGroup, error) GetVMPlacementGroup(name string) (*models.VMPlacementGroup, error) @@ -769,7 +770,7 @@ func (svr *TowerVMService) DeleteLabel(key, value string, strict bool) (string, if strict { deleteLabelParams.RequestBody.Where.AND = append( deleteLabelParams.RequestBody.Where.AND, - &models.LabelWhereInput{VMNum: TowerInt32(0)}, + &models.LabelWhereInput{TotalNum: TowerInt32(0)}, ) } @@ -785,6 +786,30 @@ func (svr *TowerVMService) DeleteLabel(key, value string, strict bool) (string, return *deleteLabelResp.Payload[0].Data.ID, nil } +// CleanLabels deletes specified unused labels. +// CleanLabels is used to clean unused labels regularly and should not be called frequently. +func (svr *TowerVMService) CleanLabels(keys []string) ([]string, error) { + deleteLabelParams := clientlabel.NewDeleteLabelParams() + deleteLabelParams.RequestBody = &models.LabelDeletionParams{ + Where: &models.LabelWhereInput{ + KeyIn: keys, + CreatedAtLte: TowerString(time.Now().Add(-24 * time.Hour).UTC().Format(time.RFC3339)), + }, + } + + deleteLabelResp, err := svr.Session.Label.DeleteLabel(deleteLabelParams) + if err != nil { + return nil, err + } + + labelIDs := make([]string, len(deleteLabelResp.Payload)) + for i := 0; i < len(deleteLabelResp.Payload); i++ { + labelIDs[i] = *deleteLabelResp.Payload[i].Data.ID + } + + return labelIDs, nil +} + // AddLabelsToVM adds a label to a VM. func (svr *TowerVMService) AddLabelsToVM(vmID string, labelIds []string) (*models.Task, error) { addLabelsParams := clientlabel.NewAddLabelsToResourcesParams()