From c8dc91368b772a04ea1a39cb5f69ee6344d093c2 Mon Sep 17 00:00:00 2001 From: linglingye001 <143174321+linglingye001@users.noreply.github.com> Date: Mon, 26 Aug 2024 13:39:10 +0800 Subject: [PATCH] Load balance support (#65) * load balance support * update load balance tracing * upgrade golangci version * update * skip cache for golangci lint * update * update * update envtest version * update envtest tool version * add cancel context * resolve comments * fix lint error * no need to update isFailoverRequest when succeed to get settings --- .github/workflows/ci.yaml | 4 +- .github/workflows/golangci-lint.yml | 2 +- api/v1/azureappconfigurationprovider_types.go | 4 +- ...fig.io_azureappconfigurationproviders.yaml | 3 + internal/controller/suite_test.go | 12 +- .../configuraiton_setting_loader_test.go | 2 +- .../loader/configuration_client_manager.go | 4 + .../loader/configuration_setting_loader.go | 63 ++++++- internal/loader/request_tracing.go | 154 +++++++++--------- 9 files changed, 164 insertions(+), 84 deletions(-) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 000d840..977332a 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -25,7 +25,7 @@ jobs: - name: Run Test run: | echo 'Install evntest tool' - ENVTEST_VERSION=release-0.16 + ENVTEST_VERSION=release-0.18 GOBIN=$GITHUB_WORKSPACE go install sigs.k8s.io/controller-runtime/tools/setup-envtest@$ENVTEST_VERSION echo 'Run tests' - KUBEBUILDER_ASSETS=$($GITHUB_WORKSPACE/setup-envtest use 1.19 -p path) CGO_ENABLED=0 go test ./... -coverprofile cover.out \ No newline at end of file + KUBEBUILDER_ASSETS=$($GITHUB_WORKSPACE/setup-envtest use 1.30.0 -p path) CGO_ENABLED=0 go test ./... -coverprofile cover.out \ No newline at end of file diff --git a/.github/workflows/golangci-lint.yml b/.github/workflows/golangci-lint.yml index 8c9c9f5..8cab1c9 100644 --- a/.github/workflows/golangci-lint.yml +++ b/.github/workflows/golangci-lint.yml @@ -26,5 +26,5 @@ jobs: - name: golangci-lint uses: golangci/golangci-lint-action@v6 with: - version: v1.59 + version: v1.59.1 args: --timeout 10m \ No newline at end of file diff --git a/api/v1/azureappconfigurationprovider_types.go b/api/v1/azureappconfigurationprovider_types.go index 65675ac..29cf506 100644 --- a/api/v1/azureappconfigurationprovider_types.go +++ b/api/v1/azureappconfigurationprovider_types.go @@ -32,7 +32,9 @@ type AzureAppConfigurationProviderSpec struct { // +kubebuilder:validation:Format=uri Endpoint *string `json:"endpoint,omitempty"` // +kubebuilder:default=true - ReplicaDiscoveryEnabled bool `json:"replicaDiscoveryEnabled,omitempty"` + ReplicaDiscoveryEnabled bool `json:"replicaDiscoveryEnabled,omitempty"` + // +kubebuilder:default=false + LoadBalancingEnabled bool `json:"loadBalancingEnabled,omitempty"` ConnectionStringReference *string `json:"connectionStringReference,omitempty"` Target ConfigurationGenerationParameters `json:"target"` Auth *AzureAppConfigurationProviderAuth `json:"auth,omitempty"` diff --git a/config/crd/bases/azconfig.io_azureappconfigurationproviders.yaml b/config/crd/bases/azconfig.io_azureappconfigurationproviders.yaml index 8aa0866..3663ed1 100644 --- a/config/crd/bases/azconfig.io_azureappconfigurationproviders.yaml +++ b/config/crd/bases/azconfig.io_azureappconfigurationproviders.yaml @@ -166,6 +166,9 @@ spec: type: object type: array type: object + loadBalancingEnabled: + default: false + type: boolean replicaDiscoveryEnabled: default: true type: boolean diff --git a/internal/controller/suite_test.go b/internal/controller/suite_test.go index e25e4ef..9abde0b 100644 --- a/internal/controller/suite_test.go +++ b/internal/controller/suite_test.go @@ -19,8 +19,10 @@ limitations under the License. package controller import ( + "context" "path/filepath" "testing" + "time" "github.com/golang/mock/gomock" . "github.com/onsi/ginkgo/v2" @@ -48,6 +50,8 @@ var k8sClient client.Client var testEnv *envtest.Environment var mockCtrl *gomock.Controller var mockConfigurationSettings *mocks.MockConfigurationSettingsRetriever +var ctx context.Context +var cancel context.CancelFunc func TestAPIs(t *testing.T) { RegisterFailHandler(Fail) @@ -58,6 +62,8 @@ func TestAPIs(t *testing.T) { var _ = BeforeSuite(func() { logf.SetLogger(zap.New(zap.WriteTo(GinkgoWriter), zap.UseDevMode(true))) + ctx, cancel = context.WithCancel(context.TODO()) + By("bootstrapping test environment") testEnv = &envtest.Environment{ CRDDirectoryPaths: []string{filepath.Join("..", "..", "config", "crd", "bases")}, @@ -103,8 +109,12 @@ var _ = BeforeSuite(func() { var _ = AfterSuite(func() { By("tearing down the test environment") + cancel() mockCtrl.Finish() err := testEnv.Stop() - + if err != nil { + time.Sleep(1 * time.Minute) + } + err = testEnv.Stop() Expect(err).NotTo(HaveOccurred()) }) diff --git a/internal/loader/configuraiton_setting_loader_test.go b/internal/loader/configuraiton_setting_loader_test.go index 70aa599..54b1b17 100644 --- a/internal/loader/configuraiton_setting_loader_test.go +++ b/internal/loader/configuraiton_setting_loader_test.go @@ -1159,7 +1159,7 @@ var _ = Describe("AppConfiguationProvider Get All Settings", func() { Expect(err).Should(BeNil()) Expect(failedClient.FailedAttempts).Should(Equal(1)) Expect(failedClient.BackOffEndTime.IsZero()).Should(BeFalse()) - Expect(succeededClient.FailedAttempts).Should(Equal(0)) + Expect(succeededClient.FailedAttempts).Should(Equal(-1)) Expect(succeededClient.BackOffEndTime.IsZero()).Should(BeTrue()) Expect(len(allSettings.ConfigMapSettings)).Should(Equal(6)) Expect(allSettings.ConfigMapSettings["someKey1"]).Should(Equal("value1")) diff --git a/internal/loader/configuration_client_manager.go b/internal/loader/configuration_client_manager.go index 8405efb..08bba42 100644 --- a/internal/loader/configuration_client_manager.go +++ b/internal/loader/configuration_client_manager.go @@ -37,6 +37,7 @@ import ( type ConfigurationClientManager struct { ReplicaDiscoveryEnabled bool + LoadBalancingEnabled bool StaticClientWrappers []*ConfigurationClientWrapper DynamicClientWrappers []*ConfigurationClientWrapper validDomain string @@ -46,6 +47,7 @@ type ConfigurationClientManager struct { id string lastFallbackClientAttempt metav1.Time lastFallbackClientRefresh metav1.Time + lastSuccessfulEndpoint string } type ConfigurationClientWrapper struct { @@ -93,6 +95,8 @@ var ( func NewConfigurationClientManager(ctx context.Context, provider acpv1.AzureAppConfigurationProvider) (ClientManager, error) { manager := &ConfigurationClientManager{ ReplicaDiscoveryEnabled: provider.Spec.ReplicaDiscoveryEnabled, + LoadBalancingEnabled: provider.Spec.LoadBalancingEnabled, + lastSuccessfulEndpoint: "", } var err error diff --git a/internal/loader/configuration_setting_loader.go b/internal/loader/configuration_setting_loader.go index 52fafa8..9380b4c 100644 --- a/internal/loader/configuration_setting_loader.go +++ b/internal/loader/configuration_setting_loader.go @@ -502,27 +502,48 @@ func (csl *ConfigurationSettingLoader) ExecuteFailoverPolicy(ctx context.Context return nil, fmt.Errorf("no client is available to connect to the target App Configuration store") } - if value, ok := os.LookupEnv(RequestTracingEnabled); ok { - if enabled, _ := strconv.ParseBool(value); enabled { - ctx = policy.WithHTTPHeader(ctx, createCorrelationContextHeader(ctx, csl.AzureAppConfigurationProvider, csl.ClientManager)) + manager, ok := csl.ClientManager.(*ConfigurationClientManager) + if csl.AzureAppConfigurationProvider.Spec.LoadBalancingEnabled && ok && manager.lastSuccessfulEndpoint != "" && len(clients) > 1 { + nextClientIndex := 0 + for _, clientWrapper := range clients { + nextClientIndex++ + if clientWrapper.Endpoint == manager.lastSuccessfulEndpoint { + break + } + } + + // If we found the last successful client,we'll rotate the list so that the next client is at the beginning + if nextClientIndex < len(clients) { + rotate(clients, nextClientIndex) } } errors := make([]error, 0) + var tracingEnabled, isFailoverRequest bool + if value, ok := os.LookupEnv(RequestTracingEnabled); ok { + tracingEnabled, _ = strconv.ParseBool(value) + } for _, clientWrapper := range clients { - successful := true + if tracingEnabled { + ctx = policy.WithHTTPHeader(ctx, createCorrelationContextHeader(ctx, csl.AzureAppConfigurationProvider, csl.ClientManager, isFailoverRequest)) + } settingsResponse, err := settingsClient.GetSettings(ctx, clientWrapper.Client) + successful := true if err != nil { successful = false updateClientBackoffStatus(clientWrapper, successful) if IsFailoverable(err) { klog.Warningf("current client of '%s' failed to get settings: %s", clientWrapper.Endpoint, err.Error()) errors = append(errors, err) + isFailoverRequest = true continue } return nil, err } + if manager, ok := csl.ClientManager.(*ConfigurationClientManager); ok { + manager.lastSuccessfulEndpoint = clientWrapper.Endpoint + } updateClientBackoffStatus(clientWrapper, successful) return settingsResponse, nil } @@ -535,8 +556,17 @@ func (csl *ConfigurationSettingLoader) ExecuteFailoverPolicy(ctx context.Context func updateClientBackoffStatus(clientWrapper *ConfigurationClientWrapper, successful bool) { if successful { clientWrapper.BackOffEndTime = metav1.Time{} - clientWrapper.FailedAttempts = 0 + // Reset FailedAttempts when client succeeded + if clientWrapper.FailedAttempts > 0 { + clientWrapper.FailedAttempts = 0 + } + // Use negative value to indicate that successful attempt + clientWrapper.FailedAttempts-- } else { + //Reset FailedAttempts when client failed + if clientWrapper.FailedAttempts < 0 { + clientWrapper.FailedAttempts = 0 + } clientWrapper.FailedAttempts++ clientWrapper.BackOffEndTime = metav1.Time{Time: metav1.Now().Add(calculateBackoffDuration(clientWrapper.FailedAttempts))} } @@ -796,3 +826,26 @@ func MergeSecret(secret map[string]corev1.Secret, newSecret map[string]corev1.Se return nil } + +// rotates the slice to the left by k positions +func rotate(clients []*ConfigurationClientWrapper, k int) { + n := len(clients) + k = k % n + if k == 0 { + return + } + // Reverse the entire slice + reverseClients(clients, 0, n-1) + // Reverse the first part + reverseClients(clients, 0, n-k-1) + // Reverse the second part + reverseClients(clients, n-k, n-1) +} + +func reverseClients(clients []*ConfigurationClientWrapper, start, end int) { + for start < end { + clients[start], clients[end] = clients[end], clients[start] + start++ + end-- + } +} diff --git a/internal/loader/request_tracing.go b/internal/loader/request_tracing.go index 9996c1d..f4196b6 100644 --- a/internal/loader/request_tracing.go +++ b/internal/loader/request_tracing.go @@ -1,73 +1,81 @@ -// Copyright (c) Microsoft Corporation. -// Licensed under the MIT License. - -package loader - -import ( - acpv1 "azappconfig/provider/api/v1" - "context" - "fmt" - "net/http" - "os" - "strings" -) - -type TracingKey string - -type RequestTracing struct { - IsStartUp bool -} - -const ( - RequestTracingKey TracingKey = TracingKey("tracing") - AzureExtensionContext string = "AZURE_EXTENSION_CONTEXT" -) - -func createCorrelationContextHeader(ctx context.Context, provider acpv1.AzureAppConfigurationProvider, clientManager ClientManager) http.Header { - header := http.Header{} - output := make([]string, 0) - - output = append(output, "Host=Kubernetes") - - if tracing := ctx.Value(RequestTracingKey); tracing != nil { - if tracing.(RequestTracing).IsStartUp { - output = append(output, "RequestType=StartUp") - } else { - output = append(output, "RequestType=Watch") - } - } - - if provider.Spec.Secret != nil { - output = append(output, "UsesKeyVault") - - if provider.Spec.Secret.Refresh != nil && - provider.Spec.Secret.Refresh.Enabled { - output = append(output, "RefreshesKeyVault") - } - } - - if provider.Spec.FeatureFlag != nil { - output = append(output, "UsesFeatureFlag") - } - - if provider.Spec.ReplicaDiscoveryEnabled { - if manager, ok := clientManager.(*ConfigurationClientManager); ok { - replicaCount := 0 - if manager.DynamicClientWrappers != nil { - replicaCount = len(manager.DynamicClientWrappers) - } - - output = append(output, fmt.Sprintf("ReplicaCount=%d", replicaCount)) - } - } - - if _, ok := os.LookupEnv(AzureExtensionContext); ok { - output = append(output, "InstalledBy=Extension") - } else { - output = append(output, "InstalledBy=Helm") - } - - header.Add("Correlation-Context", strings.Join(output, ",")) - - return header -} +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +package loader + +import ( + acpv1 "azappconfig/provider/api/v1" + "context" + "fmt" + "net/http" + "os" + "strings" +) + +type TracingKey string + +type RequestTracing struct { + IsStartUp bool +} + +const ( + RequestTracingKey TracingKey = TracingKey("tracing") + AzureExtensionContext string = "AZURE_EXTENSION_CONTEXT" +) + +func createCorrelationContextHeader(ctx context.Context, provider acpv1.AzureAppConfigurationProvider, clientManager ClientManager, isFailoverRequest bool) http.Header { + header := http.Header{} + output := make([]string, 0) + + output = append(output, "Host=Kubernetes") + + if tracing := ctx.Value(RequestTracingKey); tracing != nil { + if tracing.(RequestTracing).IsStartUp { + output = append(output, "RequestType=StartUp") + } else { + output = append(output, "RequestType=Watch") + } + } + + if provider.Spec.Secret != nil { + output = append(output, "UsesKeyVault") + + if provider.Spec.Secret.Refresh != nil && + provider.Spec.Secret.Refresh.Enabled { + output = append(output, "RefreshesKeyVault") + } + } + + if provider.Spec.FeatureFlag != nil { + output = append(output, "UsesFeatureFlag") + } + + if provider.Spec.ReplicaDiscoveryEnabled { + if manager, ok := clientManager.(*ConfigurationClientManager); ok { + replicaCount := 0 + if manager.DynamicClientWrappers != nil { + replicaCount = len(manager.DynamicClientWrappers) + } + + output = append(output, fmt.Sprintf("ReplicaCount=%d", replicaCount)) + } + + if isFailoverRequest { + output = append(output, "FailoverRequest") + } + } + + if provider.Spec.LoadBalancingEnabled { + output = append(output, "Features=LB") + } + + if _, ok := os.LookupEnv(AzureExtensionContext); ok { + output = append(output, "InstalledBy=Extension") + } else { + output = append(output, "InstalledBy=Helm") + } + + header.Add("Correlation-Context", strings.Join(output, ",")) + + return header +}