From 3d9de4b63c0a6f64d5ec3f64fc7b6d24056dad52 Mon Sep 17 00:00:00 2001 From: Sam Davies Date: Thu, 7 Mar 2024 10:04:38 -0500 Subject: [PATCH 1/4] Multi-mercury server --- .../ocr2/plugins/mercury/config/config.go | 85 +++- .../plugins/mercury/config/config_test.go | 90 ++++- .../ocr2/plugins/mercury/helpers_test.go | 28 +- .../ocr2/plugins/mercury/integration_test.go | 49 ++- core/services/relay/evm/evm.go | 25 +- core/services/relay/evm/mercury/orm.go | 44 +- core/services/relay/evm/mercury/orm_test.go | 113 ++++-- .../relay/evm/mercury/persistence_manager.go | 18 +- .../evm/mercury/persistence_manager_test.go | 2 +- core/services/relay/evm/mercury/queue.go | 16 +- core/services/relay/evm/mercury/queue_test.go | 6 +- .../services/relay/evm/mercury/transmitter.go | 379 +++++++++++------- .../relay/evm/mercury/transmitter_test.go | 228 +++++++---- .../relay/evm/mercury/wsrpc/cache/cache.go | 2 +- .../relay/evm/mercury/wsrpc/client.go | 4 +- .../relay/evm/mercury/wsrpc/mocks/mocks.go | 18 +- core/services/relay/evm/mercury/wsrpc/pool.go | 8 +- ...27_add_server_url_to_transmit_requests.sql | 9 + 18 files changed, 746 insertions(+), 378 deletions(-) create mode 100644 core/store/migrate/migrations/0227_add_server_url_to_transmit_requests.sql diff --git a/core/services/ocr2/plugins/mercury/config/config.go b/core/services/ocr2/plugins/mercury/config/config.go index fc5d0a6a20d..5763b883ac0 100644 --- a/core/services/ocr2/plugins/mercury/config/config.go +++ b/core/services/ocr2/plugins/mercury/config/config.go @@ -8,6 +8,7 @@ import ( "fmt" "net/url" "regexp" + "sort" pkgerrors "github.com/pkg/errors" @@ -17,8 +18,18 @@ import ( ) type PluginConfig struct { + // Must either specify details for single server OR multiple servers. + // Specifying both is not valid. + + // Single mercury server + // LEGACY: This is the old way of specifying a mercury server RawServerURL string `json:"serverURL" toml:"serverURL"` ServerPubKey utils.PlainHexBytes `json:"serverPubKey" toml:"serverPubKey"` + + // Multi mercury servers + // This is the preferred way to specify mercury server(s) + Servers map[string]utils.PlainHexBytes `json:"servers" toml:"servers"` + // InitialBlockNumber allows to set a custom "validFromBlockNumber" for // the first ever report in the case of a brand new feed, where the mercury // server does not have any previous reports. For a brand new feed, this @@ -29,26 +40,64 @@ type PluginConfig struct { NativeFeedID *mercuryutils.FeedID `json:"nativeFeedID" toml:"nativeFeedID"` } -func ValidatePluginConfig(config PluginConfig, feedID mercuryutils.FeedID) (merr error) { - if config.RawServerURL == "" { - merr = errors.New("mercury: ServerURL must be specified") +func validateURL(rawServerURL string) error { + var normalizedURI string + if schemeRegexp.MatchString(rawServerURL) { + normalizedURI = rawServerURL } else { - var normalizedURI string - if schemeRegexp.MatchString(config.RawServerURL) { - normalizedURI = config.RawServerURL + normalizedURI = fmt.Sprintf("wss://%s", rawServerURL) + } + uri, err := url.ParseRequestURI(normalizedURI) + if err != nil { + return pkgerrors.Errorf(`Mercury: invalid value for ServerURL, got: %q`, rawServerURL) + } + if uri.Scheme != "wss" { + return pkgerrors.Errorf(`Mercury: invalid scheme specified for MercuryServer, got: %q (scheme: %q) but expected a websocket url e.g. "192.0.2.2:4242" or "wss://192.0.2.2:4242"`, rawServerURL, uri.Scheme) + } + return nil +} + +type Server struct { + URL string + PubKey utils.PlainHexBytes +} + +func (p PluginConfig) GetServers() (servers []Server) { + if p.RawServerURL != "" { + return []Server{{URL: wssRegexp.ReplaceAllString(p.RawServerURL, ""), PubKey: p.ServerPubKey}} + } + for url, pubKey := range p.Servers { + servers = append(servers, Server{URL: wssRegexp.ReplaceAllString(url, ""), PubKey: pubKey}) + } + sort.Slice(servers, func(i, j int) bool { + return servers[i].URL < servers[j].URL + }) + return +} + +func ValidatePluginConfig(config PluginConfig, feedID mercuryutils.FeedID) (merr error) { + if len(config.Servers) > 0 { + if config.RawServerURL != "" || len(config.ServerPubKey) != 0 { + merr = errors.Join(merr, errors.New("Mercury: Servers and RawServerURL/ServerPubKey may not be specified together")) } else { - normalizedURI = fmt.Sprintf("wss://%s", config.RawServerURL) + for serverName, serverPubKey := range config.Servers { + if err := validateURL(serverName); err != nil { + merr = errors.Join(merr, pkgerrors.Wrap(err, "Mercury: invalid value for ServerURL")) + } + if len(serverPubKey) != 32 { + merr = errors.Join(merr, errors.New("Mercury: ServerPubKey must be a 32-byte hex string")) + } + } } - uri, err := url.ParseRequestURI(normalizedURI) - if err != nil { - merr = pkgerrors.Wrap(err, "Mercury: invalid value for ServerURL") - } else if uri.Scheme != "wss" { - merr = pkgerrors.Errorf(`Mercury: invalid scheme specified for MercuryServer, got: %q (scheme: %q) but expected a websocket url e.g. "192.0.2.2:4242" or "wss://192.0.2.2:4242"`, config.RawServerURL, uri.Scheme) + } else if config.RawServerURL == "" { + merr = errors.Join(merr, errors.New("Mercury: Servers must be specified")) + } else { + if err := validateURL(config.RawServerURL); err != nil { + merr = errors.Join(merr, pkgerrors.Wrap(err, "Mercury: invalid value for ServerURL")) + } + if len(config.ServerPubKey) != 32 { + merr = errors.Join(merr, errors.New("Mercury: If RawServerURL is specified, ServerPubKey is also required and must be a 32-byte hex string")) } - } - - if len(config.ServerPubKey) != 32 { - merr = errors.Join(merr, errors.New("mercury: ServerPubKey is required and must be a 32-byte hex string")) } switch feedID.Version() { @@ -78,7 +127,3 @@ func ValidatePluginConfig(config PluginConfig, feedID mercuryutils.FeedID) (merr var schemeRegexp = regexp.MustCompile(`^[a-zA-Z][a-zA-Z0-9+.-]*://`) var wssRegexp = regexp.MustCompile(`^wss://`) - -func (p PluginConfig) ServerURL() string { - return wssRegexp.ReplaceAllString(p.RawServerURL, "") -} diff --git a/core/services/ocr2/plugins/mercury/config/config_test.go b/core/services/ocr2/plugins/mercury/config/config_test.go index 60cc548f1fa..cc7c6a82e36 100644 --- a/core/services/ocr2/plugins/mercury/config/config_test.go +++ b/core/services/ocr2/plugins/mercury/config/config_test.go @@ -6,6 +6,8 @@ import ( "github.com/pelletier/go-toml/v2" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + + "github.com/smartcontractkit/chainlink/v2/core/utils" ) var v1FeedId = [32]uint8{00, 01, 107, 74, 167, 229, 124, 167, 182, 138, 225, 191, 69, 101, 63, 86, 182, 86, 253, 58, 163, 53, 239, 127, 174, 105, 107, 102, 63, 27, 132, 114} @@ -31,6 +33,46 @@ func Test_PluginConfig(t *testing.T) { err = ValidatePluginConfig(mc, v1FeedId) require.NoError(t, err) }) + t.Run("with multiple server URLs", func(t *testing.T) { + t.Run("if no ServerURL/ServerPubKey is specified", func(t *testing.T) { + rawToml := ` + Servers = { "example.com:80" = "724ff6eae9e900270edfff233e16322a70ec06e1a6e62a81ef13921f398f6c93", "example2.invalid:1234" = "524ff6eae9e900270edfff233e16322a70ec06e1a6e62a81ef13921f398f6c93" } + ` + + var mc PluginConfig + err := toml.Unmarshal([]byte(rawToml), &mc) + require.NoError(t, err) + + assert.Len(t, mc.Servers, 2) + assert.Equal(t, "724ff6eae9e900270edfff233e16322a70ec06e1a6e62a81ef13921f398f6c93", mc.Servers["example.com:80"].String()) + assert.Equal(t, "524ff6eae9e900270edfff233e16322a70ec06e1a6e62a81ef13921f398f6c93", mc.Servers["example2.invalid:1234"].String()) + + err = ValidatePluginConfig(mc, v1FeedId) + require.NoError(t, err) + }) + t.Run("if ServerURL or ServerPubKey is specified", func(t *testing.T) { + rawToml := ` + Servers = { "example.com:80" = "724ff6eae9e900270edfff233e16322a70ec06e1a6e62a81ef13921f398f6c93", "example2.invalid:1234" = "524ff6eae9e900270edfff233e16322a70ec06e1a6e62a81ef13921f398f6c93" } + ServerURL = "example.com:80" + ` + var mc PluginConfig + err := toml.Unmarshal([]byte(rawToml), &mc) + require.NoError(t, err) + + err = ValidatePluginConfig(mc, v1FeedId) + require.EqualError(t, err, "Mercury: Servers and RawServerURL/ServerPubKey may not be specified together") + + rawToml = ` + Servers = { "example.com:80" = "724ff6eae9e900270edfff233e16322a70ec06e1a6e62a81ef13921f398f6c93", "example2.invalid:1234" = "524ff6eae9e900270edfff233e16322a70ec06e1a6e62a81ef13921f398f6c93" } + ServerPubKey = "724ff6eae9e900270edfff233e16322a70ec06e1a6e62a81ef13921f398f6c93" + ` + err = toml.Unmarshal([]byte(rawToml), &mc) + require.NoError(t, err) + + err = ValidatePluginConfig(mc, v1FeedId) + require.EqualError(t, err, "Mercury: Servers and RawServerURL/ServerPubKey may not be specified together") + }) + }) t.Run("with invalid values", func(t *testing.T) { rawToml := ` @@ -53,7 +95,7 @@ func Test_PluginConfig(t *testing.T) { err = ValidatePluginConfig(mc, v1FeedId) require.Error(t, err) assert.Contains(t, err.Error(), `Mercury: invalid scheme specified for MercuryServer, got: "http://example.com" (scheme: "http") but expected a websocket url e.g. "192.0.2.2:4242" or "wss://192.0.2.2:4242"`) - assert.Contains(t, err.Error(), `mercury: ServerPubKey is required and must be a 32-byte hex string`) + assert.Contains(t, err.Error(), `If RawServerURL is specified, ServerPubKey is also required and must be a 32-byte hex string`) }) t.Run("with unnecessary values", func(t *testing.T) { @@ -135,13 +177,41 @@ func Test_PluginConfig(t *testing.T) { }) } -func Test_PluginConfig_ServerURL(t *testing.T) { - pc := PluginConfig{RawServerURL: "example.com"} - assert.Equal(t, "example.com", pc.ServerURL()) - pc = PluginConfig{RawServerURL: "wss://example.com"} - assert.Equal(t, "example.com", pc.ServerURL()) - pc = PluginConfig{RawServerURL: "example.com:1234/foo"} - assert.Equal(t, "example.com:1234/foo", pc.ServerURL()) - pc = PluginConfig{RawServerURL: "wss://example.com:1234/foo"} - assert.Equal(t, "example.com:1234/foo", pc.ServerURL()) +func Test_PluginConfig_GetServers(t *testing.T) { + t.Run("with single server", func(t *testing.T) { + pubKey := utils.PlainHexBytes([]byte{1, 2, 3}) + pc := PluginConfig{RawServerURL: "example.com", ServerPubKey: pubKey} + require.Len(t, pc.GetServers(), 1) + assert.Equal(t, "example.com", pc.GetServers()[0].URL) + assert.Equal(t, pubKey, pc.GetServers()[0].PubKey) + + pc = PluginConfig{RawServerURL: "wss://example.com", ServerPubKey: pubKey} + require.Len(t, pc.GetServers(), 1) + assert.Equal(t, "example.com", pc.GetServers()[0].URL) + assert.Equal(t, pubKey, pc.GetServers()[0].PubKey) + + pc = PluginConfig{RawServerURL: "example.com:1234/foo", ServerPubKey: pubKey} + require.Len(t, pc.GetServers(), 1) + assert.Equal(t, "example.com:1234/foo", pc.GetServers()[0].URL) + assert.Equal(t, pubKey, pc.GetServers()[0].PubKey) + + pc = PluginConfig{RawServerURL: "wss://example.com:1234/foo", ServerPubKey: pubKey} + require.Len(t, pc.GetServers(), 1) + assert.Equal(t, "example.com:1234/foo", pc.GetServers()[0].URL) + assert.Equal(t, pubKey, pc.GetServers()[0].PubKey) + }) + + t.Run("with multiple servers", func(t *testing.T) { + servers := map[string]utils.PlainHexBytes{ + "example.com:80": utils.PlainHexBytes([]byte{1, 2, 3}), + "mercuryserver.invalid:1234/foo": utils.PlainHexBytes([]byte{4, 5, 6}), + } + pc := PluginConfig{Servers: servers} + + require.Len(t, pc.GetServers(), 2) + assert.Equal(t, "example.com:80", pc.GetServers()[0].URL) + assert.Equal(t, utils.PlainHexBytes{1, 2, 3}, pc.GetServers()[0].PubKey) + assert.Equal(t, "mercuryserver.invalid:1234/foo", pc.GetServers()[1].URL) + assert.Equal(t, utils.PlainHexBytes{4, 5, 6}, pc.GetServers()[1].PubKey) + }) } diff --git a/core/services/ocr2/plugins/mercury/helpers_test.go b/core/services/ocr2/plugins/mercury/helpers_test.go index 473db53bc6f..c7273cd374e 100644 --- a/core/services/ocr2/plugins/mercury/helpers_test.go +++ b/core/services/ocr2/plugins/mercury/helpers_test.go @@ -8,6 +8,7 @@ import ( "fmt" "math/big" "net" + "strings" "testing" "time" @@ -389,23 +390,28 @@ func addV3MercuryJob( bootstrapNodePort int, bmBridge, bidBridge, - askBridge, - serverURL string, - serverPubKey, + askBridge string, + servers map[string]string, clientPubKey ed25519.PublicKey, feedName string, feedID [32]byte, linkFeedID [32]byte, nativeFeedID [32]byte, ) { + srvs := make([]string, 0, len(servers)) + for u, k := range servers { + srvs = append(srvs, fmt.Sprintf("%q = %q", u, k)) + } + serversStr := fmt.Sprintf("{ %s }", strings.Join(srvs, ", ")) + node.AddJob(t, fmt.Sprintf(` type = "offchainreporting2" schemaVersion = 1 -name = "mercury-%[1]d-%[12]s" +name = "mercury-%[1]d-%[11]s" forwardingAllowed = false maxTaskDuration = "1s" contractID = "%[2]s" -feedID = "0x%[11]x" +feedID = "0x%[10]x" contractConfigTrackerPollInterval = "1s" ocrKeyBundleID = "%[3]s" p2pv2Bootstrappers = [ @@ -413,7 +419,7 @@ p2pv2Bootstrappers = [ ] relay = "evm" pluginType = "mercury" -transmitterID = "%[10]x" +transmitterID = "%[9]x" observationSource = """ // Benchmark Price price1 [type=bridge name="%[5]s" timeout="50ms" requestData="{\\"data\\":{\\"from\\":\\"ETH\\",\\"to\\":\\"USD\\"}}"]; @@ -438,10 +444,9 @@ observationSource = """ """ [pluginConfig] -serverURL = "%[8]s" -serverPubKey = "%[9]x" -linkFeedID = "0x%[13]x" -nativeFeedID = "0x%[14]x" +servers = %[8]s +linkFeedID = "0x%[12]x" +nativeFeedID = "0x%[13]x" [relayConfig] chainID = 1337 @@ -453,8 +458,7 @@ chainID = 1337 bmBridge, bidBridge, askBridge, - serverURL, - serverPubKey, + serversStr, clientPubKey, feedID, feedName, diff --git a/core/services/ocr2/plugins/mercury/integration_test.go b/core/services/ocr2/plugins/mercury/integration_test.go index a12052e0b74..e4ac5dd7c5c 100644 --- a/core/services/ocr2/plugins/mercury/integration_test.go +++ b/core/services/ocr2/plugins/mercury/integration_test.go @@ -788,16 +788,6 @@ func integration_MercuryV3(t *testing.T) { feedM[feeds[i].id] = feeds[i] } - reqs := make(chan request) - serverKey := csakey.MustNewV2XXXTestingOnly(big.NewInt(-1)) - serverPubKey := serverKey.PublicKey - srv := NewMercuryServer(t, ed25519.PrivateKey(serverKey.Raw()), reqs, func() []byte { - report, err := (&reportcodecv3.ReportCodec{}).BuildReport(v3.ReportFields{BenchmarkPrice: big.NewInt(234567), Bid: big.NewInt(1), Ask: big.NewInt(1), LinkFee: big.NewInt(1), NativeFee: big.NewInt(1)}) - if err != nil { - panic(err) - } - return report - }) clientCSAKeys := make([]csakey.KeyV2, n+1) clientPubKeys := make([]ed25519.PublicKey, n+1) for i := 0; i < n+1; i++ { @@ -806,7 +796,25 @@ func integration_MercuryV3(t *testing.T) { clientCSAKeys[i] = key clientPubKeys[i] = key.PublicKey } - serverURL := startMercuryServer(t, srv, clientPubKeys) + + // Test multi-send to three servers + const nSrvs = 3 + reqChs := make([]chan request, nSrvs) + servers := make(map[string]string) + for i := 0; i < nSrvs; i++ { + k := csakey.MustNewV2XXXTestingOnly(big.NewInt(int64(-(i + 1)))) + reqs := make(chan request, 100) + srv := NewMercuryServer(t, ed25519.PrivateKey(k.Raw()), reqs, func() []byte { + report, err := (&reportcodecv3.ReportCodec{}).BuildReport(v3.ReportFields{BenchmarkPrice: big.NewInt(234567), Bid: big.NewInt(1), Ask: big.NewInt(1), LinkFee: big.NewInt(1), NativeFee: big.NewInt(1)}) + if err != nil { + panic(err) + } + return report + }) + serverURL := startMercuryServer(t, srv, clientPubKeys) + reqChs[i] = reqs + servers[serverURL] = fmt.Sprintf("%x", k.PublicKey) + } chainID := testutils.SimulatedChainID steve, backend, verifier, verifierAddress := setupBlockchain(t) @@ -895,8 +903,7 @@ func integration_MercuryV3(t *testing.T) { bmBridge, bidBridge, askBridge, - serverURL, - serverPubKey, + servers, clientPubKeys[i], feed.name, feed.id, @@ -963,8 +970,8 @@ func integration_MercuryV3(t *testing.T) { backend.Commit() } - runTestSetup := func() { - // Expect at least one report per feed from each oracle + runTestSetup := func(reqs chan request) { + // Expect at least one report per feed from each oracle, per server seen := make(map[[32]byte]map[credentials.StaticSizedPublicKey]struct{}) for i := range feeds { // feedID will be deleted when all n oracles have reported @@ -1017,12 +1024,10 @@ func integration_MercuryV3(t *testing.T) { } } - t.Run("receives at least one report per feed from each oracle when EAs are at 100% reliability", func(t *testing.T) { - runTestSetup() - }) - - t.Run("receives at least one report per feed from each oracle when EAs are at 80% reliability", func(t *testing.T) { - pError.Store(20) - runTestSetup() + t.Run("receives at least one report per feed for every server from each oracle when EAs are at 100% reliability", func(t *testing.T) { + for i := 0; i < nSrvs; i++ { + reqs := reqChs[i] + runTestSetup(reqs) + } }) } diff --git a/core/services/relay/evm/evm.go b/core/services/relay/evm/evm.go index 3797e6633a6..c27b931970a 100644 --- a/core/services/relay/evm/evm.go +++ b/core/services/relay/evm/evm.go @@ -79,9 +79,12 @@ type Relayer struct { chainReader commontypes.ChainReader codec commontypes.Codec + // Mercury + mercuryORM mercury.ORM + // LLO/data streams cdcFactory llo.ChannelDefinitionCacheFactory - orm llo.ORM + lloORM llo.ORM } type CSAETHKeystore interface { @@ -121,8 +124,9 @@ func NewRelayer(lggr logger.Logger, chain legacyevm.Chain, opts RelayerOpts) (*R } lggr = lggr.Named("Relayer") - orm := llo.NewORM(pg.NewQ(opts.DB, lggr, opts.QConfig), chain.ID()) - cdcFactory := llo.NewChannelDefinitionCacheFactory(lggr, orm, chain.LogPoller()) + mercuryORM := mercury.NewORM(opts.DB, lggr, opts.QConfig) + lloORM := llo.NewORM(pg.NewQ(opts.DB, lggr, opts.QConfig), chain.ID()) + cdcFactory := llo.NewChannelDefinitionCacheFactory(lggr, lloORM, chain.LogPoller()) return &Relayer{ db: opts.DB, chain: chain, @@ -131,7 +135,8 @@ func NewRelayer(lggr logger.Logger, chain legacyevm.Chain, opts RelayerOpts) (*R mercuryPool: opts.MercuryPool, pgCfg: opts.QConfig, cdcFactory: cdcFactory, - orm: orm, + lloORM: lloORM, + mercuryORM: mercuryORM, }, nil } @@ -219,9 +224,13 @@ func (r *Relayer) NewMercuryProvider(rargs commontypes.RelayArgs, pargs commonty return nil, pkgerrors.Wrap(err, "failed to get CSA key for mercury connection") } - client, err := r.mercuryPool.Checkout(context.Background(), privKey, mercuryConfig.ServerPubKey, mercuryConfig.ServerURL()) - if err != nil { - return nil, err + clients := make(map[string]wsrpc.Client) + for _, server := range mercuryConfig.GetServers() { + client, err := r.mercuryPool.Checkout(context.Background(), privKey, server.PubKey, server.URL) + if err != nil { + return nil, err + } + clients[server.URL] = client } // FIXME: We actually know the version here since it's in the feed ID, can @@ -242,7 +251,7 @@ func (r *Relayer) NewMercuryProvider(rargs commontypes.RelayArgs, pargs commonty default: return nil, fmt.Errorf("invalid feed version %d", feedID.Version()) } - transmitter := mercury.NewTransmitter(lggr, client, privKey.PublicKey, rargs.JobID, *relayConfig.FeedID, r.db, r.pgCfg, transmitterCodec) + transmitter := mercury.NewTransmitter(lggr, clients, privKey.PublicKey, rargs.JobID, *relayConfig.FeedID, r.mercuryORM, transmitterCodec) return NewMercuryProvider(cp, r.chainReader, r.codec, NewMercuryChainReader(r.chain.HeadTracker()), transmitter, reportCodecV1, reportCodecV2, reportCodecV3, lggr), nil } diff --git a/core/services/relay/evm/mercury/orm.go b/core/services/relay/evm/mercury/orm.go index f8d4c8cb1ee..1a8c7a93bc9 100644 --- a/core/services/relay/evm/mercury/orm.go +++ b/core/services/relay/evm/mercury/orm.go @@ -21,10 +21,10 @@ import ( ) type ORM interface { - InsertTransmitRequest(req *pb.TransmitRequest, jobID int32, reportCtx ocrtypes.ReportContext, qopts ...pg.QOpt) error - DeleteTransmitRequests(reqs []*pb.TransmitRequest, qopts ...pg.QOpt) error - GetTransmitRequests(jobID int32, qopts ...pg.QOpt) ([]*Transmission, error) - PruneTransmitRequests(jobID int32, maxSize int, qopts ...pg.QOpt) error + InsertTransmitRequest(serverURL string, req *pb.TransmitRequest, jobID int32, reportCtx ocrtypes.ReportContext, qopts ...pg.QOpt) error + DeleteTransmitRequests(serverURL string, reqs []*pb.TransmitRequest, qopts ...pg.QOpt) error + GetTransmitRequests(serverURL string, jobID int32, qopts ...pg.QOpt) ([]*Transmission, error) + PruneTransmitRequests(serverURL string, jobID int32, maxSize int, qopts ...pg.QOpt) error LatestReport(ctx context.Context, feedID [32]byte, qopts ...pg.QOpt) (report []byte, err error) } @@ -48,7 +48,7 @@ func NewORM(db *sqlx.DB, lggr logger.Logger, cfg pg.QConfig) ORM { } // InsertTransmitRequest inserts one transmit request if the payload does not exist already. -func (o *orm) InsertTransmitRequest(req *pb.TransmitRequest, jobID int32, reportCtx ocrtypes.ReportContext, qopts ...pg.QOpt) error { +func (o *orm) InsertTransmitRequest(serverURL string, req *pb.TransmitRequest, jobID int32, reportCtx ocrtypes.ReportContext, qopts ...pg.QOpt) error { feedID, err := FeedIDFromReport(req.Payload) if err != nil { return err @@ -62,10 +62,10 @@ func (o *orm) InsertTransmitRequest(req *pb.TransmitRequest, jobID int32, report go func() { defer wg.Done() err1 = q.ExecQ(` - INSERT INTO mercury_transmit_requests (payload, payload_hash, config_digest, epoch, round, extra_hash, job_id, feed_id) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8) - ON CONFLICT (payload_hash) DO NOTHING - `, req.Payload, hashPayload(req.Payload), reportCtx.ConfigDigest[:], reportCtx.Epoch, reportCtx.Round, reportCtx.ExtraHash[:], jobID, feedID[:]) + INSERT INTO mercury_transmit_requests (server_url, payload, payload_hash, config_digest, epoch, round, extra_hash, job_id, feed_id) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) + ON CONFLICT (server_url, payload_hash) DO NOTHING + `, serverURL, req.Payload, hashPayload(req.Payload), reportCtx.ConfigDigest[:], reportCtx.Epoch, reportCtx.Round, reportCtx.ExtraHash[:], jobID, feedID[:]) }() go func() { @@ -83,7 +83,7 @@ func (o *orm) InsertTransmitRequest(req *pb.TransmitRequest, jobID int32, report } // DeleteTransmitRequest deletes the given transmit requests if they exist. -func (o *orm) DeleteTransmitRequests(reqs []*pb.TransmitRequest, qopts ...pg.QOpt) error { +func (o *orm) DeleteTransmitRequests(serverURL string, reqs []*pb.TransmitRequest, qopts ...pg.QOpt) error { if len(reqs) == 0 { return nil } @@ -96,22 +96,22 @@ func (o *orm) DeleteTransmitRequests(reqs []*pb.TransmitRequest, qopts ...pg.QOp q := o.q.WithOpts(qopts...) err := q.ExecQ(` DELETE FROM mercury_transmit_requests - WHERE payload_hash = ANY($1) - `, hashes) + WHERE server_url = $1 AND payload_hash = ANY($2) + `, serverURL, hashes) return err } // GetTransmitRequests returns all transmit requests in chronologically descending order. -func (o *orm) GetTransmitRequests(jobID int32, qopts ...pg.QOpt) ([]*Transmission, error) { +func (o *orm) GetTransmitRequests(serverURL string, jobID int32, qopts ...pg.QOpt) ([]*Transmission, error) { q := o.q.WithOpts(qopts...) // The priority queue uses epoch and round to sort transmissions so order by // the same fields here for optimal insertion into the pq. rows, err := q.QueryContext(q.ParentCtx, ` SELECT payload, config_digest, epoch, round, extra_hash FROM mercury_transmit_requests - WHERE job_id = $1 + WHERE job_id = $1 AND server_url = $2 ORDER BY epoch DESC, round DESC - `, jobID) + `, jobID, serverURL) if err != nil { return nil, err } @@ -146,20 +146,20 @@ func (o *orm) GetTransmitRequests(jobID int32, qopts ...pg.QOpt) ([]*Transmissio // PruneTransmitRequests keeps at most maxSize rows for the given job ID, // deleting the oldest transactions. -func (o *orm) PruneTransmitRequests(jobID int32, maxSize int, qopts ...pg.QOpt) error { +func (o *orm) PruneTransmitRequests(serverURL string, jobID int32, maxSize int, qopts ...pg.QOpt) error { q := o.q.WithOpts(qopts...) // Prune the oldest requests by epoch and round. return q.ExecQ(` DELETE FROM mercury_transmit_requests - WHERE job_id = $1 AND + WHERE job_id = $1 AND server_url = $2 AND payload_hash NOT IN ( SELECT payload_hash FROM mercury_transmit_requests - WHERE job_id = $1 + WHERE job_id = $1 AND server_url = $2 ORDER BY epoch DESC, round DESC - LIMIT $2 + LIMIT $3 ) - `, jobID, maxSize) + `, jobID, serverURL, maxSize) } func (o *orm) LatestReport(ctx context.Context, feedID [32]byte, qopts ...pg.QOpt) (report []byte, err error) { @@ -172,6 +172,6 @@ func (o *orm) LatestReport(ctx context.Context, feedID [32]byte, qopts ...pg.QOp } func hashPayload(payload []byte) []byte { - checksum := sha256.Sum256(payload) - return checksum[:] + h := sha256.New() + return h.Sum(payload) } diff --git a/core/services/relay/evm/mercury/orm_test.go b/core/services/relay/evm/mercury/orm_test.go index 56dea70417b..14be878eeef 100644 --- a/core/services/relay/evm/mercury/orm_test.go +++ b/core/services/relay/evm/mercury/orm_test.go @@ -14,6 +14,12 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/wsrpc/pb" ) +var ( + sURL = "wss://example.com/mercury" + sURL2 = "wss://mercuryserver.test" + sURL3 = "wss://mercuryserver.example/foo" +) + func TestORM(t *testing.T) { db := pgtest.NewSqlxDB(t) @@ -42,20 +48,30 @@ func TestORM(t *testing.T) { assert.Nil(t, l) // Test insert and get requests. - err = orm.InsertTransmitRequest(&pb.TransmitRequest{Payload: reports[0]}, jobID, reportContexts[0]) + // s1 + err = orm.InsertTransmitRequest(sURL, &pb.TransmitRequest{Payload: reports[0]}, jobID, reportContexts[0]) require.NoError(t, err) - err = orm.InsertTransmitRequest(&pb.TransmitRequest{Payload: reports[1]}, jobID, reportContexts[1]) + err = orm.InsertTransmitRequest(sURL, &pb.TransmitRequest{Payload: reports[1]}, jobID, reportContexts[1]) require.NoError(t, err) - err = orm.InsertTransmitRequest(&pb.TransmitRequest{Payload: reports[2]}, jobID, reportContexts[2]) + err = orm.InsertTransmitRequest(sURL, &pb.TransmitRequest{Payload: reports[2]}, jobID, reportContexts[2]) require.NoError(t, err) - transmissions, err := orm.GetTransmitRequests(jobID) + // s2 + err = orm.InsertTransmitRequest(sURL2, &pb.TransmitRequest{Payload: reports[3]}, jobID, reportContexts[0]) + require.NoError(t, err) + + transmissions, err := orm.GetTransmitRequests(sURL, jobID) require.NoError(t, err) require.Equal(t, transmissions, []*Transmission{ {Req: &pb.TransmitRequest{Payload: reports[2]}, ReportCtx: reportContexts[2]}, {Req: &pb.TransmitRequest{Payload: reports[1]}, ReportCtx: reportContexts[1]}, {Req: &pb.TransmitRequest{Payload: reports[0]}, ReportCtx: reportContexts[0]}, }) + transmissions, err = orm.GetTransmitRequests(sURL2, jobID) + require.NoError(t, err) + require.Equal(t, transmissions, []*Transmission{ + {Req: &pb.TransmitRequest{Payload: reports[3]}, ReportCtx: reportContexts[0]}, + }) l, err = orm.LatestReport(testutils.Context(t), feedID) require.NoError(t, err) @@ -63,10 +79,10 @@ func TestORM(t *testing.T) { assert.Equal(t, reports[2], l) // Test requests can be deleted. - err = orm.DeleteTransmitRequests([]*pb.TransmitRequest{{Payload: reports[1]}}) + err = orm.DeleteTransmitRequests(sURL, []*pb.TransmitRequest{{Payload: reports[1]}}) require.NoError(t, err) - transmissions, err = orm.GetTransmitRequests(jobID) + transmissions, err = orm.GetTransmitRequests(sURL, jobID) require.NoError(t, err) require.Equal(t, transmissions, []*Transmission{ {Req: &pb.TransmitRequest{Payload: reports[2]}, ReportCtx: reportContexts[2]}, @@ -78,10 +94,10 @@ func TestORM(t *testing.T) { assert.Equal(t, reports[2], l) // Test deleting non-existent requests does not error. - err = orm.DeleteTransmitRequests([]*pb.TransmitRequest{{Payload: []byte("does-not-exist")}}) + err = orm.DeleteTransmitRequests(sURL, []*pb.TransmitRequest{{Payload: []byte("does-not-exist")}}) require.NoError(t, err) - transmissions, err = orm.GetTransmitRequests(jobID) + transmissions, err = orm.GetTransmitRequests(sURL, jobID) require.NoError(t, err) require.Equal(t, transmissions, []*Transmission{ {Req: &pb.TransmitRequest{Payload: reports[2]}, ReportCtx: reportContexts[2]}, @@ -89,7 +105,7 @@ func TestORM(t *testing.T) { }) // Test deleting multiple requests. - err = orm.DeleteTransmitRequests([]*pb.TransmitRequest{ + err = orm.DeleteTransmitRequests(sURL, []*pb.TransmitRequest{ {Payload: reports[0]}, {Payload: reports[2]}, }) @@ -99,27 +115,27 @@ func TestORM(t *testing.T) { require.NoError(t, err) assert.Equal(t, reports[2], l) - transmissions, err = orm.GetTransmitRequests(jobID) + transmissions, err = orm.GetTransmitRequests(sURL, jobID) require.NoError(t, err) require.Empty(t, transmissions) // More inserts. - err = orm.InsertTransmitRequest(&pb.TransmitRequest{Payload: reports[3]}, jobID, reportContexts[3]) + err = orm.InsertTransmitRequest(sURL, &pb.TransmitRequest{Payload: reports[3]}, jobID, reportContexts[3]) require.NoError(t, err) - transmissions, err = orm.GetTransmitRequests(jobID) + transmissions, err = orm.GetTransmitRequests(sURL, jobID) require.NoError(t, err) require.Equal(t, transmissions, []*Transmission{ {Req: &pb.TransmitRequest{Payload: reports[3]}, ReportCtx: reportContexts[3]}, }) // Duplicate requests are ignored. - err = orm.InsertTransmitRequest(&pb.TransmitRequest{Payload: reports[3]}, jobID, reportContexts[3]) + err = orm.InsertTransmitRequest(sURL, &pb.TransmitRequest{Payload: reports[3]}, jobID, reportContexts[3]) require.NoError(t, err) - err = orm.InsertTransmitRequest(&pb.TransmitRequest{Payload: reports[3]}, jobID, reportContexts[3]) + err = orm.InsertTransmitRequest(sURL, &pb.TransmitRequest{Payload: reports[3]}, jobID, reportContexts[3]) require.NoError(t, err) - transmissions, err = orm.GetTransmitRequests(jobID) + transmissions, err = orm.GetTransmitRequests(sURL, jobID) require.NoError(t, err) require.Equal(t, transmissions, []*Transmission{ {Req: &pb.TransmitRequest{Payload: reports[3]}, ReportCtx: reportContexts[3]}, @@ -128,6 +144,12 @@ func TestORM(t *testing.T) { l, err = orm.LatestReport(testutils.Context(t), feedID) require.NoError(t, err) assert.Equal(t, reports[3], l) + + // s2 not affected by deletion + transmissions, err = orm.GetTransmitRequests(sURL2, jobID) + require.NoError(t, err) + require.Len(t, transmissions, 1) + } func TestORM_PruneTransmitRequests(t *testing.T) { @@ -152,60 +174,75 @@ func TestORM_PruneTransmitRequests(t *testing.T) { } } - err := orm.InsertTransmitRequest(&pb.TransmitRequest{Payload: reports[0]}, jobID, makeReportContext(1, 1)) + // s1 + err := orm.InsertTransmitRequest(sURL, &pb.TransmitRequest{Payload: reports[0]}, jobID, makeReportContext(1, 1)) require.NoError(t, err) - err = orm.InsertTransmitRequest(&pb.TransmitRequest{Payload: reports[1]}, jobID, makeReportContext(1, 2)) + err = orm.InsertTransmitRequest(sURL, &pb.TransmitRequest{Payload: reports[1]}, jobID, makeReportContext(1, 2)) + require.NoError(t, err) + // s2 - should not be touched + err = orm.InsertTransmitRequest(sURL2, &pb.TransmitRequest{Payload: reports[0]}, jobID, makeReportContext(1, 0)) + require.NoError(t, err) + err = orm.InsertTransmitRequest(sURL2, &pb.TransmitRequest{Payload: reports[0]}, jobID, makeReportContext(1, 1)) + require.NoError(t, err) + err = orm.InsertTransmitRequest(sURL2, &pb.TransmitRequest{Payload: reports[1]}, jobID, makeReportContext(1, 2)) + require.NoError(t, err) + err = orm.InsertTransmitRequest(sURL2, &pb.TransmitRequest{Payload: reports[2]}, jobID, makeReportContext(1, 3)) require.NoError(t, err) - // Max size greater than table size, expect no-op - err = orm.PruneTransmitRequests(jobID, 5) + // Max size greater than number of records, expect no-op + err = orm.PruneTransmitRequests(sURL, jobID, 5) require.NoError(t, err) - transmissions, err := orm.GetTransmitRequests(jobID) + transmissions, err := orm.GetTransmitRequests(sURL, jobID) require.NoError(t, err) require.Equal(t, transmissions, []*Transmission{ {Req: &pb.TransmitRequest{Payload: reports[1]}, ReportCtx: makeReportContext(1, 2)}, {Req: &pb.TransmitRequest{Payload: reports[0]}, ReportCtx: makeReportContext(1, 1)}, }) - // Max size equal to table size, expect no-op - err = orm.PruneTransmitRequests(jobID, 2) + // Max size equal to number of records, expect no-op + err = orm.PruneTransmitRequests(sURL, jobID, 2) require.NoError(t, err) - transmissions, err = orm.GetTransmitRequests(jobID) + transmissions, err = orm.GetTransmitRequests(sURL, jobID) require.NoError(t, err) require.Equal(t, transmissions, []*Transmission{ {Req: &pb.TransmitRequest{Payload: reports[1]}, ReportCtx: makeReportContext(1, 2)}, {Req: &pb.TransmitRequest{Payload: reports[0]}, ReportCtx: makeReportContext(1, 1)}, }) - // Max size is table size + 1, but jobID differs, expect no-op - err = orm.PruneTransmitRequests(-1, 2) + // Max size is number of records + 1, but jobID differs, expect no-op + err = orm.PruneTransmitRequests(sURL, -1, 2) require.NoError(t, err) - transmissions, err = orm.GetTransmitRequests(jobID) + transmissions, err = orm.GetTransmitRequests(sURL, jobID) require.NoError(t, err) require.Equal(t, []*Transmission{ {Req: &pb.TransmitRequest{Payload: reports[1]}, ReportCtx: makeReportContext(1, 2)}, {Req: &pb.TransmitRequest{Payload: reports[0]}, ReportCtx: makeReportContext(1, 1)}, }, transmissions) - err = orm.InsertTransmitRequest(&pb.TransmitRequest{Payload: reports[2]}, jobID, makeReportContext(2, 1)) + err = orm.InsertTransmitRequest(sURL, &pb.TransmitRequest{Payload: reports[2]}, jobID, makeReportContext(2, 1)) require.NoError(t, err) - err = orm.InsertTransmitRequest(&pb.TransmitRequest{Payload: reports[3]}, jobID, makeReportContext(2, 2)) + err = orm.InsertTransmitRequest(sURL, &pb.TransmitRequest{Payload: reports[3]}, jobID, makeReportContext(2, 2)) require.NoError(t, err) // Max size is table size - 1, expect the oldest row to be pruned. - err = orm.PruneTransmitRequests(jobID, 3) + err = orm.PruneTransmitRequests(sURL, jobID, 3) require.NoError(t, err) - transmissions, err = orm.GetTransmitRequests(jobID) + transmissions, err = orm.GetTransmitRequests(sURL, jobID) require.NoError(t, err) require.Equal(t, []*Transmission{ {Req: &pb.TransmitRequest{Payload: reports[3]}, ReportCtx: makeReportContext(2, 2)}, {Req: &pb.TransmitRequest{Payload: reports[2]}, ReportCtx: makeReportContext(2, 1)}, {Req: &pb.TransmitRequest{Payload: reports[1]}, ReportCtx: makeReportContext(1, 2)}, }, transmissions) + + // s2 not touched + transmissions, err = orm.GetTransmitRequests(sURL2, jobID) + require.NoError(t, err) + assert.Len(t, transmissions, 3) } func TestORM_InsertTransmitRequest_LatestReport(t *testing.T) { @@ -231,7 +268,13 @@ func TestORM_InsertTransmitRequest_LatestReport(t *testing.T) { } } - err := orm.InsertTransmitRequest(&pb.TransmitRequest{Payload: reports[0]}, jobID, makeReportContext( + err := orm.InsertTransmitRequest(sURL, &pb.TransmitRequest{Payload: reports[0]}, jobID, makeReportContext( + 0, 0, + )) + require.NoError(t, err) + + // this should be ignored, because report context is the same + err = orm.InsertTransmitRequest(sURL2, &pb.TransmitRequest{Payload: reports[1]}, jobID, makeReportContext( 0, 0, )) require.NoError(t, err) @@ -241,7 +284,7 @@ func TestORM_InsertTransmitRequest_LatestReport(t *testing.T) { assert.Equal(t, reports[0], l) t.Run("replaces if epoch and round are larger", func(t *testing.T) { - err = orm.InsertTransmitRequest(&pb.TransmitRequest{Payload: reports[1]}, jobID, makeReportContext(1, 1)) + err = orm.InsertTransmitRequest("foo", &pb.TransmitRequest{Payload: reports[1]}, jobID, makeReportContext(1, 1)) require.NoError(t, err) l, err = orm.LatestReport(testutils.Context(t), feedID) @@ -249,7 +292,7 @@ func TestORM_InsertTransmitRequest_LatestReport(t *testing.T) { assert.Equal(t, reports[1], l) }) t.Run("replaces if epoch is the same but round is greater", func(t *testing.T) { - err = orm.InsertTransmitRequest(&pb.TransmitRequest{Payload: reports[2]}, jobID, makeReportContext(1, 2)) + err = orm.InsertTransmitRequest(sURL, &pb.TransmitRequest{Payload: reports[2]}, jobID, makeReportContext(1, 2)) require.NoError(t, err) l, err = orm.LatestReport(testutils.Context(t), feedID) @@ -257,7 +300,7 @@ func TestORM_InsertTransmitRequest_LatestReport(t *testing.T) { assert.Equal(t, reports[2], l) }) t.Run("replaces if epoch is larger but round is smaller", func(t *testing.T) { - err = orm.InsertTransmitRequest(&pb.TransmitRequest{Payload: reports[3]}, jobID, makeReportContext(2, 1)) + err = orm.InsertTransmitRequest("bar", &pb.TransmitRequest{Payload: reports[3]}, jobID, makeReportContext(2, 1)) require.NoError(t, err) l, err = orm.LatestReport(testutils.Context(t), feedID) @@ -265,7 +308,7 @@ func TestORM_InsertTransmitRequest_LatestReport(t *testing.T) { assert.Equal(t, reports[3], l) }) t.Run("does not overwrite if epoch/round is the same", func(t *testing.T) { - err = orm.InsertTransmitRequest(&pb.TransmitRequest{Payload: reports[0]}, jobID, makeReportContext(2, 1)) + err = orm.InsertTransmitRequest(sURL, &pb.TransmitRequest{Payload: reports[0]}, jobID, makeReportContext(2, 1)) require.NoError(t, err) l, err = orm.LatestReport(testutils.Context(t), feedID) diff --git a/core/services/relay/evm/mercury/persistence_manager.go b/core/services/relay/evm/mercury/persistence_manager.go index 779e275f154..bcf6403ffeb 100644 --- a/core/services/relay/evm/mercury/persistence_manager.go +++ b/core/services/relay/evm/mercury/persistence_manager.go @@ -20,8 +20,9 @@ var ( ) type PersistenceManager struct { - lggr logger.Logger - orm ORM + lggr logger.Logger + orm ORM + serverURL string once services.StateMachine stopCh services.StopChan @@ -37,10 +38,11 @@ type PersistenceManager struct { pruneFrequency time.Duration } -func NewPersistenceManager(lggr logger.Logger, orm ORM, jobID int32, maxTransmitQueueSize int, flushDeletesFrequency, pruneFrequency time.Duration) *PersistenceManager { +func NewPersistenceManager(lggr logger.Logger, serverURL string, orm ORM, jobID int32, maxTransmitQueueSize int, flushDeletesFrequency, pruneFrequency time.Duration) *PersistenceManager { return &PersistenceManager{ lggr: lggr.Named("MercuryPersistenceManager"), orm: orm, + serverURL: serverURL, stopCh: make(services.StopChan), jobID: jobID, maxTransmitQueueSize: maxTransmitQueueSize, @@ -67,11 +69,11 @@ func (pm *PersistenceManager) Close() error { } func (pm *PersistenceManager) Insert(ctx context.Context, req *pb.TransmitRequest, reportCtx ocrtypes.ReportContext) error { - return pm.orm.InsertTransmitRequest(req, pm.jobID, reportCtx, pg.WithParentCtx(ctx)) + return pm.orm.InsertTransmitRequest(pm.serverURL, req, pm.jobID, reportCtx, pg.WithParentCtx(ctx)) } func (pm *PersistenceManager) Delete(ctx context.Context, req *pb.TransmitRequest) error { - return pm.orm.DeleteTransmitRequests([]*pb.TransmitRequest{req}, pg.WithParentCtx(ctx)) + return pm.orm.DeleteTransmitRequests(pm.serverURL, []*pb.TransmitRequest{req}, pg.WithParentCtx(ctx)) } func (pm *PersistenceManager) AsyncDelete(req *pb.TransmitRequest) { @@ -79,7 +81,7 @@ func (pm *PersistenceManager) AsyncDelete(req *pb.TransmitRequest) { } func (pm *PersistenceManager) Load(ctx context.Context) ([]*Transmission, error) { - return pm.orm.GetTransmitRequests(pm.jobID, pg.WithParentCtx(ctx)) + return pm.orm.GetTransmitRequests(pm.serverURL, pm.jobID, pg.WithParentCtx(ctx)) } func (pm *PersistenceManager) runFlushDeletesLoop() { @@ -96,7 +98,7 @@ func (pm *PersistenceManager) runFlushDeletesLoop() { return case <-ticker.C: queuedReqs := pm.resetDeleteQueue() - if err := pm.orm.DeleteTransmitRequests(queuedReqs, pg.WithParentCtx(ctx)); err != nil { + if err := pm.orm.DeleteTransmitRequests(pm.serverURL, queuedReqs, pg.WithParentCtx(ctx)); err != nil { pm.lggr.Errorw("Failed to delete queued transmit requests", "err", err) pm.addToDeleteQueue(queuedReqs...) } else { @@ -119,7 +121,7 @@ func (pm *PersistenceManager) runPruneLoop() { ticker.Stop() return case <-ticker.C: - if err := pm.orm.PruneTransmitRequests(pm.jobID, pm.maxTransmitQueueSize, pg.WithParentCtx(ctx), pg.WithLongQueryTimeout()); err != nil { + if err := pm.orm.PruneTransmitRequests(pm.serverURL, pm.jobID, pm.maxTransmitQueueSize, pg.WithParentCtx(ctx), pg.WithLongQueryTimeout()); err != nil { pm.lggr.Errorw("Failed to prune transmit requests table", "err", err) } else { pm.lggr.Debugw("Pruned transmit requests table") diff --git a/core/services/relay/evm/mercury/persistence_manager_test.go b/core/services/relay/evm/mercury/persistence_manager_test.go index 755d64a5a23..15b1424f1a4 100644 --- a/core/services/relay/evm/mercury/persistence_manager_test.go +++ b/core/services/relay/evm/mercury/persistence_manager_test.go @@ -23,7 +23,7 @@ func bootstrapPersistenceManager(t *testing.T, jobID int32, db *sqlx.DB) (*Persi t.Helper() lggr, observedLogs := logger.TestLoggerObserved(t, zapcore.DebugLevel) orm := NewORM(db, lggr, pgtest.NewQConfig(true)) - return NewPersistenceManager(lggr, orm, jobID, 2, 5*time.Millisecond, 5*time.Millisecond), observedLogs + return NewPersistenceManager(lggr, "mercuryserver.example", orm, jobID, 2, 5*time.Millisecond, 5*time.Millisecond), observedLogs } func TestPersistenceManager(t *testing.T) { diff --git a/core/services/relay/evm/mercury/queue.go b/core/services/relay/evm/mercury/queue.go index 07ef8a97426..8a89f47302b 100644 --- a/core/services/relay/evm/mercury/queue.go +++ b/core/services/relay/evm/mercury/queue.go @@ -31,7 +31,7 @@ var transmitQueueLoad = promauto.NewGaugeVec(prometheus.GaugeOpts{ Name: "mercury_transmit_queue_load", Help: "Percent of transmit queue capacity used", }, - []string{"feedID", "capacity"}, + []string{"feedID", "serverURL", "capacity"}, ) // Prometheus' default interval is 15s, set this to under 7.5s to avoid @@ -64,9 +64,7 @@ type Transmission struct { // maxlen controls how many items will be stored in the queue // 0 means unlimited - be careful, this can cause memory leaks -func NewTransmitQueue(lggr logger.Logger, feedID string, maxlen int, transmissions []*Transmission, asyncDeleter asyncDeleter) *TransmitQueue { - pq := priorityQueue(transmissions) - heap.Init(&pq) // ensure the heap is ordered +func NewTransmitQueue(lggr logger.Logger, serverURL, feedID string, maxlen int, asyncDeleter asyncDeleter) *TransmitQueue { mu := new(sync.RWMutex) return &TransmitQueue{ services.StateMachine{}, @@ -74,14 +72,20 @@ func NewTransmitQueue(lggr logger.Logger, feedID string, maxlen int, transmissio lggr.Named("TransmitQueue"), asyncDeleter, mu, - &pq, + nil, // pq needs to be initialized by calling tq.Init before use maxlen, false, nil, - transmitQueueLoad.WithLabelValues(feedID, fmt.Sprintf("%d", maxlen)), + transmitQueueLoad.WithLabelValues(feedID, serverURL, fmt.Sprintf("%d", maxlen)), } } +func (tq *TransmitQueue) Init(transmissions []*Transmission) { + pq := priorityQueue(transmissions) + heap.Init(&pq) // ensure the heap is ordered + tq.pq = &pq +} + func (tq *TransmitQueue) Push(req *pb.TransmitRequest, reportCtx ocrtypes.ReportContext) (ok bool) { tq.cond.L.Lock() defer tq.cond.L.Unlock() diff --git a/core/services/relay/evm/mercury/queue_test.go b/core/services/relay/evm/mercury/queue_test.go index de2f64f9fe9..8e5a0caf614 100644 --- a/core/services/relay/evm/mercury/queue_test.go +++ b/core/services/relay/evm/mercury/queue_test.go @@ -68,7 +68,8 @@ func Test_Queue(t *testing.T) { lggr, observedLogs := logger.TestLoggerObserved(t, zapcore.ErrorLevel) testTransmissions := createTestTransmissions(t) deleter := mocks.NewAsyncDeleter(t) - transmitQueue := NewTransmitQueue(lggr, "foo feed ID", 7, nil, deleter) + transmitQueue := NewTransmitQueue(lggr, sURL, "foo feed ID", 7, deleter) + transmitQueue.Init([]*Transmission{}) t.Run("successfully add transmissions to transmit queue", func(t *testing.T) { for _, tt := range testTransmissions { @@ -138,7 +139,8 @@ func Test_Queue(t *testing.T) { }, }, } - transmitQueue := NewTransmitQueue(lggr, "foo feed ID", 7, transmissions, deleter) + transmitQueue := NewTransmitQueue(lggr, sURL, "foo feed ID", 7, deleter) + transmitQueue.Init(transmissions) transmission := transmitQueue.BlockingPop() assert.Equal(t, transmission.Req.Payload, []byte("new1")) diff --git a/core/services/relay/evm/mercury/transmitter.go b/core/services/relay/evm/mercury/transmitter.go index 9444b904b89..6f49ca91bfc 100644 --- a/core/services/relay/evm/mercury/transmitter.go +++ b/core/services/relay/evm/mercury/transmitter.go @@ -6,7 +6,9 @@ import ( "crypto/ed25519" "errors" "fmt" + "io" "math/big" + "sort" "sync" "time" @@ -16,8 +18,7 @@ import ( pkgerrors "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" - - "github.com/jmoiron/sqlx" + "golang.org/x/sync/errgroup" "github.com/smartcontractkit/libocr/offchainreporting2plus/chains/evmutil" ocrtypes "github.com/smartcontractkit/libocr/offchainreporting2plus/types" @@ -26,7 +27,6 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/types/mercury" "github.com/smartcontractkit/chainlink/v2/core/logger" - "github.com/smartcontractkit/chainlink/v2/core/services/pg" mercuryutils "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/utils" "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/wsrpc" "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/wsrpc/pb" @@ -49,43 +49,43 @@ var ( Name: "mercury_transmit_success_count", Help: "Number of successful transmissions (duplicates are counted as success)", }, - []string{"feedID"}, + []string{"feedID", "serverURL"}, ) transmitDuplicateCount = promauto.NewCounterVec(prometheus.CounterOpts{ Name: "mercury_transmit_duplicate_count", Help: "Number of transmissions where the server told us it was a duplicate", }, - []string{"feedID"}, + []string{"feedID", "serverURL"}, ) transmitConnectionErrorCount = promauto.NewCounterVec(prometheus.CounterOpts{ Name: "mercury_transmit_connection_error_count", Help: "Number of errored transmissions that failed due to problem with the connection", }, - []string{"feedID"}, + []string{"feedID", "serverURL"}, ) transmitQueueDeleteErrorCount = promauto.NewCounterVec(prometheus.CounterOpts{ Name: "mercury_transmit_queue_delete_error_count", Help: "Running count of DB errors when trying to delete an item from the queue DB", }, - []string{"feedID"}, + []string{"feedID", "serverURL"}, ) transmitQueueInsertErrorCount = promauto.NewCounterVec(prometheus.CounterOpts{ Name: "mercury_transmit_queue_insert_error_count", Help: "Running count of DB errors when trying to insert an item into the queue DB", }, - []string{"feedID"}, + []string{"feedID", "serverURL"}, ) transmitQueuePushErrorCount = promauto.NewCounterVec(prometheus.CounterOpts{ Name: "mercury_transmit_queue_push_error_count", Help: "Running count of DB errors when trying to push an item onto the queue", }, - []string{"feedID"}, + []string{"feedID", "serverURL"}, ) transmitServerErrorCount = promauto.NewCounterVec(prometheus.CounterOpts{ Name: "mercury_transmit_server_error_count", Help: "Number of errored transmissions that failed due to an error returned by the mercury server", }, - []string{"feedID", "code"}, + []string{"feedID", "serverURL", "code"}, ) ) @@ -106,27 +106,18 @@ var _ Transmitter = (*mercuryTransmitter)(nil) type mercuryTransmitter struct { services.StateMachine - lggr logger.Logger - rpcClient wsrpc.Client - persistenceManager *PersistenceManager - codec TransmitterReportDecoder + lggr logger.Logger + + servers map[string]*server + + codec TransmitterReportDecoder feedID mercuryutils.FeedID jobID int32 fromAccount string stopCh services.StopChan - queue *TransmitQueue - wg sync.WaitGroup - - deleteQueue chan *pb.TransmitRequest - - transmitSuccessCount prometheus.Counter - transmitDuplicateCount prometheus.Counter - transmitConnectionErrorCount prometheus.Counter - transmitQueueDeleteErrorCount prometheus.Counter - transmitQueueInsertErrorCount prometheus.Counter - transmitQueuePushErrorCount prometheus.Counter + wg *sync.WaitGroup } var PayloadTypes = getPayloadTypes() @@ -148,83 +139,33 @@ func getPayloadTypes() abi.Arguments { }) } -func NewTransmitter(lggr logger.Logger, rpcClient wsrpc.Client, fromAccount ed25519.PublicKey, jobID int32, feedID [32]byte, db *sqlx.DB, cfg pg.QConfig, codec TransmitterReportDecoder) *mercuryTransmitter { - feedIDHex := fmt.Sprintf("0x%x", feedID[:]) - persistenceManager := NewPersistenceManager(lggr, NewORM(db, lggr, cfg), jobID, maxTransmitQueueSize, flushDeletesFrequency, pruneFrequency) - return &mercuryTransmitter{ - services.StateMachine{}, - lggr.Named("MercuryTransmitter").With("feedID", feedIDHex), - rpcClient, - persistenceManager, - codec, - feedID, - jobID, - fmt.Sprintf("%x", fromAccount), - make(services.StopChan), - nil, - sync.WaitGroup{}, - make(chan *pb.TransmitRequest, maxDeleteQueueSize), - transmitSuccessCount.WithLabelValues(feedIDHex), - transmitDuplicateCount.WithLabelValues(feedIDHex), - transmitConnectionErrorCount.WithLabelValues(feedIDHex), - transmitQueueDeleteErrorCount.WithLabelValues(feedIDHex), - transmitQueueInsertErrorCount.WithLabelValues(feedIDHex), - transmitQueuePushErrorCount.WithLabelValues(feedIDHex), - } -} +type server struct { + lggr logger.Logger -func (mt *mercuryTransmitter) Start(ctx context.Context) (err error) { - return mt.StartOnce("MercuryTransmitter", func() error { - mt.lggr.Debugw("Loading transmit requests from database") - if err := mt.persistenceManager.Start(ctx); err != nil { - return err - } - transmissions, err := mt.persistenceManager.Load(ctx) - if err != nil { - return err - } - mt.queue = NewTransmitQueue(mt.lggr, mt.feedID.String(), maxTransmitQueueSize, transmissions, mt.persistenceManager) + c wsrpc.Client + pm *PersistenceManager + q *TransmitQueue - if err := mt.rpcClient.Start(ctx); err != nil { - return err - } - if err := mt.queue.Start(ctx); err != nil { - return err - } - mt.wg.Add(1) - go mt.runDeleteQueueLoop() - mt.wg.Add(1) - go mt.runQueueLoop() - return nil - }) -} + deleteQueue chan *pb.TransmitRequest -func (mt *mercuryTransmitter) Close() error { - return mt.StopOnce("MercuryTransmitter", func() error { - if err := mt.queue.Close(); err != nil { - return err - } - if err := mt.persistenceManager.Close(); err != nil { - return err - } - close(mt.stopCh) - mt.wg.Wait() - return mt.rpcClient.Close() - }) + transmitSuccessCount prometheus.Counter + transmitDuplicateCount prometheus.Counter + transmitConnectionErrorCount prometheus.Counter + transmitQueueDeleteErrorCount prometheus.Counter + transmitQueueInsertErrorCount prometheus.Counter + transmitQueuePushErrorCount prometheus.Counter } -func (mt *mercuryTransmitter) Name() string { return mt.lggr.Name() } - -func (mt *mercuryTransmitter) HealthReport() map[string]error { - report := map[string]error{mt.Name(): mt.Healthy()} - services.CopyHealth(report, mt.rpcClient.HealthReport()) - services.CopyHealth(report, mt.queue.HealthReport()) +func (s *server) HealthReport() map[string]error { + report := map[string]error{} + services.CopyHealth(report, s.c.HealthReport()) + services.CopyHealth(report, s.q.HealthReport()) return report } -func (mt *mercuryTransmitter) runDeleteQueueLoop() { - defer mt.wg.Done() - runloopCtx, cancel := mt.stopCh.Ctx(context.Background()) +func (s *server) runDeleteQueueLoop(stopCh services.StopChan, wg *sync.WaitGroup) { + defer wg.Done() + runloopCtx, cancel := stopCh.Ctx(context.Background()) defer cancel() // Exponential backoff for very rarely occurring errors (DB disconnect etc) @@ -237,16 +178,16 @@ func (mt *mercuryTransmitter) runDeleteQueueLoop() { for { select { - case req := <-mt.deleteQueue: + case req := <-s.deleteQueue: for { - if err := mt.persistenceManager.Delete(runloopCtx, req); err != nil { - mt.lggr.Errorw("Failed to delete transmit request record", "err", err, "req.Payload", req.Payload) - mt.transmitQueueDeleteErrorCount.Inc() + if err := s.pm.Delete(runloopCtx, req); err != nil { + s.lggr.Errorw("Failed to delete transmit request record", "err", err, "req.Payload", req.Payload) + s.transmitQueueDeleteErrorCount.Inc() select { case <-time.After(b.Duration()): // Wait a backoff duration before trying to delete again continue - case <-mt.stopCh: + case <-stopCh: // abort and return immediately on stop even if items remain in queue return } @@ -255,15 +196,15 @@ func (mt *mercuryTransmitter) runDeleteQueueLoop() { } // success b.Reset() - case <-mt.stopCh: + case <-stopCh: // abort and return immediately on stop even if items remain in queue return } } } -func (mt *mercuryTransmitter) runQueueLoop() { - defer mt.wg.Done() +func (s *server) runQueueLoop(stopCh services.StopChan, wg *sync.WaitGroup, feedIDHex string) { + defer wg.Done() // Exponential backoff with very short retry interval (since latency is a priority) // 5ms, 10ms, 20ms, 40ms etc b := backoff.Backoff{ @@ -272,26 +213,26 @@ func (mt *mercuryTransmitter) runQueueLoop() { Factor: 2, Jitter: true, } - runloopCtx, cancel := mt.stopCh.Ctx(context.Background()) + runloopCtx, cancel := stopCh.Ctx(context.Background()) defer cancel() for { - t := mt.queue.BlockingPop() + t := s.q.BlockingPop() if t == nil { // queue was closed return } ctx, cancel := context.WithTimeout(runloopCtx, utils.WithJitter(transmitTimeout)) - res, err := mt.rpcClient.Transmit(ctx, t.Req) + res, err := s.c.Transmit(ctx, t.Req) cancel() if runloopCtx.Err() != nil { // runloop context is only canceled on transmitter close so we can // exit the runloop here return } else if err != nil { - mt.transmitConnectionErrorCount.Inc() - mt.lggr.Errorw("Transmit report failed", "err", err, "reportCtx", t.ReportCtx) - if ok := mt.queue.Push(t.Req, t.ReportCtx); !ok { - mt.lggr.Error("Failed to push report to transmit queue; queue is closed") + s.transmitConnectionErrorCount.Inc() + s.lggr.Errorw("Transmit report failed", "err", err, "reportCtx", t.ReportCtx) + if ok := s.q.Push(t.Req, t.ReportCtx); !ok { + s.lggr.Error("Failed to push report to transmit queue; queue is closed") return } // Wait a backoff duration before pulling the most recent transmission @@ -299,36 +240,132 @@ func (mt *mercuryTransmitter) runQueueLoop() { select { case <-time.After(b.Duration()): continue - case <-mt.stopCh: + case <-stopCh: return } } b.Reset() if res.Error == "" { - mt.transmitSuccessCount.Inc() - mt.lggr.Debugw("Transmit report success", "payload", hexutil.Encode(t.Req.Payload), "response", res, "reportCtx", t.ReportCtx) + s.transmitSuccessCount.Inc() + s.lggr.Debugw("Transmit report success", "payload", hexutil.Encode(t.Req.Payload), "response", res, "reportCtx", t.ReportCtx) } else { // We don't need to retry here because the mercury server // has confirmed it received the report. We only need to retry // on networking/unknown errors switch res.Code { case DuplicateReport: - mt.transmitSuccessCount.Inc() - mt.transmitDuplicateCount.Inc() - mt.lggr.Debugw("Transmit report success; duplicate report", "payload", hexutil.Encode(t.Req.Payload), "response", res, "reportCtx", t.ReportCtx) + s.transmitSuccessCount.Inc() + s.transmitDuplicateCount.Inc() + s.lggr.Debugw("Transmit report success; duplicate report", "payload", hexutil.Encode(t.Req.Payload), "response", res, "reportCtx", t.ReportCtx) default: - transmitServerErrorCount.WithLabelValues(mt.feedID.String(), fmt.Sprintf("%d", res.Code)).Inc() - mt.lggr.Errorw("Transmit report failed; mercury server returned error", "response", res, "reportCtx", t.ReportCtx, "err", res.Error, "code", res.Code) + transmitServerErrorCount.WithLabelValues(feedIDHex, fmt.Sprintf("%d", res.Code)).Inc() + s.lggr.Errorw("Transmit report failed; mercury server returned error", "response", res, "reportCtx", t.ReportCtx, "err", res.Error, "code", res.Code) } } select { - case mt.deleteQueue <- t.Req: + case s.deleteQueue <- t.Req: default: - mt.lggr.Criticalw("Delete queue is full", "reportCtx", t.ReportCtx) + s.lggr.Criticalw("Delete queue is full", "reportCtx", t.ReportCtx) + } + } +} + +func NewTransmitter(lggr logger.Logger, clients map[string]wsrpc.Client, fromAccount ed25519.PublicKey, jobID int32, feedID [32]byte, orm ORM, codec TransmitterReportDecoder) *mercuryTransmitter { + feedIDHex := fmt.Sprintf("0x%x", feedID[:]) + servers := make(map[string]*server, len(clients)) + for serverURL, client := range clients { + cLggr := lggr.Named(serverURL).With("serverURL", serverURL) + pm := NewPersistenceManager(cLggr, serverURL, orm, jobID, maxTransmitQueueSize, flushDeletesFrequency, pruneFrequency) + servers[serverURL] = &server{ + cLggr, + client, + pm, + NewTransmitQueue(cLggr, serverURL, feedIDHex, maxTransmitQueueSize, pm), + make(chan *pb.TransmitRequest, maxDeleteQueueSize), + transmitSuccessCount.WithLabelValues(feedIDHex, serverURL), + transmitDuplicateCount.WithLabelValues(feedIDHex, serverURL), + transmitConnectionErrorCount.WithLabelValues(feedIDHex, serverURL), + transmitQueueDeleteErrorCount.WithLabelValues(feedIDHex, serverURL), + transmitQueueInsertErrorCount.WithLabelValues(feedIDHex, serverURL), + transmitQueuePushErrorCount.WithLabelValues(feedIDHex, serverURL), + } + } + return &mercuryTransmitter{ + services.StateMachine{}, + lggr.Named("MercuryTransmitter").With("feedID", feedIDHex), + servers, + codec, + feedID, + jobID, + fmt.Sprintf("%x", fromAccount), + make(services.StopChan), + &sync.WaitGroup{}, + } +} + +func (mt *mercuryTransmitter) Start(ctx context.Context) (err error) { + return mt.StartOnce("MercuryTransmitter", func() error { + mt.lggr.Debugw("Loading transmit requests from database") + + { + var startClosers []services.StartClose + for _, s := range mt.servers { + transmissions, err := s.pm.Load(ctx) + if err != nil { + return err + } + s.q.Init(transmissions) + // starting pm after loading from it is fine because it simply spawns some garbage collection/prune goroutines + startClosers = append(startClosers, s.c, s.q, s.pm) + + mt.wg.Add(2) + go s.runDeleteQueueLoop(mt.stopCh, mt.wg) + go s.runQueueLoop(mt.stopCh, mt.wg, mt.feedID.Hex()) + } + if err := (&services.MultiStart{}).Start(ctx, startClosers...); err != nil { + return err + } + } + + return nil + }) +} + +func (mt *mercuryTransmitter) Close() error { + return mt.StopOnce("MercuryTransmitter", func() error { + // Drain all the queues first + var qs []io.Closer + for _, s := range mt.servers { + qs = append(qs, s.q) + } + if err := services.CloseAll(qs...); err != nil { + return err + } + + close(mt.stopCh) + mt.wg.Wait() + + // Close all the persistence managers + // Close all the clients + var closers []io.Closer + for _, s := range mt.servers { + closers = append(closers, s.pm) + closers = append(closers, s.c) } + return services.CloseAll(closers...) + }) +} + +func (mt *mercuryTransmitter) Name() string { return mt.lggr.Name() } + +func (mt *mercuryTransmitter) HealthReport() map[string]error { + report := map[string]error{mt.Name(): mt.Healthy()} + for _, s := range mt.servers { + services.CopyHealth(report, s.HealthReport()) } + return report } // Transmit sends the report to the on-chain smart contract's Transmit method. @@ -358,15 +395,23 @@ func (mt *mercuryTransmitter) Transmit(ctx context.Context, reportCtx ocrtypes.R mt.lggr.Tracew("Transmit enqueue", "req.Payload", req.Payload, "report", report, "reportCtx", reportCtx, "signatures", signatures) - if err := mt.persistenceManager.Insert(ctx, req, reportCtx); err != nil { - mt.transmitQueueInsertErrorCount.Inc() - return err - } - if ok := mt.queue.Push(req, reportCtx); !ok { - mt.transmitQueuePushErrorCount.Inc() - return errors.New("transmit queue is closed") + g := new(errgroup.Group) + for _, s := range mt.servers { + s := s // https://golang.org/doc/faq#closures_and_goroutines + g.Go(func() error { + if err := s.pm.Insert(ctx, req, reportCtx); err != nil { + s.transmitQueueInsertErrorCount.Inc() + return err + } + if ok := s.q.Push(req, reportCtx); !ok { + s.transmitQueuePushErrorCount.Inc() + return errors.New("transmit queue is closed") + } + return nil + }) } - return nil + + return g.Wait() } // FromAccount returns the stringified (hex) CSA public key @@ -444,29 +489,67 @@ func (mt *mercuryTransmitter) latestReport(ctx context.Context, feedID [32]byte) req := &pb.LatestReportRequest{ FeedId: feedID[:], } - resp, err := mt.rpcClient.LatestReport(ctx, req) - if err != nil { - mt.lggr.Warnw("latestReport failed", "err", err) - return nil, pkgerrors.Wrap(err, "latestReport failed") - } - if resp == nil { - return nil, errors.New("latestReport expected non-nil response") - } - if resp.Error != "" { - err = errors.New(resp.Error) - mt.lggr.Warnw("latestReport failed; mercury server returned error", "err", err) - return nil, err + + var reports []*pb.Report + mu := sync.Mutex{} + var g errgroup.Group + for _, s := range mt.servers { + s := s + g.Go(func() error { + resp, err := s.c.LatestReport(ctx, req) + if err != nil { + s.lggr.Warnw("latestReport failed", "err", err) + return err + } + if resp == nil { + err = errors.New("latestReport expected non-nil response from server") + s.lggr.Warn(err.Error()) + return err + } + if resp.Error != "" { + err = errors.New(resp.Error) + s.lggr.Warnw("latestReport failed; mercury server returned error", "err", err) + return fmt.Errorf("latestReport failed; mercury server returned error: %s", resp.Error) + } + if resp.Report == nil { + s.lggr.Tracew("latestReport success: returned nil") + } else if !bytes.Equal(resp.Report.FeedId, feedID[:]) { + err = fmt.Errorf("latestReport failed; mismatched feed IDs, expected: 0x%x, got: 0x%x", mt.feedID[:], resp.Report.FeedId[:]) + s.lggr.Errorw("latestReport failed", "err", err) + return err + } else { + s.lggr.Tracew("latestReport success", "observationsTimestamp", resp.Report.ObservationsTimestamp, "currentBlockNum", resp.Report.CurrentBlockNumber) + } + mu.Lock() + defer mu.Unlock() + reports = append(reports, resp.Report) + return nil + }) } - if resp.Report == nil { - mt.lggr.Tracew("latestReport success: returned nil") - return nil, nil - } else if !bytes.Equal(resp.Report.FeedId, feedID[:]) { - err = fmt.Errorf("latestReport failed; mismatched feed IDs, expected: 0x%x, got: 0x%x", mt.feedID[:], resp.Report.FeedId[:]) - mt.lggr.Errorw("latestReport failed", "err", err) - return nil, err + err := g.Wait() + + if len(reports) == 0 { + return nil, fmt.Errorf("latestReport failed; all servers returned an error: %w", err) } - mt.lggr.Tracew("latestReport success", "currentBlockNum", resp.Report.CurrentBlockNumber) + sortReportsLatestFirst(reports) + + return reports[0], nil +} - return resp.Report, nil +func sortReportsLatestFirst(reports []*pb.Report) { + sort.Slice(reports, func(i, j int) bool { + // nils are "earliest" so they go to the end + if reports[i] == nil { + return false + } else if reports[j] == nil { + return true + } + // Handle block number case + if reports[i].ObservationsTimestamp == reports[j].ObservationsTimestamp { + return reports[i].CurrentBlockNumber > reports[j].CurrentBlockNumber + } + // Timestamp case + return reports[i].ObservationsTimestamp > reports[j].ObservationsTimestamp + }) } diff --git a/core/services/relay/evm/mercury/transmitter_test.go b/core/services/relay/evm/mercury/transmitter_test.go index 188beff5113..d7d62a9f422 100644 --- a/core/services/relay/evm/mercury/transmitter_test.go +++ b/core/services/relay/evm/mercury/transmitter_test.go @@ -16,6 +16,7 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest" "github.com/smartcontractkit/chainlink/v2/core/logger" mercurytypes "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/types" + "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/wsrpc" mocks "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/wsrpc/mocks" "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/wsrpc/pb" ) @@ -26,62 +27,78 @@ func Test_MercuryTransmitter_Transmit(t *testing.T) { var jobID int32 pgtest.MustExec(t, db, `SET CONSTRAINTS mercury_transmit_requests_job_id_fkey DEFERRED`) pgtest.MustExec(t, db, `SET CONSTRAINTS feed_latest_reports_job_id_fkey DEFERRED`) - q := NewTransmitQueue(lggr, "", 0, nil, nil) codec := new(mockCodec) + orm := NewORM(db, lggr, pgtest.NewQConfig(true)) + clients := map[string]wsrpc.Client{} + + t.Run("with one mercury server", func(t *testing.T) { + t.Run("v1 report transmission successfully enqueued", func(t *testing.T) { + report := sampleV1Report + c := &mocks.MockWSRPCClient{} + clients[sURL] = c + mt := NewTransmitter(lggr, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec) + // init the queue since we skipped starting transmitter + mt.servers[sURL].q.Init([]*Transmission{}) + err := mt.Transmit(testutils.Context(t), sampleReportContext, report, sampleSigs) + require.NoError(t, err) - t.Run("v1 report transmission successfully enqueued", func(t *testing.T) { - report := sampleV1Report - c := mocks.MockWSRPCClient{ - TransmitF: func(ctx context.Context, in *pb.TransmitRequest) (out *pb.TransmitResponse, err error) { - require.NotNil(t, in) - assert.Equal(t, hexutil.Encode(buildSamplePayload(report)), hexutil.Encode(in.Payload)) - out = new(pb.TransmitResponse) - out.Code = 42 - out.Error = "" - return out, nil - }, - } - mt := NewTransmitter(lggr, c, sampleClientPubKey, jobID, sampleFeedID, db, pgtest.NewQConfig(true), codec) - mt.queue = q - err := mt.Transmit(testutils.Context(t), sampleReportContext, report, sampleSigs) + // ensure it was added to the queue + require.Equal(t, mt.servers[sURL].q.pq.Len(), 1) + assert.Subset(t, mt.servers[sURL].q.pq.Pop().(*Transmission).Req.Payload, report) + }) + t.Run("v2 report transmission successfully enqueued", func(t *testing.T) { + report := sampleV2Report + c := &mocks.MockWSRPCClient{} + clients[sURL] = c + mt := NewTransmitter(lggr, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec) + // init the queue since we skipped starting transmitter + mt.servers[sURL].q.Init([]*Transmission{}) + err := mt.Transmit(testutils.Context(t), sampleReportContext, report, sampleSigs) + require.NoError(t, err) - require.NoError(t, err) - }) - t.Run("v2 report transmission successfully enqueued", func(t *testing.T) { - report := sampleV2Report - c := mocks.MockWSRPCClient{ - TransmitF: func(ctx context.Context, in *pb.TransmitRequest) (out *pb.TransmitResponse, err error) { - require.NotNil(t, in) - assert.Equal(t, hexutil.Encode(buildSamplePayload(report)), hexutil.Encode(in.Payload)) - out = new(pb.TransmitResponse) - out.Code = 42 - out.Error = "" - return out, nil - }, - } - mt := NewTransmitter(lggr, c, sampleClientPubKey, jobID, sampleFeedID, db, pgtest.NewQConfig(true), codec) - mt.queue = q - err := mt.Transmit(testutils.Context(t), sampleReportContext, report, sampleSigs) + // ensure it was added to the queue + require.Equal(t, mt.servers[sURL].q.pq.Len(), 1) + assert.Subset(t, mt.servers[sURL].q.pq.Pop().(*Transmission).Req.Payload, report) + }) + t.Run("v3 report transmission successfully enqueued", func(t *testing.T) { + report := sampleV3Report + c := &mocks.MockWSRPCClient{} + clients[sURL] = c + mt := NewTransmitter(lggr, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec) + // init the queue since we skipped starting transmitter + mt.servers[sURL].q.Init([]*Transmission{}) + err := mt.Transmit(testutils.Context(t), sampleReportContext, report, sampleSigs) + require.NoError(t, err) - require.NoError(t, err) + // ensure it was added to the queue + require.Equal(t, mt.servers[sURL].q.pq.Len(), 1) + assert.Subset(t, mt.servers[sURL].q.pq.Pop().(*Transmission).Req.Payload, report) + }) }) - t.Run("v3 report transmission successfully enqueued", func(t *testing.T) { + + t.Run("with multiple mercury servers", func(t *testing.T) { report := sampleV3Report - c := mocks.MockWSRPCClient{ - TransmitF: func(ctx context.Context, in *pb.TransmitRequest) (out *pb.TransmitResponse, err error) { - require.NotNil(t, in) - assert.Equal(t, hexutil.Encode(buildSamplePayload(report)), hexutil.Encode(in.Payload)) - out = new(pb.TransmitResponse) - out.Code = 42 - out.Error = "" - return out, nil - }, - } - mt := NewTransmitter(lggr, c, sampleClientPubKey, jobID, sampleFeedID, db, pgtest.NewQConfig(true), codec) - mt.queue = q - err := mt.Transmit(testutils.Context(t), sampleReportContext, report, sampleSigs) + c := &mocks.MockWSRPCClient{} + clients[sURL] = c + clients[sURL2] = c + clients[sURL3] = c + mt := NewTransmitter(lggr, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec) + // init the queue since we skipped starting transmitter + mt.servers[sURL].q.Init([]*Transmission{}) + mt.servers[sURL2].q.Init([]*Transmission{}) + mt.servers[sURL3].q.Init([]*Transmission{}) + + err := mt.Transmit(testutils.Context(t), sampleReportContext, report, sampleSigs) require.NoError(t, err) + + // ensure it was added to the queue + require.Equal(t, mt.servers[sURL].q.pq.Len(), 1) + assert.Subset(t, mt.servers[sURL].q.pq.Pop().(*Transmission).Req.Payload, report) + require.Equal(t, mt.servers[sURL2].q.pq.Len(), 1) + assert.Subset(t, mt.servers[sURL2].q.pq.Pop().(*Transmission).Req.Payload, report) + require.Equal(t, mt.servers[sURL3].q.pq.Len(), 1) + assert.Subset(t, mt.servers[sURL3].q.pq.Pop().(*Transmission).Req.Payload, report) }) } @@ -92,8 +109,11 @@ func Test_MercuryTransmitter_LatestTimestamp(t *testing.T) { var jobID int32 codec := new(mockCodec) + orm := NewORM(db, lggr, pgtest.NewQConfig(true)) + clients := map[string]wsrpc.Client{} + t.Run("successful query", func(t *testing.T) { - c := mocks.MockWSRPCClient{ + c := &mocks.MockWSRPCClient{ LatestReportF: func(ctx context.Context, in *pb.LatestReportRequest) (out *pb.LatestReportResponse, err error) { require.NotNil(t, in) assert.Equal(t, hexutil.Encode(sampleFeedID[:]), hexutil.Encode(in.FeedId)) @@ -104,7 +124,8 @@ func Test_MercuryTransmitter_LatestTimestamp(t *testing.T) { return out, nil }, } - mt := NewTransmitter(lggr, c, sampleClientPubKey, jobID, sampleFeedID, db, pgtest.NewQConfig(true), codec) + clients[sURL] = c + mt := NewTransmitter(lggr, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec) ts, err := mt.LatestTimestamp(testutils.Context(t)) require.NoError(t, err) @@ -112,14 +133,15 @@ func Test_MercuryTransmitter_LatestTimestamp(t *testing.T) { }) t.Run("successful query returning nil report (new feed) gives latest timestamp = -1", func(t *testing.T) { - c := mocks.MockWSRPCClient{ + c := &mocks.MockWSRPCClient{ LatestReportF: func(ctx context.Context, in *pb.LatestReportRequest) (out *pb.LatestReportResponse, err error) { out = new(pb.LatestReportResponse) out.Report = nil return out, nil }, } - mt := NewTransmitter(lggr, c, sampleClientPubKey, jobID, sampleFeedID, db, pgtest.NewQConfig(true), codec) + clients[sURL] = c + mt := NewTransmitter(lggr, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec) ts, err := mt.LatestTimestamp(testutils.Context(t)) require.NoError(t, err) @@ -127,16 +149,48 @@ func Test_MercuryTransmitter_LatestTimestamp(t *testing.T) { }) t.Run("failing query", func(t *testing.T) { - c := mocks.MockWSRPCClient{ + c := &mocks.MockWSRPCClient{ LatestReportF: func(ctx context.Context, in *pb.LatestReportRequest) (out *pb.LatestReportResponse, err error) { return nil, errors.New("something exploded") }, } - mt := NewTransmitter(lggr, c, sampleClientPubKey, jobID, sampleFeedID, db, pgtest.NewQConfig(true), codec) + clients[sURL] = c + mt := NewTransmitter(lggr, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec) _, err := mt.LatestTimestamp(testutils.Context(t)) require.Error(t, err) assert.Contains(t, err.Error(), "something exploded") }) + + t.Run("with multiple servers, uses latest", func(t *testing.T) { + clients[sURL] = &mocks.MockWSRPCClient{ + LatestReportF: func(ctx context.Context, in *pb.LatestReportRequest) (out *pb.LatestReportResponse, err error) { + return nil, errors.New("something exploded") + }, + } + clients[sURL2] = &mocks.MockWSRPCClient{ + LatestReportF: func(ctx context.Context, in *pb.LatestReportRequest) (out *pb.LatestReportResponse, err error) { + out = new(pb.LatestReportResponse) + out.Report = new(pb.Report) + out.Report.FeedId = sampleFeedID[:] + out.Report.ObservationsTimestamp = 42 + return out, nil + }, + } + clients[sURL3] = &mocks.MockWSRPCClient{ + LatestReportF: func(ctx context.Context, in *pb.LatestReportRequest) (out *pb.LatestReportResponse, err error) { + out = new(pb.LatestReportResponse) + out.Report = new(pb.Report) + out.Report.FeedId = sampleFeedID[:] + out.Report.ObservationsTimestamp = 41 + return out, nil + }, + } + mt := NewTransmitter(lggr, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec) + ts, err := mt.LatestTimestamp(testutils.Context(t)) + require.NoError(t, err) + + assert.Equal(t, int64(42), ts) + }) } type mockCodec struct { @@ -157,10 +211,12 @@ func Test_MercuryTransmitter_LatestPrice(t *testing.T) { var jobID int32 codec := new(mockCodec) + orm := NewORM(db, lggr, pgtest.NewQConfig(true)) + clients := map[string]wsrpc.Client{} t.Run("successful query", func(t *testing.T) { originalPrice := big.NewInt(123456789) - c := mocks.MockWSRPCClient{ + c := &mocks.MockWSRPCClient{ LatestReportF: func(ctx context.Context, in *pb.LatestReportRequest) (out *pb.LatestReportResponse, err error) { require.NotNil(t, in) assert.Equal(t, hexutil.Encode(sampleFeedID[:]), hexutil.Encode(in.FeedId)) @@ -171,7 +227,8 @@ func Test_MercuryTransmitter_LatestPrice(t *testing.T) { return out, nil }, } - mt := NewTransmitter(lggr, c, sampleClientPubKey, jobID, sampleFeedID, db, pgtest.NewQConfig(true), codec) + clients[sURL] = c + mt := NewTransmitter(lggr, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec) t.Run("BenchmarkPriceFromReport succeeds", func(t *testing.T) { codec.val = originalPrice @@ -194,14 +251,15 @@ func Test_MercuryTransmitter_LatestPrice(t *testing.T) { }) t.Run("successful query returning nil report (new feed)", func(t *testing.T) { - c := mocks.MockWSRPCClient{ + c := &mocks.MockWSRPCClient{ LatestReportF: func(ctx context.Context, in *pb.LatestReportRequest) (out *pb.LatestReportResponse, err error) { out = new(pb.LatestReportResponse) out.Report = nil return out, nil }, } - mt := NewTransmitter(lggr, c, sampleClientPubKey, jobID, sampleFeedID, db, pgtest.NewQConfig(true), codec) + clients[sURL] = c + mt := NewTransmitter(lggr, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec) price, err := mt.LatestPrice(testutils.Context(t), sampleFeedID) require.NoError(t, err) @@ -209,12 +267,13 @@ func Test_MercuryTransmitter_LatestPrice(t *testing.T) { }) t.Run("failing query", func(t *testing.T) { - c := mocks.MockWSRPCClient{ + c := &mocks.MockWSRPCClient{ LatestReportF: func(ctx context.Context, in *pb.LatestReportRequest) (out *pb.LatestReportResponse, err error) { return nil, errors.New("something exploded") }, } - mt := NewTransmitter(lggr, c, sampleClientPubKey, jobID, sampleFeedID, db, pgtest.NewQConfig(true), codec) + clients[sURL] = c + mt := NewTransmitter(lggr, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec) _, err := mt.LatestPrice(testutils.Context(t), sampleFeedID) require.Error(t, err) assert.Contains(t, err.Error(), "something exploded") @@ -228,9 +287,11 @@ func Test_MercuryTransmitter_FetchInitialMaxFinalizedBlockNumber(t *testing.T) { db := pgtest.NewSqlxDB(t) var jobID int32 codec := new(mockCodec) + orm := NewORM(db, lggr, pgtest.NewQConfig(true)) + clients := map[string]wsrpc.Client{} t.Run("successful query", func(t *testing.T) { - c := mocks.MockWSRPCClient{ + c := &mocks.MockWSRPCClient{ LatestReportF: func(ctx context.Context, in *pb.LatestReportRequest) (out *pb.LatestReportResponse, err error) { require.NotNil(t, in) assert.Equal(t, hexutil.Encode(sampleFeedID[:]), hexutil.Encode(in.FeedId)) @@ -241,7 +302,8 @@ func Test_MercuryTransmitter_FetchInitialMaxFinalizedBlockNumber(t *testing.T) { return out, nil }, } - mt := NewTransmitter(lggr, c, sampleClientPubKey, jobID, sampleFeedID, db, pgtest.NewQConfig(true), codec) + clients[sURL] = c + mt := NewTransmitter(lggr, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec) bn, err := mt.FetchInitialMaxFinalizedBlockNumber(testutils.Context(t)) require.NoError(t, err) @@ -249,32 +311,34 @@ func Test_MercuryTransmitter_FetchInitialMaxFinalizedBlockNumber(t *testing.T) { assert.Equal(t, 42, int(*bn)) }) t.Run("successful query returning nil report (new feed)", func(t *testing.T) { - c := mocks.MockWSRPCClient{ + c := &mocks.MockWSRPCClient{ LatestReportF: func(ctx context.Context, in *pb.LatestReportRequest) (out *pb.LatestReportResponse, err error) { out = new(pb.LatestReportResponse) out.Report = nil return out, nil }, } - mt := NewTransmitter(lggr, c, sampleClientPubKey, jobID, sampleFeedID, db, pgtest.NewQConfig(true), codec) + clients[sURL] = c + mt := NewTransmitter(lggr, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec) bn, err := mt.FetchInitialMaxFinalizedBlockNumber(testutils.Context(t)) require.NoError(t, err) assert.Nil(t, bn) }) t.Run("failing query", func(t *testing.T) { - c := mocks.MockWSRPCClient{ + c := &mocks.MockWSRPCClient{ LatestReportF: func(ctx context.Context, in *pb.LatestReportRequest) (out *pb.LatestReportResponse, err error) { return nil, errors.New("something exploded") }, } - mt := NewTransmitter(lggr, c, sampleClientPubKey, 0, sampleFeedID, db, pgtest.NewQConfig(true), codec) + clients[sURL] = c + mt := NewTransmitter(lggr, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec) _, err := mt.FetchInitialMaxFinalizedBlockNumber(testutils.Context(t)) require.Error(t, err) assert.Contains(t, err.Error(), "something exploded") }) t.Run("return feed ID is wrong", func(t *testing.T) { - c := mocks.MockWSRPCClient{ + c := &mocks.MockWSRPCClient{ LatestReportF: func(ctx context.Context, in *pb.LatestReportRequest) (out *pb.LatestReportResponse, err error) { require.NotNil(t, in) assert.Equal(t, hexutil.Encode(sampleFeedID[:]), hexutil.Encode(in.FeedId)) @@ -285,9 +349,37 @@ func Test_MercuryTransmitter_FetchInitialMaxFinalizedBlockNumber(t *testing.T) { return out, nil }, } - mt := NewTransmitter(lggr, c, sampleClientPubKey, 0, sampleFeedID, db, pgtest.NewQConfig(true), codec) + clients[sURL] = c + mt := NewTransmitter(lggr, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec) _, err := mt.FetchInitialMaxFinalizedBlockNumber(testutils.Context(t)) require.Error(t, err) assert.Contains(t, err.Error(), "latestReport failed; mismatched feed IDs, expected: 0x1c916b4aa7e57ca7b68ae1bf45653f56b656fd3aa335ef7fae696b663f1b8472, got: 0x") }) } + +func Test_sortReportsLatestFirst(t *testing.T) { + reports := []*pb.Report{ + nil, + {ObservationsTimestamp: 1}, + {ObservationsTimestamp: 1}, + {ObservationsTimestamp: 2}, + {CurrentBlockNumber: 1}, + nil, + {CurrentBlockNumber: 2}, + {}, + } + + sortReportsLatestFirst(reports) + + assert.Equal(t, int64(2), reports[0].ObservationsTimestamp) + assert.Equal(t, int64(1), reports[1].ObservationsTimestamp) + assert.Equal(t, int64(1), reports[2].ObservationsTimestamp) + assert.Equal(t, int64(0), reports[3].ObservationsTimestamp) + assert.Equal(t, int64(2), reports[3].CurrentBlockNumber) + assert.Equal(t, int64(0), reports[4].ObservationsTimestamp) + assert.Equal(t, int64(1), reports[4].CurrentBlockNumber) + assert.Equal(t, int64(0), reports[5].ObservationsTimestamp) + assert.Equal(t, int64(0), reports[5].CurrentBlockNumber) + assert.Nil(t, reports[6]) + assert.Nil(t, reports[7]) +} diff --git a/core/services/relay/evm/mercury/wsrpc/cache/cache.go b/core/services/relay/evm/mercury/wsrpc/cache/cache.go index 712e62e5c0e..adc439e802b 100644 --- a/core/services/relay/evm/mercury/wsrpc/cache/cache.go +++ b/core/services/relay/evm/mercury/wsrpc/cache/cache.go @@ -174,7 +174,7 @@ type memCache struct { func newMemCache(lggr logger.Logger, client Client, cfg Config) *memCache { return &memCache{ services.StateMachine{}, - lggr.Named("MemCache"), + lggr.Named("MemCache").Named(client.ServerURL()), client, cfg, sync.Map{}, diff --git a/core/services/relay/evm/mercury/wsrpc/client.go b/core/services/relay/evm/mercury/wsrpc/client.go index d420a17a1a4..b5d784face0 100644 --- a/core/services/relay/evm/mercury/wsrpc/client.go +++ b/core/services/relay/evm/mercury/wsrpc/client.go @@ -110,7 +110,7 @@ func newClient(lggr logger.Logger, clientPrivKey csakey.KeyV2, serverPubKey []by csaKey: clientPrivKey, serverPubKey: serverPubKey, serverURL: serverURL, - logger: lggr.Named("WSRPC").With("mercuryServerURL", serverURL), + logger: lggr.Named("WSRPC").Named(serverURL).With("serverURL", serverURL), chResetTransport: make(chan struct{}, 1), cacheSet: cacheSet, chStop: make(services.StopChan), @@ -217,7 +217,7 @@ func (w *client) Close() error { } func (w *client) Name() string { - return "EVM.Mercury.WSRPCClient" + return w.logger.Name() } func (w *client) HealthReport() map[string]error { diff --git a/core/services/relay/evm/mercury/wsrpc/mocks/mocks.go b/core/services/relay/evm/mercury/wsrpc/mocks/mocks.go index c0caf0dee12..61912c26b02 100644 --- a/core/services/relay/evm/mercury/wsrpc/mocks/mocks.go +++ b/core/services/relay/evm/mercury/wsrpc/mocks/mocks.go @@ -13,20 +13,20 @@ type MockWSRPCClient struct { LatestReportF func(ctx context.Context, req *pb.LatestReportRequest) (resp *pb.LatestReportResponse, err error) } -func (m MockWSRPCClient) Name() string { return "" } -func (m MockWSRPCClient) Start(context.Context) error { return nil } -func (m MockWSRPCClient) Close() error { return nil } -func (m MockWSRPCClient) HealthReport() map[string]error { return map[string]error{} } -func (m MockWSRPCClient) Ready() error { return nil } -func (m MockWSRPCClient) Transmit(ctx context.Context, in *pb.TransmitRequest) (*pb.TransmitResponse, error) { +func (m *MockWSRPCClient) Name() string { return "" } +func (m *MockWSRPCClient) Start(context.Context) error { return nil } +func (m *MockWSRPCClient) Close() error { return nil } +func (m *MockWSRPCClient) HealthReport() map[string]error { return map[string]error{} } +func (m *MockWSRPCClient) Ready() error { return nil } +func (m *MockWSRPCClient) Transmit(ctx context.Context, in *pb.TransmitRequest) (*pb.TransmitResponse, error) { return m.TransmitF(ctx, in) } -func (m MockWSRPCClient) LatestReport(ctx context.Context, in *pb.LatestReportRequest) (*pb.LatestReportResponse, error) { +func (m *MockWSRPCClient) LatestReport(ctx context.Context, in *pb.LatestReportRequest) (*pb.LatestReportResponse, error) { return m.LatestReportF(ctx, in) } -func (m MockWSRPCClient) ServerURL() string { return "mock server url" } +func (m *MockWSRPCClient) ServerURL() string { return "mock server url" } -func (m MockWSRPCClient) RawClient() pb.MercuryClient { return nil } +func (m *MockWSRPCClient) RawClient() pb.MercuryClient { return nil } type MockConn struct { State connectivity.State diff --git a/core/services/relay/evm/mercury/wsrpc/pool.go b/core/services/relay/evm/mercury/wsrpc/pool.go index dd85381469b..94c48736f5d 100644 --- a/core/services/relay/evm/mercury/wsrpc/pool.go +++ b/core/services/relay/evm/mercury/wsrpc/pool.go @@ -6,10 +6,10 @@ import ( "sync" "github.com/smartcontractkit/wsrpc/credentials" - "golang.org/x/exp/maps" + + "github.com/smartcontractkit/chainlink-common/pkg/services" "github.com/smartcontractkit/chainlink/v2/core/logger" - "github.com/smartcontractkit/chainlink/v2/core/services" "github.com/smartcontractkit/chainlink/v2/core/services/keystore/keys/csakey" "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/wsrpc/cache" "github.com/smartcontractkit/chainlink/v2/core/utils" @@ -106,7 +106,7 @@ func (conn *connection) forceCloseAll() (err error) { } type Pool interface { - services.ServiceCtx + services.Service // Checkout gets a wsrpc.Client for the given arguments // The same underlying client can be checked out multiple times, the pool // handles lifecycle management. The consumer can treat it as if it were @@ -226,6 +226,6 @@ func (p *pool) Ready() error { func (p *pool) HealthReport() map[string]error { hp := map[string]error{p.Name(): p.Ready()} - maps.Copy(hp, p.cacheSet.HealthReport()) + services.CopyHealth(hp, p.cacheSet.HealthReport()) return hp } diff --git a/core/store/migrate/migrations/0227_add_server_url_to_transmit_requests.sql b/core/store/migrate/migrations/0227_add_server_url_to_transmit_requests.sql new file mode 100644 index 00000000000..32608b47163 --- /dev/null +++ b/core/store/migrate/migrations/0227_add_server_url_to_transmit_requests.sql @@ -0,0 +1,9 @@ +-- +goose Up +ALTER TABLE mercury_transmit_requests DROP CONSTRAINT mercury_transmit_requests_pkey; +DELETE FROM mercury_transmit_requests; +ALTER TABLE mercury_transmit_requests ADD COLUMN server_url TEXT NOT NULL; +ALTER TABLE mercury_transmit_requests ADD PRIMARY KEY (server_url, payload_hash); + +-- +goose Down +ALTER TABLE mercury_transmit_requests DROP COLUMN server_url; +ALTER TABLE mercury_transmit_requests ADD PRIMARY KEY (payload_hash); From 12c35c0d22cf205bd02b4d4d6fa676e8cb6a067a Mon Sep 17 00:00:00 2001 From: Sam Davies Date: Mon, 11 Mar 2024 14:03:52 -0400 Subject: [PATCH 2/4] Add changeset --- .changeset/strange-tables-occur.md | 21 +++++++++++++++++++++ pnpm-lock.yaml | 2 +- 2 files changed, 22 insertions(+), 1 deletion(-) create mode 100644 .changeset/strange-tables-occur.md diff --git a/.changeset/strange-tables-occur.md b/.changeset/strange-tables-occur.md new file mode 100644 index 00000000000..68a39e43b54 --- /dev/null +++ b/.changeset/strange-tables-occur.md @@ -0,0 +1,21 @@ +--- +"chainlink": patch +--- + +Mercury jobs can now broadcast to multiple mercury servers. + +Previously, a single mercury server would be specified in a job spec as so: + +```toml +[pluginConfig] +serverURL = "example.com/foo" +serverPubKey = "724ff6eae9e900270edfff233e16322a70ec06e1a6e62a81ef13921f398f6c93" +``` + +You may now specify multiple mercury servers, as so: + +```toml +[pluginConfig] +servers = { "example.com/foo" = "724ff6eae9e900270edfff233e16322a70ec06e1a6e62a81ef13921f398f6c93", "mercury2.example:1234/bar" = "524ff6eae9e900270edfff233e16322a70ec06e1a6e62a81ef13921f398f6c93" } +``` + diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 9d92cedbfa4..20afef2e663 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -2,7 +2,7 @@ lockfileVersion: '6.0' settings: autoInstallPeers: true - excludeLinksFromLockfile: false + excludeLinksFromLockfile: true devDependencies: '@changesets/changelog-github': From e862d3d6dd51b9b5c2e1dadb9e5cdeabd4d9956f Mon Sep 17 00:00:00 2001 From: Sam Davies Date: Tue, 12 Mar 2024 08:45:43 -0400 Subject: [PATCH 3/4] Fix race --- core/services/relay/evm/mercury/orm.go | 4 ++-- core/services/relay/evm/mercury/persistence_manager.go | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/core/services/relay/evm/mercury/orm.go b/core/services/relay/evm/mercury/orm.go index 1a8c7a93bc9..19f2aa8e16b 100644 --- a/core/services/relay/evm/mercury/orm.go +++ b/core/services/relay/evm/mercury/orm.go @@ -172,6 +172,6 @@ func (o *orm) LatestReport(ctx context.Context, feedID [32]byte, qopts ...pg.QOp } func hashPayload(payload []byte) []byte { - h := sha256.New() - return h.Sum(payload) + checksum := sha256.Sum256(payload) + return checksum[:] } diff --git a/core/services/relay/evm/mercury/persistence_manager.go b/core/services/relay/evm/mercury/persistence_manager.go index bcf6403ffeb..dc805c12e7b 100644 --- a/core/services/relay/evm/mercury/persistence_manager.go +++ b/core/services/relay/evm/mercury/persistence_manager.go @@ -40,7 +40,7 @@ type PersistenceManager struct { func NewPersistenceManager(lggr logger.Logger, serverURL string, orm ORM, jobID int32, maxTransmitQueueSize int, flushDeletesFrequency, pruneFrequency time.Duration) *PersistenceManager { return &PersistenceManager{ - lggr: lggr.Named("MercuryPersistenceManager"), + lggr: lggr.Named("MercuryPersistenceManager").With("serverURL", serverURL), orm: orm, serverURL: serverURL, stopCh: make(services.StopChan), From 5ae5c8c3edfbde89f8853d97fb08d6f0eac6f413 Mon Sep 17 00:00:00 2001 From: Sam Davies Date: Thu, 14 Mar 2024 09:58:24 -0400 Subject: [PATCH 4/4] Bump migration version --- ..._requests.sql => 0228_add_server_url_to_transmit_requests.sql} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename core/store/migrate/migrations/{0227_add_server_url_to_transmit_requests.sql => 0228_add_server_url_to_transmit_requests.sql} (100%) diff --git a/core/store/migrate/migrations/0227_add_server_url_to_transmit_requests.sql b/core/store/migrate/migrations/0228_add_server_url_to_transmit_requests.sql similarity index 100% rename from core/store/migrate/migrations/0227_add_server_url_to_transmit_requests.sql rename to core/store/migrate/migrations/0228_add_server_url_to_transmit_requests.sql