Skip to content

Commit

Permalink
review: protect cancel func access with a mutex to avoid race conditions
Browse files Browse the repository at this point in the history
  • Loading branch information
gustavogama-cll committed Dec 27, 2024
1 parent 4697ca1 commit 5c30694
Showing 1 changed file with 46 additions and 16 deletions.
62 changes: 46 additions & 16 deletions core/services/feeds/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"database/sql"
"encoding/hex"
"fmt"
"sync"
"time"

"github.com/avast/retry-go/v4"
Expand Down Expand Up @@ -143,7 +144,7 @@ type service struct {
lggr logger.Logger
version string
loopRegistrarConfig plugins.RegistrarConfig
syncNodeInfoCancel context.CancelFunc
syncNodeInfoCancel AtomicCancelFunc
}

// NewService constructs a new feeds service
Expand Down Expand Up @@ -186,7 +187,7 @@ func NewService(
lggr: lggr,
version: version,
loopRegistrarConfig: rc,
syncNodeInfoCancel: func() {},
syncNodeInfoCancel: AtomicCancelFunc{fn: func() {}},
}

return svc
Expand Down Expand Up @@ -261,20 +262,18 @@ func (s *service) RegisterManager(ctx context.Context, params RegisterManagerPar
// syncNodeInfoWithRetry syncs the node's information with FMS using a goroutine.
// In case of failures, it retries with an exponential backoff for up to 24h.
func (s *service) syncNodeInfoWithRetry(id int64) {
ctx, cancel := context.WithCancel(context.Background())

// cancel the previous context -- and, by extension, the existing goroutine --
// so that we can start anew
s.syncNodeInfoCancel()

var ctx context.Context
ctx, s.syncNodeInfoCancel = context.WithCancel(context.Background())
s.syncNodeInfoCancel.CallAndSwap(cancel)

retryOpts := []retry.Option{
retry.Context(ctx),
retry.Delay(5 * time.Second),
retry.DelayType(retry.BackOffDelay),
retry.Delay(10 * time.Second),
retry.MaxDelay(30 * time.Minute),
retry.Attempts(48 + 8), // 30m * 48 =~ 24h; plus the initial 8 shorter retries
retry.LastErrorOnly(true),
retry.OnRetry(func(attempt uint, err error) {
s.lggr.Info("failed to sync node info", "attempt", attempt, "err", err)
}),
Expand All @@ -288,8 +287,7 @@ func (s *service) syncNodeInfoWithRetry(id int64) {
s.lggr.Info("successfully synced node info")
}

s.syncNodeInfoCancel()
s.syncNodeInfoCancel = func() {}
s.syncNodeInfoCancel.CallAndSwap(func(){})
}()
}

Expand Down Expand Up @@ -1055,9 +1053,7 @@ func (s *service) CancelSpec(ctx context.Context, id int64) error {
)

err = s.transact(ctx, func(tx datasources) error {
var (
txerr error
)
var txerr error

if txerr = tx.orm.CancelSpec(ctx, id); txerr != nil {
return txerr
Expand Down Expand Up @@ -1180,7 +1176,7 @@ func (s *service) Close() error {
// This blocks until it finishes
s.connMgr.Close()

s.syncNodeInfoCancel()
s.syncNodeInfoCancel.CallAndSwap(func(){})

return nil
})
Expand Down Expand Up @@ -1243,8 +1239,10 @@ func (s *service) observeJobProposalCounts(ctx context.Context) error {
metrics := counts.toMetrics()

// Set the prometheus gauge metrics.
for _, status := range []JobProposalStatus{JobProposalStatusPending, JobProposalStatusApproved,
JobProposalStatusCancelled, JobProposalStatusRejected, JobProposalStatusDeleted, JobProposalStatusRevoked} {
for _, status := range []JobProposalStatus{
JobProposalStatusPending, JobProposalStatusApproved,
JobProposalStatusCancelled, JobProposalStatusRejected, JobProposalStatusDeleted, JobProposalStatusRevoked,
} {
status := status

promJobProposalCounts.With(prometheus.Labels{"status": string(status)}).Set(metrics[status])
Expand Down Expand Up @@ -1584,6 +1582,18 @@ func (s *service) isRevokable(propStatus JobProposalStatus, specStatus SpecStatu
return propStatus != JobProposalStatusDeleted && (specStatus == SpecStatusPending || specStatus == SpecStatusCancelled)
}

// AtomicCancelFunc guards a context.CancelFunc with a mutex so that invoking
// the current function and installing its replacement happen as one step,
// even when several goroutines race to do so.
//
// The zero value is ready to use: it holds no function, and the first
// CallAndSwap simply installs one. An AtomicCancelFunc contains a sync.Mutex
// and therefore must not be copied after first use.
type AtomicCancelFunc struct {
	fn    context.CancelFunc
	mutex sync.Mutex
}

// CallAndSwap invokes the currently stored cancel function (if any) and then
// stores other as the new one, all while holding the internal mutex.
//
// A nil stored function -- either because the receiver is a zero value or
// because nil was passed as other in a previous call -- is skipped instead of
// being called, which would panic.
//
// The stored function runs with the mutex held, so it must not call
// CallAndSwap on the same receiver; sync.Mutex is not reentrant and doing so
// would deadlock.
func (f *AtomicCancelFunc) CallAndSwap(other func()) {
	f.mutex.Lock()
	defer f.mutex.Unlock()

	if f.fn != nil {
		f.fn()
	}
	f.fn = other
}

var _ Service = &NullService{}

// NullService defines an implementation of the Feeds Service that is used
Expand All @@ -1596,75 +1606,95 @@ func (ns NullService) Close() error { return nil }
// ApproveSpec is a stub: it ignores its arguments and always returns
// ErrFeedsManagerDisabled.
func (ns NullService) ApproveSpec(ctx context.Context, id int64, force bool) error {
return ErrFeedsManagerDisabled
}

// CountJobProposalsByStatus is a stub: it always returns a nil result and
// ErrFeedsManagerDisabled.
func (ns NullService) CountJobProposalsByStatus(ctx context.Context) (*JobProposalCounts, error) {
return nil, ErrFeedsManagerDisabled
}

// CancelSpec is a stub: it ignores its arguments and always returns
// ErrFeedsManagerDisabled.
func (ns NullService) CancelSpec(ctx context.Context, id int64) error {
return ErrFeedsManagerDisabled
}

// GetJobProposal is a stub: it always returns a nil proposal and
// ErrFeedsManagerDisabled.
func (ns NullService) GetJobProposal(ctx context.Context, id int64) (*JobProposal, error) {
return nil, ErrFeedsManagerDisabled
}

// ListSpecsByJobProposalIDs is a stub: it always returns a nil slice and
// ErrFeedsManagerDisabled.
func (ns NullService) ListSpecsByJobProposalIDs(ctx context.Context, ids []int64) ([]JobProposalSpec, error) {
return nil, ErrFeedsManagerDisabled
}

// GetManager is a stub: it always returns a nil manager and
// ErrFeedsManagerDisabled.
func (ns NullService) GetManager(ctx context.Context, id int64) (*FeedsManager, error) {
return nil, ErrFeedsManagerDisabled
}

// ListManagersByIDs is a stub: it always returns a nil slice and
// ErrFeedsManagerDisabled.
func (ns NullService) ListManagersByIDs(ctx context.Context, ids []int64) ([]FeedsManager, error) {
return nil, ErrFeedsManagerDisabled
}

// GetSpec is a stub: it always returns a nil spec and
// ErrFeedsManagerDisabled.
func (ns NullService) GetSpec(ctx context.Context, id int64) (*JobProposalSpec, error) {
return nil, ErrFeedsManagerDisabled
}
// ListManagers is a stub: unlike most NullService methods it does not report
// an error; it always returns a nil slice and a nil error.
func (ns NullService) ListManagers(ctx context.Context) ([]FeedsManager, error) { return nil, nil }
// CreateChainConfig is a stub: it always returns 0 and
// ErrFeedsManagerDisabled.
func (ns NullService) CreateChainConfig(ctx context.Context, cfg ChainConfig) (int64, error) {
return 0, ErrFeedsManagerDisabled
}

// GetChainConfig is a stub: it always returns a nil config and
// ErrFeedsManagerDisabled.
func (ns NullService) GetChainConfig(ctx context.Context, id int64) (*ChainConfig, error) {
return nil, ErrFeedsManagerDisabled
}

// DeleteChainConfig is a stub: it always returns 0 and
// ErrFeedsManagerDisabled.
func (ns NullService) DeleteChainConfig(ctx context.Context, id int64) (int64, error) {
return 0, ErrFeedsManagerDisabled
}

// ListChainConfigsByManagerIDs is a stub: it always returns a nil slice and
// ErrFeedsManagerDisabled.
func (ns NullService) ListChainConfigsByManagerIDs(ctx context.Context, mgrIDs []int64) ([]ChainConfig, error) {
return nil, ErrFeedsManagerDisabled
}

// UpdateChainConfig is a stub: it always returns 0 and
// ErrFeedsManagerDisabled.
func (ns NullService) UpdateChainConfig(ctx context.Context, cfg ChainConfig) (int64, error) {
return 0, ErrFeedsManagerDisabled
}
// ListJobProposals is a stub: unlike most NullService methods it does not
// report an error; it always returns a nil slice and a nil error.
func (ns NullService) ListJobProposals(ctx context.Context) ([]JobProposal, error) { return nil, nil }
// ListJobProposalsByManagersIDs is a stub: it always returns a nil slice and
// ErrFeedsManagerDisabled.
func (ns NullService) ListJobProposalsByManagersIDs(ctx context.Context, ids []int64) ([]JobProposal, error) {
return nil, ErrFeedsManagerDisabled
}

// ProposeJob is a stub: it always returns 0 and ErrFeedsManagerDisabled.
func (ns NullService) ProposeJob(ctx context.Context, args *ProposeJobArgs) (int64, error) {
return 0, ErrFeedsManagerDisabled
}

// DeleteJob is a stub: it always returns 0 and ErrFeedsManagerDisabled.
func (ns NullService) DeleteJob(ctx context.Context, args *DeleteJobArgs) (int64, error) {
return 0, ErrFeedsManagerDisabled
}

// RevokeJob is a stub: it always returns 0 and ErrFeedsManagerDisabled.
func (ns NullService) RevokeJob(ctx context.Context, args *RevokeJobArgs) (int64, error) {
return 0, ErrFeedsManagerDisabled
}

// RegisterManager is a stub: it always returns 0 and
// ErrFeedsManagerDisabled.
func (ns NullService) RegisterManager(ctx context.Context, params RegisterManagerParams) (int64, error) {
return 0, ErrFeedsManagerDisabled
}

// RejectSpec is a stub: it ignores its arguments and always returns
// ErrFeedsManagerDisabled.
func (ns NullService) RejectSpec(ctx context.Context, id int64) error {
return ErrFeedsManagerDisabled
}
// SyncNodeInfo is a no-op: unlike most NullService methods it does not report
// an error; it always returns nil.
func (ns NullService) SyncNodeInfo(ctx context.Context, id int64) error { return nil }
// UpdateManager is a stub: it ignores its arguments and always returns
// ErrFeedsManagerDisabled.
func (ns NullService) UpdateManager(ctx context.Context, mgr FeedsManager) error {
return ErrFeedsManagerDisabled
}

// EnableManager is a stub: it always returns a nil manager and
// ErrFeedsManagerDisabled.
func (ns NullService) EnableManager(ctx context.Context, id int64) (*FeedsManager, error) {
return nil, ErrFeedsManagerDisabled
}

// DisableManager is a stub: it always returns a nil manager and
// ErrFeedsManagerDisabled.
func (ns NullService) DisableManager(ctx context.Context, id int64) (*FeedsManager, error) {
return nil, ErrFeedsManagerDisabled
}

// IsJobManaged is a stub: unlike most NullService methods it does not report
// an error; it always returns false and a nil error.
func (ns NullService) IsJobManaged(ctx context.Context, jobID int64) (bool, error) {
return false, nil
}

// UpdateSpecDefinition is a stub: it ignores its arguments and always returns
// ErrFeedsManagerDisabled.
func (ns NullService) UpdateSpecDefinition(ctx context.Context, id int64, spec string) error {
return ErrFeedsManagerDisabled
}
Expand Down

0 comments on commit 5c30694

Please sign in to comment.