Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a IsIncomingP2PEnabled Flag #1491

Merged
merged 10 commits into from
Sep 5, 2023
Merged
5 changes: 5 additions & 0 deletions go/common/host/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,8 @@ type BatchMsg struct {
Batches []*common.ExtBatch // The batches being sent.
IsLive bool // true if these batches are being sent as new, false if in response to a p2p request
}

type P2PHostService interface {
Service
P2P
}
8 changes: 8 additions & 0 deletions go/config/host_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@ type HostInputConfig struct {

// Min interval before creating the next rollup (only used by Sequencer nodes)
RollupInterval time.Duration

// Whether inbound p2p is enabled or not
IsInboundP2PEnabled bool
}

// ToHostConfig returns a HostConfig given a HostInputConfig
Expand Down Expand Up @@ -120,6 +123,7 @@ func (p HostInputConfig) ToHostConfig() *HostConfig {
DebugNamespaceEnabled: p.DebugNamespaceEnabled,
BatchInterval: p.BatchInterval,
RollupInterval: p.RollupInterval,
IsInboundP2PEnabled: p.IsInboundP2PEnabled,
}
}

Expand Down Expand Up @@ -195,6 +199,9 @@ type HostConfig struct {

// Min interval before creating the next rollup (only used by Sequencer nodes)
RollupInterval time.Duration

// Whether p2p is enabled or not
IsInboundP2PEnabled bool
}

// DefaultHostParsedConfig returns a HostConfig with default values.
Expand Down Expand Up @@ -229,5 +236,6 @@ func DefaultHostParsedConfig() *HostInputConfig {
DebugNamespaceEnabled: false,
BatchInterval: 1 * time.Second,
RollupInterval: 5 * time.Second,
IsInboundP2PEnabled: true,
}
}
4 changes: 4 additions & 0 deletions go/host/container/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ type HostConfigToml struct {
DebugNamespaceEnabled bool
BatchInterval string
RollupInterval string
IsInboundP2PEnabled bool
}

// ParseConfig returns a config.HostInputConfig based on either the file identified by the `config` flag, or the flags with
Expand Down Expand Up @@ -84,6 +85,7 @@ func ParseConfig() (*config.HostInputConfig, error) {
debugNamespaceEnabled := flag.Bool(debugNamespaceEnabledName, cfg.DebugNamespaceEnabled, flagUsageMap[debugNamespaceEnabledName])
batchInterval := flag.String(batchIntervalName, cfg.BatchInterval.String(), flagUsageMap[batchIntervalName])
rollupInterval := flag.String(rollupIntervalName, cfg.RollupInterval.String(), flagUsageMap[rollupIntervalName])
isInboundP2PEnabled := flag.Bool(isInboundP2PEnabledName, cfg.IsInboundP2PEnabled, flagUsageMap[isInboundP2PEnabledName])

flag.Parse()

Expand Down Expand Up @@ -132,6 +134,7 @@ func ParseConfig() (*config.HostInputConfig, error) {
if err != nil {
return nil, err
}
cfg.IsInboundP2PEnabled = *isInboundP2PEnabled

return cfg, nil
}
Expand Down Expand Up @@ -192,5 +195,6 @@ func fileBasedConfig(configPath string) (*config.HostInputConfig, error) {
LevelDBPath: tomlConfig.LevelDBPath,
BatchInterval: batchInterval,
RollupInterval: rollupInterval,
IsInboundP2PEnabled: tomlConfig.IsInboundP2PEnabled,
}, nil
}
2 changes: 2 additions & 0 deletions go/host/container/cli_flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ const (
debugNamespaceEnabledName = "debugNamespaceEnabled"
batchIntervalName = "batchInterval"
rollupIntervalName = "rollupInterval"
isInboundP2PEnabledName = "isInboundP2PEnabled"
)

