Skip to content

Commit

Permalink
feat: use TakeSnapshotIfChanged to avoid taking redundant snapshot (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
programmer04 authored May 28, 2024
1 parent cf2efda commit fd03b86
Show file tree
Hide file tree
Showing 5 changed files with 99 additions and 25 deletions.
6 changes: 3 additions & 3 deletions internal/dataplane/fallback/fallback.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ func NewGenerator(cacheGraphProvider CacheGraphProvider, logger logr.Logger) *Ge
}
}

// GenerateExcludingAffected generates a new cache snapshot that excludes all objects that depend on the broken objects.
func (g *Generator) GenerateExcludingAffected(
// GenerateExcludingBrokenObjects generates a new cache snapshot that excludes all objects that depend on the broken objects.
func (g *Generator) GenerateExcludingBrokenObjects(
cache store.CacheStores,
brokenObjects []ObjectHash,
) (store.CacheStores, error) {
Expand All @@ -37,7 +37,7 @@ func (g *Generator) GenerateExcludingAffected(
return store.CacheStores{}, fmt.Errorf("failed to build cache graph: %w", err)
}

fallbackCache, err := cache.TakeSnapshot()
fallbackCache, _, err := cache.TakeSnapshotIfChanged(store.SnapshotHashEmpty)
if err != nil {
return store.CacheStores{}, fmt.Errorf("failed to take cache snapshot: %w", err)
}
Expand Down
12 changes: 6 additions & 6 deletions internal/dataplane/fallback/fallback_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func (m *mockGraphProvider) CacheToGraph(s store.CacheStores) (*fallback.ConfigG
return m.graph, nil
}

func TestGenerator_GenerateExcludingAffected(t *testing.T) {
func TestGenerator_GenerateExcludingBrokenObjects(t *testing.T) {
// We have to use real-world object types here as we're testing integration with store.CacheStores.
ingressClass := testIngressClass(t, "ingressClass")
service := testService(t, "service")
Expand Down Expand Up @@ -57,7 +57,7 @@ func TestGenerator_GenerateExcludingAffected(t *testing.T) {
g := fallback.NewGenerator(graphProvider, logr.Discard())

t.Run("ingressClass is broken", func(t *testing.T) {
fallbackCache, err := g.GenerateExcludingAffected(inputCacheStores, []fallback.ObjectHash{fallback.GetObjectHash(ingressClass)})
fallbackCache, err := g.GenerateExcludingBrokenObjects(inputCacheStores, []fallback.ObjectHash{fallback.GetObjectHash(ingressClass)})
require.NoError(t, err)
require.Equal(t, inputCacheStores, graphProvider.lastCalledWithStore, "expected the generator to call CacheToGraph with the input cache stores")
require.NotSame(t, inputCacheStores, fallbackCache)
Expand All @@ -68,7 +68,7 @@ func TestGenerator_GenerateExcludingAffected(t *testing.T) {
})

t.Run("service is broken", func(t *testing.T) {
fallbackCache, err := g.GenerateExcludingAffected(inputCacheStores, []fallback.ObjectHash{fallback.GetObjectHash(service)})
fallbackCache, err := g.GenerateExcludingBrokenObjects(inputCacheStores, []fallback.ObjectHash{fallback.GetObjectHash(service)})
require.NoError(t, err)
require.Equal(t, inputCacheStores, graphProvider.lastCalledWithStore, "expected the generator to call CacheToGraph with the input cache stores")
require.NotSame(t, inputCacheStores, fallbackCache)
Expand All @@ -79,7 +79,7 @@ func TestGenerator_GenerateExcludingAffected(t *testing.T) {
})

t.Run("serviceFacade is broken", func(t *testing.T) {
fallbackCache, err := g.GenerateExcludingAffected(inputCacheStores, []fallback.ObjectHash{fallback.GetObjectHash(serviceFacade)})
fallbackCache, err := g.GenerateExcludingBrokenObjects(inputCacheStores, []fallback.ObjectHash{fallback.GetObjectHash(serviceFacade)})
require.NoError(t, err)
require.Equal(t, inputCacheStores, graphProvider.lastCalledWithStore, "expected the generator to call CacheToGraph with the input cache stores")
require.NotSame(t, inputCacheStores, fallbackCache)
Expand All @@ -90,7 +90,7 @@ func TestGenerator_GenerateExcludingAffected(t *testing.T) {
})

t.Run("plugin is broken", func(t *testing.T) {
fallbackCache, err := g.GenerateExcludingAffected(inputCacheStores, []fallback.ObjectHash{fallback.GetObjectHash(plugin)})
fallbackCache, err := g.GenerateExcludingBrokenObjects(inputCacheStores, []fallback.ObjectHash{fallback.GetObjectHash(plugin)})
require.NoError(t, err)
require.Equal(t, inputCacheStores, graphProvider.lastCalledWithStore, "expected the generator to call CacheToGraph with the input cache stores")
require.NotSame(t, inputCacheStores, fallbackCache)
Expand All @@ -101,7 +101,7 @@ func TestGenerator_GenerateExcludingAffected(t *testing.T) {
})

t.Run("multiple objects are broken", func(t *testing.T) {
fallbackCache, err := g.GenerateExcludingAffected(inputCacheStores, []fallback.ObjectHash{fallback.GetObjectHash(ingressClass), fallback.GetObjectHash(service)})
fallbackCache, err := g.GenerateExcludingBrokenObjects(inputCacheStores, []fallback.ObjectHash{fallback.GetObjectHash(ingressClass), fallback.GetObjectHash(service)})
require.NoError(t, err)
require.Equal(t, inputCacheStores, graphProvider.lastCalledWithStore, "expected the generator to call CacheToGraph with the input cache stores")
require.NotSame(t, inputCacheStores, fallbackCache)
Expand Down
20 changes: 15 additions & 5 deletions internal/dataplane/kong_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ type KongConfigBuilder interface {

// FallbackConfigGenerator generates a fallback configuration based on a cache snapshot and a set of broken objects.
type FallbackConfigGenerator interface {
GenerateExcludingAffected(store.CacheStores, []fallback.ObjectHash) (store.CacheStores, error)
GenerateExcludingBrokenObjects(store.CacheStores, []fallback.ObjectHash) (store.CacheStores, error)
}

// KongClient is a threadsafe high level API client for the Kong data-plane(s)
Expand Down Expand Up @@ -151,6 +151,10 @@ type KongClient struct {

// fallbackConfigGenerator is used to generate a fallback configuration in case of sync failures.
fallbackConfigGenerator FallbackConfigGenerator

// lastProcessedSnapshotHash stores the hash of the last processed Kubernetes objects cache snapshot. It's used to determine configuration
// changes. Please note it is always empty when the `FallbackConfiguration` feature gate is turned off.
lastProcessedSnapshotHash store.SnapshotHash
}

// NewKongClient provides a new KongClient object after connecting to the
Expand Down Expand Up @@ -413,13 +417,19 @@ func (c *KongClient) Update(ctx context.Context) error {
// based on the cache contents, we need to ensure it is not modified during the process.
var cacheSnapshot store.CacheStores
if c.kongConfig.FallbackConfiguration {
// TODO: https://github.com/Kong/kubernetes-ingress-controller/issues/6080
// Use TakeSnapshotIfChanged to avoid taking a snapshot if the cache hasn't changed.
var newSnapshotHash store.SnapshotHash
var err error
cacheSnapshot, err = c.cache.TakeSnapshot()
cacheSnapshot, newSnapshotHash, err = c.cache.TakeSnapshotIfChanged(c.lastProcessedSnapshotHash)
if err != nil {
return fmt.Errorf("failed to take snapshot of cache: %w", err)
}
// Empty snapshot hash means that the cache hasn't changed since the last snapshot was taken. That optimization can be used
// in main code path to avoid unnecessary processing. TODO: https://github.com/Kong/kubernetes-ingress-controller/issues/6095
if newSnapshotHash == store.SnapshotHashEmpty {
c.logger.V(util.DebugLevel).Info("No configuration change; pushing config to gateway is not necessary, skipping")
return nil
}
c.lastProcessedSnapshotHash = newSnapshotHash
c.kongConfigBuilder.UpdateCache(cacheSnapshot)
}

Expand Down Expand Up @@ -518,7 +528,7 @@ func (c *KongClient) tryRecoveringWithFallbackConfiguration(
if err != nil {
return fmt.Errorf("failed to extract broken objects from update error: %w", err)
}
fallbackCache, err := c.fallbackConfigGenerator.GenerateExcludingAffected(
fallbackCache, err := c.fallbackConfigGenerator.GenerateExcludingBrokenObjects(
cacheSnapshot,
brokenObjects,
)
Expand Down
78 changes: 70 additions & 8 deletions internal/dataplane/kong_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,20 +291,20 @@ func (m mockConfigurationChangeDetector) HasConfigurationChanged(

// mockKongLastValidConfigFetcher is a mock implementation of FallbackConfigGenerator interface.
type mockFallbackConfigGenerator struct {
generateExcludingAffectedCalledWith lo.Tuple2[store.CacheStores, []fallback.ObjectHash]
generateExcludingAffectedResult store.CacheStores
GenerateExcludingBrokenObjectsCalledWith lo.Tuple2[store.CacheStores, []fallback.ObjectHash]
GenerateExcludingBrokenObjectsResult store.CacheStores
}

func newMockFallbackConfigGenerator() *mockFallbackConfigGenerator {
return &mockFallbackConfigGenerator{}
}

func (m *mockFallbackConfigGenerator) GenerateExcludingAffected(
func (m *mockFallbackConfigGenerator) GenerateExcludingBrokenObjects(
stores store.CacheStores,
hashes []fallback.ObjectHash,
) (store.CacheStores, error) {
m.generateExcludingAffectedCalledWith = lo.T2(stores, hashes)
return m.generateExcludingAffectedResult, nil
m.GenerateExcludingBrokenObjectsCalledWith = lo.T2(stores, hashes)
return m.GenerateExcludingBrokenObjectsResult, nil
}

func TestKongClientUpdate_AllExpectedClientsAreCalledAndErrorIsPropagated(t *testing.T) {
Expand Down Expand Up @@ -1026,7 +1026,7 @@ func TestKongClient_FallbackConfiguration_SuccessfulRecovery(t *testing.T) {

t.Log("Setting the fallback config generator to return a snapshot excluding the broken consumer")
fallbackCacheStoresToBeReturned := cacheStoresFromObjs(t, validConsumer)
fallbackConfigGenerator.generateExcludingAffectedResult = fallbackCacheStoresToBeReturned
fallbackConfigGenerator.GenerateExcludingBrokenObjectsResult = fallbackCacheStoresToBeReturned

t.Log("Calling KongClient.Update")
err = kongClient.Update(ctx)
Expand All @@ -1044,8 +1044,8 @@ func TestKongClient_FallbackConfiguration_SuccessfulRecovery(t *testing.T) {
require.True(t, hasConsumer, "expected consumer to be in the first cache snapshot")

t.Log("Verifying that the fallback config generator was called with the first cache snapshot and the broken object hash")
expectedGenerateExcludingAffectedArgs := lo.T2(firstCacheUpdate, []fallback.ObjectHash{fallback.GetObjectHash(brokenConsumer)})
require.Equal(t, expectedGenerateExcludingAffectedArgs, fallbackConfigGenerator.generateExcludingAffectedCalledWith,
expectedGenerateExcludingBrokenObjectsArgs := lo.T2(firstCacheUpdate, []fallback.ObjectHash{fallback.GetObjectHash(brokenConsumer)})
require.Equal(t, expectedGenerateExcludingBrokenObjectsArgs, fallbackConfigGenerator.GenerateExcludingBrokenObjectsCalledWith,
"expected fallback config generator to be called with the first cache snapshot and the broken object hash")

t.Log("Verifying that the second config builder cache update contains the fallback snapshot")
Expand All @@ -1068,6 +1068,68 @@ func TestKongClient_FallbackConfiguration_SuccessfulRecovery(t *testing.T) {
require.Equal(t, validConsumer.Username, *lastValidConfig.Consumers[0].Username)
}

func TestKongClient_FallbackConfiguration_SkipMakingRedundantSnapshot(t *testing.T) {
ctx := context.Background()
gwClient := mustSampleGatewayClient(t)
konnectClient := mustSampleKonnectClient(t)
clientsProvider := mockGatewayClientsProvider{
gatewayClients: []*adminapi.Client{gwClient},
konnectClient: konnectClient,
}
updateStrategyResolver := newMockUpdateStrategyResolver(t)
configChangeDetector := mockConfigurationChangeDetector{hasConfigurationChanged: true}
configBuilder := newMockKongConfigBuilder()
lastValidConfigFetcher := &mockKongLastValidConfigFetcher{}
fallbackConfigGenerator := newMockFallbackConfigGenerator()

// We'll use KongConsumer as an example of an object, but it could be any supported type
// for the purpose of this test as the fallback config generator is mocked anyway.
someConsumer := func(name string) *kongv1.KongConsumer {
return &kongv1.KongConsumer{
ObjectMeta: metav1.ObjectMeta{
Name: "name",
Namespace: "namespace",
Annotations: map[string]string{
annotations.IngressClassKey: annotations.DefaultIngressClass,
},
},
Username: name,
}
}

originalCache := cacheStoresFromObjs(t, someConsumer("valid"))
kongClient, err := NewKongClient(
zapr.NewLogger(zap.NewNop()),
time.Second,
util.ConfigDumpDiagnostic{},
sendconfig.Config{
FallbackConfiguration: true,
},
mocks.NewEventRecorder(),
dpconf.DBModeOff,
clientsProvider,
updateStrategyResolver,
configChangeDetector,
lastValidConfigFetcher,
configBuilder,
originalCache,
fallbackConfigGenerator,
)
require.NoError(t, err)

t.Log("Calling KongClient.Update")
require.NoError(t, kongClient.Update(ctx))

t.Log("Verifying that the config builder cache was updated once")
require.Len(t, configBuilder.updateCacheCalls, 1)

t.Log("Calling KongClient.Update again")
require.NoError(t, kongClient.Update(ctx))

t.Log("Verifying that the config builder cache was not updated when config was not changed")
require.Len(t, configBuilder.updateCacheCalls, 1)
}

func TestKongClient_FallbackConfiguration_FailedRecovery(t *testing.T) {
ctx := context.Background()
gwClient := mustSampleGatewayClient(t)
Expand Down
8 changes: 5 additions & 3 deletions internal/store/cache_stores_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import (
)

// TakeSnapshot takes a snapshot of the CacheStores.
//
// Deprecated: use TakeSnapshotIfChanged instead.
func (c CacheStores) TakeSnapshot() (CacheStores, error) {
// Create a fresh CacheStores instance to store the snapshot
// in the c.takeSnapshot method. It happens here because it's
Expand Down Expand Up @@ -97,7 +99,7 @@ func (c CacheStores) TakeSnapshotIfChanged(previousSnapshotHash SnapshotHash) (
return string(uid) + resourceVer
})
if capturedErr != nil {
return CacheStores{}, "", capturedErr
return CacheStores{}, SnapshotHashEmpty, capturedErr
}
// Strings have to be used instead of byte slices, because Cmp.Ordered has to be satisfied.
slices.Sort(valuesForHashComputation)
Expand All @@ -110,12 +112,12 @@ func (c CacheStores) TakeSnapshotIfChanged(previousSnapshotHash SnapshotHash) (

// If the hash of the current state is the same as the hash of the previous snapshot, return an empty snapshot.
if newHash == previousSnapshotHash {
return CacheStores{}, "", nil
return CacheStores{}, SnapshotHashEmpty, nil
}

// Take a snapshot of the current state as the hash of the current state differs from the previous one.
if err := takeSnapshot(&snapshot, listOfStores); err != nil {
return CacheStores{}, "", fmt.Errorf("failed to take snapshot: %w", err)
return CacheStores{}, SnapshotHashEmpty, fmt.Errorf("failed to take snapshot: %w", err)
}
return snapshot, newHash, nil
}
Expand Down

0 comments on commit fd03b86

Please sign in to comment.