Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pickfirst: Register a health listener when used as a leaf policy #7832

Merged
merged 11 commits into from
Dec 5, 2024
173 changes: 136 additions & 37 deletions balancer/pickfirst/pickfirstleaf/pickfirstleaf.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@
balancer.Register(pickfirstBuilder{})
}

// enableHealthListenerKeyType is a unique key type used in resolver attributes
// to indicate whether the health listener usage is enabled.
type enableHealthListenerKeyType struct{}

var (
logger = grpclog.Component("pick-first-leaf-lb")
// Name is the name of the pick_first_leaf balancer.
Expand Down Expand Up @@ -108,10 +112,8 @@
target: bo.Target.String(),
metricsRecorder: bo.MetricsRecorder, // ClientConn will always create a Metrics Recorder.

addressList: addressList{},
subConns: resolver.NewAddressMap(),
state: connectivity.Connecting,
mu: sync.Mutex{},
cancelConnectionTimer: func() {},
}
b.logger = internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf(logPrefix, b))
Expand All @@ -130,6 +132,13 @@
return cfg, nil
}

// EnableHealthListener updates the state to configure pickfirst for using a
// generic health listener.
func EnableHealthListener(state resolver.State) resolver.State {
state.Attributes = state.Attributes.WithValue(enableHealthListenerKeyType{}, true)
return state
}

type pfConfig struct {
serviceconfig.LoadBalancingConfig `json:"-"`

Expand All @@ -147,15 +156,19 @@
subConn balancer.SubConn
addr resolver.Address

state connectivity.State
rawConnectivityState connectivity.State
// The effective connectivity state based on raw connectivity, health state
// and after following sticky TransientFailure behaviour defined in A62.
effectiveState connectivity.State
lastErr error
connectionFailedInFirstPass bool
}

