From 2c9c51a9eb5e5035797f67beb8582c98430f68d6 Mon Sep 17 00:00:00 2001 From: Doug Fawley Date: Tue, 1 Aug 2023 15:46:48 -0700 Subject: [PATCH] base: update base balancer for new APIs --- balancer/base/balancer.go | 22 ++++++++++++++++------ balancer/base/balancer_test.go | 28 +++++++++++++++++++++++----- 2 files changed, 39 insertions(+), 11 deletions(-) diff --git a/balancer/base/balancer.go b/balancer/base/balancer.go index 3929c26d31e1..a7f1eeec8e6a 100644 --- a/balancer/base/balancer.go +++ b/balancer/base/balancer.go @@ -105,7 +105,12 @@ func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) error { addrsSet.Set(a, nil) if _, ok := b.subConns.Get(a); !ok { // a is a new address (not existing in b.subConns). - sc, err := b.cc.NewSubConn([]resolver.Address{a}, balancer.NewSubConnOptions{HealthCheckEnabled: b.config.HealthCheck}) + var sc balancer.SubConn + opts := balancer.NewSubConnOptions{ + HealthCheckEnabled: b.config.HealthCheck, + StateListener: func(scs balancer.SubConnState) { b.updateSubConnState(sc, scs) }, + } + sc, err := b.cc.NewSubConn([]resolver.Address{a}, opts) if err != nil { logger.Warningf("base.baseBalancer: failed to create new SubConn: %v", err) continue @@ -121,10 +126,10 @@ func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) error { sc := sci.(balancer.SubConn) // a was removed by resolver. if _, ok := addrsSet.Get(a); !ok { - b.cc.RemoveSubConn(sc) + sc.Shutdown() b.subConns.Delete(a) // Keep the state of this sc in b.scStates until sc's state becomes Shutdown. - // The entry will be deleted in UpdateSubConnState. + // The entry will be deleted in updateSubConnState. } } // If resolver state contains no addresses, return an error so ClientConn @@ -177,7 +182,12 @@ func (b *baseBalancer) regeneratePicker() { b.picker = b.pickerBuilder.Build(PickerBuildInfo{ReadySCs: readySCs}) } +// UpdateSubConnState is a nop because a StateListener is always set in NewSubConn. func (b *baseBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) { + logger.Errorf("base.baseBalancer: UpdateSubConnState(%v, %+v) called unexpectedly", sc, state) +} + +func (b *baseBalancer) updateSubConnState(sc balancer.SubConn, state balancer.SubConnState) { s := state.ConnectivityState if logger.V(2) { logger.Infof("base.baseBalancer: handle SubConn state change: %p, %v", sc, s) @@ -204,8 +214,8 @@ func (b *baseBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.Su case connectivity.Idle: sc.Connect() case connectivity.Shutdown: - // When an address was removed by resolver, b called RemoveSubConn but - // kept the sc's state in scStates. Remove state for this sc here. + // When an address was removed by resolver, b called Shutdown but kept + // the sc's state in scStates. Remove state for this sc here. delete(b.scStates, sc) case connectivity.TransientFailure: // Save error to be reported via picker. @@ -226,7 +236,7 @@ func (b *baseBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.Su } // Close is a nop because base balancer doesn't have internal state to clean up, -// and it doesn't need to call RemoveSubConn for the SubConns. +// and it doesn't need to call Shutdown for the SubConns. func (b *baseBalancer) Close() { } diff --git a/balancer/base/balancer_test.go b/balancer/base/balancer_test.go index 7bf4d92f8f0a..8a97b4220a5c 100644 --- a/balancer/base/balancer_test.go +++ b/balancer/base/balancer_test.go @@ -19,7 +19,9 @@ package base import ( + "context" "testing" + "time" "google.golang.org/grpc/attributes" "google.golang.org/grpc/balancer" @@ -38,7 +40,9 @@ func (c *testClientConn) NewSubConn(addrs []resolver.Address, opts balancer.NewS func (c *testClientConn) UpdateState(balancer.State) {} -type testSubConn struct{} +type testSubConn struct { + updateState func(balancer.SubConnState) +} func (sc *testSubConn) UpdateAddresses(addresses []resolver.Address) {} @@ -61,7 +65,11 @@ func (p *testPickBuilder) Build(info PickerBuildInfo) balancer.Picker { } func TestBaseBalancerReserveAttributes(t *testing.T) { - var v = func(info PickerBuildInfo) { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + validated := make(chan struct{}, 1) + v := func(info PickerBuildInfo) { + defer func() { validated <- struct{}{} }() for _, sc := range info.ReadySCs { if sc.Address.Addr == "1.1.1.1" { if sc.Address.Attributes == nil { @@ -80,8 +88,8 @@ func TestBaseBalancerReserveAttributes(t *testing.T) { } pickBuilder := &testPickBuilder{validate: v} b := (&baseBuilder{pickerBuilder: pickBuilder}).Build(&testClientConn{ - newSubConn: func(addrs []resolver.Address, _ balancer.NewSubConnOptions) (balancer.SubConn, error) { - return &testSubConn{}, nil + newSubConn: func(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) { + return &testSubConn{updateState: opts.StateListener}, nil }, }, balancer.BuildOptions{}).(*baseBalancer) @@ -93,8 +101,18 @@ func TestBaseBalancerReserveAttributes(t *testing.T) { }, }, }) + select { + case <-validated: + case <-ctx.Done(): + t.Fatalf("timed out waiting for UpdateClientConnState to call picker.Build") + } for sc := range b.scStates { - b.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Ready, ConnectionError: nil}) + sc.(*testSubConn).updateState(balancer.SubConnState{ConnectivityState: connectivity.Ready, ConnectionError: nil}) + select { + case <-validated: + case <-ctx.Done(): + t.Fatalf("timed out waiting for UpdateClientConnState to call picker.Build") + } } }