Skip to content

Commit

Permalink
Locking to avoid concurrent label operations
Browse files Browse the repository at this point in the history
  • Loading branch information
haijianyang committed Jan 8, 2024
1 parent 78b9763 commit c03ff2c
Show file tree
Hide file tree
Showing 6 changed files with 168 additions and 44 deletions.
88 changes: 74 additions & 14 deletions controllers/elfmachine_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -1190,9 +1190,9 @@ func (r *ElfMachineReconciler) reconcileLabels(ctx *context.MachineContext, vm *
capeManagedLabel := getLabelFromCache(capeManagedLabelKey)
if capeManagedLabel == nil {
var err error
capeManagedLabel, err = ctx.VMService.UpsertLabel(capeManagedLabelKey, "true")
capeManagedLabel, err = r.upsertLabel(ctx, capeManagedLabelKey, "true")
if err != nil {
return false, errors.Wrapf(err, "failed to upsert label "+towerresources.GetVMLabelManaged())
return false, err

Check warning on line 1195 in controllers/elfmachine_controller.go

View check run for this annotation

Codecov / codecov/patch

controllers/elfmachine_controller.go#L1195

Added line #L1195 was not covered by tests
}

setLabelInCache(capeManagedLabel)
Expand All @@ -1206,34 +1206,94 @@ func (r *ElfMachineReconciler) reconcileLabels(ctx *context.MachineContext, vm *
}
}

namespaceLabel, err := ctx.VMService.UpsertLabel(towerresources.GetVMLabelNamespace(), ctx.ElfMachine.Namespace)
namespaceLabel, err := r.upsertLabel(ctx, towerresources.GetVMLabelNamespace(), ctx.ElfMachine.Namespace)
if err != nil {
return false, errors.Wrapf(err, "failed to upsert label "+towerresources.GetVMLabelNamespace())
return false, err

Check warning on line 1211 in controllers/elfmachine_controller.go

View check run for this annotation

Codecov / codecov/patch

controllers/elfmachine_controller.go#L1211

Added line #L1211 was not covered by tests
}
clusterNameLabel, err := ctx.VMService.UpsertLabel(towerresources.GetVMLabelClusterName(), ctx.ElfCluster.Name)

clusterNameLabel, err := r.upsertLabel(ctx, towerresources.GetVMLabelClusterName(), ctx.ElfCluster.Name)
if err != nil {
return false, errors.Wrapf(err, "failed to upsert label "+towerresources.GetVMLabelClusterName())
return false, err

Check warning on line 1216 in controllers/elfmachine_controller.go

View check run for this annotation

Codecov / codecov/patch

controllers/elfmachine_controller.go#L1216

Added line #L1216 was not covered by tests
}

var vipLabel *models.Label
if machineutil.IsControlPlaneMachine(ctx.ElfMachine) {
vipLabel, err = ctx.VMService.UpsertLabel(towerresources.GetVMLabelVIP(), ctx.ElfCluster.Spec.ControlPlaneEndpoint.Host)
vipLabel, err = r.upsertLabel(ctx, towerresources.GetVMLabelVIP(), ctx.ElfCluster.Spec.ControlPlaneEndpoint.Host)

Check warning on line 1221 in controllers/elfmachine_controller.go

View check run for this annotation

Codecov / codecov/patch

controllers/elfmachine_controller.go#L1221

Added line #L1221 was not covered by tests
if err != nil {
return false, errors.Wrapf(err, "failed to upsert label "+towerresources.GetVMLabelVIP())
return false, err

Check warning on line 1223 in controllers/elfmachine_controller.go

View check run for this annotation

Codecov / codecov/patch

controllers/elfmachine_controller.go#L1223

Added line #L1223 was not covered by tests
}
}

labelIDs := []string{*namespaceLabel.ID, *clusterNameLabel.ID, *capeManagedLabel.ID}
labels := []*models.Label{namespaceLabel, clusterNameLabel, capeManagedLabel}
if machineutil.IsControlPlaneMachine(ctx.ElfMachine) {
labelIDs = append(labelIDs, *vipLabel.ID)
labels = append(labels, vipLabel)
}

Check warning on line 1230 in controllers/elfmachine_controller.go

View check run for this annotation

Codecov / codecov/patch

controllers/elfmachine_controller.go#L1229-L1230

Added lines #L1229 - L1230 were not covered by tests

if ok, err := r.addLabelsToVM(ctx, vm, labels); err != nil || !ok {
return ok, err
}
r.Logger.V(3).Info("Upsert labels", "labelIds", labelIDs)
_, err = ctx.VMService.AddLabelsToVM(*vm.ID, labelIDs)

return true, nil
}

