Skip to content

Commit

Permalink
remove cross-chain handlers (#1286)
Browse files Browse the repository at this point in the history
* remove cross-chain handlers

* fix version

* bump avalanchego

* update versions.sh
  • Loading branch information
darioush authored Aug 15, 2024
1 parent 21c0ba2 commit 9f36083
Show file tree
Hide file tree
Showing 18 changed files with 69 additions and 869 deletions.
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.21.12

require (
github.com/VictoriaMetrics/fastcache v1.12.1
github.com/ava-labs/avalanchego v1.11.11-0.20240805202431-479145a6602d
github.com/ava-labs/avalanchego v1.11.11-0.20240813203340-ab83fb41528d
github.com/cespare/cp v0.1.0
github.com/crate-crypto/go-ipa v0.0.0-20231025140028-3c0104f4b233
github.com/davecgh/go-spew v1.1.1
Expand Down Expand Up @@ -54,7 +54,7 @@ require (
require (
github.com/DataDog/zstd v1.5.2 // indirect
github.com/NYTimes/gziphandler v1.1.1 // indirect
github.com/ava-labs/coreth v0.13.8-0.20240802110637-b3e5088d062d // indirect
github.com/ava-labs/coreth v0.13.8-fixed-genesis-upgrade.0.20240813194342-7635a96aa180 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/bits-and-blooms/bitset v1.10.0 // indirect
github.com/btcsuite/btcd/btcec/v2 v2.3.2 // indirect
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,10 @@ github.com/ajg/form v1.5.1/go.mod h1:uL1WgH+h2mgNtvBq0339dVnzXdBETtL2LeUXaIv25UY
github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156 h1:eMwmnE/GDgah4HI848JfFxHt+iPb26b4zyfspmqY0/8=
github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156/go.mod h1:Cb/ax3seSYIx7SuZdm2G2xzfwmv3TPSk2ucNfQESPXM=
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
github.com/ava-labs/avalanchego v1.11.11-0.20240805202431-479145a6602d h1:T8sDX5uo7zSEjwDtVccS1WtzuC3pRXs9NXYbmGGagJ4=
github.com/ava-labs/avalanchego v1.11.11-0.20240805202431-479145a6602d/go.mod h1:9e0UPXJboybmgFjeTj+SFbK4ugbrdG4t68VdiUW5oQ8=
github.com/ava-labs/coreth v0.13.8-0.20240802110637-b3e5088d062d h1:klPTcKVvqfA2KSKaRvQAO56Pd4XAqGhwgMTQ6/W+w7w=
github.com/ava-labs/coreth v0.13.8-0.20240802110637-b3e5088d062d/go.mod h1:tXDujonxXFOF6oK5HS2EmgtSXJK3Gy6RpZxb5WzR9rM=
github.com/ava-labs/avalanchego v1.11.11-0.20240813203340-ab83fb41528d h1:LyrKJL9avIIxBY3uTcS2dFtUMBFmI2QpAgG6qYTdA6s=
github.com/ava-labs/avalanchego v1.11.11-0.20240813203340-ab83fb41528d/go.mod h1:UkyrRDXK2E15Lq2abyae2Pt+JsWvgsg1pe0/AtoMyAM=
github.com/ava-labs/coreth v0.13.8-fixed-genesis-upgrade.0.20240813194342-7635a96aa180 h1:6aIHp7wbyGVYdhHVQUbG7BEcbCMEQ5SYopPPJyipyvk=
github.com/ava-labs/coreth v0.13.8-fixed-genesis-upgrade.0.20240813194342-7635a96aa180/go.mod h1:/wNBVq7J7wlC2Kbov7kk6LV5xZvau7VF9zwTVOeyAjY=
github.com/aymerick/raymond v2.0.3-0.20180322193309-b565731e1464+incompatible/go.mod h1:osfaiScAUVup+UC9Nfq76eWqDhXlp+4UYaA8uhTBO6g=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
Expand Down
14 changes: 0 additions & 14 deletions peer/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,6 @@ type NetworkClient interface {
// Returns response bytes, and ErrRequestFailed if the request should be retried.
SendAppRequest(ctx context.Context, nodeID ids.NodeID, request []byte) ([]byte, error)

// SendCrossChainRequest sends a request to a specific blockchain running on this node.
// Returns response bytes, and ErrRequestFailed if the request failed.
SendCrossChainRequest(ctx context.Context, chainID ids.ID, request []byte) ([]byte, error)

// TrackBandwidth should be called for each valid request with the bandwidth
// (length of response divided by request time), and with 0 if the response is invalid.
TrackBandwidth(nodeID ids.NodeID, bandwidth float64)
Expand Down Expand Up @@ -77,16 +73,6 @@ func (c *client) SendAppRequest(ctx context.Context, nodeID ids.NodeID, request
return waitingHandler.WaitForResult(ctx)
}

// SendCrossChainRequest synchronously sends request to the specified chainID
// Returns response bytes and ErrRequestFailed if the request should be retried.
func (c *client) SendCrossChainRequest(ctx context.Context, chainID ids.ID, request []byte) ([]byte, error) {
waitingHandler := newWaitingResponseHandler()
if err := c.network.SendCrossChainRequest(ctx, chainID, request, waitingHandler); err != nil {
return nil, err
}
return waitingHandler.WaitForResult(ctx)
}

func (c *client) TrackBandwidth(nodeID ids.NodeID, bandwidth float64) {
c.network.TrackBandwidth(nodeID, bandwidth)
}
172 changes: 8 additions & 164 deletions peer/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,6 @@ type Network interface {
// SendAppRequest sends message to given nodeID, notifying handler when there's a response or timeout
SendAppRequest(ctx context.Context, nodeID ids.NodeID, message []byte, handler message.ResponseHandler) error

// SendCrossChainRequest sends a message to given chainID notifying handler when there's a response or timeout
SendCrossChainRequest(ctx context.Context, chainID ids.ID, message []byte, handler message.ResponseHandler) error

// Shutdown stops all peer channel listeners and marks the node to have stopped
// n.Start() can be called again but the peers will have to be reconnected
// by calling OnPeerConnected for each peer
Expand All @@ -65,9 +62,6 @@ type Network interface {
// SetRequestHandler sets the provided request handler as the request handler
SetRequestHandler(handler message.RequestHandler)

// SetCrossChainHandler sets the provided cross chain request handler as the cross chain request handler
SetCrossChainRequestHandler(handler message.CrossChainRequestHandler)

// Size returns the size of the network in number of connected peers
Size() uint32

Expand All @@ -89,17 +83,13 @@ type network struct {
requestIDGen uint32 // requestID counter used to track outbound requests
outstandingRequestHandlers map[uint32]message.ResponseHandler // maps avalanchego requestID => message.ResponseHandler
activeAppRequests *semaphore.Weighted // controls maximum number of active outbound requests
activeCrossChainRequests *semaphore.Weighted // controls maximum number of active outbound cross chain requests
p2pNetwork *p2p.Network
appSender common.AppSender // avalanchego AppSender for sending messages
codec codec.Manager // Codec used for parsing messages
crossChainCodec codec.Manager // Codec used for parsing cross chain messages
appRequestHandler message.RequestHandler // maps request type => handler
crossChainRequestHandler message.CrossChainRequestHandler // maps cross chain request type => handler
gossipHandler message.GossipHandler // maps gossip type => handler
peers *peerTracker // tracking of peers & bandwidth
appStats stats.RequestHandlerStats // Provide request handler metrics
crossChainStats stats.RequestHandlerStats // Provide cross chain request handler metrics
appSender common.AppSender // avalanchego AppSender for sending messages
codec codec.Manager // Codec used for parsing messages
appRequestHandler message.RequestHandler // maps request type => handler
gossipHandler message.GossipHandler // maps gossip type => handler
peers *peerTracker // tracking of peers & bandwidth
appStats stats.RequestHandlerStats // Provide request handler metrics

// Set to true when Shutdown is called, after which all operations on this
// struct are no-ops.
Expand All @@ -112,22 +102,18 @@ type network struct {
closed utils.Atomic[bool]
}

func NewNetwork(p2pNetwork *p2p.Network, appSender common.AppSender, codec codec.Manager, crossChainCodec codec.Manager, self ids.NodeID, maxActiveAppRequests int64, maxActiveCrossChainRequests int64) Network {
func NewNetwork(p2pNetwork *p2p.Network, appSender common.AppSender, codec codec.Manager, self ids.NodeID, maxActiveAppRequests int64) Network {
return &network{
appSender: appSender,
codec: codec,
crossChainCodec: crossChainCodec,
self: self,
outstandingRequestHandlers: make(map[uint32]message.ResponseHandler),
activeAppRequests: semaphore.NewWeighted(maxActiveAppRequests),
activeCrossChainRequests: semaphore.NewWeighted(maxActiveCrossChainRequests),
p2pNetwork: p2pNetwork,
gossipHandler: message.NoopMempoolGossipHandler{},
appRequestHandler: message.NoopRequestHandler{},
crossChainRequestHandler: message.NoopCrossChainRequestHandler{},
peers: NewPeerTracker(),
appStats: stats.NewRequestHandlerStats(),
crossChainStats: stats.NewCrossChainRequestHandlerStats(),
}
}

Expand Down Expand Up @@ -225,141 +211,6 @@ func (n *network) sendAppRequest(ctx context.Context, nodeID ids.NodeID, request
return nil
}

// SendCrossChainRequest sends request message bytes to specified chainID and adds [handler] to [outstandingRequestHandlers]
// so that it can be invoked when the network receives either a response or failure message.
// Returns an error if [appSender] is unable to make the request.
func (n *network) SendCrossChainRequest(ctx context.Context, chainID ids.ID, request []byte, handler message.ResponseHandler) error {
// Take a slot from total [activeCrossChainRequests] and block until a slot becomes available.
if err := n.activeCrossChainRequests.Acquire(ctx, 1); err != nil {
return errAcquiringSemaphore
}

n.lock.Lock()
defer n.lock.Unlock()

if n.closed.Get() {
n.activeCrossChainRequests.Release(1)
return nil
}

// If the context was cancelled, we can skip sending this request.
if err := ctx.Err(); err != nil {
n.activeCrossChainRequests.Release(1)
return err
}

requestID := n.nextRequestID()
n.outstandingRequestHandlers[requestID] = handler

// Send cross chain request to [chainID].
// On failure, release the slot from [activeCrossChainRequests] and delete
// request from [outstandingRequestHandlers].
//
// Cancellation is removed from this context to avoid erroring unexpectedly.
// SendCrossChainAppRequest should be non-blocking and any error other than
// context cancellation is unexpected.
//
// This guarantees that the network should never receive an unexpected
// CrossChainAppResponse.
ctxWithoutCancel := context.WithoutCancel(ctx)
if err := n.appSender.SendCrossChainAppRequest(ctxWithoutCancel, chainID, requestID, request); err != nil {
log.Error(
"request to chain failed",
"chainID", chainID,
"requestID", requestID,
"requestLen", len(request),
"error", err,
)

n.activeCrossChainRequests.Release(1)
delete(n.outstandingRequestHandlers, requestID)
return err
}

log.Debug("sent request message to chain", "chainID", chainID, "crossChainRequestID", requestID)
return nil
}

// CrossChainAppRequest notifies the VM when another chain in the network requests for data.
// Send a CrossChainAppResponse to [chainID] in response to a valid message using the same
// [requestID] before the deadline.
func (n *network) CrossChainAppRequest(ctx context.Context, requestingChainID ids.ID, requestID uint32, deadline time.Time, request []byte) error {
if n.closed.Get() {
return nil
}

log.Debug("received CrossChainAppRequest from chain", "requestingChainID", requestingChainID, "requestID", requestID, "requestLen", len(request))

var req message.CrossChainRequest
if _, err := n.crossChainCodec.Unmarshal(request, &req); err != nil {
log.Debug("failed to unmarshal CrossChainAppRequest", "requestingChainID", requestingChainID, "requestID", requestID, "requestLen", len(request), "err", err)
return nil
}

bufferedDeadline, err := calculateTimeUntilDeadline(deadline, n.crossChainStats)
if err != nil {
log.Debug("deadline to process CrossChainAppRequest has expired, skipping", "requestingChainID", requestingChainID, "requestID", requestID, "err", err)
return nil
}

log.Debug("processing incoming CrossChainAppRequest", "requestingChainID", requestingChainID, "requestID", requestID, "req", req)
handleCtx, cancel := context.WithDeadline(context.Background(), bufferedDeadline)
defer cancel()

responseBytes, err := req.Handle(handleCtx, requestingChainID, requestID, n.crossChainRequestHandler)
switch {
case err != nil && err != context.DeadlineExceeded:
return err // Return a fatal error
case responseBytes != nil:
return n.appSender.SendCrossChainAppResponse(ctx, requestingChainID, requestID, responseBytes) // Propagate fatal error
default:
return nil
}
}

// CrossChainAppRequestFailed can be called by the avalanchego -> VM in following cases:
// - respondingChain doesn't exist
// - invalid CrossChainAppResponse from respondingChain
// - invalid CrossChainRequest was sent to respondingChain
// - request times out before a response is provided
// If [requestID] is not known, this function will emit a log and return a nil error.
// If the response handler returns an error it is propagated as a fatal error.
func (n *network) CrossChainAppRequestFailed(ctx context.Context, respondingChainID ids.ID, requestID uint32, _ *common.AppError) error {
log.Debug("received CrossChainAppRequestFailed from chain", "respondingChainID", respondingChainID, "requestID", requestID)

handler, exists := n.markRequestFulfilled(requestID)
if !exists {
// Can happen after the network has been closed.
log.Debug("received CrossChainAppRequestFailed to unknown request", "respondingChainID", respondingChainID, "requestID", requestID)
return nil
}

// We must release the slot
n.activeCrossChainRequests.Release(1)

return handler.OnFailure()
}

// CrossChainAppResponse is invoked when there is a
// response received from [respondingChainID] regarding a request the VM sent out
// If [requestID] is not known, this function will emit a log and return a nil error.
// If the response handler returns an error it is propagated as a fatal error.
func (n *network) CrossChainAppResponse(ctx context.Context, respondingChainID ids.ID, requestID uint32, response []byte) error {
log.Debug("received CrossChainAppResponse from responding chain", "respondingChainID", respondingChainID, "requestID", requestID)

handler, exists := n.markRequestFulfilled(requestID)
if !exists {
// Can happen after the network has been closed.
log.Debug("received CrossChainAppResponse to unknown request", "respondingChainID", respondingChainID, "requestID", requestID, "responseLen", len(response))
return nil
}

// We must release the slot
n.activeCrossChainRequests.Release(1)

return handler.OnResponse(response)
}

// AppRequest is called by avalanchego -> VM when there is an incoming AppRequest from a peer
// error returned by this function is expected to be treated as fatal by the engine
// returns error if the requestHandler returns an error
Expand Down Expand Up @@ -443,7 +294,7 @@ func (n *network) AppRequestFailed(ctx context.Context, nodeID ids.NodeID, reque

// calculateTimeUntilDeadline calculates the time until deadline and drops it if we missed he deadline to response.
// This function updates metrics for both app requests and cross chain requests.
// This is called by either [AppRequest] or [CrossChainAppRequest].
// This is called by [AppRequest].
func calculateTimeUntilDeadline(deadline time.Time, stats stats.RequestHandlerStats) (time.Time, error) {
// calculate how much time is left until the deadline
timeTillDeadline := time.Until(deadline)
Expand Down Expand Up @@ -560,13 +411,6 @@ func (n *network) SetRequestHandler(handler message.RequestHandler) {
n.appRequestHandler = handler
}

func (n *network) SetCrossChainRequestHandler(handler message.CrossChainRequestHandler) {
n.lock.Lock()
defer n.lock.Unlock()

n.crossChainRequestHandler = handler
}

func (n *network) Size() uint32 {
n.lock.RLock()
defer n.lock.RUnlock()
Expand Down
Loading

0 comments on commit 9f36083

Please sign in to comment.