Skip to content

Commit

Permalink
Refactor SDK (#2452)
Browse files Browse the repository at this point in the history
Signed-off-by: Joshua Kim <[email protected]>
Co-authored-by: Dan Laine <[email protected]>
Co-authored-by: Stephen Buttolph <[email protected]>
  • Loading branch information
3 people authored Dec 13, 2023
1 parent 82fbc97 commit 832632a
Show file tree
Hide file tree
Showing 12 changed files with 414 additions and 223 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ require (
github.com/DataDog/zstd v1.5.2
github.com/Microsoft/go-winio v0.5.2
github.com/NYTimes/gziphandler v1.1.1
github.com/ava-labs/coreth v0.12.9-rc.9.0.20231212220437-f9ec2ecc2714
github.com/ava-labs/coreth v0.12.9-rc.9.0.20231213002358-53424dd5480c
github.com/ava-labs/ledger-avalanche/go v0.0.0-20231102202641-ae2ebdaeac34
github.com/btcsuite/btcd/btcutil v1.1.3
github.com/cockroachdb/pebble v0.0.0-20230209160836-829675f94811
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156 h1:eMwmnE/GDgah
github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156/go.mod h1:Cb/ax3seSYIx7SuZdm2G2xzfwmv3TPSk2ucNfQESPXM=
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
github.com/ava-labs/coreth v0.12.9-rc.9.0.20231212220437-f9ec2ecc2714 h1:dhuxCYjB+4isvJMHViHkS50NgkQ/dHY2ZZmk8ESAyxw=
github.com/ava-labs/coreth v0.12.9-rc.9.0.20231212220437-f9ec2ecc2714/go.mod h1:LwQhIuKmd8JPemahE1f7TvsE3WRzCFdjvNWBPxXSaNo=
github.com/ava-labs/coreth v0.12.9-rc.9.0.20231213002358-53424dd5480c h1:bWPdqoi+J6ztfVhEl7iexFSaKyaFlMpIltIMVTpXDQY=
github.com/ava-labs/coreth v0.12.9-rc.9.0.20231213002358-53424dd5480c/go.mod h1:v8pqR8wC9VuyPTEbI6/wmflXPIAmUr6SUwEKP+hi9iU=
github.com/ava-labs/ledger-avalanche/go v0.0.0-20231102202641-ae2ebdaeac34 h1:mg9Uw6oZFJKytJxgxnl3uxZOs/SB8CVHg6Io4Tf99Zc=
github.com/ava-labs/ledger-avalanche/go v0.0.0-20231102202641-ae2ebdaeac34/go.mod h1:pJxaT9bUgeRNVmNRgtCHb7sFDIRKy7CzTQVi8gGNT6g=
github.com/aymerick/raymond v2.0.3-0.20180322193309-b565731e1464+incompatible/go.mod h1:osfaiScAUVup+UC9Nfq76eWqDhXlp+4UYaA8uhTBO6g=
Expand Down
9 changes: 5 additions & 4 deletions network/p2p/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type CrossChainAppResponseCallback func(

type Client struct {
handlerID uint64
handlerIDStr string
handlerPrefix []byte
router *router
sender common.AppSender
Expand Down Expand Up @@ -95,8 +96,8 @@ func (c *Client) AppRequest(
}

c.router.pendingAppRequests[requestID] = pendingAppRequest{
AppResponseCallback: onResponse,
metrics: c.router.handlers[c.handlerID].metrics,
handlerID: c.handlerIDStr,
callback: onResponse,
}
c.router.requestID += 2
}
Expand Down Expand Up @@ -158,8 +159,8 @@ func (c *Client) CrossChainAppRequest(
}

c.router.pendingCrossChainAppRequests[requestID] = pendingCrossChainAppRequest{
CrossChainAppResponseCallback: onResponse,
metrics: c.router.handlers[c.handlerID].metrics,
handlerID: c.handlerIDStr,
callback: onResponse,
}
c.router.requestID += 2

Expand Down
12 changes: 7 additions & 5 deletions network/p2p/gossip/gossip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,9 @@ func TestGossiperGossip(t *testing.T) {
responseSender := &common.FakeSender{
SentAppResponse: make(chan []byte, 1),
}
responseNetwork := p2p.NewNetwork(logging.NoLog{}, responseSender, prometheus.NewRegistry(), "")
responseNetwork, err := p2p.NewNetwork(logging.NoLog{}, responseSender, prometheus.NewRegistry(), "")
require.NoError(err)

responseBloom, err := NewBloomFilter(1000, 0.01)
require.NoError(err)
responseSet := testSet{
Expand All @@ -133,14 +135,14 @@ func TestGossiperGossip(t *testing.T) {

handler, err := NewHandler[*testTx](responseSet, tt.config, prometheus.NewRegistry())
require.NoError(err)
_, err = responseNetwork.NewAppProtocol(0x0, handler)
require.NoError(err)
require.NoError(responseNetwork.AddHandler(0x0, handler))

requestSender := &common.FakeSender{
SentAppRequest: make(chan []byte, 1),
}

requestNetwork := p2p.NewNetwork(logging.NoLog{}, requestSender, prometheus.NewRegistry(), "")
requestNetwork, err := p2p.NewNetwork(logging.NoLog{}, requestSender, prometheus.NewRegistry(), "")
require.NoError(err)
require.NoError(requestNetwork.Connected(context.Background(), ids.EmptyNodeID, nil))

bloom, err := NewBloomFilter(1000, 0.01)
Expand All @@ -153,7 +155,7 @@ func TestGossiperGossip(t *testing.T) {
require.NoError(requestSet.Add(item))
}

requestClient, err := requestNetwork.NewAppProtocol(0x0, nil)
requestClient := requestNetwork.NewClient(0x0)
require.NoError(err)

config := Config{
Expand Down
36 changes: 28 additions & 8 deletions network/p2p/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,28 +63,48 @@ func (NoOpHandler) CrossChainAppRequest(context.Context, ids.ID, time.Time, []by
return nil, nil
}

func NewValidatorHandler(
handler Handler,
validatorSet ValidatorSet,
log logging.Logger,
) *ValidatorHandler {
return &ValidatorHandler{
handler: handler,
validatorSet: validatorSet,
log: log,
}
}

// ValidatorHandler drops messages from non-validators
type ValidatorHandler struct {
Handler
ValidatorSet ValidatorSet
Log logging.Logger
handler Handler
validatorSet ValidatorSet
log logging.Logger
}

func (v ValidatorHandler) AppGossip(ctx context.Context, nodeID ids.NodeID, gossipBytes []byte) {
if !v.ValidatorSet.Has(ctx, nodeID) {
v.Log.Debug("dropping message", zap.Stringer("nodeID", nodeID))
if !v.validatorSet.Has(ctx, nodeID) {
v.log.Debug(
"dropping message",
zap.Stringer("nodeID", nodeID),
zap.String("reason", "not a validator"),
)
return
}

v.Handler.AppGossip(ctx, nodeID, gossipBytes)
v.handler.AppGossip(ctx, nodeID, gossipBytes)
}

func (v ValidatorHandler) AppRequest(ctx context.Context, nodeID ids.NodeID, deadline time.Time, requestBytes []byte) ([]byte, error) {
if !v.ValidatorSet.Has(ctx, nodeID) {
if !v.validatorSet.Has(ctx, nodeID) {
return nil, ErrNotValidator
}

return v.Handler.AppRequest(ctx, nodeID, deadline, requestBytes)
return v.handler.AppRequest(ctx, nodeID, deadline, requestBytes)
}

func (v ValidatorHandler) CrossChainAppRequest(ctx context.Context, chainID ids.ID, deadline time.Time, requestBytes []byte) ([]byte, error) {
return v.handler.CrossChainAppRequest(ctx, chainID, deadline, requestBytes)
}

// responder automatically sends the response for a given request
Expand Down
20 changes: 10 additions & 10 deletions network/p2p/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,15 +55,15 @@ func TestValidatorHandlerAppGossip(t *testing.T) {
require := require.New(t)

called := false
handler := ValidatorHandler{
Handler: testHandler{
handler := NewValidatorHandler(
&testHandler{
appGossipF: func(context.Context, ids.NodeID, []byte) {
called = true
},
},
ValidatorSet: tt.validatorSet,
Log: logging.NoLog{},
}
tt.validatorSet,
logging.NoLog{},
)

handler.AppGossip(context.Background(), tt.nodeID, []byte("foobar"))
require.Equal(tt.expected, called)
Expand Down Expand Up @@ -100,11 +100,11 @@ func TestValidatorHandlerAppRequest(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
require := require.New(t)

handler := ValidatorHandler{
Handler: NoOpHandler{},
ValidatorSet: tt.validatorSet,
Log: logging.NoLog{},
}
handler := NewValidatorHandler(
NoOpHandler{},
tt.validatorSet,
logging.NoLog{},
)

_, err := handler.AppRequest(context.Background(), tt.nodeID, time.Time{}, []byte("foobar"))
require.ErrorIs(err, tt.expected)
Expand Down
139 changes: 117 additions & 22 deletions network/p2p/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package p2p
import (
"context"
"encoding/binary"
"strconv"
"sync"
"time"

Expand All @@ -14,6 +15,7 @@ import (
"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/snow/engine/common"
"github.com/ava-labs/avalanchego/snow/validators"
"github.com/ava-labs/avalanchego/utils"
"github.com/ava-labs/avalanchego/utils/logging"
"github.com/ava-labs/avalanchego/utils/set"
"github.com/ava-labs/avalanchego/version"
Expand All @@ -23,6 +25,9 @@ var (
_ validators.Connector = (*Network)(nil)
_ common.AppHandler = (*Network)(nil)
_ NodeSampler = (*peerSampler)(nil)

handlerLabel = "handlerID"
labelNames = []string{handlerLabel}
)

// ClientOption configures Client
Expand Down Expand Up @@ -53,28 +58,117 @@ type clientOptions struct {
func NewNetwork(
log logging.Logger,
sender common.AppSender,
metrics prometheus.Registerer,
registerer prometheus.Registerer,
namespace string,
) *Network {
return &Network{
Peers: &Peers{},
log: log,
sender: sender,
metrics: metrics,
namespace: namespace,
router: newRouter(log, sender, metrics, namespace),
) (*Network, error) {
metrics := metrics{
appRequestTime: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Name: "app_request_time",
Help: "app request time (ns)",
}, labelNames),
appRequestCount: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Name: "app_request_count",
Help: "app request count (n)",
}, labelNames),
appResponseTime: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Name: "app_response_time",
Help: "app response time (ns)",
}, labelNames),
appResponseCount: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Name: "app_response_count",
Help: "app response count (n)",
}, labelNames),
appRequestFailedTime: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Name: "app_request_failed_time",
Help: "app request failed time (ns)",
}, labelNames),
appRequestFailedCount: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Name: "app_request_failed_count",
Help: "app request failed count (ns)",
}, labelNames),
appGossipTime: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Name: "app_gossip_time",
Help: "app gossip time (ns)",
}, labelNames),
appGossipCount: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Name: "app_gossip_count",
Help: "app gossip count (n)",
}, labelNames),
crossChainAppRequestTime: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Name: "cross_chain_app_request_time",
Help: "cross chain app request time (ns)",
}, labelNames),
crossChainAppRequestCount: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Name: "cross_chain_app_request_count",
Help: "cross chain app request count (n)",
}, labelNames),
crossChainAppResponseTime: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Name: "cross_chain_app_response_time",
Help: "cross chain app response time (ns)",
}, labelNames),
crossChainAppResponseCount: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Name: "cross_chain_app_response_count",
Help: "cross chain app response count (n)",
}, labelNames),
crossChainAppRequestFailedTime: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Name: "cross_chain_app_request_failed_time",
Help: "cross chain app request failed time (ns)",
}, labelNames),
crossChainAppRequestFailedCount: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Name: "cross_chain_app_request_failed_count",
Help: "cross chain app request failed count (n)",
}, labelNames),
}

