diff --git a/ethstorage/flags/p2p_flags.go b/ethstorage/flags/p2p_flags.go index cbb2f9ba..926ff203 100644 --- a/ethstorage/flags/p2p_flags.go +++ b/ethstorage/flags/p2p_flags.go @@ -188,14 +188,14 @@ var ( Name: "p2p.peers.lo", Usage: "Low-tide peer count. The node actively searches for new peer connections if below this amount.", Required: false, - Value: 20, + Value: 60, EnvVar: p2pEnv("PEERS_LO"), } PeersHi = cli.UintFlag{ Name: "p2p.peers.hi", Usage: "High-tide peer count. The node starts pruning peer connections slowly after reaching this number.", Required: false, - Value: 30, + Value: 70, EnvVar: p2pEnv("PEERS_HI"), } PeersGrace = cli.DurationFlag{ diff --git a/ethstorage/p2p/cli/load_config.go b/ethstorage/p2p/cli/load_config.go index 159bcfed..db7f4538 100644 --- a/ethstorage/p2p/cli/load_config.go +++ b/ethstorage/p2p/cli/load_config.go @@ -366,10 +366,12 @@ func loadSyncerParams(conf *p2p.Config, ctx *cli.Context) error { maxRequestSize := ctx.GlobalUint64(flags.MaxRequestSize.Name) syncConcurrency := ctx.GlobalUint64(flags.SyncConcurrency.Name) fillEmptyConcurrency := ctx.GlobalInt(flags.FillEmptyConcurrency.Name) + maxPeers := ctx.GlobalInt(flags.PeersHi.Name) if syncConcurrency < 1 { return fmt.Errorf("p2p.sync.concurrency param is invalid: the value should larger than 0") } conf.SyncParams = &protocol.SyncerParams{ + MaxPeers: maxPeers, MaxRequestSize: maxRequestSize, SyncConcurrency: syncConcurrency, FillEmptyConcurrency: fillEmptyConcurrency, diff --git a/ethstorage/p2p/host.go b/ethstorage/p2p/host.go index 359e7880..51633b0a 100644 --- a/ethstorage/p2p/host.go +++ b/ethstorage/p2p/host.go @@ -148,6 +148,7 @@ func (conf *Config) Host(log log.Logger, reporter metrics.Reporter) (host.Host, return nil, fmt.Errorf("failed to open connection gater: %w", err) } + // TODO as we have MaxPeers to limit the connection count, do we still need this? connMngr, err := conf.ConnMngr(conf) if err != nil { return nil, fmt.Errorf("failed to open connection manager: %w", err) diff --git a/ethstorage/p2p/node.go b/ethstorage/p2p/node.go index 191f2d02..ebdc0cc3 100644 --- a/ethstorage/p2p/node.go +++ b/ethstorage/p2p/node.go @@ -131,7 +131,7 @@ func (n *NodeP2P) init(resourcesCtx context.Context, rollupCfg *rollup.EsConfig, n.syncCl.RemovePeer(conn.RemotePeer()) }, }) - n.syncCl.UpdateMaxPeers(int(setup.(*Config).PeersHi)) + // the host may already be connected to peers, add them all to the sync client for _, conn := range n.host.Network().Conns() { shards := make(map[common.Address][]uint64) diff --git a/ethstorage/p2p/protocol/sync_test.go b/ethstorage/p2p/protocol/sync_test.go index 6f1763a8..9c8c51cf 100644 --- a/ethstorage/p2p/protocol/sync_test.go +++ b/ethstorage/p2p/protocol/sync_test.go @@ -42,7 +42,7 @@ const ( var ( contract = common.HexToAddress("0x0000000000000000000000000000000003330001") empty = make([]byte, 0) - params = SyncerParams{MaxRequestSize: uint64(4 * 1024 * 1024), SyncConcurrency: 16, FillEmptyConcurrency: 16, MetaDownloadBatchSize: 16} + params = SyncerParams{MaxPeers: 30, MaxRequestSize: uint64(4 * 1024 * 1024), SyncConcurrency: 16, FillEmptyConcurrency: 16, MetaDownloadBatchSize: 16} testLog = log.New("TestSync") prover = prv.NewKZGProver(testLog) ) diff --git a/ethstorage/p2p/protocol/syncclient.go b/ethstorage/p2p/protocol/syncclient.go index 741e389c..53bd5eb9 100644 --- a/ethstorage/p2p/protocol/syncclient.go +++ b/ethstorage/p2p/protocol/syncclient.go @@ -42,8 +42,6 @@ const ( // after the rate-limit reservation hits the max throttle delay, give up on serving a request and just close the stream maxThrottleDelay = time.Second * 20 - defaultMaxPeerCount = 30 - defaultMinPeersPerShard = 5 minSubTaskSize = 16 @@ -214,21 +212,13 @@ func NewSyncClient(log log.Logger, cfg *rollup.EsConfig, newStream newStreamFn, resCancel: cancel, storageManager: storageManager, prover: prv.NewKZGProver(log), - maxPeers: defaultMaxPeerCount, - minPeersPerShard: getMinPeersPerShard(defaultMaxPeerCount, shardCount), + maxPeers: params.MaxPeers, + minPeersPerShard: getMinPeersPerShard(params.MaxPeers, shardCount), syncerParams: params, } return c } -func (s *SyncClient) UpdateMaxPeers(maxPeers int) { - s.lock.Lock() - defer s.lock.Unlock() - s.maxPeers = maxPeers - shardCount := len(s.storageManager.Shards()) - s.minPeersPerShard = getMinPeersPerShard(maxPeers, shardCount) -} - func getMinPeersPerShard(maxPeers, shardCount int) int { minPeersPerShard := (maxPeers + shardCount - 1) / shardCount if minPeersPerShard < defaultMinPeersPerShard { @@ -480,7 +470,8 @@ func (s *SyncClient) AddPeer(id peer.ID, shards map[common.Address][]uint64, dir return false } if !s.needThisPeer(shards) { - s.log.Info("No need this peer, the connection would be closed later", "peer", id.String(), "shards", shards) + s.log.Info("No need this peer, the connection would be closed later", "maxPeers", s.maxPeers, + "Peer count", len(s.peers), "peer", id.String(), "shards", shards) s.metrics.IncDropPeerCount() s.lock.Unlock() return false diff --git a/ethstorage/p2p/protocol/types.go b/ethstorage/p2p/protocol/types.go index 93a7da32..e7c075dd 100644 --- a/ethstorage/p2p/protocol/types.go +++ b/ethstorage/p2p/protocol/types.go @@ -135,6 +135,7 @@ type EthStorageSyncDone struct { } type SyncerParams struct { + MaxPeers int MaxRequestSize uint64 SyncConcurrency uint64 FillEmptyConcurrency int