xdsclient: stop caching xdsChannels for potential reuse, after all references are released (#7924)
easwars authored Dec 13, 2024
1 parent 7ee073d commit 3f76275
Showing 12 changed files with 128 additions and 355 deletions.
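
The core behavioral change: when the last reference to an xdsChannel is released, the channel is now closed immediately instead of being parked in an idle cache for possible reuse. Below is a minimal, self-contained sketch of that reference-counting pattern. All names here are invented for illustration; this is not the grpc-go implementation, only the shape of the idea.

package main

import (
	"fmt"
	"sync"
)

// channel stands in for an xdsChannel; close tears down its transport.
type channel struct{ target string }

func (c *channel) close() { fmt.Println("closed channel to", c.target) }

// entry tracks one channel and the number of outstanding references to it.
type entry struct {
	refs int
	ch   *channel
}

// pool hands out shared channels keyed by server config.
type pool struct {
	mu     sync.Mutex
	active map[string]*entry
}

// acquire returns a shared channel for key and a release func. When the last
// release runs, the channel is closed right away (no idle cache).
func (p *pool) acquire(key string) (*channel, func()) {
	p.mu.Lock()
	defer p.mu.Unlock()
	e, ok := p.active[key]
	if !ok {
		e = &entry{ch: &channel{target: key}}
		p.active[key] = e
	}
	e.refs++
	var once sync.Once
	release := func() {
		once.Do(func() {
			p.mu.Lock()
			e.refs--
			last := e.refs == 0
			if last {
				delete(p.active, key)
			}
			ch := e.ch
			p.mu.Unlock()
			if last {
				ch.close() // close outside the lock, as the new releaseChannel below does
			}
		})
	}
	return e.ch, release
}

func main() {
	p := &pool{active: make(map[string]*entry)}
	_, rel1 := p.acquire("xds-server:443")
	_, rel2 := p.acquire("xds-server:443") // reuses the existing channel
	rel1()
	rel2() // last reference released: the channel is closed immediately
}
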
11 changes: 11 additions & 0 deletions internal/testutils/channel.go
@@ -70,6 +70,17 @@ func (c *Channel) ReceiveOrFail() (any, bool) {
}
}

// Drain drains the channel by repeatedly reading from it until it is empty.
func (c *Channel) Drain() {
for {
select {
case <-c.C:
default:
return
}
}
}
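
A hypothetical usage sketch for the new Drain helper, not part of this diff: the test body and values are invented, and NewChannel, Send, and Receive are assumed to be this package's existing helpers. Draining discards anything buffered by earlier test activity, so a subsequent Receive only reports fresh signals.

package testutils_test

import (
	"context"
	"testing"
	"time"

	"google.golang.org/grpc/internal/testutils"
)

func TestDrainDiscardsStaleSignals(t *testing.T) {
	ch := testutils.NewChannel()
	ch.Send("stale") // buffered by an earlier phase of the test
	ch.Drain()       // discard everything currently buffered

	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancel()
	// With the buffer drained, Receive blocks until the context expires.
	if v, err := ch.Receive(ctx); err == nil {
		t.Fatalf("Receive() returned stale value %v, want timeout", v)
	}
}
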

// Receive returns the value received on the underlying channel, or the error
// returned by ctx if it is closed or cancelled.
func (c *Channel) Receive(ctx context.Context) (any, error) {
40 changes: 27 additions & 13 deletions xds/internal/balancer/cdsbalancer/cdsbalancer_test.go
@@ -21,6 +21,7 @@ import (
"encoding/json"
"errors"
"fmt"
"net"
"strings"
"testing"
"time"
@@ -202,11 +203,18 @@ func registerWrappedCDSPolicy(t *testing.T) chan balancer.Balancer {
// - a channel used to signal that previously requested cluster resources are
// no longer requested
func setupWithManagementServer(t *testing.T) (*e2e.ManagementServer, string, *grpc.ClientConn, *manual.Resolver, xdsclient.XDSClient, chan []string, chan struct{}) {
return setupWithManagementServerAndListener(t, nil)
}

// Same as setupWithManagementServer, but also allows the caller to specify
// a listener to be used by the management server.
func setupWithManagementServerAndListener(t *testing.T, lis net.Listener) (*e2e.ManagementServer, string, *grpc.ClientConn, *manual.Resolver, xdsclient.XDSClient, chan []string, chan struct{}) {
t.Helper()

cdsResourceRequestedCh := make(chan []string, 1)
cdsResourceCanceledCh := make(chan struct{}, 1)
mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{
Listener: lis,
OnStreamRequest: func(_ int64, req *v3discoverypb.DiscoveryRequest) error {
if req.GetTypeUrl() == version.V3ClusterURL {
switch len(req.GetResourceNames()) {
@@ -807,11 +815,20 @@ func (s) TestClusterUpdate_Failure(t *testing.T) {
// TRANSIENT_FAILURE. It is also expected to cancel the CDS watch.
func (s) TestResolverError(t *testing.T) {
_, resolverErrCh, _, _ := registerWrappedClusterResolverPolicy(t)
mgmtServer, nodeID, cc, r, _, cdsResourceRequestedCh, cdsResourceCanceledCh := setupWithManagementServer(t)
lis := testutils.NewListenerWrapper(t, nil)
mgmtServer, nodeID, cc, r, _, cdsResourceRequestedCh, cdsResourceCanceledCh := setupWithManagementServerAndListener(t, lis)

// Verify that the specified cluster resource is requested.
// Grab the wrapped connection from the listener wrapper. This will be used
// to verify the connection is closed.
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
val, err := lis.NewConnCh.Receive(ctx)
if err != nil {
t.Fatalf("Failed to receive new connection from wrapped listener: %v", err)
}
conn := val.(*testutils.ConnWrapper)

// Verify that the specified cluster resource is requested.
wantNames := []string{clusterName}
if err := waitForResourceNames(ctx, cdsResourceRequestedCh, wantNames); err != nil {
t.Fatal(err)
@@ -831,7 +848,7 @@ func (s) TestResolverError(t *testing.T) {

// Ensure that the resolver error is propagated to the RPC caller.
client := testgrpc.NewTestServiceClient(cc)
_, err := client.EmptyCall(ctx, &testpb.Empty{})
_, err = client.EmptyCall(ctx, &testpb.Empty{})
if code := status.Code(err); code != codes.Unavailable {
t.Fatalf("EmptyCall() failed with code: %v, want %v", code, codes.Unavailable)
}
@@ -901,11 +918,14 @@ func (s) TestResolverError(t *testing.T) {
resolverErr = xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "xds resource not found error")
r.ReportError(resolverErr)

// Wait for the CDS resource to be not requested anymore.
// Wait for the CDS resource to be not requested anymore, or the connection
// to the management server to be closed (which happens as part of the last
// resource watch being canceled).
select {
case <-ctx.Done():
t.Fatal("Timeout when waiting for CDS resource to be not requested")
case <-cdsResourceCanceledCh:
case <-conn.CloseCh.C:
}

// Verify that the resolver error is pushed to the child policy.
@@ -928,12 +948,12 @@ func (s) TestResolverError(t *testing.T) {
}
}

// Tests that closing the cds LB policy results in the cluster resource watch
// being cancelled and the child policy being closed.
// Tests that closing the cds LB policy results in the child policy being
// closed.
func (s) TestClose(t *testing.T) {
cdsBalancerCh := registerWrappedCDSPolicy(t)
_, _, _, childPolicyCloseCh := registerWrappedClusterResolverPolicy(t)
mgmtServer, nodeID, cc, _, _, _, cdsResourceCanceledCh := setupWithManagementServer(t)
mgmtServer, nodeID, cc, _, _, _, _ := setupWithManagementServer(t)

// Start a test service backend.
server := stubserver.StartTestService(t, nil)
@@ -967,12 +987,6 @@ func (s) TestClose(t *testing.T) {
}
cdsBal.Close()

// Wait for the CDS resource to be not requested anymore.
select {
case <-ctx.Done():
t.Fatal("Timeout when waiting for CDS resource to be not requested")
case <-cdsResourceCanceledCh:
}
// Wait for the child policy to be closed.
select {
case <-ctx.Done():
13 changes: 10 additions & 3 deletions xds/internal/xdsclient/authority.go
@@ -639,16 +639,23 @@ func (a *authority) watchResource(rType xdsresource.Type, resourceName string, w
if a.logger.V(2) {
a.logger.Infof("Resource type %q with resource name %q found in cache: %s", rType.TypeName(), resourceName, state.cache.ToJSON())
}
// state can only be accessed in the context of an
// xdsClientSerializer callback. Hence making a copy of the cached
// resource here for watchCallbackSerializer.
resource := state.cache
a.watcherCallbackSerializer.TrySchedule(func(context.Context) { watcher.OnUpdate(resource, func() {}) })
}
// If last update was NACK'd, notify the new watcher of error
// immediately as well.
if state.md.Status == xdsresource.ServiceStatusNACKed {
if a.logger.V(2) {
a.logger.Infof("Resource type %q with resource name %q was NACKed: %s", rType.TypeName(), resourceName, state.cache.ToJSON())
a.logger.Infof("Resource type %q with resource name %q was NACKed", rType.TypeName(), resourceName)
}
a.watcherCallbackSerializer.TrySchedule(func(context.Context) { watcher.OnError(state.md.ErrState.Err, func() {}) })
// state can only be accessed in the context of an
// xdsClientSerializer callback. Hence making a copy of the error
// here for watchCallbackSerializer.
err := state.md.ErrState.Err
a.watcherCallbackSerializer.TrySchedule(func(context.Context) { watcher.OnError(err, func() {}) })
}
// If the metadata field is updated to indicate that the management
// server does not have this resource, notify the new watcher.
@@ -687,7 +694,7 @@ func (a *authority) unwatchResource(rType xdsresource.Type, resourceName string,
delete(state.watchers, watcher)
if len(state.watchers) > 0 {
if a.logger.V(2) {
a.logger.Infof("%d more watchers exist for type %q, resource name %q", rType.TypeName(), resourceName)
a.logger.Infof("Other watchers exist for type %q, resource name %q", rType.TypeName(), resourceName)
}
return
}
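
The comments added to watchResource above encode a general rule: state owned by one serializer must be copied into locals before being captured by a callback that runs on another serializer. Here is a standalone sketch of that pattern with hypothetical types (a mutex stands in for serializer-confined access); it is not the xDS client's own code.

package main

import (
	"fmt"
	"sync"
)

// owner mutates its cache only under its own synchronization (standing in for
// the xdsClientSerializer); watcher callbacks run elsewhere (standing in for
// the watcherCallbackSerializer).
type owner struct {
	mu    sync.Mutex
	cache string
}

func main() {
	o := &owner{cache: "resource-v1"}
	callbacks := make(chan func(), 1)

	// Copy the value while we still have safe access to it, and capture only
	// the copy in the scheduled callback.
	o.mu.Lock()
	resource := o.cache
	o.mu.Unlock()
	callbacks <- func() { fmt.Println("watcher sees:", resource) }

	// The owner keeps mutating its state afterwards...
	o.mu.Lock()
	o.cache = "resource-v2"
	o.mu.Unlock()

	// ...but the callback still observes the snapshot it was handed.
	(<-callbacks)() // prints: watcher sees: resource-v1
}
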
15 changes: 3 additions & 12 deletions xds/internal/xdsclient/client_new.go
@@ -26,7 +26,6 @@ import (

"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/backoff"
"google.golang.org/grpc/internal/cache"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/internal/xds/bootstrap"
xdsclientinternal "google.golang.org/grpc/xds/internal/xdsclient/internal"
@@ -61,11 +60,11 @@ func New(name string) (XDSClient, func(), error) {
if err != nil {
return nil, nil, fmt.Errorf("xds: failed to get xDS bootstrap config: %v", err)
}
return newRefCounted(name, config, defaultWatchExpiryTimeout, defaultIdleChannelExpiryTimeout, backoff.DefaultExponential.Backoff)
return newRefCounted(name, config, defaultWatchExpiryTimeout, backoff.DefaultExponential.Backoff)
}

// newClientImpl returns a new xdsClient with the given config.
func newClientImpl(config *bootstrap.Config, watchExpiryTimeout, idleChannelExpiryTimeout time.Duration, streamBackoff func(int) time.Duration) (*clientImpl, error) {
func newClientImpl(config *bootstrap.Config, watchExpiryTimeout time.Duration, streamBackoff func(int) time.Duration) (*clientImpl, error) {
ctx, cancel := context.WithCancel(context.Background())
c := &clientImpl{
done: grpcsync.NewEvent(),
Expand All @@ -78,7 +77,6 @@ func newClientImpl(config *bootstrap.Config, watchExpiryTimeout, idleChannelExpi
transportBuilder: &grpctransport.Builder{},
resourceTypes: newResourceTypeRegistry(),
xdsActiveChannels: make(map[string]*channelState),
xdsIdleChannels: cache.NewTimeoutCache(idleChannelExpiryTimeout),
}

for name, cfg := range config.Authorities() {
@@ -121,10 +119,6 @@ type OptionsForTesting struct {
// unspecified, uses the default value used in non-test code.
WatchExpiryTimeout time.Duration

// IdleChannelExpiryTimeout is the timeout before idle xdsChannels are
// deleted. If unspecified, uses the default value used in non-test code.
IdleChannelExpiryTimeout time.Duration

// StreamBackoffAfterFailure is the backoff function used to determine the
// backoff duration after stream failures.
// If unspecified, uses the default value used in non-test code.
@@ -147,9 +141,6 @@ func NewForTesting(opts OptionsForTesting) (XDSClient, func(), error) {
if opts.WatchExpiryTimeout == 0 {
opts.WatchExpiryTimeout = defaultWatchExpiryTimeout
}
if opts.IdleChannelExpiryTimeout == 0 {
opts.IdleChannelExpiryTimeout = defaultIdleChannelExpiryTimeout
}
if opts.StreamBackoffAfterFailure == nil {
opts.StreamBackoffAfterFailure = defaultStreamBackoffFunc
}
@@ -158,7 +149,7 @@ func NewForTesting(opts OptionsForTesting) (XDSClient, func(), error) {
if err != nil {
return nil, nil, err
}
return newRefCounted(opts.Name, config, opts.WatchExpiryTimeout, opts.IdleChannelExpiryTimeout, opts.StreamBackoffAfterFailure)
return newRefCounted(opts.Name, config, opts.WatchExpiryTimeout, opts.StreamBackoffAfterFailure)
}
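
A hypothetical caller-side sketch of NewForTesting after this change. Only Name and WatchExpiryTimeout are visible in this diff; the Contents field and the bootstrap JSON it carries are assumptions about the test-only options struct. Unset fields fall back to the non-test defaults, and there is no longer an idle-channel expiry to configure.

package xdsclient_test

import (
	"testing"
	"time"

	"google.golang.org/grpc/xds/internal/xdsclient"
)

func setupClientForTest(t *testing.T, bootstrapContents []byte) xdsclient.XDSClient {
	t.Helper()
	client, release, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{
		Name:               t.Name(),
		Contents:           bootstrapContents, // assumed field carrying bootstrap JSON
		WatchExpiryTimeout: 500 * time.Millisecond,
		// StreamBackoffAfterFailure left unset: the non-test default applies.
		// Note: there is no longer an IdleChannelExpiryTimeout to set.
	})
	if err != nil {
		t.Fatalf("xdsclient.NewForTesting() failed: %v", err)
	}
	t.Cleanup(release)
	return client
}
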

// GetForTesting returns an xDS client created earlier using the given name.
9 changes: 3 additions & 6 deletions xds/internal/xdsclient/client_refcounted.go
@@ -27,10 +27,7 @@ import (
"google.golang.org/grpc/internal/xds/bootstrap"
)

const (
defaultWatchExpiryTimeout = 15 * time.Second
defaultIdleChannelExpiryTimeout = 5 * time.Minute
)
const defaultWatchExpiryTimeout = 15 * time.Second

var (
// The following functions are no-ops in the actual code, but can be
@@ -62,7 +59,7 @@ func clientRefCountedClose(name string) {
// newRefCounted creates a new reference counted xDS client implementation for
// name, if one does not exist already. If an xDS client for the given name
// exists, it gets a reference to it and returns it.
func newRefCounted(name string, config *bootstrap.Config, watchExpiryTimeout, idleChannelExpiryTimeout time.Duration, streamBackoff func(int) time.Duration) (XDSClient, func(), error) {
func newRefCounted(name string, config *bootstrap.Config, watchExpiryTimeout time.Duration, streamBackoff func(int) time.Duration) (XDSClient, func(), error) {
clientsMu.Lock()
defer clientsMu.Unlock()

@@ -72,7 +69,7 @@ func newRefCounted(name string, config *bootstrap.Config, watchExpiryTimeout, id
}

// Create the new client implementation.
c, err := newClientImpl(config, watchExpiryTimeout, idleChannelExpiryTimeout, streamBackoff)
c, err := newClientImpl(config, watchExpiryTimeout, streamBackoff)
if err != nil {
return nil, nil, err
}
77 changes: 9 additions & 68 deletions xds/internal/xdsclient/clientimpl.go
@@ -25,7 +25,6 @@ import (
"sync/atomic"
"time"

"google.golang.org/grpc/internal/cache"
"google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/internal/xds/bootstrap"
@@ -63,14 +62,9 @@ type clientImpl struct {
// these channels, and forwards updates from the channels to each of these
// authorities.
//
// Once all references to a channel are dropped, the channel is moved to the
// idle cache where it lives for a configured duration before being closed.
// If the channel is required before the idle timeout fires, it is revived
// from the idle cache and used.
// Once all references to a channel are dropped, the channel is closed.
channelsMu sync.Mutex
xdsActiveChannels map[string]*channelState // Map from server config to in-use xdsChannels.
xdsIdleChannels *cache.TimeoutCache // Map from server config to idle xdsChannels.
closeCond *sync.Cond
}

// channelState represents the state of an xDS channel. It tracks the number of
@@ -173,21 +167,6 @@ func (c *clientImpl) close() {
c.close()
}

// Similarly, closing idle channels cannot be done with the lock held, for
// the same reason as described above. So, we clear the idle cache in a
// goroutine and use a condition variable to wait on the condition that the
// idle cache has zero entries. The Wait() method on the condition variable
// releases the lock and blocks the goroutine until signaled (which happens
// when an idle channel is removed from the cache and closed), and grabs the
// lock before returning.
c.channelsMu.Lock()
c.closeCond = sync.NewCond(&c.channelsMu)
go c.xdsIdleChannels.Clear(true)
for c.xdsIdleChannels.Len() > 0 {
c.closeCond.Wait()
}
c.channelsMu.Unlock()

c.serializerClose()
<-c.serializer.Done()

@@ -289,27 +268,15 @@ func (c *clientImpl) getOrCreateChannel(serverConfig *bootstrap.ServerConfig, in
c.logger.Infof("Received request for a reference to an xdsChannel for server config %q", serverConfig)
}

// Use an active channel, if one exists for this server config.
// Use an existing channel, if one exists for this server config.
if state, ok := c.xdsActiveChannels[serverConfig.String()]; ok {
if c.logger.V(2) {
c.logger.Infof("Reusing an active xdsChannel for server config %q", serverConfig)
c.logger.Infof("Reusing an existing xdsChannel for server config %q", serverConfig)
}
initLocked(state)
return state.channel, c.releaseChannel(serverConfig, state, deInitLocked), nil
}

// If an idle channel exists for this server config, remove it from the
// idle cache and add it to the map of active channels, and return it.
if s, ok := c.xdsIdleChannels.Remove(serverConfig.String()); ok {
if c.logger.V(2) {
c.logger.Infof("Reviving an xdsChannel from the idle cache for server config %q", serverConfig)
}
state := s.(*channelState)
c.xdsActiveChannels[serverConfig.String()] = state
initLocked(state)
return state.channel, c.releaseChannel(serverConfig, state, deInitLocked), nil
}

if c.logger.V(2) {
c.logger.Infof("Creating a new xdsChannel for server config %q", serverConfig)
}
Expand Down Expand Up @@ -345,9 +312,7 @@ func (c *clientImpl) getOrCreateChannel(serverConfig *bootstrap.ServerConfig, in
}

// releaseChannel is a function that is called when a reference to an xdsChannel
// needs to be released. It handles the logic of moving the channel to an idle
// cache if there are no other active references, and closing the channel if it
// remains in the idle cache for the configured duration.
// needs to be released. It handles closing channels with no active references.
//
// The function takes the following parameters:
// - serverConfig: the server configuration for the xdsChannel
Expand All @@ -360,7 +325,6 @@ func (c *clientImpl) getOrCreateChannel(serverConfig *bootstrap.ServerConfig, in
func (c *clientImpl) releaseChannel(serverConfig *bootstrap.ServerConfig, state *channelState, deInitLocked func(*channelState)) func() {
return grpcsync.OnceFunc(func() {
c.channelsMu.Lock()
defer c.channelsMu.Unlock()

if c.logger.V(2) {
c.logger.Infof("Received request to release a reference to an xdsChannel for server config %q", serverConfig)
Expand All @@ -372,40 +336,17 @@ func (c *clientImpl) releaseChannel(serverConfig *bootstrap.ServerConfig, state
if c.logger.V(2) {
c.logger.Infof("xdsChannel %p has other active references", state.channel)
}
c.channelsMu.Unlock()
return
}

// Move the channel to the idle cache instead of closing
// immediately. If the channel remains in the idle cache for
// the configured duration, it will get closed.
delete(c.xdsActiveChannels, serverConfig.String())
if c.logger.V(2) {
c.logger.Infof("Moving xdsChannel [%p] for server config %s to the idle cache", state.channel, serverConfig)
c.logger.Infof("Closing xdsChannel [%p] for server config %s", state.channel, serverConfig)
}
channelToClose := state.channel
c.channelsMu.Unlock()

// The idle cache expiry timeout results in the channel getting
// closed in another serializer callback.
c.xdsIdleChannels.Add(serverConfig.String(), state, grpcsync.OnceFunc(func() {
c.channelsMu.Lock()
channelToClose := state.channel
c.channelsMu.Unlock()

if c.logger.V(2) {
c.logger.Infof("Idle cache expiry timeout fired for xdsChannel [%p] for server config %s", state.channel, serverConfig)
}
channelToClose.close()

// If the channel is being closed as a result of the xDS client
// being closed, closeCond is non-nil and we need to signal from
// here to unblock Close(). Holding the lock is not necessary
// to call Signal() on a condition variable. But the field
// `c.closeCond` needs to guarded by the lock, which is why we
// acquire it here.
c.channelsMu.Lock()
if c.closeCond != nil {
c.closeCond.Signal()
}
c.channelsMu.Unlock()
}))
channelToClose.close()
})
}