diff --git a/core/scripts/go.mod b/core/scripts/go.mod index c8b616a4b6e..29b90836c13 100644 --- a/core/scripts/go.mod +++ b/core/scripts/go.mod @@ -22,7 +22,7 @@ require ( github.com/shopspring/decimal v1.3.1 github.com/smartcontractkit/chainlink/v2 v2.0.0-00010101000000-000000000000 github.com/smartcontractkit/libocr v0.0.0-20231020123319-d255366a6545 - github.com/smartcontractkit/ocr2keepers v0.7.27 + github.com/smartcontractkit/ocr2keepers v0.7.28 github.com/smartcontractkit/ocr2vrf v0.0.0-20230804151440-2f1eb1e20687 github.com/smartcontractkit/sqlx v1.3.5-0.20210805004948-4be295aacbeb github.com/spf13/cobra v1.6.1 diff --git a/core/scripts/go.sum b/core/scripts/go.sum index c4d0575bb20..5956ac0ce4a 100644 --- a/core/scripts/go.sum +++ b/core/scripts/go.sum @@ -1470,8 +1470,8 @@ github.com/smartcontractkit/grpc-proxy v0.0.0-20230731113816-f1be6620749f h1:hgJ github.com/smartcontractkit/grpc-proxy v0.0.0-20230731113816-f1be6620749f/go.mod h1:MvMXoufZAtqExNexqi4cjrNYE9MefKddKylxjS+//n0= github.com/smartcontractkit/libocr v0.0.0-20231020123319-d255366a6545 h1:qOsw2ETQD/Sb/W2xuYn2KPWjvvsWA0C+l19rWFq8iNg= github.com/smartcontractkit/libocr v0.0.0-20231020123319-d255366a6545/go.mod h1:2lyRkw/qLQgUWlrWWmq5nj0y90rWeO6Y+v+fCakRgb0= -github.com/smartcontractkit/ocr2keepers v0.7.27 h1:kwqMrzmEdq6gH4yqNuLQCbdlED0KaIjwZzu3FF+Gves= -github.com/smartcontractkit/ocr2keepers v0.7.27/go.mod h1:1QGzJURnoWpysguPowOe2bshV0hNp1YX10HHlhDEsas= +github.com/smartcontractkit/ocr2keepers v0.7.28 h1:dufAiYl4+uly9aH0+6GkS2jYzHGujq7tg0LYQE+x6JU= +github.com/smartcontractkit/ocr2keepers v0.7.28/go.mod h1:1QGzJURnoWpysguPowOe2bshV0hNp1YX10HHlhDEsas= github.com/smartcontractkit/ocr2vrf v0.0.0-20230804151440-2f1eb1e20687 h1:NwC3SOc25noBTe1KUQjt45fyTIuInhoE2UfgcHAdihM= github.com/smartcontractkit/ocr2vrf v0.0.0-20230804151440-2f1eb1e20687/go.mod h1:YYZq52t4wcHoMQeITksYsorD+tZcOyuVU5+lvot3VFM= github.com/smartcontractkit/sqlx v1.3.5-0.20210805004948-4be295aacbeb 
h1:OMaBUb4X9IFPLbGbCHsMU+kw/BPCrewaVwWGIBc0I4A= diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/registry.go b/core/services/ocr2/plugins/ocr2keeper/evm21/registry.go index 1cad587e635..78cc7c994c1 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/registry.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/registry.go @@ -34,11 +34,14 @@ import ( ) const ( + defaultPluginRetryExpiration = 30 * time.Minute // defaultAllowListExpiration decides how long an upkeep's allow list info will be valid for. - defaultAllowListExpiration = 20 * time.Minute - // allowListCleanupInterval decides when the expired items in allowList cache will be deleted. - allowListCleanupInterval = 5 * time.Minute + defaultAllowListExpiration = 10 * time.Minute + // cleanupInterval decides when the expired items in cache will be deleted. + cleanupInterval = 5 * time.Minute logTriggerRefreshBatchSize = 32 + totalFastPluginRetries = 5 + totalMediumPluginRetries = 10 ) var ( @@ -99,9 +102,10 @@ func NewEvmRegistry( headFunc: func(ocr2keepers.BlockKey) {}, chLog: make(chan logpoller.Log, 1000), mercury: &MercuryConfig{ - cred: mc, - abi: core.StreamsCompatibleABI, - allowListCache: cache.New(defaultAllowListExpiration, allowListCleanupInterval), + cred: mc, + abi: core.StreamsCompatibleABI, + allowListCache: cache.New(defaultAllowListExpiration, cleanupInterval), + pluginRetryCache: cache.New(defaultPluginRetryExpiration, cleanupInterval), }, hc: http.DefaultClient, logEventProvider: logEventProvider, @@ -125,6 +129,8 @@ type MercuryConfig struct { abi abi.ABI // allowListCache stores the upkeeps privileges. 
In 2.1, this only includes a JSON bytes for allowed to use mercury allowListCache *cache.Cache + + pluginRetryCache *cache.Cache } type EvmRegistry struct { diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/streams_lookup.go b/core/services/ocr2/plugins/ocr2keeper/evm21/streams_lookup.go index 6f2594b6c38..f183e1f6bbe 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/streams_lookup.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/streams_lookup.go @@ -162,10 +162,11 @@ func (r *EvmRegistry) streamsLookup(ctx context.Context, checkResults []ocr2keep func (r *EvmRegistry) doLookup(ctx context.Context, wg *sync.WaitGroup, lookup *StreamsLookup, i int, checkResults []ocr2keepers.CheckResult, lggr logger.Logger) { defer wg.Done() - state, reason, values, retryable, err := r.doMercuryRequest(ctx, lookup, lggr) + state, reason, values, retryable, ri, err := r.doMercuryRequest(ctx, lookup, generatePluginRetryKey(checkResults[i].WorkID, lookup.block), lggr) if err != nil { - lggr.Errorf("upkeep %s retryable %v doMercuryRequest: %s", lookup.upkeepId, retryable, err.Error()) + lggr.Errorf("upkeep %s retryable %v retryInterval %s doMercuryRequest: %s", lookup.upkeepId, retryable, ri, err.Error()) checkResults[i].Retryable = retryable + checkResults[i].RetryInterval = ri checkResults[i].PipelineExecutionState = uint8(state) checkResults[i].IneligibilityReason = uint8(reason) return @@ -278,12 +279,12 @@ func (r *EvmRegistry) checkCallback(ctx context.Context, values [][]byte, lookup } // doMercuryRequest sends requests to Mercury API to retrieve mercury data. 
-func (r *EvmRegistry) doMercuryRequest(ctx context.Context, sl *StreamsLookup, lggr logger.Logger) (encoding.PipelineExecutionState, encoding.UpkeepFailureReason, [][]byte, bool, error) { +func (r *EvmRegistry) doMercuryRequest(ctx context.Context, sl *StreamsLookup, prk string, lggr logger.Logger) (encoding.PipelineExecutionState, encoding.UpkeepFailureReason, [][]byte, bool, time.Duration, error) { var isMercuryV03 bool resultLen := len(sl.Feeds) ch := make(chan MercuryData, resultLen) if len(sl.Feeds) == 0 { - return encoding.NoPipelineError, encoding.UpkeepFailureReasonInvalidRevertDataInput, [][]byte{}, false, fmt.Errorf("invalid revert data input: feed param key %s, time param key %s, feeds %s", sl.FeedParamKey, sl.TimeParamKey, sl.Feeds) + return encoding.NoPipelineError, encoding.UpkeepFailureReasonInvalidRevertDataInput, [][]byte{}, false, 0 * time.Second, fmt.Errorf("invalid revert data input: feed param key %s, time param key %s, feeds %s", sl.FeedParamKey, sl.TimeParamKey, sl.Feeds) } if sl.FeedParamKey == feedIdHex && sl.TimeParamKey == blockNumber { // only mercury v0.2 @@ -297,10 +298,11 @@ func (r *EvmRegistry) doMercuryRequest(ctx context.Context, sl *StreamsLookup, l ch = make(chan MercuryData, resultLen) go r.multiFeedsRequest(ctx, ch, sl, lggr) } else { - return encoding.NoPipelineError, encoding.UpkeepFailureReasonInvalidRevertDataInput, [][]byte{}, false, fmt.Errorf("invalid revert data input: feed param key %s, time param key %s, feeds %s", sl.FeedParamKey, sl.TimeParamKey, sl.Feeds) + return encoding.NoPipelineError, encoding.UpkeepFailureReasonInvalidRevertDataInput, [][]byte{}, false, 0 * time.Second, fmt.Errorf("invalid revert data input: feed param key %s, time param key %s, feeds %s", sl.FeedParamKey, sl.TimeParamKey, sl.Feeds) } var reqErr error + var ri time.Duration results := make([][]byte, len(sl.Feeds)) retryable := true allSuccess := true @@ -323,8 +325,11 @@ func (r *EvmRegistry) doMercuryRequest(ctx context.Context, sl 
*StreamsLookup, l results[m.Index] = m.Bytes[0] } } + if retryable && !allSuccess { + ri = r.calculateRetryConfig(prk) + } // only retry when not all successful AND none are not retryable - return state, encoding.UpkeepFailureReasonNone, results, retryable && !allSuccess, reqErr + return state, encoding.UpkeepFailureReasonNone, results, retryable && !allSuccess, ri, reqErr } // singleFeedRequest sends a v0.2 Mercury request for a single feed report. @@ -378,7 +383,7 @@ func (r *EvmRegistry) singleFeedRequest(ctx context.Context, ch chan<- MercuryDa return err1 } - if resp.StatusCode == http.StatusNotFound || resp.StatusCode == http.StatusInternalServerError { + if resp.StatusCode == http.StatusNotFound || resp.StatusCode == http.StatusInternalServerError || resp.StatusCode == http.StatusBadGateway || resp.StatusCode == http.StatusServiceUnavailable || resp.StatusCode == http.StatusGatewayTimeout { lggr.Warnf("at block %s upkeep %s received status code %d for feed %s", sl.Time.String(), sl.upkeepId.String(), resp.StatusCode, sl.Feeds[index]) retryable = true state = encoding.MercuryFlakyFailure @@ -415,9 +420,9 @@ func (r *EvmRegistry) singleFeedRequest(ctx context.Context, ch chan<- MercuryDa sent = true return nil }, - // only retry when the error is 404 Not Found or 500 Internal Server Error + // only retry when the error is 404 Not Found, 500 Internal Server Error, 502 Bad Gateway, 503 Service Unavailable, 504 Gateway Timeout retry.RetryIf(func(err error) bool { - return err.Error() == fmt.Sprintf("%d", http.StatusNotFound) || err.Error() == fmt.Sprintf("%d", http.StatusInternalServerError) + return err.Error() == fmt.Sprintf("%d", http.StatusNotFound) || err.Error() == fmt.Sprintf("%d", http.StatusInternalServerError) || err.Error() == fmt.Sprintf("%d", http.StatusBadGateway) || err.Error() == fmt.Sprintf("%d", http.StatusServiceUnavailable) || err.Error() == fmt.Sprintf("%d", http.StatusGatewayTimeout) }), retry.Context(ctx), retry.Delay(retryDelay), @@ 
-504,15 +509,29 @@ func (r *EvmRegistry) multiFeedsRequest(ctx context.Context, ch chan<- MercuryDa retryable = false state = encoding.InvalidMercuryRequest return fmt.Errorf("at timestamp %s upkeep %s received status code %d from mercury v0.3 with message: %s", sl.Time.String(), sl.upkeepId.String(), resp.StatusCode, string(body)) - } else if resp.StatusCode == http.StatusInternalServerError { + } else if resp.StatusCode == http.StatusInternalServerError || resp.StatusCode == http.StatusBadGateway || resp.StatusCode == http.StatusServiceUnavailable || resp.StatusCode == http.StatusGatewayTimeout { retryable = true state = encoding.MercuryFlakyFailure - return fmt.Errorf("%d", http.StatusInternalServerError) - } else if resp.StatusCode == 420 { - // in 0.3, this will happen when missing/malformed query args, missing or bad required headers, non-existent feeds, or no permissions for feeds - retryable = false - state = encoding.InvalidMercuryRequest - return fmt.Errorf("at timestamp %s upkeep %s received status code %d from mercury v0.3, most likely this is caused by missing/malformed query args, missing or bad required headers, non-existent feeds, or no permissions for feeds", sl.Time.String(), sl.upkeepId.String(), resp.StatusCode) + return fmt.Errorf("%d", resp.StatusCode) + } else if resp.StatusCode == http.StatusPartialContent { + //var response MercuryV03Response + //err1 = json.Unmarshal(body, &response) + //if err1 != nil { + // lggr.Warnf("at timestamp %s upkeep %s failed to unmarshal body to MercuryV03Response from mercury v0.3: %v", sl.Time.String(), sl.upkeepId.String(), err1) + // retryable = false + // state = encoding.MercuryUnmarshalError + // return err1 + //} + // in v0.3, if some feeds are not available, the server will only return available feeds, but we need to make sure ALL feeds are retrieved before calling user contract + // hence, retry in this case. 
retry will help when we send a very new timestamp and reports are not yet generated + //var receivedFeeds []string + //for _, f := range response.Reports { + // receivedFeeds = append(receivedFeeds, f.FeedID) + //} + lggr.Warnf("at timestamp %s upkeep %s requested [%s] feeds but mercury v0.3 server returned 206 status, treating it as 404 and retrying", sl.Time.String(), sl.upkeepId.String(), sl.Feeds) + retryable = true + state = encoding.MercuryFlakyFailure + return fmt.Errorf("%d", http.StatusPartialContent) } else if resp.StatusCode != http.StatusOK { retryable = false state = encoding.InvalidMercuryRequest @@ -532,8 +551,11 @@ func (r *EvmRegistry) multiFeedsRequest(ctx context.Context, ch chan<- MercuryDa // in v0.3, if some feeds are not available, the server will only return available feeds, but we need to make sure ALL feeds are retrieved before calling user contract // hence, retry in this case. retry will help when we send a very new timestamp and reports are not yet generated if len(response.Reports) != len(sl.Feeds) { - // TODO: AUTO-5044: calculate what reports are missing and log a warning - lggr.Warnf("at timestamp %s upkeep %s mercury v0.3 server retruned 200 status with %d reports while we requested %d feeds, treating as 404 (not found) and retrying", sl.Time.String(), sl.upkeepId.String(), len(response.Reports), len(sl.Feeds)) + var receivedFeeds []string + for _, f := range response.Reports { + receivedFeeds = append(receivedFeeds, f.FeedID) + } + lggr.Warnf("at timestamp %s upkeep %s mercury v0.3 server returned 200 status with [%s] reports while we requested [%s] feeds, retrying", sl.Time.String(), sl.upkeepId.String(), receivedFeeds, sl.Feeds) retryable = true state = encoding.MercuryFlakyFailure return fmt.Errorf("%d", http.StatusNotFound) @@ -558,9 +580,9 @@ func (r *EvmRegistry) multiFeedsRequest(ctx context.Context, ch chan<- MercuryDa sent = true return nil }, - // only retry when the error is 404 Not Found or 500 Internal Server Error + 
// only retry when the error is 206 Partial Content, 404 Not Found, 500 Internal Server Error, 502 Bad Gateway, 503 Service Unavailable, 504 Gateway Timeout retry.RetryIf(func(err error) bool { - return err.Error() == fmt.Sprintf("%d", http.StatusNotFound) || err.Error() == fmt.Sprintf("%d", http.StatusInternalServerError) + return err.Error() == fmt.Sprintf("%d", http.StatusPartialContent) || err.Error() == fmt.Sprintf("%d", http.StatusNotFound) || err.Error() == fmt.Sprintf("%d", http.StatusInternalServerError) || err.Error() == fmt.Sprintf("%d", http.StatusBadGateway) || err.Error() == fmt.Sprintf("%d", http.StatusServiceUnavailable) || err.Error() == fmt.Sprintf("%d", http.StatusGatewayTimeout) }), retry.Context(ctx), retry.Delay(retryDelay), @@ -593,3 +615,29 @@ func (r *EvmRegistry) generateHMAC(method string, path string, body []byte, clie userHmac := hex.EncodeToString(signedMessage.Sum(nil)) return userHmac } + +// calculateRetryConfig returns the plugin retry interval for the work identified by prk and stores the incremented attempt count for prk in pluginRetryCache: 1s when prk is not cached yet or fewer than totalFastPluginRetries attempts are cached, 5s while fewer than totalMediumPluginRetries are cached, and the zero duration after that +func (r *EvmRegistry) calculateRetryConfig(prk string) time.Duration { + var ri time.Duration + var retries int + totalAttempts, ok := r.mercury.pluginRetryCache.Get(prk) + if ok { + retries = totalAttempts.(int) + if retries < totalFastPluginRetries { + ri = 1 * time.Second + } else if retries < totalMediumPluginRetries { + ri = 5 * time.Second + } + // once totalMediumPluginRetries attempts are cached for this key, ri is left at its zero value so that the plugin uses + // its own default interval + } else { + ri = 1 * time.Second + } + r.mercury.pluginRetryCache.Set(prk, retries+1, cache.DefaultExpiration) + return ri +} + +// generatePluginRetryKey returns the plugin retry cache key for a work ID and block, formatted as "<workID>|<block>" with the block in decimal +func generatePluginRetryKey(workID string, block uint64) string { + return workID + "|" + fmt.Sprintf("%d", block) +} diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/streams_lookup_test.go b/core/services/ocr2/plugins/ocr2keeper/evm21/streams_lookup_test.go index 
6f7065ef875..8d7c67d80ce 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/streams_lookup_test.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/streams_lookup_test.go @@ -10,6 +10,7 @@ import ( "net/http" "strings" "testing" + "time" "github.com/ethereum/go-ethereum/accounts/abi" "github.com/ethereum/go-ethereum/accounts/abi/bind" @@ -65,8 +66,9 @@ func setupEVMRegistry(t *testing.T) *EvmRegistry { Username: "FakeClientID", Password: "FakeClientKey", }, - abi: streamsLookupCompatibleABI, - allowListCache: cache.New(defaultAllowListExpiration, allowListCleanupInterval), + abi: streamsLookupCompatibleABI, + allowListCache: cache.New(defaultAllowListExpiration, cleanupInterval), + pluginRetryCache: cache.New(defaultPluginRetryExpiration, cleanupInterval), }, hc: mockHttpClient, } @@ -427,15 +429,18 @@ func TestEvmRegistry_DoMercuryRequestV02(t *testing.T) { upkeepId, _ := new(big.Int).SetString("88786950015966611018675766524283132478093844178961698330929478019253453382042", 10) tests := []struct { - name string - lookup *StreamsLookup - mockHttpStatusCode int - mockChainlinkBlobs []string - expectedValues [][]byte - expectedRetryable bool - expectedError error - state encoding.PipelineExecutionState - reason encoding.UpkeepFailureReason + name string + lookup *StreamsLookup + mockHttpStatusCode int + mockChainlinkBlobs []string + pluginRetries int + pluginRetryKey string + expectedValues [][]byte + expectedRetryable bool + expectedRetryInterval time.Duration + expectedError error + state encoding.PipelineExecutionState + reason encoding.UpkeepFailureReason }{ { name: "success", @@ -456,7 +461,7 @@ func TestEvmRegistry_DoMercuryRequestV02(t *testing.T) { expectedError: nil, }, { - name: "failure - retryable", + name: "failure - retryable and interval is 1s", lookup: &StreamsLookup{ StreamsLookupError: &encoding.StreamsLookupError{ FeedParamKey: feedIdHex, @@ -467,6 +472,49 @@ func TestEvmRegistry_DoMercuryRequestV02(t *testing.T) { }, upkeepId: upkeepId, }, 
+ mockHttpStatusCode: http.StatusInternalServerError, + mockChainlinkBlobs: []string{"0x00066dfcd1ed2d95b18c948dbc5bd64c687afe93e4ca7d663ddec14c20090ad80000000000000000000000000000000000000000000000000000000000081401000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000e000000000000000000000000000000000000000000000000000000000000002200000000000000000000000000000000000000000000000000000000000000280000100000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000001204554482d5553442d415242495452554d2d544553544e455400000000000000000000000000000000000000000000000000000000000000000000000064891c98000000000000000000000000000000000000000000000000000000289ad8d367000000000000000000000000000000000000000000000000000000289acf0b38000000000000000000000000000000000000000000000000000000289b3da40000000000000000000000000000000000000000000000000000000000018ae7ce74d9fa252a8983976eab600dc7590c778d04813430841bc6e765c34cd81a168d00000000000000000000000000000000000000000000000000000000018ae7cb0000000000000000000000000000000000000000000000000000000064891c98000000000000000000000000000000000000000000000000000000000000000260412b94e525ca6cedc9f544fd86f77606d52fe731a5d069dbe836a8bfc0fb8c911963b0ae7a14971f3b4621bffb802ef0605392b9a6c89c7fab1df8633a5ade00000000000000000000000000000000000000000000000000000000000000024500c2f521f83fba5efc2bf3effaaedde43d0a4adff785c1213b712a3aed0d8157642a84324db0cf9695ebd27708d4608eb0337e0dd87b0e43f0fa70c700d911"}, + expectedValues: [][]byte{nil}, + expectedRetryable: true, + pluginRetries: 0, + expectedRetryInterval: 1 * time.Second, + expectedError: errors.New("failed to request feed for 0x4554482d5553442d415242495452554d2d544553544e45540000000000000000: All attempts fail:\n#1: 500\n#2: 500\n#3: 500"), + state: encoding.MercuryFlakyFailure, + }, + { + name: "failure - retryable and interval is 5s", + lookup: &StreamsLookup{ + 
StreamsLookupError: &encoding.StreamsLookupError{ + FeedParamKey: feedIdHex, + Feeds: []string{"0x4554482d5553442d415242495452554d2d544553544e45540000000000000000"}, + TimeParamKey: blockNumber, + Time: big.NewInt(25880526), + ExtraData: []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 100}, + }, + upkeepId: upkeepId, + }, + pluginRetries: 5, + mockHttpStatusCode: http.StatusInternalServerError, + mockChainlinkBlobs: []string{"0x00066dfcd1ed2d95b18c948dbc5bd64c687afe93e4ca7d663ddec14c20090ad80000000000000000000000000000000000000000000000000000000000081401000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000e000000000000000000000000000000000000000000000000000000000000002200000000000000000000000000000000000000000000000000000000000000280000100000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000001204554482d5553442d415242495452554d2d544553544e455400000000000000000000000000000000000000000000000000000000000000000000000064891c98000000000000000000000000000000000000000000000000000000289ad8d367000000000000000000000000000000000000000000000000000000289acf0b38000000000000000000000000000000000000000000000000000000289b3da40000000000000000000000000000000000000000000000000000000000018ae7ce74d9fa252a8983976eab600dc7590c778d04813430841bc6e765c34cd81a168d00000000000000000000000000000000000000000000000000000000018ae7cb0000000000000000000000000000000000000000000000000000000064891c98000000000000000000000000000000000000000000000000000000000000000260412b94e525ca6cedc9f544fd86f77606d52fe731a5d069dbe836a8bfc0fb8c911963b0ae7a14971f3b4621bffb802ef0605392b9a6c89c7fab1df8633a5ade00000000000000000000000000000000000000000000000000000000000000024500c2f521f83fba5efc2bf3effaaedde43d0a4adff785c1213b712a3aed0d8157642a84324db0cf9695ebd27708d4608eb0337e0dd87b0e43f0fa70c700d911"}, + expectedValues: [][]byte{nil}, + expectedRetryable: true, + 
expectedRetryInterval: 5 * time.Second, + expectedError: errors.New("failed to request feed for 0x4554482d5553442d415242495452554d2d544553544e45540000000000000000: All attempts fail:\n#1: 500\n#2: 500\n#3: 500"), + state: encoding.MercuryFlakyFailure, + }, + { + name: "failure - not retryable because there are many plugin retries already", + lookup: &StreamsLookup{ + StreamsLookupError: &encoding.StreamsLookupError{ + FeedParamKey: feedIdHex, + Feeds: []string{"0x4554482d5553442d415242495452554d2d544553544e45540000000000000000"}, + TimeParamKey: blockNumber, + Time: big.NewInt(25880526), + ExtraData: []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 100}, + }, + upkeepId: upkeepId, + }, + pluginRetries: 10, mockHttpStatusCode: http.StatusInternalServerError, mockChainlinkBlobs: []string{"0x00066dfcd1ed2d95b18c948dbc5bd64c687afe93e4ca7d663ddec14c20090ad80000000000000000000000000000000000000000000000000000000000081401000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000e000000000000000000000000000000000000000000000000000000000000002200000000000000000000000000000000000000000000000000000000000000280000100000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000001204554482d5553442d415242495452554d2d544553544e455400000000000000000000000000000000000000000000000000000000000000000000000064891c98000000000000000000000000000000000000000000000000000000289ad8d367000000000000000000000000000000000000000000000000000000289acf0b38000000000000000000000000000000000000000000000000000000289b3da40000000000000000000000000000000000000000000000000000000000018ae7ce74d9fa252a8983976eab600dc7590c778d04813430841bc6e765c34cd81a168d00000000000000000000000000000000000000000000000000000000018ae7cb0000000000000000000000000000000000000000000000000000000064891c98000000000000000000000000000000000000000000000000000000000000000260412b94e525ca6cedc9f544fd86f7760
6d52fe731a5d069dbe836a8bfc0fb8c911963b0ae7a14971f3b4621bffb802ef0605392b9a6c89c7fab1df8633a5ade00000000000000000000000000000000000000000000000000000000000000024500c2f521f83fba5efc2bf3effaaedde43d0a4adff785c1213b712a3aed0d8157642a84324db0cf9695ebd27708d4608eb0337e0dd87b0e43f0fa70c700d911"}, expectedValues: [][]byte{nil}, @@ -486,11 +534,11 @@ func TestEvmRegistry_DoMercuryRequestV02(t *testing.T) { }, upkeepId: upkeepId, }, - mockHttpStatusCode: http.StatusBadGateway, + mockHttpStatusCode: http.StatusTooManyRequests, mockChainlinkBlobs: []string{"0x00066dfcd1ed2d95b18c948dbc5bd64c687afe93e4ca7d663ddec14c20090ad80000000000000000000000000000000000000000000000000000000000081401000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000e000000000000000000000000000000000000000000000000000000000000002200000000000000000000000000000000000000000000000000000000000000280000100000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000001204554482d5553442d415242495452554d2d544553544e455400000000000000000000000000000000000000000000000000000000000000000000000064891c98000000000000000000000000000000000000000000000000000000289ad8d367000000000000000000000000000000000000000000000000000000289acf0b38000000000000000000000000000000000000000000000000000000289b3da40000000000000000000000000000000000000000000000000000000000018ae7ce74d9fa252a8983976eab600dc7590c778d04813430841bc6e765c34cd81a168d00000000000000000000000000000000000000000000000000000000018ae7cb0000000000000000000000000000000000000000000000000000000064891c98000000000000000000000000000000000000000000000000000000000000000260412b94e525ca6cedc9f544fd86f77606d52fe731a5d069dbe836a8bfc0fb8c911963b0ae7a14971f3b4621bffb802ef0605392b9a6c89c7fab1df8633a5ade00000000000000000000000000000000000000000000000000000000000000024500c2f521f83fba5efc2bf3effaaedde43d0a4adff785c1213b712a3aed0d8157642a84324db0cf9695ebd27708d4608eb033
7e0dd87b0e43f0fa70c700d911"}, expectedValues: [][]byte{nil}, expectedRetryable: false, - expectedError: errors.New("failed to request feed for 0x4554482d5553442d415242495452554d2d544553544e45540000000000000000: All attempts fail:\n#1: at block 25880526 upkeep 88786950015966611018675766524283132478093844178961698330929478019253453382042 received status code 502 for feed 0x4554482d5553442d415242495452554d2d544553544e45540000000000000000"), + expectedError: errors.New("failed to request feed for 0x4554482d5553442d415242495452554d2d544553544e45540000000000000000: All attempts fail:\n#1: at block 25880526 upkeep 88786950015966611018675766524283132478093844178961698330929478019253453382042 received status code 429 for feed 0x4554482d5553442d415242495452554d2d544553544e45540000000000000000"), state: encoding.InvalidMercuryRequest, }, { @@ -528,6 +576,9 @@ func TestEvmRegistry_DoMercuryRequestV02(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { r := setupEVMRegistry(t) + if tt.pluginRetries != 0 { + r.mercury.pluginRetryCache.Set(tt.pluginRetryKey, tt.pluginRetries, cache.DefaultExpiration) + } hc := mocks.NewHttpClient(t) for _, blob := range tt.mockChainlinkBlobs { @@ -539,7 +590,7 @@ func TestEvmRegistry_DoMercuryRequestV02(t *testing.T) { StatusCode: tt.mockHttpStatusCode, Body: io.NopCloser(bytes.NewReader(b)), } - if tt.expectedError != nil && tt.expectedRetryable { + if tt.expectedError != nil && tt.expectedRetryable || tt.pluginRetries > 0 { hc.On("Do", mock.Anything).Return(resp, nil).Times(totalAttempt) } else { hc.On("Do", mock.Anything).Return(resp, nil).Once() @@ -547,13 +598,18 @@ func TestEvmRegistry_DoMercuryRequestV02(t *testing.T) { } r.hc = hc - state, reason, values, retryable, reqErr := r.doMercuryRequest(context.Background(), tt.lookup, r.lggr) + state, reason, values, retryable, ri, reqErr := r.doMercuryRequest(context.Background(), tt.lookup, tt.pluginRetryKey, r.lggr) assert.Equal(t, tt.expectedValues, values) 
assert.Equal(t, tt.expectedRetryable, retryable) + if retryable { + newRetries, _ := r.mercury.pluginRetryCache.Get(tt.pluginRetryKey) + assert.Equal(t, tt.pluginRetries+1, newRetries.(int)) + } + assert.Equal(t, tt.expectedRetryInterval, ri) assert.Equal(t, tt.state, state) assert.Equal(t, tt.reason, reason) if tt.expectedError != nil { - assert.Equal(t, tt.expectedError.Error(), reqErr.Error()) + assert.True(t, strings.HasPrefix(reqErr.Error(), "failed to request feed for 0x4554482d5553442d415242495452554d2d544553544e45540000000000000000")) } }) } @@ -563,15 +619,17 @@ func TestEvmRegistry_DoMercuryRequestV03(t *testing.T) { upkeepId, _ := new(big.Int).SetString("88786950015966611018675766524283132478093844178961698330929478019253453382042", 10) tests := []struct { - name string - lookup *StreamsLookup - mockHttpStatusCode int - mockChainlinkBlobs []string - expectedValues [][]byte - expectedRetryable bool - expectedError error - state encoding.PipelineExecutionState - reason encoding.UpkeepFailureReason + name string + lookup *StreamsLookup + mockHttpStatusCode int + mockChainlinkBlobs []string + pluginRetryKey string + expectedValues [][]byte + expectedRetryable bool + expectedRetryInterval time.Duration + expectedError error + state encoding.PipelineExecutionState + reason encoding.UpkeepFailureReason }{ { name: "success v0.3", @@ -622,9 +680,10 @@ func TestEvmRegistry_DoMercuryRequestV03(t *testing.T) { } r.hc = hc - state, reason, values, retryable, reqErr := r.doMercuryRequest(context.Background(), tt.lookup, r.lggr) + state, reason, values, retryable, ri, reqErr := r.doMercuryRequest(context.Background(), tt.lookup, tt.pluginRetryKey, r.lggr) assert.Equal(t, tt.expectedValues, values) assert.Equal(t, tt.expectedRetryable, retryable) + assert.Equal(t, tt.expectedRetryInterval, ri) assert.Equal(t, tt.state, state) assert.Equal(t, tt.reason, reason) if tt.expectedError != nil { @@ -640,6 +699,7 @@ func TestEvmRegistry_SingleFeedRequest(t *testing.T) { name 
string index int lookup *StreamsLookup + pluginRetryKey string blob string statusCode int lastStatusCode int @@ -728,8 +788,8 @@ func TestEvmRegistry_SingleFeedRequest(t *testing.T) { blob: "0xab2123dc", retryNumber: 1, statusCode: http.StatusNotFound, - lastStatusCode: http.StatusBadGateway, - errorMessage: "failed to request feed for 0x4554482d5553442d415242495452554d2d544553544e45540000000000000000: All attempts fail:\n#1: 404\n#2: at block 123456 upkeep 123456789 received status code 502 for feed 0x4554482d5553442d415242495452554d2d544553544e45540000000000000000", + lastStatusCode: http.StatusTooManyRequests, + errorMessage: "failed to request feed for 0x4554482d5553442d415242495452554d2d544553544e45540000000000000000: All attempts fail:\n#1: 404\n#2: at block 123456 upkeep 123456789 received status code 429 for feed 0x4554482d5553442d415242495452554d2d544553544e45540000000000000000", }, { name: "failure - returns not retryable", @@ -744,8 +804,8 @@ func TestEvmRegistry_SingleFeedRequest(t *testing.T) { upkeepId: upkeepId, }, blob: "0xab2123dc", - statusCode: http.StatusBadGateway, - errorMessage: "failed to request feed for 0x4554482d5553442d415242495452554d2d544553544e45540000000000000000: All attempts fail:\n#1: at block 123456 upkeep 123456789 received status code 502 for feed 0x4554482d5553442d415242495452554d2d544553544e45540000000000000000", + statusCode: http.StatusConflict, + errorMessage: "failed to request feed for 0x4554482d5553442d415242495452554d2d544553544e45540000000000000000: All attempts fail:\n#1: at block 123456 upkeep 123456789 received status code 409 for feed 0x4554482d5553442d415242495452554d2d544553544e45540000000000000000", }, } @@ -819,6 +879,8 @@ func TestEvmRegistry_MultiFeedRequest(t *testing.T) { lookup *StreamsLookup statusCode int lastStatusCode int + pluginRetries int + pluginRetryKey string retryNumber int retryable bool errorMessage string @@ -883,6 +945,47 @@ func TestEvmRegistry_MultiFeedRequest(t *testing.T) { }, 
statusCode: http.StatusOK, }, + { + name: "success - retry 206", + lookup: &StreamsLookup{ + StreamsLookupError: &encoding.StreamsLookupError{ + FeedParamKey: feedIDs, + Feeds: []string{"0x4554482d5553442d415242495452554d2d544553544e45540000000000000000", "0x4254432d5553442d415242495452554d2d544553544e45540000000000000000"}, + TimeParamKey: timestamp, + Time: big.NewInt(123456), + }, + upkeepId: upkeepId, + }, + firstResponse: &MercuryV03Response{ + Reports: []MercuryV03Report{ + { + FeedID: "0x4554482d5553442d415242495452554d2d544553544e45540000000000000000", + ValidFromTimestamp: 123456, + ObservationsTimestamp: 123456, + FullReport: "0xab2123dc00000012", + }, + }, + }, + response: &MercuryV03Response{ + Reports: []MercuryV03Report{ + { + FeedID: "0x4554482d5553442d415242495452554d2d544553544e45540000000000000000", + ValidFromTimestamp: 123456, + ObservationsTimestamp: 123456, + FullReport: "0xab2123dc00000012", + }, + { + FeedID: "0x4254432d5553442d415242495452554d2d544553544e45540000000000000000", + ValidFromTimestamp: 123458, + ObservationsTimestamp: 123458, + FullReport: "0xab2123dc00000019", + }, + }, + }, + retryNumber: 1, + statusCode: http.StatusPartialContent, + lastStatusCode: http.StatusOK, + }, { name: "success - retry for 500", lookup: &StreamsLookup{ @@ -946,7 +1049,7 @@ func TestEvmRegistry_MultiFeedRequest(t *testing.T) { errorMessage: "All attempts fail:\n#1: hex string without 0x prefix", }, { - name: "failure - returns retryable", + name: "failure - returns retryable with 1s plugin retry interval", lookup: &StreamsLookup{ StreamsLookupError: &encoding.StreamsLookupError{ FeedParamKey: feedIDs, @@ -962,7 +1065,7 @@ func TestEvmRegistry_MultiFeedRequest(t *testing.T) { errorMessage: "All attempts fail:\n#1: 500\n#2: 500\n#3: 500", }, { - name: "failure - returns retryable and then non-retryable", + name: "failure - returns retryable with 5s plugin retry interval", lookup: &StreamsLookup{ StreamsLookupError: &encoding.StreamsLookupError{ 
FeedParamKey: feedIDs, @@ -972,27 +1075,30 @@ func TestEvmRegistry_MultiFeedRequest(t *testing.T) { }, upkeepId: upkeepId, }, - retryNumber: 1, - statusCode: http.StatusInternalServerError, - lastStatusCode: http.StatusUnauthorized, - errorMessage: "All attempts fail:\n#1: 500\n#2: at timestamp 123456 upkeep 123456789 received status code 401 from mercury v0.3, most likely this is caused by unauthorized upkeep", + pluginRetries: 6, + retryNumber: totalAttempt, + statusCode: http.StatusInternalServerError, + retryable: true, + errorMessage: "All attempts fail:\n#1: 500\n#2: 500\n#3: 500", }, { - name: "failure - returns status code 420 not retryable", + name: "failure - returns retryable and then non-retryable", lookup: &StreamsLookup{ StreamsLookupError: &encoding.StreamsLookupError{ FeedParamKey: feedIDs, - Feeds: []string{"0x4554482d5553442d415242495452554d2d544553544e45540000000000000000"}, + Feeds: []string{"0x4554482d5553442d415242495452554d2d544553544e45540000000000000000", "0x4254432d5553442d415242495452554d2d544553544e45540000000000000000"}, TimeParamKey: timestamp, Time: big.NewInt(123456), }, upkeepId: upkeepId, }, - statusCode: 420, - errorMessage: "All attempts fail:\n#1: at timestamp 123456 upkeep 123456789 received status code 420 from mercury v0.3, most likely this is caused by missing/malformed query args, missing or bad required headers, non-existent feeds, or no permissions for feeds", + retryNumber: 1, + statusCode: http.StatusInternalServerError, + lastStatusCode: http.StatusUnauthorized, + errorMessage: "All attempts fail:\n#1: 500\n#2: at timestamp 123456 upkeep 123456789 received status code 401 from mercury v0.3, most likely this is caused by unauthorized upkeep", }, { - name: "failure - returns status code 502 not retryable", + name: "failure - returns status code 422 not retryable", lookup: &StreamsLookup{ StreamsLookupError: &encoding.StreamsLookupError{ FeedParamKey: feedIDs, @@ -1002,8 +1108,8 @@ func TestEvmRegistry_MultiFeedRequest(t 
*testing.T) { }, upkeepId: upkeepId, }, - statusCode: http.StatusBadGateway, - errorMessage: "All attempts fail:\n#1: at timestamp 123456 upkeep 123456789 received status code 502 from mercury v0.3", + statusCode: http.StatusUnprocessableEntity, + errorMessage: "All attempts fail:\n#1: at timestamp 123456 upkeep 123456789 received status code 422 from mercury v0.3", }, { name: "success - retry when reports length does not match feeds length", @@ -1042,14 +1148,19 @@ func TestEvmRegistry_MultiFeedRequest(t *testing.T) { }, }, }, - retryNumber: 1, - statusCode: http.StatusOK, + retryNumber: 1, + statusCode: http.StatusOK, + lastStatusCode: http.StatusOK, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { r := setupEVMRegistry(t) + if tt.pluginRetries != 0 { + r.mercury.pluginRetryCache.Set(tt.pluginRetryKey, tt.pluginRetries, cache.DefaultExpiration) + } + hc := mocks.NewHttpClient(t) b, err := json.Marshal(tt.response) assert.Nil(t, err) @@ -1071,7 +1182,7 @@ func TestEvmRegistry_MultiFeedRequest(t *testing.T) { b1, err := json.Marshal(tt.response) assert.Nil(t, err) resp1 := &http.Response{ - StatusCode: tt.statusCode, + StatusCode: tt.lastStatusCode, Body: io.NopCloser(bytes.NewReader(b1)), } hc.On("Do", mock.Anything).Return(resp0, nil).Once().On("Do", mock.Anything).Return(resp1, nil).Once() diff --git a/go.mod b/go.mod index df970160acc..46664b8c838 100644 --- a/go.mod +++ b/go.mod @@ -72,7 +72,7 @@ require ( github.com/smartcontractkit/chainlink-solana v1.0.3-0.20231023133638-72f4e799ab05 github.com/smartcontractkit/chainlink-starknet/relayer v0.0.1-beta-test.0.20231024133459-1ef3a11319eb github.com/smartcontractkit/libocr v0.0.0-20231020123319-d255366a6545 - github.com/smartcontractkit/ocr2keepers v0.7.27 + github.com/smartcontractkit/ocr2keepers v0.7.28 github.com/smartcontractkit/ocr2vrf v0.0.0-20230804151440-2f1eb1e20687 github.com/smartcontractkit/sqlx v1.3.5-0.20210805004948-4be295aacbeb 
github.com/smartcontractkit/tdh2/go/ocr2/decryptionplugin v0.0.0-20230906073235-9e478e5e19f1 diff --git a/go.sum b/go.sum index 59286787c2e..997bc25fc38 100644 --- a/go.sum +++ b/go.sum @@ -1471,8 +1471,8 @@ github.com/smartcontractkit/grpc-proxy v0.0.0-20230731113816-f1be6620749f h1:hgJ github.com/smartcontractkit/grpc-proxy v0.0.0-20230731113816-f1be6620749f/go.mod h1:MvMXoufZAtqExNexqi4cjrNYE9MefKddKylxjS+//n0= github.com/smartcontractkit/libocr v0.0.0-20231020123319-d255366a6545 h1:qOsw2ETQD/Sb/W2xuYn2KPWjvvsWA0C+l19rWFq8iNg= github.com/smartcontractkit/libocr v0.0.0-20231020123319-d255366a6545/go.mod h1:2lyRkw/qLQgUWlrWWmq5nj0y90rWeO6Y+v+fCakRgb0= -github.com/smartcontractkit/ocr2keepers v0.7.27 h1:kwqMrzmEdq6gH4yqNuLQCbdlED0KaIjwZzu3FF+Gves= -github.com/smartcontractkit/ocr2keepers v0.7.27/go.mod h1:1QGzJURnoWpysguPowOe2bshV0hNp1YX10HHlhDEsas= +github.com/smartcontractkit/ocr2keepers v0.7.28 h1:dufAiYl4+uly9aH0+6GkS2jYzHGujq7tg0LYQE+x6JU= +github.com/smartcontractkit/ocr2keepers v0.7.28/go.mod h1:1QGzJURnoWpysguPowOe2bshV0hNp1YX10HHlhDEsas= github.com/smartcontractkit/ocr2vrf v0.0.0-20230804151440-2f1eb1e20687 h1:NwC3SOc25noBTe1KUQjt45fyTIuInhoE2UfgcHAdihM= github.com/smartcontractkit/ocr2vrf v0.0.0-20230804151440-2f1eb1e20687/go.mod h1:YYZq52t4wcHoMQeITksYsorD+tZcOyuVU5+lvot3VFM= github.com/smartcontractkit/sqlx v1.3.5-0.20210805004948-4be295aacbeb h1:OMaBUb4X9IFPLbGbCHsMU+kw/BPCrewaVwWGIBc0I4A= diff --git a/integration-tests/go.mod b/integration-tests/go.mod index 7dd2d017785..97e9b7e79db 100644 --- a/integration-tests/go.mod +++ b/integration-tests/go.mod @@ -24,7 +24,7 @@ require ( github.com/smartcontractkit/chainlink-testing-framework v1.17.12-0.20231018101901-23824db88d36 github.com/smartcontractkit/chainlink/v2 v2.0.0-00010101000000-000000000000 github.com/smartcontractkit/libocr v0.0.0-20231020123319-d255366a6545 - github.com/smartcontractkit/ocr2keepers v0.7.27 + github.com/smartcontractkit/ocr2keepers v0.7.28 github.com/smartcontractkit/ocr2vrf 
v0.0.0-20230804151440-2f1eb1e20687 github.com/smartcontractkit/tdh2/go/tdh2 v0.0.0-20230906073235-9e478e5e19f1 github.com/smartcontractkit/wasp v0.3.0 diff --git a/integration-tests/go.sum b/integration-tests/go.sum index 3be74077277..7f2c7cc09c7 100644 --- a/integration-tests/go.sum +++ b/integration-tests/go.sum @@ -2378,8 +2378,8 @@ github.com/smartcontractkit/grpc-proxy v0.0.0-20230731113816-f1be6620749f h1:hgJ github.com/smartcontractkit/grpc-proxy v0.0.0-20230731113816-f1be6620749f/go.mod h1:MvMXoufZAtqExNexqi4cjrNYE9MefKddKylxjS+//n0= github.com/smartcontractkit/libocr v0.0.0-20231020123319-d255366a6545 h1:qOsw2ETQD/Sb/W2xuYn2KPWjvvsWA0C+l19rWFq8iNg= github.com/smartcontractkit/libocr v0.0.0-20231020123319-d255366a6545/go.mod h1:2lyRkw/qLQgUWlrWWmq5nj0y90rWeO6Y+v+fCakRgb0= -github.com/smartcontractkit/ocr2keepers v0.7.27 h1:kwqMrzmEdq6gH4yqNuLQCbdlED0KaIjwZzu3FF+Gves= -github.com/smartcontractkit/ocr2keepers v0.7.27/go.mod h1:1QGzJURnoWpysguPowOe2bshV0hNp1YX10HHlhDEsas= +github.com/smartcontractkit/ocr2keepers v0.7.28 h1:dufAiYl4+uly9aH0+6GkS2jYzHGujq7tg0LYQE+x6JU= +github.com/smartcontractkit/ocr2keepers v0.7.28/go.mod h1:1QGzJURnoWpysguPowOe2bshV0hNp1YX10HHlhDEsas= github.com/smartcontractkit/ocr2vrf v0.0.0-20230804151440-2f1eb1e20687 h1:NwC3SOc25noBTe1KUQjt45fyTIuInhoE2UfgcHAdihM= github.com/smartcontractkit/ocr2vrf v0.0.0-20230804151440-2f1eb1e20687/go.mod h1:YYZq52t4wcHoMQeITksYsorD+tZcOyuVU5+lvot3VFM= github.com/smartcontractkit/sqlx v1.3.5-0.20210805004948-4be295aacbeb h1:OMaBUb4X9IFPLbGbCHsMU+kw/BPCrewaVwWGIBc0I4A=