Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(delta): force eds response after cds change #8

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pkg/cache/v3/delta.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func createDeltaResponse(ctx context.Context, req *DeltaRequest, state stream.St
version := resources.versionMap[name]
nextVersionMap[name] = version
prevVersion, found := state.GetResourceVersions()[name]
if !found || (prevVersion != version) {
if !found || (prevVersion != version) || state.ShouldForcePushResource(name) {
filtered = append(filtered, r)
}
}
Expand All @@ -67,7 +67,7 @@ func createDeltaResponse(ctx context.Context, req *DeltaRequest, state stream.St
prevVersion, found := state.GetResourceVersions()[name]
if r, ok := resources.resourceMap[name]; ok {
nextVersion := resources.versionMap[name]
if prevVersion != nextVersion {
if prevVersion != nextVersion || state.ShouldForcePushResource(name) {
filtered = append(filtered, r)
}
nextVersionMap[name] = nextVersion
Expand Down
89 changes: 89 additions & 0 deletions pkg/cache/v3/delta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/testing/protocmp"
"google.golang.org/protobuf/types/known/durationpb"

core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
Expand Down Expand Up @@ -296,3 +297,91 @@ func TestSnapshotCacheDeltaWatchCancel(t *testing.T) {
t.Errorf("should not return a status for unknown key: got %#v", s)
}
}

func TestSnapshotCacheDeltaWatchWithForceEDSOfRelevantEndpoints(t *testing.T) {
c := cache.NewSnapshotCache(true, group{}, logger{t: t})
watches := make(map[string]chan cache.DeltaResponse)

// Make our initial request as a wildcard to get all resources and make sure the wildcard requesting works as intended
for _, typ := range testTypes {
watches[typ] = make(chan cache.DeltaResponse, 1)
c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{
Node: &core.Node{
Id: "node",
},
TypeUrl: typ,
ResourceNamesSubscribe: names[typ],
}, stream.NewStreamState(true, nil), watches[typ])
}

if err := c.SetSnapshot(context.Background(), key, fixture.snapshotTwoClusters()); err != nil {
t.Fatal(err)
}

versionMap := make(map[string]map[string]string)
for _, typ := range testTypes {
t.Run(typ, func(t *testing.T) {
select {
case out := <-watches[typ]:
snapshot := fixture.snapshotTwoClusters()
assertResourceMapEqual(t, cache.IndexRawResourcesByName(out.(*cache.RawDeltaResponse).Resources), snapshot.GetResources(typ))
vMap := out.GetNextVersionMap()
versionMap[typ] = vMap
case <-time.After(time.Second):
t.Fatal("failed to receive snapshot response")
}
})
}

// On re-request we want to use non-wildcard so we can verify the logic path of not requesting
// all resources as well as individual resource removals
for _, typ := range testTypes {
watches[typ] = make(chan cache.DeltaResponse, 1)
state := stream.NewStreamState(false, versionMap[typ])
for resource := range versionMap[typ] {
state.GetSubscribedResourceNames()[resource] = struct{}{}
}
c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{
Node: &core.Node{
Id: "node",
},
TypeUrl: typ,
ResourceNamesSubscribe: names[typ],
}, state, watches[typ])
}

if count := c.GetStatusInfo(key).GetNumDeltaWatches(); count != len(testTypes) {
t.Errorf("watches should be created for the latest version, saw %d watches expected %d", count, len(testTypes))
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would it be useful to print the types that were missing?

}

// set partially-versioned snapshot
snapshot2 := fixture.snapshotTwoClusters()
cluster := resource.MakeCluster(resource.Ads, clusterName)
cluster.ConnectTimeout = durationpb.New(99 * time.Second)
snapshot2.Resources[types.Cluster] = cache.NewResources(fixture.version2, []types.Resource{cluster, anotherTestCluster})
if err := c.SetSnapshot(context.Background(), key, snapshot2); err != nil {
t.Fatal(err)
}
if count := c.GetStatusInfo(key).GetNumDeltaWatches(); count != len(testTypes)-2 {
t.Errorf("watches should be preserved for all but two, got: %d open watches instead of the expected %d open watches", count, len(testTypes)-1)
}
Comment on lines +365 to +367
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't get this, we check equality for -2 but we print -1 why? also, nitpick can we have 1/2 as a const and name it descriptively?


// validate response for endpoints
select {
case out := <-watches[testTypes[1]]:
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we assign this testTypes[1] to a variable so I know what type that is? is this rsrc.ClusterType?

snapshot2 := fixture.snapshotTwoClusters()
snapshot2.Resources[types.Cluster] = cache.NewResources(fixture.version2, []types.Resource{cluster})
assertResourceMapEqual(t, cache.IndexRawResourcesByName(out.(*cache.RawDeltaResponse).Resources), snapshot2.GetResources(rsrc.ClusterType))
vMap := out.GetNextVersionMap()
versionMap[testTypes[1]] = vMap
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why are we writing something when this code block is supposed to validate things? 🤔

case out := <-watches[testTypes[0]]:
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same as above ☝️

for _, resource := range out.(*cache.RawDeltaResponse).Resources {
// should send only endpoint of the changed cluster
assert.Equal(t, resource, testEndpoint)
}
vMap := out.GetNextVersionMap()
versionMap[testTypes[0]] = vMap
case <-time.After(time.Second):
t.Fatal("failed to receive snapshot response")
}
}
22 changes: 22 additions & 0 deletions pkg/cache/v3/fixtures_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,25 @@ func (f *fixtureGenerator) snapshot() *cache.Snapshot {

return snapshot
}

func (f *fixtureGenerator) snapshotTwoClusters() *cache.Snapshot {
snapshot, err := cache.NewSnapshot(
f.version,
map[rsrc.Type][]types.Resource{
rsrc.EndpointType: {testEndpoint, anotherTestEndpoint},
rsrc.ClusterType: {testCluster, anotherTestCluster},
rsrc.RouteType: {testRoute, testEmbeddedRoute},
rsrc.ScopedRouteType: {testScopedRoute},
rsrc.VirtualHostType: {testVirtualHost},
rsrc.ListenerType: {testScopedListener, testListener},
rsrc.RuntimeType: {testRuntime},
rsrc.SecretType: {testSecret[0]},
rsrc.ExtensionConfigType: {testExtensionConfig},
},
)
if err != nil {
panic(err.Error())
}

return snapshot
}
3 changes: 3 additions & 0 deletions pkg/cache/v3/resource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (

const (
clusterName = "cluster0"
anotherClusterName = "cluster1"
routeName = "route0"
embeddedRouteName = "embeddedRoute0"
scopedRouteName = "scopedRoute0"
Expand All @@ -45,7 +46,9 @@ const (

var (
testEndpoint = resource.MakeEndpoint(clusterName, 8080)
anotherTestEndpoint = resource.MakeEndpoint(anotherClusterName, 9090)
testCluster = resource.MakeCluster(resource.Ads, clusterName)
anotherTestCluster = resource.MakeCluster(resource.Ads, anotherClusterName)
testRoute = resource.MakeRouteConfig(routeName, clusterName)
testEmbeddedRoute = resource.MakeRouteConfig(embeddedRouteName, clusterName)
testScopedRoute = resource.MakeScopedRouteConfig(scopedRouteName, routeName, []string{"1.2.3.4"})
Expand Down
18 changes: 18 additions & 0 deletions pkg/cache/v3/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,8 +303,14 @@ func (cache *snapshotCache) respondDeltaWatches(ctx context.Context, info *statu
// of maps are randomized order when ranged over.
if cache.ads {
info.orderResponseDeltaWatches()
forcePushResources := map[types.ResponseType][]string{}
for _, key := range info.orderedDeltaWatches {
watch := info.deltaWatches[key.ID]
responseType := GetResponseType(watch.Request.TypeUrl)
if resources, found := forcePushResources[responseType]; found {
watch.StreamState.SetForcePushResource(resources)
}

res, err := cache.respondDelta(
ctx,
snapshot,
Expand All @@ -319,6 +325,9 @@ func (cache *snapshotCache) respondDeltaWatches(ctx context.Context, info *statu
// so we don't want to respond or remove any existing resource watches
if res != nil {
delete(info.deltaWatches, key.ID)
if indexed, typ, ok := getForcePushEdsResourcesNames(responseType, res); ok {
forcePushResources[typ] = indexed
}
}
}
} else {
Expand Down Expand Up @@ -496,6 +505,15 @@ func (cache *snapshotCache) respond(ctx context.Context, request *Request, value
}
}

func getForcePushEdsResourcesNames(typ types.ResponseType, response *RawDeltaResponse) ([]string, types.ResponseType, bool) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we really need type? it's either endpoint or unknown and we already have the last boolean 🤔

switch typ {
case types.Cluster:
return GetResourceNames(response.Resources), types.Endpoint, true
default:
return []string{}, types.UnknownType, false
}
}

func createResponse(ctx context.Context, request *Request, resources map[string]types.ResourceWithTTL, version string, heartbeat bool) Response {
filtered := make([]types.ResourceWithTTL, 0, len(resources))

Expand Down
19 changes: 19 additions & 0 deletions pkg/server/stream/v3/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,14 @@ type StreamState struct { // nolint:golint,revive

// Ordered indicates whether we want an ordered ADS stream or not
ordered bool

// ForcePush indicates if should push the response even when the version is the same
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// ForcePush indicates if should push the response even when the version is the same
// ForcePush indicates if the control plane should push the response even when the version is the same

// https://www.envoyproxy.io/docs/envoy/latest/api-docs/xds_protocol#resource-warming
// This is required in the situation:
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// This is required in the situation:
// This is required in the following situations:

// 1) There is a Cluster change and control plane responds with CDS
// 2) Envoy has a cluster in the warming phase until there is a EDS response, if endpoints haven't changed
// there is no EDS send and changes to the clusters are blocked
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// there is no EDS send and changes to the clusters are blocked
// there is no EDS sent and changes to the clusters are blocked

forcePushResource map[string]bool
}

// NewStreamState initializes a stream state.
Expand All @@ -54,6 +62,7 @@ func NewStreamState(wildcard bool, initialResourceVersions map[string]string) St
first: true,
knownResourceNames: map[string]map[string]struct{}{},
ordered: false, // Ordered comes from the first request since that's when we discover if they want ADS
forcePushResource: map[string]bool{},
}

if initialResourceVersions == nil {
Expand Down Expand Up @@ -97,6 +106,12 @@ func (s *StreamState) SetWildcard(wildcard bool) {
s.wildcard = wildcard
}

func (s *StreamState) SetForcePushResource(forcePushResources []string) {
for _, resName := range forcePushResources {
s.forcePushResource[resName] = true
}
}

// GetResourceVersions returns a map of current resources grouped by type URL.
func (s *StreamState) GetResourceVersions() map[string]string {
return s.resourceVersions
Expand All @@ -119,6 +134,10 @@ func (s *StreamState) IsWildcard() bool {
return s.wildcard
}

func (s *StreamState) ShouldForcePushResource(resourceName string) bool {
return s.forcePushResource[resourceName]
}

// GetKnownResourceNames returns the current known list of resources on a SOTW stream.
func (s *StreamState) GetKnownResourceNames(url string) map[string]struct{} {
return s.knownResourceNames[url]
Expand Down