Skip to content

Commit

Permalink
use p2p.peers.lo as maxPeers
Browse files Browse the repository at this point in the history
  • Loading branch information
ping-ke committed Mar 20, 2024
1 parent 8967d7c commit 43ea188
Show file tree
Hide file tree
Showing 7 changed files with 12 additions and 17 deletions.
4 changes: 2 additions & 2 deletions ethstorage/flags/p2p_flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,14 +188,14 @@ var (
Name: "p2p.peers.lo",
Usage: "Low-tide peer count. The node actively searches for new peer connections if below this amount.",
Required: false,
Value: 20,
Value: 60,
EnvVar: p2pEnv("PEERS_LO"),
}
PeersHi = cli.UintFlag{
Name: "p2p.peers.hi",
Usage: "High-tide peer count. The node starts pruning peer connections slowly after reaching this number.",
Required: false,
Value: 30,
Value: 70,
EnvVar: p2pEnv("PEERS_HI"),
}
PeersGrace = cli.DurationFlag{
Expand Down
2 changes: 2 additions & 0 deletions ethstorage/p2p/cli/load_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,10 +366,12 @@ func loadSyncerParams(conf *p2p.Config, ctx *cli.Context) error {
maxRequestSize := ctx.GlobalUint64(flags.MaxRequestSize.Name)
syncConcurrency := ctx.GlobalUint64(flags.SyncConcurrency.Name)
fillEmptyConcurrency := ctx.GlobalInt(flags.FillEmptyConcurrency.Name)
maxPeers := ctx.GlobalInt(flags.PeersLo.Name)
if syncConcurrency < 1 {
return fmt.Errorf("p2p.sync.concurrency param is invalid: the value should larger than 0")
}
conf.SyncParams = &protocol.SyncerParams{
MaxPeers: maxPeers,
MaxRequestSize: maxRequestSize,
SyncConcurrency: syncConcurrency,
FillEmptyConcurrency: fillEmptyConcurrency,
Expand Down
1 change: 1 addition & 0 deletions ethstorage/p2p/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ func (conf *Config) Host(log log.Logger, reporter metrics.Reporter) (host.Host,
return nil, fmt.Errorf("failed to open connection gater: %w", err)
}

// TODO as we have MaxPeers to limit the connection count, do we still need this?
connMngr, err := conf.ConnMngr(conf)
if err != nil {
return nil, fmt.Errorf("failed to open connection manager: %w", err)
Expand Down
2 changes: 1 addition & 1 deletion ethstorage/p2p/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func (n *NodeP2P) init(resourcesCtx context.Context, rollupCfg *rollup.EsConfig,
n.syncCl.RemovePeer(conn.RemotePeer())
},
})
n.syncCl.UpdateMaxPeers(int(setup.(*Config).PeersHi))

// the host may already be connected to peers, add them all to the sync client
for _, conn := range n.host.Network().Conns() {
shards := make(map[common.Address][]uint64)
Expand Down
2 changes: 1 addition & 1 deletion ethstorage/p2p/protocol/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ const (
var (
contract = common.HexToAddress("0x0000000000000000000000000000000003330001")
empty = make([]byte, 0)
params = SyncerParams{MaxRequestSize: uint64(4 * 1024 * 1024), SyncConcurrency: 16, FillEmptyConcurrency: 16, MetaDownloadBatchSize: 16}
params = SyncerParams{MaxPeers: 30, MaxRequestSize: uint64(4 * 1024 * 1024), SyncConcurrency: 16, FillEmptyConcurrency: 16, MetaDownloadBatchSize: 16}
testLog = log.New("TestSync")
prover = prv.NewKZGProver(testLog)
)
Expand Down
17 changes: 4 additions & 13 deletions ethstorage/p2p/protocol/syncclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,6 @@ const (
// after the rate-limit reservation hits the max throttle delay, give up on serving a request and just close the stream
maxThrottleDelay = time.Second * 20

defaultMaxPeerCount = 30

defaultMinPeersPerShard = 5

minSubTaskSize = 16
Expand Down Expand Up @@ -214,21 +212,13 @@ func NewSyncClient(log log.Logger, cfg *rollup.EsConfig, newStream newStreamFn,
resCancel: cancel,
storageManager: storageManager,
prover: prv.NewKZGProver(log),
maxPeers: defaultMaxPeerCount,
minPeersPerShard: getMinPeersPerShard(defaultMaxPeerCount, shardCount),
maxPeers: params.MaxPeers,
minPeersPerShard: getMinPeersPerShard(params.MaxPeers, shardCount),
syncerParams: params,
}
return c
}

func (s *SyncClient) UpdateMaxPeers(maxPeers int) {
s.lock.Lock()
defer s.lock.Unlock()
s.maxPeers = maxPeers
shardCount := len(s.storageManager.Shards())
s.minPeersPerShard = getMinPeersPerShard(maxPeers, shardCount)
}

func getMinPeersPerShard(maxPeers, shardCount int) int {
minPeersPerShard := (maxPeers + shardCount - 1) / shardCount
if minPeersPerShard < defaultMinPeersPerShard {
Expand Down Expand Up @@ -480,7 +470,8 @@ func (s *SyncClient) AddPeer(id peer.ID, shards map[common.Address][]uint64, dir
return false
}
if !s.needThisPeer(shards) {
s.log.Info("No need this peer, the connection would be closed later", "peer", id.String(), "shards", shards)
s.log.Info("No need this peer, the connection would be closed later", "maxPeers", s.maxPeers,
"Peer count", len(s.peers), "peer", id.String(), "shards", shards)
s.metrics.IncDropPeerCount()
s.lock.Unlock()
return false
Expand Down
1 change: 1 addition & 0 deletions ethstorage/p2p/protocol/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ type EthStorageSyncDone struct {
}

type SyncerParams struct {
MaxPeers int
MaxRequestSize uint64
SyncConcurrency uint64
FillEmptyConcurrency int
Expand Down

0 comments on commit 43ea188

Please sign in to comment.