// Returns a map of the flag usages.
Expand Down Expand Up @@ -68,5 +69,6 @@ func getFlagUsageMap() map[string]string {
debugNamespaceEnabledName: "Whether the debug names is enabled",
batchIntervalName: "Duration between each batch. Can be put down as 1.0s",
rollupIntervalName: "Duration between each rollup. Can be put down as 1.0s",
isInboundP2PEnabledName: "Whether inbound p2p is enabled",
}
}
4 changes: 3 additions & 1 deletion go/host/container/host_container.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,9 @@ func NewHostContainerFromConfig(parsedConfig *config.HostInputConfig, logger get
enclaveClient := enclaverpc.NewClient(cfg, logger)
p2pLogger := logger.New(log.CmpKey, log.P2PCmp)
metricsService := metrics.New(cfg.MetricsEnabled, cfg.MetricsHTTPPort, logger)

aggP2P := p2p.NewSocketP2PLayer(cfg, services, p2pLogger, metricsService.Registry())

rpcServer := clientrpc.NewServer(cfg, logger)

mgmtContractLib := mgmtcontractlib.NewMgmtContractLib(&cfg.ManagementContractAddress, logger)
Expand All @@ -141,7 +143,7 @@ func NewHostContainerFromConfig(parsedConfig *config.HostInputConfig, logger get

// NewHostContainer builds a host container with dependency injection rather than from config.
// Useful for testing etc. (want to be able to pass in logger, and also have option to mock out dependencies)
func NewHostContainer(cfg *config.HostConfig, services *host.ServicesRegistry, p2p host.P2PHostService, l1Client ethadapter.EthClient, enclaveClient common.Enclave, contractLib mgmtcontractlib.MgmtContractLib, hostWallet wallet.Wallet, rpcServer clientrpc.Server, logger gethlog.Logger, metricsService *metrics.Service) *HostContainer {
func NewHostContainer(cfg *config.HostConfig, services *host.ServicesRegistry, p2p hostcommon.P2PHostService, l1Client ethadapter.EthClient, enclaveClient common.Enclave, contractLib mgmtcontractlib.MgmtContractLib, hostWallet wallet.Wallet, rpcServer clientrpc.Server, logger gethlog.Logger, metricsService *metrics.Service) *HostContainer {
h := host.NewHost(cfg, services, p2p, l1Client, enclaveClient, hostWallet, contractLib, logger, metricsService.Registry())

hostContainer := &HostContainer{
Expand Down
3 changes: 2 additions & 1 deletion go/host/container/test.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,5 @@ ObscuroChainID = 777
ProfilerEnabled = false
DebugNamespaceEnabled = false
BatchInterval = "1.0s"
RollupInterval = "5.0s"
RollupInterval = "5.0s"
isInboundP2PEnabled = true
7 changes: 1 addition & 6 deletions go/host/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,6 @@ import (
hostcommon "github.com/obscuronet/go-obscuro/go/common/host"
)

type P2PHostService interface {
hostcommon.Service
hostcommon.P2P
}

// Implementation of host.Host.
type host struct {
config *config.HostConfig
Expand All @@ -50,7 +45,7 @@ type host struct {
enclaveConfig *common.ObscuroEnclaveInfo
}

func NewHost(config *config.HostConfig, hostServices *ServicesRegistry, p2p P2PHostService, ethClient ethadapter.EthClient, enclaveClient common.Enclave, ethWallet wallet.Wallet, mgmtContractLib mgmtcontractlib.MgmtContractLib, logger gethlog.Logger, regMetrics gethmetrics.Registry) hostcommon.Host {
func NewHost(config *config.HostConfig, hostServices *ServicesRegistry, p2p hostcommon.P2PHostService, ethClient ethadapter.EthClient, enclaveClient common.Enclave, ethWallet wallet.Wallet, mgmtContractLib mgmtcontractlib.MgmtContractLib, logger gethlog.Logger, regMetrics gethmetrics.Registry) hostcommon.Host {
database, err := db.CreateDBFromConfig(config, regMetrics, logger)
if err != nil {
logger.Crit("unable to create database for host", log.ErrKey, err)
Expand Down
61 changes: 44 additions & 17 deletions go/host/p2p/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,10 @@ import (
"sync/atomic"
"time"

"github.com/obscuronet/go-obscuro/go/common/subscription"
"github.com/pkg/errors"

"github.com/obscuronet/go-obscuro/go/common/measure"
"github.com/obscuronet/go-obscuro/go/common/retry"
"github.com/obscuronet/go-obscuro/go/common/subscription"
"github.com/pkg/errors"

"github.com/ethereum/go-ethereum/rlp"
"github.com/obscuronet/go-obscuro/go/common"
Expand Down Expand Up @@ -74,6 +73,8 @@ func NewSocketP2PLayer(config *config.HostConfig, serviceLocator p2pServiceLocat
peerTracker: newPeerTracker(),
metricsRegistry: metricReg,
logger: logger,

isIncomingP2PDisabled: !config.IsInboundP2PEnabled,
}
}

Expand All @@ -93,21 +94,29 @@ type Service struct {
peerAddresses []string
p2pTimeout time.Duration

peerTracker *peerTracker
metricsRegistry gethmetrics.Registry
logger gethlog.Logger
peerAddressesMutex sync.RWMutex
peerTracker *peerTracker
metricsRegistry gethmetrics.Registry
logger gethlog.Logger
peerAddressesMutex sync.RWMutex
isIncomingP2PDisabled bool
}

func (p *Service) Start() error {
p.running.Store(true)

if p.isIncomingP2PDisabled {
go p.RefreshPeerList()
return nil
}

// We listen for P2P connections.
listener, err := net.Listen("tcp", p.ourBindAddress)
if err != nil {
return fmt.Errorf("could not listen for P2P connections on %s: %w", p.ourBindAddress, err)
}

p.logger.Info("P2P server started listening", "bindAddress", p.ourBindAddress, "publicAddress", p.ourPublicAddress)
p.running.Store(true)

p.listener = listener

go p.handleConnections()
Expand Down Expand Up @@ -140,6 +149,9 @@ func (p *Service) HealthStatus() host.HealthStatus {
}

func (p *Service) SubscribeForBatches(handler host.P2PBatchHandler) func() {
if p.isIncomingP2PDisabled {
return nil
}
return p.batchSubscribers.Subscribe(handler)
}

Expand All @@ -148,6 +160,9 @@ func (p *Service) SubscribeForTx(handler host.P2PTxHandler) func() {
}

func (p *Service) SubscribeForBatchRequests(handler host.P2PBatchRequestHandler) func() {
if p.isIncomingP2PDisabled {
return nil
}
return p.batchReqHandlers.Subscribe(handler)
}

Expand Down Expand Up @@ -192,6 +207,9 @@ func (p *Service) SendTxToSequencer(tx common.EncryptedTx) error {
}

func (p *Service) BroadcastBatches(batches []*common.ExtBatch) error {
if p.isIncomingP2PDisabled {
return nil
}
if !p.isSequencer {
return errors.New("only sequencer can broadcast batches")
}
Expand All @@ -210,6 +228,9 @@ func (p *Service) BroadcastBatches(batches []*common.ExtBatch) error {
}

func (p *Service) RequestBatchesFromSequencer(fromSeqNo *big.Int) error {
if p.isIncomingP2PDisabled {
return nil
}
if p.isSequencer {
return errors.New("sequencer cannot request batches from itself")
}
Expand All @@ -234,6 +255,9 @@ func (p *Service) RequestBatchesFromSequencer(fromSeqNo *big.Int) error {
}

func (p *Service) RespondToBatchRequest(requestID string, batches []*common.ExtBatch) error {
if p.isIncomingP2PDisabled {
return nil
}
if !p.isSequencer {
return errors.New("only sequencer can respond to batch requests")
}
Expand All @@ -256,6 +280,9 @@ func (p *Service) RespondToBatchRequest(requestID string, batches []*common.ExtB
// if there's more than 100 failures on a given fail type
// if there's a known peer for which a message hasn't been received
func (p *Service) verifyHealth() error {
if p.isIncomingP2PDisabled {
return nil
}
var noMsgReceivedPeers []string
for peer, lastMsgTimestamp := range p.peerTracker.receivedMessagesByPeer() {
if time.Now().After(lastMsgTimestamp.Add(_alertPeriod)) {
Expand Down Expand Up @@ -356,12 +383,15 @@ func (p *Service) broadcast(msg message) error {
copy(currentAddresses, p.peerAddresses)
p.peerAddressesMutex.RUnlock()

var wg sync.WaitGroup
for _, address := range currentAddresses {
wg.Add(1)
go p.sendBytesWithRetry(&wg, address, msgEncoded) //nolint: errcheck
closureAddr := address
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what was the purpose of the WaitGroup?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe it was to issue the payload to all nodes in parallel and wait until the whole broadcast was finished.
With a 2 min retry and one node with p2p off, this had a weird effect on the sequencer which would not update the host db in proper time.

When running the sim with this code, all nodes (host db) have height ~80 but the sequencer (host db) stays at high ~60

go func() {
err := p.sendBytesWithRetry(closureAddr, msgEncoded)
if err != nil {
p.logger.Error("unsuccessful broadcast", log.ErrKey, err)
}
}()
}
wg.Wait()

return nil
}
Expand All @@ -383,7 +413,7 @@ func (p *Service) send(msg message, to string) error {
if err != nil {
return fmt.Errorf("could not encode message to send to sequencer. Cause: %w", err)
}
err = p.sendBytesWithRetry(nil, to, msgEncoded)
err = p.sendBytesWithRetry(to, msgEncoded)
if err != nil {
return err
}
Expand All @@ -392,10 +422,7 @@ func (p *Service) send(msg message, to string) error {

// Sends the bytes to the provided address.
// Until introducing libp2p (or equivalent), we have a simple retry
func (p *Service) sendBytesWithRetry(wg *sync.WaitGroup, address string, msgEncoded []byte) error {
if wg != nil {
defer wg.Done()
}
func (p *Service) sendBytesWithRetry(address string, msgEncoded []byte) error {
// retry for about 2 seconds
err := retry.Do(func() error {
return p.sendBytes(address, msgEncoded)
Expand Down
8 changes: 8 additions & 0 deletions go/node/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type Config struct {
debugNamespaceEnabled bool
profilerEnabled bool
logLevel int
isInboundP2PEnabled bool
}

func NewNodeConfig(opts ...Option) *Config {
Expand Down Expand Up @@ -102,6 +103,7 @@ func (c *Config) ToHostConfig() *config.HostInputConfig {
cfg.MetricsEnabled = false
cfg.DebugNamespaceEnabled = c.debugNamespaceEnabled
cfg.LogLevel = c.logLevel
cfg.IsInboundP2PEnabled = c.isInboundP2PEnabled

return cfg
}
Expand Down Expand Up @@ -275,3 +277,9 @@ func WithLogLevel(i int) Option {
c.logLevel = i
}
}

func WithInboundP2PEnabled(b bool) Option {
return func(c *Config) {
c.isInboundP2PEnabled = b
}
}
1 change: 1 addition & 0 deletions go/node/docker_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ func (d *DockerNode) startHost() error {
"-batchInterval=1s",
"-rollupInterval=3s",
fmt.Sprintf("-logLevel=%d", d.cfg.logLevel),
fmt.Sprintf("-isInboundP2PEnabled=%t", d.cfg.isInboundP2PEnabled),
}
if !d.cfg.hostInMemDB {
cmd = append(cmd, "-levelDBPath", _hostDataDir)
Expand Down
7 changes: 6 additions & 1 deletion integration/simulation/network/inmemory.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func (n *basicNetworkOfInMemoryNodes) Create(params *params.SimParams, stats *st
n.l2Clients = make([]rpc.Client, params.NumberOfNodes)
obscuroHosts := make([]host.Host, params.NumberOfNodes)

p2pNetw := p2p.NewMockP2PNetwork(params.AvgBlockDuration, params.AvgNetworkLatency)
p2pNetw := p2p.NewMockP2PNetwork(params.AvgBlockDuration, params.AvgNetworkLatency, params.NodeWithIncomingP2PDisabled)

// Invent some addresses to assign as the L1 erc20 contracts
dummyOBXAddress := datagenerator.RandomAddress()
Expand All @@ -46,6 +46,10 @@ func (n *basicNetworkOfInMemoryNodes) Create(params *params.SimParams, stats *st
for i := 0; i < params.NumberOfNodes; i++ {
isGenesis := i == 0

incomingP2PEnabled := true
if !isGenesis && i == params.NodeWithIncomingP2PDisabled {
incomingP2PEnabled = false
}
// create the in memory l1 and l2 node
miner := createMockEthNode(int64(i), params.NumberOfNodes, params.AvgBlockDuration, params.AvgNetworkLatency, stats)

Expand All @@ -62,6 +66,7 @@ func (n *basicNetworkOfInMemoryNodes) Create(params *params.SimParams, stats *st
&disabledBus,
common.Hash{},
params.AvgBlockDuration/2,
incomingP2PEnabled,
)
obscuroClient := p2p.NewInMemObscuroClient(agg)

Expand Down
7 changes: 4 additions & 3 deletions integration/simulation/network/network_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,12 @@ import (
"github.com/obscuronet/go-obscuro/go/host/container"
"github.com/obscuronet/go-obscuro/go/wallet"
"github.com/obscuronet/go-obscuro/integration"
"github.com/obscuronet/go-obscuro/integration/simulation/p2p"

"github.com/obscuronet/go-obscuro/integration/common/testlog"
"github.com/obscuronet/go-obscuro/integration/ethereummock"
"github.com/obscuronet/go-obscuro/integration/simulation/stats"

gethcommon "github.com/ethereum/go-ethereum/common"
hostcommon "github.com/obscuronet/go-obscuro/go/common/host"
testcommon "github.com/obscuronet/go-obscuro/integration/common"
)

Expand Down Expand Up @@ -53,10 +52,11 @@ func createInMemObscuroNode(
genesisJSON []byte,
ethWallet wallet.Wallet,
ethClient ethadapter.EthClient,
mockP2P *p2p.MockP2P,
mockP2P hostcommon.P2PHostService,
l1BusAddress *gethcommon.Address,
l1StartBlk gethcommon.Hash,
batchInterval time.Duration,
incomingP2PEnabled bool,
) *container.HostContainer {
mgtContractAddress := mgmtContractLib.GetContractAddr()

Expand All @@ -69,6 +69,7 @@ func createInMemObscuroNode(
L1StartHash: l1StartBlk,
ManagementContractAddress: *mgtContractAddress,
BatchInterval: batchInterval,
IsInboundP2PEnabled: incomingP2PEnabled,
}

enclaveConfig := &config.EnclaveConfig{
Expand Down
Loading
Loading