diff --git a/pkg/cache/v3/cache.go b/pkg/cache/v3/cache.go index a130e3e5b6..a453f97adf 100644 --- a/pkg/cache/v3/cache.go +++ b/pkg/cache/v3/cache.go @@ -56,12 +56,6 @@ type Subscription interface { // This considers subtleties related to the current migration of wildcard definitions within the protocol. // More details on the behavior of wildcard are present at https://www.envoyproxy.io/docs/envoy/latest/api-docs/xds_protocol#how-the-client-specifies-what-resources-to-return IsWildcard() bool - - // WatchesResources returns whether at least one of the resources provided is currently being watched by the subscription. - // It is currently only applicable to delta-xds. - // If the request is wildcard, it will always return true, - // otherwise it will compare the provided resources to the list of resources currently subscribed - WatchesResources(resourceNames map[string]struct{}) bool } // ConfigWatcher requests watches for configuration resources by a node, last diff --git a/pkg/cache/v3/delta.go b/pkg/cache/v3/delta.go index 93d572337e..9565634914 100644 --- a/pkg/cache/v3/delta.go +++ b/pkg/cache/v3/delta.go @@ -22,12 +22,11 @@ import ( // groups together resource-related arguments for the createDeltaResponse function type resourceContainer struct { - resourceMap map[string]types.Resource - versionMap map[string]string - systemVersion string + resourceMap map[string]types.Resource + versionMap map[string]string } -func createDeltaResponse(ctx context.Context, req *DeltaRequest, sub Subscription, resources resourceContainer) *RawDeltaResponse { +func createDeltaResponse(ctx context.Context, req *DeltaRequest, sub Subscription, resources resourceContainer, cacheVersion string) *RawDeltaResponse { // variables to build our response with var nextVersionMap map[string]string var filtered []types.Resource @@ -81,7 +80,7 @@ func createDeltaResponse(ctx context.Context, req *DeltaRequest, sub Subscriptio Resources: filtered, RemovedResources: toRemove, NextVersionMap: nextVersionMap, - SystemVersionInfo: resources.systemVersion, + SystemVersionInfo: cacheVersion, Ctx: ctx, } } diff --git a/pkg/cache/v3/linear.go b/pkg/cache/v3/linear.go index 850afa732f..e31e2f6202 100644 --- a/pkg/cache/v3/linear.go +++ b/pkg/cache/v3/linear.go @@ -21,46 +21,92 @@ import ( "strconv" "strings" "sync" - "sync/atomic" "github.com/envoyproxy/go-control-plane/pkg/cache/types" "github.com/envoyproxy/go-control-plane/pkg/log" ) +// cachedResource is used to track resources added by the user in the cache. +// It contains the resource itself and its associated version (currently in two different modes). +type cachedResource struct { + types.Resource + + // cacheVersion is the version of the cache at the time of last update, used in sotw. + cacheVersion string + // stableVersion is the version of the resource itself (a hash of its content after deterministic marshaling). + // It is lazy initialized and should be accessed through getStableVersion. + stableVersion string +} + +func (c *cachedResource) getStableVersion() (string, error) { + if c.stableVersion != "" { + return c.stableVersion, nil + } + + // TODO(valerian-roche): store serialized resource as part of the cachedResource + // to reuse it when marshaling the responses instead of remarshaling and recomputing the version then. + marshaledResource, err := MarshalResource(c.Resource) + if err != nil { + return "", err + } + c.stableVersion = HashResource(marshaledResource) + return c.stableVersion, nil +} + +func (c *cachedResource) getVersion(useStableVersion bool) (string, error) { + if !useStableVersion { + return c.cacheVersion, nil + } + + return c.getStableVersion() +} + +type watches struct { + // sotw keeps track of current sotw watches, indexed per watch id. + sotw map[uint64]ResponseWatch + // delta keeps track of current delta watches, indexed per watch id. + delta map[uint64]DeltaResponseWatch +} + +func newWatches() watches { + return watches{ + sotw: make(map[uint64]ResponseWatch), + delta: make(map[uint64]DeltaResponseWatch), + } +} + +func (w *watches) empty() bool { + return len(w.sotw)+len(w.delta) == 0 +} + // LinearCache supports collections of opaque resources. This cache has a // single collection indexed by resource names and manages resource versions // internally. It implements the cache interface for a single type URL and // should be combined with other caches via type URL muxing. It can be used to // supply EDS entries, for example, uniformly across a fleet of proxies. type LinearCache struct { - // Type URL specific to the cache. + // typeURL provides the type of resources managed by the cache. + // This information is used to reject requests watching another type, as well as to make + // decisions based on resource type (e.g. whether sotw must return full-state). typeURL string - // Collection of resources indexed by name. - resources map[string]types.Resource - // Watches open by clients, indexed by resource name. Whenever resources - // are changed, the watch is triggered. - watches map[string]map[int64]ResponseWatch - // Set of watches for all resources in the collection, indexed by watch id. - // watch id is unique for sotw watches and is used to index them without requiring - // the watch itself to be hashable, as well as making logs easier to correlate. - watchAll map[int64]ResponseWatch - // Continuously incremented counter used to index sotw watches. - sotwWatchCount int64 - // Set of delta watches. A delta watch always contain the list of subscribed resources - // together with its current version - // version and versionPrefix fields are ignored for delta watches, because we always generate the resource version. - deltaWatches map[int64]DeltaResponseWatch - // Continuously incremented counter used to index delta watches. - deltaWatchCount int64 - // versionMap holds the current hash map of all resources in the cache when delta watches are present. - // versionMap is only to be used with delta xDS. - versionMap map[string]string - // Continuously incremented version. + + // resources contains all resources currently set in the cache and associated versions. + resources map[string]*cachedResource + + // resourceWatches keeps track of watches currently opened specifically tracking a resource. + // It does not contain wildcard watches. + // It can contain resources not present in resources. + resourceWatches map[string]watches + // wildcardWatches keeps track of all wildcard watches currently opened. + wildcardWatches watches + // currentWatchID is used to index new watches. + currentWatchID uint64 + + // version is the current version of the cache. It is incremented each time resources are updated. version uint64 - // Version prefix to be sent to the clients + // versionPrefix is used to modify the version returned to clients, and can be used to uniquely identify + // cache instances and avoid issues of version reuse. versionPrefix string - // Versions for each resource by name. - versionVector map[string]string log log.Logger @@ -84,7 +130,11 @@ func WithVersionPrefix(prefix string) LinearCacheOption { // WithInitialResources initializes the initial set of resources. func WithInitialResources(resources map[string]types.Resource) LinearCacheOption { return func(cache *LinearCache) { - cache.resources = resources + for name, resource := range resources { + cache.resources[name] = &cachedResource{ + Resource: resource, + } + } } } @@ -97,44 +147,58 @@ func WithLogger(log log.Logger) LinearCacheOption { // NewLinearCache creates a new cache. See the comments on the struct definition. func NewLinearCache(typeURL string, opts ...LinearCacheOption) *LinearCache { out := &LinearCache{ - typeURL: typeURL, - resources: make(map[string]types.Resource), - watches: make(map[string]map[int64]ResponseWatch), - watchAll: make(map[int64]ResponseWatch), - deltaWatches: make(map[int64]DeltaResponseWatch), - versionMap: nil, - version: 0, - versionVector: make(map[string]string), - log: log.NewDefaultLogger(), + typeURL: typeURL, + resources: make(map[string]*cachedResource), + resourceWatches: make(map[string]watches), + wildcardWatches: newWatches(), + version: 0, + currentWatchID: 0, + log: log.NewDefaultLogger(), } for _, opt := range opts { opt(out) } - for name := range out.resources { - out.versionVector[name] = out.getVersion() + for name, resource := range out.resources { + resource.cacheVersion = out.getVersion() + out.resources[name] = resource } return out } -func (cache *LinearCache) computeSotwResponse(watch ResponseWatch, ignoreReturnedResources bool) *RawResponse { +// computeResourceChange compares the subscription known resources and the cache current state to compute the list of resources +// which have changed and should be notified to the user. +// +// The alwaysConsiderAllResources argument removes the consideration of the subscription known resources (e.g. if the version did not match), +// and return all known subscribed resources. +// +// The useStableVersion argument defines what version type to use for resources: +// - if set to false versions are based on when resources were updated in the cache. +// - if set to true versions are a stable property of the resource, with no regard to when it was added to the cache. +func (cache *LinearCache) computeResourceChange(sub Subscription, alwaysConsiderAllResources, useStableVersion bool) (updated, removed []string, err error) { var changedResources []string var removedResources []string - knownVersions := watch.subscription.ReturnedResources() - if ignoreReturnedResources { + knownVersions := sub.ReturnedResources() + if alwaysConsiderAllResources { // The response will include all resources, with no regards of resources potentially already returned. knownVersions = make(map[string]string) } - if watch.subscription.IsWildcard() { - for resourceName, version := range cache.versionVector { + if sub.IsWildcard() { + for resourceName, resource := range cache.resources { knownVersion, ok := knownVersions[resourceName] if !ok { // This resource is not yet known by the client (new resource added in the cache or newly subscribed). changedResources = append(changedResources, resourceName) - } else if knownVersion != version { - // The client knows an outdated version. - changedResources = append(changedResources, resourceName) + } else { + resourceVersion, err := resource.getVersion(useStableVersion) + if err != nil { + return nil, nil, fmt.Errorf("failed to compute version of %s: %w", resourceName, err) + } + if knownVersion != resourceVersion { + // The client knows an outdated version. + changedResources = append(changedResources, resourceName) + } } } @@ -142,13 +206,13 @@ func (cache *LinearCache) computeSotwResponse(watch ResponseWatch, ignoreReturne // Sotw does not support returning "deletions", but in the case of full state resources // a response must then be returned. for resourceName := range knownVersions { - if _, ok := cache.versionVector[resourceName]; !ok { + if _, ok := cache.resources[resourceName]; !ok { removedResources = append(removedResources, resourceName) } } } else { - for resourceName := range watch.subscription.SubscribedResources() { - version, exists := cache.versionVector[resourceName] + for resourceName := range sub.SubscribedResources() { + res, exists := cache.resources[resourceName] knownVersion, known := knownVersions[resourceName] if !exists { if known { @@ -162,24 +226,39 @@ func (cache *LinearCache) computeSotwResponse(watch ResponseWatch, ignoreReturne if !known { // This resource is not yet known by the client (new resource added in the cache or newly subscribed). changedResources = append(changedResources, resourceName) - } else if knownVersion != version { - // The client knows an outdated version. - changedResources = append(changedResources, resourceName) + } else { + resourceVersion, err := res.getVersion(useStableVersion) + if err != nil { + return nil, nil, fmt.Errorf("failed to compute version of %s: %w", resourceName, err) + } + if knownVersion != resourceVersion { + // The client knows an outdated version. + changedResources = append(changedResources, resourceName) + } } } for resourceName := range knownVersions { // If the subscription no longer watches a resource, // we mark it as unknown on the client side to ensure it will be resent to the client if subscribing again later on. - if _, ok := watch.subscription.SubscribedResources()[resourceName]; !ok { + if _, ok := sub.SubscribedResources()[resourceName]; !ok { removedResources = append(removedResources, resourceName) } } } - if len(changedResources) == 0 && len(removedResources) == 0 && !ignoreReturnedResources { + return changedResources, removedResources, nil +} + +func (cache *LinearCache) computeSotwResponse(watch ResponseWatch, alwaysConsiderAllResources bool) (*RawResponse, error) { + changedResources, removedResources, err := cache.computeResourceChange(watch.subscription, alwaysConsiderAllResources, false) + if err != nil { + return nil, err + } + + if len(changedResources) == 0 && len(removedResources) == 0 && !alwaysConsiderAllResources { // Nothing changed. - return nil + return nil, nil } returnedVersions := make(map[string]string, len(watch.subscription.ReturnedResources())) @@ -192,23 +271,22 @@ func (cache *LinearCache) computeSotwResponse(watch ResponseWatch, ignoreReturne var resources []types.ResourceWithTTL switch { - // Depending on the type, the response will only include changed resources or all of them + // For lds and cds, answers will always include all matching resources, with no regard to which resource was changed or removed. + // For other types, the response only includes updated resources (sotw cannot notify for deletion). case !ResourceRequiresFullStateInSotw(cache.typeURL): // changedResources is already filtered based on the subscription. - // TODO(valerian-roche): if the only change is a removal in the subscription, - // or a watched resource getting deleted, this might send an empty reply. - // While this does not violate the protocol, we might want to avoid it. resources = make([]types.ResourceWithTTL, 0, len(changedResources)) for _, resourceName := range changedResources { - resources = append(resources, types.ResourceWithTTL{Resource: cache.resources[resourceName]}) - returnedVersions[resourceName] = cache.versionVector[resourceName] + cachedResource := cache.resources[resourceName] + resources = append(resources, types.ResourceWithTTL{Resource: cachedResource.Resource}) + returnedVersions[resourceName] = cachedResource.cacheVersion } case watch.subscription.IsWildcard(): // Include all resources for the type. resources = make([]types.ResourceWithTTL, 0, len(cache.resources)) - for resourceName, resource := range cache.resources { - resources = append(resources, types.ResourceWithTTL{Resource: resource}) - returnedVersions[resourceName] = cache.versionVector[resourceName] + for resourceName, cachedResource := range cache.resources { + resources = append(resources, types.ResourceWithTTL{Resource: cachedResource.Resource}) + returnedVersions[resourceName] = cachedResource.cacheVersion } default: // Include all resources matching the subscription, with no concern on whether @@ -218,12 +296,12 @@ func (cache *LinearCache) computeSotwResponse(watch ResponseWatch, ignoreReturne // Therefore drives on the subscription requested resources. resources = make([]types.ResourceWithTTL, 0, len(requestedResources)) for resourceName := range requestedResources { - resource, ok := cache.resources[resourceName] + cachedResource, ok := cache.resources[resourceName] if !ok { continue } - resources = append(resources, types.ResourceWithTTL{Resource: resource}) - returnedVersions[resourceName] = cache.versionVector[resourceName] + resources = append(resources, types.ResourceWithTTL{Resource: cachedResource.Resource}) + returnedVersions[resourceName] = cachedResource.cacheVersion } } @@ -234,11 +312,11 @@ func (cache *LinearCache) computeSotwResponse(watch ResponseWatch, ignoreReturne delete(returnedVersions, resourceName) } - if !ignoreReturnedResources && !ResourceRequiresFullStateInSotw(cache.typeURL) && len(resources) == 0 { - // If the request is not the initial one, and the type for not require full updates, - // do not return if noting is to be set - // For full-state resources an empty response does have a semantic meaning - return nil + if !alwaysConsiderAllResources && !ResourceRequiresFullStateInSotw(cache.typeURL) && len(resources) == 0 { + // If the request is not the initial one, and the type does not require full updates, + // do not return if nothing is to be set. + // For full-state resources an empty response does have a semantic meaning. + return nil, nil } return &RawResponse{ @@ -247,76 +325,145 @@ func (cache *LinearCache) computeSotwResponse(watch ResponseWatch, ignoreReturne ReturnedResources: returnedVersions, Version: cacheVersion, Ctx: context.Background(), + }, nil +} + +func (cache *LinearCache) computeDeltaResponse(watch DeltaResponseWatch) (*RawDeltaResponse, error) { + changedResources, removedResources, err := cache.computeResourceChange(watch.subscription, false, true) + if err != nil { + return nil, err } + + // On first request on a wildcard subscription, envoy does expect a response to come in to + // conclude initialization. + isFirstWildcardRequest := watch.subscription.IsWildcard() && watch.Request.GetResponseNonce() == "" + if len(changedResources) == 0 && len(removedResources) == 0 && !isFirstWildcardRequest { + // Nothing changed. + return nil, nil + } + + returnedVersions := make(map[string]string, len(watch.subscription.ReturnedResources())) + // Clone the current returned versions. The cache should not alter the subscription + for resourceName, version := range watch.subscription.ReturnedResources() { + returnedVersions[resourceName] = version + } + + cacheVersion := cache.getVersion() + resources := make([]types.Resource, 0, len(changedResources)) + for _, resourceName := range changedResources { + resource := cache.resources[resourceName] + resources = append(resources, resource.Resource) + version, err := resource.getStableVersion() + if err != nil { + return nil, fmt.Errorf("failed to compute stable version of %s: %w", resourceName, err) + } + returnedVersions[resourceName] = version + } + // Cleanup resources no longer existing in the cache or no longer subscribed. + for _, resourceName := range removedResources { + delete(returnedVersions, resourceName) + } + + return &RawDeltaResponse{ + DeltaRequest: watch.Request, + Resources: resources, + RemovedResources: removedResources, + NextVersionMap: returnedVersions, + SystemVersionInfo: cacheVersion, + Ctx: context.Background(), + }, nil } -func (cache *LinearCache) notifyAll(modified map[string]struct{}) { - // Gather the list of non-wildcard watches impacted by the modified resources. - watches := make(map[int64]ResponseWatch) - for name := range modified { - for watchID, watch := range cache.watches[name] { - watches[watchID] = watch +func (cache *LinearCache) notifyAll(modified []string) error { + // Gather the list of watches impacted by the modified resources. + sotwWatches := make(map[uint64]ResponseWatch) + deltaWatches := make(map[uint64]DeltaResponseWatch) + for _, name := range modified { + for watchID, watch := range cache.resourceWatches[name].sotw { + sotwWatches[watchID] = watch + } + for watchID, watch := range cache.resourceWatches[name].delta { + deltaWatches[watchID] = watch } } - for watchID, watch := range watches { - response := cache.computeSotwResponse(watch, false) + + // sotw watches + for watchID, watch := range sotwWatches { + response, err := cache.computeSotwResponse(watch, false) + if err != nil { + return err + } + if response != nil { watch.Response <- response cache.removeWatch(watchID, watch.subscription) } else { - cache.log.Warnf("[Linear cache] Watch %d detected as triggered did not get notified", watchID) + cache.log.Warnf("[Linear cache] Watch %d detected as triggered but no change was found", watchID) } } - for watchID, watch := range cache.watchAll { - response := cache.computeSotwResponse(watch, false) + for watchID, watch := range cache.wildcardWatches.sotw { + response, err := cache.computeSotwResponse(watch, false) + if err != nil { + return err + } + if response != nil { watch.Response <- response - delete(cache.watchAll, watchID) + delete(cache.wildcardWatches.sotw, watchID) } else { - cache.log.Warnf("[Linear cache] Watch %d detected as triggered did not get notified", watchID) + cache.log.Warnf("[Linear cache] Wildcard watch %d detected as triggered but no change was found", watchID) } } - // Building the version map has a very high cost when using SetResources to do full updates. - // As it is only used with delta watches, it is only maintained when applicable. - if cache.versionMap != nil { - err := cache.updateVersionMap(modified) + // delta watches + for watchID, watch := range deltaWatches { + response, err := cache.computeDeltaResponse(watch) if err != nil { - cache.log.Errorf("failed to update version map: %v", err) + return err } - for id, watch := range cache.deltaWatches { - if !watch.subscription.WatchesResources(modified) { - continue - } + if response != nil { + watch.Response <- response + cache.removeDeltaWatch(watchID, watch.subscription) + } else { + cache.log.Warnf("[Linear cache] Delta watch %d detected as triggered but no change was found", watchID) + } + } - res := cache.respondDelta(watch.Request, watch.Response, watch.subscription) - if res != nil { - delete(cache.deltaWatches, id) - } + for watchID, watch := range cache.wildcardWatches.delta { + response, err := cache.computeDeltaResponse(watch) + if err != nil { + return err + } + + if response != nil { + watch.Response <- response + delete(cache.wildcardWatches.delta, watchID) + } else { + cache.log.Warnf("[Linear cache] Wildcard delta watch %d detected as triggered but no change was found", watchID) } } + + return nil } -func (cache *LinearCache) respondDelta(request *DeltaRequest, value chan DeltaResponse, sub Subscription) *RawDeltaResponse { - resp := createDeltaResponse(context.Background(), request, sub, resourceContainer{ - resourceMap: cache.resources, - versionMap: cache.versionMap, - systemVersion: cache.getVersion(), - }) +func computeResourceStableVersion(res types.Resource) (string, error) { + // TODO(valerian-roche): store serialized resource as part of the cachedResource + // to reuse it when marshaling the responses instead of remarshaling and recomputing the version then. + marshaledResource, err := MarshalResource(res) + if err != nil { + return "", err + } + return HashResource(marshaledResource), nil +} - // Only send a response if there were changes - // We want to respond immediately for the first wildcard request in a stream, even if the response is empty - // otherwise, envoy won't complete initialization - if len(resp.Resources) > 0 || len(resp.RemovedResources) > 0 || (sub.IsWildcard() && request.ResponseNonce == "") { - if cache.log != nil { - cache.log.Debugf("[linear cache] node: %s, sending delta response for typeURL %s with resources: %v removed resources: %v with wildcard: %t", - request.GetNode().GetId(), request.GetTypeUrl(), GetResourceNames(resp.Resources), resp.RemovedResources, sub.IsWildcard()) - } - value <- resp - return resp +func (cache *LinearCache) addResourceToCache(name string, res types.Resource) error { + update := &cachedResource{ + Resource: res, + cacheVersion: cache.getVersion(), } + cache.resources[name] = update return nil } @@ -329,13 +476,11 @@ func (cache *LinearCache) UpdateResource(name string, res types.Resource) error defer cache.mu.Unlock() cache.version++ - cache.versionVector[name] = cache.getVersion() - cache.resources[name] = res - - // TODO: batch watch closures to prevent rapid updates - cache.notifyAll(map[string]struct{}{name: {}}) + if err := cache.addResourceToCache(name, res); err != nil { + return err + } - return nil + return cache.notifyAll([]string{name}) } // DeleteResource removes a resource in the collection. @@ -344,12 +489,9 @@ func (cache *LinearCache) DeleteResource(name string) error { defer cache.mu.Unlock() cache.version++ - delete(cache.versionVector, name) delete(cache.resources, name) - // TODO: batch watch closures to prevent rapid updates - cache.notifyAll(map[string]struct{}{name: {}}) - return nil + return cache.notifyAll([]string{name}) } // UpdateResources updates/deletes a list of resources in the cache. @@ -360,55 +502,51 @@ func (cache *LinearCache) UpdateResources(toUpdate map[string]types.Resource, to defer cache.mu.Unlock() cache.version++ - cacheVersion := cache.getVersion() - - modified := make(map[string]struct{}, len(toUpdate)+len(toDelete)) + modified := make([]string, 0, len(toUpdate)+len(toDelete)) for name, resource := range toUpdate { - cache.versionVector[name] = cacheVersion - cache.resources[name] = resource - modified[name] = struct{}{} + if err := cache.addResourceToCache(name, resource); err != nil { + return err + } + modified = append(modified, name) } for _, name := range toDelete { - delete(cache.versionVector, name) delete(cache.resources, name) - modified[name] = struct{}{} + modified = append(modified, name) } - cache.notifyAll(modified) - - return nil + return cache.notifyAll(modified) } // SetResources replaces current resources with a new set of resources. -// This function is useful for wildcard xDS subscriptions. -// This way watches that are subscribed to all resources are triggered only once regardless of how many resources are changed. +// If only some resources are to be updated, UpdateResources is more efficient. func (cache *LinearCache) SetResources(resources map[string]types.Resource) { cache.mu.Lock() defer cache.mu.Unlock() cache.version++ - cacheVersion := cache.getVersion() - modified := map[string]struct{}{} + modified := make([]string, 0, len(resources)) // Collect deleted resource names. for name := range cache.resources { if _, found := resources[name]; !found { - delete(cache.versionVector, name) - modified[name] = struct{}{} + delete(cache.resources, name) + modified = append(modified, name) } } - cache.resources = resources - // Collect changed resource names. // We assume all resources passed to SetResources are changed. // Otherwise we would have to do proto.Equal on resources which is pretty expensive operation - for name := range resources { - cache.versionVector[name] = cacheVersion - modified[name] = struct{}{} + for name, resource := range resources { + if err := cache.addResourceToCache(name, resource); err != nil { + cache.log.Errorf("Failed to add resources to the cache: %s", err) + } + modified = append(modified, name) } - cache.notifyAll(modified) + if err := cache.notifyAll(modified); err != nil { + cache.log.Errorf("Failed to notify watches: %s", err.Error()) + } } // GetResources returns current resources stored in the cache @@ -420,11 +558,15 @@ func (cache *LinearCache) GetResources() map[string]types.Resource { // involving mutations of our backing map resources := make(map[string]types.Resource, len(cache.resources)) for k, v := range cache.resources { - resources[k] = v + resources[k] = v.Resource } return resources } +// The implementations of sotw and delta watches handling is nearly identical. The main distinctions are: +// - handling of version in sotw when the request is the first of a subscription. Delta has a proper handling based on the request providing known versions. +// - building the initial resource versions in delta if they've not been computed yet. +// - computeSotwResponse and computeDeltaResponse has slightly different implementations due to sotw requirements to return full state for certain resources only. func (cache *LinearCache) CreateWatch(request *Request, sub Subscription, value chan Response) (func(), error) { if request.GetTypeUrl() != cache.typeURL { return nil, fmt.Errorf("request type %s does not match cache type %s", request.GetTypeUrl(), cache.typeURL) @@ -455,33 +597,37 @@ func (cache *LinearCache) CreateWatch(request *Request, sub Subscription, value cache.mu.Lock() defer cache.mu.Unlock() - response := cache.computeSotwResponse(watch, ignoreCurrentSubscriptionResources) + response, err := cache.computeSotwResponse(watch, ignoreCurrentSubscriptionResources) + if err != nil { + return nil, fmt.Errorf("failed to compute the watch respnse: %w", err) + } + if response != nil { - cache.log.Debugf("replying to the watch with resources %v (subscription values %v, known %v)", response.GetReturnedResources(), sub.SubscribedResources(), sub.ReturnedResources()) + cache.log.Debugf("[linear cache] replying to the watch with resources %v (subscription values %v, known %v)", response.GetReturnedResources(), sub.SubscribedResources(), sub.ReturnedResources()) watch.Response <- response return func() {}, nil } - watchID := cache.nextSotwWatchID() + watchID := cache.nextWatchID() // Create open watches since versions are up to date. if sub.IsWildcard() { cache.log.Infof("[linear cache] open watch %d for %s all resources, system version %q", watchID, cache.typeURL, cache.getVersion()) - cache.watchAll[watchID] = watch + cache.wildcardWatches.sotw[watchID] = watch return func() { cache.mu.Lock() defer cache.mu.Unlock() - delete(cache.watchAll, watchID) + delete(cache.wildcardWatches.sotw, watchID) }, nil } cache.log.Infof("[linear cache] open watch %d for %s resources %v, system version %q", watchID, cache.typeURL, sub.SubscribedResources(), cache.getVersion()) for name := range sub.SubscribedResources() { - set, exists := cache.watches[name] + watches, exists := cache.resourceWatches[name] if !exists { - set = make(map[int64]ResponseWatch) - cache.watches[name] = set + watches = newWatches() + cache.resourceWatches[name] = watches } - set[watchID] = watch + watches.sotw[watchID] = watch } return func() { cache.mu.Lock() @@ -490,87 +636,66 @@ func (cache *LinearCache) CreateWatch(request *Request, sub Subscription, value }, nil } -func (cache *LinearCache) nextSotwWatchID() int64 { - next := atomic.AddInt64(&cache.sotwWatchCount, 1) - if next < 0 { - panic("watch id count overflow") - } - return next -} - // Must be called under lock -func (cache *LinearCache) removeWatch(watchID int64, sub Subscription) { +func (cache *LinearCache) removeWatch(watchID uint64, sub Subscription) { // Make sure we clean the watch for ALL resources it might be associated with, // as the channel will no longer be listened to for resource := range sub.SubscribedResources() { - resourceWatches := cache.watches[resource] - delete(resourceWatches, watchID) - if len(resourceWatches) == 0 { - delete(cache.watches, resource) + resourceWatches := cache.resourceWatches[resource] + delete(resourceWatches.sotw, watchID) + if resourceWatches.empty() { + delete(cache.resourceWatches, resource) } } } func (cache *LinearCache) CreateDeltaWatch(request *DeltaRequest, sub Subscription, value chan DeltaResponse) (func(), error) { - cache.mu.Lock() - defer cache.mu.Unlock() - - if cache.versionMap == nil { - // If we had no previously open delta watches, we need to build the version map for the first time. - // The version map will not be destroyed when the last delta watch is removed. - // This avoids constantly rebuilding when only a few delta watches are open. - modified := map[string]struct{}{} - for name := range cache.resources { - modified[name] = struct{}{} - } - err := cache.updateVersionMap(modified) - if err != nil && cache.log != nil { - cache.log.Errorf("failed to update version map: %v", err) - } + if request.GetTypeUrl() != cache.typeURL { + return nil, fmt.Errorf("request type %s does not match cache type %s", request.GetTypeUrl(), cache.typeURL) } - response := cache.respondDelta(request, value, sub) - // if respondDelta returns nil this means that there is no change in any resource version - // create a new watch accordingly - if response == nil { - watchID := cache.nextDeltaWatchID() - if cache.log != nil { - cache.log.Infof("[linear cache] open delta watch ID:%d for %s Resources:%v, system version %q", watchID, - cache.typeURL, sub.SubscribedResources(), cache.getVersion()) - } + watch := DeltaResponseWatch{Request: request, Response: value, subscription: sub} - cache.deltaWatches[watchID] = DeltaResponseWatch{Request: request, Response: value, subscription: sub} + cache.mu.Lock() + defer cache.mu.Unlock() - return cache.cancelDeltaWatch(watchID), nil + response, err := cache.computeDeltaResponse(watch) + if err != nil { + return nil, fmt.Errorf("failed to compute the watch respnse: %w", err) } - return nil, nil -} + if response != nil { + cache.log.Debugf("[linear cache] replying to the delta watch (subscription values %v, known %v)", sub.SubscribedResources(), sub.ReturnedResources()) + watch.Response <- response + return nil, nil + } -func (cache *LinearCache) updateVersionMap(modified map[string]struct{}) error { - if cache.versionMap == nil { - cache.versionMap = make(map[string]string, len(modified)) + watchID := cache.nextWatchID() + // Create open watches since versions are up to date. + if sub.IsWildcard() { + cache.log.Infof("[linear cache] open delta watch %d for all %s resources, system version %q", watchID, cache.typeURL, cache.getVersion()) + cache.wildcardWatches.delta[watchID] = watch + return func() { + cache.mu.Lock() + defer cache.mu.Unlock() + delete(cache.wildcardWatches.delta, watchID) + }, nil } - for name := range modified { - r, ok := cache.resources[name] - if !ok { - // The resource was deleted - delete(cache.versionMap, name) - continue - } - // hash our version in here and build the version map - marshaledResource, err := MarshalResource(r) - if err != nil { - return err - } - v := HashResource(marshaledResource) - if v == "" { - return errors.New("failed to build resource version") - } - cache.versionMap[name] = v + cache.log.Infof("[linear cache] open delta watch %d for %s resources %v, system version %q", watchID, cache.typeURL, sub.SubscribedResources(), cache.getVersion()) + for name := range sub.SubscribedResources() { + watches, exists := cache.resourceWatches[name] + if !exists { + watches = newWatches() + cache.resourceWatches[name] = watches + } + watches.delta[watchID] = watch } - return nil + return func() { + cache.mu.Lock() + defer cache.mu.Unlock() + cache.removeDeltaWatch(watchID, watch.subscription) + }, nil } func (cache *LinearCache) getVersion() string { @@ -578,23 +703,31 @@ func (cache *LinearCache) getVersion() string { } // cancellation function for cleaning stale watches -func (cache *LinearCache) cancelDeltaWatch(watchID int64) func() { - return func() { - cache.mu.Lock() - defer cache.mu.Unlock() - delete(cache.deltaWatches, watchID) +func (cache *LinearCache) removeDeltaWatch(watchID uint64, sub Subscription) { + // Make sure we clean the watch for ALL resources it might be associated with, + // as the channel will no longer be listened to + for resource := range sub.SubscribedResources() { + resourceWatches := cache.resourceWatches[resource] + delete(resourceWatches.delta, watchID) + if resourceWatches.empty() { + delete(cache.resourceWatches, resource) + } } } -func (cache *LinearCache) nextDeltaWatchID() int64 { - return atomic.AddInt64(&cache.deltaWatchCount, 1) +func (cache *LinearCache) nextWatchID() uint64 { + cache.currentWatchID++ + if cache.currentWatchID == 0 { + panic("watch id count overflow") + } + return cache.currentWatchID } func (cache *LinearCache) Fetch(context.Context, *Request) (Response, error) { return nil, errors.New("not implemented") } -// Number of resources currently on the cache. +// NumResources returns the number of resources currently in the cache. // As GetResources is building a clone it is expensive to get metrics otherwise. func (cache *LinearCache) NumResources() int { cache.mu.RLock() @@ -602,16 +735,30 @@ func (cache *LinearCache) NumResources() int { return len(cache.resources) } -// Number of active watches for a resource name. +// NumWatches returns the number of active sotw watches for a resource name. func (cache *LinearCache) NumWatches(name string) int { cache.mu.RLock() defer cache.mu.RUnlock() - return len(cache.watches[name]) + len(cache.watchAll) + return len(cache.resourceWatches[name].sotw) + len(cache.wildcardWatches.sotw) } -// Number of active delta watches. +// NumDeltaWatchesForResource returns the number of active delta watches for a resource name. +func (cache *LinearCache) NumDeltaWatchesForResource(name string) int { + cache.mu.RLock() + defer cache.mu.RUnlock() + return len(cache.resourceWatches[name].delta) + len(cache.wildcardWatches.delta) +} + +// NumDeltaWatches returns the total number of active delta watches. +// Warning: it is quite inefficient, and NumDeltaWatchesForResource should be preferred. func (cache *LinearCache) NumDeltaWatches() int { - cache.mu.Lock() - defer cache.mu.Unlock() - return len(cache.deltaWatches) + cache.mu.RLock() + defer cache.mu.RUnlock() + uniqueWatches := map[uint64]struct{}{} + for _, watches := range cache.resourceWatches { + for id := range watches.delta { + uniqueWatches[id] = struct{}{} + } + } + return len(uniqueWatches) + len(cache.wildcardWatches.delta) } diff --git a/pkg/cache/v3/linear_test.go b/pkg/cache/v3/linear_test.go index 4b3ed099b3..4ff844255e 100644 --- a/pkg/cache/v3/linear_test.go +++ b/pkg/cache/v3/linear_test.go @@ -15,7 +15,6 @@ package cache import ( - "errors" "fmt" "reflect" "testing" @@ -184,7 +183,7 @@ func verifyDeltaResponse(t *testing.T, ch <-chan DeltaResponse, resources []reso var r DeltaResponse select { case r = <-ch: - case <-time.After(5 * time.Second): + case <-time.After(1 * time.Second): t.Error("timeout waiting for delta response") return nil } @@ -206,19 +205,17 @@ func checkDeltaWatchCount(t *testing.T, c *LinearCache, count int) { } } -func checkVersionMapNotSet(t *testing.T, c *LinearCache) { +func checkStableVersionsAreNotComputed(t *testing.T, c *LinearCache, resources ...string) { t.Helper() - if c.versionMap != nil { - t.Errorf("version map is set on the cache with %d elements", len(c.versionMap)) + for _, res := range resources { + assert.Empty(t, c.resources[res].stableVersion, "stable version not set on resource %s", res) } } -func checkVersionMapSet(t *testing.T, c *LinearCache) { +func checkStableVersionsAreComputed(t *testing.T, c *LinearCache, resources ...string) { t.Helper() - if c.versionMap == nil { - t.Errorf("version map is not set on the cache") - } else if len(c.versionMap) != len(c.resources) { - t.Errorf("version map has the wrong number of elements: %d instead of %d expected", len(c.versionMap), len(c.resources)) + for _, res := range resources { + assert.NotEmpty(t, c.resources[res].stableVersion, "stable version not set on resource %s", res) } } @@ -246,11 +243,7 @@ func hashResource(t *testing.T, resource types.Resource) string { if err != nil { t.Fatal(err) } - v := HashResource(marshaledResource) - if v == "" { - t.Fatal(errors.New("failed to build resource version")) - } - return v + return HashResource(marshaledResource) } func createWildcardDeltaWatch(t *testing.T, initialReq bool, c *LinearCache, w chan DeltaResponse) { @@ -299,7 +292,6 @@ func TestLinearInitialResources(t *testing.T) { _, err = c.CreateWatch(req, sub, w) require.NoError(t, err) verifyResponse(t, w, "0", 2) - checkVersionMapNotSet(t, c) } func TestLinearCornerCases(t *testing.T) { @@ -326,7 +318,6 @@ func TestLinearBasic(t *testing.T) { _, err := c.CreateWatch(req1, sub1, w1) require.NoError(t, err) mustBlock(t, w1) - checkVersionMapNotSet(t, c) w2 := make(chan Response, 1) req2 := &Request{TypeUrl: testType, VersionInfo: "0"} @@ -373,8 +364,6 @@ func TestLinearBasic(t *testing.T) { _, err = c.CreateWatch(req2, sub2, w2) require.NoError(t, err) verifyResponse(t, w2, "3", 2) - // Ensure the version map was not created as we only ever used stow watches - checkVersionMapNotSet(t, c) } func TestLinearSetResources(t *testing.T) { @@ -483,7 +472,7 @@ func TestLinearVersionPrefix(t *testing.T) { func TestLinearDeletion(t *testing.T) { t.Run("non full-state resource", func(t *testing.T) { - c := NewLinearCache(testType, WithInitialResources(map[string]types.Resource{"a": testResource("a"), "b": testResource("b")})) + c := NewLinearCache(testType, WithInitialResources(map[string]types.Resource{"a": testResource("a"), "b": testResource("b")}), WithLogger(log.NewTestLogger(t))) w := make(chan Response, 1) req := &Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"} sub := subFromRequest(req) @@ -515,7 +504,7 @@ func TestLinearDeletion(t *testing.T) { require.NoError(t, err) // b is watched by wildcard, but for non-full-state resources we cannot report deletions mustBlock(t, w) - assert.Len(t, c.watchAll, 1) + assert.Len(t, c.wildcardWatches.sotw, 1) }) t.Run("full-state resource", func(t *testing.T) { @@ -717,7 +706,7 @@ func TestLinearDeltaWildcard(t *testing.T) { } func TestLinearDeltaExistingResources(t *testing.T) { - c := NewLinearCache(testType) + c := NewLinearCache(testType, WithLogger(log.NewTestLogger(t))) a := &endpoint.ClusterLoadAssignment{ClusterName: "a"} hashA := hashResource(t, a) err := c.UpdateResource("a", a) @@ -744,7 +733,7 @@ func TestLinearDeltaExistingResources(t *testing.T) { } func TestLinearDeltaInitialResourcesVersionSet(t *testing.T) { - c := NewLinearCache(testType) + c := NewLinearCache(testType, WithLogger(log.NewTestLogger(t))) a := &endpoint.ClusterLoadAssignment{ClusterName: "a"} hashA := hashResource(t, a) err := c.UpdateResource("a", a) @@ -786,7 +775,7 @@ func TestLinearDeltaResourceUpdate(t *testing.T) { err = c.UpdateResource("b", b) require.NoError(t, err) // There is currently no delta watch - checkVersionMapNotSet(t, c) + checkStableVersionsAreNotComputed(t, c, "a", "b") req := &DeltaRequest{TypeUrl: testType, ResourceNamesSubscribe: []string{"a", "b"}} w := make(chan DeltaResponse, 1) @@ -794,7 +783,7 @@ func TestLinearDeltaResourceUpdate(t *testing.T) { require.NoError(t, err) checkDeltaWatchCount(t, c, 0) verifyDeltaResponse(t, w, []resourceInfo{{"b", hashB}, {"a", hashA}}, nil) - checkVersionMapSet(t, c) + checkStableVersionsAreComputed(t, c, "a", "b") req = &DeltaRequest{TypeUrl: testType, ResourceNamesSubscribe: []string{"a", "b"}, InitialResourceVersions: map[string]string{"a": hashA, "b": hashB}} w = make(chan DeltaResponse, 1) @@ -810,7 +799,7 @@ func TestLinearDeltaResourceUpdate(t *testing.T) { err = c.UpdateResource("a", a) require.NoError(t, err) verifyDeltaResponse(t, w, []resourceInfo{{"a", hashA}}, nil) - checkVersionMapSet(t, c) + checkStableVersionsAreComputed(t, c, "a") } func TestLinearDeltaResourceDelete(t *testing.T) { @@ -850,7 +839,6 @@ func TestLinearDeltaMultiResourceUpdates(t *testing.T) { c := NewLinearCache(testType) w := make(chan DeltaResponse, 1) - checkVersionMapNotSet(t, c) assert.Equal(t, 0, c.NumResources()) // Initial update @@ -860,8 +848,6 @@ func TestLinearDeltaMultiResourceUpdates(t *testing.T) { require.NoError(t, err) mustBlockDelta(t, w) checkDeltaWatchCount(t, c, 1) - // The version map should now be created, even if empty - checkVersionMapSet(t, c) a := &endpoint.ClusterLoadAssignment{ClusterName: "a"} hashA := hashResource(t, a) b := &endpoint.ClusterLoadAssignment{ClusterName: "b"} @@ -870,7 +856,7 @@ func TestLinearDeltaMultiResourceUpdates(t *testing.T) { require.NoError(t, err) resp := <-w validateDeltaResponse(t, resp, []resourceInfo{{"a", hashA}, {"b", hashB}}, nil) - checkVersionMapSet(t, c) + checkStableVersionsAreComputed(t, c, "a", "b") assert.Equal(t, 2, c.NumResources()) sub.SetReturnedResources(resp.GetNextVersionMap()) @@ -893,7 +879,7 @@ func TestLinearDeltaMultiResourceUpdates(t *testing.T) { require.NoError(t, err) resp = <-w validateDeltaResponse(t, resp, []resourceInfo{{"a", hashA}, {"b", hashB}}, nil) - checkVersionMapSet(t, c) + checkStableVersionsAreComputed(t, c, "a", "b") assert.Equal(t, 2, c.NumResources()) sub.SetReturnedResources(resp.GetNextVersionMap()) @@ -913,7 +899,9 @@ func TestLinearDeltaMultiResourceUpdates(t *testing.T) { assert.NotContains(t, c.resources, "b", "resource with name b was found in cache") resp = <-w validateDeltaResponse(t, resp, []resourceInfo{{"a", hashA}}, []string{"b"}) - checkVersionMapSet(t, c) + checkStableVersionsAreComputed(t, c, "a") + // d is not watched currently + checkStableVersionsAreNotComputed(t, c, "d") assert.Equal(t, 2, c.NumResources()) sub.SetReturnedResources(resp.GetNextVersionMap()) @@ -930,7 +918,7 @@ func TestLinearDeltaMultiResourceUpdates(t *testing.T) { assert.NotContains(t, c.resources, "d", "resource with name d was found in cache") resp = <-w validateDeltaResponse(t, resp, []resourceInfo{{"b", hashB}}, nil) // d is not watched and should not be returned - checkVersionMapSet(t, c) + checkStableVersionsAreComputed(t, c, "b") assert.Equal(t, 2, c.NumResources()) sub.SetReturnedResources(resp.GetNextVersionMap()) @@ -947,7 +935,8 @@ func TestLinearDeltaMultiResourceUpdates(t *testing.T) { err = c.UpdateResources(map[string]types.Resource{"b": b, "d": d}, nil) require.NoError(t, err) verifyDeltaResponse(t, w, []resourceInfo{{"b", hashB}, {"d", hashD}}, nil) - checkVersionMapSet(t, c) + // d is now watched and should be returned + checkStableVersionsAreComputed(t, c, "b", "d") assert.Equal(t, 3, c.NumResources()) // Wildcard update/delete @@ -964,8 +953,6 @@ func TestLinearDeltaMultiResourceUpdates(t *testing.T) { verifyDeltaResponse(t, w, []resourceInfo{{"a", hashA}}, []string{"d"}) checkDeltaWatchCount(t, c, 0) - // Confirm that the map is still set even though there is currently no watch - checkVersionMapSet(t, c) assert.Equal(t, 2, c.NumResources()) } @@ -987,7 +974,8 @@ func TestLinearMixedWatches(t *testing.T) { _, err = c.CreateWatch(sotwReq, sotwSub, w) require.NoError(t, err) mustBlock(t, w) - checkVersionMapNotSet(t, c) + // Only sotw watches, should not have triggered stable resource computation + checkStableVersionsAreNotComputed(t, c, "a", "b") a = &endpoint.ClusterLoadAssignment{ClusterName: "a", Endpoints: []*endpoint.LocalityLbEndpoints{ // resource update {Priority: 25}, @@ -998,13 +986,13 @@ func TestLinearMixedWatches(t *testing.T) { // This behavior is currently invalid for cds and lds, but due to a current limitation of linear cache sotw implementation resp := verifyResponseResources(t, w, resource.EndpointType, c.getVersion(), "a") updateFromSotwResponse(resp, &sotwSub, sotwReq) - checkVersionMapNotSet(t, c) + checkStableVersionsAreNotComputed(t, c, "a", "b") sotwReq.VersionInfo = c.getVersion() _, err = c.CreateWatch(sotwReq, sotwSub, w) require.NoError(t, err) mustBlock(t, w) - checkVersionMapNotSet(t, c) + checkStableVersionsAreNotComputed(t, c, "a", "b") deltaReq := &DeltaRequest{TypeUrl: resource.EndpointType, ResourceNamesSubscribe: []string{"a", "b"}, InitialResourceVersions: map[string]string{"a": hashA, "b": hashB}} wd := make(chan DeltaResponse, 1) @@ -1014,11 +1002,10 @@ func TestLinearMixedWatches(t *testing.T) { require.NoError(t, err) mustBlockDelta(t, wd) checkDeltaWatchCount(t, c, 1) - checkVersionMapSet(t, c) + checkStableVersionsAreComputed(t, c, "a", "b") err = c.UpdateResources(nil, []string{"b"}) require.NoError(t, err) - checkVersionMapSet(t, c) mustBlock(t, w) // For sotw with non full-state resources, we don't report deletions verifyDeltaResponse(t, wd, nil, []string{"b"}, responseType(resource.EndpointType)) } @@ -1045,11 +1032,10 @@ func TestLinearSotwWatches(t *testing.T) { _, err = cache.CreateWatch(sotwReq, sotwSub, w) require.NoError(t, err) mustBlock(t, w) - checkVersionMapNotSet(t, cache) - assert.Len(t, cache.watches["a"], 1) - assert.Len(t, cache.watches["b"], 1) - assert.Len(t, cache.watches["c"], 1) + assert.Len(t, cache.resourceWatches["a"].sotw, 1) + assert.Len(t, cache.resourceWatches["b"].sotw, 1) + assert.Len(t, cache.resourceWatches["c"].sotw, 1) // Update a and c without touching b a = &endpoint.ClusterLoadAssignment{ClusterName: "a", Endpoints: []*endpoint.LocalityLbEndpoints{ // resource update @@ -1059,11 +1045,10 @@ func TestLinearSotwWatches(t *testing.T) { require.NoError(t, err) resp := verifyResponseResources(t, w, testType, cache.getVersion(), "a") updateFromSotwResponse(resp, &sotwSub, sotwReq) - checkVersionMapNotSet(t, cache) - assert.Empty(t, cache.watches["a"]) - assert.Empty(t, cache.watches["b"]) - assert.Empty(t, cache.watches["c"]) + assert.Empty(t, cache.resourceWatches["a"].sotw) + assert.Empty(t, cache.resourceWatches["b"].sotw) + assert.Empty(t, cache.resourceWatches["c"].sotw) // c no longer watched w = make(chan Response, 1) @@ -1072,21 +1057,19 @@ func TestLinearSotwWatches(t *testing.T) { _, err = cache.CreateWatch(sotwReq, sotwSub, w) require.NoError(t, err) mustBlock(t, w) - checkVersionMapNotSet(t, cache) b = &endpoint.ClusterLoadAssignment{ClusterName: "b", Endpoints: []*endpoint.LocalityLbEndpoints{ // resource update {Priority: 15}, }} err = cache.UpdateResources(map[string]types.Resource{"b": b}, nil) - assert.Empty(t, cache.watches["a"]) - assert.Empty(t, cache.watches["b"]) - assert.Empty(t, cache.watches["c"]) + assert.Empty(t, cache.resourceWatches["a"].sotw) + assert.Empty(t, cache.resourceWatches["b"].sotw) + assert.Empty(t, cache.resourceWatches["c"].sotw) require.NoError(t, err) resp = verifyResponseResources(t, w, testType, cache.getVersion(), "b") updateFromSotwResponse(resp, &sotwSub, sotwReq) - checkVersionMapNotSet(t, cache) w = make(chan Response, 1) sotwReq.ResourceNames = []string{"c"} @@ -1094,7 +1077,6 @@ func TestLinearSotwWatches(t *testing.T) { _, err = cache.CreateWatch(sotwReq, sotwSub, w) require.NoError(t, err) mustBlock(t, w) - checkVersionMapNotSet(t, cache) c := &endpoint.ClusterLoadAssignment{ClusterName: "c", Endpoints: []*endpoint.LocalityLbEndpoints{ // resource update {Priority: 15}, @@ -1102,11 +1084,10 @@ func TestLinearSotwWatches(t *testing.T) { err = cache.UpdateResources(map[string]types.Resource{"c": c}, nil) require.NoError(t, err) verifyResponseResources(t, w, testType, cache.getVersion(), "c") - checkVersionMapNotSet(t, cache) - assert.Empty(t, cache.watches["a"]) - assert.Empty(t, cache.watches["b"]) - assert.Empty(t, cache.watches["c"]) + assert.Empty(t, cache.resourceWatches["a"].sotw) + assert.Empty(t, cache.resourceWatches["b"].sotw) + assert.Empty(t, cache.resourceWatches["c"].sotw) }) t.Run("watches return full state for types requesting it", func(t *testing.T) { @@ -1129,7 +1110,6 @@ func TestLinearSotwWatches(t *testing.T) { _, err := cache.CreateWatch(nonWildcardReq, nonWildcardSub, w1) require.NoError(t, err) mustBlock(t, w1) - checkVersionMapNotSet(t, cache) // wildcard request wildcardReq := &Request{ResourceNames: nil, TypeUrl: resource.ClusterType, VersionInfo: cache.getVersion()} @@ -1139,7 +1119,6 @@ func TestLinearSotwWatches(t *testing.T) { _, err = cache.CreateWatch(wildcardReq, wildcardSub, w2) require.NoError(t, err) mustBlock(t, w2) - checkVersionMapNotSet(t, cache) // request not requesting b otherReq := &Request{ResourceNames: []string{"a", "c", "d"}, TypeUrl: resource.ClusterType, VersionInfo: cache.getVersion()} @@ -1149,7 +1128,6 @@ func TestLinearSotwWatches(t *testing.T) { _, err = cache.CreateWatch(otherReq, otherSub, w3) require.NoError(t, err) mustBlock(t, w3) - checkVersionMapNotSet(t, cache) b.AltStatName = "othername" err = cache.UpdateResources(map[string]types.Resource{"b": b}, nil) @@ -1319,12 +1297,12 @@ func TestLinearSotwNonWildcard(t *testing.T) { checkPendingWatch(4) // Cancel two watches to change resources - assert.Len(t, cache.watches["c"], 2) + assert.Len(t, cache.resourceWatches["c"].sotw, 2) c2() - assert.Len(t, cache.watches["c"], 1) - assert.Len(t, cache.watches["b"], 1) + assert.Len(t, cache.resourceWatches["c"].sotw, 1) + assert.Len(t, cache.resourceWatches["b"].sotw, 1) c3() - assert.Empty(t, cache.watches["b"]) + assert.Empty(t, cache.resourceWatches["b"].sotw) // Remove a resource from 2 (was a, c, d) updateReqResources(2, []string{"a", "d"}) @@ -1445,12 +1423,12 @@ func TestLinearSotwNonWildcard(t *testing.T) { checkPendingWatch(4) // Cancel two watches to change resources - assert.Len(t, cache.watches["c"], 2) + assert.Len(t, cache.resourceWatches["c"].sotw, 2) c2() - assert.Len(t, cache.watches["c"], 1) - assert.Len(t, cache.watches["b"], 1) + assert.Len(t, cache.resourceWatches["c"].sotw, 1) + assert.Len(t, cache.resourceWatches["b"].sotw, 1) c3() - assert.Empty(t, cache.watches["b"]) + assert.Empty(t, cache.resourceWatches["b"].sotw) // Remove a resource from 2 (was a, c, d) updateReqResources(2, []string{"a", "d"}) diff --git a/pkg/cache/v3/simple.go b/pkg/cache/v3/simple.go index 887f687b1b..e043637961 100644 --- a/pkg/cache/v3/simple.go +++ b/pkg/cache/v3/simple.go @@ -590,10 +590,9 @@ func (cache *snapshotCache) CreateDeltaWatch(request *DeltaRequest, sub Subscrip // Respond to a delta watch with the provided snapshot value. If the response is nil, there has been no state change. func (cache *snapshotCache) respondDelta(ctx context.Context, snapshot ResourceSnapshot, request *DeltaRequest, value chan DeltaResponse, sub Subscription) (*RawDeltaResponse, error) { resp := createDeltaResponse(ctx, request, sub, resourceContainer{ - resourceMap: snapshot.GetResources(request.GetTypeUrl()), - versionMap: snapshot.GetVersionMap(request.GetTypeUrl()), - systemVersion: snapshot.GetVersion(request.GetTypeUrl()), - }) + resourceMap: snapshot.GetResources(request.GetTypeUrl()), + versionMap: snapshot.GetVersionMap(request.GetTypeUrl()), + }, snapshot.GetVersion(request.GetTypeUrl())) // Only send a response if there were changes // We want to respond immediately for the first wildcard request in a stream, even if the response is empty diff --git a/pkg/cache/v3/snapshot.go b/pkg/cache/v3/snapshot.go index 8770d6916a..bddcecb57f 100644 --- a/pkg/cache/v3/snapshot.go +++ b/pkg/cache/v3/snapshot.go @@ -188,16 +188,10 @@ func (s *Snapshot) ConstructVersionMap() error { } for _, r := range resources.Items { - // Hash our version in here and build the version map. - marshaledResource, err := MarshalResource(r.Resource) + v, err := computeResourceStableVersion(r.Resource) if err != nil { - return err - } - v := HashResource(marshaledResource) - if v == "" { return fmt.Errorf("failed to build resource version: %w", err) } - s.VersionMap[typeURL][GetResourceName(r.Resource)] = v } } diff --git a/pkg/server/stream/v3/subscription.go b/pkg/server/stream/v3/subscription.go index bf878ed788..5536c62ace 100644 --- a/pkg/server/stream/v3/subscription.go +++ b/pkg/server/stream/v3/subscription.go @@ -163,21 +163,6 @@ func (s Subscription) IsWildcard() bool { return s.wildcard } -// WatchesResources returns whether at least one of the resources provided is currently being watched by the subscription. -// If the request is wildcard, it will always return true, -// otherwise it will compare the provided resources to the list of resources currently subscribed -func (s Subscription) WatchesResources(resourceNames map[string]struct{}) bool { - if s.wildcard { - return true - } - for resourceName := range resourceNames { - if _, ok := s.subscribedResourceNames[resourceName]; ok { - return true - } - } - return false -} - // ReturnedResources returns the list of resources returned to the client // and their version func (s Subscription) ReturnedResources() map[string]string {