func (b *pickfirstBalancer) newSCData(addr resolver.Address) (*scData, error) {
sd := &scData{
state: connectivity.Idle,
addr: addr,
rawConnectivityState: connectivity.Idle,
effectiveState: connectivity.Idle,
addr: addr,
}
sc, err := b.cc.NewSubConn([]resolver.Address{addr}, balancer.NewSubConnOptions{
StateListener: func(state balancer.SubConnState) {
Expand All @@ -180,14 +193,17 @@
// The mutex is used to ensure synchronization of updates triggered
// from the idle picker and the already serialized resolver,
// SubConn state updates.
mu sync.Mutex
mu sync.Mutex
// State reported to the channel based on SubConn states and resolver
// updates.
state connectivity.State
// scData for active subonns mapped by address.
subConns *resolver.AddressMap
addressList addressList
firstPass bool
numTF int
cancelConnectionTimer func()
healthCheckingEnabled bool
}

// ResolverError is called by the ClientConn when the name resolver produces
Expand All @@ -213,7 +229,7 @@
return
}

b.cc.UpdateState(balancer.State{
b.updateBalancerState(balancer.State{
ConnectivityState: connectivity.TransientFailure,
Picker: &picker{err: fmt.Errorf("name resolver error: %v", err)},
})
Expand All @@ -226,12 +242,12 @@
if len(state.ResolverState.Addresses) == 0 && len(state.ResolverState.Endpoints) == 0 {
// Cleanup state pertaining to the previous resolver state.
// Treat an empty address list like an error by calling b.ResolverError.
b.state = connectivity.TransientFailure
b.closeSubConnsLocked()
b.addressList.updateAddrs(nil)
b.resolverErrorLocked(errors.New("produced zero addresses"))
return balancer.ErrBadResolverState
}
b.healthCheckingEnabled = state.ResolverState.Attributes.Value(enableHealthListenerKeyType{}) != nil
cfg, ok := state.BalancerConfig.(pfConfig)
if state.BalancerConfig != nil && !ok {
return fmt.Errorf("pickfirst: received illegal BalancerConfig (type %T): %v: %w", state.BalancerConfig, state.BalancerConfig, balancer.ErrBadResolverState)
Expand Down Expand Up @@ -278,12 +294,15 @@
newAddrs = deDupAddresses(newAddrs)
newAddrs = interleaveAddresses(newAddrs)

// If the previous ready SubConn exists in new address list,
// keep this connection and don't create new SubConns.
prevAddr := b.addressList.currentAddress()
prevSCData, found := b.subConns.Get(prevAddr)
prevAddrsCount := b.addressList.size()
isPrevRawConnectivityStateReady := found && prevSCData.(*scData).rawConnectivityState == connectivity.Ready
b.addressList.updateAddrs(newAddrs)
if b.state == connectivity.Ready && b.addressList.seekTo(prevAddr) {

// If the previous ready SubConn exists in new address list,
// keep this connection and don't create new SubConns.
if isPrevRawConnectivityStateReady && b.addressList.seekTo(prevAddr) {
return nil
}

Expand All @@ -295,10 +314,9 @@
// we should still enter CONNECTING because the sticky TF behaviour
// mentioned in A62 applies only when the TRANSIENT_FAILURE is reported
// due to connectivity failures.
if b.state == connectivity.Ready || b.state == connectivity.Connecting || prevAddrsCount == 0 {
if isPrevRawConnectivityStateReady || b.state == connectivity.Connecting || prevAddrsCount == 0 {
// Start connection attempt at first address.
b.state = connectivity.Connecting
b.cc.UpdateState(balancer.State{
b.forceUpdateConcludedStateLocked(balancer.State{
ConnectivityState: connectivity.Connecting,
Picker: &picker{err: balancer.ErrNoSubConnAvailable},
})
Expand Down Expand Up @@ -497,7 +515,7 @@
}

scd := sd.(*scData)
switch scd.state {
switch scd.rawConnectivityState {
case connectivity.Idle:
scd.subConn.Connect()
b.scheduleNextConnectionLocked()
Expand All @@ -515,7 +533,7 @@
b.scheduleNextConnectionLocked()
return
default:
b.logger.Errorf("SubConn with unexpected state %v present in SubConns map.", scd.state)
b.logger.Errorf("SubConn with unexpected state %v present in SubConns map.", scd.rawConnectivityState)

Check warning on line 536 in balancer/pickfirst/pickfirstleaf/pickfirstleaf.go

View check run for this annotation

Codecov / codecov/patch

balancer/pickfirst/pickfirstleaf/pickfirstleaf.go#L536

Added line #L536 was not covered by tests
return

}
Expand Down Expand Up @@ -558,16 +576,17 @@
func (b *pickfirstBalancer) updateSubConnState(sd *scData, newState balancer.SubConnState) {
b.mu.Lock()
defer b.mu.Unlock()
oldState := sd.state
sd.state = newState.ConnectivityState
oldState := sd.rawConnectivityState
sd.rawConnectivityState = newState.ConnectivityState
// Previously relevant SubConns can still callback with state updates.
// To prevent pickers from returning these obsolete SubConns, this logic
// is included to check if the current list of active SubConns includes this
// SubConn.
if activeSD, found := b.subConns.Get(sd.addr); !found || activeSD != sd {
if !b.isActiveSCData(sd) {
return
}
if newState.ConnectivityState == connectivity.Shutdown {
sd.effectiveState = connectivity.Shutdown

Check warning on line 589 in balancer/pickfirst/pickfirstleaf/pickfirstleaf.go

View check run for this annotation

Codecov / codecov/patch

balancer/pickfirst/pickfirstleaf/pickfirstleaf.go#L589

Added line #L589 was not covered by tests
return
}

Expand All @@ -586,10 +605,30 @@
b.logger.Errorf("Address %q not found address list in %v", sd.addr, b.addressList.addresses)
return
}
b.state = connectivity.Ready
b.cc.UpdateState(balancer.State{
ConnectivityState: connectivity.Ready,
Picker: &picker{result: balancer.PickResult{SubConn: sd.subConn}},
if !b.healthCheckingEnabled {
if b.logger.V(2) {
b.logger.Infof("SubConn %p reported connectivity state READY and the health listener is disabled. Transitioning SubConn to READY.", sd.subConn)
}

Check warning on line 611 in balancer/pickfirst/pickfirstleaf/pickfirstleaf.go

View check run for this annotation

Codecov / codecov/patch

balancer/pickfirst/pickfirstleaf/pickfirstleaf.go#L610-L611

Added lines #L610 - L611 were not covered by tests

sd.effectiveState = connectivity.Ready
b.updateBalancerState(balancer.State{
ConnectivityState: connectivity.Ready,
Picker: &picker{result: balancer.PickResult{SubConn: sd.subConn}},
})
return
}
if b.logger.V(2) {
b.logger.Infof("SubConn %p reported connectivity state READY. Registering health listener.", sd.subConn)
}

Check warning on line 622 in balancer/pickfirst/pickfirstleaf/pickfirstleaf.go

View check run for this annotation

Codecov / codecov/patch

balancer/pickfirst/pickfirstleaf/pickfirstleaf.go#L621-L622

Added lines #L621 - L622 were not covered by tests
// Send a CONNECTING update to take the SubConn out of sticky-TF if
// required.
sd.effectiveState = connectivity.Connecting
b.updateBalancerState(balancer.State{
ConnectivityState: connectivity.Connecting,
Picker: &picker{err: balancer.ErrNoSubConnAvailable},
})
sd.subConn.RegisterHealthListener(func(scs balancer.SubConnState) {
b.updateSubConnHealthState(sd, scs)
})
return
}
Expand All @@ -600,11 +639,11 @@
// a transport is successfully created, but the connection fails
// before the SubConn can send the notification for READY. We treat
// this as a successful connection and transition to IDLE.
if (b.state == connectivity.Ready && newState.ConnectivityState != connectivity.Ready) || (oldState == connectivity.Connecting && newState.ConnectivityState == connectivity.Idle) {
if (oldState == connectivity.Ready && newState.ConnectivityState != connectivity.Ready) || (oldState == connectivity.Connecting && newState.ConnectivityState == connectivity.Idle) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If oldState is Ready, doesn't newState have to be Idle?

And if newState is Idle, the previous state must have been Ready, right? We may have not gotten the update (because of the Connecting->Ready->Idle race), but....can this whole thing be replaced by just if newState.ConnectivityState == connectivity.Idle?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If oldState is Ready, doesn't newState have to be Idle?

Yes, this is true because the health state is not sent via the connectivity listener anymore. So READY->TF->READY transitions are not possible and no duplicate READY updates are sent by addrConn.

And if newState is Idle, the previous state must have been Ready, right?

This is false as a transition from TF->IDLE is possible after the subchannel completes its backoff.

We can simplify the condition to oldState != TF && newState == IDLE, along with a comment explaining the assumptions that lead to this simplification. IMO, this may make it harder to understand. Do you think I should make the change?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As discussed offline, please change to look like we want it to look (if we didn't have the bug) and add a clause that covers the bug:

if oldState == Ready || (oldState == Connecting && newState == Idle) {

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(and add a TODO to remove the second half when the bug is fixed please)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

// Once a transport fails, the balancer enters IDLE and starts from
// the first address when the picker is used.
b.shutdownRemainingLocked(sd)
b.state = connectivity.Idle
sd.effectiveState = newState.ConnectivityState
// READY SubConn interspliced in between CONNECTING and IDLE, need to
// account for that.
if oldState == connectivity.Connecting {
Expand All @@ -615,7 +654,7 @@
}
disconnectionsMetric.Record(b.metricsRecorder, 1, b.target)
b.addressList.reset()
b.cc.UpdateState(balancer.State{
b.updateBalancerState(balancer.State{
ConnectivityState: connectivity.Idle,
Picker: &idlePicker{exitIdle: sync.OnceFunc(b.ExitIdle)},
})
Expand All @@ -625,19 +664,21 @@
if b.firstPass {
switch newState.ConnectivityState {
case connectivity.Connecting:
// The balancer can be in either IDLE, CONNECTING or
// TRANSIENT_FAILURE. If it's in TRANSIENT_FAILURE, stay in
// The effective state can be in either IDLE, CONNECTING or
// TRANSIENT_FAILURE. If it's TRANSIENT_FAILURE, stay in
// TRANSIENT_FAILURE until it's READY. See A62.
// If the balancer is already in CONNECTING, no update is needed.
if b.state == connectivity.Idle {
b.state = connectivity.Connecting
b.cc.UpdateState(balancer.State{
// If the SubConn is already reporting CONNECTING, no update is
// needed.
dfawley marked this conversation as resolved.
Show resolved Hide resolved
if sd.effectiveState == connectivity.Idle {
sd.effectiveState = connectivity.Connecting
b.updateBalancerState(balancer.State{
ConnectivityState: connectivity.Connecting,
Picker: &picker{err: balancer.ErrNoSubConnAvailable},
})
}
case connectivity.TransientFailure:
sd.lastErr = newState.ConnectionError
sd.effectiveState = connectivity.TransientFailure
// Since we're re-using common SubConns while handling resolver
// updates, we could receive an out of turn TRANSIENT_FAILURE from
// a pass over the previous address list. Happy Eyeballs will also
Expand All @@ -664,7 +705,7 @@
b.numTF = (b.numTF + 1) % b.subConns.Len()
sd.lastErr = newState.ConnectionError
if b.numTF%b.subConns.Len() == 0 {
b.cc.UpdateState(balancer.State{
b.updateBalancerState(balancer.State{
ConnectivityState: connectivity.TransientFailure,
Picker: &picker{err: newState.ConnectionError},
})
Expand Down Expand Up @@ -694,21 +735,79 @@
}
}
b.firstPass = false
b.state = connectivity.TransientFailure

b.cc.UpdateState(balancer.State{
b.updateBalancerState(balancer.State{
ConnectivityState: connectivity.TransientFailure,
Picker: &picker{err: lastErr},
})
// Start re-connecting all the SubConns that are already in IDLE.
for _, v := range b.subConns.Values() {
sd := v.(*scData)
if sd.state == connectivity.Idle {
if sd.rawConnectivityState == connectivity.Idle {
sd.subConn.Connect()
}
}
}

func (b *pickfirstBalancer) isActiveSCData(sd *scData) bool {
activeSD, found := b.subConns.Get(sd.addr)
return found && activeSD == sd
}

func (b *pickfirstBalancer) updateSubConnHealthState(sd *scData, state balancer.SubConnState) {
b.mu.Lock()
defer b.mu.Unlock()
// Previously relevant SubConns can still callback with state updates.
// To prevent pickers from returning these obsolete SubConns, this logic
// is included to check if the current list of active SubConns includes
// this SubConn.
if !b.isActiveSCData(sd) {
return
}

Check warning on line 765 in balancer/pickfirst/pickfirstleaf/pickfirstleaf.go

View check run for this annotation

Codecov / codecov/patch

balancer/pickfirst/pickfirstleaf/pickfirstleaf.go#L764-L765

Added lines #L764 - L765 were not covered by tests
sd.effectiveState = state.ConnectivityState
switch state.ConnectivityState {
case connectivity.Ready:
b.updateBalancerState(balancer.State{
ConnectivityState: connectivity.Ready,
Picker: &picker{result: balancer.PickResult{SubConn: sd.subConn}},
})
case connectivity.TransientFailure:
b.updateBalancerState(balancer.State{
ConnectivityState: connectivity.TransientFailure,
Picker: &picker{err: fmt.Errorf("pickfirst: health check failure: %v", state.ConnectionError)},
})
case connectivity.Connecting:
b.updateBalancerState(balancer.State{
ConnectivityState: connectivity.Connecting,
Picker: &picker{err: balancer.ErrNoSubConnAvailable},
})
default:
b.logger.Errorf("Got unexpected health update for SubConn %p: %v", state)

Check warning on line 784 in balancer/pickfirst/pickfirstleaf/pickfirstleaf.go

View check run for this annotation

Codecov / codecov/patch

balancer/pickfirst/pickfirstleaf/pickfirstleaf.go#L783-L784

Added lines #L783 - L784 were not covered by tests
}
}

// updateBalancerState stores the state reported to the channel and calls
// ClientConn.UpdateState(). As an optimization, it avoid sending duplicate
dfawley marked this conversation as resolved.
Show resolved Hide resolved
// updates to the channel.
func (b *pickfirstBalancer) updateBalancerState(newState balancer.State) {
// In case of TransientFailures allow the picker to be updated to update
// the connectivity error, in all other cases don't send duplicate state
// updates.
if newState.ConnectivityState == b.state && b.state != connectivity.TransientFailure {
return
dfawley marked this conversation as resolved.
Show resolved Hide resolved
}
b.forceUpdateConcludedStateLocked(newState)
}

// forceUpdateConcludedStateLocked stores the state reported to the channel and
// calls ClientConn.UpdateState().
// A separate function is defined to force update the ClientConn state since the
// channel doesn't correctly assume that LB policies start in CONNECTING and
// relies on LB policy to send an initial CONNECTING update.
dfawley marked this conversation as resolved.
Show resolved Hide resolved
func (b *pickfirstBalancer) forceUpdateConcludedStateLocked(newState balancer.State) {
b.state = newState.ConnectivityState
b.cc.UpdateState(newState)
}

type picker struct {
result balancer.PickResult
err error
Expand Down
Loading