Skip to content

Commit

Permalink
Address review
Browse files Browse the repository at this point in the history
  • Loading branch information
arjan-bal committed Dec 12, 2024
1 parent cb0f4a6 commit 5cf90ee
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 33 deletions.
39 changes: 22 additions & 17 deletions balancer_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,15 @@ import (
"google.golang.org/grpc/status"
)

var setConnectedAddress = internal.SetConnectedAddress.(func(*balancer.SubConnState, resolver.Address))
var (
setConnectedAddress = internal.SetConnectedAddress.(func(*balancer.SubConnState, resolver.Address))
// noOpRegisterHealthListenerFn is used when client side health checking is
// disabled. It sends a single READY update on the registered listener.
noOpRegisterHealthListenerFn = func(_ context.Context, listener func(balancer.SubConnState)) func() {
listener(balancer.SubConnState{ConnectivityState: connectivity.Ready})
return func() {}
}
)

// ccBalancerWrapper sits between the ClientConn and the Balancer.
//
Expand Down Expand Up @@ -424,28 +432,31 @@ func (acbw *acBalancerWrapper) closeProducers() {
// for registering listeners.
type healthProducerRegisterFn = func(context.Context, balancer.SubConn, string, func(balancer.SubConnState)) func()

// healthServiceOpts returns the service name and a function to register
// listener for client side health checking. It returns a nil
// registerHealthListenerFn if client side health checks are disabled.
// healthListenerRegFn returns a function to register a listener for health
// updates. If client side health checks are disabled, the registered listener
// will get a single READY (raw connectivity state) update.
//
// Client side health checking is enabled when all the following
// conditions are satisfied:
// 1. Health checking is not disabled using the dial option.
// 2. The health package is imported.
// 3. The health check config is present in the service config.
func (acbw *acBalancerWrapper) healthServiceOpts() (string, healthProducerRegisterFn) {
func (acbw *acBalancerWrapper) healthListenerRegFn() func(context.Context, func(balancer.SubConnState)) func() {
if acbw.ccb.cc.dopts.disableHealthCheck {
return "", nil
return noOpRegisterHealthListenerFn
}
regHealthLisFn := internal.RegisterClientHealthCheckListener
if regHealthLisFn == nil {
// The health package is not imported.
return "", nil
return noOpRegisterHealthListenerFn
}
cfg := acbw.ac.cc.healthCheckConfig()
if cfg == nil {
return "", nil
return noOpRegisterHealthListenerFn
}
return func(ctx context.Context, listener func(balancer.SubConnState)) func() {
return regHealthLisFn.(healthProducerRegisterFn)(ctx, acbw, cfg.ServiceName, listener)
}
return cfg.ServiceName, regHealthLisFn.(healthProducerRegisterFn)
}

// RegisterHealthListener accepts a health listener from the LB policy. It sends
Expand All @@ -472,7 +483,7 @@ func (acbw *acBalancerWrapper) RegisterHealthListener(listener func(balancer.Sub
return
}

serviceName, registerFn := acbw.healthServiceOpts()
registerFn := acbw.healthListenerRegFn()
acbw.ccb.serializer.TrySchedule(func(ctx context.Context) {
if ctx.Err() != nil || acbw.ccb.balancer == nil {
return
Expand All @@ -483,12 +494,6 @@ func (acbw *acBalancerWrapper) RegisterHealthListener(listener func(balancer.Sub
if acbw.healthData != hd {
return
}
// If client side health checks are disabled, send the raw connectivity
// state and return.
if registerFn == nil {
listener(balancer.SubConnState{ConnectivityState: connectivity.Ready})
return
}
// Serialize the health updates from the health producer with
// other calls into the LB policy.
listenerWrapper := func(scs balancer.SubConnState) {
Expand All @@ -505,6 +510,6 @@ func (acbw *acBalancerWrapper) RegisterHealthListener(listener func(balancer.Sub
})
}

hd.closeHealthProducer = registerFn(ctx, acbw, serviceName, listenerWrapper)
hd.closeHealthProducer = registerFn(ctx, listenerWrapper)
})
}
22 changes: 6 additions & 16 deletions health/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/status"
)

Expand All @@ -42,19 +41,14 @@ var producerBuilderSingleton *producerBuilder

// Build constructs and returns a producer and its cleanup function.
func (*producerBuilder) Build(cci any) (balancer.Producer, func()) {
doneCh := make(chan struct{})
p := &healthServiceProducer{
cc: cci.(grpc.ClientConnInterface),
cancelDone: doneCh,
cancel: grpcsync.OnceFunc(func() {
close(doneCh)
}),
cc: cci.(grpc.ClientConnInterface),
cancel: func() {},
}
return p, func() {
p.mu.Lock()
defer p.mu.Unlock()
p.cancel()
<-p.cancelDone
}
}

Expand All @@ -63,9 +57,8 @@ type healthServiceProducer struct {
// that and therefore do not need to be guarded by a mutex.
cc grpc.ClientConnInterface

mu sync.Mutex
cancel func()
cancelDone chan (struct{})
mu sync.Mutex
cancel func()
}

// registerClientSideHealthCheckListener accepts a listener to provide server
Expand All @@ -76,21 +69,18 @@ func registerClientSideHealthCheckListener(ctx context.Context, sc balancer.SubC
p.mu.Lock()
defer p.mu.Unlock()
p.cancel()
<-p.cancelDone
if listener == nil {
return closeFn
}

Check warning on line 74 in health/producer.go

View check run for this annotation

Codecov / codecov/patch

health/producer.go#L73-L74

Added lines #L73 - L74 were not covered by tests

p.cancelDone = make(chan struct{})
ctx, cancel := context.WithCancel(ctx)
p.cancel = cancel

go p.startHealthCheck(ctx, sc, serviceName, listener, p.cancelDone)
go p.startHealthCheck(ctx, sc, serviceName, listener)
return closeFn
}

func (p *healthServiceProducer) startHealthCheck(ctx context.Context, sc balancer.SubConn, serviceName string, listener func(balancer.SubConnState), closeCh chan struct{}) {
defer close(closeCh)
func (p *healthServiceProducer) startHealthCheck(ctx context.Context, sc balancer.SubConn, serviceName string, listener func(balancer.SubConnState)) {
newStream := func(method string) (any, error) {
return p.cc.NewStream(ctx, &grpc.StreamDesc{ServerStreams: true}, method)
}
Expand Down

0 comments on commit 5cf90ee

Please sign in to comment.