diff --git a/pkg/cache/v3/cache.go b/pkg/cache/v3/cache.go index a130e3e5b..a453f97ad 100644 --- a/pkg/cache/v3/cache.go +++ b/pkg/cache/v3/cache.go @@ -56,12 +56,6 @@ type Subscription interface { // This considers subtleties related to the current migration of wildcard definitions within the protocol. // More details on the behavior of wildcard are present at https://www.envoyproxy.io/docs/envoy/latest/api-docs/xds_protocol#how-the-client-specifies-what-resources-to-return IsWildcard() bool - - // WatchesResources returns whether at least one of the resources provided is currently being watched by the subscription. - // It is currently only applicable to delta-xds. - // If the request is wildcard, it will always return true, - // otherwise it will compare the provided resources to the list of resources currently subscribed - WatchesResources(resourceNames map[string]struct{}) bool } // ConfigWatcher requests watches for configuration resources by a node, last diff --git a/pkg/cache/v3/linear.go b/pkg/cache/v3/linear.go index 909f64f1a..1b3a374a5 100644 --- a/pkg/cache/v3/linear.go +++ b/pkg/cache/v3/linear.go @@ -16,8 +16,11 @@ package cache import ( "context" + "encoding/hex" "errors" "fmt" + "hash/fnv" + "sort" "strconv" "strings" "sync" @@ -37,6 +40,45 @@ type cachedResource struct { // resourceVersion is the version of the resource itself (built through stable marshaling). // It is only set if computeResourceVersion is set to true on the cache. resourceVersion string + + // marshaledResource contains the marshaled version of the resource if it has been + // computed already. 
If it has not it will not be set + marshaledResource []byte +} + +func newCachedResource(res types.Resource, cacheVersion string, computeStableVersion bool) (cachedResource, error) { + r := cachedResource{ + Resource: res, + cacheVersion: cacheVersion, + } + if computeStableVersion { + if err := r.computeStableVersion(); err != nil { + return r, err + } + } + return r, nil +} + +func (c cachedResource) getVersion(useStableVersion bool) string { + if useStableVersion { + return c.resourceVersion + } + return c.cacheVersion +} + +func (c *cachedResource) computeStableVersion() error { + // hash our version in here and build the version map + marshaledResource, err := MarshalResource(c.Resource) + if err != nil { + return err + } + v := HashResource(marshaledResource) + if v == "" { + return errors.New("failed to build resource version") + } + c.marshaledResource = marshaledResource + c.resourceVersion = v + return nil } type watches struct { @@ -89,6 +131,10 @@ type LinearCache struct { // computeResourceVersion indicates whether the cache is currently computing and storing stable resource versions. computeResourceVersion bool + // useStableVersionsInSotw switches to a new version model for sotw watches. + // The version is then encoding the known resources of the subscription + useStableVersionsInSotw bool + log log.Logger mu sync.RWMutex @@ -133,6 +179,17 @@ func WithComputeStableVersions() LinearCacheOption { } } +// WithSotwStableVersions changes the versions returned in sotw to encode the list of resources known +// in the subscription. +// The use of stable versions for sotw also deduplicates updates to clients if the cache updates are +// not changing the content of the resource. +func WithSotwStableVersions() LinearCacheOption { + return func(cache *LinearCache) { + cache.computeResourceVersion = true + cache.useStableVersionsInSotw = true + } +} + // NewLinearCache creates a new cache. See the comments on the struct definition. 
func NewLinearCache(typeURL string, opts ...LinearCacheOption) *LinearCache { out := &LinearCache{ @@ -150,11 +207,10 @@ func NewLinearCache(typeURL string, opts ...LinearCacheOption) *LinearCache { for name, resource := range out.resources { resource.cacheVersion = out.getVersion() if out.computeResourceVersion { - version, err := computeResourceStableVersion(resource.Resource) + err := resource.computeStableVersion() if err != nil { out.log.Errorf("failed to build stable versions for resource %s: %s", name, err) } - resource.resourceVersion = version } out.resources[name] = resource } @@ -171,18 +227,13 @@ func (cache *LinearCache) computeResourceChange(sub Subscription, ignoreReturned knownVersions = make(map[string]string) } - getVersion := func(c cachedResource) string { return c.cacheVersion } - if useStableVersion { - getVersion = func(c cachedResource) string { return c.resourceVersion } - } - if sub.IsWildcard() { for resourceName, resource := range cache.resources { knownVersion, ok := knownVersions[resourceName] if !ok { // This resource is not yet known by the client (new resource added in the cache or newly subscribed). changedResources = append(changedResources, resourceName) - } else if knownVersion != getVersion(resource) { + } else if knownVersion != resource.getVersion(useStableVersion) { // The client knows an outdated version. changedResources = append(changedResources, resourceName) } @@ -212,7 +263,7 @@ func (cache *LinearCache) computeResourceChange(sub Subscription, ignoreReturned if !known { // This resource is not yet known by the client (new resource added in the cache or newly subscribed). changedResources = append(changedResources, resourceName) - } else if knownVersion != getVersion(res) { + } else if knownVersion != res.getVersion(useStableVersion) { // The client knows an outdated version. 
changedResources = append(changedResources, resourceName) } @@ -230,8 +281,26 @@ func (cache *LinearCache) computeResourceChange(sub Subscription, ignoreReturned return changedResources, removedResources } +func computeSotwStableVersion(versionMap map[string]string) string { + keys := make([]string, 0, len(versionMap)) + for key := range versionMap { + keys = append(keys, key) + } + sort.Strings(keys) + + // Reuse the hash used on resources. + hasher := fnv.New64a() + for _, key := range keys { + hasher.Write([]byte(key)) + hasher.Write([]byte("/")) + hasher.Write([]byte(versionMap[key])) + hasher.Write([]byte("^")) + } + return hex.EncodeToString(hasher.Sum(nil)) +} + func (cache *LinearCache) computeSotwResponse(watch ResponseWatch, ignoreReturnedResources bool) *RawResponse { - changedResources, removedResources := cache.computeResourceChange(watch.subscription, ignoreReturnedResources, false) + changedResources, removedResources := cache.computeResourceChange(watch.subscription, ignoreReturnedResources, cache.useStableVersionsInSotw) if len(changedResources) == 0 && len(removedResources) == 0 && !ignoreReturnedResources { // Nothing changed. return nil @@ -257,14 +326,14 @@ func (cache *LinearCache) computeSotwResponse(watch ResponseWatch, ignoreReturne for _, resourceName := range changedResources { cachedResource := cache.resources[resourceName] resources = append(resources, types.ResourceWithTTL{Resource: cachedResource.Resource}) - returnedVersions[resourceName] = cachedResource.cacheVersion + returnedVersions[resourceName] = cachedResource.getVersion(cache.useStableVersionsInSotw) } case watch.subscription.IsWildcard(): // Include all resources for the type. 
resources = make([]types.ResourceWithTTL, 0, len(cache.resources)) for resourceName, cachedResource := range cache.resources { resources = append(resources, types.ResourceWithTTL{Resource: cachedResource.Resource}) - returnedVersions[resourceName] = cachedResource.cacheVersion + returnedVersions[resourceName] = cachedResource.getVersion(cache.useStableVersionsInSotw) } default: // Include all resources matching the subscription, with no concern on whether @@ -279,7 +348,7 @@ func (cache *LinearCache) computeSotwResponse(watch ResponseWatch, ignoreReturne continue } resources = append(resources, types.ResourceWithTTL{Resource: cachedResource.Resource}) - returnedVersions[resourceName] = cachedResource.cacheVersion + returnedVersions[resourceName] = cachedResource.getVersion(cache.useStableVersionsInSotw) } } @@ -297,11 +366,16 @@ func (cache *LinearCache) computeSotwResponse(watch ResponseWatch, ignoreReturne return nil } + responseVersion := cacheVersion + if cache.useStableVersionsInSotw { + responseVersion = cache.versionPrefix + computeSotwStableVersion(returnedVersions) + } + return &RawResponse{ Request: watch.Request, Resources: resources, ReturnedResources: returnedVersions, - Version: cacheVersion, + Version: responseVersion, Ctx: context.Background(), } } @@ -322,9 +396,9 @@ func (cache *LinearCache) computeDeltaResponse(watch DeltaResponseWatch) *RawDel cacheVersion := cache.getVersion() resources := make([]types.Resource, 0, len(changedResources)) for _, resourceName := range changedResources { - resource := cache.resources[resourceName] - resources = append(resources, resource.Resource) - returnedVersions[resourceName] = resource.resourceVersion + cachedResource := cache.resources[resourceName] + resources = append(resources, cachedResource.Resource) + returnedVersions[resourceName] = cachedResource.getVersion(true) } // Cleanup resources no longer existing in the cache or no longer subscribed. 
for _, resourceName := range removedResources { @@ -397,32 +471,12 @@ func (cache *LinearCache) notifyAll(modified []string) { } } -func computeResourceStableVersion(res types.Resource) (string, error) { - // hash our version in here and build the version map - marshaledResource, err := MarshalResource(res) - if err != nil { - return "", err - } - v := HashResource(marshaledResource) - if v == "" { - return "", errors.New("failed to build resource version") - } - return v, nil -} - func (cache *LinearCache) addResourceToCache(name string, res types.Resource) error { - update := cachedResource{ - Resource: res, - cacheVersion: cache.getVersion(), - } - if cache.computeResourceVersion { - version, err := computeResourceStableVersion(res) - if err != nil { - return err - } - update.resourceVersion = version + r, err := newCachedResource(res, cache.getVersion(), cache.computeResourceVersion) + if err != nil { + return err } - cache.resources[name] = update + cache.resources[name] = r return nil } @@ -561,7 +615,27 @@ func (cache *LinearCache) CreateWatch(request *Request, sub Subscription, value defer cache.mu.Unlock() response := cache.computeSotwResponse(watch, ignoreCurrentSubscriptionResources) + shouldReply := false if response != nil { + // If the request + // - is the first + // - provides a non-empty version, matching the version prefix + // and the cache uses stable versions, if the generated versions are the same as the previous one, we do not return the response. + // This avoids resending all data if the new subscription is just a resumption of the previous one. + if cache.useStableVersionsInSotw && request.GetResponseNonce() == "" && !ignoreCurrentSubscriptionResources { + shouldReply = request.GetVersionInfo() != response.Version + + // We confirmed the content of the known resources, store them in the watch we create. 
+ subscription := newWatchSubscription(sub) + subscription.returnedResources = response.ReturnedResources + watch.subscription = subscription + sub = subscription + } else { + shouldReply = true + } + } + + if shouldReply { cache.log.Debugf("[linear cache] replying to the watch with resources %v (subscription values %v, known %v)", response.GetReturnedResources(), sub.SubscribedResources(), sub.ReturnedResources()) watch.Response <- response return func() {}, nil @@ -626,11 +700,10 @@ func (cache *LinearCache) CreateDeltaWatch(request *DeltaRequest, sub Subscripti cache.log.Infof("[linear cache] activating stable resource version computation for %s", cache.typeURL) for name, cachedResource := range cache.resources { - version, err := computeResourceStableVersion(cachedResource.Resource) + err := cachedResource.computeStableVersion() if err != nil { return func() {}, fmt.Errorf("failed to build stable versions for resource %s: %w", name, err) } - cachedResource.resourceVersion = version cache.resources[name] = cachedResource } } diff --git a/pkg/cache/v3/linear_test.go b/pkg/cache/v3/linear_test.go index 0218ace59..53fa20bae 100644 --- a/pkg/cache/v3/linear_test.go +++ b/pkg/cache/v3/linear_test.go @@ -23,6 +23,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "google.golang.org/protobuf/types/known/durationpb" "google.golang.org/protobuf/types/known/wrapperspb" cluster "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3" @@ -1469,3 +1470,141 @@ func TestLinearSotwNonWildcard(t *testing.T) { checkPendingWatch(4) }) } + +func TestLinearSotwVersion(t *testing.T) { + cache := NewLinearCache(resource.EndpointType, WithLogger(log.NewTestLogger(t)), WithInitialResources( + map[string]types.Resource{ + "a": &endpoint.ClusterLoadAssignment{ClusterName: "a"}, + "b": &endpoint.ClusterLoadAssignment{ClusterName: "b"}, + "c": &endpoint.ClusterLoadAssignment{ClusterName: "c"}, + }, + ), WithSotwStableVersions()) + + 
buildRequest := func(res []string, version string) *discovery.DiscoveryRequest { + return &discovery.DiscoveryRequest{ + ResourceNames: res, + TypeUrl: resource.EndpointType, + VersionInfo: version, + } + } + + var lastVersion string + t.Run("watch without any version is replied to", func(t *testing.T) { + cache.log = log.NewTestLogger(t) + req := buildRequest([]string{"a", "b", "d"}, "") + w := make(chan Response, 1) + _, err := cache.CreateWatch(req, subFromRequest(req), w) + require.NoError(t, err) + resp := verifyResponseResources(t, w, resource.EndpointType, "1d0dadc055487bf8", "a", "b") + lastVersion, err = resp.GetVersion() + require.NoError(t, err) + assert.NotEmpty(t, lastVersion) + }) + + t.Run("watch opened with the same last version", func(t *testing.T) { + cache.log = log.NewTestLogger(t) + req := buildRequest([]string{"a", "b", "d"}, lastVersion) + w := make(chan Response, 1) + _, err := cache.CreateWatch(req, subFromRequest(req), w) + require.NoError(t, err) + mustBlock(t, w) + }) + + t.Run("watch opened with the same last version and different prefix", func(t *testing.T) { + cache.log = log.NewTestLogger(t) + req := buildRequest([]string{"a", "b", "d"}, "test-prefix-"+lastVersion) + w := make(chan Response, 1) + _, err := cache.CreateWatch(req, subFromRequest(req), w) + require.NoError(t, err) + verifyResponseResources(t, w, resource.EndpointType, lastVersion, "a", "b") + }) + + t.Run("watch opened with the same last version missing prefix", func(t *testing.T) { + cache := NewLinearCache(resource.EndpointType, WithLogger(log.NewTestLogger(t)), WithInitialResources( + map[string]types.Resource{ + "a": &endpoint.ClusterLoadAssignment{ClusterName: "a"}, + "b": &endpoint.ClusterLoadAssignment{ClusterName: "b"}, + "c": &endpoint.ClusterLoadAssignment{ClusterName: "c"}, + }, + ), WithSotwStableVersions(), WithVersionPrefix("test-prefix-")) + + req := buildRequest([]string{"a", "b", "d"}, lastVersion) + w := make(chan Response, 1) + _, err := 
cache.CreateWatch(req, subFromRequest(req), w) + require.NoError(t, err) + verifyResponseResources(t, w, resource.EndpointType, "test-prefix-1d0dadc055487bf8", "a", "b") + }) + + t.Run("watch opened with the same last version including prefix", func(t *testing.T) { + cache := NewLinearCache(resource.EndpointType, WithLogger(log.NewTestLogger(t)), WithInitialResources( + map[string]types.Resource{ + "a": &endpoint.ClusterLoadAssignment{ClusterName: "a"}, + "b": &endpoint.ClusterLoadAssignment{ClusterName: "b"}, + "c": &endpoint.ClusterLoadAssignment{ClusterName: "c"}, + }, + ), WithSotwStableVersions(), WithVersionPrefix("test-prefix-")) + + req := buildRequest([]string{"a", "b", "d"}, "test-prefix-"+lastVersion) + w := make(chan Response, 1) + _, err := cache.CreateWatch(req, subFromRequest(req), w) + require.NoError(t, err) + mustBlock(t, w) + }) + + t.Run("watch opened with the same last version, different resource not changing the response", func(t *testing.T) { + cache.log = log.NewTestLogger(t) + req := buildRequest([]string{"a", "b", "e"}, lastVersion) + sub := subFromRequest(req) + w := make(chan Response, 1) + _, err := cache.CreateWatch(req, sub, w) + require.NoError(t, err) + mustBlock(t, w) + + _ = cache.UpdateResource("e", &endpoint.ClusterLoadAssignment{ClusterName: "e"}) + // Resources a and b are still at the proper version, so not returned + resp := verifyResponseResources(t, w, resource.EndpointType, "ef89d29eb6398b90", "e") + updateFromSotwResponse(resp, &sub, req) + + w = make(chan Response, 1) + _, err = cache.CreateWatch(req, sub, w) + require.NoError(t, err) + mustBlock(t, w) + // Resource is not changed, nothing is returned + _ = cache.UpdateResource("e", &endpoint.ClusterLoadAssignment{ClusterName: "e"}) + mustBlock(t, w) + + _ = cache.UpdateResource("e", &endpoint.ClusterLoadAssignment{ClusterName: "e", Policy: &endpoint.ClusterLoadAssignment_Policy{ + EndpointStaleAfter: durationpb.New(5 * time.Second), + }}) + // Resources a and b are 
still at the proper version, so not returned + verifyResponseResources(t, w, resource.EndpointType, "51d88a339c93515b", "e") + + _ = cache.UpdateResource("e", &endpoint.ClusterLoadAssignment{ClusterName: "e"}) + + // Another watch created with the proper version does not trigger + req2 := buildRequest([]string{"a", "b", "e"}, "ef89d29eb6398b90") + sub2 := subFromRequest(req2) + w = make(chan Response, 1) + _, err = cache.CreateWatch(req2, sub2, w) + require.NoError(t, err) + mustBlock(t, w) + }) + + t.Run("watch opened with the same last version and returning more resources", func(t *testing.T) { + cache.log = log.NewTestLogger(t) + req := buildRequest([]string{}, lastVersion) + w := make(chan Response, 1) + _, err := cache.CreateWatch(req, subFromRequest(req), w) + require.NoError(t, err) + verifyResponseResources(t, w, resource.EndpointType, "be5530af715f4980", "a", "b", "c", "e") + }) + + t.Run("watch opened with the same last version and returning less resources", func(t *testing.T) { + cache.log = log.NewTestLogger(t) + req := buildRequest([]string{"a", "d"}, lastVersion) + w := make(chan Response, 1) + _, err := cache.CreateWatch(req, subFromRequest(req), w) + require.NoError(t, err) + verifyResponseResources(t, w, resource.EndpointType, "0aa479b0bd7e5474", "a") + }) +} diff --git a/pkg/cache/v3/snapshot.go b/pkg/cache/v3/snapshot.go index bddcecb57..8770d6916 100644 --- a/pkg/cache/v3/snapshot.go +++ b/pkg/cache/v3/snapshot.go @@ -188,10 +188,16 @@ func (s *Snapshot) ConstructVersionMap() error { } for _, r := range resources.Items { - v, err := computeResourceStableVersion(r.Resource) + // Hash our version in here and build the version map. 
+			marshaledResource, err := MarshalResource(r.Resource)
 			if err != nil {
+				return err
+			}
+			v := HashResource(marshaledResource)
+			if v == "" {
-				return fmt.Errorf("failed to build resource version: %w", err)
+				return fmt.Errorf("failed to build resource version for %q", GetResourceName(r.Resource)) // err is nil here, so it must not be wrapped with %w
 			}
+
 			s.VersionMap[typeURL][GetResourceName(r.Resource)] = v
 		}
 	}
diff --git a/pkg/cache/v3/subscription.go b/pkg/cache/v3/subscription.go
new file mode 100644
index 000000000..c528b7c38
--- /dev/null
+++ b/pkg/cache/v3/subscription.go
@@ -0,0 +1,42 @@
+package cache
+
+// watchSubscription is used to provide the minimum feature set we need from subscription.
+// As only a few APIs are used, and we don't return the subscription, this allows the cache
+// to store an altered version of the subscription without modifying the immutable one provided.
+type watchSubscription struct {
+	returnedResources   map[string]string
+	subscribedResources map[string]struct{}
+	isWildcard          bool
+}
+
+// ReturnedResources returns the stored name-to-version map of returned resources.
+func (s watchSubscription) ReturnedResources() map[string]string {
+	return s.returnedResources
+}
+
+// SubscribedResources returns the stored set of subscribed resource names.
+func (s watchSubscription) SubscribedResources() map[string]struct{} {
+	return s.subscribedResources
+}
+
+// IsWildcard returns the stored wildcard flag.
+func (s watchSubscription) IsWildcard() bool {
+	return s.isWildcard
+}
+
+// newWatchSubscription builds a watchSubscription from s, copying both maps so that
+// later changes to the clone are not visible through s.
+func newWatchSubscription(s Subscription) watchSubscription {
+	clone := watchSubscription{
+		isWildcard:          s.IsWildcard(),
+		returnedResources:   make(map[string]string, len(s.ReturnedResources())),
+		subscribedResources: make(map[string]struct{}, len(s.SubscribedResources())),
+	}
+	for name, version := range s.ReturnedResources() {
+		clone.returnedResources[name] = version
+	}
+	for name := range s.SubscribedResources() {
+		clone.subscribedResources[name] = struct{}{}
+	}
+	return clone
+}