Skip to content

Commit

Permalink
MultiNode Integration: Initial Setup (#824)
Browse files Browse the repository at this point in the history
* MultiNode integration setup

* Update MultiNode files

* Add MultiNode flag

* Remove internal dependency

* Fix build

* Fix import cycle

* tidy

* Update client_test.go

* lint

* Fix duplicate metrics

* Add chain multinode flag

* Extend client

* Address comments

* lint

* Fix lint overflow issues

* Update transaction_sender.go

* Fix lint

* Validate node config

* Update toml.go

* Add SendOnly nodes

* Use test context

* lint
  • Loading branch information
DylanTinianov authored Sep 20, 2024
1 parent eb6c83b commit 6ad738f
Show file tree
Hide file tree
Showing 27 changed files with 3,222 additions and 29 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ require (
github.com/go-viper/mapstructure/v2 v2.1.0
github.com/google/uuid v1.6.0
github.com/hashicorp/go-plugin v1.6.2-0.20240829161738-06afb6d7ae99
github.com/jpillora/backoff v1.0.0
github.com/pelletier/go-toml/v2 v2.2.0
github.com/prometheus/client_golang v1.17.0
github.com/smartcontractkit/chainlink-common v0.2.2-0.20240913191949-44d96950c886
Expand Down Expand Up @@ -58,7 +59,6 @@ require (
github.com/hashicorp/go-hclog v1.5.0 // indirect
github.com/hashicorp/yamux v0.1.1 // indirect
github.com/invopop/jsonschema v0.12.0 // indirect
github.com/jpillora/backoff v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/compress v1.15.15 // indirect
github.com/kr/pretty v0.3.1 // indirect
Expand Down
92 changes: 87 additions & 5 deletions pkg/solana/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"io"
"math/big"
"math/rand"
"strconv"
Expand All @@ -22,6 +23,8 @@ import (
"github.com/smartcontractkit/chainlink-common/pkg/types"
"github.com/smartcontractkit/chainlink-common/pkg/types/core"

mn "github.com/smartcontractkit/chainlink-solana/pkg/solana/client/multinode"

"github.com/smartcontractkit/chainlink-solana/pkg/solana/client"
"github.com/smartcontractkit/chainlink-solana/pkg/solana/config"
"github.com/smartcontractkit/chainlink-solana/pkg/solana/monitor"
Expand Down Expand Up @@ -85,6 +88,10 @@ type chain struct {
balanceMonitor services.Service
lggr logger.Logger

// if multiNode is enabled, the clientCache will not be used
multiNode *mn.MultiNode[mn.StringID, *client.Client]
txSender *mn.TransactionSender[*solanago.Transaction, mn.StringID, *client.Client]

// tracking node chain id for verification
clientCache map[string]*verifiedCachedClient // map URL -> {client, chainId} [mainnet/testnet/devnet/localnet]
clientLock sync.RWMutex
Expand Down Expand Up @@ -114,7 +121,8 @@ func (v *verifiedCachedClient) verifyChainID() (bool, error) {
v.chainIDVerifiedLock.Lock()
defer v.chainIDVerifiedLock.Unlock()

v.chainID, err = v.ReaderWriter.ChainID()
strID, err := v.ReaderWriter.ChainID(context.Background())
v.chainID = strID.String()
if err != nil {
v.chainIDVerified = false
return v.chainIDVerified, fmt.Errorf("failed to fetch ChainID in verifiedCachedClient: %w", err)
Expand Down Expand Up @@ -186,13 +194,13 @@ func (v *verifiedCachedClient) LatestBlockhash() (*rpc.GetLatestBlockhashResult,
return v.ReaderWriter.LatestBlockhash()
}

func (v *verifiedCachedClient) ChainID() (string, error) {
func (v *verifiedCachedClient) ChainID(ctx context.Context) (mn.StringID, error) {
verified, err := v.verifyChainID()
if !verified {
return "", err
}

return v.chainID, nil
return mn.StringID(v.chainID), nil
}

func (v *verifiedCachedClient) GetFeeForMessage(msg string) (uint64, error) {
Expand Down Expand Up @@ -221,6 +229,66 @@ func newChain(id string, cfg *config.TOMLConfig, ks loop.Keystore, lggr logger.L
lggr: logger.Named(lggr, "Chain"),
clientCache: map[string]*verifiedCachedClient{},
}

if cfg.MultiNodeEnabled() {
chainFamily := "solana"

mnCfg := cfg.MultiNodeConfig()

var nodes []mn.Node[mn.StringID, *client.Client]
var sendOnlyNodes []mn.SendOnlyNode[mn.StringID, *client.Client]

for i, nodeInfo := range cfg.ListNodes() {
rpcClient, err := client.NewClient(nodeInfo.URL.String(), cfg, DefaultRequestTimeout, logger.Named(lggr, "Client."+*nodeInfo.Name))
if err != nil {
lggr.Warnw("failed to create client", "name", *nodeInfo.Name, "solana-url", nodeInfo.URL.String(), "err", err.Error())
return nil, fmt.Errorf("failed to create client: %w", err)
}

newNode := mn.NewNode[mn.StringID, *client.Head, *client.Client](
mnCfg, mnCfg, lggr, *nodeInfo.URL.URL(), nil, *nodeInfo.Name,
i, mn.StringID(id), 0, rpcClient, chainFamily)

if nodeInfo.SendOnly {
sendOnlyNodes = append(sendOnlyNodes, newNode)
} else {
nodes = append(nodes, newNode)
}
}

multiNode := mn.NewMultiNode[mn.StringID, *client.Client](
lggr,
mn.NodeSelectionModeRoundRobin,
0,
nodes,
sendOnlyNodes,
mn.StringID(id),
chainFamily,
mnCfg.DeathDeclarationDelay(),
)

// TODO: implement error classification; move logic to separate file if large
// TODO: might be useful to reference anza-xyz/agave@master/sdk/src/transaction/error.rs
classifySendError := func(tx *solanago.Transaction, err error) mn.SendTxReturnCode {
return 0 // TODO ClassifySendError(err, clientErrors, logger.Sugared(logger.Nop()), tx, common.Address{}, false)
}

txSender := mn.NewTransactionSender[*solanago.Transaction, mn.StringID, *client.Client](
lggr,
mn.StringID(id),
chainFamily,
multiNode,
classifySendError,
0, // use the default value provided by the implementation
)

ch.multiNode = multiNode
ch.txSender = txSender

// clientCache will not be used if multinode is enabled
ch.clientCache = nil
}

tc := func() (client.ReaderWriter, error) {
return ch.getClient()
}
Expand Down Expand Up @@ -330,6 +398,10 @@ func (c *chain) ChainID() string {

// getClient returns a client, randomly selecting one from available and valid nodes
func (c *chain) getClient() (client.ReaderWriter, error) {
if c.cfg.MultiNodeEnabled() {
return c.multiNode.SelectRPC()
}

var node *config.Node
var client client.ReaderWriter
nodes := c.cfg.ListNodes()
Expand Down Expand Up @@ -409,7 +481,12 @@ func (c *chain) Start(ctx context.Context) error {
c.lggr.Debug("Starting txm")
c.lggr.Debug("Starting balance monitor")
var ms services.MultiStart
return ms.Start(ctx, c.txm, c.balanceMonitor)
startAll := []services.StartClose{c.txm, c.balanceMonitor}
if c.cfg.MultiNodeEnabled() {
c.lggr.Debug("Starting multinode")
startAll = append(startAll, c.multiNode, c.txSender)
}
return ms.Start(ctx, startAll...)
})
}

Expand All @@ -418,7 +495,12 @@ func (c *chain) Close() error {
c.lggr.Debug("Stopping")
c.lggr.Debug("Stopping txm")
c.lggr.Debug("Stopping balance monitor")
return services.CloseAll(c.txm, c.balanceMonitor)
closeAll := []io.Closer{c.txm, c.balanceMonitor}
if c.cfg.MultiNodeEnabled() {
c.lggr.Debug("Stopping multinode")
closeAll = append(closeAll, c.multiNode, c.txSender)
}
return services.CloseAll(closeAll...)
})
}

Expand Down
3 changes: 2 additions & 1 deletion pkg/solana/chain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (

"github.com/smartcontractkit/chainlink-common/pkg/config"
"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/utils/tests"

"github.com/smartcontractkit/chainlink-solana/pkg/solana/client"
solcfg "github.com/smartcontractkit/chainlink-solana/pkg/solana/config"
Expand Down Expand Up @@ -174,7 +175,7 @@ func TestSolanaChain_VerifiedClient(t *testing.T) {
testChain.id = "incorrect"
c, err = testChain.verifiedClient(node)
assert.NoError(t, err)
_, err = c.ChainID()
_, err = c.ChainID(tests.Context(t))
// expect error from id mismatch (even if using a cached client) when performing RPC calls
assert.Error(t, err)
assert.Equal(t, fmt.Sprintf("client returned mismatched chain id (expected: %s, got: %s): %s", "incorrect", "devnet", node.URL), err.Error())
Expand Down
82 changes: 78 additions & 4 deletions pkg/solana/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,11 @@ import (
"context"
"errors"
"fmt"
"math/big"
"time"

mn "github.com/smartcontractkit/chainlink-solana/pkg/solana/client/multinode"

"github.com/gagliardetto/solana-go"
"github.com/gagliardetto/solana-go/rpc"
"golang.org/x/sync/singleflight"
Expand Down Expand Up @@ -33,7 +36,7 @@ type Reader interface {
Balance(addr solana.PublicKey) (uint64, error)
SlotHeight() (uint64, error)
LatestBlockhash() (*rpc.GetLatestBlockhashResult, error)
ChainID() (string, error)
ChainID(ctx context.Context) (mn.StringID, error)
GetFeeForMessage(msg string) (uint64, error)
GetLatestBlock() (*rpc.GetBlockResult, error)
}
Expand Down Expand Up @@ -65,6 +68,27 @@ type Client struct {
requestGroup *singleflight.Group
}

type Head struct {
rpc.GetBlockResult
}

func (h *Head) BlockNumber() int64 {
if !h.IsValid() {
return 0
}
// nolint:gosec
// G115: integer overflow conversion uint64 -> int64
return int64(*h.BlockHeight)
}

func (h *Head) BlockDifficulty() *big.Int {
return nil
}

func (h *Head) IsValid() bool {
return h.BlockHeight != nil
}

func NewClient(endpoint string, cfg config.Config, requestTimeout time.Duration, log logger.Logger) (*Client, error) {
return &Client{
url: endpoint,
Expand All @@ -79,6 +103,56 @@ func NewClient(endpoint string, cfg config.Config, requestTimeout time.Duration,
}, nil
}

var _ mn.RPCClient[mn.StringID, *Head] = (*Client)(nil)
var _ mn.SendTxRPCClient[*solana.Transaction] = (*Client)(nil)

// TODO: BCI-4061: Implement Client for MultiNode

func (c *Client) Dial(ctx context.Context) error {
//TODO implement me
panic("implement me")
}

func (c *Client) SubscribeToHeads(ctx context.Context) (<-chan *Head, mn.Subscription, error) {
//TODO implement me
panic("implement me")
}

func (c *Client) SubscribeToFinalizedHeads(ctx context.Context) (<-chan *Head, mn.Subscription, error) {
//TODO implement me
panic("implement me")
}

func (c *Client) Ping(ctx context.Context) error {
//TODO implement me
panic("implement me")
}

func (c *Client) IsSyncing(ctx context.Context) (bool, error) {
//TODO implement me
panic("implement me")
}

func (c *Client) UnsubscribeAllExcept(subs ...mn.Subscription) {
//TODO implement me
panic("implement me")
}

func (c *Client) Close() {
//TODO implement me
panic("implement me")
}

func (c *Client) GetInterceptedChainInfo() (latest, highestUserObservations mn.ChainInfo) {
//TODO implement me
panic("implement me")
}

func (c *Client) SendTransaction(ctx context.Context, tx *solana.Transaction) error {
// TODO: Implement
return nil
}

func (c *Client) latency(name string) func() {
start := time.Now()
return func() {
Expand Down Expand Up @@ -142,11 +216,11 @@ func (c *Client) LatestBlockhash() (*rpc.GetLatestBlockhashResult, error) {
return v.(*rpc.GetLatestBlockhashResult), err
}

func (c *Client) ChainID() (string, error) {
func (c *Client) ChainID(ctx context.Context) (mn.StringID, error) {
done := c.latency("chain_id")
defer done()

ctx, cancel := context.WithTimeout(context.Background(), c.contextDuration)
ctx, cancel := context.WithTimeout(ctx, c.contextDuration)
defer cancel()
v, err, _ := c.requestGroup.Do("GetGenesisHash", func() (interface{}, error) {
return c.rpc.GetGenesisHash(ctx)
Expand All @@ -168,7 +242,7 @@ func (c *Client) ChainID() (string, error) {
c.log.Warnf("unknown genesis hash - assuming solana chain is 'localnet'")
network = "localnet"
}
return network, nil
return mn.StringID(network), nil
}

func (c *Client) GetFeeForMessage(msg string) (uint64, error) {
Expand Down
9 changes: 5 additions & 4 deletions pkg/solana/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

"github.com/smartcontractkit/chainlink-common/pkg/logger"

mn "github.com/smartcontractkit/chainlink-solana/pkg/solana/client/multinode"
"github.com/smartcontractkit/chainlink-solana/pkg/solana/config"
"github.com/smartcontractkit/chainlink-solana/pkg/solana/monitor"
)
Expand Down Expand Up @@ -76,9 +77,9 @@ func TestClient_Reader_Integration(t *testing.T) {
assert.Equal(t, uint64(5000), fee)

// get chain ID based on gensis hash
network, err := c.ChainID()
network, err := c.ChainID(context.Background())
assert.NoError(t, err)
assert.Equal(t, "localnet", network)
assert.Equal(t, mn.StringID("localnet"), network)

// get account info (also tested inside contract_test)
res, err := c.GetAccountInfoWithOpts(context.TODO(), solana.PublicKey{}, &rpc.GetAccountInfoOpts{Commitment: rpc.CommitmentFinalized})
Expand Down Expand Up @@ -120,9 +121,9 @@ func TestClient_Reader_ChainID(t *testing.T) {

// get chain ID based on gensis hash
for _, n := range networks {
network, err := c.ChainID()
network, err := c.ChainID(context.Background())
assert.NoError(t, err)
assert.Equal(t, n, network)
assert.Equal(t, mn.StringID(n), network)
}
}

Expand Down
Loading

0 comments on commit 6ad738f

Please sign in to comment.