Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pickfirst: Register a health listener when used as a leaf policy #7832

Merged
merged 11 commits into from
Dec 5, 2024
150 changes: 117 additions & 33 deletions balancer/pickfirst/pickfirstleaf/pickfirstleaf.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
"sync"
"time"

"google.golang.org/grpc/attributes"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/pickfirst/internal"
"google.golang.org/grpc/connectivity"
Expand All @@ -52,12 +53,21 @@
balancer.Register(pickfirstBuilder{})
}

// enableHealthListenerKeyType is a unique key type used in resolver attributes
// to indicate whether the health listener usage is enabled.
type enableHealthListenerKeyType struct{}

var (
logger = grpclog.Component("pick-first-leaf-lb")
// Name is the name of the pick_first_leaf balancer.
// It is changed to "pick_first" in init() if this balancer is to be
// registered as the default pickfirst.
Name = "pick_first_leaf"

// enableHealthListenerValue is the resolver state attribute value used to
// enable pickfirst to listen for health updates when operating under a
// petiole policy.
enableHealthListenerValue = &struct{}{}
)

const (
Expand Down Expand Up @@ -85,7 +95,8 @@
cc: cc,
addressList: addressList{},
dfawley marked this conversation as resolved.
Show resolved Hide resolved
subConns: resolver.NewAddressMap(),
state: connectivity.Connecting,
connectivityState: connectivity.Connecting,
concludedState: connectivity.Connecting,
mu: sync.Mutex{},
cancelConnectionTimer: func() {},
}
Expand All @@ -105,6 +116,12 @@
return cfg, nil
}

