xdsclient: stop caching xdsChannels for potential reuse after all references are released
easwars committed Dec 12, 2024
1 parent c1b6b37 commit a2e8a67
Showing 11 changed files with 77 additions and 349 deletions.
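The heart of the change: an xdsChannel whose last reference is released is now closed immediately, instead of being parked in an idle cache (previously five minutes by default) for possible reuse. Below is a minimal sketch of the new acquire/release lifecycle — the `pool` and `acquire` names are illustrative, not the actual grpc-go types:

```go
package main

import (
	"fmt"
	"sync"
)

// xdsChannel stands in for the real channel type; only close matters here.
type xdsChannel struct{ target string }

func (c *xdsChannel) close() { fmt.Printf("closed channel to %s\n", c.target) }

type channelState struct {
	channel *xdsChannel
	refs    int
}

type pool struct {
	mu     sync.Mutex
	active map[string]*channelState
}

// acquire returns the channel for target (creating it on first use) and a
// release func. Release closes the channel as soon as the last reference is
// dropped; there is no idle cache anymore.
func (p *pool) acquire(target string) (*xdsChannel, func()) {
	p.mu.Lock()
	defer p.mu.Unlock()
	state, ok := p.active[target]
	if !ok {
		state = &channelState{channel: &xdsChannel{target: target}}
		p.active[target] = state
	}
	state.refs++
	release := sync.OnceFunc(func() {
		p.mu.Lock()
		state.refs--
		last := state.refs == 0
		if last {
			delete(p.active, target)
		}
		p.mu.Unlock()
		if last {
			state.channel.close() // close outside the lock, as the commit does
		}
	})
	return state.channel, release
}

func main() {
	p := &pool{active: make(map[string]*channelState)}
	ch, rel1 := p.acquire("xds.example.org:443")
	_, rel2 := p.acquire("xds.example.org:443") // second ref reuses the channel
	_ = ch
	rel1()
	rel2() // last reference released: the channel closes immediately
}
```

The real implementation keys channels by server config and makes each release idempotent with grpcsync.OnceFunc, as the clientimpl.go diff below shows.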
12 changes: 3 additions & 9 deletions xds/internal/balancer/cdsbalancer/cdsbalancer_test.go
@@ -928,12 +928,12 @@ func (s) TestResolverError(t *testing.T) {
}
}

// Tests that closing the cds LB policy results in the cluster resource watch
// being cancelled and the child policy being closed.
// Tests that closing the cds LB policy results in the child policy being
// closed.
func (s) TestClose(t *testing.T) {
cdsBalancerCh := registerWrappedCDSPolicy(t)
_, _, _, childPolicyCloseCh := registerWrappedClusterResolverPolicy(t)
mgmtServer, nodeID, cc, _, _, _, cdsResourceCanceledCh := setupWithManagementServer(t)
mgmtServer, nodeID, cc, _, _, _, _ := setupWithManagementServer(t)

// Start a test service backend.
server := stubserver.StartTestService(t, nil)
@@ -967,12 +967,6 @@ func (s) TestClose(t *testing.T) {
}
cdsBal.Close()

// Wait for the CDS resource to be not requested anymore.
select {
case <-ctx.Done():
t.Fatal("Timeout when waiting for CDS resource to be not requested")
case <-cdsResourceCanceledCh:
}
// Wait for the child policy to be closed.
select {
case <-ctx.Done():
4 changes: 2 additions & 2 deletions xds/internal/xdsclient/authority.go
@@ -646,7 +646,7 @@ func (a *authority) watchResource(rType xdsresource.Type, resourceName string, w
// immediately as well.
if state.md.Status == xdsresource.ServiceStatusNACKed {
if a.logger.V(2) {
a.logger.Infof("Resource type %q with resource name %q was NACKed: %s", rType.TypeName(), resourceName, state.cache.ToJSON())
a.logger.Infof("Resource type %q with resource name %q was NACKed", rType.TypeName(), resourceName)
}
a.watcherCallbackSerializer.TrySchedule(func(context.Context) { watcher.OnError(state.md.ErrState.Err, func() {}) })
}
@@ -687,7 +687,7 @@ func (a *authority) unwatchResource(rType xdsresource.Type, resourceName string,
delete(state.watchers, watcher)
if len(state.watchers) > 0 {
if a.logger.V(2) {
a.logger.Infof("%d more watchers exist for type %q, resource name %q", rType.TypeName(), resourceName)
a.logger.Infof("More watchers exist for type %q, resource name %q", rType.TypeName(), resourceName)
}
return
}
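The unwatch path above is the resource-level analogue of the channel refcounting changed elsewhere in this commit: each resource keeps a set of watchers, and teardown happens only when the set empties. A simplified sketch of that bookkeeping, with illustrative names rather than the actual authority implementation:

```go
package main

import "fmt"

// watcher is a stand-in for the xdsresource watcher interfaces.
type watcher struct{ id string }

type resourceState struct {
	watchers map[*watcher]bool
}

type authority struct {
	resources map[string]*resourceState
}

// watch registers w for name; the first watcher creates the subscription.
func (a *authority) watch(name string, w *watcher) (cancel func()) {
	state, ok := a.resources[name]
	if !ok {
		state = &resourceState{watchers: make(map[*watcher]bool)}
		a.resources[name] = state
		fmt.Printf("subscribing to %s\n", name)
	}
	state.watchers[w] = true
	return func() { a.unwatch(name, w) }
}

// unwatch removes w; only the last watcher's departure tears the watch down.
func (a *authority) unwatch(name string, w *watcher) {
	state, ok := a.resources[name]
	if !ok {
		return
	}
	delete(state.watchers, w)
	if len(state.watchers) > 0 {
		return // other watchers remain; keep the resource subscribed
	}
	delete(a.resources, name)
	fmt.Printf("unsubscribing from %s\n", name) // would also update the ADS stream
}

func main() {
	a := &authority{resources: make(map[string]*resourceState)}
	w1, w2 := &watcher{id: "w1"}, &watcher{id: "w2"}
	cancel1 := a.watch("example-listener", w1)
	cancel2 := a.watch("example-listener", w2)
	cancel1() // w2 is still registered: the subscription stays
	cancel2() // last watcher gone: unsubscribe happens here
}
```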
15 changes: 3 additions & 12 deletions xds/internal/xdsclient/client_new.go
@@ -26,7 +26,6 @@ import (

"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/backoff"
"google.golang.org/grpc/internal/cache"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/internal/xds/bootstrap"
xdsclientinternal "google.golang.org/grpc/xds/internal/xdsclient/internal"
@@ -61,11 +60,11 @@ func New(name string) (XDSClient, func(), error) {
if err != nil {
return nil, nil, fmt.Errorf("xds: failed to get xDS bootstrap config: %v", err)
}
return newRefCounted(name, config, defaultWatchExpiryTimeout, defaultIdleChannelExpiryTimeout, backoff.DefaultExponential.Backoff)
return newRefCounted(name, config, defaultWatchExpiryTimeout, backoff.DefaultExponential.Backoff)
}
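The exported entry point keeps its shape; only the idle-expiry knob disappears from the plumbing. A usage sketch for grpc-go-internal callers (the client name is illustrative):

```go
// Within grpc-go (the package is internal), acquiring and releasing a shared
// client looks like this.
import "google.golang.org/grpc/xds/internal/xdsclient"

func useSharedClient() error {
	client, release, err := xdsclient.New("my-client-name")
	if err != nil {
		return err // e.g. missing or invalid xDS bootstrap configuration
	}
	defer release() // dropping the last reference shuts the client down
	_ = client      // register watches through the xdsresource watch APIs
	return nil
}
```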

// newClientImpl returns a new xdsClient with the given config.
func newClientImpl(config *bootstrap.Config, watchExpiryTimeout, idleChannelExpiryTimeout time.Duration, streamBackoff func(int) time.Duration) (*clientImpl, error) {
func newClientImpl(config *bootstrap.Config, watchExpiryTimeout time.Duration, streamBackoff func(int) time.Duration) (*clientImpl, error) {
ctx, cancel := context.WithCancel(context.Background())
c := &clientImpl{
done: grpcsync.NewEvent(),
@@ -78,7 +77,6 @@ func newClientImpl(config *bootstrap.Config, watchExpiryTimeout, idleChannelExpi
transportBuilder: &grpctransport.Builder{},
resourceTypes: newResourceTypeRegistry(),
xdsActiveChannels: make(map[string]*channelState),
xdsIdleChannels: cache.NewTimeoutCache(idleChannelExpiryTimeout),
}

for name, cfg := range config.Authorities() {
@@ -121,10 +119,6 @@ type OptionsForTesting struct {
// unspecified, uses the default value used in non-test code.
WatchExpiryTimeout time.Duration

// IdleChannelExpiryTimeout is the timeout before idle xdsChannels are
// deleted. If unspecified, uses the default value used in non-test code.
IdleChannelExpiryTimeout time.Duration

// StreamBackoffAfterFailure is the backoff function used to determine the
// backoff duration after stream failures.
// If unspecified, uses the default value used in non-test code.
@@ -147,9 +141,6 @@ func NewForTesting(opts OptionsForTesting) (XDSClient, func(), error) {
if opts.WatchExpiryTimeout == 0 {
opts.WatchExpiryTimeout = defaultWatchExpiryTimeout
}
if opts.IdleChannelExpiryTimeout == 0 {
opts.IdleChannelExpiryTimeout = defaultIdleChannelExpiryTimeout
}
if opts.StreamBackoffAfterFailure == nil {
opts.StreamBackoffAfterFailure = defaultStreamBackoffFunc
}
@@ -158,7 +149,7 @@
if err != nil {
return nil, nil, err
}
return newRefCounted(opts.Name, config, opts.WatchExpiryTimeout, opts.IdleChannelExpiryTimeout, opts.StreamBackoffAfterFailure)
return newRefCounted(opts.Name, config, opts.WatchExpiryTimeout, opts.StreamBackoffAfterFailure)
}

// GetForTesting returns an xDS client created earlier using the given name.
9 changes: 3 additions & 6 deletions xds/internal/xdsclient/client_refcounted.go
@@ -27,10 +27,7 @@ import (
"google.golang.org/grpc/internal/xds/bootstrap"
)

const (
defaultWatchExpiryTimeout = 15 * time.Second
defaultIdleChannelExpiryTimeout = 5 * time.Minute
)
const defaultWatchExpiryTimeout = 15 * time.Second

var (
// The following functions are no-ops in the actual code, but can be
@@ -62,7 +59,7 @@ func clientRefCountedClose(name string) {
// newRefCounted creates a new reference counted xDS client implementation for
// name, if one does not exist already. If an xDS client for the given name
// exists, it gets a reference to it and returns it.
func newRefCounted(name string, config *bootstrap.Config, watchExpiryTimeout, idleChannelExpiryTimeout time.Duration, streamBackoff func(int) time.Duration) (XDSClient, func(), error) {
func newRefCounted(name string, config *bootstrap.Config, watchExpiryTimeout time.Duration, streamBackoff func(int) time.Duration) (XDSClient, func(), error) {
clientsMu.Lock()
defer clientsMu.Unlock()

@@ -72,7 +69,7 @@ func newRefCounted(name string, config *bootstrap.Config, watchExpiryTimeout, id
}

// Create the new client implementation.
c, err := newClientImpl(config, watchExpiryTimeout, idleChannelExpiryTimeout, streamBackoff)
c, err := newClientImpl(config, watchExpiryTimeout, streamBackoff)
if err != nil {
return nil, nil, err
}
77 changes: 8 additions & 69 deletions xds/internal/xdsclient/clientimpl.go
@@ -25,7 +25,6 @@ import (
"sync/atomic"
"time"

"google.golang.org/grpc/internal/cache"
"google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/internal/xds/bootstrap"
@@ -63,14 +62,9 @@ type clientImpl struct {
// these channels, and forwards updates from the channels to each of these
// authorities.
//
// Once all references to a channel are dropped, the channel is moved to the
// idle cache where it lives for a configured duration before being closed.
// If the channel is required before the idle timeout fires, it is revived
// from the idle cache and used.
// Once all references to a channel are dropped, the channel is closed.
channelsMu sync.Mutex
xdsActiveChannels map[string]*channelState // Map from server config to in-use xdsChannels.
xdsIdleChannels *cache.TimeoutCache // Map from server config to idle xdsChannels.
closeCond *sync.Cond
}

// channelState represents the state of an xDS channel. It tracks the number of
@@ -173,21 +167,6 @@ func (c *clientImpl) close() {
c.close()
}

// Similarly, closing idle channels cannot be done with the lock held, for
// the same reason as described above. So, we clear the idle cache in a
// goroutine and use a condition variable to wait on the condition that the
// idle cache has zero entries. The Wait() method on the condition variable
// releases the lock and blocks the goroutine until signaled (which happens
// when an idle channel is removed from the cache and closed), and grabs the
// lock before returning.
c.channelsMu.Lock()
c.closeCond = sync.NewCond(&c.channelsMu)
go c.xdsIdleChannels.Clear(true)
for c.xdsIdleChannels.Len() > 0 {
c.closeCond.Wait()
}
c.channelsMu.Unlock()

c.serializerClose()
<-c.serializer.Done()
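The block deleted above drained the idle cache with a classic condition-variable pattern: each asynchronous close decrements a count and signals, while the closer holds the lock and waits for the count to reach zero. A standalone sketch of that pattern, with generic names rather than the deleted code:

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	var mu sync.Mutex
	cond := sync.NewCond(&mu)
	pending := 3 // e.g. idle channels still to be closed

	for i := 0; i < 3; i++ {
		go func(id int) {
			fmt.Printf("closing idle channel %d\n", id) // stand-in for channel.close()
			mu.Lock()
			pending--
			mu.Unlock()
			cond.Signal() // holding the lock is not required to signal
		}(i)
	}

	// Wait atomically releases mu while blocked and reacquires it on wakeup,
	// so the pending check always happens under the lock.
	mu.Lock()
	for pending > 0 {
		cond.Wait()
	}
	mu.Unlock()
	fmt.Println("all idle channels closed")
}
```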

@@ -289,23 +268,11 @@ func (c *clientImpl) getOrCreateChannel(serverConfig *bootstrap.ServerConfig, in
c.logger.Infof("Received request for a reference to an xdsChannel for server config %q", serverConfig)
}

// Use an active channel, if one exists for this server config.
// Use an existing channel, if one exists for this server config.
if state, ok := c.xdsActiveChannels[serverConfig.String()]; ok {
if c.logger.V(2) {
c.logger.Infof("Reusing an active xdsChannel for server config %q", serverConfig)
}
initLocked(state)
return state.channel, c.releaseChannel(serverConfig, state, deInitLocked), nil
}

// If an idle channel exists for this server config, remove it from the
// idle cache and add it to the map of active channels, and return it.
if s, ok := c.xdsIdleChannels.Remove(serverConfig.String()); ok {
if c.logger.V(2) {
c.logger.Infof("Reviving an xdsChannel from the idle cache for server config %q", serverConfig)
c.logger.Infof("Reusing an existing xdsChannel for server config %q", serverConfig)
}
state := s.(*channelState)
c.xdsActiveChannels[serverConfig.String()] = state
initLocked(state)
return state.channel, c.releaseChannel(serverConfig, state, deInitLocked), nil
}
@@ -345,9 +312,7 @@ }
}

// releaseChannel is a function that is called when a reference to an xdsChannel
// needs to be released. It handles the logic of moving the channel to an idle
// cache if there are no other active references, and closing the channel if it
// remains in the idle cache for the configured duration.
// needs to be released. It handles closing channels with no active references.
//
// The function takes the following parameters:
// - serverConfig: the server configuration for the xdsChannel
@@ -360,7 +325,6 @@ func (c *clientImpl) getOrCreateChannel(serverConfig *bootstrap.ServerConfig, in
func (c *clientImpl) releaseChannel(serverConfig *bootstrap.ServerConfig, state *channelState, deInitLocked func(*channelState)) func() {
return grpcsync.OnceFunc(func() {
c.channelsMu.Lock()
defer c.channelsMu.Unlock()

if c.logger.V(2) {
c.logger.Infof("Received request to release a reference to an xdsChannel for server config %q", serverConfig)
@@ -372,40 +336,15 @@ func (c *clientImpl) releaseChannel(serverConfig *bootstrap.ServerConfig, state
if c.logger.V(2) {
c.logger.Infof("xdsChannel %p has other active references", state.channel)
}
c.channelsMu.Unlock()
return
}

// Move the channel to the idle cache instead of closing
// immediately. If the channel remains in the idle cache for
// the configured duration, it will get closed.
delete(c.xdsActiveChannels, serverConfig.String())
if c.logger.V(2) {
c.logger.Infof("Moving xdsChannel [%p] for server config %s to the idle cache", state.channel, serverConfig)
c.logger.Infof("Closing xdsChannel [%p] for server config %s", state.channel, serverConfig)
}

// The idle cache expiry timeout results in the channel getting
// closed in another serializer callback.
c.xdsIdleChannels.Add(serverConfig.String(), state, grpcsync.OnceFunc(func() {
c.channelsMu.Lock()
channelToClose := state.channel
c.channelsMu.Unlock()

if c.logger.V(2) {
c.logger.Infof("Idle cache expiry timeout fired for xdsChannel [%p] for server config %s", state.channel, serverConfig)
}
channelToClose.close()

// If the channel is being closed as a result of the xDS client
// being closed, closeCond is non-nil and we need to signal from
// here to unblock Close(). Holding the lock is not necessary
// to call Signal() on a condition variable. But the field
// `c.closeCond` needs to guarded by the lock, which is why we
// acquire it here.
c.channelsMu.Lock()
if c.closeCond != nil {
c.closeCond.Signal()
}
c.channelsMu.Unlock()
}))
c.channelsMu.Unlock()
state.channel.close()
})
}
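Note the lock discipline in the new body: channelsMu is released before state.channel.close() runs, for the same reason the deleted close() comment gave — teardown can fan out into paths that re-acquire the client's mutex, so closing under the lock risks deadlock. A generic sketch of the remove-under-lock, close-outside-lock pattern (hypothetical names):

```go
package main

import "sync"

type resource struct{ closed bool }

// close may, in real code, synchronously run callbacks that call back into
// the registry; it must therefore not run under the registry lock.
func (r *resource) close() { r.closed = true }

type registry struct {
	mu    sync.Mutex
	items map[string]*resource
}

// releaseLast removes the entry under the lock but closes outside it, so a
// re-entrant teardown path can safely take r.mu again.
func (r *registry) releaseLast(key string) {
	r.mu.Lock()
	res, ok := r.items[key]
	if ok {
		delete(r.items, key)
	}
	r.mu.Unlock()

	if ok {
		res.close()
	}
}

func main() {
	r := &registry{items: map[string]*resource{"a": {}}}
	r.releaseLast("a")
}
```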
49 changes: 26 additions & 23 deletions xds/internal/xdsclient/tests/ads_stream_ack_nack_test.go
@@ -337,9 +337,9 @@ func (s) TestADS_NACK_InvalidFirstResponse(t *testing.T) {
// 1. A resource is requested and a good response is received. The test verifies
// that an ACK is sent for this resource.
// 2. The previously requested resource is no longer requested. The test
// verifies that a request with no resource names is sent out.
// verifies that the connection to the management server is closed.
// 3. The same resource is requested again. The test verifies that the request
// is sent with the previously ACKed version.
// is sent with an empty version string.
func (s) TestADS_ACK_NACK_ResourceIsNotRequestedAnymore(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
@@ -349,7 +349,9 @@ func (s) TestADS_ACK_NACK_ResourceIsNotRequestedAnymore(t *testing.T) {
// the test goroutine to verify ACK version and nonce.
streamRequestCh := testutils.NewChannel()
streamResponseCh := testutils.NewChannel()
lis := testutils.NewListenerWrapper(t, nil)
mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{
Listener: lis,
OnStreamRequest: func(_ int64, req *v3discoverypb.DiscoveryRequest) error {
streamRequestCh.SendContext(ctx, req)
return nil
@@ -390,6 +392,14 @@ func (s) TestADS_ACK_NACK_ResourceIsNotRequestedAnymore(t *testing.T) {
ldsCancel := xdsresource.WatchListener(client, listenerName, lw)
defer ldsCancel()

// Grab the wrapped connection from the listener wrapper. This will be used
// to verify the connection is closed.
val, err := lis.NewConnCh.Receive(ctx)
if err != nil {
t.Fatalf("Failed to receive new connection from wrapped listener: %v", err)
}
conn := val.(*testutils.ConnWrapper)

// Verify that the initial discovery request matches expectation.
r, err := streamRequestCh.Receive(ctx)
if err != nil {
@@ -425,9 +435,10 @@ func (s) TestADS_ACK_NACK_ResourceIsNotRequestedAnymore(t *testing.T) {
t.Fatal("Timeout when waiting for ACK")
}
gotReq = r.(*v3discoverypb.DiscoveryRequest)
wantReq.VersionInfo = gotResp.GetVersionInfo()
wantReq.ResponseNonce = gotResp.GetNonce()
if diff := cmp.Diff(gotReq, wantReq, protocmp.Transform()); diff != "" {
wantACKReq := proto.Clone(wantReq).(*v3discoverypb.DiscoveryRequest)
wantACKReq.VersionInfo = gotResp.GetVersionInfo()
wantACKReq.ResponseNonce = gotResp.GetNonce()
if diff := cmp.Diff(gotReq, wantACKReq, protocmp.Transform()); diff != "" {
t.Fatalf("Unexpected diff in received discovery request, diff (-got, +want):\n%s", diff)
}

@@ -442,39 +453,31 @@ func (s) TestADS_ACK_NACK_ResourceIsNotRequestedAnymore(t *testing.T) {
t.Fatal(err)
}

// Cancel the watch on the listener resource. This should result in a
// discovery request with no resource names.
// Cancel the watch on the listener resource. This should result in the
// existing connection to the management server getting closed.
ldsCancel()

// Verify that the discovery request matches expectation.
r, err = streamRequestCh.Receive(ctx)
if err != nil {
t.Fatal("Timeout when waiting for discovery request")
}
gotReq = r.(*v3discoverypb.DiscoveryRequest)
wantReq.ResourceNames = nil
if diff := cmp.Diff(gotReq, wantReq, protocmp.Transform()); diff != "" {
t.Fatalf("Unexpected diff in received discovery request, diff (-got, +want):\n%s", diff)
if _, err := conn.CloseCh.Receive(ctx); err != nil {
t.Fatalf("Timeout when expecting existing connection to be closed: %v", err)
}

// Register a watch for the same listener resource.
lw = newListenerWatcher()
ldsCancel = xdsresource.WatchListener(client, listenerName, lw)
defer ldsCancel()

// Verify that the discovery request contains the version from the
// previously received response.
// Verify that the discovery request is identical to the first one sent out
// to the management server.
r, err = streamRequestCh.Receive(ctx)
if err != nil {
t.Fatal("Timeout when waiting for discovery request")
}
gotReq = r.(*v3discoverypb.DiscoveryRequest)
wantReq.ResourceNames = []string{listenerName}
if diff := cmp.Diff(gotReq, wantReq, protocmp.Transform()); diff != "" {
t.Fatalf("Unexpected diff in received discovery request, diff (-got, +want):\n%s", diff)
}

// TODO(https://github.com/envoyproxy/go-control-plane/issues/1002): Once
// this bug is fixed, we need to verify that the update is received by the
// watcher.
// Verify the update received by the watcher.
if err := verifyListenerUpdate(ctx, lw.updateCh, wantUpdate); err != nil {
t.Fatal(err)
}
}
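The connection-close assertion in this test leans on testutils.NewListenerWrapper and testutils.ConnWrapper. The essential mechanism is a net.Listener whose accepted connections are handed to the test and signal on a channel when closed; a simplified sketch, not the actual testutils implementation:

```go
package main

import (
	"net"
	"sync"
)

// connWrapper signals on CloseCh exactly once when the connection is closed.
type connWrapper struct {
	net.Conn
	CloseCh chan struct{}
	once    sync.Once
}

func (c *connWrapper) Close() error {
	c.once.Do(func() { close(c.CloseCh) })
	return c.Conn.Close()
}

// listenerWrapper publishes every accepted connection on NewConnCh so a test
// can later block on that connection's CloseCh.
type listenerWrapper struct {
	net.Listener
	NewConnCh chan *connWrapper
}

func (l *listenerWrapper) Accept() (net.Conn, error) {
	c, err := l.Listener.Accept()
	if err != nil {
		return nil, err
	}
	cw := &connWrapper{Conn: c, CloseCh: make(chan struct{})}
	l.NewConnCh <- cw
	return cw, nil
}

func main() {
	inner, _ := net.Listen("tcp", "localhost:0")
	lis := &listenerWrapper{Listener: inner, NewConnCh: make(chan *connWrapper, 1)}

	go net.Dial("tcp", inner.Addr().String()) // a client connects

	c, _ := lis.Accept()       // wrapper is also published on NewConnCh
	wrapped := <-lis.NewConnCh // the test grabs the wrapped server-side conn
	c.Close()                  // closing the server side...
	<-wrapped.CloseCh          // ...is observable by the test
}
```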