Skip to content

Commit

Permalink
[Linear cache] Support use of stable versions in sotw
Browse files Browse the repository at this point in the history
When using sotw watches, the current behavior of the linear cache is to use the current version of the cache (monotonically increased on resource updates) as the version returned. In the past the request-provided version was compared to the version of the last resource update to compute the returned resources, allowing watch resumption when a client would reconnect.
This behavior was actually not working as the subscribed resources could change while no cache updates occurred, or the newly requested resources were at an older cache version. PR #10 therefore no longer use this behavior to track resources to be returned, instead relying on the subscription state. A side effect is that watch resumption would always return all known resources.
Delta watches have a mechanism to avoid this issue, by tracking per resource version and sending them as part of the initial request of a new subscription. Sotw do not allow per resource version in requests and responses, but by encoding the current subscription state through a hash of the returned versions map, this PR now allows resumption if the hash matches the response we would otherwise return. It still has two main limitations: it is less efficient (as we compute an entire response to then not reply) and we cannot track which resource (if any) changed, and will therefore return them all if anything has changed.

Signed-off-by: Valerian Roche <[email protected]>
  • Loading branch information
valerian-roche committed Feb 6, 2024
1 parent e76a085 commit e1b22c6
Show file tree
Hide file tree
Showing 5 changed files with 299 additions and 50 deletions.
6 changes: 0 additions & 6 deletions pkg/cache/v3/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,6 @@ type Subscription interface {
// This considers subtleties related to the current migration of wildcard definitions within the protocol.
// More details on the behavior of wildcard are present at https://www.envoyproxy.io/docs/envoy/latest/api-docs/xds_protocol#how-the-client-specifies-what-resources-to-return
IsWildcard() bool

// WatchesResources returns whether at least one of the resources provided is currently being watched by the subscription.
// It is currently only applicable to delta-xds.
// If the request is wildcard, it will always return true,
// otherwise it will compare the provided resources to the list of resources currently subscribed
WatchesResources(resourceNames map[string]struct{}) bool
}

// ConfigWatcher requests watches for configuration resources by a node, last
Expand Down
159 changes: 116 additions & 43 deletions pkg/cache/v3/linear.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,11 @@ package cache