// EnableHealthListener updates the state to configure pickfirst for using a
// generic health listener.
func EnableHealthListener(attrs *attributes.Attributes) *attributes.Attributes {
return attrs.WithValue(enableHealthListenerKeyType{}, enableHealthListenerValue)
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For these types of functions we prefer to make them operate on the thing that contains the attributes. In this case, that would be a resolver.State.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed the function to accept resolver.State.


type pfConfig struct {
serviceconfig.LoadBalancingConfig `json:"-"`

Expand All @@ -122,15 +139,15 @@
subConn balancer.SubConn
addr resolver.Address

state connectivity.State
connectivityState connectivity.State
lastErr error
connectionFailedInFirstPass bool
}

func (b *pickfirstBalancer) newSCData(addr resolver.Address) (*scData, error) {
sd := &scData{
state: connectivity.Idle,
addr: addr,
connectivityState: connectivity.Idle,
addr: addr,
}
sc, err := b.cc.NewSubConn([]resolver.Address{addr}, balancer.NewSubConnOptions{
StateListener: func(state balancer.SubConnState) {
Expand All @@ -153,14 +170,18 @@
// The mutex is used to ensure synchronization of updates triggered
// from the idle picker and the already serialized resolver,
// SubConn state updates.
mu sync.Mutex
state connectivity.State
mu sync.Mutex
connectivityState connectivity.State
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this deserves a comment, too, since there are now two very similar fields.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a comment.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this kind of tracking be done in the subconn struct (scData) and not here? I would expect the lb policy only has the concludedState and each subchannel needs to track its real state and its effective state, accounting for sticky-TF and health reporting? It seems confusing to me that the LB policy itself is tracking two different states, but I'm willing to believe it's simpler this way if you tried it the other way already.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was referring the Java implementation of Pickfirst and they were handling the sticky TF behaviour in the LB Policy. I don't see any issue with handling sticky TF in the subchannel state. I've update the PR to reflect the suggestions.

// State reported to the channel. It will be the health state when being
// used as a leaf policy and the connectivityState is READY.
concludedState connectivity.State
// scData for active subonns mapped by address.
subConns *resolver.AddressMap
addressList addressList
firstPass bool
numTF int
cancelConnectionTimer func()
healthCheckingEnabled bool
}

// ResolverError is called by the ClientConn when the name resolver produces
Expand All @@ -179,14 +200,15 @@
// The picker will not change since the balancer does not currently
// report an error. If the balancer hasn't received a single good resolver
// update yet, transition to TRANSIENT_FAILURE.
if b.state != connectivity.TransientFailure && b.addressList.size() > 0 {
if b.connectivityState != connectivity.TransientFailure && b.addressList.size() > 0 {
dfawley marked this conversation as resolved.
Show resolved Hide resolved
if b.logger.V(2) {
b.logger.Infof("Ignoring resolver error because balancer is using a previous good update.")
}
return
}

b.cc.UpdateState(balancer.State{
b.connectivityState = connectivity.TransientFailure
easwars marked this conversation as resolved.
Show resolved Hide resolved
b.updateConcludedStateLocked(balancer.State{
ConnectivityState: connectivity.TransientFailure,
Picker: &picker{err: fmt.Errorf("name resolver error: %v", err)},
})
Expand All @@ -199,12 +221,13 @@
if len(state.ResolverState.Addresses) == 0 && len(state.ResolverState.Endpoints) == 0 {
// Cleanup state pertaining to the previous resolver state.
// Treat an empty address list like an error by calling b.ResolverError.
b.state = connectivity.TransientFailure
b.connectivityState = connectivity.TransientFailure
b.closeSubConnsLocked()
b.addressList.updateAddrs(nil)
b.resolverErrorLocked(errors.New("produced zero addresses"))
return balancer.ErrBadResolverState
}
b.healthCheckingEnabled = state.ResolverState.Attributes.Value(enableHealthListenerKeyType{}) == enableHealthListenerValue
easwars marked this conversation as resolved.
Show resolved Hide resolved
cfg, ok := state.BalancerConfig.(pfConfig)
if state.BalancerConfig != nil && !ok {
return fmt.Errorf("pickfirst: received illegal BalancerConfig (type %T): %v: %w", state.BalancerConfig, state.BalancerConfig, balancer.ErrBadResolverState)
Expand Down Expand Up @@ -256,7 +279,7 @@
prevAddr := b.addressList.currentAddress()
prevAddrsCount := b.addressList.size()
b.addressList.updateAddrs(newAddrs)
if b.state == connectivity.Ready && b.addressList.seekTo(prevAddr) {
if b.connectivityState == connectivity.Ready && b.addressList.seekTo(prevAddr) {
return nil
}

Expand All @@ -268,15 +291,15 @@
// we should still enter CONNECTING because the sticky TF behaviour
// mentioned in A62 applies only when the TRANSIENT_FAILURE is reported
// due to connectivity failures.
if b.state == connectivity.Ready || b.state == connectivity.Connecting || prevAddrsCount == 0 {
if b.connectivityState == connectivity.Ready || b.connectivityState == connectivity.Connecting || prevAddrsCount == 0 {
// Start connection attempt at first address.
b.state = connectivity.Connecting
b.cc.UpdateState(balancer.State{
b.connectivityState = connectivity.Connecting
b.forceUpdateConcludedStateLocked(balancer.State{
ConnectivityState: connectivity.Connecting,
Picker: &picker{err: balancer.ErrNoSubConnAvailable},
})
b.startFirstPassLocked()
} else if b.state == connectivity.TransientFailure {
} else if b.connectivityState == connectivity.TransientFailure {
// If we're in TRANSIENT_FAILURE, we stay in TRANSIENT_FAILURE until
// we're READY. See A62.
b.startFirstPassLocked()
Expand All @@ -295,7 +318,7 @@
defer b.mu.Unlock()
b.closeSubConnsLocked()
b.cancelConnectionTimer()
b.state = connectivity.Shutdown
b.connectivityState = connectivity.Shutdown
}

// ExitIdle moves the balancer out of idle state. It can be called concurrently
Expand All @@ -304,7 +327,7 @@
func (b *pickfirstBalancer) ExitIdle() {
b.mu.Lock()
defer b.mu.Unlock()
if b.state == connectivity.Idle {
if b.connectivityState == connectivity.Idle {
b.startFirstPassLocked()
}
}
Expand Down Expand Up @@ -470,7 +493,7 @@
}

scd := sd.(*scData)
switch scd.state {
switch scd.connectivityState {
case connectivity.Idle:
scd.subConn.Connect()
b.scheduleNextConnectionLocked()
Expand All @@ -488,7 +511,7 @@
b.scheduleNextConnectionLocked()
return
default:
b.logger.Errorf("SubConn with unexpected state %v present in SubConns map.", scd.state)
b.logger.Errorf("SubConn with unexpected state %v present in SubConns map.", scd.connectivityState)

Check warning on line 514 in balancer/pickfirst/pickfirstleaf/pickfirstleaf.go

View check run for this annotation

Codecov / codecov/patch

balancer/pickfirst/pickfirstleaf/pickfirstleaf.go#L514

Added line #L514 was not covered by tests
return

}
Expand Down Expand Up @@ -531,12 +554,12 @@
func (b *pickfirstBalancer) updateSubConnState(sd *scData, newState balancer.SubConnState) {
b.mu.Lock()
defer b.mu.Unlock()
oldState := sd.state
oldState := sd.connectivityState
// Record a connection attempt when exiting CONNECTING.
if newState.ConnectivityState == connectivity.TransientFailure {
sd.connectionFailedInFirstPass = true
}
sd.state = newState.ConnectivityState
sd.connectivityState = newState.ConnectivityState
// Previously relevant SubConns can still callback with state updates.
// To prevent pickers from returning these obsolete SubConns, this logic
// is included to check if the current list of active SubConns includes this
Expand All @@ -556,10 +579,23 @@
b.logger.Errorf("Address %q not found address list in %v", sd.addr, b.addressList.addresses)
return
}
b.state = connectivity.Ready
b.cc.UpdateState(balancer.State{
ConnectivityState: connectivity.Ready,
Picker: &picker{result: balancer.PickResult{SubConn: sd.subConn}},
b.connectivityState = connectivity.Ready
if !b.healthCheckingEnabled {
if b.logger.V(2) {
b.logger.Infof("SubConn %p reported connectivity state READY and the health listener is disabled. Transitioning SubConn to READY.", sd.subConn)
}

Check warning on line 586 in balancer/pickfirst/pickfirstleaf/pickfirstleaf.go

View check run for this annotation

Codecov / codecov/patch

balancer/pickfirst/pickfirstleaf/pickfirstleaf.go#L585-L586

Added lines #L585 - L586 were not covered by tests

b.updateConcludedStateLocked(balancer.State{
easwars marked this conversation as resolved.
Show resolved Hide resolved
ConnectivityState: connectivity.Ready,
Picker: &picker{result: balancer.PickResult{SubConn: sd.subConn}},
})
easwars marked this conversation as resolved.
Show resolved Hide resolved
return
}
if b.logger.V(2) {
b.logger.Infof("SubConn %p reported connectivity state READY. Registering health listener.", sd.subConn)
}

Check warning on line 596 in balancer/pickfirst/pickfirstleaf/pickfirstleaf.go

View check run for this annotation

Codecov / codecov/patch

balancer/pickfirst/pickfirstleaf/pickfirstleaf.go#L595-L596

Added lines #L595 - L596 were not covered by tests
sd.subConn.RegisterHealthListener(func(scs balancer.SubConnState) {
b.updateSubConnHealthState(sd, scs)
})
return
}
Expand All @@ -570,13 +606,13 @@
// a transport is successfully created, but the connection fails
// before the SubConn can send the notification for READY. We treat
// this as a successful connection and transition to IDLE.
if (b.state == connectivity.Ready && newState.ConnectivityState != connectivity.Ready) || (oldState == connectivity.Connecting && newState.ConnectivityState == connectivity.Idle) {
if (b.connectivityState == connectivity.Ready && newState.ConnectivityState != connectivity.Ready) || (oldState == connectivity.Connecting && newState.ConnectivityState == connectivity.Idle) {
// Once a transport fails, the balancer enters IDLE and starts from
// the first address when the picker is used.
b.shutdownRemainingLocked(sd)
b.state = connectivity.Idle
b.connectivityState = connectivity.Idle
b.addressList.reset()
b.cc.UpdateState(balancer.State{
b.updateConcludedStateLocked(balancer.State{
ConnectivityState: connectivity.Idle,
Picker: &idlePicker{exitIdle: sync.OnceFunc(b.ExitIdle)},
})
Expand All @@ -590,9 +626,9 @@
// TRANSIENT_FAILURE. If it's in TRANSIENT_FAILURE, stay in
// TRANSIENT_FAILURE until it's READY. See A62.
// If the balancer is already in CONNECTING, no update is needed.
if b.state == connectivity.Idle {
b.state = connectivity.Connecting
b.cc.UpdateState(balancer.State{
if b.connectivityState == connectivity.Idle {
b.connectivityState = connectivity.Connecting
b.updateConcludedStateLocked(balancer.State{
ConnectivityState: connectivity.Connecting,
Picker: &picker{err: balancer.ErrNoSubConnAvailable},
})
Expand Down Expand Up @@ -625,7 +661,7 @@
b.numTF = (b.numTF + 1) % b.subConns.Len()
sd.lastErr = newState.ConnectionError
if b.numTF%b.subConns.Len() == 0 {
b.cc.UpdateState(balancer.State{
b.updateConcludedStateLocked(balancer.State{
easwars marked this conversation as resolved.
Show resolved Hide resolved
ConnectivityState: connectivity.TransientFailure,
Picker: &picker{err: newState.ConnectionError},
})
Expand Down Expand Up @@ -655,7 +691,7 @@
}
}
b.firstPass = false
b.state = connectivity.TransientFailure
b.connectivityState = connectivity.TransientFailure

b.cc.UpdateState(balancer.State{
ConnectivityState: connectivity.TransientFailure,
Expand All @@ -664,12 +700,60 @@
// Start re-connecting all the SubConns that are already in IDLE.
for _, v := range b.subConns.Values() {
sd := v.(*scData)
if sd.state == connectivity.Idle {
if sd.connectivityState == connectivity.Idle {
sd.subConn.Connect()
}
}
}

func (b *pickfirstBalancer) updateSubConnHealthState(sd *scData, state balancer.SubConnState) {
b.mu.Lock()
defer b.mu.Unlock()
// Previously relevant SubConns can still callback with state updates.
// To prevent pickers from returning these obsolete SubConns, this logic
// is included to check if the current list of active SubConns includes
// this SubConn.
if activeSD, found := b.subConns.Get(sd.addr); !found || activeSD != sd {
return
}

Check warning on line 718 in balancer/pickfirst/pickfirstleaf/pickfirstleaf.go

View check run for this annotation

Codecov / codecov/patch

balancer/pickfirst/pickfirstleaf/pickfirstleaf.go#L717-L718

Added lines #L717 - L718 were not covered by tests
easwars marked this conversation as resolved.
Show resolved Hide resolved
switch state.ConnectivityState {
case connectivity.Ready:
b.updateConcludedStateLocked(balancer.State{
ConnectivityState: connectivity.Ready,
Picker: &picker{result: balancer.PickResult{SubConn: sd.subConn}},
})
case connectivity.TransientFailure:
b.updateConcludedStateLocked(balancer.State{
ConnectivityState: connectivity.TransientFailure,
Picker: &picker{err: fmt.Errorf("health check failure: %v", state.ConnectionError)},
easwars marked this conversation as resolved.
Show resolved Hide resolved
})
case connectivity.Connecting:
b.updateConcludedStateLocked(balancer.State{
ConnectivityState: connectivity.Connecting,
Picker: &picker{err: balancer.ErrNoSubConnAvailable},
})
easwars marked this conversation as resolved.
Show resolved Hide resolved
default:
b.logger.Errorf("Got unexpected health update for SubConn %p: %v", state)

Check warning on line 736 in balancer/pickfirst/pickfirstleaf/pickfirstleaf.go

View check run for this annotation

Codecov / codecov/patch

balancer/pickfirst/pickfirstleaf/pickfirstleaf.go#L735-L736

Added lines #L735 - L736 were not covered by tests
}
}

func (b *pickfirstBalancer) updateConcludedStateLocked(newState balancer.State) {
easwars marked this conversation as resolved.
Show resolved Hide resolved
// Optimization to not send duplicate CONNECTING and IDLE updates.
easwars marked this conversation as resolved.
Show resolved Hide resolved
if newState.ConnectivityState == b.concludedState && b.concludedState == connectivity.Connecting {
return
dfawley marked this conversation as resolved.
Show resolved Hide resolved
}

Check warning on line 744 in balancer/pickfirst/pickfirstleaf/pickfirstleaf.go

View check run for this annotation

Codecov / codecov/patch

balancer/pickfirst/pickfirstleaf/pickfirstleaf.go#L743-L744

Added lines #L743 - L744 were not covered by tests
b.forceUpdateConcludedStateLocked(newState)
}

// A separate function to force update the ClientConn state is required to send
// the first CONNECTING update since the channel doesn't correctly assume that
// LB policies start in CONNECTING and relies on LB policy to send an initial
// CONNECTING update.
func (b *pickfirstBalancer) forceUpdateConcludedStateLocked(newState balancer.State) {
b.concludedState = newState.ConnectivityState
b.cc.UpdateState(newState)
}

type picker struct {
result balancer.PickResult
err error
Expand Down
Loading