SKS-2157: Cache placement group to reduce Tower API requests (#161)
haijianyang authored Dec 7, 2023
1 parent 04ef554 commit 319a4fb
Showing 10 changed files with 176 additions and 67 deletions.
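The five diffs shown below call two identifiers whose declarations sit outside the changed hunks: the shared cache variable (renamed in this commit from vmTaskErrorCache to inMemoryCache) and the test helper resetMemoryCache (renamed from resetVMTaskErrorCache). A hypothetical sketch of those declarations, inferred from the call sites in the hunks — the go-cache constructor and its TTL arguments are assumptions, chosen only because the Set/Get/Delete calls below match that API:

package controllers

import (
	"time"

	cache "github.com/patrickmn/go-cache"
)

// Assumed declaration: a single in-process TTL cache shared by the
// task-error, GPU, and (new in this commit) placement-group entries.
var inMemoryCache = cache.New(5*time.Minute, 10*time.Minute)

// Assumed test helper: flush every cached entry so test cases do not
// leak state into each other.
func resetMemoryCache() {
	inMemoryCache.Flush()
}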
9 changes: 6 additions & 3 deletions controllers/elfcluster_controller.go
@@ -231,10 +231,13 @@ func (r *ElfClusterReconciler) reconcileDelete(ctx *context.ClusterContext) (rec
 
 func (r *ElfClusterReconciler) reconcileDeleteVMPlacementGroups(ctx *context.ClusterContext) (bool, error) {
 	placementGroupPrefix := towerresources.GetVMPlacementGroupNamePrefix(ctx.Cluster)
-	if pgCount, err := ctx.VMService.DeleteVMPlacementGroupsByNamePrefix(ctx, placementGroupPrefix); err != nil {
+	if pgNames, err := ctx.VMService.DeleteVMPlacementGroupsByNamePrefix(ctx, placementGroupPrefix); err != nil {
 		return false, err
-	} else if pgCount > 0 {
-		ctx.Logger.Info(fmt.Sprintf("Waiting for the placement groups with name prefix %s to be deleted", placementGroupPrefix), "count", pgCount)
+	} else if len(pgNames) > 0 {
+		ctx.Logger.Info(fmt.Sprintf("Waiting for the placement groups with name prefix %s to be deleted", placementGroupPrefix), "count", len(pgNames))
+
+		// Delete placement group caches.
+		delPGCaches(pgNames)
 
 		return false, nil
 	} else {
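This hunk can invalidate individual cache entries (via delPGCaches above) because the VM service method now returns the matching placement group names instead of a bare count. A sketch of the interface change, with parameter and return types inferred from the call sites and mocks in this diff rather than from the real service definition:

package service

import goctx "context"

// Hypothetical slice of the Tower VM service interface; only the shape
// of this one method matters here, and the exact types are assumptions.
type VMPlacementGroupDeleter interface {
	// Before: returned only the number of deleted placement groups,
	// e.g. (int, error), which is why the old code logged a count.
	// After: returns the names, which is exactly what delPGCaches needs.
	DeleteVMPlacementGroupsByNamePrefix(ctx goctx.Context, namePrefix string) ([]string, error)
}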
9 changes: 6 additions & 3 deletions controllers/elfcluster_controller_test.go
@@ -207,23 +207,26 @@ var _ = Describe("ElfClusterReconciler", func() {
 			reconciler := &ElfClusterReconciler{ControllerContext: ctrlContext, NewVMService: mockNewVMService}
 			elfClusterKey := capiutil.ObjectKey(elfCluster)
 
-			mockVMService.EXPECT().DeleteVMPlacementGroupsByNamePrefix(gomock.Any(), towerresources.GetVMPlacementGroupNamePrefix(cluster)).Return(0, errors.New("some error"))
+			mockVMService.EXPECT().DeleteVMPlacementGroupsByNamePrefix(gomock.Any(), towerresources.GetVMPlacementGroupNamePrefix(cluster)).Return(nil, errors.New("some error"))
 
 			result, err := reconciler.Reconcile(ctx, ctrl.Request{NamespacedName: elfClusterKey})
 			Expect(result).To(BeZero())
 			Expect(err).To(HaveOccurred())
 
 			task.Status = models.NewTaskStatus(models.TaskStatusSUCCESSED)
 			logBuffer.Reset()
+			pg := fake.NewVMPlacementGroup(nil)
+			setPGCache(pg)
 			placementGroupPrefix := towerresources.GetVMPlacementGroupNamePrefix(cluster)
-			mockVMService.EXPECT().DeleteVMPlacementGroupsByNamePrefix(gomock.Any(), placementGroupPrefix).Return(1, nil)
+			mockVMService.EXPECT().DeleteVMPlacementGroupsByNamePrefix(gomock.Any(), placementGroupPrefix).Return([]string{*pg.Name}, nil)
 			result, err = reconciler.Reconcile(ctx, ctrl.Request{NamespacedName: elfClusterKey})
 			Expect(result).NotTo(BeZero())
 			Expect(err).NotTo(HaveOccurred())
 			Expect(logBuffer.String()).To(ContainSubstring(fmt.Sprintf("Waiting for the placement groups with name prefix %s to be deleted", placementGroupPrefix)))
+			Expect(getPGFromCache(*pg.Name)).To(BeNil())
 
 			logBuffer.Reset()
-			mockVMService.EXPECT().DeleteVMPlacementGroupsByNamePrefix(gomock.Any(), placementGroupPrefix).Return(0, nil)
+			mockVMService.EXPECT().DeleteVMPlacementGroupsByNamePrefix(gomock.Any(), placementGroupPrefix).Return(nil, nil)
 			mockVMService.EXPECT().DeleteLabel(towerresources.GetVMLabelClusterName(), elfCluster.Name, true).Return("labelid", nil)
 			mockVMService.EXPECT().DeleteLabel(towerresources.GetVMLabelVIP(), elfCluster.Spec.ControlPlaneEndpoint.Host, false).Return("labelid", nil)
 			mockVMService.EXPECT().DeleteLabel(towerresources.GetVMLabelNamespace(), elfCluster.Namespace, true).Return("", nil)
18 changes: 17 additions & 1 deletion controllers/elfmachine_controller_placement_group.go
@@ -116,7 +116,7 @@ func (r *ElfMachineReconciler) createPlacementGroup(ctx *context.MachineContext,
 
 	ctx.Logger.Info("Creating placement group succeeded", "taskID", *task.ID, "placementGroup", placementGroupName)
 
-	placementGroup, err := ctx.VMService.GetVMPlacementGroup(placementGroupName)
+	placementGroup, err := r.getPlacementGroup(ctx, placementGroupName)
 	if err != nil {
 		return nil, err
 	}
@@ -345,7 +345,14 @@ func (r *ElfMachineReconciler) getAvailableHostsForVM(ctx *context.MachineContex
 	return availableHosts
 }
 
+// getPlacementGroup returns the specified placement group.
+// getPlacementGroup will get the placement group from the cache first.
+// If the placement group does not exist in the cache, it will be fetched from Tower and saved to the cache (expiration time is 10s).
 func (r *ElfMachineReconciler) getPlacementGroup(ctx *context.MachineContext, placementGroupName string) (*models.VMPlacementGroup, error) {
+	if placementGroup := getPGFromCache(placementGroupName); placementGroup != nil {
+		return placementGroup, nil
+	}
+
 	placementGroup, err := ctx.VMService.GetVMPlacementGroup(placementGroupName)
 	if err != nil {
 		return nil, err
@@ -358,6 +365,9 @@ func (r *ElfMachineReconciler) getPlacementGroup(ctx *context.MachineContext, pl
 		return nil, nil
 	}
 
+	// Save placement group cache.
+	setPGCache(placementGroup)
+
 	return placementGroup, nil
 }
 
@@ -563,6 +573,9 @@ func (r *ElfMachineReconciler) addVMsToPlacementGroup(ctx *context.MachineContex
 		return err
 	}
 
+	// Delete placement group cache.
+	delPGCaches([]string{*placementGroup.Name})
+
 	taskID := *task.ID
 	task, err = ctx.VMService.WaitTask(ctx, taskID, config.WaitTaskTimeoutForPlacementGroupOperation, config.WaitTaskInterval)
 	if err != nil {
@@ -638,6 +651,9 @@ func (r *ElfMachineReconciler) deletePlacementGroup(ctx *context.MachineContext)
 		return false, nil
 	} else {
 		ctx.Logger.Info(fmt.Sprintf("Placement group %s deleted", *placementGroup.Name))
+
+		// Delete placement group cache.
+		delPGCaches([]string{*placementGroup.Name})
 	}
 
 	return true, nil
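getPlacementGroup now implements a read-through cache: return the cached entry if present, otherwise fetch from Tower and cache the non-nil result. A minimal self-contained sketch of that flow under the same go-cache assumption as above (the generic helper and all names here are illustrative, not repository code):

package main

import (
	"fmt"
	"time"

	cache "github.com/patrickmn/go-cache"
)

var memCache = cache.New(5*time.Minute, 10*time.Minute)

// getOrFetch consults the TTL cache first and calls fetch only on a
// miss, caching a non-nil result; this mirrors getPlacementGroup above.
func getOrFetch[T any](key string, ttl time.Duration, fetch func() (*T, error)) (*T, error) {
	if val, found := memCache.Get(key); found {
		if v, ok := val.(T); ok {
			return &v, nil // hit: no Tower round trip
		}
		memCache.Delete(key) // drop unexpected data, as the real code does
	}

	v, err := fetch()
	if err != nil || v == nil {
		return nil, err
	}

	memCache.Set(key, *v, ttl) // store a copy of the value
	return v, nil
}

func main() {
	calls := 0
	fetch := func() (*string, error) {
		calls++
		s := "pg-1"
		return &s, nil
	}

	for i := 0; i < 3; i++ {
		if _, err := getOrFetch("pg:demo:cache", 10*time.Second, fetch); err != nil {
			panic(err)
		}
	}
	fmt.Println("fetch calls:", calls) // prints 1: the later reads are cache hits
}

Note the two invalidation points in the hunks above: a started AddVMsToPlacementGroup task and a completed placement group deletion both call delPGCaches, so a stale membership list is never served across those mutations.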
43 changes: 36 additions & 7 deletions controllers/elfmachine_controller_test.go
@@ -259,7 +259,7 @@ var _ = Describe("ElfMachineReconciler", func() {
 		})
 
 		It("should create a new VM if none exists", func() {
-			resetVMTaskErrorCache()
+			resetMemoryCache()
 			vm := fake.NewTowerVM()
 			vm.Name = &elfMachine.Name
 			elfCluster.Spec.Cluster = clusterKey
@@ -297,7 +297,7 @@ var _ = Describe("ElfMachineReconciler", func() {
 			Expect(reconciler.Client.Get(reconciler, elfMachineKey, elfMachine)).To(Succeed())
 			Expect(elfMachine.Status.VMRef).To(Equal(*vm.ID))
 			Expect(elfMachine.Status.TaskRef).To(Equal(*task.ID))
-			resetVMTaskErrorCache()
+			resetMemoryCache()
 		})
 
 		It("should recover from lost task", func() {
@@ -822,7 +822,7 @@ var _ = Describe("ElfMachineReconciler", func() {
 
 		Context("powerOnVM", func() {
 			It("should", func() {
-				resetVMTaskErrorCache()
+				resetMemoryCache()
 				vm := fake.NewTowerVM()
 				vm.Host = &models.NestedHost{ID: service.TowerString(fake.ID())}
 				elfMachine.Status.VMRef = *vm.LocalID
@@ -844,7 +844,7 @@ var _ = Describe("ElfMachineReconciler", func() {
 				Expect(err).NotTo(HaveOccurred())
 				Expect(logBuffer.String()).To(ContainSubstring("and the retry silence period passes, will try to power on the VM again"))
 				expectConditions(elfMachine, []conditionAssertion{{infrav1.VMProvisionedCondition, corev1.ConditionFalse, clusterv1.ConditionSeverityInfo, infrav1.PoweringOnReason}})
-				resetVMTaskErrorCache()
+				resetMemoryCache()
 
 				// GPU
 				unexpectedError := errors.New("unexpected error")
@@ -1136,12 +1136,14 @@ var _ = Describe("ElfMachineReconciler", func() {
 				mockVMService.EXPECT().FindByIDs([]string{*vm2.ID}).Return([]*models.VM{vm2}, nil)
 				mockVMService.EXPECT().AddVMsToPlacementGroup(placementGroup, gomock.Any()).Return(task, nil)
 				mockVMService.EXPECT().WaitTask(gomock.Any(), *task.ID, config.WaitTaskTimeoutForPlacementGroupOperation, config.WaitTaskInterval).Return(task, nil)
+				setPGCache(placementGroup)
 
 				reconciler := &ElfMachineReconciler{ControllerContext: ctrlContext, NewVMService: mockNewVMService}
 				ok, err := reconciler.joinPlacementGroup(machineContext, vm)
 				Expect(ok).To(BeTrue())
 				Expect(err).To(BeZero())
 				Expect(logBuffer.String()).To(ContainSubstring("Updating placement group succeeded"))
+				Expect(getPGFromCache(*placementGroup.Name)).To(BeNil())
 			})
 
 			It("should not migrate VM when VM is running and KCP is in rolling update", func() {
@@ -2670,10 +2672,12 @@ var _ = Describe("ElfMachineReconciler", func() {
 				mockVMService.EXPECT().GetVMPlacementGroup(placementGroupName).Return(placementGroup, nil)
 				mockVMService.EXPECT().DeleteVMPlacementGroupByID(gomock.Any(), *placementGroup.ID).Return(true, nil)
 
+				setPGCache(placementGroup)
 				reconciler = &ElfMachineReconciler{ControllerContext: ctrlContext, NewVMService: mockNewVMService}
 				ok, err = reconciler.deletePlacementGroup(machineContext)
 				Expect(ok).To(BeTrue())
 				Expect(err).NotTo(HaveOccurred())
+				Expect(getPGFromCache(*placementGroup.Name)).To(BeNil())
 
 				md.DeletionTimestamp = nil
 				md.Spec.Replicas = pointer.Int32(0)
@@ -2952,7 +2956,7 @@ var _ = Describe("ElfMachineReconciler", func() {
 
 				logBuffer = new(bytes.Buffer)
 				klog.SetOutput(logBuffer)
-				resetVMTaskErrorCache()
+				resetMemoryCache()
 				mockVMService.EXPECT().GetVMPlacementGroup(gomock.Any()).Return(nil, errors.New(service.VMPlacementGroupNotFound))
 				mockVMService.EXPECT().GetCluster(elfCluster.Spec.Cluster).Return(towerCluster, nil)
 				mockVMService.EXPECT().CreateVMPlacementGroup(gomock.Any(), *towerCluster.ID, towerresources.GetVMPlacementGroupPolicy(machine)).Return(withTaskVMPlacementGroup, nil)
@@ -2966,7 +2970,7 @@ var _ = Describe("ElfMachineReconciler", func() {
 
 				logBuffer = new(bytes.Buffer)
 				klog.SetOutput(logBuffer)
-				resetVMTaskErrorCache()
+				resetMemoryCache()
 				mockVMService.EXPECT().GetVMPlacementGroup(gomock.Any()).Return(nil, errors.New(service.VMPlacementGroupNotFound))
 				mockVMService.EXPECT().GetCluster(elfCluster.Spec.Cluster).Return(towerCluster, nil)
 				mockVMService.EXPECT().CreateVMPlacementGroup(gomock.Any(), *towerCluster.ID, towerresources.GetVMPlacementGroupPolicy(machine)).Return(withTaskVMPlacementGroup, nil)
@@ -2990,6 +2994,31 @@ var _ = Describe("ElfMachineReconciler", func() {
 				Expect(err).NotTo(HaveOccurred())
 				Expect(logBuffer.String()).To(ContainSubstring(fmt.Sprintf("Tower has duplicate placement group, skip creating placement group %s", placementGroupName)))
 			})
+
+			It("should save and get placement group cache", func() {
+				ctrlContext := newCtrlContexts(elfCluster, cluster, elfMachine, machine, secret, md)
+				machineContext := newMachineContext(ctrlContext, elfCluster, cluster, elfMachine, machine, mockVMService)
+				fake.InitOwnerReferences(ctrlContext, elfCluster, cluster, elfMachine, machine)
+				placementGroupName, err := towerresources.GetVMPlacementGroupName(ctx, ctrlContext.Client, machine, cluster)
+				Expect(err).NotTo(HaveOccurred())
+				placementGroup := fake.NewVMPlacementGroup(nil)
+				placementGroup.Name = service.TowerString(placementGroupName)
+
+				mockVMService.EXPECT().GetVMPlacementGroup(gomock.Any()).Return(placementGroup, nil)
+				Expect(getPGFromCache(*placementGroup.Name)).To(BeNil())
+				reconciler := &ElfMachineReconciler{ControllerContext: ctrlContext, NewVMService: mockNewVMService}
+				pg, err := reconciler.getPlacementGroup(machineContext, placementGroupName)
+				Expect(err).To(BeZero())
+				Expect(pg).To(Equal(placementGroup))
+				Expect(getPGFromCache(*placementGroup.Name)).To(Equal(placementGroup))
+
+				// Use cache
+				reconciler = &ElfMachineReconciler{ControllerContext: ctrlContext, NewVMService: mockNewVMService}
+				pg, err = reconciler.getPlacementGroup(machineContext, placementGroupName)
+				Expect(err).To(BeZero())
+				Expect(pg).To(Equal(placementGroup))
+				Expect(getPGFromCache(*placementGroup.Name)).To(Equal(placementGroup))
+			})
 		})
 
 		Context("Reconcile static IP allocation", func() {
@@ -3123,7 +3152,7 @@ var _ = Describe("ElfMachineReconciler", func() {
 			It("should handle failed/succeeded task", func() {
 				elfMachine.Spec.GPUDevices = []infrav1.GPUPassthroughDeviceSpec{{Model: "A16", Count: 1}}
 
-				resetVMTaskErrorCache()
+				resetMemoryCache()
 				task := fake.NewTowerTask()
 				task.Status = models.NewTaskStatus(models.TaskStatusFAILED)
 				elfMachine.Status.TaskRef = *task.ID
52 changes: 43 additions & 9 deletions controllers/tower_cache.go
@@ -79,9 +79,9 @@ func isELFScheduleVMErrorRecorded(ctx *context.MachineContext) (bool, string, er
 func recordElfClusterMemoryInsufficient(ctx *context.MachineContext, isInsufficient bool) {
 	key := getKeyForInsufficientMemoryError(ctx.ElfCluster.Spec.Cluster)
 	if isInsufficient {
-		vmTaskErrorCache.Set(key, newClusterResource(), resourceDuration)
+		inMemoryCache.Set(key, newClusterResource(), resourceDuration)
 	} else {
-		vmTaskErrorCache.Delete(key)
+		inMemoryCache.Delete(key)
 	}
 }
 
@@ -94,9 +94,9 @@ func recordPlacementGroupPolicyNotSatisfied(ctx *context.MachineContext, isPGPol
 
 	key := getKeyForDuplicatePlacementGroupError(placementGroupName)
 	if isPGPolicyNotSatisfied {
-		vmTaskErrorCache.Set(key, newClusterResource(), resourceDuration)
+		inMemoryCache.Set(key, newClusterResource(), resourceDuration)
 	} else {
-		vmTaskErrorCache.Delete(key)
+		inMemoryCache.Delete(key)
 	}
 
 	return nil
@@ -146,13 +146,13 @@ func canRetry(key string) bool {
 }
 
 func getClusterResource(key string) *clusterResource {
-	if val, found := vmTaskErrorCache.Get(key); found {
+	if val, found := inMemoryCache.Get(key); found {
 		if resource, ok := val.(*clusterResource); ok {
 			return resource
 		}
 
 		// Delete unexpected data.
-		vmTaskErrorCache.Delete(key)
+		inMemoryCache.Delete(key)
 	}
 
 	return nil
@@ -166,6 +166,40 @@ func getKeyForDuplicatePlacementGroupError(placementGroup string) string {
 	return fmt.Sprintf("pg:duplicate:%s", placementGroup)
 }
 
+// pgCacheDuration is the lifespan of the placement group cache.
+const pgCacheDuration = 10 * time.Second
+
+func getKeyForPGCache(pgName string) string {
+	return fmt.Sprintf("pg:%s:cache", pgName)
+}
+
+// setPGCache saves the specified placement group to memory,
+// which can reduce access to the Tower service.
+func setPGCache(pg *models.VMPlacementGroup) {
+	inMemoryCache.Set(getKeyForPGCache(*pg.Name), *pg, pgCacheDuration)
+}
+
+// delPGCaches deletes the specified placement group caches.
+func delPGCaches(pgNames []string) {
+	for i := 0; i < len(pgNames); i++ {
+		inMemoryCache.Delete(getKeyForPGCache(pgNames[i]))
+	}
+}
+
+// getPGFromCache gets the specified placement group from memory.
+func getPGFromCache(pgName string) *models.VMPlacementGroup {
+	key := getKeyForPGCache(pgName)
+	if val, found := inMemoryCache.Get(key); found {
+		if pg, ok := val.(models.VMPlacementGroup); ok {
+			return &pg
+		}
+		// Delete unexpected data.
+		inMemoryCache.Delete(key)
+	}
+
+	return nil
+}
+
 /* GPU */
 
 // gpuCacheDuration is the lifespan of gpu cache.
@@ -179,7 +213,7 @@ func getKeyForGPUVMInfo(gpuID string) string {
 // setGPUVMInfosCache saves the specified GPU device infos to the memory,
 // which can reduce access to the Tower service.
 func setGPUVMInfosCache(gpuVMInfos service.GPUVMInfos) {
 	gpuVMInfos.Iterate(func(g *models.GpuVMInfo) {
-		vmTaskErrorCache.Set(getKeyForGPUVMInfo(*g.ID), *g, gpuCacheDuration)
+		inMemoryCache.Set(getKeyForGPUVMInfo(*g.ID), *g, gpuCacheDuration)
 	})
 }
@@ -188,12 +222,12 @@ func getGPUVMInfosFromCache(gpuIDs []string) service.GPUVMInfos {
 	gpuVMInfos := service.NewGPUVMInfos()
 	for i := 0; i < len(gpuIDs); i++ {
 		key := getKeyForGPUVMInfo(gpuIDs[i])
-		if val, found := vmTaskErrorCache.Get(key); found {
+		if val, found := inMemoryCache.Get(key); found {
 			if gpuVMInfo, ok := val.(models.GpuVMInfo); ok {
 				gpuVMInfos.Insert(&gpuVMInfo)
 			}
 			// Delete unexpected data.
-			vmTaskErrorCache.Delete(key)
+			inMemoryCache.Delete(key)
 		}
 	}
 
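Taken together, the new helpers are a thin wrapper over a TTL key-value store. Under the same go-cache assumption as above (only the Set/Get/Delete shapes are visible in this diff), the whole lifecycle can be exercised standalone — note that setPGCache stores a copy of the struct and getPGFromCache returns a pointer to that copy, so callers never mutate the cached entry in place:

package main

import (
	"fmt"
	"time"

	cache "github.com/patrickmn/go-cache"
)

// Stand-in for the Tower model, reduced to the one field the helpers touch.
type VMPlacementGroup struct{ Name *string }

const pgCacheDuration = 10 * time.Second

var inMemoryCache = cache.New(5*time.Minute, 10*time.Minute)

func getKeyForPGCache(pgName string) string { return fmt.Sprintf("pg:%s:cache", pgName) }

func main() {
	name := "cluster-a-pg"
	pg := &VMPlacementGroup{Name: &name}

	// setPGCache: store by value with a short TTL.
	inMemoryCache.Set(getKeyForPGCache(*pg.Name), *pg, pgCacheDuration)

	// getPGFromCache: type-assert and hand back a pointer to the copy.
	if val, found := inMemoryCache.Get(getKeyForPGCache(name)); found {
		if cached, ok := val.(VMPlacementGroup); ok {
			fmt.Println("hit:", *cached.Name)
		}
	}

	// delPGCaches: invalidate after mutating calls (AddVMsToPlacementGroup,
	// placement group deletion) so the next read goes back to Tower.
	inMemoryCache.Delete(getKeyForPGCache(name))
	if _, found := inMemoryCache.Get(getKeyForPGCache(name)); !found {
		fmt.Println("invalidated")
	}
}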
(The remaining 5 changed files are not shown in this view.)
