[local] Add support for doing refresh before IncreaseSize
domenicbozzuto authored and rrangith committed Oct 9, 2024
1 parent 5195fce commit 3c3b4bb
Showing 4 changed files with 110 additions and 5 deletions.
8 changes: 5 additions & 3 deletions cluster-autoscaler/cloudprovider/azure/README.md
@@ -172,10 +172,12 @@ When using K8s versions older than v1.18, we recommend using at least **v.1.17.5
As for CA versions older than 1.18, we recommend using at least **v1.17.2, v1.16.5, v1.15.6**.

In addition, cluster-autoscaler exposes an `AZURE_VMSS_CACHE_TTL` environment variable which controls the rate at which `GetVMScaleSet` calls are made. The default is 60 seconds (see the table below); setting a higher value can protect against API throttling. The caches used are proactively incremented and decremented with scale-up and scale-down operations, so a higher value has no noticeable impact on performance. **Note that the value is in seconds.**
`AZURE_VMSS_CACHE_FORCE_REFRESH` can also be set to `true` to force a refresh of this cache before every upscale (see the `IncreaseSize` change below).

| Config Name           | Default | Environment Variable           | Cloud Config File     |
|-----------------------|---------|--------------------------------|-----------------------|
| VmssCacheTTL          | 60      | AZURE_VMSS_CACHE_TTL           | vmssCacheTTL          |
| VmssCacheForceRefresh | false   | AZURE_VMSS_CACHE_FORCE_REFRESH | vmssCacheForceRefresh |

The `AZURE_VMSS_VMS_CACHE_TTL` environment variable affects the rate of `GetScaleSetVms` (VMSS VM List) calls. The default value is 300 seconds.
A configurable jitter (`AZURE_VMSS_VMS_CACHE_JITTER` environment variable, default 0) is the maximum number of seconds subtracted from that initial VMSS cache TTL after a new VMSS is discovered by the cluster-autoscaler; this can prevent a dogpile effect on clusters with many VMSSs, as sketched below.
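For illustration, a hypothetical sketch (not code from this commit) of how such a jitter could be applied; the function name and parameters are invented for the example, assuming "math/rand" and "time" are imported:

	// Hypothetical sketch: subtract a random number of seconds, up to the
	// configured jitter, from the VMSS VMs cache TTL so that caches created
	// at the same time do not all expire and refresh at the same moment.
	func effectiveVmssVmsCacheTTL(ttlSeconds, jitterSeconds int) time.Duration {
		jitter := 0
		if jitterSeconds > 0 {
			jitter = rand.Intn(jitterSeconds + 1)
		}
		return time.Duration(ttlSeconds-jitter) * time.Second
	}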
5 changes: 5 additions & 0 deletions cluster-autoscaler/cloudprovider/azure/azure_config.go
@@ -77,6 +77,8 @@ type Config struct {
	Deployment           string                 `json:"deployment" yaml:"deployment"`
	DeploymentParameters map[string]interface{} `json:"deploymentParameters" yaml:"deploymentParameters"`

	// VMSS cache option to force refresh of the VMSS capacity before an upscale
	VmssCacheForceRefresh bool `json:"vmssCacheForceRefresh" yaml:"vmssCacheForceRefresh"`
	// Jitter in seconds subtracted from the VMSS cache TTL before the first refresh
	VmssVmsCacheJitter int `json:"vmssVmsCacheJitter" yaml:"vmssVmsCacheJitter"`

@@ -220,6 +222,9 @@ func BuildAzureConfig(configReader io.Reader) (*Config, error) {
	if _, err = assignFromEnvIfExists(&cfg.UserAssignedIdentityID, "ARM_USER_ASSIGNED_IDENTITY_ID"); err != nil {
		return nil, err
	}
	if _, err = assignBoolFromEnvIfExists(&cfg.VmssCacheForceRefresh, "AZURE_VMSS_CACHE_FORCE_REFRESH"); err != nil {
		return nil, err
	}
	if _, err = assignIntFromEnvIfExists(&cfg.VmssCacheTTLInSeconds, "AZURE_VMSS_CACHE_TTL_IN_SECONDS"); err != nil {
		return nil, err
	}
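For context, the body of `assignBoolFromEnvIfExists` is not part of this diff; here is a minimal sketch of the behavior implied by the call sites above, assuming "os", "strconv" and "fmt" are imported:

	// Assumed behavior, inferred from the call sites: assign the parsed value
	// only when the environment variable is set, and report whether it was.
	func assignBoolFromEnvIfExists(assignee *bool, name string) (bool, error) {
		strValue := os.Getenv(name)
		if strValue == "" {
			return false, nil
		}
		value, err := strconv.ParseBool(strValue)
		if err != nil {
			return false, fmt.Errorf("failed to parse %s=%q as bool: %v", name, strValue, err)
		}
		*assignee = value
		return true, nil
	}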
25 changes: 24 additions & 1 deletion cluster-autoscaler/cloudprovider/azure/azure_scale_set.go
@@ -326,6 +326,15 @@ func (scaleSet *ScaleSet) IncreaseSize(delta int) error {
return fmt.Errorf("size increase must be positive")
}

	if scaleSet.manager.config.VmssCacheForceRefresh {
		if err := scaleSet.manager.forceRefresh(); err != nil {
			klog.Errorf("VMSS %s: force refresh of VMSSs failed: %v", scaleSet.Name, err)
			return err
		}
		klog.V(4).Infof("VMSS %s: force-refreshed before checking size", scaleSet.Name)
		scaleSet.invalidateLastSizeRefreshWithLock()
	}

	size, err := scaleSet.getScaleSetSize()
	if err != nil {
		return err
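`invalidateLastSizeRefreshWithLock` is likewise not shown in this diff; presumably it just expires the cached size under the size mutex so that the `getScaleSetSize` call above re-reads the freshly refreshed VMSS data. A sketch under that assumption:

	// Assumed shape (not part of this diff): zero out lastSizeRefresh under the
	// size mutex so the next getScaleSetSize treats the cached size as stale.
	func (scaleSet *ScaleSet) invalidateLastSizeRefreshWithLock() {
		scaleSet.sizeMutex.Lock()
		defer scaleSet.sizeMutex.Unlock()
		scaleSet.lastSizeRefresh = time.Time{}
	}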
@@ -437,9 +446,20 @@ func (scaleSet *ScaleSet) createOrUpdateInstances(vmssInfo *compute.VirtualMachi

	// Update the new capacity to cache.
	vmssSizeMutex.Lock()
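	// Note: this path must never shrink the scale set; if the requested size is
	// below the capacity already cached (possible after a forced refresh), the
	// request is rejected below instead of being applied.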
	rejectIncrease := false
	lastCapacity := *vmssInfo.Sku.Capacity
	if newSize < lastCapacity {
		rejectIncrease = true
	} else {
		vmssInfo.Sku.Capacity = &newSize
	}
	vmssSizeMutex.Unlock()

	if rejectIncrease {
		klog.Warningf("VMSS %s: rejecting size decrease from %d to %d", scaleSet.Name, lastCapacity, newSize)
		return fmt.Errorf("vmss %s: rejected size decrease from %d to %d", scaleSet.Name, lastCapacity, newSize)
	}

	// Compose a new VMSS for updating.
	op := compute.VirtualMachineScaleSet{
		Name: vmssInfo.Name,
@@ -468,6 +488,7 @@ func (scaleSet *ScaleSet) createOrUpdateInstances(vmssInfo *compute.VirtualMachi
	// Proactively set the VMSS size so autoscaler makes better decisions.
	scaleSet.curSize = newSize
	scaleSet.lastSizeRefresh = time.Now()
	klog.V(3).Infof("VMSS %s: requested size increase from %d to %d", scaleSet.Name, lastCapacity, newSize)

	go scaleSet.waitForCreateOrUpdateInstances(future)
	return nil
@@ -536,8 +557,10 @@ func (scaleSet *ScaleSet) DeleteInstances(instances []*azureRef, hasUnregistered
	if !hasUnregisteredNodes {
		scaleSet.sizeMutex.Lock()
		scaleSet.curSize -= int64(len(instanceIDs))
		curSize := scaleSet.curSize
		scaleSet.lastSizeRefresh = time.Now()
		scaleSet.sizeMutex.Unlock()
		klog.V(3).Infof("VMSS %s: no unregistered nodes, current size decremented by %d to %d", scaleSet.Name, len(instanceIDs), curSize)
	}

	// Proactively set the status of the instances to be deleted in cache
77 changes: 76 additions & 1 deletion cluster-autoscaler/cloudprovider/azure/azure_scale_set_test.go
@@ -399,7 +399,7 @@ func TestIncreaseSizeOnVMProvisioningFailed(t *testing.T) {
}

	mockVMSSClient := mockvmssclient.NewMockInterface(ctrl)
	mockVMSSClient.EXPECT().List(gomock.Any(), manager.config.ResourceGroup).Return(expectedScaleSets, nil).AnyTimes()
	mockVMSSClient.EXPECT().CreateOrUpdateAsync(gomock.Any(), manager.config.ResourceGroup, vmssName, gomock.Any()).Return(nil, nil)
	mockVMSSClient.EXPECT().WaitForCreateOrUpdateResult(gomock.Any(), gomock.Any(), manager.config.ResourceGroup).Return(&http.Response{StatusCode: http.StatusOK}, nil).AnyTimes()
	manager.azClient.virtualMachineScaleSetsClient = mockVMSSClient
@@ -491,6 +491,81 @@ func TestIncreaseSizeOnVMSSUpdating(t *testing.T) {
	assert.NoError(t, err)
}

func TestIncreaseSizeWithForceRefresh(t *testing.T) {
	ctrl := gomock.NewController(t)
	defer ctrl.Finish()

	orchestrationModes := [2]compute.OrchestrationMode{compute.Uniform, compute.Flexible}

	expectedVMSSVMs := newTestVMSSVMList(3)
	expectedVMs := newTestVMList(3)

	for _, orchMode := range orchestrationModes {
		provider := newTestProvider(t)
		expectedScaleSets := newTestVMSSList(3, "test-asg", "eastus", orchMode)
		provider.azureManager.explicitlyConfigured["test-asg"] = true
		provider.azureManager.config.VmssCacheForceRefresh = true

		mockVMSSClient := mockvmssclient.NewMockInterface(ctrl)
		mockVMSSClient.EXPECT().List(gomock.Any(), provider.azureManager.config.ResourceGroup).Return(expectedScaleSets, nil).Times(2)
		mockVMSSClient.EXPECT().CreateOrUpdateAsync(gomock.Any(), provider.azureManager.config.ResourceGroup, "test-asg", gomock.Any()).Return(nil, nil)
		mockVMSSClient.EXPECT().WaitForCreateOrUpdateResult(gomock.Any(), gomock.Any(), provider.azureManager.config.ResourceGroup).Return(&http.Response{StatusCode: http.StatusOK}, nil).AnyTimes()
		provider.azureManager.azClient.virtualMachineScaleSetsClient = mockVMSSClient

		if orchMode == compute.Uniform {
			mockVMSSVMClient := mockvmssvmclient.NewMockInterface(ctrl)
			mockVMSSVMClient.EXPECT().List(gomock.Any(), provider.azureManager.config.ResourceGroup, "test-asg", gomock.Any()).Return(expectedVMSSVMs, nil).AnyTimes()
			provider.azureManager.azClient.virtualMachineScaleSetVMsClient = mockVMSSVMClient
		} else {
			provider.azureManager.config.EnableVmssFlexNodes = true
			mockVMClient := mockvmclient.NewMockInterface(ctrl)
			mockVMClient.EXPECT().ListVmssFlexVMsWithoutInstanceView(gomock.Any(), "test-asg").Return(expectedVMs, nil).AnyTimes()
			provider.azureManager.azClient.virtualMachinesClient = mockVMClient
		}
		err := provider.azureManager.forceRefresh()
		assert.NoError(t, err)

		ss := newTestScaleSet(provider.azureManager, "test-asg-doesnt-exist")
		err = ss.IncreaseSize(100)
		expectedErr := fmt.Errorf("could not find vmss: test-asg-doesnt-exist")
		assert.Equal(t, expectedErr, err)

		registered := provider.azureManager.RegisterNodeGroup(
			newTestScaleSet(provider.azureManager, "test-asg"))
		assert.True(t, registered)
		assert.Equal(t, len(provider.NodeGroups()), 1)

		// current target size is 3.
		targetSize, err := provider.NodeGroups()[0].TargetSize()
		assert.NoError(t, err)
		assert.Equal(t, 3, targetSize)

		// Simulate missing instances
		mockVMSSClient.EXPECT().List(gomock.Any(), provider.azureManager.config.ResourceGroup).Return(newTestVMSSList(1, "test-asg", "eastus", orchMode), nil).AnyTimes()
		if orchMode == compute.Uniform {
			mockVMSSVMClient := mockvmssvmclient.NewMockInterface(ctrl)
			mockVMSSVMClient.EXPECT().List(gomock.Any(), provider.azureManager.config.ResourceGroup, "test-asg", gomock.Any()).Return(newTestVMSSVMList(1), nil).AnyTimes()
			provider.azureManager.azClient.virtualMachineScaleSetVMsClient = mockVMSSVMClient
		} else {
			provider.azureManager.config.EnableVmssFlexNodes = true
			mockVMClient := mockvmclient.NewMockInterface(ctrl)
			mockVMClient.EXPECT().ListVmssFlexVMsWithoutInstanceView(gomock.Any(), "test-asg").Return(newTestVMList(1), nil).AnyTimes()
			provider.azureManager.azClient.virtualMachinesClient = mockVMClient
		}

		// Update cache on IncreaseSize and increase back to 3 nodes.
		err = provider.NodeGroups()[0].IncreaseSize(2)
		assert.NoError(t, err)

		// new target size should be 3.
		targetSize, err = provider.NodeGroups()[0].TargetSize()
		assert.NoError(t, err)
		assert.Equal(t, 3, targetSize)
	}
}

func TestBelongs(t *testing.T) {
	ctrl := gomock.NewController(t)
	defer ctrl.Finish()
