From 72c60b65f95a3b2bd9f5044a30595d10dc78f581 Mon Sep 17 00:00:00 2001 From: Sam Date: Thu, 16 May 2024 20:24:34 -0400 Subject: [PATCH 1/3] Fix panic on mercury server error (#13231) * Fix changelog --- .changeset/eighty-hotels-sit.md | 5 + CHANGELOG.md | 6 +- core/services/relay/evm/mercury/queue.go | 43 ++-- .../services/relay/evm/mercury/transmitter.go | 39 ++-- .../relay/evm/mercury/transmitter_test.go | 189 ++++++++++++++++-- 5 files changed, 234 insertions(+), 48 deletions(-) create mode 100644 .changeset/eighty-hotels-sit.md diff --git a/.changeset/eighty-hotels-sit.md b/.changeset/eighty-hotels-sit.md new file mode 100644 index 00000000000..e83b70c7695 --- /dev/null +++ b/.changeset/eighty-hotels-sit.md @@ -0,0 +1,5 @@ +--- +"chainlink": patch +--- + +Fix panic if mercury server returns error #bugfix diff --git a/CHANGELOG.md b/CHANGELOG.md index b41da1f30ef..b16c0542d53 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -191,7 +191,7 @@ You may disable if this results in excessive log volume. Disable like so: ``` - [Pipeline] + [JobPipeline] VerboseLogging = false ``` @@ -221,7 +221,7 @@ - [#12404](https://github.com/smartcontractkit/chainlink/pull/12404) [`b74079b672`](https://github.com/smartcontractkit/chainlink/commit/b74079b672f36fb0c241f90ea1e875ea3a9524da) Thanks [@HenryNguyen5](https://github.com/HenryNguyen5)! - Add OCR3 capability contract wrapper -- [#12498](https://github.com/smartcontractkit/chainlink/pull/12498) [`1c576d0e34`](https://github.com/smartcontractkit/chainlink/commit/1c576d0e34d93a6298ddcb662ee89fd04eeda53e) Thanks [@samsondav](https://github.com/samsondav)! - Add new config option Pipeline.VerboseLogging +- [#12498](https://github.com/smartcontractkit/chainlink/pull/12498) [`1c576d0e34`](https://github.com/smartcontractkit/chainlink/commit/1c576d0e34d93a6298ddcb662ee89fd04eeda53e) Thanks [@samsondav](https://github.com/samsondav)! - Add new config option JobPipeline.VerboseLogging VerboseLogging enables detailed logging of pipeline execution steps. This is disabled by default because it increases log volume for pipeline runs, but can @@ -232,7 +232,7 @@ Set it like the following example: ``` - [Pipeline] + [JobPipeline] VerboseLogging = true ``` diff --git a/core/services/relay/evm/mercury/queue.go b/core/services/relay/evm/mercury/queue.go index 8a89f47302b..30a6e5e6eac 100644 --- a/core/services/relay/evm/mercury/queue.go +++ b/core/services/relay/evm/mercury/queue.go @@ -25,7 +25,7 @@ type asyncDeleter interface { AsyncDelete(req *pb.TransmitRequest) } -var _ services.Service = (*TransmitQueue)(nil) +var _ services.Service = (*transmitQueue)(nil) var transmitQueueLoad = promauto.NewGaugeVec(prometheus.GaugeOpts{ Name: "mercury_transmit_queue_load", @@ -40,7 +40,7 @@ const promInterval = 6500 * time.Millisecond // TransmitQueue is the high-level package that everything outside of this file should be using // It stores pending transmissions, yielding the latest (highest priority) first to the caller -type TransmitQueue struct { +type transmitQueue struct { services.StateMachine cond sync.Cond @@ -62,11 +62,20 @@ type Transmission struct { ReportCtx ocrtypes.ReportContext // contains priority information (latest epoch/round wins) } +type TransmitQueue interface { + services.Service + + BlockingPop() (t *Transmission) + Push(req *pb.TransmitRequest, reportCtx ocrtypes.ReportContext) (ok bool) + Init(transmissions []*Transmission) + IsEmpty() bool +} + // maxlen controls how many items will be stored in the queue // 0 means unlimited - be careful, this can cause memory leaks -func NewTransmitQueue(lggr logger.Logger, serverURL, feedID string, maxlen int, asyncDeleter asyncDeleter) *TransmitQueue { +func NewTransmitQueue(lggr logger.Logger, serverURL, feedID string, maxlen int, asyncDeleter asyncDeleter) TransmitQueue { mu := new(sync.RWMutex) - return &TransmitQueue{ + return &transmitQueue{ services.StateMachine{}, sync.Cond{L: mu}, lggr.Named("TransmitQueue"), @@ -80,13 +89,13 @@ func NewTransmitQueue(lggr logger.Logger, serverURL, feedID string, maxlen int, } } -func (tq *TransmitQueue) Init(transmissions []*Transmission) { +func (tq *transmitQueue) Init(transmissions []*Transmission) { pq := priorityQueue(transmissions) heap.Init(&pq) // ensure the heap is ordered tq.pq = &pq } -func (tq *TransmitQueue) Push(req *pb.TransmitRequest, reportCtx ocrtypes.ReportContext) (ok bool) { +func (tq *transmitQueue) Push(req *pb.TransmitRequest, reportCtx ocrtypes.ReportContext) (ok bool) { tq.cond.L.Lock() defer tq.cond.L.Unlock() @@ -111,7 +120,7 @@ func (tq *TransmitQueue) Push(req *pb.TransmitRequest, reportCtx ocrtypes.Report // BlockingPop will block until at least one item is in the heap, and then return it // If the queue is closed, it will immediately return nil -func (tq *TransmitQueue) BlockingPop() (t *Transmission) { +func (tq *transmitQueue) BlockingPop() (t *Transmission) { tq.cond.L.Lock() defer tq.cond.L.Unlock() if tq.closed { @@ -126,13 +135,13 @@ func (tq *TransmitQueue) BlockingPop() (t *Transmission) { return t } -func (tq *TransmitQueue) IsEmpty() bool { +func (tq *transmitQueue) IsEmpty() bool { tq.mu.RLock() defer tq.mu.RUnlock() return tq.pq.Len() == 0 } -func (tq *TransmitQueue) Start(context.Context) error { +func (tq *transmitQueue) Start(context.Context) error { return tq.StartOnce("TransmitQueue", func() error { t := time.NewTicker(utils.WithJitter(promInterval)) wg := new(sync.WaitGroup) @@ -148,7 +157,7 @@ func (tq *TransmitQueue) Start(context.Context) error { }) } -func (tq *TransmitQueue) Close() error { +func (tq *transmitQueue) Close() error { return tq.StopOnce("TransmitQueue", func() error { tq.cond.L.Lock() tq.closed = true @@ -159,7 +168,7 @@ func (tq *TransmitQueue) Close() error { }) } -func (tq *TransmitQueue) monitorLoop(c <-chan time.Time, chStop <-chan struct{}, wg *sync.WaitGroup) { +func (tq *transmitQueue) monitorLoop(c <-chan time.Time, chStop <-chan struct{}, wg *sync.WaitGroup) { defer wg.Done() for { @@ -172,25 +181,25 @@ func (tq *TransmitQueue) monitorLoop(c <-chan time.Time, chStop <-chan struct{}, } } -func (tq *TransmitQueue) report() { +func (tq *transmitQueue) report() { tq.mu.RLock() length := tq.pq.Len() tq.mu.RUnlock() tq.transmitQueueLoad.Set(float64(length)) } -func (tq *TransmitQueue) Ready() error { +func (tq *transmitQueue) Ready() error { return nil } -func (tq *TransmitQueue) Name() string { return tq.lggr.Name() } -func (tq *TransmitQueue) HealthReport() map[string]error { +func (tq *transmitQueue) Name() string { return tq.lggr.Name() } +func (tq *transmitQueue) HealthReport() map[string]error { report := map[string]error{tq.Name(): errors.Join( tq.status(), )} return report } -func (tq *TransmitQueue) status() (merr error) { +func (tq *transmitQueue) status() (merr error) { tq.mu.RLock() length := tq.pq.Len() closed := tq.closed @@ -206,7 +215,7 @@ func (tq *TransmitQueue) status() (merr error) { // pop latest Transmission from the heap // Not thread-safe -func (tq *TransmitQueue) pop() *Transmission { +func (tq *transmitQueue) pop() *Transmission { if tq.pq.Len() == 0 { return nil } diff --git a/core/services/relay/evm/mercury/transmitter.go b/core/services/relay/evm/mercury/transmitter.go index 82a76450e5f..d8ef981ff0c 100644 --- a/core/services/relay/evm/mercury/transmitter.go +++ b/core/services/relay/evm/mercury/transmitter.go @@ -147,10 +147,12 @@ type server struct { c wsrpc.Client pm *PersistenceManager - q *TransmitQueue + q TransmitQueue deleteQueue chan *pb.TransmitRequest + url string + transmitSuccessCount prometheus.Counter transmitDuplicateCount prometheus.Counter transmitConnectionErrorCount prometheus.Counter @@ -262,7 +264,7 @@ func (s *server) runQueueLoop(stopCh services.StopChan, wg *sync.WaitGroup, feed s.transmitDuplicateCount.Inc() s.lggr.Debugw("Transmit report success; duplicate report", "payload", hexutil.Encode(t.Req.Payload), "response", res, "reportCtx", t.ReportCtx) default: - transmitServerErrorCount.WithLabelValues(feedIDHex, fmt.Sprintf("%d", res.Code)).Inc() + transmitServerErrorCount.WithLabelValues(feedIDHex, s.url, fmt.Sprintf("%d", res.Code)).Inc() s.lggr.Errorw("Transmit report failed; mercury server returned error", "response", res, "reportCtx", t.ReportCtx, "err", res.Error, "code", res.Code) } } @@ -275,26 +277,31 @@ func (s *server) runQueueLoop(stopCh services.StopChan, wg *sync.WaitGroup, feed } } +func newServer(lggr logger.Logger, cfg TransmitterConfig, client wsrpc.Client, pm *PersistenceManager, serverURL, feedIDHex string) *server { + return &server{ + lggr, + cfg.TransmitTimeout().Duration(), + client, + pm, + NewTransmitQueue(lggr, serverURL, feedIDHex, int(cfg.TransmitQueueMaxSize()), pm), + make(chan *pb.TransmitRequest, int(cfg.TransmitQueueMaxSize())), + serverURL, + transmitSuccessCount.WithLabelValues(feedIDHex, serverURL), + transmitDuplicateCount.WithLabelValues(feedIDHex, serverURL), + transmitConnectionErrorCount.WithLabelValues(feedIDHex, serverURL), + transmitQueueDeleteErrorCount.WithLabelValues(feedIDHex, serverURL), + transmitQueueInsertErrorCount.WithLabelValues(feedIDHex, serverURL), + transmitQueuePushErrorCount.WithLabelValues(feedIDHex, serverURL), + } +} + func NewTransmitter(lggr logger.Logger, cfg TransmitterConfig, clients map[string]wsrpc.Client, fromAccount ed25519.PublicKey, jobID int32, feedID [32]byte, orm ORM, codec TransmitterReportDecoder) *mercuryTransmitter { feedIDHex := fmt.Sprintf("0x%x", feedID[:]) servers := make(map[string]*server, len(clients)) for serverURL, client := range clients { cLggr := lggr.Named(serverURL).With("serverURL", serverURL) pm := NewPersistenceManager(cLggr, serverURL, orm, jobID, int(cfg.TransmitQueueMaxSize()), flushDeletesFrequency, pruneFrequency) - servers[serverURL] = &server{ - cLggr, - cfg.TransmitTimeout().Duration(), - client, - pm, - NewTransmitQueue(cLggr, serverURL, feedIDHex, int(cfg.TransmitQueueMaxSize()), pm), - make(chan *pb.TransmitRequest, int(cfg.TransmitQueueMaxSize())), - transmitSuccessCount.WithLabelValues(feedIDHex, serverURL), - transmitDuplicateCount.WithLabelValues(feedIDHex, serverURL), - transmitConnectionErrorCount.WithLabelValues(feedIDHex, serverURL), - transmitQueueDeleteErrorCount.WithLabelValues(feedIDHex, serverURL), - transmitQueueInsertErrorCount.WithLabelValues(feedIDHex, serverURL), - transmitQueuePushErrorCount.WithLabelValues(feedIDHex, serverURL), - } + servers[serverURL] = newServer(cLggr, cfg, client, pm, serverURL, feedIDHex) } return &mercuryTransmitter{ services.StateMachine{}, diff --git a/core/services/relay/evm/mercury/transmitter_test.go b/core/services/relay/evm/mercury/transmitter_test.go index b0da9bea635..8224adc0d7c 100644 --- a/core/services/relay/evm/mercury/transmitter_test.go +++ b/core/services/relay/evm/mercury/transmitter_test.go @@ -3,6 +3,7 @@ package mercury import ( "context" "math/big" + "sync" "testing" "time" @@ -14,6 +15,7 @@ import ( ocrtypes "github.com/smartcontractkit/libocr/offchainreporting2plus/types" commonconfig "github.com/smartcontractkit/chainlink-common/pkg/config" + "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest" "github.com/smartcontractkit/chainlink/v2/core/logger" @@ -55,8 +57,8 @@ func Test_MercuryTransmitter_Transmit(t *testing.T) { require.NoError(t, err) // ensure it was added to the queue - require.Equal(t, mt.servers[sURL].q.pq.Len(), 1) - assert.Subset(t, mt.servers[sURL].q.pq.Pop().(*Transmission).Req.Payload, report) + require.Equal(t, mt.servers[sURL].q.(*transmitQueue).pq.Len(), 1) + assert.Subset(t, mt.servers[sURL].q.(*transmitQueue).pq.Pop().(*Transmission).Req.Payload, report) }) t.Run("v2 report transmission successfully enqueued", func(t *testing.T) { report := sampleV2Report @@ -69,8 +71,8 @@ func Test_MercuryTransmitter_Transmit(t *testing.T) { require.NoError(t, err) // ensure it was added to the queue - require.Equal(t, mt.servers[sURL].q.pq.Len(), 1) - assert.Subset(t, mt.servers[sURL].q.pq.Pop().(*Transmission).Req.Payload, report) + require.Equal(t, mt.servers[sURL].q.(*transmitQueue).pq.Len(), 1) + assert.Subset(t, mt.servers[sURL].q.(*transmitQueue).pq.Pop().(*Transmission).Req.Payload, report) }) t.Run("v3 report transmission successfully enqueued", func(t *testing.T) { report := sampleV3Report @@ -83,8 +85,8 @@ func Test_MercuryTransmitter_Transmit(t *testing.T) { require.NoError(t, err) // ensure it was added to the queue - require.Equal(t, mt.servers[sURL].q.pq.Len(), 1) - assert.Subset(t, mt.servers[sURL].q.pq.Pop().(*Transmission).Req.Payload, report) + require.Equal(t, mt.servers[sURL].q.(*transmitQueue).pq.Len(), 1) + assert.Subset(t, mt.servers[sURL].q.(*transmitQueue).pq.Pop().(*Transmission).Req.Payload, report) }) }) @@ -105,12 +107,12 @@ func Test_MercuryTransmitter_Transmit(t *testing.T) { require.NoError(t, err) // ensure it was added to the queue - require.Equal(t, mt.servers[sURL].q.pq.Len(), 1) - assert.Subset(t, mt.servers[sURL].q.pq.Pop().(*Transmission).Req.Payload, report) - require.Equal(t, mt.servers[sURL2].q.pq.Len(), 1) - assert.Subset(t, mt.servers[sURL2].q.pq.Pop().(*Transmission).Req.Payload, report) - require.Equal(t, mt.servers[sURL3].q.pq.Len(), 1) - assert.Subset(t, mt.servers[sURL3].q.pq.Pop().(*Transmission).Req.Payload, report) + require.Equal(t, mt.servers[sURL].q.(*transmitQueue).pq.Len(), 1) + assert.Subset(t, mt.servers[sURL].q.(*transmitQueue).pq.Pop().(*Transmission).Req.Payload, report) + require.Equal(t, mt.servers[sURL2].q.(*transmitQueue).pq.Len(), 1) + assert.Subset(t, mt.servers[sURL2].q.(*transmitQueue).pq.Pop().(*Transmission).Req.Payload, report) + require.Equal(t, mt.servers[sURL3].q.(*transmitQueue).pq.Len(), 1) + assert.Subset(t, mt.servers[sURL3].q.(*transmitQueue).pq.Pop().(*Transmission).Req.Payload, report) }) } @@ -395,3 +397,166 @@ func Test_sortReportsLatestFirst(t *testing.T) { assert.Nil(t, reports[6]) assert.Nil(t, reports[7]) } + +type mockQ struct { + ch chan *Transmission +} + +func newMockQ() *mockQ { + return &mockQ{make(chan *Transmission, 100)} +} + +func (m *mockQ) Start(context.Context) error { return nil } +func (m *mockQ) Close() error { + m.ch <- nil + return nil +} +func (m *mockQ) Ready() error { return nil } +func (m *mockQ) HealthReport() map[string]error { return nil } +func (m *mockQ) Name() string { return "" } +func (m *mockQ) BlockingPop() (t *Transmission) { + val := <-m.ch + return val +} +func (m *mockQ) Push(req *pb.TransmitRequest, reportCtx ocrtypes.ReportContext) (ok bool) { + m.ch <- &Transmission{Req: req, ReportCtx: reportCtx} + return true +} +func (m *mockQ) Init(transmissions []*Transmission) {} +func (m *mockQ) IsEmpty() bool { return false } + +func Test_MercuryTransmitter_runQueueLoop(t *testing.T) { + feedIDHex := utils.NewHash().Hex() + lggr := logger.TestLogger(t) + c := &mocks.MockWSRPCClient{} + db := pgtest.NewSqlxDB(t) + orm := NewORM(db) + pm := NewPersistenceManager(lggr, sURL, orm, 0, 0, 0, 0) + cfg := mockCfg{} + + s := newServer(lggr, cfg, c, pm, sURL, feedIDHex) + + req := &pb.TransmitRequest{ + Payload: []byte{1, 2, 3}, + ReportFormat: 32, + } + + t.Run("pulls from queue and transmits successfully", func(t *testing.T) { + transmit := make(chan *pb.TransmitRequest, 1) + c.TransmitF = func(ctx context.Context, in *pb.TransmitRequest) (*pb.TransmitResponse, error) { + transmit <- in + return &pb.TransmitResponse{Code: 0, Error: ""}, nil + } + q := newMockQ() + s.q = q + wg := &sync.WaitGroup{} + wg.Add(1) + + go s.runQueueLoop(nil, wg, feedIDHex) + + q.Push(req, sampleReportContext) + + select { + case tr := <-transmit: + assert.Equal(t, []byte{1, 2, 3}, tr.Payload) + assert.Equal(t, 32, int(tr.ReportFormat)) + // case <-time.After(testutils.WaitTimeout(t)): + case <-time.After(1 * time.Second): + t.Fatal("expected a transmit request to be sent") + } + + q.Close() + wg.Wait() + }) + + t.Run("on duplicate, success", func(t *testing.T) { + transmit := make(chan *pb.TransmitRequest, 1) + c.TransmitF = func(ctx context.Context, in *pb.TransmitRequest) (*pb.TransmitResponse, error) { + transmit <- in + return &pb.TransmitResponse{Code: DuplicateReport, Error: ""}, nil + } + q := newMockQ() + s.q = q + wg := &sync.WaitGroup{} + wg.Add(1) + + go s.runQueueLoop(nil, wg, feedIDHex) + + q.Push(req, sampleReportContext) + + select { + case tr := <-transmit: + assert.Equal(t, []byte{1, 2, 3}, tr.Payload) + assert.Equal(t, 32, int(tr.ReportFormat)) + // case <-time.After(testutils.WaitTimeout(t)): + case <-time.After(1 * time.Second): + t.Fatal("expected a transmit request to be sent") + } + + q.Close() + wg.Wait() + }) + t.Run("on server-side error, does not retry", func(t *testing.T) { + transmit := make(chan *pb.TransmitRequest, 1) + c.TransmitF = func(ctx context.Context, in *pb.TransmitRequest) (*pb.TransmitResponse, error) { + transmit <- in + return &pb.TransmitResponse{Code: DuplicateReport, Error: ""}, nil + } + q := newMockQ() + s.q = q + wg := &sync.WaitGroup{} + wg.Add(1) + + go s.runQueueLoop(nil, wg, feedIDHex) + + q.Push(req, sampleReportContext) + + select { + case tr := <-transmit: + assert.Equal(t, []byte{1, 2, 3}, tr.Payload) + assert.Equal(t, 32, int(tr.ReportFormat)) + // case <-time.After(testutils.WaitTimeout(t)): + case <-time.After(1 * time.Second): + t.Fatal("expected a transmit request to be sent") + } + + q.Close() + wg.Wait() + }) + t.Run("on transmit error, retries", func(t *testing.T) { + transmit := make(chan *pb.TransmitRequest, 1) + c.TransmitF = func(ctx context.Context, in *pb.TransmitRequest) (*pb.TransmitResponse, error) { + transmit <- in + return &pb.TransmitResponse{}, errors.New("transmission error") + } + q := newMockQ() + s.q = q + wg := &sync.WaitGroup{} + wg.Add(1) + stopCh := make(chan struct{}, 1) + + go s.runQueueLoop(stopCh, wg, feedIDHex) + + q.Push(req, sampleReportContext) + + cnt := 0 + Loop: + for { + select { + case tr := <-transmit: + assert.Equal(t, []byte{1, 2, 3}, tr.Payload) + assert.Equal(t, 32, int(tr.ReportFormat)) + if cnt > 2 { + break Loop + } + cnt++ + // case <-time.After(testutils.WaitTimeout(t)): + case <-time.After(1 * time.Second): + t.Fatal("expected 3 transmit requests to be sent") + } + } + + close(stopCh) + wg.Wait() + }) +} From cefb5bd975c9294e7ee8e0d912c1b1a437735459 Mon Sep 17 00:00:00 2001 From: Sam Davies Date: Mon, 13 May 2024 14:04:48 -0400 Subject: [PATCH 2/3] Reduce number of inserts for multi-server --- .changeset/twelve-wolves-clean.md | 5 ++ core/services/relay/evm/mercury/orm.go | 36 ++++++-- core/services/relay/evm/mercury/orm_test.go | 87 ++++++++++++++----- .../relay/evm/mercury/persistence_manager.go | 2 +- .../services/relay/evm/mercury/transmitter.go | 11 ++- 5 files changed, 109 insertions(+), 32 deletions(-) create mode 100644 .changeset/twelve-wolves-clean.md diff --git a/.changeset/twelve-wolves-clean.md b/.changeset/twelve-wolves-clean.md new file mode 100644 index 00000000000..c38fbe3fd83 --- /dev/null +++ b/.changeset/twelve-wolves-clean.md @@ -0,0 +1,5 @@ +--- +"chainlink": patch +--- + +Performance improvements for mercury single insert for multiple mercury servers #internal diff --git a/core/services/relay/evm/mercury/orm.go b/core/services/relay/evm/mercury/orm.go index 6426ef54a5d..65df9ab4cc6 100644 --- a/core/services/relay/evm/mercury/orm.go +++ b/core/services/relay/evm/mercury/orm.go @@ -5,6 +5,8 @@ import ( "crypto/sha256" "database/sql" "errors" + "fmt" + "strings" "sync" "github.com/ethereum/go-ethereum/common" @@ -19,7 +21,7 @@ import ( ) type ORM interface { - InsertTransmitRequest(ctx context.Context, serverURL string, req *pb.TransmitRequest, jobID int32, reportCtx ocrtypes.ReportContext) error + InsertTransmitRequest(ctx context.Context, serverURLs []string, req *pb.TransmitRequest, jobID int32, reportCtx ocrtypes.ReportContext) error DeleteTransmitRequests(ctx context.Context, serverURL string, reqs []*pb.TransmitRequest) error GetTransmitRequests(ctx context.Context, serverURL string, jobID int32) ([]*Transmission, error) PruneTransmitRequests(ctx context.Context, serverURL string, jobID int32, maxSize int) error @@ -42,11 +44,14 @@ func NewORM(ds sqlutil.DataSource) ORM { } // InsertTransmitRequest inserts one transmit request if the payload does not exist already. -func (o *orm) InsertTransmitRequest(ctx context.Context, serverURL string, req *pb.TransmitRequest, jobID int32, reportCtx ocrtypes.ReportContext) error { +func (o *orm) InsertTransmitRequest(ctx context.Context, serverURLs []string, req *pb.TransmitRequest, jobID int32, reportCtx ocrtypes.ReportContext) error { feedID, err := FeedIDFromReport(req.Payload) if err != nil { return err } + if len(serverURLs) == 0 { + return errors.New("no server URLs provided") + } var wg sync.WaitGroup wg.Add(2) @@ -54,11 +59,30 @@ func (o *orm) InsertTransmitRequest(ctx context.Context, serverURL string, req * go func() { defer wg.Done() - _, err1 = o.ds.ExecContext(ctx, ` - INSERT INTO mercury_transmit_requests (server_url, payload, payload_hash, config_digest, epoch, round, extra_hash, job_id, feed_id) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) + + values := make([]string, len(serverURLs)) + args := []interface{}{ + req.Payload, + hashPayload(req.Payload), + reportCtx.ConfigDigest[:], + reportCtx.Epoch, + reportCtx.Round, + reportCtx.ExtraHash[:], + jobID, + feedID[:], + } + for i, serverURL := range serverURLs { + // server url is the only thing that changes, might as well re-use + // the same parameters for each insert + values[i] = fmt.Sprintf("($1, $2, $3, $4, $5, $6, $7, $8, $%d)", i+9) + args = append(args, serverURL) + } + + _, err1 = o.ds.ExecContext(ctx, fmt.Sprintf(` + INSERT INTO mercury_transmit_requests (payload, payload_hash, config_digest, epoch, round, extra_hash, job_id, feed_id, server_url) + VALUES %s ON CONFLICT (server_url, payload_hash) DO NOTHING - `, serverURL, req.Payload, hashPayload(req.Payload), reportCtx.ConfigDigest[:], reportCtx.Epoch, reportCtx.Round, reportCtx.ExtraHash[:], jobID, feedID[:]) + `, strings.Join(values, ",")), args...) }() go func() { diff --git a/core/services/relay/evm/mercury/orm_test.go b/core/services/relay/evm/mercury/orm_test.go index 2b2e15ffd53..bffc35522f3 100644 --- a/core/services/relay/evm/mercury/orm_test.go +++ b/core/services/relay/evm/mercury/orm_test.go @@ -48,15 +48,15 @@ func TestORM(t *testing.T) { // Test insert and get requests. // s1 - err = orm.InsertTransmitRequest(ctx, sURL, &pb.TransmitRequest{Payload: reports[0]}, jobID, reportContexts[0]) + err = orm.InsertTransmitRequest(ctx, []string{sURL}, &pb.TransmitRequest{Payload: reports[0]}, jobID, reportContexts[0]) require.NoError(t, err) - err = orm.InsertTransmitRequest(ctx, sURL, &pb.TransmitRequest{Payload: reports[1]}, jobID, reportContexts[1]) + err = orm.InsertTransmitRequest(ctx, []string{sURL}, &pb.TransmitRequest{Payload: reports[1]}, jobID, reportContexts[1]) require.NoError(t, err) - err = orm.InsertTransmitRequest(ctx, sURL, &pb.TransmitRequest{Payload: reports[2]}, jobID, reportContexts[2]) + err = orm.InsertTransmitRequest(ctx, []string{sURL}, &pb.TransmitRequest{Payload: reports[2]}, jobID, reportContexts[2]) require.NoError(t, err) // s2 - err = orm.InsertTransmitRequest(ctx, sURL2, &pb.TransmitRequest{Payload: reports[3]}, jobID, reportContexts[0]) + err = orm.InsertTransmitRequest(ctx, []string{sURL2}, &pb.TransmitRequest{Payload: reports[3]}, jobID, reportContexts[0]) require.NoError(t, err) transmissions, err := orm.GetTransmitRequests(ctx, sURL, jobID) @@ -119,7 +119,7 @@ func TestORM(t *testing.T) { require.Empty(t, transmissions) // More inserts. - err = orm.InsertTransmitRequest(ctx, sURL, &pb.TransmitRequest{Payload: reports[3]}, jobID, reportContexts[3]) + err = orm.InsertTransmitRequest(ctx, []string{sURL}, &pb.TransmitRequest{Payload: reports[3]}, jobID, reportContexts[3]) require.NoError(t, err) transmissions, err = orm.GetTransmitRequests(ctx, sURL, jobID) @@ -129,9 +129,9 @@ func TestORM(t *testing.T) { }) // Duplicate requests are ignored. - err = orm.InsertTransmitRequest(ctx, sURL, &pb.TransmitRequest{Payload: reports[3]}, jobID, reportContexts[3]) + err = orm.InsertTransmitRequest(ctx, []string{sURL}, &pb.TransmitRequest{Payload: reports[3]}, jobID, reportContexts[3]) require.NoError(t, err) - err = orm.InsertTransmitRequest(ctx, sURL, &pb.TransmitRequest{Payload: reports[3]}, jobID, reportContexts[3]) + err = orm.InsertTransmitRequest(ctx, []string{sURL}, &pb.TransmitRequest{Payload: reports[3]}, jobID, reportContexts[3]) require.NoError(t, err) transmissions, err = orm.GetTransmitRequests(ctx, sURL, jobID) @@ -151,6 +151,51 @@ func TestORM(t *testing.T) { } +func TestORM_InsertTransmitRequest_MultipleServerURLs(t *testing.T) { + ctx := testutils.Context(t) + db := pgtest.NewSqlxDB(t) + + jobID := rand.Int32() // foreign key constraints disabled so value doesn't matter + pgtest.MustExec(t, db, `SET CONSTRAINTS mercury_transmit_requests_job_id_fkey DEFERRED`) + pgtest.MustExec(t, db, `SET CONSTRAINTS feed_latest_reports_job_id_fkey DEFERRED`) + orm := NewORM(db) + feedID := sampleFeedID + + reports := sampleReports + reportContexts := make([]ocrtypes.ReportContext, 4) + for i := range reportContexts { + reportContexts[i] = ocrtypes.ReportContext{ + ReportTimestamp: ocrtypes.ReportTimestamp{ + ConfigDigest: ocrtypes.ConfigDigest{'1'}, + Epoch: 10, + Round: uint8(i), + }, + ExtraHash: [32]byte{'2'}, + } + } + err := orm.InsertTransmitRequest(ctx, []string{sURL, sURL2, sURL3}, &pb.TransmitRequest{Payload: reports[0]}, jobID, reportContexts[0]) + require.NoError(t, err) + + transmissions, err := orm.GetTransmitRequests(ctx, sURL, jobID) + require.NoError(t, err) + require.Len(t, transmissions, 1) + assert.Equal(t, transmissions[0], &Transmission{Req: &pb.TransmitRequest{Payload: reports[0]}, ReportCtx: reportContexts[0]}) + + transmissions, err = orm.GetTransmitRequests(ctx, sURL2, jobID) + require.NoError(t, err) + require.Len(t, transmissions, 1) + assert.Equal(t, transmissions[0], &Transmission{Req: &pb.TransmitRequest{Payload: reports[0]}, ReportCtx: reportContexts[0]}) + + transmissions, err = orm.GetTransmitRequests(ctx, sURL3, jobID) + require.NoError(t, err) + require.Len(t, transmissions, 1) + assert.Equal(t, transmissions[0], &Transmission{Req: &pb.TransmitRequest{Payload: reports[0]}, ReportCtx: reportContexts[0]}) + + l, err := orm.LatestReport(testutils.Context(t), feedID) + require.NoError(t, err) + assert.Equal(t, reports[0], l) +} + func TestORM_PruneTransmitRequests(t *testing.T) { ctx := testutils.Context(t) db := pgtest.NewSqlxDB(t) @@ -174,18 +219,18 @@ func TestORM_PruneTransmitRequests(t *testing.T) { } // s1 - err := orm.InsertTransmitRequest(ctx, sURL, &pb.TransmitRequest{Payload: reports[0]}, jobID, makeReportContext(1, 1)) + err := orm.InsertTransmitRequest(ctx, []string{sURL}, &pb.TransmitRequest{Payload: reports[0]}, jobID, makeReportContext(1, 1)) require.NoError(t, err) - err = orm.InsertTransmitRequest(ctx, sURL, &pb.TransmitRequest{Payload: reports[1]}, jobID, makeReportContext(1, 2)) + err = orm.InsertTransmitRequest(ctx, []string{sURL}, &pb.TransmitRequest{Payload: reports[1]}, jobID, makeReportContext(1, 2)) require.NoError(t, err) // s2 - should not be touched - err = orm.InsertTransmitRequest(ctx, sURL2, &pb.TransmitRequest{Payload: reports[0]}, jobID, makeReportContext(1, 0)) + err = orm.InsertTransmitRequest(ctx, []string{sURL2}, &pb.TransmitRequest{Payload: reports[0]}, jobID, makeReportContext(1, 0)) require.NoError(t, err) - err = orm.InsertTransmitRequest(ctx, sURL2, &pb.TransmitRequest{Payload: reports[0]}, jobID, makeReportContext(1, 1)) + err = orm.InsertTransmitRequest(ctx, []string{sURL2}, &pb.TransmitRequest{Payload: reports[0]}, jobID, makeReportContext(1, 1)) require.NoError(t, err) - err = orm.InsertTransmitRequest(ctx, sURL2, &pb.TransmitRequest{Payload: reports[1]}, jobID, makeReportContext(1, 2)) + err = orm.InsertTransmitRequest(ctx, []string{sURL2}, &pb.TransmitRequest{Payload: reports[1]}, jobID, makeReportContext(1, 2)) require.NoError(t, err) - err = orm.InsertTransmitRequest(ctx, sURL2, &pb.TransmitRequest{Payload: reports[2]}, jobID, makeReportContext(1, 3)) + err = orm.InsertTransmitRequest(ctx, []string{sURL2}, &pb.TransmitRequest{Payload: reports[2]}, jobID, makeReportContext(1, 3)) require.NoError(t, err) // Max size greater than number of records, expect no-op @@ -221,9 +266,9 @@ func TestORM_PruneTransmitRequests(t *testing.T) { {Req: &pb.TransmitRequest{Payload: reports[0]}, ReportCtx: makeReportContext(1, 1)}, }, transmissions) - err = orm.InsertTransmitRequest(ctx, sURL, &pb.TransmitRequest{Payload: reports[2]}, jobID, makeReportContext(2, 1)) + err = orm.InsertTransmitRequest(ctx, []string{sURL}, &pb.TransmitRequest{Payload: reports[2]}, jobID, makeReportContext(2, 1)) require.NoError(t, err) - err = orm.InsertTransmitRequest(ctx, sURL, &pb.TransmitRequest{Payload: reports[3]}, jobID, makeReportContext(2, 2)) + err = orm.InsertTransmitRequest(ctx, []string{sURL}, &pb.TransmitRequest{Payload: reports[3]}, jobID, makeReportContext(2, 2)) require.NoError(t, err) // Max size is table size - 1, expect the oldest row to be pruned. @@ -267,13 +312,13 @@ func TestORM_InsertTransmitRequest_LatestReport(t *testing.T) { } } - err := orm.InsertTransmitRequest(ctx, sURL, &pb.TransmitRequest{Payload: reports[0]}, jobID, makeReportContext( + err := orm.InsertTransmitRequest(ctx, []string{sURL}, &pb.TransmitRequest{Payload: reports[0]}, jobID, makeReportContext( 0, 0, )) require.NoError(t, err) // this should be ignored, because report context is the same - err = orm.InsertTransmitRequest(ctx, sURL2, &pb.TransmitRequest{Payload: reports[1]}, jobID, makeReportContext( + err = orm.InsertTransmitRequest(ctx, []string{sURL2}, &pb.TransmitRequest{Payload: reports[1]}, jobID, makeReportContext( 0, 0, )) require.NoError(t, err) @@ -283,7 +328,7 @@ func TestORM_InsertTransmitRequest_LatestReport(t *testing.T) { assert.Equal(t, reports[0], l) t.Run("replaces if epoch and round are larger", func(t *testing.T) { - err = orm.InsertTransmitRequest(ctx, "foo", &pb.TransmitRequest{Payload: reports[1]}, jobID, makeReportContext(1, 1)) + err = orm.InsertTransmitRequest(ctx, []string{"foo"}, &pb.TransmitRequest{Payload: reports[1]}, jobID, makeReportContext(1, 1)) require.NoError(t, err) l, err = orm.LatestReport(testutils.Context(t), feedID) @@ -291,7 +336,7 @@ func TestORM_InsertTransmitRequest_LatestReport(t *testing.T) { assert.Equal(t, reports[1], l) }) t.Run("replaces if epoch is the same but round is greater", func(t *testing.T) { - err = orm.InsertTransmitRequest(ctx, sURL, &pb.TransmitRequest{Payload: reports[2]}, jobID, makeReportContext(1, 2)) + err = orm.InsertTransmitRequest(ctx, []string{sURL}, &pb.TransmitRequest{Payload: reports[2]}, jobID, makeReportContext(1, 2)) require.NoError(t, err) l, err = orm.LatestReport(testutils.Context(t), feedID) @@ -299,7 +344,7 @@ func TestORM_InsertTransmitRequest_LatestReport(t *testing.T) { assert.Equal(t, reports[2], l) }) t.Run("replaces if epoch is larger but round is smaller", func(t *testing.T) { - err = orm.InsertTransmitRequest(ctx, "bar", &pb.TransmitRequest{Payload: reports[3]}, jobID, makeReportContext(2, 1)) + err = orm.InsertTransmitRequest(ctx, []string{"bar"}, &pb.TransmitRequest{Payload: reports[3]}, jobID, makeReportContext(2, 1)) require.NoError(t, err) l, err = orm.LatestReport(testutils.Context(t), feedID) @@ -307,7 +352,7 @@ func TestORM_InsertTransmitRequest_LatestReport(t *testing.T) { assert.Equal(t, reports[3], l) }) t.Run("does not overwrite if epoch/round is the same", func(t *testing.T) { - err = orm.InsertTransmitRequest(ctx, sURL, &pb.TransmitRequest{Payload: reports[0]}, jobID, makeReportContext(2, 1)) + err = orm.InsertTransmitRequest(ctx, []string{sURL}, &pb.TransmitRequest{Payload: reports[0]}, jobID, makeReportContext(2, 1)) require.NoError(t, err) l, err = orm.LatestReport(testutils.Context(t), feedID) diff --git a/core/services/relay/evm/mercury/persistence_manager.go b/core/services/relay/evm/mercury/persistence_manager.go index d49d0d4ed01..38576174423 100644 --- a/core/services/relay/evm/mercury/persistence_manager.go +++ b/core/services/relay/evm/mercury/persistence_manager.go @@ -69,7 +69,7 @@ func (pm *PersistenceManager) Close() error { } func (pm *PersistenceManager) Insert(ctx context.Context, req *pb.TransmitRequest, reportCtx ocrtypes.ReportContext) error { - return pm.orm.InsertTransmitRequest(ctx, pm.serverURL, req, pm.jobID, reportCtx) + return pm.orm.InsertTransmitRequest(ctx, []string{pm.serverURL}, req, pm.jobID, reportCtx) } func (pm *PersistenceManager) Delete(ctx context.Context, req *pb.TransmitRequest) error { diff --git a/core/services/relay/evm/mercury/transmitter.go b/core/services/relay/evm/mercury/transmitter.go index d8ef981ff0c..7abd926255a 100644 --- a/core/services/relay/evm/mercury/transmitter.go +++ b/core/services/relay/evm/mercury/transmitter.go @@ -18,6 +18,7 @@ import ( pkgerrors "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" + "golang.org/x/exp/maps" "golang.org/x/sync/errgroup" "github.com/smartcontractkit/libocr/offchainreporting2plus/chains/evmutil" @@ -109,6 +110,7 @@ type mercuryTransmitter struct { lggr logger.Logger cfg TransmitterConfig + orm ORM servers map[string]*server codec TransmitterReportDecoder @@ -307,6 +309,7 @@ func NewTransmitter(lggr logger.Logger, cfg TransmitterConfig, clients map[strin services.StateMachine{}, lggr.Named("MercuryTransmitter").With("feedID", feedIDHex), cfg, + orm, servers, codec, feedID, @@ -407,14 +410,14 @@ func (mt *mercuryTransmitter) Transmit(ctx context.Context, reportCtx ocrtypes.R mt.lggr.Tracew("Transmit enqueue", "req.Payload", req.Payload, "report", report, "reportCtx", reportCtx, "signatures", signatures) + if err := mt.orm.InsertTransmitRequest(ctx, maps.Keys(mt.servers), req, mt.jobID, reportCtx); err != nil { + return err + } + g := new(errgroup.Group) for _, s := range mt.servers { s := s // https://golang.org/doc/faq#closures_and_goroutines g.Go(func() error { - if err := s.pm.Insert(ctx, req, reportCtx); err != nil { - s.transmitQueueInsertErrorCount.Inc() - return err - } if ok := s.q.Push(req, reportCtx); !ok { s.transmitQueuePushErrorCount.Inc() return errors.New("transmit queue is closed") From 46189067027d26f496b61b40bcd7f82fdcfe910b Mon Sep 17 00:00:00 2001 From: Sam Date: Thu, 16 May 2024 20:24:34 -0400 Subject: [PATCH 3/3] Fix panic on mercury server error (#13231) * Fix panic on mercury server error * Changelog * Remove newline * Fix changelog * changelog