Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

base: update base balancer for new APIs #6503

Merged
merged 1 commit into from
Aug 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 16 additions & 6 deletions balancer/base/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,12 @@ func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
addrsSet.Set(a, nil)
if _, ok := b.subConns.Get(a); !ok {
// a is a new address (not existing in b.subConns).
sc, err := b.cc.NewSubConn([]resolver.Address{a}, balancer.NewSubConnOptions{HealthCheckEnabled: b.config.HealthCheck})
var sc balancer.SubConn
opts := balancer.NewSubConnOptions{
HealthCheckEnabled: b.config.HealthCheck,
StateListener: func(scs balancer.SubConnState) { b.updateSubConnState(sc, scs) },
}
sc, err := b.cc.NewSubConn([]resolver.Address{a}, opts)
if err != nil {
logger.Warningf("base.baseBalancer: failed to create new SubConn: %v", err)
continue
Expand All @@ -121,10 +126,10 @@ func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
sc := sci.(balancer.SubConn)
// a was removed by resolver.
if _, ok := addrsSet.Get(a); !ok {
b.cc.RemoveSubConn(sc)
sc.Shutdown()
b.subConns.Delete(a)
// Keep the state of this sc in b.scStates until sc's state becomes Shutdown.
// The entry will be deleted in UpdateSubConnState.
// The entry will be deleted in updateSubConnState.
}
}
// If resolver state contains no addresses, return an error so ClientConn
Expand Down Expand Up @@ -177,7 +182,12 @@ func (b *baseBalancer) regeneratePicker() {
b.picker = b.pickerBuilder.Build(PickerBuildInfo{ReadySCs: readySCs})
}

// UpdateSubConnState is a nop because a StateListener is always set in NewSubConn.
func (b *baseBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
logger.Errorf("base.baseBalancer: UpdateSubConnState(%v, %+v) called unexpectedly", sc, state)
}

func (b *baseBalancer) updateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
s := state.ConnectivityState
if logger.V(2) {
logger.Infof("base.baseBalancer: handle SubConn state change: %p, %v", sc, s)
Expand All @@ -204,8 +214,8 @@ func (b *baseBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.Su
case connectivity.Idle:
sc.Connect()
case connectivity.Shutdown:
// When an address was removed by resolver, b called RemoveSubConn but
// kept the sc's state in scStates. Remove state for this sc here.
// When an address was removed by resolver, b called Shutdown but kept
// the sc's state in scStates. Remove state for this sc here.
delete(b.scStates, sc)
case connectivity.TransientFailure:
// Save error to be reported via picker.
Expand All @@ -226,7 +236,7 @@ func (b *baseBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.Su
}

// Close is a nop because base balancer doesn't have internal state to clean up,
// and it doesn't need to call RemoveSubConn for the SubConns.
// and it doesn't need to call Shutdown for the SubConns.
func (b *baseBalancer) Close() {
}

Expand Down
28 changes: 23 additions & 5 deletions balancer/base/balancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
package base

import (
"context"
"testing"
"time"

"google.golang.org/grpc/attributes"
"google.golang.org/grpc/balancer"
Expand All @@ -38,7 +40,9 @@ func (c *testClientConn) NewSubConn(addrs []resolver.Address, opts balancer.NewS

func (c *testClientConn) UpdateState(balancer.State) {}

type testSubConn struct{}
type testSubConn struct {
updateState func(balancer.SubConnState)
}

func (sc *testSubConn) UpdateAddresses(addresses []resolver.Address) {}

Expand All @@ -61,7 +65,11 @@ func (p *testPickBuilder) Build(info PickerBuildInfo) balancer.Picker {
}

func TestBaseBalancerReserveAttributes(t *testing.T) {
var v = func(info PickerBuildInfo) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
validated := make(chan struct{}, 1)
v := func(info PickerBuildInfo) {
defer func() { validated <- struct{}{} }()
for _, sc := range info.ReadySCs {
if sc.Address.Addr == "1.1.1.1" {
if sc.Address.Attributes == nil {
Expand All @@ -80,8 +88,8 @@ func TestBaseBalancerReserveAttributes(t *testing.T) {
}
pickBuilder := &testPickBuilder{validate: v}
b := (&baseBuilder{pickerBuilder: pickBuilder}).Build(&testClientConn{
newSubConn: func(addrs []resolver.Address, _ balancer.NewSubConnOptions) (balancer.SubConn, error) {
return &testSubConn{}, nil
newSubConn: func(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
return &testSubConn{updateState: opts.StateListener}, nil
},
}, balancer.BuildOptions{}).(*baseBalancer)

Expand All @@ -93,8 +101,18 @@ func TestBaseBalancerReserveAttributes(t *testing.T) {
},
},
})
select {
case <-validated:
case <-ctx.Done():
t.Fatalf("timed out waiting for UpdateClientConnState to call picker.Build")
}

for sc := range b.scStates {
b.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Ready, ConnectionError: nil})
sc.(*testSubConn).updateState(balancer.SubConnState{ConnectivityState: connectivity.Ready, ConnectionError: nil})
select {
case <-validated:
case <-ctx.Done():
t.Fatalf("timed out waiting for UpdateClientConnState to call picker.Build")
}
}
}