Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Load balance support #65

Merged
merged 13 commits into from
Aug 26, 2024
4 changes: 2 additions & 2 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ jobs:
- name: Run Test
run: |
echo 'Install evntest tool'
ENVTEST_VERSION=release-0.16
ENVTEST_VERSION=release-0.18
GOBIN=$GITHUB_WORKSPACE go install sigs.k8s.io/controller-runtime/tools/setup-envtest@$ENVTEST_VERSION
echo 'Run tests'
KUBEBUILDER_ASSETS=$($GITHUB_WORKSPACE/setup-envtest use 1.19 -p path) CGO_ENABLED=0 go test ./... -coverprofile cover.out
KUBEBUILDER_ASSETS=$($GITHUB_WORKSPACE/setup-envtest use 1.30.0 -p path) CGO_ENABLED=0 go test ./... -coverprofile cover.out
4 changes: 2 additions & 2 deletions .github/workflows/golangci-lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ jobs:
- uses: actions/checkout@v4
- uses: actions/setup-go@v5
with:
go-version: stable
go-version: 1.22.6
- name: golangci-lint
uses: golangci/golangci-lint-action@v6
with:
version: v1.59
version: v1.59.1
args: --timeout 10m
4 changes: 3 additions & 1 deletion api/v1/azureappconfigurationprovider_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ type AzureAppConfigurationProviderSpec struct {
// +kubebuilder:validation:Format=uri
Endpoint *string `json:"endpoint,omitempty"`
// +kubebuilder:default=true
ReplicaDiscoveryEnabled bool `json:"replicaDiscoveryEnabled,omitempty"`
ReplicaDiscoveryEnabled bool `json:"replicaDiscoveryEnabled,omitempty"`
// +kubebuilder:default=false
LoadBalancingEnabled bool `json:"loadBalancingEnabled,omitempty"`
ConnectionStringReference *string `json:"connectionStringReference,omitempty"`
Target ConfigurationGenerationParameters `json:"target"`
Auth *AzureAppConfigurationProviderAuth `json:"auth,omitempty"`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,9 @@ spec:
type: object
type: array
type: object
loadBalancingEnabled:
default: false
type: boolean
replicaDiscoveryEnabled:
default: true
type: boolean
Expand Down
12 changes: 11 additions & 1 deletion internal/controller/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ limitations under the License.
package controller

import (
"context"
"path/filepath"
"testing"
"time"

"github.com/golang/mock/gomock"
. "github.com/onsi/ginkgo/v2"
Expand Down Expand Up @@ -48,6 +50,8 @@ var k8sClient client.Client
var testEnv *envtest.Environment
var mockCtrl *gomock.Controller
var mockConfigurationSettings *mocks.MockConfigurationSettingsRetriever
var ctx context.Context
var cancel context.CancelFunc

func TestAPIs(t *testing.T) {
RegisterFailHandler(Fail)
Expand All @@ -58,6 +62,8 @@ func TestAPIs(t *testing.T) {
var _ = BeforeSuite(func() {
logf.SetLogger(zap.New(zap.WriteTo(GinkgoWriter), zap.UseDevMode(true)))

ctx, cancel = context.WithCancel(context.TODO())

By("bootstrapping test environment")
testEnv = &envtest.Environment{
CRDDirectoryPaths: []string{filepath.Join("..", "..", "config", "crd", "bases")},
Expand Down Expand Up @@ -103,8 +109,12 @@ var _ = BeforeSuite(func() {

var _ = AfterSuite(func() {
By("tearing down the test environment")
cancel()
mockCtrl.Finish()
err := testEnv.Stop()

if err != nil {
time.Sleep(1 * time.Minute)
}
err = testEnv.Stop()
Expect(err).NotTo(HaveOccurred())
})
2 changes: 1 addition & 1 deletion internal/loader/configuraiton_setting_loader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1159,7 +1159,7 @@ var _ = Describe("AppConfiguationProvider Get All Settings", func() {
Expect(err).Should(BeNil())
Expect(failedClient.FailedAttempts).Should(Equal(1))
Expect(failedClient.BackOffEndTime.IsZero()).Should(BeFalse())
Expect(succeededClient.FailedAttempts).Should(Equal(0))
Expect(succeededClient.FailedAttempts).Should(Equal(-1))
Expect(succeededClient.BackOffEndTime.IsZero()).Should(BeTrue())
Expect(len(allSettings.ConfigMapSettings)).Should(Equal(6))
Expect(allSettings.ConfigMapSettings["someKey1"]).Should(Equal("value1"))
Expand Down
4 changes: 4 additions & 0 deletions internal/loader/configuration_client_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (

type ConfigurationClientManager struct {
ReplicaDiscoveryEnabled bool
LoadBalancingEnabled bool
StaticClientWrappers []*ConfigurationClientWrapper
DynamicClientWrappers []*ConfigurationClientWrapper
validDomain string
Expand All @@ -46,6 +47,7 @@ type ConfigurationClientManager struct {
id string
lastFallbackClientAttempt metav1.Time
lastFallbackClientRefresh metav1.Time
lastSuccessfulEndpoint string
}

type ConfigurationClientWrapper struct {
Expand Down Expand Up @@ -93,6 +95,8 @@ var (
func NewConfigurationClientManager(ctx context.Context, provider acpv1.AzureAppConfigurationProvider) (ClientManager, error) {
manager := &ConfigurationClientManager{
ReplicaDiscoveryEnabled: provider.Spec.ReplicaDiscoveryEnabled,
LoadBalancingEnabled: provider.Spec.LoadBalancingEnabled,
lastSuccessfulEndpoint: "",
}

var err error
Expand Down
63 changes: 58 additions & 5 deletions internal/loader/configuration_setting_loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -502,27 +502,48 @@ func (csl *ConfigurationSettingLoader) ExecuteFailoverPolicy(ctx context.Context
return nil, fmt.Errorf("no client is available to connect to the target App Configuration store")
}

if value, ok := os.LookupEnv(RequestTracingEnabled); ok {
if enabled, _ := strconv.ParseBool(value); enabled {
ctx = policy.WithHTTPHeader(ctx, createCorrelationContextHeader(ctx, csl.AzureAppConfigurationProvider, csl.ClientManager))
manager, ok := csl.ClientManager.(*ConfigurationClientManager)
if csl.AzureAppConfigurationProvider.Spec.LoadBalancingEnabled && ok && manager.lastSuccessfulEndpoint != "" && len(clients) > 1 {
nextClientIndex := 0
for _, clientWrapper := range clients {
nextClientIndex++
if clientWrapper.Endpoint == manager.lastSuccessfulEndpoint {
break
}
}

// If we found the last successful client,we'll rotate the list so that the next client is at the beginning
if nextClientIndex < len(clients) {
rotate(clients, nextClientIndex)
}
}

errors := make([]error, 0)
var tracingEnabled, isFailoverRequest bool
if value, ok := os.LookupEnv(RequestTracingEnabled); ok {
tracingEnabled, _ = strconv.ParseBool(value)
}
for _, clientWrapper := range clients {
successful := true
if tracingEnabled {
ctx = policy.WithHTTPHeader(ctx, createCorrelationContextHeader(ctx, csl.AzureAppConfigurationProvider, csl.ClientManager, isFailoverRequest))
}
settingsResponse, err := settingsClient.GetSettings(ctx, clientWrapper.Client)
successful := true
if err != nil {
successful = false
updateClientBackoffStatus(clientWrapper, successful)
if IsFailoverable(err) {
klog.Warningf("current client of '%s' failed to get settings: %s", clientWrapper.Endpoint, err.Error())
errors = append(errors, err)
isFailoverRequest = true
continue
}
return nil, err
}

if manager, ok := csl.ClientManager.(*ConfigurationClientManager); ok {
manager.lastSuccessfulEndpoint = clientWrapper.Endpoint
}
updateClientBackoffStatus(clientWrapper, successful)
return settingsResponse, nil
}
Expand All @@ -535,8 +556,17 @@ func (csl *ConfigurationSettingLoader) ExecuteFailoverPolicy(ctx context.Context
func updateClientBackoffStatus(clientWrapper *ConfigurationClientWrapper, successful bool) {
if successful {
clientWrapper.BackOffEndTime = metav1.Time{}
clientWrapper.FailedAttempts = 0
// Reset FailedAttempts when client succeeded
if clientWrapper.FailedAttempts > 0 {
clientWrapper.FailedAttempts = 0
}
// Use negative value to indicate that successful attempt
clientWrapper.FailedAttempts--
RichardChen820 marked this conversation as resolved.
Show resolved Hide resolved
} else {
//Reset FailedAttempts when client failed
if clientWrapper.FailedAttempts < 0 {
clientWrapper.FailedAttempts = 0
}
clientWrapper.FailedAttempts++
clientWrapper.BackOffEndTime = metav1.Time{Time: metav1.Now().Add(calculateBackoffDuration(clientWrapper.FailedAttempts))}
}
Expand Down Expand Up @@ -796,3 +826,26 @@ func MergeSecret(secret map[string]corev1.Secret, newSecret map[string]corev1.Se

return nil
}

// rotates the slice to the left by k positions
func rotate(clients []*ConfigurationClientWrapper, k int) {
n := len(clients)
k = k % n
if k == 0 {
return
}
// Reverse the entire slice
reverseClients(clients, 0, n-1)
// Reverse the first part
reverseClients(clients, 0, n-k-1)
// Reverse the second part
reverseClients(clients, n-k, n-1)
}

func reverseClients(clients []*ConfigurationClientWrapper, start, end int) {
for start < end {
clients[start], clients[end] = clients[end], clients[start]
start++
end--
}
}
154 changes: 81 additions & 73 deletions internal/loader/request_tracing.go
Original file line number Diff line number Diff line change
@@ -1,73 +1,81 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

package loader

import (
acpv1 "azappconfig/provider/api/v1"
"context"
"fmt"
"net/http"
"os"
"strings"
)

type TracingKey string

type RequestTracing struct {
IsStartUp bool
}

const (
RequestTracingKey TracingKey = TracingKey("tracing")
AzureExtensionContext string = "AZURE_EXTENSION_CONTEXT"
)

func createCorrelationContextHeader(ctx context.Context, provider acpv1.AzureAppConfigurationProvider, clientManager ClientManager) http.Header {
header := http.Header{}
output := make([]string, 0)

output = append(output, "Host=Kubernetes")

if tracing := ctx.Value(RequestTracingKey); tracing != nil {
if tracing.(RequestTracing).IsStartUp {
output = append(output, "RequestType=StartUp")
} else {
output = append(output, "RequestType=Watch")
}
}

if provider.Spec.Secret != nil {
output = append(output, "UsesKeyVault")

if provider.Spec.Secret.Refresh != nil &&
provider.Spec.Secret.Refresh.Enabled {
output = append(output, "RefreshesKeyVault")
}
}

if provider.Spec.FeatureFlag != nil {
output = append(output, "UsesFeatureFlag")
}

if provider.Spec.ReplicaDiscoveryEnabled {
if manager, ok := clientManager.(*ConfigurationClientManager); ok {
replicaCount := 0
if manager.DynamicClientWrappers != nil {
replicaCount = len(manager.DynamicClientWrappers)
}

output = append(output, fmt.Sprintf("ReplicaCount=%d", replicaCount))
}
}

if _, ok := os.LookupEnv(AzureExtensionContext); ok {
output = append(output, "InstalledBy=Extension")
} else {
output = append(output, "InstalledBy=Helm")
}

header.Add("Correlation-Context", strings.Join(output, ","))

return header
}
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

package loader

import (
acpv1 "azappconfig/provider/api/v1"
"context"
"fmt"
"net/http"
"os"
"strings"
)

type TracingKey string

type RequestTracing struct {
IsStartUp bool
}

const (
RequestTracingKey TracingKey = TracingKey("tracing")
AzureExtensionContext string = "AZURE_EXTENSION_CONTEXT"
)

func createCorrelationContextHeader(ctx context.Context, provider acpv1.AzureAppConfigurationProvider, clientManager ClientManager, isFailoverRequest bool) http.Header {
header := http.Header{}
output := make([]string, 0)

output = append(output, "Host=Kubernetes")

if tracing := ctx.Value(RequestTracingKey); tracing != nil {
if tracing.(RequestTracing).IsStartUp {
output = append(output, "RequestType=StartUp")
} else {
output = append(output, "RequestType=Watch")
}
}

if provider.Spec.Secret != nil {
output = append(output, "UsesKeyVault")

if provider.Spec.Secret.Refresh != nil &&
provider.Spec.Secret.Refresh.Enabled {
output = append(output, "RefreshesKeyVault")
}
}

if provider.Spec.FeatureFlag != nil {
output = append(output, "UsesFeatureFlag")
}

if provider.Spec.ReplicaDiscoveryEnabled {
if manager, ok := clientManager.(*ConfigurationClientManager); ok {
replicaCount := 0
if manager.DynamicClientWrappers != nil {
replicaCount = len(manager.DynamicClientWrappers)
}

output = append(output, fmt.Sprintf("ReplicaCount=%d", replicaCount))
}

if isFailoverRequest {
output = append(output, "FailoverRequest")
}
}

if provider.Spec.LoadBalancingEnabled {
output = append(output, "Features=LB")
}

if _, ok := os.LookupEnv(AzureExtensionContext); ok {
output = append(output, "InstalledBy=Extension")
} else {
output = append(output, "InstalledBy=Helm")
}

header.Add("Correlation-Context", strings.Join(output, ","))

return header
}
Loading