func (r *ElfMachineReconciler) upsertLabel(ctx *context.MachineContext, key, value string) (*models.Label, error) {
label, err := ctx.VMService.GetLabel(key, value)
if err != nil && !service.IsLabelNotFound(err) {
return nil, err

Check warning on line 1242 in controllers/elfmachine_controller.go

View check run for this annotation

Codecov / codecov/patch

controllers/elfmachine_controller.go#L1242

Added line #L1242 was not covered by tests
} else if label != nil {
return label, nil
}

// Locking ensures that only one coroutine operates on the label at the same time,
// and concurrent operations will cause data inconsistency in the label.
labelKeyValues := []string{fmt.Sprintf("%s:%s", key, value)}
if ok := acquireTicketForLabelsOperation(labelKeyValues); ok {
defer releaseTicketForLabelsOperation(labelKeyValues)
} else {
return nil, nil
}

Check warning on line 1254 in controllers/elfmachine_controller.go

View check run for this annotation

Codecov / codecov/patch

controllers/elfmachine_controller.go#L1249-L1254

Added lines #L1249 - L1254 were not covered by tests

label, err = ctx.VMService.CreateLabel(key, value)

Check warning on line 1256 in controllers/elfmachine_controller.go

View check run for this annotation

Codecov / codecov/patch

controllers/elfmachine_controller.go#L1256

Added line #L1256 was not covered by tests
if err != nil {
delLabelCache(capeManagedLabelKey)
return nil, errors.Wrapf(err, "failed to create label %s:%s", key, value)
}

Check warning on line 1259 in controllers/elfmachine_controller.go

View check run for this annotation

Codecov / codecov/patch

controllers/elfmachine_controller.go#L1258-L1259

Added lines #L1258 - L1259 were not covered by tests

return false, err
if label == nil {
fmt.Println("********label", "null")
} else {
fmt.Println("********label", label.ID, label.Key, label.Value)
}

Check warning on line 1265 in controllers/elfmachine_controller.go

View check run for this annotation

Codecov / codecov/patch

controllers/elfmachine_controller.go#L1261-L1265

Added lines #L1261 - L1265 were not covered by tests

ctx.Logger.Info("Creating label succeeded", "id", *label.ID, "key", key, "value", value)

return label, nil

Check warning on line 1269 in controllers/elfmachine_controller.go

View check run for this annotation

Codecov / codecov/patch

controllers/elfmachine_controller.go#L1267-L1269

Added lines #L1267 - L1269 were not covered by tests
}

func (r *ElfMachineReconciler) addLabelsToVM(ctx *context.MachineContext, vm *models.VM, labels []*models.Label) (bool, error) {
labelIDs := make([]string, len(labels))
labelKeyValues := make([]string, len(labels))
for i := 0; i < len(labels); i++ {
labelIDs[i] = *labels[i].ID
labelKeyValues[i] = fmt.Sprintf("%s:%s", *labels[i].Key, *labels[i].Value)
}

// Locking ensures that only one coroutine operates on the label at the same time,
// and concurrent operations will cause data inconsistency in the label.
if ok := acquireTicketForLabelsOperation(labelKeyValues); ok {
defer releaseTicketForLabelsOperation(labelKeyValues)
} else {
return false, nil
}

Check warning on line 1286 in controllers/elfmachine_controller.go

View check run for this annotation

Codecov / codecov/patch

controllers/elfmachine_controller.go#L1285-L1286

Added lines #L1285 - L1286 were not covered by tests

_, err := ctx.VMService.AddLabelsToVM(*vm.ID, labelIDs)
if err != nil {
delLabelCache(towerresources.GetVMLabelManaged())

return false, errors.Wrapf(err, "failed to add labels %s to vm", labelKeyValues)
}

ctx.Logger.Info("Adding labels to vm succeeded", "labels", labelKeyValues)

return true, nil
}

