Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pickfirst: Register a health listener when used as a leaf policy #7832

Merged
merged 11 commits into from
Dec 5, 2024
171 changes: 134 additions & 37 deletions balancer/pickfirst/pickfirstleaf/pickfirstleaf.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@
balancer.Register(pickfirstBuilder{})
}

// enableHealthListenerKeyType is a unique key type used in resolver attributes
// to indicate whether the health listener usage is enabled.
type enableHealthListenerKeyType struct{}

var (
logger = grpclog.Component("pick-first-leaf-lb")
// Name is the name of the pick_first_leaf balancer.
Expand Down Expand Up @@ -108,10 +112,8 @@
target: bo.Target.String(),
metricsRecorder: bo.MetricsRecorder, // ClientConn will always create a Metrics Recorder.

addressList: addressList{},
subConns: resolver.NewAddressMap(),
state: connectivity.Connecting,
mu: sync.Mutex{},
cancelConnectionTimer: func() {},
}
b.logger = internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf(logPrefix, b))
Expand All @@ -130,6 +132,13 @@
return cfg, nil
}

// EnableHealthListener updates the state to configure pickfirst for using a
// generic health listener.
func EnableHealthListener(state resolver.State) resolver.State {
state.Attributes = state.Attributes.WithValue(enableHealthListenerKeyType{}, true)
return state
}

type pfConfig struct {
serviceconfig.LoadBalancingConfig `json:"-"`

Expand All @@ -147,15 +156,19 @@
subConn balancer.SubConn
addr resolver.Address

state connectivity.State
rawConnectivityState connectivity.State
// The effective connectivity state based on raw connectivity, health state
// and after following sticky TransientFailure behaviour defined in A62.
effectiveState connectivity.State
lastErr error
connectionFailedInFirstPass bool
}

