Skip to content

Commit

Permalink
Merge pull request #13435 from smartcontractkit/fix_mercury_db_multic…
Browse files Browse the repository at this point in the history
…ast_performance_issue

Fix mercury db multicast performance issue
  • Loading branch information
snehaagni authored Jun 6, 2024
2 parents 963c513 + 1d66b26 commit ee339ab
Show file tree
Hide file tree
Showing 6 changed files with 114 additions and 32 deletions.
5 changes: 5 additions & 0 deletions .changeset/eighty-hotels-sit.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

Fix panic if mercury server returns error #bugfix
5 changes: 5 additions & 0 deletions .changeset/twelve-wolves-clean.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

Performance improvements for mercury single insert for multiple mercury servers #internal
36 changes: 30 additions & 6 deletions core/services/relay/evm/mercury/orm.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"crypto/sha256"
"database/sql"
"errors"
"fmt"
"strings"
"sync"

"github.com/ethereum/go-ethereum/common"
Expand All @@ -19,7 +21,7 @@ import (
)

type ORM interface {
InsertTransmitRequest(ctx context.Context, serverURL string, req *pb.TransmitRequest, jobID int32, reportCtx ocrtypes.ReportContext) error
InsertTransmitRequest(ctx context.Context, serverURLs []string, req *pb.TransmitRequest, jobID int32, reportCtx ocrtypes.ReportContext) error
DeleteTransmitRequests(ctx context.Context, serverURL string, reqs []*pb.TransmitRequest) error
GetTransmitRequests(ctx context.Context, serverURL string, jobID int32) ([]*Transmission, error)
PruneTransmitRequests(ctx context.Context, serverURL string, jobID int32, maxSize int) error
Expand All @@ -42,23 +44,45 @@ func NewORM(ds sqlutil.DataSource) ORM {
}

// InsertTransmitRequest inserts one transmit request if the payload does not exist already.
func (o *orm) InsertTransmitRequest(ctx context.Context, serverURL string, req *pb.TransmitRequest, jobID int32, reportCtx ocrtypes.ReportContext) error {
func (o *orm) InsertTransmitRequest(ctx context.Context, serverURLs []string, req *pb.TransmitRequest, jobID int32, reportCtx ocrtypes.ReportContext) error {
feedID, err := FeedIDFromReport(req.Payload)
if err != nil {
return err
}
if len(serverURLs) == 0 {
return errors.New("no server URLs provided")
}

var wg sync.WaitGroup
wg.Add(2)
var err1, err2 error

go func() {
defer wg.Done()
_, err1 = o.ds.ExecContext(ctx, `
INSERT INTO mercury_transmit_requests (server_url, payload, payload_hash, config_digest, epoch, round, extra_hash, job_id, feed_id)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)

values := make([]string, len(serverURLs))
args := []interface{}{
req.Payload,
hashPayload(req.Payload),
reportCtx.ConfigDigest[:],
reportCtx.Epoch,
reportCtx.Round,
reportCtx.ExtraHash[:],
jobID,
feedID[:],
}
for i, serverURL := range serverURLs {
// server url is the only thing that changes, might as well re-use
// the same parameters for each insert
values[i] = fmt.Sprintf("($1, $2, $3, $4, $5, $6, $7, $8, $%d)", i+9)
args = append(args, serverURL)
}

_, err1 = o.ds.ExecContext(ctx, fmt.Sprintf(`
INSERT INTO mercury_transmit_requests (payload, payload_hash, config_digest, epoch, round, extra_hash, job_id, feed_id, server_url)
VALUES %s
ON CONFLICT (server_url, payload_hash) DO NOTHING
`, serverURL, req.Payload, hashPayload(req.Payload), reportCtx.ConfigDigest[:], reportCtx.Epoch, reportCtx.Round, reportCtx.ExtraHash[:], jobID, feedID[:])
`, strings.Join(values, ",")), args...)
}()

go func() {
Expand Down
87 changes: 66 additions & 21 deletions core/services/relay/evm/mercury/orm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,15 +48,15 @@ func TestORM(t *testing.T) {

// Test insert and get requests.
// s1
err = orm.InsertTransmitRequest(ctx, sURL, &pb.TransmitRequest{Payload: reports[0]}, jobID, reportContexts[0])
err = orm.InsertTransmitRequest(ctx, []string{sURL}, &pb.TransmitRequest{Payload: reports[0]}, jobID, reportContexts[0])
require.NoError(t, err)
err = orm.InsertTransmitRequest(ctx, sURL, &pb.TransmitRequest{Payload: reports[1]}, jobID, reportContexts[1])
err = orm.InsertTransmitRequest(ctx, []string{sURL}, &pb.TransmitRequest{Payload: reports[1]}, jobID, reportContexts[1])
require.NoError(t, err)
err = orm.InsertTransmitRequest(ctx, sURL, &pb.TransmitRequest{Payload: reports[2]}, jobID, reportContexts[2])
err = orm.InsertTransmitRequest(ctx, []string{sURL}, &pb.TransmitRequest{Payload: reports[2]}, jobID, reportContexts[2])
require.NoError(t, err)

// s2
err = orm.InsertTransmitRequest(ctx, sURL2, &pb.TransmitRequest{Payload: reports[3]}, jobID, reportContexts[0])
err = orm.InsertTransmitRequest(ctx, []string{sURL2}, &pb.TransmitRequest{Payload: reports[3]}, jobID, reportContexts[0])
require.NoError(t, err)

transmissions, err := orm.GetTransmitRequests(ctx, sURL, jobID)
Expand Down Expand Up @@ -119,7 +119,7 @@ func TestORM(t *testing.T) {
require.Empty(t, transmissions)

// More inserts.
err = orm.InsertTransmitRequest(ctx, sURL, &pb.TransmitRequest{Payload: reports[3]}, jobID, reportContexts[3])
err = orm.InsertTransmitRequest(ctx, []string{sURL}, &pb.TransmitRequest{Payload: reports[3]}, jobID, reportContexts[3])
require.NoError(t, err)

transmissions, err = orm.GetTransmitRequests(ctx, sURL, jobID)
Expand All @@ -129,9 +129,9 @@ func TestORM(t *testing.T) {
})

// Duplicate requests are ignored.
err = orm.InsertTransmitRequest(ctx, sURL, &pb.TransmitRequest{Payload: reports[3]}, jobID, reportContexts[3])
err = orm.InsertTransmitRequest(ctx, []string{sURL}, &pb.TransmitRequest{Payload: reports[3]}, jobID, reportContexts[3])
require.NoError(t, err)
err = orm.InsertTransmitRequest(ctx, sURL, &pb.TransmitRequest{Payload: reports[3]}, jobID, reportContexts[3])
err = orm.InsertTransmitRequest(ctx, []string{sURL}, &pb.TransmitRequest{Payload: reports[3]}, jobID, reportContexts[3])
require.NoError(t, err)

transmissions, err = orm.GetTransmitRequests(ctx, sURL, jobID)
Expand All @@ -151,6 +151,51 @@ func TestORM(t *testing.T) {

}

func TestORM_InsertTransmitRequest_MultipleServerURLs(t *testing.T) {
ctx := testutils.Context(t)
db := pgtest.NewSqlxDB(t)

jobID := rand.Int32() // foreign key constraints disabled so value doesn't matter
pgtest.MustExec(t, db, `SET CONSTRAINTS mercury_transmit_requests_job_id_fkey DEFERRED`)
pgtest.MustExec(t, db, `SET CONSTRAINTS feed_latest_reports_job_id_fkey DEFERRED`)
orm := NewORM(db)
feedID := sampleFeedID

reports := sampleReports
reportContexts := make([]ocrtypes.ReportContext, 4)
for i := range reportContexts {
reportContexts[i] = ocrtypes.ReportContext{
ReportTimestamp: ocrtypes.ReportTimestamp{
ConfigDigest: ocrtypes.ConfigDigest{'1'},
Epoch: 10,
Round: uint8(i),
},
ExtraHash: [32]byte{'2'},
}
}
err := orm.InsertTransmitRequest(ctx, []string{sURL, sURL2, sURL3}, &pb.TransmitRequest{Payload: reports[0]}, jobID, reportContexts[0])
require.NoError(t, err)

transmissions, err := orm.GetTransmitRequests(ctx, sURL, jobID)
require.NoError(t, err)
require.Len(t, transmissions, 1)
assert.Equal(t, transmissions[0], &Transmission{Req: &pb.TransmitRequest{Payload: reports[0]}, ReportCtx: reportContexts[0]})

transmissions, err = orm.GetTransmitRequests(ctx, sURL2, jobID)
require.NoError(t, err)
require.Len(t, transmissions, 1)
assert.Equal(t, transmissions[0], &Transmission{Req: &pb.TransmitRequest{Payload: reports[0]}, ReportCtx: reportContexts[0]})

transmissions, err = orm.GetTransmitRequests(ctx, sURL3, jobID)
require.NoError(t, err)
require.Len(t, transmissions, 1)
assert.Equal(t, transmissions[0], &Transmission{Req: &pb.TransmitRequest{Payload: reports[0]}, ReportCtx: reportContexts[0]})

l, err := orm.LatestReport(testutils.Context(t), feedID)
require.NoError(t, err)
assert.Equal(t, reports[0], l)
}

func TestORM_PruneTransmitRequests(t *testing.T) {
ctx := testutils.Context(t)
db := pgtest.NewSqlxDB(t)
Expand All @@ -174,18 +219,18 @@ func TestORM_PruneTransmitRequests(t *testing.T) {
}

// s1
err := orm.InsertTransmitRequest(ctx, sURL, &pb.TransmitRequest{Payload: reports[0]}, jobID, makeReportContext(1, 1))
err := orm.InsertTransmitRequest(ctx, []string{sURL}, &pb.TransmitRequest{Payload: reports[0]}, jobID, makeReportContext(1, 1))
require.NoError(t, err)
err = orm.InsertTransmitRequest(ctx, sURL, &pb.TransmitRequest{Payload: reports[1]}, jobID, makeReportContext(1, 2))
err = orm.InsertTransmitRequest(ctx, []string{sURL}, &pb.TransmitRequest{Payload: reports[1]}, jobID, makeReportContext(1, 2))
require.NoError(t, err)
// s2 - should not be touched
err = orm.InsertTransmitRequest(ctx, sURL2, &pb.TransmitRequest{Payload: reports[0]}, jobID, makeReportContext(1, 0))
err = orm.InsertTransmitRequest(ctx, []string{sURL2}, &pb.TransmitRequest{Payload: reports[0]}, jobID, makeReportContext(1, 0))
require.NoError(t, err)
err = orm.InsertTransmitRequest(ctx, sURL2, &pb.TransmitRequest{Payload: reports[0]}, jobID, makeReportContext(1, 1))
err = orm.InsertTransmitRequest(ctx, []string{sURL2}, &pb.TransmitRequest{Payload: reports[0]}, jobID, makeReportContext(1, 1))
require.NoError(t, err)
err = orm.InsertTransmitRequest(ctx, sURL2, &pb.TransmitRequest{Payload: reports[1]}, jobID, makeReportContext(1, 2))
err = orm.InsertTransmitRequest(ctx, []string{sURL2}, &pb.TransmitRequest{Payload: reports[1]}, jobID, makeReportContext(1, 2))
require.NoError(t, err)
err = orm.InsertTransmitRequest(ctx, sURL2, &pb.TransmitRequest{Payload: reports[2]}, jobID, makeReportContext(1, 3))
err = orm.InsertTransmitRequest(ctx, []string{sURL2}, &pb.TransmitRequest{Payload: reports[2]}, jobID, makeReportContext(1, 3))
require.NoError(t, err)

// Max size greater than number of records, expect no-op
Expand Down Expand Up @@ -221,9 +266,9 @@ func TestORM_PruneTransmitRequests(t *testing.T) {
{Req: &pb.TransmitRequest{Payload: reports[0]}, ReportCtx: makeReportContext(1, 1)},
}, transmissions)

err = orm.InsertTransmitRequest(ctx, sURL, &pb.TransmitRequest{Payload: reports[2]}, jobID, makeReportContext(2, 1))
err = orm.InsertTransmitRequest(ctx, []string{sURL}, &pb.TransmitRequest{Payload: reports[2]}, jobID, makeReportContext(2, 1))
require.NoError(t, err)
err = orm.InsertTransmitRequest(ctx, sURL, &pb.TransmitRequest{Payload: reports[3]}, jobID, makeReportContext(2, 2))
err = orm.InsertTransmitRequest(ctx, []string{sURL}, &pb.TransmitRequest{Payload: reports[3]}, jobID, makeReportContext(2, 2))
require.NoError(t, err)

// Max size is table size - 1, expect the oldest row to be pruned.
Expand Down Expand Up @@ -267,13 +312,13 @@ func TestORM_InsertTransmitRequest_LatestReport(t *testing.T) {
}
}

err := orm.InsertTransmitRequest(ctx, sURL, &pb.TransmitRequest{Payload: reports[0]}, jobID, makeReportContext(
err := orm.InsertTransmitRequest(ctx, []string{sURL}, &pb.TransmitRequest{Payload: reports[0]}, jobID, makeReportContext(
0, 0,
))
require.NoError(t, err)

// this should be ignored, because report context is the same
err = orm.InsertTransmitRequest(ctx, sURL2, &pb.TransmitRequest{Payload: reports[1]}, jobID, makeReportContext(
err = orm.InsertTransmitRequest(ctx, []string{sURL2}, &pb.TransmitRequest{Payload: reports[1]}, jobID, makeReportContext(
0, 0,
))
require.NoError(t, err)
Expand All @@ -283,31 +328,31 @@ func TestORM_InsertTransmitRequest_LatestReport(t *testing.T) {
assert.Equal(t, reports[0], l)

t.Run("replaces if epoch and round are larger", func(t *testing.T) {
err = orm.InsertTransmitRequest(ctx, "foo", &pb.TransmitRequest{Payload: reports[1]}, jobID, makeReportContext(1, 1))
err = orm.InsertTransmitRequest(ctx, []string{"foo"}, &pb.TransmitRequest{Payload: reports[1]}, jobID, makeReportContext(1, 1))
require.NoError(t, err)

l, err = orm.LatestReport(testutils.Context(t), feedID)
require.NoError(t, err)
assert.Equal(t, reports[1], l)
})
t.Run("replaces if epoch is the same but round is greater", func(t *testing.T) {
err = orm.InsertTransmitRequest(ctx, sURL, &pb.TransmitRequest{Payload: reports[2]}, jobID, makeReportContext(1, 2))
err = orm.InsertTransmitRequest(ctx, []string{sURL}, &pb.TransmitRequest{Payload: reports[2]}, jobID, makeReportContext(1, 2))
require.NoError(t, err)

l, err = orm.LatestReport(testutils.Context(t), feedID)
require.NoError(t, err)
assert.Equal(t, reports[2], l)
})
t.Run("replaces if epoch is larger but round is smaller", func(t *testing.T) {
err = orm.InsertTransmitRequest(ctx, "bar", &pb.TransmitRequest{Payload: reports[3]}, jobID, makeReportContext(2, 1))
err = orm.InsertTransmitRequest(ctx, []string{"bar"}, &pb.TransmitRequest{Payload: reports[3]}, jobID, makeReportContext(2, 1))
require.NoError(t, err)

l, err = orm.LatestReport(testutils.Context(t), feedID)
require.NoError(t, err)
assert.Equal(t, reports[3], l)
})
t.Run("does not overwrite if epoch/round is the same", func(t *testing.T) {
err = orm.InsertTransmitRequest(ctx, sURL, &pb.TransmitRequest{Payload: reports[0]}, jobID, makeReportContext(2, 1))
err = orm.InsertTransmitRequest(ctx, []string{sURL}, &pb.TransmitRequest{Payload: reports[0]}, jobID, makeReportContext(2, 1))
require.NoError(t, err)

l, err = orm.LatestReport(testutils.Context(t), feedID)
Expand Down
2 changes: 1 addition & 1 deletion core/services/relay/evm/mercury/persistence_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func (pm *PersistenceManager) Close() error {
}

func (pm *PersistenceManager) Insert(ctx context.Context, req *pb.TransmitRequest, reportCtx ocrtypes.ReportContext) error {
return pm.orm.InsertTransmitRequest(ctx, pm.serverURL, req, pm.jobID, reportCtx)
return pm.orm.InsertTransmitRequest(ctx, []string{pm.serverURL}, req, pm.jobID, reportCtx)
}

func (pm *PersistenceManager) Delete(ctx context.Context, req *pb.TransmitRequest) error {
Expand Down
11 changes: 7 additions & 4 deletions core/services/relay/evm/mercury/transmitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
pkgerrors "github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"golang.org/x/exp/maps"
"golang.org/x/sync/errgroup"

"github.com/smartcontractkit/libocr/offchainreporting2plus/chains/evmutil"
Expand Down Expand Up @@ -109,6 +110,7 @@ type mercuryTransmitter struct {
lggr logger.Logger
cfg TransmitterConfig

orm ORM
servers map[string]*server

codec TransmitterReportDecoder
Expand Down Expand Up @@ -307,6 +309,7 @@ func NewTransmitter(lggr logger.Logger, cfg TransmitterConfig, clients map[strin
services.StateMachine{},
lggr.Named("MercuryTransmitter").With("feedID", feedIDHex),
cfg,
orm,
servers,
codec,
feedID,
Expand Down Expand Up @@ -407,14 +410,14 @@ func (mt *mercuryTransmitter) Transmit(ctx context.Context, reportCtx ocrtypes.R

mt.lggr.Tracew("Transmit enqueue", "req.Payload", req.Payload, "report", report, "reportCtx", reportCtx, "signatures", signatures)

if err := mt.orm.InsertTransmitRequest(ctx, maps.Keys(mt.servers), req, mt.jobID, reportCtx); err != nil {
return err
}

g := new(errgroup.Group)
for _, s := range mt.servers {
s := s // https://golang.org/doc/faq#closures_and_goroutines
g.Go(func() error {
if err := s.pm.Insert(ctx, req, reportCtx); err != nil {
s.transmitQueueInsertErrorCount.Inc()
return err
}
if ok := s.q.Push(req, reportCtx); !ok {
s.transmitQueuePushErrorCount.Inc()
return errors.New("transmit queue is closed")
Expand Down

0 comments on commit ee339ab

Please sign in to comment.