From 5cf90eea3c6dc10d004103b7c8f56661f677237a Mon Sep 17 00:00:00 2001 From: Arjan Bal Date: Thu, 12 Dec 2024 11:33:15 +0530 Subject: [PATCH] Address review --- balancer_wrapper.go | 39 ++++++++++++++++++++++----------------- health/producer.go | 22 ++++++---------------- 2 files changed, 28 insertions(+), 33 deletions(-) diff --git a/balancer_wrapper.go b/balancer_wrapper.go index dbb202a17f51..c2688376ae74 100644 --- a/balancer_wrapper.go +++ b/balancer_wrapper.go @@ -34,7 +34,15 @@ import ( "google.golang.org/grpc/status" ) -var setConnectedAddress = internal.SetConnectedAddress.(func(*balancer.SubConnState, resolver.Address)) +var ( + setConnectedAddress = internal.SetConnectedAddress.(func(*balancer.SubConnState, resolver.Address)) + // noOpRegisterHealthListenerFn is used when client side health checking is + // disabled. It sends a single READY update on the registered listener. + noOpRegisterHealthListenerFn = func(_ context.Context, listener func(balancer.SubConnState)) func() { + listener(balancer.SubConnState{ConnectivityState: connectivity.Ready}) + return func() {} + } +) // ccBalancerWrapper sits between the ClientConn and the Balancer. // @@ -424,28 +432,31 @@ func (acbw *acBalancerWrapper) closeProducers() { // for registering listeners. type healthProducerRegisterFn = func(context.Context, balancer.SubConn, string, func(balancer.SubConnState)) func() -// healthServiceOpts returns the service name and a function to register -// listener for client side health checking. It returns a nil -// registerHealthListenerFn if client side health checks are disabled. +// healthListenerRegFn returns a function to register a listener for health +// updates. If client side health checks are disabled, the registered listener +// will get a single READY (raw connectivity state) update. +// // Client side health checking is enabled when all the following // conditions are satisfied: // 1. Health checking is not disabled using the dial option. // 2. The health package is imported. // 3. The health check config is present in the service config. -func (acbw *acBalancerWrapper) healthServiceOpts() (string, healthProducerRegisterFn) { +func (acbw *acBalancerWrapper) healthListenerRegFn() func(context.Context, func(balancer.SubConnState)) func() { if acbw.ccb.cc.dopts.disableHealthCheck { - return "", nil + return noOpRegisterHealthListenerFn } regHealthLisFn := internal.RegisterClientHealthCheckListener if regHealthLisFn == nil { // The health package is not imported. - return "", nil + return noOpRegisterHealthListenerFn } cfg := acbw.ac.cc.healthCheckConfig() if cfg == nil { - return "", nil + return noOpRegisterHealthListenerFn + } + return func(ctx context.Context, listener func(balancer.SubConnState)) func() { + return regHealthLisFn.(healthProducerRegisterFn)(ctx, acbw, cfg.ServiceName, listener) } - return cfg.ServiceName, regHealthLisFn.(healthProducerRegisterFn) } // RegisterHealthListener accepts a health listener from the LB policy. It sends @@ -472,7 +483,7 @@ func (acbw *acBalancerWrapper) RegisterHealthListener(listener func(balancer.Sub return } - serviceName, registerFn := acbw.healthServiceOpts() + registerFn := acbw.healthListenerRegFn() acbw.ccb.serializer.TrySchedule(func(ctx context.Context) { if ctx.Err() != nil || acbw.ccb.balancer == nil { return @@ -483,12 +494,6 @@ func (acbw *acBalancerWrapper) RegisterHealthListener(listener func(balancer.Sub if acbw.healthData != hd { return } - // If client side health checks are disabled, send the raw connectivity - // state and return. - if registerFn == nil { - listener(balancer.SubConnState{ConnectivityState: connectivity.Ready}) - return - } // Serialize the health updates from the health producer with // other calls into the LB policy. listenerWrapper := func(scs balancer.SubConnState) { @@ -505,6 +510,6 @@ func (acbw *acBalancerWrapper) RegisterHealthListener(listener func(balancer.Sub }) } - hd.closeHealthProducer = registerFn(ctx, acbw, serviceName, listenerWrapper) + hd.closeHealthProducer = registerFn(ctx, listenerWrapper) }) } diff --git a/health/producer.go b/health/producer.go index ebe67ee51ce2..f938e5790c7b 100644 --- a/health/producer.go +++ b/health/producer.go @@ -27,7 +27,6 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/connectivity" "google.golang.org/grpc/internal" - "google.golang.org/grpc/internal/grpcsync" "google.golang.org/grpc/status" ) @@ -42,19 +41,14 @@ var producerBuilderSingleton *producerBuilder // Build constructs and returns a producer and its cleanup function. func (*producerBuilder) Build(cci any) (balancer.Producer, func()) { - doneCh := make(chan struct{}) p := &healthServiceProducer{ - cc: cci.(grpc.ClientConnInterface), - cancelDone: doneCh, - cancel: grpcsync.OnceFunc(func() { - close(doneCh) - }), + cc: cci.(grpc.ClientConnInterface), + cancel: func() {}, } return p, func() { p.mu.Lock() defer p.mu.Unlock() p.cancel() - <-p.cancelDone } } @@ -63,9 +57,8 @@ type healthServiceProducer struct { // that and therefore do not need to be guarded by a mutex. cc grpc.ClientConnInterface - mu sync.Mutex - cancel func() - cancelDone chan (struct{}) + mu sync.Mutex + cancel func() } // registerClientSideHealthCheckListener accepts a listener to provide server @@ -76,21 +69,18 @@ func registerClientSideHealthCheckListener(ctx context.Context, sc balancer.SubC p.mu.Lock() defer p.mu.Unlock() p.cancel() - <-p.cancelDone if listener == nil { return closeFn } - p.cancelDone = make(chan struct{}) ctx, cancel := context.WithCancel(ctx) p.cancel = cancel - go p.startHealthCheck(ctx, sc, serviceName, listener, p.cancelDone) + go p.startHealthCheck(ctx, sc, serviceName, listener) return closeFn } -func (p *healthServiceProducer) startHealthCheck(ctx context.Context, sc balancer.SubConn, serviceName string, listener func(balancer.SubConnState), closeCh chan struct{}) { - defer close(closeCh) +func (p *healthServiceProducer) startHealthCheck(ctx context.Context, sc balancer.SubConn, serviceName string, listener func(balancer.SubConnState)) { newStream := func(method string) (any, error) { return p.cc.NewStream(ctx, &grpc.StreamDesc{ServerStreams: true}, method) }