From 80bc9f23d437ac7bc0c9c8f9df40e69b5ddd8675 Mon Sep 17 00:00:00 2001 From: krehermann Date: Tue, 9 Jan 2024 08:06:22 -0700 Subject: [PATCH] Replace mercury eventbroadcaster with polling (#11707) * POC example code to replace mercury eventbroadcaster with polling * remove Mercury Notify optimization * fix bad merge --- core/services/relay/evm/evm.go | 1 - .../relay/evm/mercury/config_poller.go | 60 +------------------ .../relay/evm/mercury/config_poller_test.go | 51 ---------------- .../relay/evm/mercury/helpers_test.go | 12 +--- 4 files changed, 5 insertions(+), 119 deletions(-) diff --git a/core/services/relay/evm/evm.go b/core/services/relay/evm/evm.go index 37e6d64452e..8cdbfe76058 100644 --- a/core/services/relay/evm/evm.go +++ b/core/services/relay/evm/evm.go @@ -342,7 +342,6 @@ func newConfigProvider(lggr logger.Logger, chain legacyevm.Chain, opts *types.Re chain.LogPoller(), aggregatorAddress, *relayConfig.FeedID, - eventBroadcaster, // TODO: Does mercury need to support config contract? DF-19182 ) } else { diff --git a/core/services/relay/evm/mercury/config_poller.go b/core/services/relay/evm/mercury/config_poller.go index 8964a283049..98ef78020c7 100644 --- a/core/services/relay/evm/mercury/config_poller.go +++ b/core/services/relay/evm/mercury/config_poller.go @@ -91,8 +91,6 @@ type ConfigPoller struct { destChainLogPoller logpoller.LogPoller addr common.Address feedId common.Hash - notifyCh chan struct{} - subscription pg.Subscription } func FilterName(addr common.Address, feedID common.Hash) string { @@ -100,43 +98,30 @@ func FilterName(addr common.Address, feedID common.Hash) string { } // NewConfigPoller creates a new Mercury ConfigPoller -func NewConfigPoller(lggr logger.Logger, destChainPoller logpoller.LogPoller, addr common.Address, feedId common.Hash, eventBroadcaster pg.EventBroadcaster) (*ConfigPoller, error) { +func NewConfigPoller(lggr logger.Logger, destChainPoller logpoller.LogPoller, addr common.Address, feedId common.Hash) (*ConfigPoller, error) { err := destChainPoller.RegisterFilter(logpoller.Filter{Name: FilterName(addr, feedId), EventSigs: []common.Hash{FeedScopedConfigSet}, Addresses: []common.Address{addr}}) if err != nil { return nil, err } - subscription, err := eventBroadcaster.Subscribe(pg.ChannelInsertOnEVMLogs, "") - if err != nil { - return nil, err - } - cp := &ConfigPoller{ lggr: lggr, destChainLogPoller: destChainPoller, addr: addr, feedId: feedId, - notifyCh: make(chan struct{}, 1), - subscription: subscription, } return cp, nil } -// Start the subscription to Postgres' notify events. -func (cp *ConfigPoller) Start() { - go cp.startLogSubscription() -} +func (cp *ConfigPoller) Start() {} -// Close the subscription to Postgres' notify events. func (cp *ConfigPoller) Close() error { - cp.subscription.Close() return nil } -// Notify abstracts the logpoller.LogPoller Notify() implementation func (cp *ConfigPoller) Notify() <-chan struct{} { - return cp.notifyCh + return nil // rely on libocr's builtin config polling } // Replay abstracts the logpoller.LogPoller Replay() implementation @@ -190,42 +175,3 @@ func (cp *ConfigPoller) LatestBlockHeight(ctx context.Context) (blockHeight uint } return uint64(latest.BlockNumber), nil } - -func (cp *ConfigPoller) startLogSubscription() { - // trim the leading 0x to make it comparable to pg's hex encoding. - addressPgHex := cp.addr.Hex()[2:] - feedIdPgHex := cp.feedId.Hex()[2:] - - for { - event, ok := <-cp.subscription.Events() - if !ok { - cp.lggr.Debug("eventBroadcaster subscription closed, exiting notify loop") - return - } - - // Event payload should look like: "
:," - addressTopicValues := strings.Split(event.Payload, ":") - if len(addressTopicValues) < 2 { - cp.lggr.Warnf("invalid event from %s channel: %s", pg.ChannelInsertOnEVMLogs, event.Payload) - continue - } - - address := addressTopicValues[0] - if address != addressPgHex { - continue - } - - topicValues := strings.Split(addressTopicValues[1], ",") - if len(topicValues) <= feedIdTopicIndex { - continue - } - if topicValues[feedIdTopicIndex] != feedIdPgHex { - continue - } - - select { - case cp.notifyCh <- struct{}{}: - default: - } - } -} diff --git a/core/services/relay/evm/mercury/config_poller_test.go b/core/services/relay/evm/mercury/config_poller_test.go index 71e88d41a22..f828938f954 100644 --- a/core/services/relay/evm/mercury/config_poller_test.go +++ b/core/services/relay/evm/mercury/config_poller_test.go @@ -6,7 +6,6 @@ import ( "time" "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/common/hexutil" "github.com/onsi/gomega" "github.com/pkg/errors" confighelper2 "github.com/smartcontractkit/libocr/offchainreporting2plus/confighelper" @@ -18,7 +17,6 @@ import ( evmutils "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" - "github.com/smartcontractkit/chainlink/v2/core/services/pg" "github.com/smartcontractkit/chainlink/v2/core/utils" ) @@ -27,7 +25,6 @@ func TestMercuryConfigPoller(t *testing.T) { feedIDBytes := [32]byte(feedID) th := SetupTH(t, feedID) - th.subscription.On("Events").Return(nil) notify := th.configPoller.Notify() assert.Empty(t, notify) @@ -115,54 +112,6 @@ func TestMercuryConfigPoller(t *testing.T) { assert.Equal(t, offchainConfig, newConfig.OffchainConfig) } -func TestNotify(t *testing.T) { - testutils.SkipFlakey(t, "https://smartcontract-it.atlassian.net/browse/BCF-2746") - feedIDStr := "8257737fdf4f79639585fd0ed01bea93c248a9ad940e98dd27f41c9b6230fed1" - feedIDBytes, err := hexutil.Decode("0x" + feedIDStr) - require.NoError(t, err) - feedID := common.BytesToHash(feedIDBytes) - - eventCh := make(chan pg.Event) - - th := SetupTH(t, feedID) - th.subscription.On("Events").Return((<-chan pg.Event)(eventCh)) - - addressPgHex := th.verifierAddress.Hex()[2:] - - notify := th.configPoller.Notify() - assert.Empty(t, notify) - - eventCh <- pg.Event{} // Empty event - assert.Empty(t, notify) - - eventCh <- pg.Event{Payload: addressPgHex} // missing topic values - assert.Empty(t, notify) - - eventCh <- pg.Event{Payload: addressPgHex + ":val1"} // missing feedId topic value - assert.Empty(t, notify) - - eventCh <- pg.Event{Payload: addressPgHex + ":8257737fdf4f79639585fd0ed01bea93c248a9ad940e98dd27f41c9b6230fed1,val2"} // wrong index - assert.Empty(t, notify) - - eventCh <- pg.Event{Payload: addressPgHex + ":val1,val2,8257737fdf4f79639585fd0ed01bea93c248a9ad940e98dd27f41c9b6230fed1"} // wrong index - assert.Empty(t, notify) - - eventCh <- pg.Event{Payload: addressPgHex + ":val1,0x8257737fdf4f79639585fd0ed01bea93c248a9ad940e98dd27f41c9b6230fed1"} // 0x prefix - assert.Empty(t, notify) - - eventCh <- pg.Event{Payload: "wrong_address:val1,8257737fdf4f79639585fd0ed01bea93c248a9ad940e98dd27f41c9b6230fed1"} // wrong address - assert.Empty(t, notify) - - eventCh <- pg.Event{Payload: addressPgHex + ":val1,8257737fdf4f79639585fd0ed01bea93c248a9ad940e98dd27f41c9b6230fed1"} // expected event to notify on - assert.Eventually(t, func() bool { <-notify; return true }, time.Second, 10*time.Millisecond) - - eventCh <- pg.Event{Payload: addressPgHex + ":val1,8257737fdf4f79639585fd0ed01bea93c248a9ad940e98dd27f41c9b6230fed1"} // try second time - assert.Eventually(t, func() bool { <-notify; return true }, time.Second, 10*time.Millisecond) - - eventCh <- pg.Event{Payload: addressPgHex + ":val1,8257737fdf4f79639585fd0ed01bea93c248a9ad940e98dd27f41c9b6230fed1:additional"} // additional colon separated parts - assert.Eventually(t, func() bool { <-notify; return true }, time.Second, 10*time.Millisecond) -} - func onchainPublicKeyToAddress(publicKeys []types.OnchainPublicKey) (addresses []common.Address, err error) { for _, signer := range publicKeys { if len(signer) != 20 { diff --git a/core/services/relay/evm/mercury/helpers_test.go b/core/services/relay/evm/mercury/helpers_test.go index 0703f878eed..f1686ee00c8 100644 --- a/core/services/relay/evm/mercury/helpers_test.go +++ b/core/services/relay/evm/mercury/helpers_test.go @@ -18,6 +18,7 @@ import ( ocrtypes "github.com/smartcontractkit/libocr/offchainreporting2plus/types" "github.com/smartcontractkit/chainlink-common/pkg/services/servicetest" + evmclient "github.com/smartcontractkit/chainlink/v2/core/chains/evm/client" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils" @@ -26,7 +27,6 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest" "github.com/smartcontractkit/chainlink/v2/core/logger" - pgmocks "github.com/smartcontractkit/chainlink/v2/core/services/pg/mocks" reportcodecv1 "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/v1/reportcodec" reportcodecv2 "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/v2/reportcodec" reportcodecv3 "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/v3/reportcodec" @@ -143,8 +143,6 @@ type TestHarness struct { verifierAddress common.Address verifierContract *verifier.Verifier logPoller logpoller.LogPoller - eventBroadcaster *pgmocks.EventBroadcaster - subscription *pgmocks.Subscription } func SetupTH(t *testing.T, feedID common.Hash) TestHarness { @@ -170,13 +168,9 @@ func SetupTH(t *testing.T, feedID common.Hash) TestHarness { lggr := logger.TestLogger(t) lorm := logpoller.NewORM(big.NewInt(1337), db, lggr, cfg) lp := logpoller.NewLogPoller(lorm, ethClient, lggr, 100*time.Millisecond, false, 1, 2, 2, 1000) - eventBroadcaster := pgmocks.NewEventBroadcaster(t) - subscription := pgmocks.NewSubscription(t) servicetest.Run(t, lp) - eventBroadcaster.On("Subscribe", "evm.insert_on_logs", "").Return(subscription, nil) - - configPoller, err := NewConfigPoller(lggr, lp, verifierAddress, feedID, eventBroadcaster) + configPoller, err := NewConfigPoller(lggr, lp, verifierAddress, feedID) require.NoError(t, err) configPoller.Start() @@ -188,7 +182,5 @@ func SetupTH(t *testing.T, feedID common.Hash) TestHarness { verifierAddress: verifierAddress, verifierContract: verifierContract, logPoller: lp, - eventBroadcaster: eventBroadcaster, - subscription: subscription, } }