Skip to content

Commit

Permalink
Implement networking V2 for libocr (smartcontractkit#4597)
Browse files Browse the repository at this point in the history
  • Loading branch information
samsondav authored Jul 6, 2021
1 parent e759c1c commit 6350464
Show file tree
Hide file tree
Showing 13 changed files with 476 additions and 165 deletions.
14 changes: 14 additions & 0 deletions core/internal/cltest/cltest.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/smartcontractkit/chainlink/core/static"
"github.com/smartcontractkit/chainlink/core/store/dialects"

cryptop2p "github.com/libp2p/go-libp2p-core/crypto"
p2ppeer "github.com/libp2p/go-libp2p-core/peer"
"github.com/smartcontractkit/chainlink/core/assets"
"github.com/smartcontractkit/chainlink/core/auth"
Expand Down Expand Up @@ -119,6 +120,7 @@ var (
FluxAggAddress = common.HexToAddress("0x3cCad4715152693fE3BC4460591e3D3Fbd071b42")
storeCounter uint64
minimumContractPayment = assets.NewLink(100)
source rand.Source
)

func init() {
Expand Down Expand Up @@ -154,6 +156,9 @@ func init() {
logger.Debugf("Using seed: %v", seed)
rand.Seed(seed)

// Also seed the local source
source = rand.NewSource(seed)

defaultP2PPeerID, err := p2ppeer.Decode(DefaultPeerID)
if err != nil {
panic(err)
Expand Down Expand Up @@ -2120,3 +2125,12 @@ func MustSendingKeys(t *testing.T, ethKeyStore *keystore.Eth) (keys []ethkey.Key
require.NoError(t, err)
return keys
}

func MustRandomP2PPeerID(t *testing.T) p2ppeer.ID {
reader := rand.New(source)
p2pPrivkey, _, err := cryptop2p.GenerateEd25519Key(reader)
require.NoError(t, err)
id, err := p2ppeer.IDFromPrivateKey(p2pPrivkey)
require.NoError(t, err)
return id
}
6 changes: 4 additions & 2 deletions core/services/offchainreporting/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ func (d Delegate) ServicesForSpec(jobSpec job.Job) (services []job.Service, err
if err != nil {
return nil, err
}
v2BootstrapPeers := d.config.P2PV2Bootstrappers()

loggerWith := logger.CreateLogger(logger.Default.With(
"contractAddress", concreteSpec.ContractAddress,
Expand Down Expand Up @@ -171,7 +172,7 @@ func (d Delegate) ServicesForSpec(jobSpec job.Job) (services []job.Service, err
if concreteSpec.IsBootstrapPeer {
bootstrapper, err := ocr.NewBootstrapNode(ocr.BootstrapNodeArgs{
BootstrapperFactory: peerWrapper.Peer,
Bootstrappers: bootstrapPeers,
V1Bootstrappers: bootstrapPeers,
ContractConfigTracker: tracker,
Database: ocrdb,
LocalConfig: lc,
Expand Down Expand Up @@ -233,7 +234,8 @@ func (d Delegate) ServicesForSpec(jobSpec job.Job) (services []job.Service, err
PrivateKeys: &ocrkey,
BinaryNetworkEndpointFactory: peerWrapper.Peer,
Logger: ocrLogger,
Bootstrappers: bootstrapPeers,
V1Bootstrappers: bootstrapPeers,
V2Bootstrappers: v2BootstrapPeers,
MonitoringEndpoint: d.monitoringEndpoint,
})
if err != nil {
Expand Down
62 changes: 62 additions & 0 deletions core/services/offchainreporting/discoverer_database.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package offchainreporting

import (
"context"
"database/sql"

"github.com/lib/pq"
p2ppeer "github.com/libp2p/go-libp2p-core/peer"
"github.com/pkg/errors"
ocrnetworking "github.com/smartcontractkit/libocr/networking"
"go.uber.org/multierr"
)

var _ ocrnetworking.DiscovererDatabase = &DiscovererDatabase{}

type DiscovererDatabase struct {
db *sql.DB
peerID string
}

func NewDiscovererDatabase(db *sql.DB, peerID p2ppeer.ID) *DiscovererDatabase {
return &DiscovererDatabase{
db,
peerID.Pretty(),
}
}

// StoreAnnouncement has key-value-store semantics and stores a peerID (key) and an associated serialized
// announcement (value).
func (d *DiscovererDatabase) StoreAnnouncement(ctx context.Context, peerID string, ann []byte) error {
_, err := d.db.ExecContext(ctx, `
INSERT INTO offchainreporting_discoverer_announcements (local_peer_id, remote_peer_id, ann, created_at, updated_at)
VALUES ($1,$2,$3,NOW(),NOW()) ON CONFLICT (local_peer_id, remote_peer_id) DO UPDATE SET
ann = EXCLUDED.ann,
updated_at = EXCLUDED.updated_at
;`, d.peerID, peerID, ann)
return errors.Wrap(err, "DiscovererDatabase failed to StoreAnnouncement")
}

// ReadAnnouncements returns one serialized announcement (if available) for each of the peerIDs in the form of a map
// keyed by each announcement's corresponding peer ID.
func (d *DiscovererDatabase) ReadAnnouncements(ctx context.Context, peerIDs []string) (map[string][]byte, error) {
rows, err := d.db.QueryContext(ctx, `
SELECT remote_peer_id, ann FROM offchainreporting_discoverer_announcements WHERE remote_peer_id = ANY($1) AND local_peer_id = $2`, pq.Array(peerIDs), d.peerID)
if err != nil {
return nil, errors.Wrap(err, "DiscovererDatabase failed to ReadAnnouncements")
}
results := make(map[string][]byte)
for rows.Next() {
var peerID string
var ann []byte
err := rows.Scan(&peerID, &ann)
if err != nil {
return nil, multierr.Combine(err, rows.Close())
}
results[peerID] = ann
}
if err := rows.Close(); err != nil {
return nil, errors.WithStack(err)
}
return results, nil
}
83 changes: 83 additions & 0 deletions core/services/offchainreporting/discoverer_database_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package offchainreporting_test

import (
"testing"

"github.com/smartcontractkit/chainlink/core/internal/cltest"
"github.com/smartcontractkit/chainlink/core/services/offchainreporting"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func Test_DiscovererDatabase(t *testing.T) {
store, cleanup := cltest.NewStore(t)
defer cleanup()

db := store.DB
require.NoError(t, db.Exec(`SET CONSTRAINTS offchainreporting_discoverer_announcements_local_peer_id_fkey DEFERRED`).Error)

sqlDB := store.MustSQLDB()

localPeerID1 := cltest.MustRandomP2PPeerID(t)
localPeerID2 := cltest.MustRandomP2PPeerID(t)

dd1 := offchainreporting.NewDiscovererDatabase(sqlDB, localPeerID1)
dd2 := offchainreporting.NewDiscovererDatabase(sqlDB, localPeerID2)

t.Run("StoreAnnouncement writes a value", func(t *testing.T) {
ann := []byte{1, 2, 3}
err := dd1.StoreAnnouncement(ctx, "remote1", ann)
assert.NoError(t, err)

// test upsert
ann = []byte{4, 5, 6}
err = dd1.StoreAnnouncement(ctx, "remote1", ann)
assert.NoError(t, err)

// write a different value
ann = []byte{7, 8, 9}
err = dd1.StoreAnnouncement(ctx, "remote2", ann)
assert.NoError(t, err)
})

t.Run("ReadAnnouncements reads values filtered by given peerIDs", func(t *testing.T) {
announcements, err := dd1.ReadAnnouncements(ctx, []string{"remote1", "remote2"})
require.NoError(t, err)

assert.Len(t, announcements, 2)
assert.Equal(t, []byte{4, 5, 6}, announcements["remote1"])
assert.Equal(t, []byte{7, 8, 9}, announcements["remote2"])

announcements, err = dd1.ReadAnnouncements(ctx, []string{"remote1"})
require.NoError(t, err)

assert.Len(t, announcements, 1)
assert.Equal(t, []byte{4, 5, 6}, announcements["remote1"])
})

t.Run("is scoped to local peer ID", func(t *testing.T) {
ann := []byte{10, 11, 12}
err := dd2.StoreAnnouncement(ctx, "remote1", ann)
assert.NoError(t, err)

announcements, err := dd2.ReadAnnouncements(ctx, []string{"remote1"})
require.NoError(t, err)
assert.Len(t, announcements, 1)
assert.Equal(t, []byte{10, 11, 12}, announcements["remote1"])

announcements, err = dd1.ReadAnnouncements(ctx, []string{"remote1"})
require.NoError(t, err)
assert.Len(t, announcements, 1)
assert.Equal(t, []byte{4, 5, 6}, announcements["remote1"])
})

t.Run("persists data across restarts", func(t *testing.T) {
dd3 := offchainreporting.NewDiscovererDatabase(sqlDB, localPeerID1)

announcements, err := dd3.ReadAnnouncements(ctx, []string{"remote1"})
require.NoError(t, err)
assert.Len(t, announcements, 1)
assert.Equal(t, []byte{4, 5, 6}, announcements["remote1"])

})
}
28 changes: 20 additions & 8 deletions core/services/offchainreporting/peer_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package offchainreporting
import (
"strings"

p2ppeer "github.com/libp2p/go-libp2p-core/peer"
"github.com/pkg/errors"
"github.com/smartcontractkit/chainlink/core/logger"
"github.com/smartcontractkit/chainlink/core/services/keystore"
Expand Down Expand Up @@ -99,6 +100,11 @@ func (p *SingletonPeerWrapper) Start() error {
if err != nil {
return errors.Wrap(err, "could not make new pstorewrapper")
}
sqlDB, err := p.db.DB()
if err != nil {
return err
}
discovererDB := NewDiscovererDatabase(sqlDB, p2ppeer.ID(p.PeerID))

// If the P2PAnnounceIP is set we must also set the P2PAnnouncePort
// Fallback to P2PListenPort if it wasn't made explicit
Expand All @@ -112,21 +118,27 @@ func (p *SingletonPeerWrapper) Start() error {
peerLogger := NewLogger(logger.Default, p.config.OCRTraceLogging(), func(string) {})

p.Peer, err = ocrnetworking.NewPeer(ocrnetworking.PeerConfig{
PrivKey: key.PrivKey,
ListenIP: p.config.P2PListenIP(),
ListenPort: listenPort,
AnnounceIP: p.config.P2PAnnounceIP(),
AnnouncePort: announcePort,
Logger: peerLogger,
Peerstore: p.pstoreWrapper.Peerstore,
NetworkingStack: p.config.P2PNetworkingStack(),
PrivKey: key.PrivKey,
V1ListenIP: p.config.P2PListenIP(),
V1ListenPort: listenPort,
V1AnnounceIP: p.config.P2PAnnounceIP(),
V1AnnouncePort: announcePort,
Logger: peerLogger,
V1Peerstore: p.pstoreWrapper.Peerstore,
V2ListenAddresses: p.config.P2PV2ListenAddresses(),
V2AnnounceAddresses: p.config.P2PV2AnnounceAddresses(),
V2DeltaReconcile: p.config.P2PV2DeltaReconcile().Duration(),
V2DeltaDial: p.config.P2PV2DeltaDial().Duration(),
V2DiscovererDatabase: discovererDB,
EndpointConfig: ocrnetworking.EndpointConfig{
IncomingMessageBufferSize: p.config.OCRIncomingMessageBufferSize(),
OutgoingMessageBufferSize: p.config.OCROutgoingMessageBufferSize(),
NewStreamTimeout: p.config.OCRNewStreamTimeout(),
DHTLookupInterval: p.config.OCRDHTLookupInterval(),
BootstrapCheckInterval: p.config.OCRBootstrapCheckInterval(),
},
DHTAnnouncementCounterUserPrefix: p.config.P2PDHTAnnouncementCounterUserPrefix(),
V1DHTAnnouncementCounterUserPrefix: p.config.P2PDHTAnnouncementCounterUserPrefix(),
})
if err != nil {
return errors.Wrap(err, "error calling NewPeer")
Expand Down
31 changes: 31 additions & 0 deletions core/store/migrations/0044_create_table_ocr_discoverer_database.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package migrations

import (
"gorm.io/gorm"
)

const up44 = `
CREATE TABLE offchainreporting_discoverer_announcements (
local_peer_id text NOT NULL REFERENCES encrypted_p2p_keys (peer_id) DEFERRABLE INITIALLY IMMEDIATE,
remote_peer_id text NOT NULL,
ann bytea NOT NULL,
created_at timestamptz not null,
updated_at timestamptz not null,
PRIMARY KEY(local_peer_id, remote_peer_id)
);
`
const down44 = `
DROP TABLE offchainreporting_discoverer_announcements;
`

func init() {
Migrations = append(Migrations, &Migration{
ID: "0044_create_table_offchainreporting_discoverer_announcements",
Migrate: func(db *gorm.DB) error {
return db.Exec(up44).Error
},
Rollback: func(db *gorm.DB) error {
return db.Exec(down44).Error
},
})
}
68 changes: 67 additions & 1 deletion core/store/orm/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (

"github.com/multiformats/go-multiaddr"

ocrnetworking "github.com/smartcontractkit/libocr/networking"
ocr "github.com/smartcontractkit/libocr/offchainreporting"
ocrtypes "github.com/smartcontractkit/libocr/offchainreporting/types"

Expand Down Expand Up @@ -1357,6 +1358,71 @@ func (c Config) P2PBootstrapPeers(override []string) ([]string, error) {
return []string{}, nil
}

// P2PNetworkingStack returns the preferred networking stack for libocr
func (c Config) P2PNetworkingStack() (n ocrnetworking.NetworkingStack) {
str := c.P2PNetworkingStackRaw()
err := n.UnmarshalText([]byte(str))
if err != nil {
logger.Fatalf("P2PNetworkingStack failed to unmarshal '%s': %s", str, err)
}
return n
}

// P2PNetworkingStackRaw returns the raw string passed as networking stack
func (c Config) P2PNetworkingStackRaw() string {
return c.viper.GetString(EnvVarName("P2PNetworkingStack"))
}

// P2PV2ListenAddresses contains the addresses the peer will listen to on the network in <host>:<port> form as
// accepted by net.Listen, but host and port must be fully specified and cannot be empty.
func (c Config) P2PV2ListenAddresses() []string {
return c.viper.GetStringSlice(EnvVarName("P2PV2ListenAddresses"))
}

// P2PV2AnnounceAddresses contains the addresses the peer will advertise on the network in <host>:<port> form as
// accepted by net.Dial. The addresses should be reachable by peers of interest.
func (c Config) P2PV2AnnounceAddresses() []string {
if c.viper.IsSet(EnvVarName("P2PV2AnnounceAddresses")) {
return c.viper.GetStringSlice(EnvVarName("P2PV2AnnounceAddresses"))
}
return c.P2PV2ListenAddresses()
}

// P2PV2AnnounceAddressesRaw returns the raw value passed in
func (c Config) P2PV2AnnounceAddressesRaw() []string {
return c.viper.GetStringSlice(EnvVarName("P2PV2AnnounceAddresses"))
}

// P2PV2Bootstrappers returns the default bootstrapper peers for libocr's v2
// networking stack
func (c Config) P2PV2Bootstrappers() (locators []ocrtypes.BootstrapperLocator) {
bootstrappers := c.P2PV2BootstrappersRaw()
for _, s := range bootstrappers {
var locator ocrtypes.BootstrapperLocator
err := locator.UnmarshalText([]byte(s))
if err != nil {
logger.Fatalf("invalid format for bootstrapper '%s', got error: %s", s, err)
}
locators = append(locators, locator)
}
return
}

// P2PV2BootstrappersRaw returns the raw strings for v2 bootstrap peers
func (c Config) P2PV2BootstrappersRaw() []string {
return c.viper.GetStringSlice(EnvVarName("P2PV2Bootstrappers"))
}

// P2PV2DeltaDial controls how far apart Dial attempts are
func (c Config) P2PV2DeltaDial() models.Duration {
return models.MustMakeDuration(c.getWithFallback("P2PV2DeltaDial", parseDuration).(time.Duration))
}

// P2PV2DeltaReconcile controls how often a Reconcile message is sent to every peer.
func (c Config) P2PV2DeltaReconcile() models.Duration {
return models.MustMakeDuration(c.getWithFallback("P2PV2DeltaReconcile", parseDuration).(time.Duration))
}

// Port represents the port Chainlink should listen on for client requests.
func (c Config) Port() uint16 {
return c.getWithFallback("Port", parseUint16).(uint16)
Expand Down Expand Up @@ -1498,7 +1564,7 @@ func (c Config) getWithFallback(name string, parser func(string) (interface{}, e

v, err := parser(defaultValue)
if err != nil {
log.Fatalf(fmt.Sprintf(`Invalid default for %s: "%s"`, name, defaultValue))
log.Fatalf(`Invalid default for %s: "%s" (%s)`, name, defaultValue, err)
}
return v
}
Expand Down
Loading

0 comments on commit 6350464

Please sign in to comment.