Skip to content

Commit

Permalink
audit contexts
Browse files Browse the repository at this point in the history
  • Loading branch information
jmank88 committed Oct 22, 2024
1 parent 95bdca3 commit d6daff8
Show file tree
Hide file tree
Showing 24 changed files with 269 additions and 223 deletions.
2 changes: 1 addition & 1 deletion .tool-versions
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
golang 1.20
mockery 2.12.1
mockery 2.43.2
34 changes: 0 additions & 34 deletions internal/util/closer.go

This file was deleted.

29 changes: 0 additions & 29 deletions internal/util/closer_test.go

This file was deleted.

17 changes: 12 additions & 5 deletions pkg/v2/coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (

"github.com/smartcontractkit/chainlink-automation/pkg/util"
ocr2keepers "github.com/smartcontractkit/chainlink-automation/pkg/v2"
"github.com/smartcontractkit/chainlink-common/pkg/services"
)

var (
Expand Down Expand Up @@ -74,7 +75,7 @@ type reportCoordinator struct {

// run state data
running atomic.Bool
chStop chan struct{}
chStop services.StopChan
}

// NewReportCoordinator provides a new report coordinator. The coordinator
Expand Down Expand Up @@ -228,7 +229,7 @@ func (rc *reportCoordinator) checkLogs(ctx context.Context) error {
}
}

staleReportLogs, err = rc.logs.StaleReportLogs(context.Background())
staleReportLogs, err = rc.logs.StaleReportLogs(ctx)
// It can happen that in between the time the report is generated and it gets
// confirmed on chain something changes and it becomes stale. Current scenarios are:
// - Another report for the upkeep is transmitted making this report stale
Expand Down Expand Up @@ -386,13 +387,20 @@ func (rc *reportCoordinator) Close() error {
func (rc *reportCoordinator) run() {
cadence := time.Second
timer := time.NewTimer(cadence)
defer timer.Stop()

ctx, cancel := rc.chStop.NewCtx()
defer cancel()

for {
select {
case <-timer.C:
startTime := time.Now()

if err := rc.checkLogs(context.Background()); err != nil {
if err := rc.checkLogs(ctx); err != nil {
if ctx.Err() != nil {
return
}
rc.logger.Printf("failed to check perform and stale report logs: %s", err)
}

Expand All @@ -407,8 +415,7 @@ func (rc *reportCoordinator) run() {
// wait the difference between the cadence and the time taken
timer.Reset(cadence - diff)
}
case <-rc.chStop:
timer.Stop()
case <-ctx.Done():
return
}
}
Expand Down
23 changes: 17 additions & 6 deletions pkg/v2/coordinator/mocks/encoder.generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 13 additions & 6 deletions pkg/v2/coordinator/mocks/log_provider.generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 5 additions & 6 deletions pkg/v2/mocks/logger.generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

45 changes: 27 additions & 18 deletions pkg/v3/coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ import (
"log"
"time"

"github.com/smartcontractkit/chainlink-common/pkg/services"
common "github.com/smartcontractkit/chainlink-common/pkg/types/automation"

internalutil "github.com/smartcontractkit/chainlink-automation/internal/util"
"github.com/smartcontractkit/chainlink-automation/pkg/util"
"github.com/smartcontractkit/chainlink-automation/pkg/v3/config"
"github.com/smartcontractkit/chainlink-automation/pkg/v3/types"
Expand All @@ -21,7 +21,9 @@ const (
)

type coordinator struct {
closer internalutil.Closer
services.StateMachine
stopCh services.StopChan
done chan struct{}
logger *log.Logger

eventsProvider types.TransmitEventProvider
Expand All @@ -46,6 +48,8 @@ type record struct {
func NewCoordinator(transmitEventProvider types.TransmitEventProvider, upkeepTypeGetter types.UpkeepTypeGetter, conf config.OffchainConfig, logger *log.Logger) *coordinator {
performLockoutWindow := time.Duration(conf.PerformLockoutWindow) * time.Millisecond
return &coordinator{
stopCh: make(chan struct{}),
done: make(chan struct{}),
logger: logger,
eventsProvider: transmitEventProvider,
upkeepTypeGetter: upkeepTypeGetter,
Expand Down Expand Up @@ -208,16 +212,24 @@ func (c *coordinator) checkEvents(ctx context.Context) error {
return nil
}

func (c *coordinator) run(ctx context.Context) {
func (c *coordinator) run() {
defer close(c.done)

timer := time.NewTimer(cadence)
defer timer.Stop()

ctx, cancel := c.stopCh.NewCtx()
defer cancel()

for {
select {
case <-timer.C:
startTime := time.Now()

if err := c.checkEvents(context.Background()); err != nil {
if err := c.checkEvents(ctx); err != nil {
if ctx.Err() != nil {
return
}
c.logger.Printf("failed to check for transmit events: %s", err)
}

Expand All @@ -239,32 +251,29 @@ func (c *coordinator) run(ctx context.Context) {
}

// Start starts all subprocesses
func (c *coordinator) Start(pctx context.Context) error {
ctx, cancel := context.WithCancel(pctx)
defer cancel()

if !c.closer.Store(cancel) {
return fmt.Errorf("process already running")
func (c *coordinator) Start(_ context.Context) error {
if err := c.StateMachine.StartOnce("Coordinator", func() error { return nil }); err != nil {
return err
}

go c.cache.Start(defaultCacheClean)
go c.visited.Start(defaultCacheClean)

c.run(ctx)

c.run()
return nil
}

// Close terminates all subprocesses
func (c *coordinator) Close() error {
if !c.closer.Close() {
return fmt.Errorf("process not running")
}
return c.StateMachine.StopOnce("Coordinator", func() error {
close(c.stopCh)
<-c.done

c.cache.Stop()
c.visited.Stop()
c.cache.Stop()
c.visited.Stop()

return nil
return nil
})
}

func (c *coordinator) visitedID(e common.TransmitEvent) string {
Expand Down
13 changes: 0 additions & 13 deletions pkg/v3/coordinator/coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,19 +145,6 @@ func TestNewCoordinator(t *testing.T) {
wg.Wait()
assert.True(t, strings.Contains(memLog.String(), "check database indexes and other performance improvements"))
})

t.Run("starting an already started coordinator returns an error", func(t *testing.T) {
c := NewCoordinator(nil, nil, config.OffchainConfig{PerformLockoutWindow: 3600 * 1000, MinConfirmations: 2}, nil)
c.closer.Store(context.CancelFunc(func() {}))
err := c.Start(context.Background())
assert.Error(t, err)
})

t.Run("closing an already closed coordinator returns an error", func(t *testing.T) {
c := NewCoordinator(nil, nil, config.OffchainConfig{PerformLockoutWindow: 3600 * 1000, MinConfirmations: 2}, nil)
err := c.Close()
assert.Error(t, err)
})
}

func TestNewCoordinator_checkEvents(t *testing.T) {
Expand Down
Loading

0 comments on commit d6daff8

Please sign in to comment.