import (
"context"
"encoding/hex"
"errors"
"fmt"
"hash/fnv"
"sort"
"strconv"
"strings"
"sync"
Expand All @@ -37,6 +40,45 @@ type cachedResource struct {
// resourceVersion is the version of the resource itself (built through stable marshaling).
// It is only set if computeResourceVersion is set to true on the cache.
resourceVersion string

// marshaledResource contains the marshaled version of the resource if it has been
// computed already. If it has not it will not be set
marshaledResource []byte
}

func newCachedResource(res types.Resource, cacheVersion string, computeStableVersion bool) (cachedResource, error) {
r := cachedResource{
Resource: res,
cacheVersion: cacheVersion,
}
if computeStableVersion {
if err := r.computeStableVersion(); err != nil {
return r, err
}
}
return r, nil
}

func (c cachedResource) getVersion(useStableVersion bool) string {
if useStableVersion {
return c.resourceVersion
}
return c.cacheVersion
}

func (c *cachedResource) computeStableVersion() error {
// hash our version in here and build the version map
marshaledResource, err := MarshalResource(c.Resource)
if err != nil {
return err
}
v := HashResource(marshaledResource)
if v == "" {
return errors.New("failed to build resource version")
}
c.marshaledResource = marshaledResource
c.resourceVersion = v
return nil
}

type watches struct {
Expand Down Expand Up @@ -89,6 +131,10 @@ type LinearCache struct {
// computeResourceVersion indicates whether the cache is currently computing and storing stable resource versions.
computeResourceVersion bool

// useStableVersionsInSotw switches to a new version model for sotw watches.
// The version is then encoding the known resources of the subscription
useStableVersionsInSotw bool

log log.Logger

mu sync.RWMutex
Expand Down Expand Up @@ -133,6 +179,17 @@ func WithComputeStableVersions() LinearCacheOption {
}
}

// WithSotwStableVersions changes the versions returned in sotw to encode the list of resources known
// in the subscription.
// The use of stable versions for sotw also deduplicates updates to clients if the cache updates are
// not changing the content of the resource.
func WithSotwStableVersions() LinearCacheOption {
return func(cache *LinearCache) {
cache.computeResourceVersion = true
cache.useStableVersionsInSotw = true
}
}

// NewLinearCache creates a new cache. See the comments on the struct definition.
func NewLinearCache(typeURL string, opts ...LinearCacheOption) *LinearCache {
out := &LinearCache{
Expand All @@ -150,11 +207,10 @@ func NewLinearCache(typeURL string, opts ...LinearCacheOption) *LinearCache {
for name, resource := range out.resources {
resource.cacheVersion = out.getVersion()
if out.computeResourceVersion {
version, err := computeResourceStableVersion(resource.Resource)
err := resource.computeStableVersion()
if err != nil {
out.log.Errorf("failed to build stable versions for resource %s: %s", name, err)
}
resource.resourceVersion = version
}
out.resources[name] = resource
}
Expand All @@ -171,18 +227,13 @@ func (cache *LinearCache) computeResourceChange(sub Subscription, ignoreReturned
knownVersions = make(map[string]string)
}

getVersion := func(c cachedResource) string { return c.cacheVersion }
if useStableVersion {
getVersion = func(c cachedResource) string { return c.resourceVersion }
}

if sub.IsWildcard() {
for resourceName, resource := range cache.resources {
knownVersion, ok := knownVersions[resourceName]
if !ok {
// This resource is not yet known by the client (new resource added in the cache or newly subscribed).
changedResources = append(changedResources, resourceName)
} else if knownVersion != getVersion(resource) {
} else if knownVersion != resource.getVersion(useStableVersion) {
// The client knows an outdated version.
changedResources = append(changedResources, resourceName)
}
Expand Down Expand Up @@ -212,7 +263,7 @@ func (cache *LinearCache) computeResourceChange(sub Subscription, ignoreReturned
if !known {
// This resource is not yet known by the client (new resource added in the cache or newly subscribed).
changedResources = append(changedResources, resourceName)
} else if knownVersion != getVersion(res) {
} else if knownVersion != res.getVersion(useStableVersion) {
// The client knows an outdated version.
changedResources = append(changedResources, resourceName)
}
Expand All @@ -230,8 +281,26 @@ func (cache *LinearCache) computeResourceChange(sub Subscription, ignoreReturned
return changedResources, removedResources
}

func computeSotwStableVersion(versionMap map[string]string) string {
keys := make([]string, 0, len(versionMap))
for key := range versionMap {
keys = append(keys, key)
}
sort.Strings(keys)

// Reuse the hash used on resources.
hasher := fnv.New64a()
for _, key := range keys {
hasher.Write([]byte(key))
hasher.Write([]byte("/"))
hasher.Write([]byte(versionMap[key]))
hasher.Write([]byte("^"))
}
return hex.EncodeToString(hasher.Sum(nil))
}

func (cache *LinearCache) computeSotwResponse(watch ResponseWatch, ignoreReturnedResources bool) *RawResponse {
changedResources, removedResources := cache.computeResourceChange(watch.subscription, ignoreReturnedResources, false)
changedResources, removedResources := cache.computeResourceChange(watch.subscription, ignoreReturnedResources, cache.useStableVersionsInSotw)
if len(changedResources) == 0 && len(removedResources) == 0 && !ignoreReturnedResources {
// Nothing changed.
return nil
Expand All @@ -257,14 +326,14 @@ func (cache *LinearCache) computeSotwResponse(watch ResponseWatch, ignoreReturne
for _, resourceName := range changedResources {
cachedResource := cache.resources[resourceName]
resources = append(resources, types.ResourceWithTTL{Resource: cachedResource.Resource})
returnedVersions[resourceName] = cachedResource.cacheVersion
returnedVersions[resourceName] = cachedResource.getVersion(cache.useStableVersionsInSotw)
}
case watch.subscription.IsWildcard():
// Include all resources for the type.
resources = make([]types.ResourceWithTTL, 0, len(cache.resources))
for resourceName, cachedResource := range cache.resources {
resources = append(resources, types.ResourceWithTTL{Resource: cachedResource.Resource})
returnedVersions[resourceName] = cachedResource.cacheVersion
returnedVersions[resourceName] = cachedResource.getVersion(cache.useStableVersionsInSotw)
}
default:
// Include all resources matching the subscription, with no concern on whether
Expand All @@ -279,7 +348,7 @@ func (cache *LinearCache) computeSotwResponse(watch ResponseWatch, ignoreReturne
continue
}
resources = append(resources, types.ResourceWithTTL{Resource: cachedResource.Resource})
returnedVersions[resourceName] = cachedResource.cacheVersion
returnedVersions[resourceName] = cachedResource.getVersion(cache.useStableVersionsInSotw)
}
}

Expand All @@ -297,11 +366,16 @@ func (cache *LinearCache) computeSotwResponse(watch ResponseWatch, ignoreReturne
return nil
}

responseVersion := cacheVersion
if cache.useStableVersionsInSotw {
responseVersion = cache.versionPrefix + computeSotwStableVersion(returnedVersions)
}

return &RawResponse{
Request: watch.Request,
Resources: resources,
ReturnedResources: returnedVersions,
Version: cacheVersion,
Version: responseVersion,
Ctx: context.Background(),
}
}
Expand All @@ -322,9 +396,9 @@ func (cache *LinearCache) computeDeltaResponse(watch DeltaResponseWatch) *RawDel
cacheVersion := cache.getVersion()
resources := make([]types.Resource, 0, len(changedResources))
for _, resourceName := range changedResources {
resource := cache.resources[resourceName]
resources = append(resources, resource.Resource)
returnedVersions[resourceName] = resource.resourceVersion
cachedResource := cache.resources[resourceName]
resources = append(resources, cachedResource.Resource)
returnedVersions[resourceName] = cachedResource.getVersion(true)
}
// Cleanup resources no longer existing in the cache or no longer subscribed.
for _, resourceName := range removedResources {
Expand Down Expand Up @@ -397,32 +471,12 @@ func (cache *LinearCache) notifyAll(modified []string) {
}
}

func computeResourceStableVersion(res types.Resource) (string, error) {
// hash our version in here and build the version map
marshaledResource, err := MarshalResource(res)
if err != nil {
return "", err
}
v := HashResource(marshaledResource)
if v == "" {
return "", errors.New("failed to build resource version")
}
return v, nil
}

func (cache *LinearCache) addResourceToCache(name string, res types.Resource) error {
update := cachedResource{
Resource: res,
cacheVersion: cache.getVersion(),
}
if cache.computeResourceVersion {
version, err := computeResourceStableVersion(res)
if err != nil {
return err
}
update.resourceVersion = version
r, err := newCachedResource(res, cache.getVersion(), cache.computeResourceVersion)
if err != nil {
return err
}
cache.resources[name] = update
cache.resources[name] = r
return nil
}

Expand Down Expand Up @@ -561,7 +615,27 @@ func (cache *LinearCache) CreateWatch(request *Request, sub Subscription, value
defer cache.mu.Unlock()

response := cache.computeSotwResponse(watch, ignoreCurrentSubscriptionResources)
shouldReply := false
if response != nil {
// If the request
// - is the first
// - provides a non-empty version, matching the version prefix
// and the cache uses stable versions, if the generated versions are the same as the previous one, we do not return the response.
// This avoids resending all data if the new subscription is just a resumption of the previous one.
if cache.useStableVersionsInSotw && request.GetResponseNonce() == "" && !ignoreCurrentSubscriptionResources {
shouldReply = request.GetVersionInfo() != response.Version

// We confirmed the content of the known resources, store them in the watch we create.
subscription := newWatchSubscription(sub)
subscription.returnedResources = response.ReturnedResources
watch.subscription = subscription
sub = subscription
} else {
shouldReply = true
}
}

if shouldReply {
cache.log.Debugf("[linear cache] replying to the watch with resources %v (subscription values %v, known %v)", response.GetReturnedResources(), sub.SubscribedResources(), sub.ReturnedResources())
watch.Response <- response
return func() {}, nil
Expand Down Expand Up @@ -626,11 +700,10 @@ func (cache *LinearCache) CreateDeltaWatch(request *DeltaRequest, sub Subscripti
cache.log.Infof("[linear cache] activating stable resource version computation for %s", cache.typeURL)

for name, cachedResource := range cache.resources {
version, err := computeResourceStableVersion(cachedResource.Resource)
err := cachedResource.computeStableVersion()
if err != nil {
return func() {}, fmt.Errorf("failed to build stable versions for resource %s: %w", name, err)
}
cachedResource.resourceVersion = version
cache.resources[name] = cachedResource
}
}
Expand Down
Loading

0 comments on commit e1b22c6

Please sign in to comment.