Skip to content

Commit

Permalink
Implement RPC Client Methods (#845)
Browse files Browse the repository at this point in the history
* MultiNode integration setup

* Update MultiNode files

* Add MultiNode flag

* Remove internal dependency

* Fix build

* Fix import cycle

* tidy

* Update client_test.go

* lint

* Fix duplicate metrics

* Add chain multinode flag

* Extend client

* Implement rpc client methods

* Add defaults

* Add latest block methods

* Address comments

* lint

* Fix lint overflow issues

* Update transaction_sender.go

* Fix lint

* Validate node config

* Update toml.go

* Add SendOnly nodes

* Use pointers on config

* Add test outlines

* Use test context

* Use configured selection mode

* Set defaults

* lint

* Add nil check

* Add client test

* Add subscription test

* tidy

* Fix imports

* Update chain_test.go

* Update multinode.go

* Add comments

* Update multinode.go

* Wrap multinode config

* Fix imports

* Update .golangci.yml

* Use MultiNode

* Add multinode to txm

* Use MultiNode

* Update chain.go

* Update balance_test.go

* Add retries

* Fix head

* Update client.go

* lint

* lint

* Address comments

* Remove total difficulty

* Register polling subs

* Extract MultiNodeClient

* Remove caching changes

* Undo cache changes

* Fix tests

* Fix variables

* Fix imports

* lint

* Update txm_internal_test.go

* lint

* Update multinode_client.go

* Add tests

* Add dial comment

* Update pkg/solana/client/multinode_client.go

Co-authored-by: Dmytro Haidashenko <[email protected]>

---------

Co-authored-by: Dmytro Haidashenko <[email protected]>
  • Loading branch information
DylanTinianov and dhaidashenko authored Oct 17, 2024
1 parent 564b037 commit 665f519
Show file tree
Hide file tree
Showing 5 changed files with 530 additions and 88 deletions.
26 changes: 14 additions & 12 deletions pkg/solana/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,8 @@ type chain struct {
lggr logger.Logger

// if multiNode is enabled, the clientCache will not be used
multiNode *mn.MultiNode[mn.StringID, *client.Client]
txSender *mn.TransactionSender[*solanago.Transaction, mn.StringID, *client.Client]
multiNode *mn.MultiNode[mn.StringID, *client.MultiNodeClient]
txSender *mn.TransactionSender[*solanago.Transaction, mn.StringID, *client.MultiNodeClient]

// tracking node chain id for verification
clientCache map[string]*verifiedCachedClient // map URL -> {client, chainId} [mainnet/testnet/devnet/localnet]
Expand Down Expand Up @@ -235,28 +235,29 @@ func newChain(id string, cfg *config.TOMLConfig, ks loop.Keystore, lggr logger.L

mnCfg := &cfg.MultiNode

var nodes []mn.Node[mn.StringID, *client.Client]
var sendOnlyNodes []mn.SendOnlyNode[mn.StringID, *client.Client]
var nodes []mn.Node[mn.StringID, *client.MultiNodeClient]
var sendOnlyNodes []mn.SendOnlyNode[mn.StringID, *client.MultiNodeClient]

for i, nodeInfo := range cfg.ListNodes() {
rpcClient, err := client.NewClient(nodeInfo.URL.String(), cfg, DefaultRequestTimeout, logger.Named(lggr, "Client."+*nodeInfo.Name))
rpcClient, err := client.NewMultiNodeClient(nodeInfo.URL.String(), cfg, DefaultRequestTimeout, logger.Named(lggr, "Client."+*nodeInfo.Name))
if err != nil {
lggr.Warnw("failed to create client", "name", *nodeInfo.Name, "solana-url", nodeInfo.URL.String(), "err", err.Error())
return nil, fmt.Errorf("failed to create client: %w", err)
}

newNode := mn.NewNode[mn.StringID, *client.Head, *client.Client](
mnCfg, mnCfg, lggr, *nodeInfo.URL.URL(), nil, *nodeInfo.Name,
i, mn.StringID(id), 0, rpcClient, chainFamily)

if nodeInfo.SendOnly {
sendOnlyNodes = append(sendOnlyNodes, newNode)
newSendOnly := mn.NewSendOnlyNode[mn.StringID, *client.MultiNodeClient](
lggr, *nodeInfo.URL.URL(), *nodeInfo.Name, mn.StringID(id), rpcClient)
sendOnlyNodes = append(sendOnlyNodes, newSendOnly)
} else {
newNode := mn.NewNode[mn.StringID, *client.Head, *client.MultiNodeClient](
mnCfg, mnCfg, lggr, *nodeInfo.URL.URL(), nil, *nodeInfo.Name,
i, mn.StringID(id), 0, rpcClient, chainFamily)
nodes = append(nodes, newNode)
}
}

multiNode := mn.NewMultiNode[mn.StringID, *client.Client](
multiNode := mn.NewMultiNode[mn.StringID, *client.MultiNodeClient](
lggr,
mnCfg.SelectionMode(),
mnCfg.LeaseDuration(),
Expand All @@ -273,7 +274,7 @@ func newChain(id string, cfg *config.TOMLConfig, ks loop.Keystore, lggr logger.L
return 0 // TODO ClassifySendError(err, clientErrors, logger.Sugared(logger.Nop()), tx, common.Address{}, false)
}

txSender := mn.NewTransactionSender[*solanago.Transaction, mn.StringID, *client.Client](
txSender := mn.NewTransactionSender[*solanago.Transaction, mn.StringID, *client.MultiNodeClient](
lggr,
mn.StringID(id),
chainFamily,
Expand Down Expand Up @@ -395,6 +396,7 @@ func (c *chain) ChainID() string {
}

// getClient returns a client, randomly selecting one from available and valid nodes
// If multinode is enabled, it will return a client using the multinode selection instead.
func (c *chain) getClient() (client.ReaderWriter, error) {
if c.cfg.MultiNode.Enabled() {
return c.multiNode.SelectRPC()
Expand Down
59 changes: 59 additions & 0 deletions pkg/solana/chain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ func TestSolanaChain_GetClient(t *testing.T) {
ChainID: ptr("devnet"),
Chain: ch,
}
cfg.SetDefaults()
testChain := chain{
id: "devnet",
cfg: cfg,
Expand Down Expand Up @@ -125,6 +126,61 @@ func TestSolanaChain_GetClient(t *testing.T) {
assert.NoError(t, err)
}

func TestSolanaChain_MultiNode_GetClient(t *testing.T) {
mockServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
out := fmt.Sprintf(TestSolanaGenesisHashTemplate, client.MainnetGenesisHash) // mainnet genesis hash
if !strings.Contains(r.URL.Path, "/mismatch") {
// devnet gensis hash
out = fmt.Sprintf(TestSolanaGenesisHashTemplate, client.DevnetGenesisHash)
}
_, err := w.Write([]byte(out))
require.NoError(t, err)
}))
defer mockServer.Close()

ch := solcfg.Chain{}
ch.SetDefaults()
mn := solcfg.MultiNodeConfig{
MultiNode: solcfg.MultiNode{
Enabled: ptr(true),
},
}
mn.SetDefaults()

cfg := &solcfg.TOMLConfig{
ChainID: ptr("devnet"),
Chain: ch,
MultiNode: mn,
}
cfg.Nodes = []*solcfg.Node{
{
Name: ptr("devnet"),
URL: config.MustParseURL(mockServer.URL + "/1"),
},
{
Name: ptr("devnet"),
URL: config.MustParseURL(mockServer.URL + "/2"),
},
}

testChain, err := newChain("devnet", cfg, nil, logger.Test(t))
require.NoError(t, err)

err = testChain.Start(tests.Context(t))
require.NoError(t, err)
defer func() {
closeErr := testChain.Close()
require.NoError(t, closeErr)
}()

selectedClient, err := testChain.getClient()
assert.NoError(t, err)

id, err := selectedClient.ChainID(tests.Context(t))
assert.NoError(t, err)
assert.Equal(t, "devnet", id.String())
}

func TestSolanaChain_VerifiedClient(t *testing.T) {
ctx := tests.Context(t)
called := false
Expand Down Expand Up @@ -157,6 +213,8 @@ func TestSolanaChain_VerifiedClient(t *testing.T) {
ChainID: ptr("devnet"),
Chain: ch,
}
cfg.SetDefaults()

testChain := chain{
cfg: cfg,
lggr: logger.Test(t),
Expand Down Expand Up @@ -205,6 +263,7 @@ func TestSolanaChain_VerifiedClient_ParallelClients(t *testing.T) {
Enabled: ptr(true),
Chain: ch,
}
cfg.SetDefaults()
testChain := chain{
id: "devnet",
cfg: cfg,
Expand Down
78 changes: 2 additions & 76 deletions pkg/solana/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,14 @@ import (
"context"
"errors"
"fmt"
"math/big"
"time"

mn "github.com/smartcontractkit/chainlink-solana/pkg/solana/client/multinode"

"github.com/gagliardetto/solana-go"
"github.com/gagliardetto/solana-go/rpc"
"golang.org/x/sync/singleflight"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
"golang.org/x/sync/singleflight"

mn "github.com/smartcontractkit/chainlink-solana/pkg/solana/client/multinode"
"github.com/smartcontractkit/chainlink-solana/pkg/solana/config"
"github.com/smartcontractkit/chainlink-solana/pkg/solana/monitor"
)
Expand Down Expand Up @@ -68,27 +65,6 @@ type Client struct {
requestGroup *singleflight.Group
}

type Head struct {
rpc.GetBlockResult
}

func (h *Head) BlockNumber() int64 {
if !h.IsValid() {
return 0
}
// nolint:gosec
// G115: integer overflow conversion uint64 -&gt; int64
return int64(*h.BlockHeight)
}

func (h *Head) BlockDifficulty() *big.Int {
return nil
}

func (h *Head) IsValid() bool {
return h.BlockHeight != nil
}

func NewClient(endpoint string, cfg config.Config, requestTimeout time.Duration, log logger.Logger) (*Client, error) {
return &Client{
url: endpoint,
Expand All @@ -103,56 +79,6 @@ func NewClient(endpoint string, cfg config.Config, requestTimeout time.Duration,
}, nil
}

var _ mn.RPCClient[mn.StringID, *Head] = (*Client)(nil)
var _ mn.SendTxRPCClient[*solana.Transaction] = (*Client)(nil)

// TODO: BCI-4061: Implement Client for MultiNode

func (c *Client) Dial(ctx context.Context) error {
//TODO implement me
panic("implement me")
}

func (c *Client) SubscribeToHeads(ctx context.Context) (<-chan *Head, mn.Subscription, error) {
//TODO implement me
panic("implement me")
}

func (c *Client) SubscribeToFinalizedHeads(ctx context.Context) (<-chan *Head, mn.Subscription, error) {
//TODO implement me
panic("implement me")
}

func (c *Client) Ping(ctx context.Context) error {
//TODO implement me
panic("implement me")
}

func (c *Client) IsSyncing(ctx context.Context) (bool, error) {
//TODO implement me
panic("implement me")
}

func (c *Client) UnsubscribeAllExcept(subs ...mn.Subscription) {
//TODO implement me
panic("implement me")
}

func (c *Client) Close() {
//TODO implement me
panic("implement me")
}

func (c *Client) GetInterceptedChainInfo() (latest, highestUserObservations mn.ChainInfo) {
//TODO implement me
panic("implement me")
}

func (c *Client) SendTransaction(ctx context.Context, tx *solana.Transaction) error {
// TODO: Implement
return nil
}

func (c *Client) latency(name string) func() {
start := time.Now()
return func() {
Expand Down
Loading

0 comments on commit 665f519

Please sign in to comment.