func (b *pickfirstBalancer) newSCData(addr resolver.Address) (*scData, error) {
sd := &scData{
state: connectivity.Idle,
addr: addr,
rawConnectivityState: connectivity.Idle,
effectiveState: connectivity.Idle,
addr: addr,
}
sc, err := b.cc.NewSubConn([]resolver.Address{addr}, balancer.NewSubConnOptions{
StateListener: func(state balancer.SubConnState) {
Expand All @@ -180,14 +193,17 @@
// The mutex is used to ensure synchronization of updates triggered
// from the idle picker and the already serialized resolver,
// SubConn state updates.
mu sync.Mutex
mu sync.Mutex
// State reported to the channel based on SubConn states and resolver
// updates.
state connectivity.State
// scData for active subonns mapped by address.
subConns *resolver.AddressMap
addressList addressList
firstPass bool
numTF int
cancelConnectionTimer func()
healthCheckingEnabled bool
}

// ResolverError is called by the ClientConn when the name resolver produces
Expand All @@ -213,7 +229,7 @@
return
}

b.cc.UpdateState(balancer.State{
b.updateBalancerState(balancer.State{
ConnectivityState: connectivity.TransientFailure,
Picker: &picker{err: fmt.Errorf("name resolver error: %v", err)},
})
Expand All @@ -226,12 +242,12 @@
if len(state.ResolverState.Addresses) == 0 && len(state.ResolverState.Endpoints) == 0 {
// Cleanup state pertaining to the previous resolver state.
// Treat an empty address list like an error by calling b.ResolverError.
b.state = connectivity.TransientFailure
b.closeSubConnsLocked()
b.addressList.updateAddrs(nil)
b.resolverErrorLocked(errors.New("produced zero addresses"))
return balancer.ErrBadResolverState
}
b.healthCheckingEnabled = state.ResolverState.Attributes.Value(enableHealthListenerKeyType{}) != nil
cfg, ok := state.BalancerConfig.(pfConfig)
if state.BalancerConfig != nil && !ok {
return fmt.Errorf("pickfirst: received illegal BalancerConfig (type %T): %v: %w", state.BalancerConfig, state.BalancerConfig, balancer.ErrBadResolverState)
Expand Down Expand Up @@ -278,12 +294,15 @@
newAddrs = deDupAddresses(newAddrs)
newAddrs = interleaveAddresses(newAddrs)

// If the previous ready SubConn exists in new address list,
// keep this connection and don't create new SubConns.
prevAddr := b.addressList.currentAddress()
prevSCData, found := b.subConns.Get(prevAddr)
prevAddrsCount := b.addressList.size()
isPrevRawConnectivityStateReady := found && prevSCData.(*scData).rawConnectivityState == connectivity.Ready
b.addressList.updateAddrs(newAddrs)
if b.state == connectivity.Ready && b.addressList.seekTo(prevAddr) {

// If the previous ready SubConn exists in new address list,
// keep this connection and don't create new SubConns.
if isPrevRawConnectivityStateReady && b.addressList.seekTo(prevAddr) {
return nil
}

Expand All @@ -295,10 +314,9 @@
// we should still enter CONNECTING because the sticky TF behaviour
// mentioned in A62 applies only when the TRANSIENT_FAILURE is reported
// due to connectivity failures.
if b.state == connectivity.Ready || b.state == connectivity.Connecting || prevAddrsCount == 0 {
if isPrevRawConnectivityStateReady || b.state == connectivity.Connecting || prevAddrsCount == 0 {
// Start connection attempt at first address.
b.state = connectivity.Connecting
b.cc.UpdateState(balancer.State{
b.forceUpdateConcludedStateLocked(balancer.State{
ConnectivityState: connectivity.Connecting,
Picker: &picker{err: balancer.ErrNoSubConnAvailable},
})
Expand Down Expand Up @@ -497,7 +515,7 @@
}

scd := sd.(*scData)
switch scd.state {
switch scd.rawConnectivityState {
case connectivity.Idle:
scd.subConn.Connect()
b.scheduleNextConnectionLocked()
Expand All @@ -515,7 +533,7 @@
b.scheduleNextConnectionLocked()
return
default:
b.logger.Errorf("SubConn with unexpected state %v present in SubConns map.", scd.state)
b.logger.Errorf("SubConn with unexpected state %v present in SubConns map.", scd.rawConnectivityState)
return

}
Expand Down Expand Up @@ -558,16 +576,17 @@
func (b *pickfirstBalancer) updateSubConnState(sd *scData, newState balancer.SubConnState) {
b.mu.Lock()
defer b.mu.Unlock()
oldState := sd.state
sd.state = newState.ConnectivityState
oldState := sd.rawConnectivityState
sd.rawConnectivityState = newState.ConnectivityState
// Previously relevant SubConns can still callback with state updates.
// To prevent pickers from returning these obsolete SubConns, this logic
// is included to check if the current list of active SubConns includes this
// SubConn.
if activeSD, found := b.subConns.Get(sd.addr); !found || activeSD != sd {
if !b.isActiveSCData(sd) {
return
}
if newState.ConnectivityState == connectivity.Shutdown {
sd.effectiveState = connectivity.Shutdown
return
}

Expand All @@ -586,10 +605,30 @@
b.logger.Errorf("Address %q not found address list in %v", sd.addr, b.addressList.addresses)
return
}
b.state = connectivity.Ready
b.cc.UpdateState(balancer.State{
ConnectivityState: connectivity.Ready,
Picker: &picker{result: balancer.PickResult{SubConn: sd.subConn}},
if !b.healthCheckingEnabled {
if b.logger.V(2) {
b.logger.Infof("SubConn %p reported connectivity state READY and the health listener is disabled. Transitioning SubConn to READY.", sd.subConn)
}

Check warning on line 611 in balancer/pickfirst/pickfirstleaf/pickfirstleaf.go

View check run for this annotation

Codecov / codecov/patch

balancer/pickfirst/pickfirstleaf/pickfirstleaf.go#L608-L611

Added lines #L608 - L611 were not covered by tests

sd.effectiveState = connectivity.Ready
b.updateBalancerState(balancer.State{
ConnectivityState: connectivity.Ready,

Check warning on line 615 in balancer/pickfirst/pickfirstleaf/pickfirstleaf.go

View check run for this annotation

Codecov / codecov/patch

balancer/pickfirst/pickfirstleaf/pickfirstleaf.go#L614-L615

Added lines #L614 - L615 were not covered by tests
Picker: &picker{result: balancer.PickResult{SubConn: sd.subConn}},
})
return
}
if b.logger.V(2) {
b.logger.Infof("SubConn %p reported connectivity state READY. Registering health listener.", sd.subConn)
}
// Send a CONNECTING update to take the SubConn out of sticky-TF if
// required.
sd.effectiveState = connectivity.Connecting
b.updateBalancerState(balancer.State{

Check warning on line 626 in balancer/pickfirst/pickfirstleaf/pickfirstleaf.go

View check run for this annotation

Codecov / codecov/patch

balancer/pickfirst/pickfirstleaf/pickfirstleaf.go#L625-L626

Added lines #L625 - L626 were not covered by tests
ConnectivityState: connectivity.Connecting,
Picker: &picker{err: balancer.ErrNoSubConnAvailable},
})
sd.subConn.RegisterHealthListener(func(scs balancer.SubConnState) {
b.updateSubConnHealthState(sd, scs)
})
return
}
Expand All @@ -600,11 +639,11 @@
// a transport is successfully created, but the connection fails
// before the SubConn can send the notification for READY. We treat
// this as a successful connection and transition to IDLE.
if (b.state == connectivity.Ready && newState.ConnectivityState != connectivity.Ready) || (oldState == connectivity.Connecting && newState.ConnectivityState == connectivity.Idle) {
if (oldState == connectivity.Ready && newState.ConnectivityState != connectivity.Ready) || (oldState == connectivity.Connecting && newState.ConnectivityState == connectivity.Idle) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If oldState is Ready, doesn't newState have to be Idle?

And if newState is Idle, the previous state must have been Ready, right? We may have not gotten the update (because of the Connecting->Ready->Idle race), but....can this whole thing be replaced by just if newState.ConnectivityState == connectivity.Idle?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If oldState is Ready, doesn't newState have to be Idle?

Yes, this is true because the health state is not sent via the connectivity listener anymore. So READY->TF->READY transitions are not possible and no duplicate READY updates are sent by addrConn.

And if newState is Idle, the previous state must have been Ready, right?

This is false as a transition from TF->IDLE is possible after the subchannel completes its backoff.

We can simplify the condition to oldState != TF && newState == IDLE, along with a comment explaining the assumptions that lead to this simplification. IMO, this may make it harder to understand. Do you think I should make the change?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As discussed offline, please change to look like we want it to look (if we didn't have the bug) and add a clause that covers the bug:

if oldState == Ready || (oldState == Connecting && newState == Idle) {

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(and add a TODO to remove the second half when the bug is fixed please)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

// Once a transport fails, the balancer enters IDLE and starts from
// the first address when the picker is used.
b.shutdownRemainingLocked(sd)
b.state = connectivity.Idle
sd.effectiveState = newState.ConnectivityState
// READY SubConn interspliced in between CONNECTING and IDLE, need to
// account for that.
if oldState == connectivity.Connecting {
Expand All @@ -615,7 +654,7 @@
}
disconnectionsMetric.Record(b.metricsRecorder, 1, b.target)
b.addressList.reset()
b.cc.UpdateState(balancer.State{
b.updateBalancerState(balancer.State{

Check warning on line 657 in balancer/pickfirst/pickfirstleaf/pickfirstleaf.go

View check run for this annotation

Codecov / codecov/patch

balancer/pickfirst/pickfirstleaf/pickfirstleaf.go#L657

Added line #L657 was not covered by tests
ConnectivityState: connectivity.Idle,
Picker: &idlePicker{exitIdle: sync.OnceFunc(b.ExitIdle)},
})
Expand All @@ -625,19 +664,19 @@
if b.firstPass {
switch newState.ConnectivityState {
case connectivity.Connecting:
// The balancer can be in either IDLE, CONNECTING or
// TRANSIENT_FAILURE. If it's in TRANSIENT_FAILURE, stay in
// The effective state can be in either IDLE, CONNECTING or
// TRANSIENT_FAILURE. If it's TRANSIENT_FAILURE, stay in
// TRANSIENT_FAILURE until it's READY. See A62.
// If the balancer is already in CONNECTING, no update is needed.
if b.state == connectivity.Idle {
b.state = connectivity.Connecting
b.cc.UpdateState(balancer.State{
if sd.effectiveState != connectivity.TransientFailure {
sd.effectiveState = connectivity.Connecting
b.updateBalancerState(balancer.State{
ConnectivityState: connectivity.Connecting,
Picker: &picker{err: balancer.ErrNoSubConnAvailable},
})
}
case connectivity.TransientFailure:
sd.lastErr = newState.ConnectionError
sd.effectiveState = connectivity.TransientFailure
// Since we're re-using common SubConns while handling resolver
// updates, we could receive an out of turn TRANSIENT_FAILURE from
// a pass over the previous address list. Happy Eyeballs will also
Expand All @@ -664,7 +703,7 @@
b.numTF = (b.numTF + 1) % b.subConns.Len()
sd.lastErr = newState.ConnectionError
if b.numTF%b.subConns.Len() == 0 {
b.cc.UpdateState(balancer.State{
b.updateBalancerState(balancer.State{
ConnectivityState: connectivity.TransientFailure,
Picker: &picker{err: newState.ConnectionError},
})
Expand Down Expand Up @@ -694,21 +733,79 @@
}
}
b.firstPass = false
b.state = connectivity.TransientFailure

b.cc.UpdateState(balancer.State{
b.updateBalancerState(balancer.State{
ConnectivityState: connectivity.TransientFailure,
Picker: &picker{err: lastErr},
})
// Start re-connecting all the SubConns that are already in IDLE.
for _, v := range b.subConns.Values() {
sd := v.(*scData)
if sd.state == connectivity.Idle {
if sd.rawConnectivityState == connectivity.Idle {
sd.subConn.Connect()
}
}
}

func (b *pickfirstBalancer) isActiveSCData(sd *scData) bool {

Check warning on line 749 in balancer/pickfirst/pickfirstleaf/pickfirstleaf.go

View check run for this annotation

Codecov / codecov/patch

balancer/pickfirst/pickfirstleaf/pickfirstleaf.go#L749

Added line #L749 was not covered by tests
activeSD, found := b.subConns.Get(sd.addr)
return found && activeSD == sd
}

func (b *pickfirstBalancer) updateSubConnHealthState(sd *scData, state balancer.SubConnState) {
b.mu.Lock()
defer b.mu.Unlock()
// Previously relevant SubConns can still callback with state updates.
// To prevent pickers from returning these obsolete SubConns, this logic
// is included to check if the current list of active SubConns includes
// this SubConn.
if !b.isActiveSCData(sd) {
return
}
sd.effectiveState = state.ConnectivityState
switch state.ConnectivityState {
case connectivity.Ready:
b.updateBalancerState(balancer.State{

Check warning on line 767 in balancer/pickfirst/pickfirstleaf/pickfirstleaf.go

View check run for this annotation

Codecov / codecov/patch

balancer/pickfirst/pickfirstleaf/pickfirstleaf.go#L766-L767

Added lines #L766 - L767 were not covered by tests
ConnectivityState: connectivity.Ready,
Picker: &picker{result: balancer.PickResult{SubConn: sd.subConn}},
})
case connectivity.TransientFailure:
b.updateBalancerState(balancer.State{
ConnectivityState: connectivity.TransientFailure,
Picker: &picker{err: fmt.Errorf("pickfirst: health check failure: %v", state.ConnectionError)},
})
case connectivity.Connecting:
b.updateBalancerState(balancer.State{
ConnectivityState: connectivity.Connecting,
Picker: &picker{err: balancer.ErrNoSubConnAvailable},
})
default:
b.logger.Errorf("Got unexpected health update for SubConn %p: %v", state)
}
}

// updateBalancerState stores the state reported to the channel and calls

Check warning on line 786 in balancer/pickfirst/pickfirstleaf/pickfirstleaf.go

View check run for this annotation

Codecov / codecov/patch

balancer/pickfirst/pickfirstleaf/pickfirstleaf.go#L785-L786

Added lines #L785 - L786 were not covered by tests
// ClientConn.UpdateState(). As an optimization, it avoids sending duplicate
// updates to the channel.
func (b *pickfirstBalancer) updateBalancerState(newState balancer.State) {
// In case of TransientFailures allow the picker to be updated to update
// the connectivity error, in all other cases don't send duplicate state
// updates.
if newState.ConnectivityState == b.state && b.state != connectivity.TransientFailure {
return
dfawley marked this conversation as resolved.
Show resolved Hide resolved
}
b.forceUpdateConcludedStateLocked(newState)
}

// forceUpdateConcludedStateLocked stores the state reported to the channel and
// calls ClientConn.UpdateState().
// A separate function is defined to force update the ClientConn state since the
// channel doesn't correctly assume that LB policies start in CONNECTING and
// relies on LB policy to send an initial CONNECTING update.
dfawley marked this conversation as resolved.
Show resolved Hide resolved
func (b *pickfirstBalancer) forceUpdateConcludedStateLocked(newState balancer.State) {
b.state = newState.ConnectivityState
b.cc.UpdateState(newState)
}

type picker struct {
result balancer.PickResult
err error
Expand Down
Loading
Loading