diff --git a/option.go b/option.go index e6b82a1..e519ea2 100644 --- a/option.go +++ b/option.go @@ -171,7 +171,7 @@ func WithHTTPPProfService(enabled bool) Option { func WithHTTPHealthzService(enabled bool) Option { return func(inst *Server) { if config.GetBool(inst.Config(), "service.healthz.enabled", enabled)() { - service := NewDefaultServiceHTTPProbes(inst.probes) + service := NewDefaultServiceHTTPProbes(inst.probes()) inst.initServices = append(inst.initServices, service) inst.AddAlwaysHealthzers(service) } diff --git a/server.go b/server.go index b38c095..df43165 100644 --- a/server.go +++ b/server.go @@ -6,6 +6,7 @@ import ( "net/http" "os" "os/signal" + "sync" "sync/atomic" "syscall" "time" @@ -39,8 +40,10 @@ type Server struct { shutdownSignals []os.Signal shutdownTimeout time.Duration running atomic.Bool - closers []interface{} - probes map[HealthzType][]interface{} + syncClosers []interface{} + syncClosersLock sync.RWMutex + syncProbes map[HealthzType][]interface{} + syncProbesLock sync.RWMutex ctx context.Context ctxCancel context.Context ctxCancelFn context.CancelFunc @@ -54,7 +57,7 @@ func NewServer(opts ...Option) *Server { inst := &Server{ shutdownTimeout: 30 * time.Second, shutdownSignals: []os.Signal{os.Interrupt, syscall.SIGTERM}, - probes: map[HealthzType][]interface{}{}, + syncProbes: map[HealthzType][]interface{}{}, ctx: context.Background(), c: config.Config(), l: log.Logger(), @@ -78,7 +81,7 @@ func NewServer(opts ...Option) *Server { defer timeoutCancel() // append internal closers - closers := append(inst.closers, inst.traceProvider, inst.meterProvider) //nolint:gocritic + closers := append(inst.closers(), inst.traceProvider, inst.meterProvider) for _, closer := range closers { l := inst.l.With(log.FName(fmt.Sprintf("%T", closer))) @@ -223,7 +226,7 @@ func (s *Server) AddServices(services ...Service) { // AddCloser adds a closer to be called on shutdown func (s *Server) AddCloser(closer interface{}) { - for _, value := range s.closers { + for _, value := range s.closers() { if value == closer { return } @@ -245,7 +248,7 @@ func (s *Server) AddCloser(closer interface{}) { ErrorUnsubscriber, UnsubscriberWithContext, ErrorUnsubscriberWithContext: - s.closers = append(s.closers, closer) + s.addClosers(closer) default: s.l.Warn("unable to add closer", log.FValue(fmt.Sprintf("%T", closer))) } @@ -267,7 +270,7 @@ func (s *Server) AddHealthzer(typ HealthzType, probe interface{}) { ErrorHealthzWithContext, ErrorPinger, ErrorPingerWithContext: - s.probes[typ] = append(s.probes[typ], probe) + s.addProbes(typ, probe) default: s.l.Debug("not a healthz probe", log.FValue(fmt.Sprintf("%T", probe))) } @@ -345,6 +348,30 @@ func (s *Server) Run() { s.l.Info("keel server stopped") } +func (s *Server) closers() []interface{} { + s.syncClosersLock.RLock() + defer s.syncClosersLock.RUnlock() + return s.syncClosers +} + +func (s *Server) addClosers(v ...interface{}) { + s.syncClosersLock.Lock() + defer s.syncClosersLock.Unlock() + s.syncClosers = append(s.syncClosers, v...) +} + +func (s *Server) probes() map[HealthzType][]interface{} { + s.syncProbesLock.RLock() + defer s.syncProbesLock.RUnlock() + return s.syncProbes +} + +func (s *Server) addProbes(typ HealthzType, v ...interface{}) { + s.syncProbesLock.Lock() + defer s.syncProbesLock.Unlock() + s.syncProbes[typ] = append(s.syncProbes[typ], v...) +} + // startService starts the given services func (s *Server) startService(services ...Service) { for _, service := range services { diff --git a/serviceenabler.go b/serviceenabler.go index dd01a1a..bfcdccc 100644 --- a/serviceenabler.go +++ b/serviceenabler.go @@ -2,6 +2,7 @@ package keel import ( "context" + "sync" "time" "go.uber.org/zap" @@ -12,23 +13,25 @@ import ( type ServiceFunc func() Service type ServiceEnabler struct { - l *zap.Logger - ctx context.Context - name string - service Service - serviceFn ServiceFunc - enabled bool - enabledFn func() bool - closed bool + l *zap.Logger + ctx context.Context + name string + service Service + serviceFn ServiceFunc + syncEnabled bool + syncEnabledLock sync.RWMutex + enabledFn func() bool + syncClosed bool + syncClosedLock sync.RWMutex } func NewServiceEnabler(l *zap.Logger, name string, serviceFn ServiceFunc, enabledFn func() bool) *ServiceEnabler { return &ServiceEnabler{ - l: log.WithServiceName(l, name), - name: name, - serviceFn: serviceFn, - enabled: enabledFn(), - enabledFn: enabledFn, + l: log.WithServiceName(l, name), + name: name, + serviceFn: serviceFn, + syncEnabled: enabledFn(), + enabledFn: enabledFn, } } @@ -36,15 +39,65 @@ func (w *ServiceEnabler) Name() string { return w.name } +func (w *ServiceEnabler) Start(ctx context.Context) error { + w.watch() + w.ctx = ctx + if w.enabled() { + if err := w.enable(w.ctx); err != nil { + return err + } + } else { + w.l.Info("skipping disabled dynamic service") + } + return nil +} + +func (w *ServiceEnabler) Close(ctx context.Context) error { + l := log.WithServiceName(w.l, w.Name()) + w.setClosed(true) + if w.enabled() { + if err := w.disable(w.ctx); err != nil { + return err + } + } else { + l.Info("skipping disabled dynamic service") + } + return nil +} + +func (w *ServiceEnabler) closed() bool { + w.syncClosedLock.RLock() + defer w.syncClosedLock.RUnlock() + return w.syncClosed +} + +func (w *ServiceEnabler) setClosed(v bool) { + w.syncClosedLock.Lock() + defer w.syncClosedLock.Unlock() + w.syncClosed = v +} + +func (w *ServiceEnabler) enabled() bool { + w.syncEnabledLock.RLock() + defer w.syncEnabledLock.RUnlock() + return w.syncEnabled +} + +func (w *ServiceEnabler) setEnabled(v bool) { + w.syncEnabledLock.Lock() + defer w.syncEnabledLock.Unlock() + w.syncEnabled = v +} + func (w *ServiceEnabler) enable(ctx context.Context) error { - w.enabled = true + w.setEnabled(true) w.service = w.serviceFn() w.l.Info("starting dynamic service") return w.service.Start(ctx) } func (w *ServiceEnabler) disable(ctx context.Context) error { - w.enabled = false + w.setEnabled(false) w.l.Info("stopping dynamic service") return w.service.Close(ctx) } @@ -52,11 +105,11 @@ func (w *ServiceEnabler) disable(ctx context.Context) error { func (w *ServiceEnabler) watch() { go func() { for { - if w.closed { + if w.closed() { break } time.Sleep(time.Second) - if value := w.enabledFn(); value != w.enabled { + if value := w.enabledFn(); value != w.enabled() { if value { go func() { if err := w.enable(w.ctx); err != nil { @@ -72,29 +125,3 @@ func (w *ServiceEnabler) watch() { } }() } - -func (w *ServiceEnabler) Start(ctx context.Context) error { - w.watch() - w.ctx = ctx - if w.enabled { - if err := w.enable(w.ctx); err != nil { - return err - } - } else { - w.l.Info("skipping disabled dynamic service") - } - return nil -} - -func (w *ServiceEnabler) Close(ctx context.Context) error { - l := log.WithServiceName(w.l, w.Name()) - w.closed = true - if w.enabled { - if err := w.disable(w.ctx); err != nil { - return err - } - } else { - l.Info("skipping disabled dynamic service") - } - return nil -}