PR feedback
mattdurham committed Oct 16, 2024
1 parent e6001c3 commit 8856717
Showing 2 changed files with 52 additions and 35 deletions.
internal/component/prometheus/operator/common/component.go (63 changes: 39 additions & 24 deletions)
@@ -62,55 +62,70 @@ func (c *Component) CurrentHealth() component.Health {
 	return c.health
 }
 
+type cancelHandler struct {
+	mut            sync.RWMutex
+	cancel         context.CancelFunc
+	alreadyTrigger bool
+}
+
+func (ch *cancelHandler) Set(cncl context.CancelFunc) {
+	ch.mut.Lock()
+	defer ch.mut.Unlock()
+	ch.cancel = cncl
+}
+
+func (ch *cancelHandler) Cancel() {
+	ch.mut.Lock()
+	defer ch.mut.Unlock()
+	if ch.alreadyTrigger {
+		return
+	}
+	if ch.cancel == nil {
+		return
+	}
+	ch.cancel()
+	ch.alreadyTrigger = true
+}
+
 // Run implements component.Component.
 func (c *Component) Run(ctx context.Context) error {
 	// innerCtx gets passed to things we create, so we can restart everything anytime we get an update.
 	// Ideally, this component has very little dynamic config, and won't have frequent updates.
 	var innerCtx context.Context
-	// cancel is the func we use to trigger a stop to all downstream processors we create
-	var cancel func()
-	defer func() {
-		if cancel != nil {
-			cancel()
-		}
-	}()
+	var cancel cancelHandler
+
+	wg := sync.WaitGroup{}
 
 	c.reportHealth(nil)
 	errChan := make(chan error, 1)
-	runWg := sync.WaitGroup{}
-	defer runWg.Wait()
+
+	defer wg.Wait()
 	for {
 		select {
 		case <-ctx.Done():
-			if cancel != nil {
-				cancel()
-			}
+			cancel.Cancel()
 			return nil
 		case err := <-errChan:
 			c.reportHealth(err)
 		case <-c.onUpdate:
 			c.mut.Lock()
-			manager := c.crdManagerFactory.New(c.opts, c.cluster, c.opts.Logger, c.config, c.kind, c.ls)
-			c.manager = manager
+			c.manager = c.crdManagerFactory.New(c.opts, c.cluster, c.opts.Logger, c.config, c.kind, c.ls)
 
 			// Wait for the old manager to stop.
 			// If we start the new manager before stopping the old one,
 			// the new manager might not be able to register its debug metrics due to a duplicate registration error.
-			if cancel != nil {
-				cancel()
-			}
-			runWg.Wait()
-
-			innerCtx, cancel = context.WithCancel(ctx)
-			runWg.Add(1)
+			cancel.Cancel()
+			var cncl context.CancelFunc
+			innerCtx, cncl = context.WithCancel(ctx)
+			cancel.Set(cncl)
+			wg.Add(1)
 			go func() {
-				if err := manager.Run(innerCtx); err != nil {
+				if err := c.manager.Run(innerCtx); err != nil {
 					level.Error(c.opts.Logger).Log("msg", "error running crd manager", "err", err)
 					errChan <- err
 				}
-				runWg.Done()
+				wg.Done()
 			}()
 			c.mut.Unlock()
 		}
 	}
 }
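
A minimal standalone sketch of the cancelHandler pattern this commit adds. The handler type mirrors the diff; main and the worker goroutine are illustrative assumptions, not repository code. Cancel is a no-op before Set and fires at most once, which is why Run can call it unconditionally on shutdown and on every update.

// Sketch of the cancelHandler pattern added in this commit. The type mirrors
// the diff; main and the worker goroutine are illustrative only.
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

type cancelHandler struct {
	mut            sync.RWMutex
	cancel         context.CancelFunc
	alreadyTrigger bool
}

// Set stores the CancelFunc to fire later. Note that Set does not reset
// alreadyTrigger, so a handler cancels at most once over its lifetime.
func (ch *cancelHandler) Set(cncl context.CancelFunc) {
	ch.mut.Lock()
	defer ch.mut.Unlock()
	ch.cancel = cncl
}

// Cancel is safe to call before Set (no-op) and safe to call repeatedly
// (only the first call fires), so callers never need a nil check.
func (ch *cancelHandler) Cancel() {
	ch.mut.Lock()
	defer ch.mut.Unlock()
	if ch.alreadyTrigger || ch.cancel == nil {
		return
	}
	ch.cancel()
	ch.alreadyTrigger = true
}

func main() {
	var cancel cancelHandler
	cancel.Cancel() // nothing set yet: no-op instead of a nil-func panic

	ctx, cncl := context.WithCancel(context.Background())
	cancel.Set(cncl)

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		<-ctx.Done() // hypothetical downstream worker waiting for shutdown
		fmt.Println("worker stopped:", ctx.Err())
	}()

	time.Sleep(10 * time.Millisecond) // give the worker time to start
	cancel.Cancel()                   // first call cancels the context
	cancel.Cancel()                   // second call is a no-op
	wg.Wait()
}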
internal/component/prometheus/operator/common/component_test.go (24 changes: 13 additions & 11 deletions)
@@ -4,6 +4,7 @@ import (
"context"
"fmt"
"net"
"sync"
"testing"
"time"

@@ -37,7 +38,8 @@ type crdManagerHungRun struct {
 }
 
 func (c *crdManagerHungRun) Run(ctx context.Context) error {
-	<-c.stopRun
+	<-ctx.Done()
+	c.stopRun <- struct{}{}
 	return nil
 }

@@ -94,25 +96,25 @@ func TestRunExit(t *testing.T) {
 	ctx, cancelFunc := context.WithCancel(context.Background())
 	cmpRunExited := atomic.Bool{}
 	cmpRunExited.Store(false)
-
+	wg := &sync.WaitGroup{}
+	wg.Add(1)
 	go func() {
+		wg.Done()
 		err := c.Run(ctx)
 		require.NoError(t, err)
 		cmpRunExited.Store(true)
+		fmt.Println("component.Run exited")
 	}()
 
+	wg.Wait()
 	// Stop the component.
-	// It shouldn't stop immediately, because the CRD Manager is hung.
 	cancelFunc()
+	<-stopRun
 
-	// Make sure component.Run didn't exit for a few seconds
-	fmt.Println("start sleeping")
-	time.Sleep(5 * time.Second)
-	fmt.Println("finished sleeping")
-
-	if cmpRunExited.Load() {
-		require.Fail(t, "component.Run exited")
-	}
+	require.Eventually(t, func() bool {
+		fmt.Println(cmpRunExited.Load())
+		return cmpRunExited.Load()
+	}, 10*time.Second, 1*time.Second)
 
 	// Make crdManager.Run exit
 	close(stopRun)
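
A standalone sketch of the test pattern the updated TestRunExit uses, with assumed names (runner, TestRunnerExitsOnCancel) rather than repository code: a WaitGroup confirms the goroutine has started, the context is cancelled, and require.Eventually polls for the exit instead of sleeping for a fixed interval and asserting the opposite.

// Sketch of the exit-on-cancel test pattern. runner stands in for the hung
// CRD manager stub: it blocks until its context is cancelled.
package example_test

import (
	"context"
	"sync"
	"sync/atomic"
	"testing"
	"time"

	"github.com/stretchr/testify/require"
)

// runner blocks until ctx is cancelled, like crdManagerHungRun in the diff.
func runner(ctx context.Context) error {
	<-ctx.Done()
	return nil
}

func TestRunnerExitsOnCancel(t *testing.T) {
	ctx, cancelFunc := context.WithCancel(context.Background())
	var exited atomic.Bool

	// The WaitGroup guarantees the goroutine has started before we cancel,
	// mirroring the wg.Add/wg.Done/wg.Wait sequence added to the test.
	wg := &sync.WaitGroup{}
	wg.Add(1)
	go func() {
		wg.Done()
		_ = runner(ctx)
		exited.Store(true)
	}()

	wg.Wait()
	cancelFunc()

	// Poll instead of sleeping for a fixed time: the test passes as soon as
	// runner returns, and fails only if it never does within 10 seconds.
	require.Eventually(t, func() bool {
		return exited.Load()
	}, 10*time.Second, 100*time.Millisecond)
}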
