Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix issue: inscrease max peers count and use p2p.peers.hi as maxPeers instead #255

Merged
merged 3 commits into from
Mar 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions ethstorage/flags/p2p_flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,14 +188,14 @@ var (
Name: "p2p.peers.lo",
Usage: "Low-tide peer count. The node actively searches for new peer connections if below this amount.",
Required: false,
Value: 20,
Value: 60,
EnvVar: p2pEnv("PEERS_LO"),
}
PeersHi = cli.UintFlag{
Name: "p2p.peers.hi",
Usage: "High-tide peer count. The node starts pruning peer connections slowly after reaching this number.",
Required: false,
Value: 30,
Value: 70,
EnvVar: p2pEnv("PEERS_HI"),
}
PeersGrace = cli.DurationFlag{
Expand Down
2 changes: 2 additions & 0 deletions ethstorage/p2p/cli/load_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,10 +366,12 @@ func loadSyncerParams(conf *p2p.Config, ctx *cli.Context) error {
maxRequestSize := ctx.GlobalUint64(flags.MaxRequestSize.Name)
syncConcurrency := ctx.GlobalUint64(flags.SyncConcurrency.Name)
fillEmptyConcurrency := ctx.GlobalInt(flags.FillEmptyConcurrency.Name)
maxPeers := ctx.GlobalInt(flags.PeersHi.Name)
if syncConcurrency < 1 {
return fmt.Errorf("p2p.sync.concurrency param is invalid: the value should larger than 0")
}
conf.SyncParams = &protocol.SyncerParams{
MaxPeers: maxPeers,
MaxRequestSize: maxRequestSize,
SyncConcurrency: syncConcurrency,
FillEmptyConcurrency: fillEmptyConcurrency,
Expand Down
1 change: 1 addition & 0 deletions ethstorage/p2p/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ func (conf *Config) Host(log log.Logger, reporter metrics.Reporter) (host.Host,
return nil, fmt.Errorf("failed to open connection gater: %w", err)
}

// TODO as we have MaxPeers to limit the connection count, do we still need this?
connMngr, err := conf.ConnMngr(conf)
if err != nil {
return nil, fmt.Errorf("failed to open connection manager: %w", err)
Expand Down
2 changes: 1 addition & 1 deletion ethstorage/p2p/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func (n *NodeP2P) init(resourcesCtx context.Context, rollupCfg *rollup.EsConfig,
n.syncCl.RemovePeer(conn.RemotePeer())
},
})
n.syncCl.UpdateMaxPeers(int(setup.(*Config).PeersHi))

// the host may already be connected to peers, add them all to the sync client
for _, conn := range n.host.Network().Conns() {
shards := make(map[common.Address][]uint64)
Expand Down
2 changes: 1 addition & 1 deletion ethstorage/p2p/protocol/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ const (
var (
contract = common.HexToAddress("0x0000000000000000000000000000000003330001")
empty = make([]byte, 0)
params = SyncerParams{MaxRequestSize: uint64(4 * 1024 * 1024), SyncConcurrency: 16, FillEmptyConcurrency: 16, MetaDownloadBatchSize: 16}
params = SyncerParams{MaxPeers: 30, MaxRequestSize: uint64(4 * 1024 * 1024), SyncConcurrency: 16, FillEmptyConcurrency: 16, MetaDownloadBatchSize: 16}
testLog = log.New("TestSync")
prover = prv.NewKZGProver(testLog)
)
Expand Down
17 changes: 4 additions & 13 deletions ethstorage/p2p/protocol/syncclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,6 @@ const (
// after the rate-limit reservation hits the max throttle delay, give up on serving a request and just close the stream
maxThrottleDelay = time.Second * 20

defaultMaxPeerCount = 30

defaultMinPeersPerShard = 5

minSubTaskSize = 16
Expand Down Expand Up @@ -214,21 +212,13 @@ func NewSyncClient(log log.Logger, cfg *rollup.EsConfig, newStream newStreamFn,
resCancel: cancel,
storageManager: storageManager,
prover: prv.NewKZGProver(log),
maxPeers: defaultMaxPeerCount,
minPeersPerShard: getMinPeersPerShard(defaultMaxPeerCount, shardCount),
maxPeers: params.MaxPeers,
minPeersPerShard: getMinPeersPerShard(params.MaxPeers, shardCount),
syncerParams: params,
}
return c
}

func (s *SyncClient) UpdateMaxPeers(maxPeers int) {
s.lock.Lock()
defer s.lock.Unlock()
s.maxPeers = maxPeers
shardCount := len(s.storageManager.Shards())
s.minPeersPerShard = getMinPeersPerShard(maxPeers, shardCount)
}

func getMinPeersPerShard(maxPeers, shardCount int) int {
minPeersPerShard := (maxPeers + shardCount - 1) / shardCount
if minPeersPerShard < defaultMinPeersPerShard {
Expand Down Expand Up @@ -480,7 +470,8 @@ func (s *SyncClient) AddPeer(id peer.ID, shards map[common.Address][]uint64, dir
return false
}
if !s.needThisPeer(shards) {
s.log.Info("No need this peer, the connection would be closed later", "peer", id.String(), "shards", shards)
s.log.Info("No need this peer, the connection would be closed later", "maxPeers", s.maxPeers,
"Peer count", len(s.peers), "peer", id.String(), "shards", shards)
s.metrics.IncDropPeerCount()
s.lock.Unlock()
return false
Expand Down
1 change: 1 addition & 0 deletions ethstorage/p2p/protocol/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ type EthStorageSyncDone struct {
}

type SyncerParams struct {
MaxPeers int
MaxRequestSize uint64
SyncConcurrency uint64
FillEmptyConcurrency int
Expand Down
Loading