From e1b22c668cc6535a6f339f3bf2dba00fccaaaa80 Mon Sep 17 00:00:00 2001 From: Valerian Roche Date: Mon, 5 Feb 2024 22:21:13 -0500 Subject: [PATCH] [Linear cache] Support use of stable versions in sotw When using sotw watches, the current behavior of the linear cache is to use the current version of the cache (monotonically increased on resource updates) as the version returned. In the past, the request-provided version was compared to the version of the last resource update to compute the returned resources, allowing watch resumption when a client reconnected. This behavior was actually not working, as the subscribed resources could change while no cache updates occurred, or the newly requested resources were at an older cache version. PR #10 therefore no longer uses this behavior to track resources to be returned, instead relying on the subscription state. A side effect is that watch resumption always returns all known resources. Delta watches have a mechanism to avoid this issue, by tracking per-resource versions and sending them as part of the initial request of a new subscription. Sotw does not allow per-resource versions in requests and responses, but by encoding the current subscription state through a hash of the returned versions map, this PR now allows resumption if the hash matches the response we would otherwise return. It still has two main limitations: it is less efficient (as we compute an entire response only to discard it when the versions match) and we cannot track which resource (if any) changed, and will therefore return them all if anything has changed. 
Signed-off-by: Valerian Roche --- pkg/cache/v3/cache.go | 6 -- pkg/cache/v3/linear.go | 159 +++++++++++++++++++++++++---------- pkg/cache/v3/linear_test.go | 139 ++++++++++++++++++++++++++++++ pkg/cache/v3/snapshot.go | 8 +- pkg/cache/v3/subscription.go | 37 ++++++++ 5 files changed, 299 insertions(+), 50 deletions(-) create mode 100644 pkg/cache/v3/subscription.go diff --git a/pkg/cache/v3/cache.go b/pkg/cache/v3/cache.go index a130e3e5b6..a453f97adf 100644 --- a/pkg/cache/v3/cache.go +++ b/pkg/cache/v3/cache.go @@ -56,12 +56,6 @@ type Subscription interface { // This considers subtleties related to the current migration of wildcard definitions within the protocol. // More details on the behavior of wildcard are present at https://www.envoyproxy.io/docs/envoy/latest/api-docs/xds_protocol#how-the-client-specifies-what-resources-to-return IsWildcard() bool - - // WatchesResources returns whether at least one of the resources provided is currently being watched by the subscription. - // It is currently only applicable to delta-xds. - // If the request is wildcard, it will always return true, - // otherwise it will compare the provided resources to the list of resources currently subscribed - WatchesResources(resourceNames map[string]struct{}) bool } // ConfigWatcher requests watches for configuration resources by a node, last diff --git a/pkg/cache/v3/linear.go b/pkg/cache/v3/linear.go index 909f64f1a0..1b3a374a56 100644 --- a/pkg/cache/v3/linear.go +++ b/pkg/cache/v3/linear.go @@ -16,8 +16,11 @@ package cache import ( "context" + "encoding/hex" "errors" "fmt" + "hash/fnv" + "sort" "strconv" "strings" "sync" @@ -37,6 +40,45 @@ type cachedResource struct { // resourceVersion is the version of the resource itself (built through stable marshaling). // It is only set if computeResourceVersion is set to true on the cache. resourceVersion string + + // marshaledResource contains the marshaled version of the resource if it has been + // computed already. 
If it has not it will not be set + marshaledResource []byte +} + +func newCachedResource(res types.Resource, cacheVersion string, computeStableVersion bool) (cachedResource, error) { + r := cachedResource{ + Resource: res, + cacheVersion: cacheVersion, + } + if computeStableVersion { + if err := r.computeStableVersion(); err != nil { + return r, err + } + } + return r, nil +} + +func (c cachedResource) getVersion(useStableVersion bool) string { + if useStableVersion { + return c.resourceVersion + } + return c.cacheVersion +} + +func (c *cachedResource) computeStableVersion() error { + // hash our version in here and build the version map + marshaledResource, err := MarshalResource(c.Resource) + if err != nil { + return err + } + v := HashResource(marshaledResource) + if v == "" { + return errors.New("failed to build resource version") + } + c.marshaledResource = marshaledResource + c.resourceVersion = v + return nil } type watches struct { @@ -89,6 +131,10 @@ type LinearCache struct { // computeResourceVersion indicates whether the cache is currently computing and storing stable resource versions. computeResourceVersion bool + // useStableVersionsInSotw switches to a new version model for sotw watches. + // The version is then encoding the known resources of the subscription + useStableVersionsInSotw bool + log log.Logger mu sync.RWMutex @@ -133,6 +179,17 @@ func WithComputeStableVersions() LinearCacheOption { } } +// WithSotwStableVersions changes the versions returned in sotw to encode the list of resources known +// in the subscription. +// The use of stable versions for sotw also deduplicates updates to clients if the cache updates are +// not changing the content of the resource. +func WithSotwStableVersions() LinearCacheOption { + return func(cache *LinearCache) { + cache.computeResourceVersion = true + cache.useStableVersionsInSotw = true + } +} + // NewLinearCache creates a new cache. See the comments on the struct definition. 
func NewLinearCache(typeURL string, opts ...LinearCacheOption) *LinearCache { out := &LinearCache{ @@ -150,11 +207,10 @@ func NewLinearCache(typeURL string, opts ...LinearCacheOption) *LinearCache { for name, resource := range out.resources { resource.cacheVersion = out.getVersion() if out.computeResourceVersion { - version, err := computeResourceStableVersion(resource.Resource) + err := resource.computeStableVersion() if err != nil { out.log.Errorf("failed to build stable versions for resource %s: %s", name, err) } - resource.resourceVersion = version } out.resources[name] = resource } @@ -171,18 +227,13 @@ func (cache *LinearCache) computeResourceChange(sub Subscription, ignoreReturned knownVersions = make(map[string]string) } - getVersion := func(c cachedResource) string { return c.cacheVersion } - if useStableVersion { - getVersion = func(c cachedResource) string { return c.resourceVersion } - } - if sub.IsWildcard() { for resourceName, resource := range cache.resources { knownVersion, ok := knownVersions[resourceName] if !ok { // This resource is not yet known by the client (new resource added in the cache or newly subscribed). changedResources = append(changedResources, resourceName) - } else if knownVersion != getVersion(resource) { + } else if knownVersion != resource.getVersion(useStableVersion) { // The client knows an outdated version. changedResources = append(changedResources, resourceName) } @@ -212,7 +263,7 @@ func (cache *LinearCache) computeResourceChange(sub Subscription, ignoreReturned if !known { // This resource is not yet known by the client (new resource added in the cache or newly subscribed). changedResources = append(changedResources, resourceName) - } else if knownVersion != getVersion(res) { + } else if knownVersion != res.getVersion(useStableVersion) { // The client knows an outdated version. 
changedResources = append(changedResources, resourceName) } @@ -230,8 +281,26 @@ func (cache *LinearCache) computeResourceChange(sub Subscription, ignoreReturned return changedResources, removedResources } +func computeSotwStableVersion(versionMap map[string]string) string { + keys := make([]string, 0, len(versionMap)) + for key := range versionMap { + keys = append(keys, key) + } + sort.Strings(keys) + + // Reuse the hash used on resources. + hasher := fnv.New64a() + for _, key := range keys { + hasher.Write([]byte(key)) + hasher.Write([]byte("/")) + hasher.Write([]byte(versionMap[key])) + hasher.Write([]byte("^")) + } + return hex.EncodeToString(hasher.Sum(nil)) +} + func (cache *LinearCache) computeSotwResponse(watch ResponseWatch, ignoreReturnedResources bool) *RawResponse { - changedResources, removedResources := cache.computeResourceChange(watch.subscription, ignoreReturnedResources, false) + changedResources, removedResources := cache.computeResourceChange(watch.subscription, ignoreReturnedResources, cache.useStableVersionsInSotw) if len(changedResources) == 0 && len(removedResources) == 0 && !ignoreReturnedResources { // Nothing changed. return nil @@ -257,14 +326,14 @@ func (cache *LinearCache) computeSotwResponse(watch ResponseWatch, ignoreReturne for _, resourceName := range changedResources { cachedResource := cache.resources[resourceName] resources = append(resources, types.ResourceWithTTL{Resource: cachedResource.Resource}) - returnedVersions[resourceName] = cachedResource.cacheVersion + returnedVersions[resourceName] = cachedResource.getVersion(cache.useStableVersionsInSotw) } case watch.subscription.IsWildcard(): // Include all resources for the type. 
resources = make([]types.ResourceWithTTL, 0, len(cache.resources)) for resourceName, cachedResource := range cache.resources { resources = append(resources, types.ResourceWithTTL{Resource: cachedResource.Resource}) - returnedVersions[resourceName] = cachedResource.cacheVersion + returnedVersions[resourceName] = cachedResource.getVersion(cache.useStableVersionsInSotw) } default: // Include all resources matching the subscription, with no concern on whether @@ -279,7 +348,7 @@ func (cache *LinearCache) computeSotwResponse(watch ResponseWatch, ignoreReturne continue } resources = append(resources, types.ResourceWithTTL{Resource: cachedResource.Resource}) - returnedVersions[resourceName] = cachedResource.cacheVersion + returnedVersions[resourceName] = cachedResource.getVersion(cache.useStableVersionsInSotw) } } @@ -297,11 +366,16 @@ func (cache *LinearCache) computeSotwResponse(watch ResponseWatch, ignoreReturne return nil } + responseVersion := cacheVersion + if cache.useStableVersionsInSotw { + responseVersion = cache.versionPrefix + computeSotwStableVersion(returnedVersions) + } + return &RawResponse{ Request: watch.Request, Resources: resources, ReturnedResources: returnedVersions, - Version: cacheVersion, + Version: responseVersion, Ctx: context.Background(), } } @@ -322,9 +396,9 @@ func (cache *LinearCache) computeDeltaResponse(watch DeltaResponseWatch) *RawDel cacheVersion := cache.getVersion() resources := make([]types.Resource, 0, len(changedResources)) for _, resourceName := range changedResources { - resource := cache.resources[resourceName] - resources = append(resources, resource.Resource) - returnedVersions[resourceName] = resource.resourceVersion + cachedResource := cache.resources[resourceName] + resources = append(resources, cachedResource.Resource) + returnedVersions[resourceName] = cachedResource.getVersion(true) } // Cleanup resources no longer existing in the cache or no longer subscribed. 
for _, resourceName := range removedResources { @@ -397,32 +471,12 @@ func (cache *LinearCache) notifyAll(modified []string) { } } -func computeResourceStableVersion(res types.Resource) (string, error) { - // hash our version in here and build the version map - marshaledResource, err := MarshalResource(res) - if err != nil { - return "", err - } - v := HashResource(marshaledResource) - if v == "" { - return "", errors.New("failed to build resource version") - } - return v, nil -} - func (cache *LinearCache) addResourceToCache(name string, res types.Resource) error { - update := cachedResource{ - Resource: res, - cacheVersion: cache.getVersion(), - } - if cache.computeResourceVersion { - version, err := computeResourceStableVersion(res) - if err != nil { - return err - } - update.resourceVersion = version + r, err := newCachedResource(res, cache.getVersion(), cache.computeResourceVersion) + if err != nil { + return err } - cache.resources[name] = update + cache.resources[name] = r return nil } @@ -561,7 +615,27 @@ func (cache *LinearCache) CreateWatch(request *Request, sub Subscription, value defer cache.mu.Unlock() response := cache.computeSotwResponse(watch, ignoreCurrentSubscriptionResources) + shouldReply := false if response != nil { + // If the request + // - is the first + // - provides a non-empty version, matching the version prefix + // and the cache uses stable versions, if the generated versions are the same as the previous one, we do not return the response. + // This avoids resending all data if the new subscription is just a resumption of the previous one. + if cache.useStableVersionsInSotw && request.GetResponseNonce() == "" && !ignoreCurrentSubscriptionResources { + shouldReply = request.GetVersionInfo() != response.Version + + // We confirmed the content of the known resources, store them in the watch we create. 
+ subscription := newWatchSubscription(sub) + subscription.returnedResources = response.ReturnedResources + watch.subscription = subscription + sub = subscription + } else { + shouldReply = true + } + } + + if shouldReply { cache.log.Debugf("[linear cache] replying to the watch with resources %v (subscription values %v, known %v)", response.GetReturnedResources(), sub.SubscribedResources(), sub.ReturnedResources()) watch.Response <- response return func() {}, nil @@ -626,11 +700,10 @@ func (cache *LinearCache) CreateDeltaWatch(request *DeltaRequest, sub Subscripti cache.log.Infof("[linear cache] activating stable resource version computation for %s", cache.typeURL) for name, cachedResource := range cache.resources { - version, err := computeResourceStableVersion(cachedResource.Resource) + err := cachedResource.computeStableVersion() if err != nil { return func() {}, fmt.Errorf("failed to build stable versions for resource %s: %w", name, err) } - cachedResource.resourceVersion = version cache.resources[name] = cachedResource } } diff --git a/pkg/cache/v3/linear_test.go b/pkg/cache/v3/linear_test.go index 0218ace59c..53fa20bae1 100644 --- a/pkg/cache/v3/linear_test.go +++ b/pkg/cache/v3/linear_test.go @@ -23,6 +23,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "google.golang.org/protobuf/types/known/durationpb" "google.golang.org/protobuf/types/known/wrapperspb" cluster "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3" @@ -1469,3 +1470,141 @@ func TestLinearSotwNonWildcard(t *testing.T) { checkPendingWatch(4) }) } + +func TestLinearSotwVersion(t *testing.T) { + cache := NewLinearCache(resource.EndpointType, WithLogger(log.NewTestLogger(t)), WithInitialResources( + map[string]types.Resource{ + "a": &endpoint.ClusterLoadAssignment{ClusterName: "a"}, + "b": &endpoint.ClusterLoadAssignment{ClusterName: "b"}, + "c": &endpoint.ClusterLoadAssignment{ClusterName: "c"}, + }, + ), WithSotwStableVersions()) + + 
buildRequest := func(res []string, version string) *discovery.DiscoveryRequest { + return &discovery.DiscoveryRequest{ + ResourceNames: res, + TypeUrl: resource.EndpointType, + VersionInfo: version, + } + } + + var lastVersion string + t.Run("watch without any version is replied to", func(t *testing.T) { + cache.log = log.NewTestLogger(t) + req := buildRequest([]string{"a", "b", "d"}, "") + w := make(chan Response, 1) + _, err := cache.CreateWatch(req, subFromRequest(req), w) + require.NoError(t, err) + resp := verifyResponseResources(t, w, resource.EndpointType, "1d0dadc055487bf8", "a", "b") + lastVersion, err = resp.GetVersion() + require.NoError(t, err) + assert.NotEmpty(t, lastVersion) + }) + + t.Run("watch opened with the same last version", func(t *testing.T) { + cache.log = log.NewTestLogger(t) + req := buildRequest([]string{"a", "b", "d"}, lastVersion) + w := make(chan Response, 1) + _, err := cache.CreateWatch(req, subFromRequest(req), w) + require.NoError(t, err) + mustBlock(t, w) + }) + + t.Run("watch opened with the same last version and different prefix", func(t *testing.T) { + cache.log = log.NewTestLogger(t) + req := buildRequest([]string{"a", "b", "d"}, "test-prefix-"+lastVersion) + w := make(chan Response, 1) + _, err := cache.CreateWatch(req, subFromRequest(req), w) + require.NoError(t, err) + verifyResponseResources(t, w, resource.EndpointType, lastVersion, "a", "b") + }) + + t.Run("watch opened with the same last version missing prefix", func(t *testing.T) { + cache := NewLinearCache(resource.EndpointType, WithLogger(log.NewTestLogger(t)), WithInitialResources( + map[string]types.Resource{ + "a": &endpoint.ClusterLoadAssignment{ClusterName: "a"}, + "b": &endpoint.ClusterLoadAssignment{ClusterName: "b"}, + "c": &endpoint.ClusterLoadAssignment{ClusterName: "c"}, + }, + ), WithSotwStableVersions(), WithVersionPrefix("test-prefix-")) + + req := buildRequest([]string{"a", "b", "d"}, lastVersion) + w := make(chan Response, 1) + _, err := 
cache.CreateWatch(req, subFromRequest(req), w) + require.NoError(t, err) + verifyResponseResources(t, w, resource.EndpointType, "test-prefix-1d0dadc055487bf8", "a", "b") + }) + + t.Run("watch opened with the same last version including prefix", func(t *testing.T) { + cache := NewLinearCache(resource.EndpointType, WithLogger(log.NewTestLogger(t)), WithInitialResources( + map[string]types.Resource{ + "a": &endpoint.ClusterLoadAssignment{ClusterName: "a"}, + "b": &endpoint.ClusterLoadAssignment{ClusterName: "b"}, + "c": &endpoint.ClusterLoadAssignment{ClusterName: "c"}, + }, + ), WithSotwStableVersions(), WithVersionPrefix("test-prefix-")) + + req := buildRequest([]string{"a", "b", "d"}, "test-prefix-"+lastVersion) + w := make(chan Response, 1) + _, err := cache.CreateWatch(req, subFromRequest(req), w) + require.NoError(t, err) + mustBlock(t, w) + }) + + t.Run("watch opened with the same last version, different resource not changing the response", func(t *testing.T) { + cache.log = log.NewTestLogger(t) + req := buildRequest([]string{"a", "b", "e"}, lastVersion) + sub := subFromRequest(req) + w := make(chan Response, 1) + _, err := cache.CreateWatch(req, sub, w) + require.NoError(t, err) + mustBlock(t, w) + + _ = cache.UpdateResource("e", &endpoint.ClusterLoadAssignment{ClusterName: "e"}) + // Resources a and b are still at the proper version, so not returned + resp := verifyResponseResources(t, w, resource.EndpointType, "ef89d29eb6398b90", "e") + updateFromSotwResponse(resp, &sub, req) + + w = make(chan Response, 1) + _, err = cache.CreateWatch(req, sub, w) + require.NoError(t, err) + mustBlock(t, w) + // Resource is not changed, nothing is returned + _ = cache.UpdateResource("e", &endpoint.ClusterLoadAssignment{ClusterName: "e"}) + mustBlock(t, w) + + _ = cache.UpdateResource("e", &endpoint.ClusterLoadAssignment{ClusterName: "e", Policy: &endpoint.ClusterLoadAssignment_Policy{ + EndpointStaleAfter: durationpb.New(5 * time.Second), + }}) + // Resources a and b are 
still at the proper version, so not returned + verifyResponseResources(t, w, resource.EndpointType, "51d88a339c93515b", "e") + + _ = cache.UpdateResource("e", &endpoint.ClusterLoadAssignment{ClusterName: "e"}) + + // Another watch created with the proper version does not trigger + req2 := buildRequest([]string{"a", "b", "e"}, "ef89d29eb6398b90") + sub2 := subFromRequest(req2) + w = make(chan Response, 1) + _, err = cache.CreateWatch(req2, sub2, w) + require.NoError(t, err) + mustBlock(t, w) + }) + + t.Run("watch opened with the same last version and returning more resources", func(t *testing.T) { + cache.log = log.NewTestLogger(t) + req := buildRequest([]string{}, lastVersion) + w := make(chan Response, 1) + _, err := cache.CreateWatch(req, subFromRequest(req), w) + require.NoError(t, err) + verifyResponseResources(t, w, resource.EndpointType, "be5530af715f4980", "a", "b", "c", "e") + }) + + t.Run("watch opened with the same last version and returning less resources", func(t *testing.T) { + cache.log = log.NewTestLogger(t) + req := buildRequest([]string{"a", "d"}, lastVersion) + w := make(chan Response, 1) + _, err := cache.CreateWatch(req, subFromRequest(req), w) + require.NoError(t, err) + verifyResponseResources(t, w, resource.EndpointType, "0aa479b0bd7e5474", "a") + }) +} diff --git a/pkg/cache/v3/snapshot.go b/pkg/cache/v3/snapshot.go index bddcecb57f..8770d6916a 100644 --- a/pkg/cache/v3/snapshot.go +++ b/pkg/cache/v3/snapshot.go @@ -188,10 +188,16 @@ func (s *Snapshot) ConstructVersionMap() error { } for _, r := range resources.Items { - v, err := computeResourceStableVersion(r.Resource) + // Hash our version in here and build the version map. 
+ marshaledResource, err := MarshalResource(r.Resource) if err != nil { + return err + } + v := HashResource(marshaledResource) + if v == "" { return fmt.Errorf("failed to build resource version: %w", err) } + s.VersionMap[typeURL][GetResourceName(r.Resource)] = v } } diff --git a/pkg/cache/v3/subscription.go b/pkg/cache/v3/subscription.go new file mode 100644 index 0000000000..c528b7c38c --- /dev/null +++ b/pkg/cache/v3/subscription.go @@ -0,0 +1,37 @@ +package cache + +// watchSubscription is used to provide the minimum feature set we need from subscription. +// As only a few APIs are used, and we don't return the subscription, this allows the cache +// to store an altered version of the subscription without modifying the immutable one provided. +type watchSubscription struct { + returnedResources map[string]string + subscribedResources map[string]struct{} + isWildcard bool +} + +func (s watchSubscription) ReturnedResources() map[string]string { + return s.returnedResources +} + +func (s watchSubscription) SubscribedResources() map[string]struct{} { + return s.subscribedResources +} + +func (s watchSubscription) IsWildcard() bool { + return s.isWildcard +} + +func newWatchSubscription(s Subscription) watchSubscription { + clone := watchSubscription{ + isWildcard: s.IsWildcard(), + returnedResources: make(map[string]string, len(s.ReturnedResources())), + subscribedResources: make(map[string]struct{}, len(s.SubscribedResources())), + } + for name, version := range s.ReturnedResources() { + clone.returnedResources[name] = version + } + for name := range s.SubscribedResources() { + clone.subscribedResources[name] = struct{}{} + } + return clone +}