From a4887ee972553f9251748b1249c9f823c171738d Mon Sep 17 00:00:00 2001
From: Easwar Swaminathan
Date: Thu, 12 Dec 2024 23:20:51 +0000
Subject: [PATCH] handle a few more test flakes

---
 internal/testutils/channel.go                 | 11 ++++++++
 .../balancer/cdsbalancer/cdsbalancer_test.go  | 28 ++++++++++++++++---
 xds/internal/xdsclient/authority.go           |  9 +++++-
 .../tests/ads_stream_ack_nack_test.go         |  1 +
 .../xdsclient/tests/lds_watchers_test.go      |  2 +-
 5 files changed, 45 insertions(+), 6 deletions(-)

diff --git a/internal/testutils/channel.go b/internal/testutils/channel.go
index 720d8537e666..d1bb0458eb13 100644
--- a/internal/testutils/channel.go
+++ b/internal/testutils/channel.go
@@ -70,6 +70,17 @@ func (c *Channel) ReceiveOrFail() (any, bool) {
 	}
 }
 
+// Drain drains the channel by repeatedly reading from it until it is empty.
+func (c *Channel) Drain() {
+	for {
+		select {
+		case <-c.C:
+		default:
+			return
+		}
+	}
+}
+
 // Receive returns the value received on the underlying channel, or the error
 // returned by ctx if it is closed or cancelled.
 func (c *Channel) Receive(ctx context.Context) (any, error) {
diff --git a/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go b/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go
index 75abbe81fc3d..9c17cee62cdf 100644
--- a/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go
+++ b/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go
@@ -21,6 +21,7 @@ import (
 	"encoding/json"
 	"errors"
 	"fmt"
+	"net"
 	"strings"
 	"testing"
 	"time"
@@ -202,11 +203,18 @@ func registerWrappedCDSPolicy(t *testing.T) chan balancer.Balancer {
 //   - a channel used to signal that previously requested cluster resources are
 //     no longer requested
 func setupWithManagementServer(t *testing.T) (*e2e.ManagementServer, string, *grpc.ClientConn, *manual.Resolver, xdsclient.XDSClient, chan []string, chan struct{}) {
+	return setupWithManagementServerAndListener(t, nil)
+}
+
+// Same as setupWithManagementServer, but also allows the caller to specify
+// a listener to be used by the management server.
+func setupWithManagementServerAndListener(t *testing.T, lis net.Listener) (*e2e.ManagementServer, string, *grpc.ClientConn, *manual.Resolver, xdsclient.XDSClient, chan []string, chan struct{}) {
 	t.Helper()
 
 	cdsResourceRequestedCh := make(chan []string, 1)
 	cdsResourceCanceledCh := make(chan struct{}, 1)
 	mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{
+		Listener: lis,
 		OnStreamRequest: func(_ int64, req *v3discoverypb.DiscoveryRequest) error {
 			if req.GetTypeUrl() == version.V3ClusterURL {
 				switch len(req.GetResourceNames()) {
@@ -807,11 +815,20 @@ func (s) TestClusterUpdate_Failure(t *testing.T) {
 // TRANSIENT_FAILURE. It is also expected to cancel the CDS watch.
 func (s) TestResolverError(t *testing.T) {
 	_, resolverErrCh, _, _ := registerWrappedClusterResolverPolicy(t)
-	mgmtServer, nodeID, cc, r, _, cdsResourceRequestedCh, cdsResourceCanceledCh := setupWithManagementServer(t)
+	lis := testutils.NewListenerWrapper(t, nil)
+	mgmtServer, nodeID, cc, r, _, cdsResourceRequestedCh, cdsResourceCanceledCh := setupWithManagementServerAndListener(t, lis)
 
-	// Verify that the specified cluster resource is requested.
+	// Grab the wrapped connection from the listener wrapper. This will be used
+	// to verify the connection is closed.
 	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
 	defer cancel()
+	val, err := lis.NewConnCh.Receive(ctx)
+	if err != nil {
+		t.Fatalf("Failed to receive new connection from wrapped listener: %v", err)
+	}
+	conn := val.(*testutils.ConnWrapper)
+
+	// Verify that the specified cluster resource is requested.
 	wantNames := []string{clusterName}
 	if err := waitForResourceNames(ctx, cdsResourceRequestedCh, wantNames); err != nil {
 		t.Fatal(err)
@@ -831,7 +848,7 @@ func (s) TestResolverError(t *testing.T) {
 
 	// Ensure that the resolver error is propagated to the RPC caller.
 	client := testgrpc.NewTestServiceClient(cc)
-	_, err := client.EmptyCall(ctx, &testpb.Empty{})
+	_, err = client.EmptyCall(ctx, &testpb.Empty{})
 	if code := status.Code(err); code != codes.Unavailable {
 		t.Fatalf("EmptyCall() failed with code: %v, want %v", code, codes.Unavailable)
 	}
@@ -901,11 +918,14 @@ func (s) TestResolverError(t *testing.T) {
 
 	resolverErr = xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "xds resource not found error")
 	r.ReportError(resolverErr)
 
-	// Wait for the CDS resource to be not requested anymore.
+	// Wait for the CDS resource to be not requested anymore, or the connection
+	// to the management server to be closed (which happens as part of the last
+	// resource watch being canceled).
 	select {
 	case <-ctx.Done():
 		t.Fatal("Timeout when waiting for CDS resource to be not requested")
 	case <-cdsResourceCanceledCh:
+	case <-conn.CloseCh.C:
 	}
 
 	// Verify that the resolver error is pushed to the child policy.
diff --git a/xds/internal/xdsclient/authority.go b/xds/internal/xdsclient/authority.go
index 9262b905a7ea..de799eb2269e 100644
--- a/xds/internal/xdsclient/authority.go
+++ b/xds/internal/xdsclient/authority.go
@@ -639,6 +639,9 @@ func (a *authority) watchResource(rType xdsresource.Type, resourceName string, w
 			if a.logger.V(2) {
 				a.logger.Infof("Resource type %q with resource name %q found in cache: %s", rType.TypeName(), resourceName, state.cache.ToJSON())
 			}
+			// state can only be accessed in the context of an
+			// xdsClientSerializer callback. Hence we make a copy of the cached
+			// resource here for the watcherCallbackSerializer callback.
 			resource := state.cache
 			a.watcherCallbackSerializer.TrySchedule(func(context.Context) { watcher.OnUpdate(resource, func() {}) })
 		}
@@ -648,7 +651,11 @@ func (a *authority) watchResource(rType xdsresource.Type, resourceName string, w
 			if a.logger.V(2) {
 				a.logger.Infof("Resource type %q with resource name %q was NACKed", rType.TypeName(), resourceName)
 			}
-			a.watcherCallbackSerializer.TrySchedule(func(context.Context) { watcher.OnError(state.md.ErrState.Err, func() {}) })
+			// state can only be accessed in the context of an
+			// xdsClientSerializer callback. Hence we make a copy of the error
+			// here for the watcherCallbackSerializer callback.
+			err := state.md.ErrState.Err
+			a.watcherCallbackSerializer.TrySchedule(func(context.Context) { watcher.OnError(err, func() {}) })
 		}
 		// If the metadata field is updated to indicate that the management
 		// server does not have this resource, notify the new watcher.
diff --git a/xds/internal/xdsclient/tests/ads_stream_ack_nack_test.go b/xds/internal/xdsclient/tests/ads_stream_ack_nack_test.go
index 71237bdd81ad..2a13c77e8ed1 100644
--- a/xds/internal/xdsclient/tests/ads_stream_ack_nack_test.go
+++ b/xds/internal/xdsclient/tests/ads_stream_ack_nack_test.go
@@ -459,6 +459,7 @@ func (s) TestADS_ACK_NACK_ResourceIsNotRequestedAnymore(t *testing.T) {
 	if _, err := conn.CloseCh.Receive(ctx); err != nil {
 		t.Fatalf("Timeout when expecting existing connection to be closed: %v", err)
 	}
+	streamRequestCh.Drain()
 
 	// Register a watch for the same listener resource.
 	lw = newListenerWatcher()
diff --git a/xds/internal/xdsclient/tests/lds_watchers_test.go b/xds/internal/xdsclient/tests/lds_watchers_test.go
index 3fcc385c2851..b05b9caf4adc 100644
--- a/xds/internal/xdsclient/tests/lds_watchers_test.go
+++ b/xds/internal/xdsclient/tests/lds_watchers_test.go
@@ -1048,7 +1048,7 @@ func (s) TestLDSWatch_NACKError(t *testing.T) {
 // containing both the old good update and the latest NACK error. The test
 // verifies that a when a new watch is registered for the same resource, the new
 // watcher receives the good update followed by the NACK error.
-func TestLDSWatch_ResourceCaching_NACKError(t *testing.T) {
+func (s) TestLDSWatch_ResourceCaching_NACKError(t *testing.T) {
 	mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{})
 
 	nodeID := uuid.New().String()
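
Note on the new testutils.Channel.Drain helper: in the ads_stream_ack_nack test it discards requests buffered during an earlier phase of the test, so that a stale message is not mistaken for the request the next assertion waits for. The sketch below is illustrative only; it uses a minimal stand-in for the real Channel type, and the names streamRequests and the phase comments are hypothetical, not part of the patch.

package main

import "fmt"

// channel is a minimal stand-in for internal/testutils.Channel: a buffered
// channel of any plus the Drain helper added by this patch.
type channel struct {
	C chan any
}

// Drain empties the channel without blocking, discarding buffered values.
func (c *channel) Drain() {
	for {
		select {
		case <-c.C:
		default:
			return
		}
	}
}

func main() {
	streamRequests := &channel{C: make(chan any, 10)}

	// Phase 1: an old stream pushes a request that the test has already
	// verified and no longer cares about.
	streamRequests.C <- "request-from-old-stream"

	// Between phases, drop anything left over so the next receive only
	// observes messages produced after this point.
	streamRequests.Drain()

	// Phase 2: the new stream's request is the first thing received.
	streamRequests.C <- "request-from-new-stream"
	fmt.Println(<-streamRequests.C)
}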
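The comments added in authority.go describe a copy-before-schedule pattern: fields of state may only be accessed from an xdsClientSerializer callback, so the needed value is copied into a local variable before the closure is handed to the watcher callback serializer. The sketch below loosely illustrates the difference between capturing the field and capturing a copy; the serializer here is a toy single-goroutine queue, not grpc-go's actual serializer, and all names are hypothetical.

package main

import "fmt"

// serializer is a toy stand-in for a callback serializer: callbacks are
// queued now and executed later, in order, on one goroutine.
type serializer struct{ queue []func() }

func (s *serializer) TrySchedule(f func()) { s.queue = append(s.queue, f) }
func (s *serializer) run() {
	for _, f := range s.queue {
		f()
	}
}

// resourceState loosely mirrors the per-resource state in authority.go.
type resourceState struct{ cache string }

func main() {
	ser := &serializer{}
	st := &resourceState{cache: "cached-version-1"}

	// Capturing the field: the callback reads whatever the field holds when
	// it finally runs, which may be after the owner has updated it.
	ser.TrySchedule(func() { fmt.Println("field at run time:  ", st.cache) })

	// Capturing a copy (the pattern the patch adds): the callback sees the
	// value as of the time the callback was scheduled.
	resource := st.cache
	ser.TrySchedule(func() { fmt.Println("copy at schedule time:", resource) })

	// The state is updated before the queued callbacks actually run.
	st.cache = "cached-version-2"
	ser.run()
}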