Skip to content

Commit

Permalink
Retry Q: support custom intervals (#284)
Browse files Browse the repository at this point in the history
* added RetryInterval field

* modify RetryQ to support injected interval

* fix tests

* fix comment
  • Loading branch information
amirylm authored Nov 6, 2023
1 parent 7fc76aa commit 9fea8f7
Show file tree
Hide file tree
Showing 9 changed files with 96 additions and 36 deletions.
16 changes: 10 additions & 6 deletions pkg/v3/flows/retry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,16 @@ func TestRetryFlow(t *testing.T) {
var wg sync.WaitGroup
wg.Add(1)

err := retryQ.Enqueue(ocr2keepers.UpkeepPayload{
UpkeepID: ocr2keepers.UpkeepIdentifier([32]byte{1}),
WorkID: "0x1",
}, ocr2keepers.UpkeepPayload{
UpkeepID: ocr2keepers.UpkeepIdentifier([32]byte{2}),
WorkID: "0x2",
err := retryQ.Enqueue(ocr2keepers.RetryRecord{
Payload: ocr2keepers.UpkeepPayload{
UpkeepID: ocr2keepers.UpkeepIdentifier([32]byte{1}),
WorkID: "0x1",
},
}, ocr2keepers.RetryRecord{
Payload: ocr2keepers.UpkeepPayload{
UpkeepID: ocr2keepers.UpkeepIdentifier([32]byte{2}),
WorkID: "0x2",
},
})
assert.NoError(t, err)

Expand Down
30 changes: 15 additions & 15 deletions pkg/v3/plugin/ocr3_test.go

Large diffs are not rendered by default.

5 changes: 4 additions & 1 deletion pkg/v3/postprocessors/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,10 @@ func (p *retryablePostProcessor) PostProcess(_ context.Context, results []ocr2ke
retryable := 0
for i, res := range results {
if res.PipelineExecutionState != 0 && res.Retryable {
e := p.q.Enqueue(payloads[i])
e := p.q.Enqueue(ocr2keepers.RetryRecord{
Payload: payloads[i],
Interval: res.RetryInterval,
})
if e == nil {
retryable++
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/v3/postprocessors/retry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"log"
"testing"
"time"

"github.com/stretchr/testify/assert"

Expand All @@ -19,7 +20,7 @@ func TestRetryPostProcessor_PostProcess(t *testing.T) {
results := []ocr2keepers.CheckResult{
{Retryable: true, PipelineExecutionState: 1},
{Retryable: false, PipelineExecutionState: 3},
{Retryable: true, PipelineExecutionState: 2},
{Retryable: true, RetryInterval: time.Second, PipelineExecutionState: 2},
}

// Call the PostProcess method
Expand Down
20 changes: 16 additions & 4 deletions pkg/v3/stores/retry_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,17 @@ import (
)

var (
// DefaultExpiration is the default expiration time for items in the queue
DefaultExpiration = 24 * time.Hour
RetryInterval = 30 * time.Second
// RetryInterval is the default time between retries
RetryInterval = 30 * time.Second
)

type retryQueueRecord struct {
// payload is the desired unit of work to be retried
payload types.UpkeepPayload
// interval is the retry interval for the payload
interval time.Duration
// pending is true if the item is currently being retried
pending bool
// createdAt is the first time the item was seen by the queue
Expand Down Expand Up @@ -55,13 +59,14 @@ func NewRetryQueue(lggr *log.Logger) *retryQueue {
}
}

func (q *retryQueue) Enqueue(payloads ...types.UpkeepPayload) error {
func (q *retryQueue) Enqueue(records ...types.RetryRecord) error {
q.lock.Lock()
defer q.lock.Unlock()

now := time.Now()

for _, payload := range payloads {
for _, rec := range records {
payload := rec.Payload
record, ok := q.records[payload.WorkID]
if !ok {
record = retryQueueRecord{
Expand All @@ -79,6 +84,13 @@ func (q *retryQueue) Enqueue(payloads ...types.UpkeepPayload) error {
// (can happen when the same payload gets retryable error again)
record.updatedAt = now
record.pending = false
// if some custom interval is set for this record, use it.
// otherwise use the default interval
if rec.Interval > 0 {
record.interval = rec.Interval
} else {
record.interval = q.interval
}
q.records[payload.WorkID] = record
}

Expand All @@ -105,7 +117,7 @@ func (q *retryQueue) Dequeue(n int) ([]types.UpkeepPayload, error) {
if record.pending {
continue
}
if record.elapsed(now, q.interval) {
if record.elapsed(now, record.interval) {
results = append(results, record.payload)
record.pending = true
q.records[k] = record
Expand Down
28 changes: 21 additions & 7 deletions pkg/v3/stores/retry_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,14 @@ func TestRetryQueue_Sanity(t *testing.T) {
q := NewRetryQueue(log.New(io.Discard, "", 0))

err := q.Enqueue(
ocr2keepers.UpkeepPayload{WorkID: "1"},
ocr2keepers.UpkeepPayload{WorkID: "2"},
newRetryRecord(ocr2keepers.UpkeepPayload{WorkID: "1"}, 0),
newRetryRecord(ocr2keepers.UpkeepPayload{WorkID: "2"}, time.Millisecond*5),
)
require.NoError(t, err)

err = q.Enqueue(
ocr2keepers.UpkeepPayload{WorkID: "2"},
ocr2keepers.UpkeepPayload{WorkID: "3"},
newRetryRecord(ocr2keepers.UpkeepPayload{WorkID: "2"}, 0),
newRetryRecord(ocr2keepers.UpkeepPayload{WorkID: "3"}, 0),
)
require.NoError(t, err)

Expand All @@ -54,6 +54,13 @@ func TestRetryQueue_Sanity(t *testing.T) {
require.Len(t, items, 0)

require.Equal(t, 3, q.Size())

// adding a record with a custom interval
err = q.Enqueue(
newRetryRecord(ocr2keepers.UpkeepPayload{WorkID: "4"}, defaultExpiration-time.Millisecond*5),
)
require.NoError(t, err)
require.Equal(t, 4, q.Size())
// dequeue after retry interval elapsed
go func() {
defer cancel()
Expand All @@ -62,7 +69,7 @@ func TestRetryQueue_Sanity(t *testing.T) {
require.NoError(t, err)
require.Len(t, items, 2)

require.Equal(t, 1, q.Size())
require.Equal(t, 2, q.Size())
}()

<-ctx.Done()
Expand All @@ -78,8 +85,8 @@ func TestRetryQueue_Expiration(t *testing.T) {

t.Run("dequeue before expiration", func(t *testing.T) {
err := q.Enqueue(
ocr2keepers.UpkeepPayload{WorkID: "1"},
ocr2keepers.UpkeepPayload{WorkID: "2"},
newRetryRecord(ocr2keepers.UpkeepPayload{WorkID: "1"}, 0),
newRetryRecord(ocr2keepers.UpkeepPayload{WorkID: "2"}, 0),
)
require.NoError(t, err)
require.Equal(t, 2, q.Size())
Expand All @@ -105,3 +112,10 @@ func TestRetryQueue_Expiration(t *testing.T) {
require.Equal(t, 0, n)
})
}

// newRetryRecord is a test helper that bundles the given payload and retry
// interval into a RetryRecord ready to be passed to the queue's Enqueue.
func newRetryRecord(payload ocr2keepers.UpkeepPayload, interval time.Duration) ocr2keepers.RetryRecord {
	var rec ocr2keepers.RetryRecord
	rec.Payload = payload
	rec.Interval = interval
	return rec
}
16 changes: 15 additions & 1 deletion pkg/v3/types/basetypes.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"math/big"
"strings"
"time"
)

const (
Expand All @@ -14,6 +15,7 @@ const (
var checkResultStringTemplate = `{
"PipelineExecutionState":%d,
"Retryable":%v,
"RetryInterval":%d,
"Eligible":%v,
"IneligibilityReason":%d,
"UpkeepID":%s,
Expand Down Expand Up @@ -130,6 +132,10 @@ type CheckResult struct {
// if PipelineExecutionState is non zero, then retryable indicates that the same
// payload can be processed again in order to get a successful execution
Retryable bool
// RetryInterval is the time interval after which the same payload can be retried.
// This field is used in special cases (such as mercury lookup), where we want to
// have a different retry interval than the default one (30s)
RetryInterval time.Duration
// Rest of these fields are only applicable if PipelineExecutionState is zero
// Eligible indicates whether this result is eligible to be performed
Eligible bool
Expand Down Expand Up @@ -210,7 +216,7 @@ func (r CheckResult) UniqueID() string {

func (r CheckResult) String() string {
return fmt.Sprintf(
checkResultStringTemplate, r.PipelineExecutionState, r.Retryable, r.Eligible,
checkResultStringTemplate, r.PipelineExecutionState, r.Retryable, r.RetryInterval, r.Eligible,
r.IneligibilityReason, r.UpkeepID, r.Trigger, r.WorkID, r.GasAllocated,
hex.EncodeToString(r.PerformData), r.FastGasWei, r.LinkNative,
)
Expand Down Expand Up @@ -271,3 +277,11 @@ type ReportedUpkeep struct {
// WorkID represents the unit of work for the reported upkeep
WorkID string
}

// RetryRecord is a record of a payload that can be retried after a certain interval.
// It is the item type accepted by RetryQueue.Enqueue.
type RetryRecord struct {
// Payload is the desired unit of work to be retried.
Payload UpkeepPayload
// Interval is the time interval after which the same payload can be retried.
// The retry queue implementation uses its own default interval when this
// value is not greater than zero.
Interval time.Duration
}
12 changes: 12 additions & 0 deletions pkg/v3/types/basetypes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ func TestCheckResultString(t *testing.T) {
input := CheckResult{
PipelineExecutionState: 1,
Retryable: true,
RetryInterval: 1,
Eligible: true,
IneligibilityReason: 10,
UpkeepID: UpkeepIdentifier{1, 2, 3, 4, 5, 6, 7, 8},
Expand All @@ -240,6 +241,7 @@ func TestCheckResultString(t *testing.T) {
{
"PipelineExecutionState":1,
"Retryable":true,
"RetryInterval":1,
"Eligible":true,
"IneligibilityReason":10,
"UpkeepID":455867356320691211288303676705517652851520854420902457558325773249309310976,
Expand Down Expand Up @@ -275,6 +277,7 @@ func TestCheckResult_UniqueID(t *testing.T) {
result: CheckResult{
PipelineExecutionState: 0,
Retryable: false,
RetryInterval: 0,
Eligible: false,
IneligibilityReason: 0,
UpkeepID: UpkeepIdentifier{},
Expand All @@ -292,6 +295,7 @@ func TestCheckResult_UniqueID(t *testing.T) {
result: CheckResult{
PipelineExecutionState: 1,
Retryable: false,
RetryInterval: 0,
Eligible: false,
IneligibilityReason: 0,
UpkeepID: UpkeepIdentifier{},
Expand All @@ -309,6 +313,7 @@ func TestCheckResult_UniqueID(t *testing.T) {
result: CheckResult{
PipelineExecutionState: 2,
Retryable: true,
RetryInterval: 0,
Eligible: false,
IneligibilityReason: 0,
UpkeepID: UpkeepIdentifier{},
Expand All @@ -326,6 +331,7 @@ func TestCheckResult_UniqueID(t *testing.T) {
result: CheckResult{
PipelineExecutionState: 2,
Retryable: true,
RetryInterval: 0,
Eligible: true,
IneligibilityReason: 0,
UpkeepID: UpkeepIdentifier{},
Expand All @@ -343,6 +349,7 @@ func TestCheckResult_UniqueID(t *testing.T) {
result: CheckResult{
PipelineExecutionState: 2,
Retryable: true,
RetryInterval: 0,
Eligible: true,
IneligibilityReason: 6,
UpkeepID: UpkeepIdentifier{},
Expand All @@ -360,6 +367,7 @@ func TestCheckResult_UniqueID(t *testing.T) {
result: CheckResult{
PipelineExecutionState: 2,
Retryable: true,
RetryInterval: 0,
Eligible: true,
IneligibilityReason: 6,
UpkeepID: UpkeepIdentifier([32]byte{9, 9, 9, 9}),
Expand All @@ -377,6 +385,7 @@ func TestCheckResult_UniqueID(t *testing.T) {
result: CheckResult{
PipelineExecutionState: 2,
Retryable: true,
RetryInterval: 0,
Eligible: true,
IneligibilityReason: 6,
UpkeepID: UpkeepIdentifier([32]byte{9, 9, 9, 9}),
Expand All @@ -403,6 +412,7 @@ func TestCheckResult_UniqueID(t *testing.T) {
result: CheckResult{
PipelineExecutionState: 2,
Retryable: true,
RetryInterval: 0,
Eligible: true,
IneligibilityReason: 6,
UpkeepID: UpkeepIdentifier([32]byte{9, 9, 9, 9}),
Expand All @@ -429,6 +439,7 @@ func TestCheckResult_UniqueID(t *testing.T) {
result: CheckResult{
PipelineExecutionState: 2,
Retryable: true,
RetryInterval: 0,
Eligible: true,
IneligibilityReason: 6,
UpkeepID: UpkeepIdentifier([32]byte{9, 9, 9, 9}),
Expand All @@ -455,6 +466,7 @@ func TestCheckResult_UniqueID(t *testing.T) {
result: CheckResult{
PipelineExecutionState: 2,
Retryable: true,
RetryInterval: 0,
Eligible: true,
IneligibilityReason: 6,
UpkeepID: UpkeepIdentifier([32]byte{9, 9, 9, 9}),
Expand Down
2 changes: 1 addition & 1 deletion pkg/v3/types/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ type UpkeepStateUpdater interface {

type RetryQueue interface {
// Enqueue adds new items to the queue
Enqueue(items ...UpkeepPayload) error
Enqueue(items ...RetryRecord) error
// Dequeue returns the next n items in the queue, considering retry time schedules
Dequeue(n int) ([]UpkeepPayload, error)
}
Expand Down

0 comments on commit 9fea8f7

Please sign in to comment.