diff --git a/internal/component/prometheus/operator/common/component.go b/internal/component/prometheus/operator/common/component.go index 11a8c014bb..34f3741536 100644 --- a/internal/component/prometheus/operator/common/component.go +++ b/internal/component/prometheus/operator/common/component.go @@ -62,55 +62,70 @@ func (c *Component) CurrentHealth() component.Health { return c.health } +type cancelHandler struct { + mut sync.RWMutex + cancel context.CancelFunc + alreadyTrigger bool +} + +func (ch *cancelHandler) Set(cncl context.CancelFunc) { + ch.mut.Lock() + defer ch.mut.Unlock() + ch.cancel = cncl +} + +func (ch *cancelHandler) Cancel() { + ch.mut.Lock() + defer ch.mut.Unlock() + if ch.alreadyTrigger { + return + } + if ch.cancel == nil { + return + } + ch.cancel() + ch.alreadyTrigger = true +} + // Run implements component.Component. func (c *Component) Run(ctx context.Context) error { // innerCtx gets passed to things we create, so we can restart everything anytime we get an update. // Ideally, this component has very little dynamic config, and won't have frequent updates. var innerCtx context.Context // cancel is the func we use to trigger a stop to all downstream processors we create - var cancel func() - defer func() { - if cancel != nil { - cancel() - } - }() + var cancel cancelHandler + + wg := sync.WaitGroup{} c.reportHealth(nil) errChan := make(chan error, 1) - runWg := sync.WaitGroup{} - defer runWg.Wait() + + defer wg.Wait() for { select { case <-ctx.Done(): - if cancel != nil { - cancel() - } + cancel.Cancel() return nil case err := <-errChan: c.reportHealth(err) case <-c.onUpdate: - c.mut.Lock() - manager := c.crdManagerFactory.New(c.opts, c.cluster, c.opts.Logger, c.config, c.kind, c.ls) - c.manager = manager + c.manager = c.crdManagerFactory.New(c.opts, c.cluster, c.opts.Logger, c.config, c.kind, c.ls) // Wait for the old manager to stop. // If we start the new manager before stopping the old one, // the new manager might not be able to register its debug metrics due to a duplicate registration error. - if cancel != nil { - cancel() - } - runWg.Wait() - - innerCtx, cancel = context.WithCancel(ctx) - runWg.Add(1) + cancel.Cancel() + var cncl context.CancelFunc + innerCtx, cncl = context.WithCancel(ctx) + cancel.Set(cncl) + wg.Add(1) go func() { - if err := manager.Run(innerCtx); err != nil { + if err := c.manager.Run(innerCtx); err != nil { level.Error(c.opts.Logger).Log("msg", "error running crd manager", "err", err) errChan <- err } - runWg.Done() + wg.Done() }() - c.mut.Unlock() } } } diff --git a/internal/component/prometheus/operator/common/component_test.go b/internal/component/prometheus/operator/common/component_test.go index 7b15853ba2..363266bb4f 100644 --- a/internal/component/prometheus/operator/common/component_test.go +++ b/internal/component/prometheus/operator/common/component_test.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "net" + "sync" "testing" "time" @@ -37,7 +38,8 @@ type crdManagerHungRun struct { } func (c *crdManagerHungRun) Run(ctx context.Context) error { - <-c.stopRun + <-ctx.Done() + c.stopRun <- struct{}{} return nil } @@ -94,25 +96,25 @@ func TestRunExit(t *testing.T) { ctx, cancelFunc := context.WithCancel(context.Background()) cmpRunExited := atomic.Bool{} cmpRunExited.Store(false) + + wg := &sync.WaitGroup{} + wg.Add(1) go func() { + wg.Done() err := c.Run(ctx) require.NoError(t, err) cmpRunExited.Store(true) - fmt.Println("component.Run exited") }() - + wg.Wait() // Stop the component. // It shouldn't stop immediately, because the CRD Manager is hung. cancelFunc() + <-stopRun - // Make sure component.Run didn't exit for a few seconds - fmt.Println("start sleeping") - time.Sleep(5 * time.Second) - fmt.Println("finished sleeping") - - if cmpRunExited.Load() { - require.Fail(t, "component.Run exited") - } + require.Eventually(t, func() bool { + fmt.Println(cmpRunExited.Load()) + return cmpRunExited.Load() + }, 10*time.Second, 1*time.Second) // Make crdManager.Run exit close(stopRun)