Expand Down
20 changes: 10 additions & 10 deletions controllers/elfmachine_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,7 @@ var _ = Describe("ElfMachineReconciler", func() {
mockVMService.EXPECT().Get(elfMachine.Status.VMRef).Return(vm, nil)
mockVMService.EXPECT().GetVMNics(*vm.ID).Return([]*models.VMNic{nic}, nil)
mockVMService.EXPECT().GetVMPlacementGroup(gomock.Any()).Return(placementGroup, nil)
mockVMService.EXPECT().UpsertLabel(gomock.Any(), gomock.Any()).Times(3).Return(fake.NewTowerLabel(), nil)
mockVMService.EXPECT().GetLabel(gomock.Any(), gomock.Any()).Times(3).Return(fake.NewTowerLabel(), nil)
mockVMService.EXPECT().AddLabelsToVM(gomock.Any(), gomock.Any()).Times(1)
mockVMService.EXPECT().FindVMsByName(elfMachine.Name).Return(nil, nil)

Expand Down Expand Up @@ -1695,7 +1695,7 @@ var _ = Describe("ElfMachineReconciler", func() {

Context("Reconcile ElfMachine providerID", func() {
BeforeEach(func() {
mockVMService.EXPECT().UpsertLabel(gomock.Any(), gomock.Any()).Times(3).Return(fake.NewTowerLabel(), nil)
mockVMService.EXPECT().GetLabel(gomock.Any(), gomock.Any()).Times(3).Return(fake.NewTowerLabel(), nil)
mockVMService.EXPECT().AddLabelsToVM(gomock.Any(), gomock.Any()).Times(1)
})

Expand Down Expand Up @@ -1760,7 +1760,7 @@ var _ = Describe("ElfMachineReconciler", func() {

mockVMService.EXPECT().Get(elfMachine.Status.VMRef).Times(20).Return(vm, nil)
mockVMService.EXPECT().GetVMPlacementGroup(gomock.Any()).Times(40).Return(placementGroup, nil)
mockVMService.EXPECT().UpsertLabel(gomock.Any(), gomock.Any()).Times(60).Return(fake.NewTowerLabel(), nil)
mockVMService.EXPECT().GetLabel(gomock.Any(), gomock.Any()).Times(60).Return(fake.NewTowerLabel(), nil)
mockVMService.EXPECT().AddLabelsToVM(gomock.Any(), gomock.Any()).Times(20)
mockVMService.EXPECT().FindVMsByName(elfMachine.Name).Times(12).Return(nil, nil)

Expand Down Expand Up @@ -2189,7 +2189,7 @@ var _ = Describe("ElfMachineReconciler", func() {
mockVMService.EXPECT().Get(elfMachine.Status.VMRef).Return(vm, nil)
mockVMService.EXPECT().GetVMNics(*vm.ID).Return([]*models.VMNic{nic}, nil)
mockVMService.EXPECT().GetVMPlacementGroup(gomock.Any()).Times(2).Return(placementGroup, nil)
mockVMService.EXPECT().UpsertLabel(gomock.Any(), gomock.Any()).Times(3).Return(fake.NewTowerLabel(), nil)
mockVMService.EXPECT().GetLabel(gomock.Any(), gomock.Any()).Times(3).Return(fake.NewTowerLabel(), nil)
mockVMService.EXPECT().AddLabelsToVM(gomock.Any(), gomock.Any()).Times(1)
mockVMService.EXPECT().FindVMsByName(elfMachine.Name).Return(nil, nil)

Expand Down Expand Up @@ -3473,19 +3473,19 @@ var _ = Describe("ElfMachineReconciler", func() {

unexpectedError := errors.New("unexpected error")
setLabelInCache(capeManagedLabel)
mockVMService.EXPECT().UpsertLabel(*namespaceLabel.Key, *namespaceLabel.Value).Return(namespaceLabel, nil)
mockVMService.EXPECT().UpsertLabel(*clusterNameLabel.Key, *clusterNameLabel.Value).Return(clusterNameLabel, nil)
mockVMService.EXPECT().GetLabel(*namespaceLabel.Key, *namespaceLabel.Value).Return(namespaceLabel, nil)
mockVMService.EXPECT().GetLabel(*clusterNameLabel.Key, *clusterNameLabel.Value).Return(clusterNameLabel, nil)
mockVMService.EXPECT().AddLabelsToVM(*vm.ID, gomock.InAnyOrder([]string{*capeManagedLabel.ID, *namespaceLabel.ID, *clusterNameLabel.ID})).Return(nil, unexpectedError)
reconciler := &ElfMachineReconciler{ControllerContext: ctrlContext, NewVMService: mockNewVMService}
ok, err := reconciler.reconcileLabels(machineContext, vm)
Expect(ok).To(BeFalse())
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(Equal(unexpectedError.Error()))
Expect(err.Error()).To(ContainSubstring(unexpectedError.Error()))
Expect(getLabelFromCache(*capeManagedLabel.Key)).To(BeNil())

mockVMService.EXPECT().UpsertLabel(*capeManagedLabel.Key, *capeManagedLabel.Value).Return(capeManagedLabel, nil)
mockVMService.EXPECT().UpsertLabel(*namespaceLabel.Key, *namespaceLabel.Value).Return(namespaceLabel, nil)
mockVMService.EXPECT().UpsertLabel(*clusterNameLabel.Key, *clusterNameLabel.Value).Return(clusterNameLabel, nil)
mockVMService.EXPECT().GetLabel(*capeManagedLabel.Key, *capeManagedLabel.Value).Return(capeManagedLabel, nil)
mockVMService.EXPECT().GetLabel(*namespaceLabel.Key, *namespaceLabel.Value).Return(namespaceLabel, nil)
mockVMService.EXPECT().GetLabel(*clusterNameLabel.Key, *clusterNameLabel.Value).Return(clusterNameLabel, nil)
mockVMService.EXPECT().AddLabelsToVM(*vm.ID, gomock.InAnyOrder([]string{*capeManagedLabel.ID, *namespaceLabel.ID, *clusterNameLabel.ID})).Return(nil, nil)
reconciler = &ElfMachineReconciler{ControllerContext: ctrlContext, NewVMService: mockNewVMService}
ok, err = reconciler.reconcileLabels(machineContext, vm)
Expand Down
36 changes: 36 additions & 0 deletions controllers/vm_limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,42 @@ func getKeyForVMDuplicate(name string) string {
return fmt.Sprintf("vm:duplicate:%s", name)
}

/* Label */

var labelOperationLock sync.Mutex

func getKeyForLabel(keyValue string) string {
return fmt.Sprintf("label:%s", keyValue)
}

// acquireTicketForLabelsOperation returns whether label operation can be performed.
func acquireTicketForLabelsOperation(keyValues []string) bool {
labelOperationLock.Lock()
defer labelOperationLock.Unlock()

for i := 0; i < len(keyValues); i++ {
if _, found := inMemoryCache.Get(getKeyForLabel(keyValues[i])); found {
return false
}

Check warning on line 163 in controllers/vm_limiter.go

View check run for this annotation

Codecov / codecov/patch

controllers/vm_limiter.go#L162-L163

Added lines #L162 - L163 were not covered by tests
}

for i := 0; i < len(keyValues); i++ {
inMemoryCache.Set(getKeyForLabel(keyValues[i]), nil, cache.NoExpiration)
}

return true
}

// releaseTicketForLabelsOperation releases the label being operated.
func releaseTicketForLabelsOperation(keyValues []string) {
labelOperationLock.Lock()
defer labelOperationLock.Unlock()

for i := 0; i < len(keyValues); i++ {
inMemoryCache.Delete(getKeyForLabel(keyValues[i]))
}
}

/* GPU */

type lockedGPUDevice struct {
Expand Down
5 changes: 5 additions & 0 deletions pkg/service/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ const (
VMPlacementGroupDuplicate = "PLACEMENT_GROUP_DUPLICATE_NAME"
LabelCreateFailed = "LABEL_CREATE_FAILED"
LabelAddFailed = "LABEL_ADD_FAILED"
LabelNotFound = "LABEL_NOT_FOUND"
CloudInitError = "VM_CLOUD_INIT_CONFIG_ERROR"
MemoryInsufficientError = "HostAvailableMemoryFilter"
PlacementGroupError = "PlacementGroupFilter" // SMTX OS <= 5.0.4
Expand All @@ -49,6 +50,10 @@ func IsVMNotFound(err error) bool {
return strings.Contains(err.Error(), VMNotFound)
}

func IsLabelNotFound(err error) bool {
return strings.Contains(err.Error(), LabelNotFound)

Check warning on line 54 in pkg/service/errors.go

View check run for this annotation

Codecov / codecov/patch

pkg/service/errors.go#L53-L54

Added lines #L53 - L54 were not covered by tests
}

func IsVMDuplicate(err error) bool {
return strings.Contains(err.Error(), VMDuplicate)
}
Expand Down
45 changes: 30 additions & 15 deletions pkg/service/mock_services/vm_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 13 additions & 5 deletions pkg/service/vm.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ type VMService interface {
GetHost(id string) (*models.Host, error)
GetHostsByCluster(clusterID string) (Hosts, error)
GetVlan(id string) (*models.Vlan, error)
UpsertLabel(key, value string) (*models.Label, error)
GetLabel(key, value string) (*models.Label, error)
CreateLabel(key, value string) (*models.Label, error)
DeleteLabel(key, value string, strict bool) (string, error)
AddLabelsToVM(vmID string, labels []string) (*models.Task, error)
CreateVMPlacementGroup(name, clusterID string, vmPolicy models.VMVMPolicy) (*models.WithTaskVMPlacementGroup, error)
Expand Down Expand Up @@ -722,31 +723,38 @@ func (svr *TowerVMService) WaitTask(ctx goctx.Context, id string, timeout, inter
return task, err
}

// UpsertLabel upserts a label.
func (svr *TowerVMService) UpsertLabel(key, value string) (*models.Label, error) {
func (svr *TowerVMService) GetLabel(key, value string) (*models.Label, error) {

Check warning on line 726 in pkg/service/vm.go

View check run for this annotation

Codecov / codecov/patch

pkg/service/vm.go#L726

Added line #L726 was not covered by tests
getLabelParams := clientlabel.NewGetLabelsParams()
getLabelParams.RequestBody = &models.GetLabelsRequestBody{
Where: &models.LabelWhereInput{
Key: TowerString(key),
Value: TowerString(value),
},
}

Check warning on line 734 in pkg/service/vm.go

View check run for this annotation

Codecov / codecov/patch

pkg/service/vm.go#L734

Added line #L734 was not covered by tests
getLabelResp, err := svr.Session.Label.GetLabels(getLabelParams)
if err != nil {
return nil, err
}
if len(getLabelResp.Payload) > 0 {
return getLabelResp.Payload[0], nil

if len(getLabelResp.Payload) == 0 {
return nil, errors.New(LabelNotFound)

Check warning on line 741 in pkg/service/vm.go

View check run for this annotation

Codecov / codecov/patch

pkg/service/vm.go#L740-L741

Added lines #L740 - L741 were not covered by tests
}

return getLabelResp.Payload[0], nil

Check warning on line 744 in pkg/service/vm.go

View check run for this annotation

Codecov / codecov/patch

pkg/service/vm.go#L744

Added line #L744 was not covered by tests
}

func (svr *TowerVMService) CreateLabel(key, value string) (*models.Label, error) {

Check warning on line 747 in pkg/service/vm.go

View check run for this annotation

Codecov / codecov/patch

pkg/service/vm.go#L747

Added line #L747 was not covered by tests
createLabelParams := clientlabel.NewCreateLabelParams()
createLabelParams.RequestBody = []*models.LabelCreationParams{
{Key: &key, Value: &value},
}

Check warning on line 752 in pkg/service/vm.go

View check run for this annotation

Codecov / codecov/patch

pkg/service/vm.go#L752

Added line #L752 was not covered by tests
createLabelResp, err := svr.Session.Label.CreateLabel(createLabelParams)
if err != nil {
return nil, err
}

if len(createLabelResp.Payload) == 0 {
return nil, errors.New(LabelCreateFailed)
}
Expand Down

0 comments on commit c03ff2c

Please sign in to comment.