Skip to content

Commit

Permalink
Multi-mercury server
Browse files Browse the repository at this point in the history
  • Loading branch information
samsondav committed Mar 14, 2024
1 parent 587c8ed commit 3d9de4b
Show file tree
Hide file tree
Showing 18 changed files with 746 additions and 378 deletions.
85 changes: 65 additions & 20 deletions core/services/ocr2/plugins/mercury/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"fmt"
"net/url"
"regexp"
"sort"

pkgerrors "github.com/pkg/errors"

Expand All @@ -17,8 +18,18 @@ import (
)

type PluginConfig struct {
// Must either specify details for single server OR multiple servers.
// Specifying both is not valid.

// Single mercury server
// LEGACY: This is the old way of specifying a mercury server
RawServerURL string `json:"serverURL" toml:"serverURL"`
ServerPubKey utils.PlainHexBytes `json:"serverPubKey" toml:"serverPubKey"`

// Multi mercury servers
// This is the preferred way to specify mercury server(s)
Servers map[string]utils.PlainHexBytes `json:"servers" toml:"servers"`

// InitialBlockNumber allows to set a custom "validFromBlockNumber" for
// the first ever report in the case of a brand new feed, where the mercury
// server does not have any previous reports. For a brand new feed, this
Expand All @@ -29,26 +40,64 @@ type PluginConfig struct {
NativeFeedID *mercuryutils.FeedID `json:"nativeFeedID" toml:"nativeFeedID"`
}

func ValidatePluginConfig(config PluginConfig, feedID mercuryutils.FeedID) (merr error) {
if config.RawServerURL == "" {
merr = errors.New("mercury: ServerURL must be specified")
func validateURL(rawServerURL string) error {
var normalizedURI string
if schemeRegexp.MatchString(rawServerURL) {
normalizedURI = rawServerURL
} else {
var normalizedURI string
if schemeRegexp.MatchString(config.RawServerURL) {
normalizedURI = config.RawServerURL
normalizedURI = fmt.Sprintf("wss://%s", rawServerURL)
}
uri, err := url.ParseRequestURI(normalizedURI)
if err != nil {
return pkgerrors.Errorf(`Mercury: invalid value for ServerURL, got: %q`, rawServerURL)
}
if uri.Scheme != "wss" {
return pkgerrors.Errorf(`Mercury: invalid scheme specified for MercuryServer, got: %q (scheme: %q) but expected a websocket url e.g. "192.0.2.2:4242" or "wss://192.0.2.2:4242"`, rawServerURL, uri.Scheme)
}
return nil
}

type Server struct {
URL string
PubKey utils.PlainHexBytes
}

func (p PluginConfig) GetServers() (servers []Server) {
if p.RawServerURL != "" {
return []Server{{URL: wssRegexp.ReplaceAllString(p.RawServerURL, ""), PubKey: p.ServerPubKey}}
}
for url, pubKey := range p.Servers {
servers = append(servers, Server{URL: wssRegexp.ReplaceAllString(url, ""), PubKey: pubKey})
}
sort.Slice(servers, func(i, j int) bool {
return servers[i].URL < servers[j].URL
})
return
}

func ValidatePluginConfig(config PluginConfig, feedID mercuryutils.FeedID) (merr error) {
if len(config.Servers) > 0 {
if config.RawServerURL != "" || len(config.ServerPubKey) != 0 {
merr = errors.Join(merr, errors.New("Mercury: Servers and RawServerURL/ServerPubKey may not be specified together"))
} else {
normalizedURI = fmt.Sprintf("wss://%s", config.RawServerURL)
for serverName, serverPubKey := range config.Servers {
if err := validateURL(serverName); err != nil {
merr = errors.Join(merr, pkgerrors.Wrap(err, "Mercury: invalid value for ServerURL"))
}
if len(serverPubKey) != 32 {
merr = errors.Join(merr, errors.New("Mercury: ServerPubKey must be a 32-byte hex string"))
}
}
}
uri, err := url.ParseRequestURI(normalizedURI)
if err != nil {
merr = pkgerrors.Wrap(err, "Mercury: invalid value for ServerURL")
} else if uri.Scheme != "wss" {
merr = pkgerrors.Errorf(`Mercury: invalid scheme specified for MercuryServer, got: %q (scheme: %q) but expected a websocket url e.g. "192.0.2.2:4242" or "wss://192.0.2.2:4242"`, config.RawServerURL, uri.Scheme)
} else if config.RawServerURL == "" {
merr = errors.Join(merr, errors.New("Mercury: Servers must be specified"))
} else {
if err := validateURL(config.RawServerURL); err != nil {
merr = errors.Join(merr, pkgerrors.Wrap(err, "Mercury: invalid value for ServerURL"))
}
if len(config.ServerPubKey) != 32 {
merr = errors.Join(merr, errors.New("Mercury: If RawServerURL is specified, ServerPubKey is also required and must be a 32-byte hex string"))
}
}

if len(config.ServerPubKey) != 32 {
merr = errors.Join(merr, errors.New("mercury: ServerPubKey is required and must be a 32-byte hex string"))
}

switch feedID.Version() {
Expand Down Expand Up @@ -78,7 +127,3 @@ func ValidatePluginConfig(config PluginConfig, feedID mercuryutils.FeedID) (merr

var schemeRegexp = regexp.MustCompile(`^[a-zA-Z][a-zA-Z0-9+.-]*://`)
var wssRegexp = regexp.MustCompile(`^wss://`)

func (p PluginConfig) ServerURL() string {
return wssRegexp.ReplaceAllString(p.RawServerURL, "")
}
90 changes: 80 additions & 10 deletions core/services/ocr2/plugins/mercury/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"github.com/pelletier/go-toml/v2"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/smartcontractkit/chainlink/v2/core/utils"
)

var v1FeedId = [32]uint8{00, 01, 107, 74, 167, 229, 124, 167, 182, 138, 225, 191, 69, 101, 63, 86, 182, 86, 253, 58, 163, 53, 239, 127, 174, 105, 107, 102, 63, 27, 132, 114}
Expand All @@ -31,6 +33,46 @@ func Test_PluginConfig(t *testing.T) {
err = ValidatePluginConfig(mc, v1FeedId)
require.NoError(t, err)
})
t.Run("with multiple server URLs", func(t *testing.T) {
t.Run("if no ServerURL/ServerPubKey is specified", func(t *testing.T) {
rawToml := `
Servers = { "example.com:80" = "724ff6eae9e900270edfff233e16322a70ec06e1a6e62a81ef13921f398f6c93", "example2.invalid:1234" = "524ff6eae9e900270edfff233e16322a70ec06e1a6e62a81ef13921f398f6c93" }
`

var mc PluginConfig
err := toml.Unmarshal([]byte(rawToml), &mc)
require.NoError(t, err)

assert.Len(t, mc.Servers, 2)
assert.Equal(t, "724ff6eae9e900270edfff233e16322a70ec06e1a6e62a81ef13921f398f6c93", mc.Servers["example.com:80"].String())
assert.Equal(t, "524ff6eae9e900270edfff233e16322a70ec06e1a6e62a81ef13921f398f6c93", mc.Servers["example2.invalid:1234"].String())

err = ValidatePluginConfig(mc, v1FeedId)
require.NoError(t, err)
})
t.Run("if ServerURL or ServerPubKey is specified", func(t *testing.T) {
rawToml := `
Servers = { "example.com:80" = "724ff6eae9e900270edfff233e16322a70ec06e1a6e62a81ef13921f398f6c93", "example2.invalid:1234" = "524ff6eae9e900270edfff233e16322a70ec06e1a6e62a81ef13921f398f6c93" }
ServerURL = "example.com:80"
`
var mc PluginConfig
err := toml.Unmarshal([]byte(rawToml), &mc)
require.NoError(t, err)

err = ValidatePluginConfig(mc, v1FeedId)
require.EqualError(t, err, "Mercury: Servers and RawServerURL/ServerPubKey may not be specified together")

rawToml = `
Servers = { "example.com:80" = "724ff6eae9e900270edfff233e16322a70ec06e1a6e62a81ef13921f398f6c93", "example2.invalid:1234" = "524ff6eae9e900270edfff233e16322a70ec06e1a6e62a81ef13921f398f6c93" }
ServerPubKey = "724ff6eae9e900270edfff233e16322a70ec06e1a6e62a81ef13921f398f6c93"
`
err = toml.Unmarshal([]byte(rawToml), &mc)
require.NoError(t, err)

err = ValidatePluginConfig(mc, v1FeedId)
require.EqualError(t, err, "Mercury: Servers and RawServerURL/ServerPubKey may not be specified together")
})
})

t.Run("with invalid values", func(t *testing.T) {
rawToml := `
Expand All @@ -53,7 +95,7 @@ func Test_PluginConfig(t *testing.T) {
err = ValidatePluginConfig(mc, v1FeedId)
require.Error(t, err)
assert.Contains(t, err.Error(), `Mercury: invalid scheme specified for MercuryServer, got: "http://example.com" (scheme: "http") but expected a websocket url e.g. "192.0.2.2:4242" or "wss://192.0.2.2:4242"`)
assert.Contains(t, err.Error(), `mercury: ServerPubKey is required and must be a 32-byte hex string`)
assert.Contains(t, err.Error(), `If RawServerURL is specified, ServerPubKey is also required and must be a 32-byte hex string`)
})

t.Run("with unnecessary values", func(t *testing.T) {
Expand Down Expand Up @@ -135,13 +177,41 @@ func Test_PluginConfig(t *testing.T) {
})
}

func Test_PluginConfig_ServerURL(t *testing.T) {
pc := PluginConfig{RawServerURL: "example.com"}
assert.Equal(t, "example.com", pc.ServerURL())
pc = PluginConfig{RawServerURL: "wss://example.com"}
assert.Equal(t, "example.com", pc.ServerURL())
pc = PluginConfig{RawServerURL: "example.com:1234/foo"}
assert.Equal(t, "example.com:1234/foo", pc.ServerURL())
pc = PluginConfig{RawServerURL: "wss://example.com:1234/foo"}
assert.Equal(t, "example.com:1234/foo", pc.ServerURL())
func Test_PluginConfig_GetServers(t *testing.T) {
t.Run("with single server", func(t *testing.T) {
pubKey := utils.PlainHexBytes([]byte{1, 2, 3})
pc := PluginConfig{RawServerURL: "example.com", ServerPubKey: pubKey}
require.Len(t, pc.GetServers(), 1)
assert.Equal(t, "example.com", pc.GetServers()[0].URL)
assert.Equal(t, pubKey, pc.GetServers()[0].PubKey)

pc = PluginConfig{RawServerURL: "wss://example.com", ServerPubKey: pubKey}
require.Len(t, pc.GetServers(), 1)
assert.Equal(t, "example.com", pc.GetServers()[0].URL)
assert.Equal(t, pubKey, pc.GetServers()[0].PubKey)

pc = PluginConfig{RawServerURL: "example.com:1234/foo", ServerPubKey: pubKey}
require.Len(t, pc.GetServers(), 1)
assert.Equal(t, "example.com:1234/foo", pc.GetServers()[0].URL)
assert.Equal(t, pubKey, pc.GetServers()[0].PubKey)

pc = PluginConfig{RawServerURL: "wss://example.com:1234/foo", ServerPubKey: pubKey}
require.Len(t, pc.GetServers(), 1)
assert.Equal(t, "example.com:1234/foo", pc.GetServers()[0].URL)
assert.Equal(t, pubKey, pc.GetServers()[0].PubKey)
})

t.Run("with multiple servers", func(t *testing.T) {
servers := map[string]utils.PlainHexBytes{
"example.com:80": utils.PlainHexBytes([]byte{1, 2, 3}),
"mercuryserver.invalid:1234/foo": utils.PlainHexBytes([]byte{4, 5, 6}),
}
pc := PluginConfig{Servers: servers}

require.Len(t, pc.GetServers(), 2)
assert.Equal(t, "example.com:80", pc.GetServers()[0].URL)
assert.Equal(t, utils.PlainHexBytes{1, 2, 3}, pc.GetServers()[0].PubKey)
assert.Equal(t, "mercuryserver.invalid:1234/foo", pc.GetServers()[1].URL)
assert.Equal(t, utils.PlainHexBytes{4, 5, 6}, pc.GetServers()[1].PubKey)
})
}
28 changes: 16 additions & 12 deletions core/services/ocr2/plugins/mercury/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"fmt"
"math/big"
"net"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -389,31 +390,36 @@ func addV3MercuryJob(
bootstrapNodePort int,
bmBridge,
bidBridge,
askBridge,
serverURL string,
serverPubKey,
askBridge string,
servers map[string]string,
clientPubKey ed25519.PublicKey,
feedName string,
feedID [32]byte,
linkFeedID [32]byte,
nativeFeedID [32]byte,
) {
srvs := make([]string, 0, len(servers))
for u, k := range servers {
srvs = append(srvs, fmt.Sprintf("%q = %q", u, k))
}
serversStr := fmt.Sprintf("{ %s }", strings.Join(srvs, ", "))

node.AddJob(t, fmt.Sprintf(`
type = "offchainreporting2"
schemaVersion = 1
name = "mercury-%[1]d-%[12]s"
name = "mercury-%[1]d-%[11]s"
forwardingAllowed = false
maxTaskDuration = "1s"
contractID = "%[2]s"
feedID = "0x%[11]x"
feedID = "0x%[10]x"
contractConfigTrackerPollInterval = "1s"
ocrKeyBundleID = "%[3]s"
p2pv2Bootstrappers = [
"%[4]s"
]
relay = "evm"
pluginType = "mercury"
transmitterID = "%[10]x"
transmitterID = "%[9]x"
observationSource = """
// Benchmark Price
price1 [type=bridge name="%[5]s" timeout="50ms" requestData="{\\"data\\":{\\"from\\":\\"ETH\\",\\"to\\":\\"USD\\"}}"];
Expand All @@ -438,10 +444,9 @@ observationSource = """
"""
[pluginConfig]
serverURL = "%[8]s"
serverPubKey = "%[9]x"
linkFeedID = "0x%[13]x"
nativeFeedID = "0x%[14]x"
servers = %[8]s
linkFeedID = "0x%[12]x"
nativeFeedID = "0x%[13]x"
[relayConfig]
chainID = 1337
Expand All @@ -453,8 +458,7 @@ chainID = 1337
bmBridge,
bidBridge,
askBridge,
serverURL,
serverPubKey,
serversStr,
clientPubKey,
feedID,
feedName,
Expand Down
49 changes: 27 additions & 22 deletions core/services/ocr2/plugins/mercury/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -788,16 +788,6 @@ func integration_MercuryV3(t *testing.T) {
feedM[feeds[i].id] = feeds[i]
}

reqs := make(chan request)
serverKey := csakey.MustNewV2XXXTestingOnly(big.NewInt(-1))
serverPubKey := serverKey.PublicKey
srv := NewMercuryServer(t, ed25519.PrivateKey(serverKey.Raw()), reqs, func() []byte {
report, err := (&reportcodecv3.ReportCodec{}).BuildReport(v3.ReportFields{BenchmarkPrice: big.NewInt(234567), Bid: big.NewInt(1), Ask: big.NewInt(1), LinkFee: big.NewInt(1), NativeFee: big.NewInt(1)})
if err != nil {
panic(err)
}
return report
})
clientCSAKeys := make([]csakey.KeyV2, n+1)
clientPubKeys := make([]ed25519.PublicKey, n+1)
for i := 0; i < n+1; i++ {
Expand All @@ -806,7 +796,25 @@ func integration_MercuryV3(t *testing.T) {
clientCSAKeys[i] = key
clientPubKeys[i] = key.PublicKey
}
serverURL := startMercuryServer(t, srv, clientPubKeys)

// Test multi-send to three servers
const nSrvs = 3
reqChs := make([]chan request, nSrvs)
servers := make(map[string]string)
for i := 0; i < nSrvs; i++ {
k := csakey.MustNewV2XXXTestingOnly(big.NewInt(int64(-(i + 1))))
reqs := make(chan request, 100)
srv := NewMercuryServer(t, ed25519.PrivateKey(k.Raw()), reqs, func() []byte {
report, err := (&reportcodecv3.ReportCodec{}).BuildReport(v3.ReportFields{BenchmarkPrice: big.NewInt(234567), Bid: big.NewInt(1), Ask: big.NewInt(1), LinkFee: big.NewInt(1), NativeFee: big.NewInt(1)})
if err != nil {
panic(err)
}
return report
})
serverURL := startMercuryServer(t, srv, clientPubKeys)
reqChs[i] = reqs
servers[serverURL] = fmt.Sprintf("%x", k.PublicKey)
}
chainID := testutils.SimulatedChainID

steve, backend, verifier, verifierAddress := setupBlockchain(t)
Expand Down Expand Up @@ -895,8 +903,7 @@ func integration_MercuryV3(t *testing.T) {
bmBridge,
bidBridge,
askBridge,
serverURL,
serverPubKey,
servers,
clientPubKeys[i],
feed.name,
feed.id,
Expand Down Expand Up @@ -963,8 +970,8 @@ func integration_MercuryV3(t *testing.T) {
backend.Commit()
}

runTestSetup := func() {
// Expect at least one report per feed from each oracle
runTestSetup := func(reqs chan request) {
// Expect at least one report per feed from each oracle, per server
seen := make(map[[32]byte]map[credentials.StaticSizedPublicKey]struct{})
for i := range feeds {
// feedID will be deleted when all n oracles have reported
Expand Down Expand Up @@ -1017,12 +1024,10 @@ func integration_MercuryV3(t *testing.T) {
}
}

t.Run("receives at least one report per feed from each oracle when EAs are at 100% reliability", func(t *testing.T) {
runTestSetup()
})

t.Run("receives at least one report per feed from each oracle when EAs are at 80% reliability", func(t *testing.T) {
pError.Store(20)
runTestSetup()
t.Run("receives at least one report per feed for every server from each oracle when EAs are at 100% reliability", func(t *testing.T) {
for i := 0; i < nSrvs; i++ {
reqs := reqChs[i]
runTestSetup(reqs)
}
})
}
Loading

0 comments on commit 3d9de4b

Please sign in to comment.