Skip to content

Commit

Permalink
fixed issue in the p2p lib
Browse files Browse the repository at this point in the history
  • Loading branch information
otherview committed Sep 1, 2023
1 parent fed099e commit cf2f58f
Show file tree
Hide file tree
Showing 9 changed files with 35 additions and 52 deletions.
1 change: 0 additions & 1 deletion go/enclave/events/subscription_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,6 @@ func (s *SubscriptionManager) GetSubscribedLogsForBatch(batch *core.Batch, recei
if relevant {
relevantLogsForSub = append(relevantLogsForSub, logItem)
}
s.logger.Info(fmt.Sprintf("Subscription %s. Account %s. Log %v. Extracted addresses: %v. Relevant: %t", id, sub.Account, logItem, userAddrs, relevant))
}
if len(relevantLogsForSub) > 0 {
relevantLogsPerSubscription[id] = relevantLogsForSub
Expand Down
1 change: 0 additions & 1 deletion go/host/enclave/guardian.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,6 @@ func (g *Guardian) mainLoop() {
// check enclave status on every loop (this will happen whenever we hit an error while trying to resolve a state,
// or after the monitoring interval if we are healthy)
g.checkEnclaveStatus()
g.logger.Trace("mainLoop - enclave status", "status", g.state.GetStatus())
switch g.state.GetStatus() {
case Disconnected, Unavailable:
// nothing to do, we are waiting for the enclave to be available
Expand Down
48 changes: 17 additions & 31 deletions go/host/p2p/no_inbound_p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,17 @@ import (
"github.com/obscuronet/go-obscuro/go/common/host"
"github.com/obscuronet/go-obscuro/go/common/log"
"github.com/obscuronet/go-obscuro/go/common/retry"
"github.com/obscuronet/go-obscuro/go/common/stopcontrol"
"github.com/obscuronet/go-obscuro/go/common/subscription"
"github.com/obscuronet/go-obscuro/go/config"

gethlog "github.com/ethereum/go-ethereum/log"
)

type NoInboundP2P struct {
batchSubscribers *subscription.Manager[host.P2PBatchHandler]
txSubscribers *subscription.Manager[host.P2PTxHandler]
batchReqHandlers *subscription.Manager[host.P2PBatchRequestHandler]
txSubscribers *subscription.Manager[host.P2PTxHandler]

isSequencer bool
ourPublicAddress string
stopControl *stopcontrol.StopControl
logger gethlog.Logger
sl p2pServiceLocator
sequencerAddress string
Expand All @@ -42,14 +38,11 @@ type NoInboundP2P struct {

func NewNoInboundP2P(config *config.HostConfig, serviceLocator p2pServiceLocator, logger gethlog.Logger) *NoInboundP2P {
return &NoInboundP2P{
batchSubscribers: subscription.NewManager[host.P2PBatchHandler](),
txSubscribers: subscription.NewManager[host.P2PTxHandler](),
batchReqHandlers: subscription.NewManager[host.P2PBatchRequestHandler](),
txSubscribers: subscription.NewManager[host.P2PTxHandler](),

isSequencer: config.NodeType == common.Sequencer,
ourPublicAddress: config.P2PPublicAddress,
ourBindAddress: config.P2PBindAddress,
stopControl: stopcontrol.New(),
logger: logger,
sl: serviceLocator,
p2pTimeout: config.P2PConnectionTimeout,
Expand All @@ -58,19 +51,18 @@ func NewNoInboundP2P(config *config.HostConfig, serviceLocator p2pServiceLocator

func (n *NoInboundP2P) Start() error {
// Only the sequencer accepts data in
if !n.isSequencer {
return nil
}
listener, err := net.Listen("tcp", n.ourBindAddress)
if err != nil {
return fmt.Errorf("could not listen for P2P connections on %s: %w", n.ourBindAddress, err)
if n.isSequencer {
listener, err := net.Listen("tcp", n.ourBindAddress)
if err != nil {
return fmt.Errorf("could not listen for P2P connections on %s: %w", n.ourBindAddress, err)
}
n.listener = listener

n.logger.Info("P2P server started listening", "bindAddress", n.ourBindAddress, "publicAddress", n.ourPublicAddress)
go n.handleConnections()
}

n.logger.Info("P2P server started listening", "bindAddress", n.ourBindAddress, "publicAddress", n.ourPublicAddress)
n.running.Store(true)
n.listener = listener

go n.handleConnections()

// ensure we have re-synced the peer list from management contract after startup
go n.RefreshPeerList()
Expand All @@ -88,16 +80,16 @@ func (n *NoInboundP2P) HealthStatus() host.HealthStatus {
}
}

func (n *NoInboundP2P) SubscribeForBatches(handler host.P2PBatchHandler) func() {
return func() {}
func (n *NoInboundP2P) SubscribeForBatches(_ host.P2PBatchHandler) func() {
return nil
}

func (n *NoInboundP2P) SubscribeForTx(handler host.P2PTxHandler) func() {
return n.txSubscribers.Subscribe(handler)
}

func (n *NoInboundP2P) SubscribeForBatchRequests(handler host.P2PBatchRequestHandler) func() {
return func() {}
func (n *NoInboundP2P) SubscribeForBatchRequests(_ host.P2PBatchRequestHandler) func() {
return nil
}

func (n *NoInboundP2P) RefreshPeerList() {
Expand All @@ -108,7 +100,7 @@ func (n *NoInboundP2P) RefreshPeerList() {

var newPeers []string
err := retry.Do(func() error {
if n.stopControl.IsStopping() {
if !n.running.Load() {
return retry.FailFast(fmt.Errorf("p2p service is stopped - abandoning peer list refresh"))
}

Expand Down Expand Up @@ -137,6 +129,7 @@ func (n *NoInboundP2P) SendTxToSequencer(tx common.EncryptedTx) error {
if n.isSequencer {
return errors.New("sequencer cannot send tx to itself")
}

msg := message{Sender: n.ourPublicAddress, Type: msgTypeTx, Contents: tx}
if n.sequencerAddress == "" {
return fmt.Errorf("failed to find sequencer - no sequencerAddress")
Expand Down Expand Up @@ -260,13 +253,6 @@ func (n *NoInboundP2P) handle(conn net.Conn) {
n.logger.Error("received batch from peer, but this is a sequencer node")
return
}
var batchMsg *host.BatchMsg
err := rlp.DecodeBytes(msg.Contents, &batchMsg)
if err != nil {
n.logger.Warn("unable to decode batch received from peer", log.ErrKey, err)
// nothing to send to subscribers
break
}

case msgTypeBatchRequest:
if !n.isSequencer {
Expand Down
23 changes: 11 additions & 12 deletions go/host/p2p/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,10 @@ import (
"sync/atomic"
"time"

"github.com/obscuronet/go-obscuro/go/common/subscription"
"github.com/pkg/errors"

"github.com/obscuronet/go-obscuro/go/common/measure"
"github.com/obscuronet/go-obscuro/go/common/retry"
"github.com/obscuronet/go-obscuro/go/common/subscription"
"github.com/pkg/errors"

"github.com/ethereum/go-ethereum/rlp"
"github.com/obscuronet/go-obscuro/go/common"
Expand Down Expand Up @@ -356,12 +355,15 @@ func (p *Service) broadcast(msg message) error {
copy(currentAddresses, p.peerAddresses)
p.peerAddressesMutex.RUnlock()

var wg sync.WaitGroup
for _, address := range currentAddresses {
wg.Add(1)
go p.sendBytesWithRetry(&wg, address, msgEncoded) //nolint: errcheck
closureAddr := address
go func() {
err := p.sendBytesWithRetry(closureAddr, msgEncoded)
if err != nil {
p.logger.Error("unsuccessful broadcast", log.ErrKey, err)
}
}()
}
wg.Wait()

return nil
}
Expand All @@ -383,7 +385,7 @@ func (p *Service) send(msg message, to string) error {
if err != nil {
return fmt.Errorf("could not encode message to send to sequencer. Cause: %w", err)
}
err = p.sendBytesWithRetry(nil, to, msgEncoded)
err = p.sendBytesWithRetry(to, msgEncoded)
if err != nil {
return err
}
Expand All @@ -392,10 +394,7 @@ func (p *Service) send(msg message, to string) error {

// Sends the bytes to the provided address.
// Until introducing libp2p (or equivalent), we have a simple retry
func (p *Service) sendBytesWithRetry(wg *sync.WaitGroup, address string, msgEncoded []byte) error {
if wg != nil {
defer wg.Done()
}
func (p *Service) sendBytesWithRetry(address string, msgEncoded []byte) error {
// retry for about 2 seconds
err := retry.Do(func() error {
return p.sendBytes(address, msgEncoded)
Expand Down
3 changes: 1 addition & 2 deletions integration/simulation/network/network_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ import (
"math/big"
"time"

hostcommon "github.com/obscuronet/go-obscuro/go/common/host"

"github.com/obscuronet/go-obscuro/go/host"

"github.com/obscuronet/go-obscuro/go/common"
Expand All @@ -26,6 +24,7 @@ import (
"github.com/obscuronet/go-obscuro/integration/simulation/stats"

gethcommon "github.com/ethereum/go-ethereum/common"
hostcommon "github.com/obscuronet/go-obscuro/go/common/host"
testcommon "github.com/obscuronet/go-obscuro/integration/common"
)

Expand Down
1 change: 0 additions & 1 deletion integration/simulation/network/obscuro_node_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ func startInMemoryObscuroNodes(params *params.SimParams, genesisJSON []byte, l1C
// Create the in memory obscuro nodes, each connect each to a geth node
obscuroNodes := make([]*hostcontainer.HostContainer, params.NumberOfNodes)
obscuroHosts := make([]host.Host, params.NumberOfNodes)

mockP2PNetw := p2p.NewMockP2PNetwork(params.AvgBlockDuration, params.AvgNetworkLatency, params.NodeWithIncomingP2PDisabled)

for i := 0; i < params.NumberOfNodes; i++ {
Expand Down
1 change: 1 addition & 0 deletions integration/simulation/network/socket.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ func (n *networkOfSocketNodes) Create(simParams *params.SimParams, _ *stats.Stat
node.WithL1Host("127.0.0.1"),
node.WithL1WSPort(simParams.StartPort+100),
node.WithInboundP2PEnabled(isIncomingP2PEnabled),
node.WithLogLevel(4),
),
)

Expand Down
4 changes: 2 additions & 2 deletions integration/simulation/simulation_full_network_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ func TestFullNetworkMonteCarloSimulation(t *testing.T) {
L2ToL1EfficiencyThreshold: 0.7, // nodes might stop producing rollups but the geth network is still going
Wallets: wallets,
StartPort: integration.StartPortSimulationFullNetwork,
ReceiptTimeout: 15 * time.Second,
ReceiptTimeout: 65 * time.Second,
StoppingDelay: 10 * time.Second,
NodeWithIncomingP2PDisabled: 0,
NodeWithIncomingP2PDisabled: 2,
}
simParams.AvgNetworkLatency = simParams.AvgBlockDuration / 15

Expand Down
5 changes: 3 additions & 2 deletions integration/simulation/validate_chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func checkObscuroBlockchainValidity(t *testing.T, s *Simulation, maxL1Height uin
min, max := minMax(heights)
// This checks that all the nodes are in sync. When a node falls behind with processing blocks it might highlight a problem.
if max-min > max/10 {
t.Errorf("There is a problem with the Obscuro chain. Nodes fell out of sync. Max height: %d. Min height: %d", max, min)
t.Errorf("There is a problem with the Obscuro chain. Nodes fell out of sync. Max height: %d. Min height: %d -> %+v", max, min, heights)
}
}

Expand Down Expand Up @@ -218,7 +218,8 @@ func checkRollups(t *testing.T, s *Simulation, nodeIdx int, rollups []*common.Ex
client := clients[0]
batchOnNode, err := client.BatchHeaderByHash(batchHeader.Hash())
if err != nil {
t.Fatalf("Node %d: Could not find batch header [idx=%s, hash=%s]. Cause: %s", nodeIdx, batchHeader.Number, batchHeader.Hash(), err)
t.Errorf("Node %d: Could not find batch header [idx=%s, hash=%s]. Cause: %s", nodeIdx, batchHeader.Number, batchHeader.Hash(), err)
continue
}
if batchOnNode.Hash() != batchHeader.Hash() {
t.Errorf("Node %d: Batches mismatch!", nodeIdx)
Expand Down

0 comments on commit cf2f58f

Please sign in to comment.