From a2e8a67baa9d87f6388265e6361313b3d79386fb Mon Sep 17 00:00:00 2001
From: Easwar Swaminathan
Date: Thu, 12 Dec 2024 01:46:47 +0000
Subject: [PATCH] xdsclient: stop caching xdsChannels for potential reuse
 after all references are released

---
 .../balancer/cdsbalancer/cdsbalancer_test.go  |  12 +-
 xds/internal/xdsclient/authority.go           |   4 +-
 xds/internal/xdsclient/client_new.go          |  15 +--
 xds/internal/xdsclient/client_refcounted.go   |   9 +-
 xds/internal/xdsclient/clientimpl.go          |  77 ++---
 .../tests/ads_stream_ack_nack_test.go         |  49 +++---
 .../tests/ads_stream_restart_test.go          |  49 +------
 .../xdsclient/tests/authority_test.go         | 127 ++----------
 xds/internal/xdsclient/tests/fallback_test.go |  27 ++--
 xds/internal/xdsclient/tests/helpers_test.go  |   7 +-
 .../xdsclient/tests/lds_watchers_test.go      |  50 +------
 11 files changed, 77 insertions(+), 349 deletions(-)

diff --git a/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go b/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go
index bd9cd5573805..75abbe81fc3d 100644
--- a/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go
+++ b/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go
@@ -928,12 +928,12 @@ func (s) TestResolverError(t *testing.T) {
 	}
 }
 
-// Tests that closing the cds LB policy results in the cluster resource watch
-// being cancelled and the child policy being closed.
+// Tests that closing the cds LB policy results in the child policy being
+// closed.
 func (s) TestClose(t *testing.T) {
 	cdsBalancerCh := registerWrappedCDSPolicy(t)
 	_, _, _, childPolicyCloseCh := registerWrappedClusterResolverPolicy(t)
-	mgmtServer, nodeID, cc, _, _, _, cdsResourceCanceledCh := setupWithManagementServer(t)
+	mgmtServer, nodeID, cc, _, _, _, _ := setupWithManagementServer(t)
 
 	// Start a test service backend.
 	server := stubserver.StartTestService(t, nil)
@@ -967,12 +967,6 @@ func (s) TestClose(t *testing.T) {
 		}
 	}
 	cdsBal.Close()
-	// Wait for the CDS resource to be not requested anymore.
-	select {
-	case <-ctx.Done():
-		t.Fatal("Timeout when waiting for CDS resource to be not requested")
-	case <-cdsResourceCanceledCh:
-	}
 	// Wait for the child policy to be closed.
 	select {
 	case <-ctx.Done():
diff --git a/xds/internal/xdsclient/authority.go b/xds/internal/xdsclient/authority.go
index 24673a8d9077..9262b905a7ea 100644
--- a/xds/internal/xdsclient/authority.go
+++ b/xds/internal/xdsclient/authority.go
@@ -646,7 +646,7 @@ func (a *authority) watchResource(rType xdsresource.Type, resourceName string, w
 	// immediately as well.
 	if state.md.Status == xdsresource.ServiceStatusNACKed {
 		if a.logger.V(2) {
-			a.logger.Infof("Resource type %q with resource name %q was NACKed: %s", rType.TypeName(), resourceName, state.cache.ToJSON())
+			a.logger.Infof("Resource type %q with resource name %q was NACKed", rType.TypeName(), resourceName)
 		}
 		a.watcherCallbackSerializer.TrySchedule(func(context.Context) { watcher.OnError(state.md.ErrState.Err, func() {}) })
 	}
@@ -687,7 +687,7 @@ func (a *authority) unwatchResource(rType xdsresource.Type, resourceName string,
 		delete(state.watchers, watcher)
 		if len(state.watchers) > 0 {
 			if a.logger.V(2) {
-				a.logger.Infof("%d more watchers exist for type %q, resource name %q", rType.TypeName(), resourceName)
+				a.logger.Infof("More watchers exist for type %q, resource name %q", rType.TypeName(), resourceName)
 			}
 			return
 		}
diff --git a/xds/internal/xdsclient/client_new.go b/xds/internal/xdsclient/client_new.go
index d8f9d6c9417b..55299c457b25 100644
--- a/xds/internal/xdsclient/client_new.go
+++ b/xds/internal/xdsclient/client_new.go
@@ -26,7 +26,6 @@ import (
 
 	"google.golang.org/grpc/internal"
 	"google.golang.org/grpc/internal/backoff"
-	"google.golang.org/grpc/internal/cache"
 	"google.golang.org/grpc/internal/grpcsync"
 	"google.golang.org/grpc/internal/xds/bootstrap"
 	xdsclientinternal "google.golang.org/grpc/xds/internal/xdsclient/internal"
@@ -61,11 +60,11 @@ func New(name string) (XDSClient, func(), error) {
 	if err != nil {
 		return nil, nil, fmt.Errorf("xds: failed to get xDS bootstrap config: %v", err)
 	}
-	return newRefCounted(name, config, defaultWatchExpiryTimeout, defaultIdleChannelExpiryTimeout, backoff.DefaultExponential.Backoff)
+	return newRefCounted(name, config, defaultWatchExpiryTimeout, backoff.DefaultExponential.Backoff)
 }
 
 // newClientImpl returns a new xdsClient with the given config.
-func newClientImpl(config *bootstrap.Config, watchExpiryTimeout, idleChannelExpiryTimeout time.Duration, streamBackoff func(int) time.Duration) (*clientImpl, error) {
+func newClientImpl(config *bootstrap.Config, watchExpiryTimeout time.Duration, streamBackoff func(int) time.Duration) (*clientImpl, error) {
 	ctx, cancel := context.WithCancel(context.Background())
 	c := &clientImpl{
 		done:               grpcsync.NewEvent(),
@@ -78,7 +77,6 @@ func newClientImpl(config *bootstrap.Config, watchExpiryTimeout, idleChannelExpi
 		transportBuilder:   &grpctransport.Builder{},
 		resourceTypes:      newResourceTypeRegistry(),
 		xdsActiveChannels:  make(map[string]*channelState),
-		xdsIdleChannels:    cache.NewTimeoutCache(idleChannelExpiryTimeout),
 	}
 
 	for name, cfg := range config.Authorities() {
@@ -121,10 +119,6 @@ type OptionsForTesting struct {
 	// unspecified, uses the default value used in non-test code.
 	WatchExpiryTimeout time.Duration
 
-	// IdleChannelExpiryTimeout is the timeout before idle xdsChannels are
-	// deleted. If unspecified, uses the default value used in non-test code.
-	IdleChannelExpiryTimeout time.Duration
-
 	// StreamBackoffAfterFailure is the backoff function used to determine the
 	// backoff duration after stream failures.
 	// If unspecified, uses the default value used in non-test code.
@@ -147,9 +141,6 @@ func NewForTesting(opts OptionsForTesting) (XDSClient, func(), error) {
 	if opts.WatchExpiryTimeout == 0 {
 		opts.WatchExpiryTimeout = defaultWatchExpiryTimeout
 	}
-	if opts.IdleChannelExpiryTimeout == 0 {
-		opts.IdleChannelExpiryTimeout = defaultIdleChannelExpiryTimeout
-	}
 	if opts.StreamBackoffAfterFailure == nil {
 		opts.StreamBackoffAfterFailure = defaultStreamBackoffFunc
 	}
@@ -158,7 +149,7 @@ func NewForTesting(opts OptionsForTesting) (XDSClient, func(), error) {
 	if err != nil {
 		return nil, nil, err
 	}
-	return newRefCounted(opts.Name, config, opts.WatchExpiryTimeout, opts.IdleChannelExpiryTimeout, opts.StreamBackoffAfterFailure)
+	return newRefCounted(opts.Name, config, opts.WatchExpiryTimeout, opts.StreamBackoffAfterFailure)
 }
 
 // GetForTesting returns an xDS client created earlier using the given name.
diff --git a/xds/internal/xdsclient/client_refcounted.go b/xds/internal/xdsclient/client_refcounted.go
index 1c105ac4e061..5a256c1bfada 100644
--- a/xds/internal/xdsclient/client_refcounted.go
+++ b/xds/internal/xdsclient/client_refcounted.go
@@ -27,10 +27,7 @@ import (
 	"google.golang.org/grpc/internal/xds/bootstrap"
 )
 
-const (
-	defaultWatchExpiryTimeout       = 15 * time.Second
-	defaultIdleChannelExpiryTimeout = 5 * time.Minute
-)
+const defaultWatchExpiryTimeout = 15 * time.Second
 
 var (
 	// The following functions are no-ops in the actual code, but can be
@@ -62,7 +59,7 @@ func clientRefCountedClose(name string) {
 // newRefCounted creates a new reference counted xDS client implementation for
 // name, if one does not exist already. If an xDS client for the given name
 // exists, it gets a reference to it and returns it.
-func newRefCounted(name string, config *bootstrap.Config, watchExpiryTimeout, idleChannelExpiryTimeout time.Duration, streamBackoff func(int) time.Duration) (XDSClient, func(), error) {
+func newRefCounted(name string, config *bootstrap.Config, watchExpiryTimeout time.Duration, streamBackoff func(int) time.Duration) (XDSClient, func(), error) {
 	clientsMu.Lock()
 	defer clientsMu.Unlock()
 
@@ -72,7 +69,7 @@ func newRefCounted(name string, config *bootstrap.Config, watchExpiryTimeout, id
 	}
 
 	// Create the new client implementation.
-	c, err := newClientImpl(config, watchExpiryTimeout, idleChannelExpiryTimeout, streamBackoff)
+	c, err := newClientImpl(config, watchExpiryTimeout, streamBackoff)
 	if err != nil {
 		return nil, nil, err
 	}
diff --git a/xds/internal/xdsclient/clientimpl.go b/xds/internal/xdsclient/clientimpl.go
index df0949e23cc7..29a844f14f12 100644
--- a/xds/internal/xdsclient/clientimpl.go
+++ b/xds/internal/xdsclient/clientimpl.go
@@ -25,7 +25,6 @@ import (
 	"sync/atomic"
 	"time"
 
-	"google.golang.org/grpc/internal/cache"
 	"google.golang.org/grpc/internal/grpclog"
 	"google.golang.org/grpc/internal/grpcsync"
 	"google.golang.org/grpc/internal/xds/bootstrap"
@@ -63,14 +62,9 @@ type clientImpl struct {
 	// these channels, and forwards updates from the channels to each of these
 	// authorities.
 	//
-	// Once all references to a channel are dropped, the channel is moved to the
-	// idle cache where it lives for a configured duration before being closed.
-	// If the channel is required before the idle timeout fires, it is revived
-	// from the idle cache and used.
+	// Once all references to a channel are dropped, the channel is closed.
 	channelsMu        sync.Mutex
 	xdsActiveChannels map[string]*channelState // Map from server config to in-use xdsChannels.
-	xdsIdleChannels   *cache.TimeoutCache      // Map from server config to idle xdsChannels.
-	closeCond         *sync.Cond
 }
 
 // channelState represents the state of an xDS channel. It tracks the number of
@@ -173,21 +167,6 @@ func (c *clientImpl) close() {
 		c.close()
 	}
 
-	// Similarly, closing idle channels cannot be done with the lock held, for
-	// the same reason as described above. So, we clear the idle cache in a
-	// goroutine and use a condition variable to wait on the condition that the
-	// idle cache has zero entries. The Wait() method on the condition variable
-	// releases the lock and blocks the goroutine until signaled (which happens
-	// when an idle channel is removed from the cache and closed), and grabs the
-	// lock before returning.
-	c.channelsMu.Lock()
-	c.closeCond = sync.NewCond(&c.channelsMu)
-	go c.xdsIdleChannels.Clear(true)
-	for c.xdsIdleChannels.Len() > 0 {
-		c.closeCond.Wait()
-	}
-	c.channelsMu.Unlock()
-
 	c.serializerClose()
 	<-c.serializer.Done()
 
@@ -289,23 +268,11 @@ func (c *clientImpl) getOrCreateChannel(serverConfig *bootstrap.ServerConfig, in
 		c.logger.Infof("Received request for a reference to an xdsChannel for server config %q", serverConfig)
 	}
 
-	// Use an active channel, if one exists for this server config.
+	// Use an existing channel, if one exists for this server config.
 	if state, ok := c.xdsActiveChannels[serverConfig.String()]; ok {
 		if c.logger.V(2) {
-			c.logger.Infof("Reusing an active xdsChannel for server config %q", serverConfig)
-		}
-		initLocked(state)
-		return state.channel, c.releaseChannel(serverConfig, state, deInitLocked), nil
-	}
-
-	// If an idle channel exists for this server config, remove it from the
-	// idle cache and add it to the map of active channels, and return it.
-	if s, ok := c.xdsIdleChannels.Remove(serverConfig.String()); ok {
-		if c.logger.V(2) {
-			c.logger.Infof("Reviving an xdsChannel from the idle cache for server config %q", serverConfig)
+			c.logger.Infof("Reusing an existing xdsChannel for server config %q", serverConfig)
 		}
-		state := s.(*channelState)
-		c.xdsActiveChannels[serverConfig.String()] = state
 		initLocked(state)
 		return state.channel, c.releaseChannel(serverConfig, state, deInitLocked), nil
 	}
@@ -345,9 +312,7 @@ func (c *clientImpl) getOrCreateChannel(serverConfig *bootstrap.ServerConfig, in
 }
 
 // releaseChannel is a function that is called when a reference to an xdsChannel
-// needs to be released. It handles the logic of moving the channel to an idle
-// cache if there are no other active references, and closing the channel if it
-// remains in the idle cache for the configured duration.
+// needs to be released. It handles closing channels with no active references.
 //
 // The function takes the following parameters:
 //   - serverConfig: the server configuration for the xdsChannel
@@ -360,7 +325,6 @@ func (c *clientImpl) getOrCreateChannel(serverConfig *bootstrap.ServerConfig, in
 func (c *clientImpl) releaseChannel(serverConfig *bootstrap.ServerConfig, state *channelState, deInitLocked func(*channelState)) func() {
 	return grpcsync.OnceFunc(func() {
 		c.channelsMu.Lock()
-		defer c.channelsMu.Unlock()
 		if c.logger.V(2) {
 			c.logger.Infof("Received request to release a reference to an xdsChannel for server config %q", serverConfig)
 		}
@@ -372,40 +336,15 @@ func (c *clientImpl) releaseChannel(serverConfig *bootstrap.ServerConfig, state
 			if c.logger.V(2) {
 				c.logger.Infof("xdsChannel %p has other active references", state.channel)
 			}
+			c.channelsMu.Unlock()
 			return
 		}
 
-		// Move the channel to the idle cache instead of closing
-		// immediately. If the channel remains in the idle cache for
-		// the configured duration, it will get closed.
 		delete(c.xdsActiveChannels, serverConfig.String())
 		if c.logger.V(2) {
-			c.logger.Infof("Moving xdsChannel [%p] for server config %s to the idle cache", state.channel, serverConfig)
+			c.logger.Infof("Closing xdsChannel [%p] for server config %s", state.channel, serverConfig)
 		}
-
-		// The idle cache expiry timeout results in the channel getting
-		// closed in another serializer callback.
-		c.xdsIdleChannels.Add(serverConfig.String(), state, grpcsync.OnceFunc(func() {
-			c.channelsMu.Lock()
-			channelToClose := state.channel
-			c.channelsMu.Unlock()
-
-			if c.logger.V(2) {
-				c.logger.Infof("Idle cache expiry timeout fired for xdsChannel [%p] for server config %s", state.channel, serverConfig)
-			}
-			channelToClose.close()
-
-			// If the channel is being closed as a result of the xDS client
-			// being closed, closeCond is non-nil and we need to signal from
-			// here to unblock Close(). Holding the lock is not necessary
-			// to call Signal() on a condition variable. But the field
-			// `c.closeCond` needs to guarded by the lock, which is why we
-			// acquire it here.
-			c.channelsMu.Lock()
-			if c.closeCond != nil {
-				c.closeCond.Signal()
-			}
-			c.channelsMu.Unlock()
-		}))
+		c.channelsMu.Unlock()
+		state.channel.close()
 	})
 }
diff --git a/xds/internal/xdsclient/tests/ads_stream_ack_nack_test.go b/xds/internal/xdsclient/tests/ads_stream_ack_nack_test.go
index feebc1adc915..71237bdd81ad 100644
--- a/xds/internal/xdsclient/tests/ads_stream_ack_nack_test.go
+++ b/xds/internal/xdsclient/tests/ads_stream_ack_nack_test.go
@@ -337,9 +337,9 @@ func (s) TestADS_NACK_InvalidFirstResponse(t *testing.T) {
 // 1. A resource is requested and a good response is received. The test verifies
 // that an ACK is sent for this resource.
 // 2. The previously requested resource is no longer requested. The test
-// verifies that a request with no resource names is sent out.
+// verifies that the connection to the management server is closed.
 // 3. The same resource is requested again. The test verifies that the request
-// is sent with the previously ACKed version.
+// is sent with an empty version string.
 func (s) TestADS_ACK_NACK_ResourceIsNotRequestedAnymore(t *testing.T) {
 	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
 	defer cancel()
@@ -349,7 +349,9 @@ func (s) TestADS_ACK_NACK_ResourceIsNotRequestedAnymore(t *testing.T) {
 	// the test goroutine to verify ACK version and nonce.
 	streamRequestCh := testutils.NewChannel()
 	streamResponseCh := testutils.NewChannel()
+	lis := testutils.NewListenerWrapper(t, nil)
 	mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{
+		Listener: lis,
 		OnStreamRequest: func(_ int64, req *v3discoverypb.DiscoveryRequest) error {
 			streamRequestCh.SendContext(ctx, req)
 			return nil
@@ -390,6 +392,14 @@ func (s) TestADS_ACK_NACK_ResourceIsNotRequestedAnymore(t *testing.T) {
 	ldsCancel := xdsresource.WatchListener(client, listenerName, lw)
 	defer ldsCancel()
 
+	// Grab the wrapped connection from the listener wrapper. This will be used
+	// to verify the connection is closed.
+	val, err := lis.NewConnCh.Receive(ctx)
+	if err != nil {
+		t.Fatalf("Failed to receive new connection from wrapped listener: %v", err)
+	}
+	conn := val.(*testutils.ConnWrapper)
+
 	// Verify that the initial discovery request matches expectation.
 	r, err := streamRequestCh.Receive(ctx)
 	if err != nil {
@@ -425,9 +435,10 @@ func (s) TestADS_ACK_NACK_ResourceIsNotRequestedAnymore(t *testing.T) {
 		t.Fatal("Timeout when waiting for ACK")
 	}
 	gotReq = r.(*v3discoverypb.DiscoveryRequest)
-	wantReq.VersionInfo = gotResp.GetVersionInfo()
-	wantReq.ResponseNonce = gotResp.GetNonce()
-	if diff := cmp.Diff(gotReq, wantReq, protocmp.Transform()); diff != "" {
+	wantACKReq := proto.Clone(wantReq).(*v3discoverypb.DiscoveryRequest)
+	wantACKReq.VersionInfo = gotResp.GetVersionInfo()
+	wantACKReq.ResponseNonce = gotResp.GetNonce()
+	if diff := cmp.Diff(gotReq, wantACKReq, protocmp.Transform()); diff != "" {
 		t.Fatalf("Unexpected diff in received discovery request, diff (-got, +want):\n%s", diff)
 	}
 
@@ -442,19 +453,11 @@ func (s) TestADS_ACK_NACK_ResourceIsNotRequestedAnymore(t *testing.T) {
 		t.Fatal(err)
 	}
 
-	// Cancel the watch on the listener resource. This should result in a
-	// discovery request with no resource names.
+	// Cancel the watch on the listener resource. This should result in the
+	// existing connection to the management server getting closed.
 	ldsCancel()
-
-	// Verify that the discovery request matches expectation.
-	r, err = streamRequestCh.Receive(ctx)
-	if err != nil {
-		t.Fatal("Timeout when waiting for discovery request")
-	}
-	gotReq = r.(*v3discoverypb.DiscoveryRequest)
-	wantReq.ResourceNames = nil
-	if diff := cmp.Diff(gotReq, wantReq, protocmp.Transform()); diff != "" {
-		t.Fatalf("Unexpected diff in received discovery request, diff (-got, +want):\n%s", diff)
+	if _, err := conn.CloseCh.Receive(ctx); err != nil {
+		t.Fatalf("Timeout when expecting existing connection to be closed: %v", err)
 	}
 
 	// Register a watch for the same listener resource.
@@ -462,19 +465,19 @@ func (s) TestADS_ACK_NACK_ResourceIsNotRequestedAnymore(t *testing.T) {
 	ldsCancel = xdsresource.WatchListener(client, listenerName, lw)
 	defer ldsCancel()
 
-	// Verify that the discovery request contains the version from the
-	// previously received response.
+	// Verify that the discovery request is identical to the first one sent out
+	// to the management server.
 	r, err = streamRequestCh.Receive(ctx)
 	if err != nil {
 		t.Fatal("Timeout when waiting for discovery request")
 	}
 	gotReq = r.(*v3discoverypb.DiscoveryRequest)
-	wantReq.ResourceNames = []string{listenerName}
 	if diff := cmp.Diff(gotReq, wantReq, protocmp.Transform()); diff != "" {
 		t.Fatalf("Unexpected diff in received discovery request, diff (-got, +want):\n%s", diff)
 	}
 
-	// TODO(https://github.com/envoyproxy/go-control-plane/issues/1002): Once
-	// this bug is fixed, we need to verify that the update is received by the
-	// watcher.
+	// Verify the update received by the watcher.
+	if err := verifyListenerUpdate(ctx, lw.updateCh, wantUpdate); err != nil {
+		t.Fatal(err)
+	}
 }
diff --git a/xds/internal/xdsclient/tests/ads_stream_restart_test.go b/xds/internal/xdsclient/tests/ads_stream_restart_test.go
index f0da932f5fd8..7e15218ea403 100644
--- a/xds/internal/xdsclient/tests/ads_stream_restart_test.go
+++ b/xds/internal/xdsclient/tests/ads_stream_restart_test.go
@@ -124,6 +124,7 @@ func (s) TestADS_ResourcesAreRequestedAfterStreamRestart(t *testing.T) {
 	// Register a watch for a listener resource.
 	lw := newListenerWatcher()
 	ldsCancel := xdsresource.WatchListener(client, listenerName, lw)
+	defer ldsCancel()
 
 	// Verify that an ADS stream is opened and an LDS request with the above
 	// resource name is sent.
@@ -147,52 +148,6 @@ func (s) TestADS_ResourcesAreRequestedAfterStreamRestart(t *testing.T) {
 		t.Fatal(err)
 	}
 
-	// Cancel the watch for the above listener resource, and verify that an LDS
-	// request with no resource names is sent.
-	ldsCancel()
-	if err := waitForResourceNames(ctx, t, ldsResourcesCh, []string{}); err != nil {
-		t.Fatal(err)
-	}
-
-	// Stop the restartable listener and wait for the stream to close.
-	lis.Stop()
-	select {
-	case <-streamClosed:
-	case <-ctx.Done():
-		t.Fatal("Timeout when waiting for ADS stream to close")
-	}
-
-	// Restart the restartable listener and wait for the stream to open.
-	lis.Restart()
-	select {
-	case <-streamOpened:
-	case <-ctx.Done():
-		t.Fatal("Timeout when waiting for ADS stream to open")
-	}
-
-	// Wait for a short duration and verify that no LDS request is sent, since
-	// there are no resources being watched.
-	sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
-	defer sCancel()
-	select {
-	case <-sCtx.Done():
-	case names := <-ldsResourcesCh:
-		t.Fatalf("LDS request sent for resource names %v, when expecting no request", names)
-	}
-
-	// Register another watch for the same listener resource, and verify that an
-	// LDS request with the above resource name is sent.
-	ldsCancel = xdsresource.WatchListener(client, listenerName, lw)
-	if err := waitForResourceNames(ctx, t, ldsResourcesCh, []string{listenerName}); err != nil {
-		t.Fatal(err)
-	}
-	defer ldsCancel()
-
-	// Verify the update received by the watcher.
-	if err := verifyListenerUpdate(ctx, lw.updateCh, wantListenerUpdate); err != nil {
-		t.Fatal(err)
-	}
-
 	// Create a cluster resource on the management server, in addition to the
 	// existing listener resource.
 	const clusterName = "cluster"
@@ -255,7 +210,7 @@ func (s) TestADS_ResourcesAreRequestedAfterStreamRestart(t *testing.T) {
 
 	// Wait for a short duration and verify that no CDS request is sent, since
 	// there are no resources being watched.
-	sCtx, sCancel = context.WithTimeout(ctx, defaultTestShortTimeout)
+	sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
 	defer sCancel()
 	select {
 	case <-sCtx.Done():
diff --git a/xds/internal/xdsclient/tests/authority_test.go b/xds/internal/xdsclient/tests/authority_test.go
index 1947daf6dffe..3570be59d46f 100644
--- a/xds/internal/xdsclient/tests/authority_test.go
+++ b/xds/internal/xdsclient/tests/authority_test.go
@@ -23,7 +23,6 @@ import (
 	"encoding/json"
 	"fmt"
 	"testing"
-	"time"
 
 	"github.com/google/uuid"
 	"google.golang.org/grpc/internal/testutils"
@@ -65,7 +64,7 @@ var (
 //
 // Returns two listeners used by the default and non-default management servers
 // respectively, and the xDS client and its close function.
-func setupForAuthorityTests(ctx context.Context, t *testing.T, idleTimeout time.Duration) (*testutils.ListenerWrapper, *testutils.ListenerWrapper, xdsclient.XDSClient, func()) {
+func setupForAuthorityTests(ctx context.Context, t *testing.T) (*testutils.ListenerWrapper, *testutils.ListenerWrapper, xdsclient.XDSClient, func()) {
 	// Create listener wrappers which notify on to a channel whenever a new
 	// connection is accepted. We use this to track the number of transports
 	// used by the xDS client.
@@ -102,10 +101,9 @@
 		t.Fatalf("Failed to create bootstrap configuration: %v", err)
 	}
 	client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{
-		Name:                     t.Name(),
-		Contents:                 bootstrapContents,
-		WatchExpiryTimeout:       defaultTestWatchExpiryTimeout,
-		IdleChannelExpiryTimeout: idleTimeout,
+		Name:               t.Name(),
+		Contents:           bootstrapContents,
+		WatchExpiryTimeout: defaultTestWatchExpiryTimeout,
 	})
 	if err != nil {
 		t.Fatalf("Failed to create an xDS client: %v", err)
@@ -137,7 +135,7 @@ func (s) TestAuthority_XDSChannelSharing(t *testing.T) {
 	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
 	defer cancel()
-	lis, _, client, close := setupForAuthorityTests(ctx, t, time.Duration(0))
+	lis, _, client, close := setupForAuthorityTests(ctx, t)
 	defer close()
 
 	// Verify that no connection is established to the management server at this
@@ -176,13 +174,12 @@ func (s) TestAuthority_XDSChannelSharing(t *testing.T) {
 	}
 }
 
-// Test the xdsChannel idle timeout logic. The test verifies that the xDS client
-// does not close xdsChannels immediately after the last watch is canceled, but
-// waits for the configured idle timeout to expire before closing them.
-func (s) TestAuthority_XDSChannelIdleTimeout(t *testing.T) {
+// Test the xdsChannel close logic. The test verifies that the xDS client
+// closes an xdsChannel immediately after the last watch is canceled.
+func (s) TestAuthority_XDSChannelClose(t *testing.T) {
 	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
 	defer cancel()
-	lis, _, client, close := setupForAuthorityTests(ctx, t, defaultTestIdleChannelExpiryTimeout)
+	lis, _, client, close := setupForAuthorityTests(ctx, t)
 	defer close()
 
 	// Request the first resource. Verify that a new transport is created.
@@ -203,110 +200,10 @@ func (s) TestAuthority_XDSChannelIdleTimeout(t *testing.T) {
 	}
 
 	// Cancel both watches, and verify that the connection to the management
-	// server is not closed immediately.
+	// server is closed.
 	cdsCancel1()
 	cdsCancel2()
-	sCtx, sCancel = context.WithTimeout(ctx, defaultTestShortTimeout)
-	defer sCancel()
-	if _, err := conn.CloseCh.Receive(sCtx); err != context.DeadlineExceeded {
-		t.Fatal("Connection to management server closed unexpectedly")
-	}
-
-	// Ensure the transport is closed once the idle timeout fires.
-	select {
-	case <-conn.CloseCh.C:
-	case <-time.After(2 * defaultTestIdleChannelExpiryTimeout):
-		t.Fatal("Connection to management server not closed after idle timeout expiry")
-	}
-}
-
-// Tests that xdsChannels in use and in the idle cache are all closed when the
-// xDS client is closed.
-func (s) TestAuthority_XDSChannelCloseOnClientClose(t *testing.T) {
-	// Set the idle timeout to twice the defaultTestTimeout. This will ensure
-	// that idle channels stay in the cache for the duration of this test, until
-	// explicitly closed.
-	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
-	defer cancel()
-	lisDefault, lisNonDefault, client, close := setupForAuthorityTests(ctx, t, time.Duration(2*defaultTestTimeout))
-
-	// Request the first resource. Verify that a new transport is created to the
-	// default management server.
-	watcher := noopClusterWatcher{}
-	cdsCancel1 := xdsresource.WatchCluster(client, authorityTestResourceName11, watcher)
-	val, err := lisDefault.NewConnCh.Receive(ctx)
-	if err != nil {
-		t.Fatalf("Timed out when waiting for a new transport to be created to the management server: %v", err)
-	}
-	connDefault := val.(*testutils.ConnWrapper)
-
-	// Request another resource which is served by the non-default authority.
-	// Verify that a new transport is created to the non-default management
-	// server.
-	xdsresource.WatchCluster(client, authorityTestResourceName3, watcher)
-	val, err = lisNonDefault.NewConnCh.Receive(ctx)
-	if err != nil {
-		t.Fatalf("Timed out when waiting for a new transport to be created to the management server: %v", err)
-	}
-	connNonDefault := val.(*testutils.ConnWrapper)
-
-	// Cancel the first watch. This should move the default authority to the
-	// idle cache, but the connection should not be closed yet, because the idle
-	// timeout would not have fired.
-	cdsCancel1()
-	sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
-	defer sCancel()
-	if _, err := connDefault.CloseCh.Receive(sCtx); err != context.DeadlineExceeded {
-		t.Fatal("Connection to management server closed unexpectedly")
-	}
-
-	// Closing the xDS client should close the connection to both management
-	// servers, even though we have an open watch to one of them.
-	close()
-	if _, err := connDefault.CloseCh.Receive(ctx); err != nil {
-		t.Fatal("Connection to management server not closed after client close")
-	}
-	if _, err := connNonDefault.CloseCh.Receive(ctx); err != nil {
-		t.Fatal("Connection to management server not closed after client close")
-	}
-}
-
-// Tests that an xdsChannel in the idle cache is revived when a new watch is
-// started on an authority.
-func (s) TestAuthority_XDSChannelRevive(t *testing.T) {
-	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
-	defer cancel()
-	lis, _, client, close := setupForAuthorityTests(ctx, t, defaultTestIdleChannelExpiryTimeout)
-	defer close()
-
-	// Request the first resource. Verify that a new transport is created.
-	watcher := noopClusterWatcher{}
-	cdsCancel1 := xdsresource.WatchCluster(client, authorityTestResourceName11, watcher)
-	val, err := lis.NewConnCh.Receive(ctx)
-	if err != nil {
-		t.Fatalf("Timed out when waiting for a new transport to be created to the management server: %v", err)
-	}
-	conn := val.(*testutils.ConnWrapper)
-
-	// Cancel the above watch. This should move the authority to the idle cache.
-	cdsCancel1()
-
-	// Request the second resource. Verify that no new transport is created.
-	// This should move the authority out of the idle cache.
-	cdsCancel2 := xdsresource.WatchCluster(client, authorityTestResourceName12, watcher)
-	defer cdsCancel2()
-	sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
-	defer sCancel()
-	if _, err := lis.NewConnCh.Receive(sCtx); err != context.DeadlineExceeded {
-		t.Fatal("Unexpected new transport created to management server")
-	}
-
-	// Wait for double the idle timeout, and the connection to the management
-	// server should not be closed, since it was revived from the idle cache.
-	time.Sleep(2 * defaultTestIdleChannelExpiryTimeout)
-	sCtx, sCancel = context.WithTimeout(ctx, defaultTestShortTimeout)
-	defer sCancel()
-	if _, err := conn.CloseCh.Receive(sCtx); err != context.DeadlineExceeded {
-		t.Fatal("Connection to management server closed unexpectedly")
+	if _, err := conn.CloseCh.Receive(ctx); err != nil {
+		t.Fatal("Timeout when waiting for connection to management server to be closed")
 	}
 }
diff --git a/xds/internal/xdsclient/tests/fallback_test.go b/xds/internal/xdsclient/tests/fallback_test.go
index 514945f833d0..18c382dbd146 100644
--- a/xds/internal/xdsclient/tests/fallback_test.go
+++ b/xds/internal/xdsclient/tests/fallback_test.go
@@ -160,13 +160,10 @@ func (s) TestFallback_OnStartup(t *testing.T) {
 		t.Fatalf("Failed to create bootstrap file: %v", err)
 	}
 
-	// Create an xDS client with the above bootstrap configuration and a short
-	// idle channel expiry timeout. This ensures that connections to lower
-	// priority servers get closed quickly, for the test to verify.
+	// Create an xDS client with the above bootstrap configuration.
 	xdsC, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{
-		Name:                     t.Name(),
-		Contents:                 bootstrapContents,
-		IdleChannelExpiryTimeout: defaultTestIdleChannelExpiryTimeout,
+		Name:     t.Name(),
+		Contents: bootstrapContents,
 	})
 	if err != nil {
 		t.Fatalf("Failed to create xDS client: %v", err)
@@ -363,13 +360,10 @@ func (s) TestFallback_MidUpdate(t *testing.T) {
 		t.Fatalf("Failed to create bootstrap file: %v", err)
 	}
 
-	// Create an xDS client with the above bootstrap configuration and a short
-	// idle channel expiry timeout. This ensures that connections to lower
-	// priority servers get closed quickly, for the test to verify.
+	// Create an xDS client with the above bootstrap configuration.
 	xdsC, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{
-		Name:                     t.Name(),
-		Contents:                 bootstrapContents,
-		IdleChannelExpiryTimeout: defaultTestIdleChannelExpiryTimeout,
+		Name:     t.Name(),
+		Contents: bootstrapContents,
 	})
 	if err != nil {
 		t.Fatalf("Failed to create xDS client: %v", err)
@@ -556,13 +550,10 @@ func (s) TestFallback_MidStartup(t *testing.T) {
 		t.Fatalf("Failed to create bootstrap file: %v", err)
 	}
 
-	// Create an xDS client with the above bootstrap configuration and a short
-	// idle channel expiry timeout. This ensures that connections to lower
-	// priority servers get closed quickly, for the test to verify.
+	// Create an xDS client with the above bootstrap configuration.
 	xdsC, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{
-		Name:                     t.Name(),
-		Contents:                 bootstrapContents,
-		IdleChannelExpiryTimeout: defaultTestIdleChannelExpiryTimeout,
+		Name:     t.Name(),
+		Contents: bootstrapContents,
 	})
 	if err != nil {
 		t.Fatalf("Failed to create xDS client: %v", err)
diff --git a/xds/internal/xdsclient/tests/helpers_test.go b/xds/internal/xdsclient/tests/helpers_test.go
index 5c4175b04df0..ab060d539a57 100644
--- a/xds/internal/xdsclient/tests/helpers_test.go
+++ b/xds/internal/xdsclient/tests/helpers_test.go
@@ -36,10 +36,9 @@ func Test(t *testing.T) {
 }
 
 const (
-	defaultTestWatchExpiryTimeout       = 500 * time.Millisecond
-	defaultTestIdleChannelExpiryTimeout = 50 * time.Millisecond
-	defaultTestTimeout                  = 10 * time.Second
-	defaultTestShortTimeout             = 10 * time.Millisecond // For events expected to *not* happen.
+	defaultTestWatchExpiryTimeout = 500 * time.Millisecond
+	defaultTestTimeout            = 10 * time.Second
+	defaultTestShortTimeout       = 10 * time.Millisecond // For events expected to *not* happen.
 
 	ldsName = "xdsclient-test-lds-resource"
 	rdsName = "xdsclient-test-rds-resource"
diff --git a/xds/internal/xdsclient/tests/lds_watchers_test.go b/xds/internal/xdsclient/tests/lds_watchers_test.go
index 7b49b9b17b74..3fcc385c2851 100644
--- a/xds/internal/xdsclient/tests/lds_watchers_test.go
+++ b/xds/internal/xdsclient/tests/lds_watchers_test.go
@@ -1029,7 +1029,6 @@ func (s) TestLDSWatch_NACKError(t *testing.T) {
 		t.Fatalf("Failed to update management server with resources: %v, err: %v", resources, err)
 	}
 
-	// Verify that the expected error is propagated to the watcher.
 	// Verify that the expected error is propagated to the existing watcher.
 	if err := verifyUnknownListenerError(ctx, lw.updateCh, wantListenerNACKErr); err != nil {
 		t.Fatal(err)
 	}
@@ -1039,45 +1038,18 @@ func (s) TestLDSWatch_NACKError(t *testing.T) {
 	lw2 := newListenerWatcher()
 	ldsCancel2 := xdsresource.WatchListener(client, ldsName, lw2)
 	defer ldsCancel2()
-	// Verify that the expected error is propagated to the existing watcher.
 	if err := verifyUnknownListenerError(ctx, lw2.updateCh, wantListenerNACKErr); err != nil {
 		t.Fatal(err)
 	}
 }
 
-// TestLDSWatch_ResourceCaching_WithNACKError covers the case where a watch is
-// registered for a resource which is already present in the cache with an old
-// good update as well as latest NACK error. The test verifies that new watcher
-// receives both good update and error without a new resource request being
-// sent to the management server.
+// Tests the scenario where a watch registered for a resource results in a good
+// update followed by a bad update. This results in the resource cache
+// containing both the old good update and the latest NACK error. The test
+// verifies that when a new watch is registered for the same resource, the new
+// watcher receives the good update followed by the NACK error.
 func TestLDSWatch_ResourceCaching_NACKError(t *testing.T) {
-	firstRequestReceived := false
-	firstAckReceived := grpcsync.NewEvent()
-	secondAckReceived := grpcsync.NewEvent()
-	secondRequestReceived := grpcsync.NewEvent()
-
-	mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{
-		OnStreamRequest: func(id int64, req *v3discoverypb.DiscoveryRequest) error {
-			// The first request has an empty version string.
-			if !firstRequestReceived && req.GetVersionInfo() == "" {
-				firstRequestReceived = true
-				return nil
-			}
-			// The first ack has a non-empty version string.
-			if !firstAckReceived.HasFired() && req.GetVersionInfo() != "" {
-				firstAckReceived.Fire()
-				return nil
-			}
-			// The second ack has a non-empty version string.
-			if !secondAckReceived.HasFired() && req.GetVersionInfo() != "" {
-				secondAckReceived.Fire()
-				return nil
-			}
-			// Any requests after the first request and two acks, are not expected.
-			secondRequestReceived.Fire()
-			return nil
-		},
-	})
+	mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{})
 
 	nodeID := uuid.New().String()
 	bc := e2e.DefaultBootstrapContents(t, nodeID, mgmtServer.Address)
@@ -1150,16 +1122,6 @@ func TestLDSWatch_ResourceCaching_NACKError(t *testing.T) {
 	if err := verifyUnknownListenerError(ctx, lw2.updateCh, wantListenerNACKErr); err != nil {
 		t.Fatal(err)
 	}
-
-	// No request should get sent out as part of this watch.
-	sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
-	defer sCancel()
-	select {
-	case <-sCtx.Done():
-	case <-secondRequestReceived.Done():
-		t.Fatal("xdsClient sent out request instead of using update from cache")
-	default:
-	}
 }
 
 // TestLDSWatch_PartialValid covers the case where a response from the
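
The lifecycle change at the heart of this patch is easiest to see in isolation. The sketch below distills the new clientimpl.go pattern: one refcounted channel per server config, a once-guarded release function, and close() deliberately invoked only after the mutex is dropped (the comment removed from close() above notes that closing channels with the lock held is unsafe). This is a minimal illustration under stated assumptions, not the real implementation: fakeChannel and client are hypothetical stand-ins for xdsChannel and clientImpl, and the standard library's sync.OnceFunc stands in for the grpcsync.OnceFunc used by the patch.

package main

import (
	"fmt"
	"sync"
)

// fakeChannel is a hypothetical stand-in for the real xdsChannel; it only
// logs its closure.
type fakeChannel struct{ name string }

func (c *fakeChannel) close() { fmt.Printf("closing channel for %s\n", c.name) }

// channelState tracks one channel and the number of interested references,
// mirroring the role of channelState in clientimpl.go.
type channelState struct {
	refs    int
	channel *fakeChannel
}

type client struct {
	channelsMu        sync.Mutex
	xdsActiveChannels map[string]*channelState
}

// getOrCreateChannel returns the channel for serverConfig, creating it on
// first use, plus a release func. sync.OnceFunc makes release idempotent,
// like the grpcsync.OnceFunc wrapper in the patch.
func (c *client) getOrCreateChannel(serverConfig string) (*fakeChannel, func()) {
	c.channelsMu.Lock()
	defer c.channelsMu.Unlock()
	state, ok := c.xdsActiveChannels[serverConfig]
	if !ok {
		state = &channelState{channel: &fakeChannel{name: serverConfig}}
		c.xdsActiveChannels[serverConfig] = state
	}
	state.refs++
	return state.channel, sync.OnceFunc(func() { c.releaseChannel(serverConfig, state) })
}

// releaseChannel drops one reference. When the last reference goes away, the
// channel is removed from the map and closed immediately; there is no idle
// cache anymore. close() runs after the mutex is released, since closing
// with the lock held could deadlock with callbacks that re-enter the client.
func (c *client) releaseChannel(serverConfig string, state *channelState) {
	c.channelsMu.Lock()
	state.refs--
	if state.refs > 0 {
		c.channelsMu.Unlock()
		return
	}
	delete(c.xdsActiveChannels, serverConfig)
	c.channelsMu.Unlock()
	state.channel.close()
}

func main() {
	c := &client{xdsActiveChannels: make(map[string]*channelState)}
	ch1, release1 := c.getOrCreateChannel("serverA")
	ch2, release2 := c.getOrCreateChannel("serverA") // reused while referenced
	fmt.Println(ch1 == ch2)                          // true: one channel per server config
	release1()                                       // one reference remains; channel stays open
	release1()                                       // no-op: release is once-guarded
	release2()                                       // last reference released; channel closes now
}

Running main prints "true" followed by a single "closing channel for serverA" line, showing that the channel survives exactly until its last reference is released, which is the behavior the new tests above assert against the management server connection.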