handle a few more test flakes
easwars committed Dec 12, 2024
1 parent a2e8a67 commit a4887ee
Showing 5 changed files with 45 additions and 6 deletions.
11 changes: 11 additions & 0 deletions internal/testutils/channel.go
@@ -70,6 +70,17 @@ func (c *Channel) ReceiveOrFail() (any, bool) {
}
}

// Drain drains the channel by repeatedly reading from it until it is empty.
func (c *Channel) Drain() {
	for {
		select {
		case <-c.C:
		default:
			return
		}
	}
}

// Receive returns the value received on the underlying channel, or the error
// returned by ctx if it is closed or cancelled.
func (c *Channel) Receive(ctx context.Context) (any, error) {
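A minimal usage sketch of the new Drain helper follows. The test name and the event values are hypothetical, and it assumes it compiles inside the grpc-go module, since internal/testutils is not importable from other modules:

package testutils_test

import (
	"testing"

	"google.golang.org/grpc/internal/testutils"
)

func TestDrainSketch(t *testing.T) {
	ch := testutils.NewChannelWithSize(2)
	ch.Send("stale event #1")
	ch.Send("stale event #2")

	// Discard anything buffered from an earlier phase of the test so a
	// leftover signal cannot satisfy a later expectation by accident.
	ch.Drain()

	if _, ok := ch.ReceiveOrFail(); ok {
		t.Fatal("channel should be empty after Drain")
	}
}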
28 changes: 24 additions & 4 deletions xds/internal/balancer/cdsbalancer/cdsbalancer_test.go
@@ -21,6 +21,7 @@ import (
"encoding/json"
"errors"
"fmt"
"net"
"strings"
"testing"
"time"
@@ -202,11 +203,18 @@ func registerWrappedCDSPolicy(t *testing.T) chan balancer.Balancer {
// - a channel used to signal that previously requested cluster resources are
// no longer requested
func setupWithManagementServer(t *testing.T) (*e2e.ManagementServer, string, *grpc.ClientConn, *manual.Resolver, xdsclient.XDSClient, chan []string, chan struct{}) {
return setupWithManagementServerAndListener(t, nil)
}

// Same as setupWithManagementServer, but also allows the caller to specify
// a listener to be used by the management server.
func setupWithManagementServerAndListener(t *testing.T, lis net.Listener) (*e2e.ManagementServer, string, *grpc.ClientConn, *manual.Resolver, xdsclient.XDSClient, chan []string, chan struct{}) {
t.Helper()

cdsResourceRequestedCh := make(chan []string, 1)
cdsResourceCanceledCh := make(chan struct{}, 1)
mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{
Listener: lis,
OnStreamRequest: func(_ int64, req *v3discoverypb.DiscoveryRequest) error {
if req.GetTypeUrl() == version.V3ClusterURL {
switch len(req.GetResourceNames()) {
@@ -807,11 +815,20 @@ func (s) TestClusterUpdate_Failure(t *testing.T) {
// TRANSIENT_FAILURE. It is also expected to cancel the CDS watch.
func (s) TestResolverError(t *testing.T) {
_, resolverErrCh, _, _ := registerWrappedClusterResolverPolicy(t)
mgmtServer, nodeID, cc, r, _, cdsResourceRequestedCh, cdsResourceCanceledCh := setupWithManagementServer(t)
lis := testutils.NewListenerWrapper(t, nil)
mgmtServer, nodeID, cc, r, _, cdsResourceRequestedCh, cdsResourceCanceledCh := setupWithManagementServerAndListener(t, lis)

// Verify that the specified cluster resource is requested.
// Grab the wrapped connection from the listener wrapper. This will be used
// to verify the connection is closed.
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
val, err := lis.NewConnCh.Receive(ctx)
if err != nil {
t.Fatalf("Failed to receive new connection from wrapped listener: %v", err)
}
conn := val.(*testutils.ConnWrapper)

// Verify that the specified cluster resource is requested.
wantNames := []string{clusterName}
if err := waitForResourceNames(ctx, cdsResourceRequestedCh, wantNames); err != nil {
t.Fatal(err)
@@ -831,7 +848,7 @@ func (s) TestResolverError(t *testing.T) {

// Ensure that the resolver error is propagated to the RPC caller.
client := testgrpc.NewTestServiceClient(cc)
_, err := client.EmptyCall(ctx, &testpb.Empty{})
_, err = client.EmptyCall(ctx, &testpb.Empty{})
if code := status.Code(err); code != codes.Unavailable {
t.Fatalf("EmptyCall() failed with code: %v, want %v", code, codes.Unavailable)
}
@@ -901,11 +918,14 @@ func (s) TestResolverError(t *testing.T) {
resolverErr = xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "xds resource not found error")
r.ReportError(resolverErr)

// Wait for the CDS resource to be not requested anymore.
// Wait for the CDS resource to be not requested anymore, or the connection
// to the management server to be closed (which happens as part of the last
// resource watch being canceled).
select {
case <-ctx.Done():
t.Fatal("Timeout when waiting for CDS resource to be not requested")
case <-cdsResourceCanceledCh:
case <-conn.CloseCh.C:
}

// Verify that the resolver error is pushed to the child policy.
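The flake here comes from the fact that cancelling the last CDS watch can surface either as the resource no longer being requested or as the connection to the management server being closed, and the test must accept whichever signal arrives first. A standalone sketch of that pattern; waitForEither, unsubscribed, and connClosed are illustrative names, not part of the change:

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// waitForEither blocks until one of the two signals fires or the context
// expires. Tests use this shape when a single action can be observed through
// more than one equivalent side effect.
func waitForEither(ctx context.Context, a, b <-chan struct{}) error {
	select {
	case <-ctx.Done():
		return errors.New("timed out waiting for either signal")
	case <-a:
		return nil
	case <-b:
		return nil
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	unsubscribed := make(chan struct{}) // e.g. resource no longer requested
	connClosed := make(chan struct{})   // e.g. transport to the server closed
	go func() { close(connClosed) }()   // only one of the two may ever fire

	if err := waitForEither(ctx, unsubscribed, connClosed); err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println("observed one of the equivalent signals")
}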
9 changes: 8 additions & 1 deletion xds/internal/xdsclient/authority.go
@@ -639,6 +639,9 @@ func (a *authority) watchResource(rType xdsresource.Type, resourceName string, w
if a.logger.V(2) {
a.logger.Infof("Resource type %q with resource name %q found in cache: %s", rType.TypeName(), resourceName, state.cache.ToJSON())
}
// state can only be accessed in the context of an
// xdsClientSerializer callback. Hence making a copy of the cached
// resource here for watchCallbackSerializer.
resource := state.cache
a.watcherCallbackSerializer.TrySchedule(func(context.Context) { watcher.OnUpdate(resource, func() {}) })
}
@@ -648,7 +651,11 @@ func (a *authority) watchResource(rType xdsresource.Type, resourceName string, w
if a.logger.V(2) {
	a.logger.Infof("Resource type %q with resource name %q was NACKed", rType.TypeName(), resourceName)
}
a.watcherCallbackSerializer.TrySchedule(func(context.Context) { watcher.OnError(state.md.ErrState.Err, func() {}) })
// state can only be accessed in the context of an
// xdsClientSerializer callback. Hence making a copy of the error
// here for watchCallbackSerializer.
err := state.md.ErrState.Err
a.watcherCallbackSerializer.TrySchedule(func(context.Context) { watcher.OnError(err, func() {}) })
}
// If the metadata field is updated to indicate that the management
// server does not have this resource, notify the new watcher.
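The authority.go change exists because state may only be touched from within an xdsClientSerializer callback, while the scheduled watcher callbacks run later on a different serializer; copying the field first pins the value the callback sees. A standalone sketch of the difference, using an illustrative state type and callback slice rather than the real xDS client types:

package main

import "fmt"

// state stands in for per-resource bookkeeping owned by one serializer.
type state struct{ cache string }

func main() {
	s := &state{cache: "v1"}
	var callbacks []func()

	// Capturing the shared struct: the callback reads s.cache only when it
	// eventually runs, so it can observe a later mutation.
	callbacks = append(callbacks, func() { fmt.Println("captured field sees:", s.cache) })

	// Copying first (the pattern the commit adopts): the callback is pinned
	// to the value that was current when it was scheduled.
	resource := s.cache
	callbacks = append(callbacks, func() { fmt.Println("copied value sees:", resource) })

	// The owning goroutine updates the state before the callbacks run.
	s.cache = "v2"

	for _, cb := range callbacks {
		cb() // prints "v2" for the first callback, "v1" for the second
	}
}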
1 change: 1 addition & 0 deletions xds/internal/xdsclient/tests/ads_stream_ack_nack_test.go
@@ -459,6 +459,7 @@ func (s) TestADS_ACK_NACK_ResourceIsNotRequestedAnymore(t *testing.T) {
if _, err := conn.CloseCh.Receive(ctx); err != nil {
t.Fatalf("Timeout when expecting existing connection to be closed: %v", err)
}
streamRequestCh.Drain()

// Register a watch for the same listener resource.
lw = newListenerWatcher()
2 changes: 1 addition & 1 deletion xds/internal/xdsclient/tests/lds_watchers_test.go
@@ -1048,7 +1048,7 @@ func (s) TestLDSWatch_NACKError(t *testing.T) {
// containing both the old good update and the latest NACK error. The test
// verifies that when a new watch is registered for the same resource, the new
// watcher receives the good update followed by the NACK error.
func TestLDSWatch_ResourceCaching_NACKError(t *testing.T) {
func (s) TestLDSWatch_ResourceCaching_NACKError(t *testing.T) {
mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{})

nodeID := uuid.New().String()
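The lds_watchers_test.go change only adds the (s) receiver, but that receiver is what routes the test through gRPC's grpctest harness (per-test setup/teardown such as goroutine leak checks) instead of running it as a bare top-level test. A minimal sketch of that convention, assuming it lives inside the grpc-go module; the package and test names are illustrative:

package example_test

import (
	"testing"

	"google.golang.org/grpc/internal/grpctest"
)

type s struct {
	grpctest.Tester
}

func Test(t *testing.T) {
	grpctest.RunSubTests(t, s{})
}

// Discovered and wrapped by the harness because it is a method on s; a plain
// top-level TestSomething would still run, but without the harness's checks.
func (s) TestSomething(t *testing.T) {}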
