Skip to content

Commit

Permalink
stop using runner manager bc it was killing all runners which was cau…
Browse files Browse the repository at this point in the history
…sing super confusing issues

Signed-off-by: Cassandra Coyle <[email protected]>
  • Loading branch information
cicoyle committed Nov 25, 2024
1 parent 9c8b3a1 commit 8c72309
Show file tree
Hide file tree
Showing 8 changed files with 672 additions and 78 deletions.
138 changes: 104 additions & 34 deletions cron/cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"sync/atomic"
"time"

"github.com/dapr/kit/concurrency"
"github.com/go-logr/logr"
"github.com/go-logr/zapr"
"go.etcd.io/etcd/client/pkg/v3/logutil"
Expand Down Expand Up @@ -70,6 +69,7 @@ type Options struct {
// cron is the implementation of the cron interface.
type cron struct {
log logr.Logger
wg sync.WaitGroup

key *key.Key
leadership *leadership.Leadership
Expand All @@ -78,11 +78,12 @@ type cron struct {
triggerFn api.TriggerFunction
gcgInterval *time.Duration

lock sync.RWMutex
engine atomic.Pointer[engine.Engine]
running atomic.Bool
readyCh chan struct{}
closeCh chan struct{}
lock sync.RWMutex
engine atomic.Pointer[engine.Engine]
running atomic.Bool
readyCh chan struct{}
closeCh chan struct{}
restartingCh chan struct{}
}

// New creates a new cron instance.
Expand Down Expand Up @@ -131,15 +132,17 @@ func New(opts Options) (api.Interface, error) {
})

return &cron{
log: log,
key: key,
client: client,
part: part,
triggerFn: opts.TriggerFn,
gcgInterval: opts.CounterGarbageCollectionInterval,
leadership: leadership,
readyCh: make(chan struct{}),
closeCh: make(chan struct{}),
log: log,
wg: sync.WaitGroup{},
key: key,
client: client,
part: part,
triggerFn: opts.TriggerFn,
gcgInterval: opts.CounterGarbageCollectionInterval,
leadership: leadership,
readyCh: make(chan struct{}),
closeCh: make(chan struct{}),
restartingCh: make(chan struct{}),
}, nil
}

Expand All @@ -151,10 +154,45 @@ func (c *cron) Run(ctx context.Context) error {

defer close(c.closeCh)

err := concurrency.NewRunnerManager(
c.leadership.Run,
func(ctx context.Context) error {
for {
errCh := make(chan error, 2)
defer close(errCh)

c.wg.Add(2)
defer c.wg.Wait()

go func(ctx context.Context) {
defer c.wg.Done()
if err := c.leadership.Run(ctx); err != nil {
if errors.Is(err, context.Canceled) {
// Ignore context cancellation errors
return
}
select {
case errCh <- err:
case <-ctx.Done():
}
return
}
return

Check failure on line 176 in cron/cron.go

View workflow job for this annotation

GitHub Actions / tests

S1023: redundant `return` statement (gosimple)
}(ctx)

engineCtx, engineCancel := context.WithCancel(ctx)
defer engineCancel()

go func(ctx context.Context) {
defer c.wg.Done()
for {
leadershipCtx, leadershipCancel := context.WithCancel(ctx)

select {
case <-ctx.Done():
leadershipCancel()
return
default:
c.lock.Lock()
c.restartingCh = make(chan struct{})
c.lock.Unlock()

engine, err := engine.New(engine.Options{
Log: c.log,
Key: c.key,
Expand All @@ -164,42 +202,74 @@ func (c *cron) Run(ctx context.Context) error {
CounterGarbageCollectionInterval: c.gcgInterval,
})
if err != nil {
return fmt.Errorf("failed to create engine: %w", err)
leadershipCancel()
select {
case errCh <- err:
case <-ctx.Done():
}
return
}

ectx, err := c.leadership.WaitForLeadership(ctx)
ectx, err := c.leadership.WaitForLeadership(leadershipCtx)
if err != nil {
return err
leadershipCancel()
select {
case errCh <- err:
case <-ctx.Done():
}
return
}

// Store the engine once ready
c.lock.Lock()
c.engine.Store(engine)
close(c.readyCh)
c.lock.Unlock()

// Run engine with leadership context
if err := engine.Run(ectx); err != nil {
return err
}

if err := ctx.Err(); err != nil {
return err
leadershipCancel()
select {
case errCh <- err:
case <-ctx.Done():
}
return
}

c.log.Info("restarting engine")
// Restart engine loop
c.log.Info("Restarting engine due to leadership change")

c.lock.Lock()
close(c.restartingCh)
c.readyCh = make(chan struct{})
c.lock.Unlock()

// Check for cancellation again
if err := ctx.Err(); err != nil {
leadershipCancel()
select {
case errCh <- err:
case <-ctx.Done():
}
return
}
}
},
).Run(ctx)
if err != nil {
return err
}
}
}(engineCtx)

<-ctx.Done() // block until cron is done
c.log.Info("cron shutdown gracefully")

return nil
select {
case err := <-errCh:
if errors.Is(err, context.Canceled) {
// Ignore context cancellation errors
return nil
}
return err
default:
return nil
}
}

// Add forwards the call to the embedded API.
Expand Down
Loading

0 comments on commit 8c72309

Please sign in to comment.