err := utils.Err(
registerer.Register(metrics.appRequestTime),
registerer.Register(metrics.appRequestCount),
registerer.Register(metrics.appResponseTime),
registerer.Register(metrics.appResponseCount),
registerer.Register(metrics.appRequestFailedTime),
registerer.Register(metrics.appRequestFailedCount),
registerer.Register(metrics.appGossipTime),
registerer.Register(metrics.appGossipCount),
registerer.Register(metrics.crossChainAppRequestTime),
registerer.Register(metrics.crossChainAppRequestCount),
registerer.Register(metrics.crossChainAppResponseTime),
registerer.Register(metrics.crossChainAppResponseCount),
registerer.Register(metrics.crossChainAppRequestFailedTime),
registerer.Register(metrics.crossChainAppRequestFailedCount),
)
if err != nil {
return nil, err
}

return &Network{
Peers: &Peers{},
log: log,
sender: sender,
router: newRouter(log, sender, metrics),
}, nil
}

// Network exposes networking state and supports building p2p application
// protocols
type Network struct {
Peers *Peers

log logging.Logger
sender common.AppSender
metrics prometheus.Registerer
namespace string
log logging.Logger
sender common.AppSender

router *router
}
Expand Down Expand Up @@ -117,16 +211,12 @@ func (n *Network) Disconnected(_ context.Context, nodeID ids.NodeID) error {
return nil
}

// NewAppProtocol reserves an identifier for an application protocol handler and
// returns a Client that can be used to send messages for the corresponding
// protocol.
func (n *Network) NewAppProtocol(handlerID uint64, handler Handler, options ...ClientOption) (*Client, error) {
if err := n.router.addHandler(handlerID, handler); err != nil {
return nil, err
}

// NewClient returns a Client that can be used to send messages for the
// corresponding protocol.
func (n *Network) NewClient(handlerID uint64, options ...ClientOption) *Client {
client := &Client{
handlerID: handlerID,
handlerIDStr: strconv.FormatUint(handlerID, 10),
handlerPrefix: binary.AppendUvarint(nil, handlerID),
sender: n.sender,
router: n.router,
Expand All @@ -141,7 +231,12 @@ func (n *Network) NewAppProtocol(handlerID uint64, handler Handler, options ...C
option.apply(client.options)
}

return client, nil
return client
}

// AddHandler reserves an identifier for an application protocol
func (n *Network) AddHandler(handlerID uint64, handler Handler) error {
return n.router.addHandler(handlerID, handler)
}

// Peers contains metadata about the current set of connected peers
Expand Down
Loading

0 comments on commit 832632a

Please sign in to comment.