Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

p2p: peerstore backed phonebook #5710

Merged
merged 16 commits into from
Aug 31, 2023
363 changes: 362 additions & 1 deletion network/p2p/peerstore/peerstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,88 @@

import (
"fmt"
"math"
"math/rand"
"time"

"github.com/libp2p/go-libp2p/core/peer"
libp2p "github.com/libp2p/go-libp2p/core/peerstore"
mempstore "github.com/libp2p/go-libp2p/p2p/host/peerstore/pstoremem"
"golang.org/x/exp/slices"
)

// GetAllAddresses when using GetAddresses with getAllAddresses, all the addresses will be retrieved, regardless
// of how many addresses the phonebook actually has. ( with the retry-after logic applied )
const GetAllAddresses = math.MaxInt32

// PhoneBookEntryRoles defines the roles that a single entry on the phonebook can take.
// currently, we have two roles : relay role and archiver role, which are mutually exclusive.
//
//msgp:ignore PhoneBookEntryRoles
type PhoneBookEntryRoles int

// PhoneBookEntryRelayRole used for all the relays that are provided either via the algobootstrap SRV record
// or via a configuration file.
const PhoneBookEntryRelayRole = 1

// PhoneBookEntryArchiverRole used for all the archivers that are provided via the archive SRV record.
const PhoneBookEntryArchiverRole = 2
shiqizng marked this conversation as resolved.
Show resolved Hide resolved

// Phonebook stores or looks up addresses of nodes we might contact
type Phonebook interface {
cce marked this conversation as resolved.
Show resolved Hide resolved
// GetAddresses(N) returns up to N addresses, but may return fewer
GetAddresses(n int, role PhoneBookEntryRoles) []string

// UpdateRetryAfter updates the retry-after field for the entries matching the given address
UpdateRetryAfter(addr string, retryAfter time.Time)

// GetConnectionWaitTime will calculate and return the wait
// time to prevent exceeding connectionsRateLimitingCount.
// The connection should be established when the waitTime is 0.
// It will register a provisional next connection time when the waitTime is 0.
// The provisional time should be updated after the connection with UpdateConnectionTime
GetConnectionWaitTime(addr string) (addrInPhonebook bool,
waitTime time.Duration, provisionalTime time.Time)

// UpdateConnectionTime will update the provisional connection time.
// Returns true of the addr was in the phonebook
UpdateConnectionTime(addr string, provisionalTime time.Time) bool

// ReplacePeerList merges a set of addresses with that passed in for networkName
// new entries in dnsAddresses are being added
// existing items that aren't included in dnsAddresses are being removed
// matching entries don't change
ReplacePeerList(dnsAddresses []string, networkName string, role PhoneBookEntryRoles)

// AddPersistentPeers stores addresses of peers which are persistent.
// i.e. they won't be replaced by ReplacePeerList calls
AddPersistentPeers(dnsAddresses []string, networkName string, role PhoneBookEntryRoles)
}

// PeerStore implements Peerstore and CertifiedAddrBook.
type PeerStore struct {
peerStoreCAB
connectionsRateLimitingCount uint
connectionsRateLimitingWindow time.Duration
}

// addressData: holds the information associated with each phonebook address.
type addressData struct {
// retryAfter is the time to wait before retrying to connect to the address.
retryAfter time.Time

// recentConnectionTimes is the log of connection times used to observe the maximum
// connections to the address in a given time window.
recentConnectionTimes []time.Time

// networkNames: lists the networks to which the given address belongs.
networkNames map[string]bool

// role is the role that this address serves.
role PhoneBookEntryRoles

// persistent is set true for peers whose record should not be removed for the peer list
persistent bool
Eric-Warehime marked this conversation as resolved.
Show resolved Hide resolved
}

// peerStoreCAB combines the libp2p Peerstore and CertifiedAddrBook interfaces.
Expand All @@ -47,6 +120,294 @@
info := addrInfo[i]
ps.AddAddrs(info.ID, info.Addrs, libp2p.AddressTTL)
}
pstore := &PeerStore{ps}
pstore := &PeerStore{peerStoreCAB: ps}
return pstore, nil
}

// MakePhonebook creates a phonebook with the passed configuration values
func MakePhonebook(connectionsRateLimitingCount uint,
connectionsRateLimitingWindow time.Duration) (*PeerStore, error) {
ps, err := mempstore.NewPeerstore()
if err != nil {
return &PeerStore{}, fmt.Errorf("cannot initialize a peerstore: %w", err)

Check warning on line 132 in network/p2p/peerstore/peerstore.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/peerstore/peerstore.go#L132

Added line #L132 was not covered by tests
}
pstore := &PeerStore{peerStoreCAB: ps,
connectionsRateLimitingCount: connectionsRateLimitingCount,
connectionsRateLimitingWindow: connectionsRateLimitingWindow,
}
return pstore, nil
}

// GetAddresses returns up to N addresses, but may return fewer
func (ps *PeerStore) GetAddresses(n int, role PhoneBookEntryRoles) []string {
return shuffleSelect(ps.filterRetryTime(time.Now(), role), n)
}

// UpdateRetryAfter updates the retryAfter time for the given address.
func (ps *PeerStore) UpdateRetryAfter(addr string, retryAfter time.Time) {
info, err := PeerInfoFromDomainPort(addr)
if err != nil {
return

Check warning on line 150 in network/p2p/peerstore/peerstore.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/peerstore/peerstore.go#L147-L150

Added lines #L147 - L150 were not covered by tests
}
metadata, _ := ps.Get(info.ID, "addressData")
if metadata != nil {
ad, ok := metadata.(addressData)
if !ok {
return

Check warning on line 156 in network/p2p/peerstore/peerstore.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/peerstore/peerstore.go#L152-L156

Added lines #L152 - L156 were not covered by tests
}
ad.retryAfter = retryAfter
_ = ps.Put(info.ID, "addressData", ad)

Check warning on line 159 in network/p2p/peerstore/peerstore.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/peerstore/peerstore.go#L158-L159

Added lines #L158 - L159 were not covered by tests
}

}

// GetConnectionWaitTime will calculate and return the wait
// time to prevent exceeding connectionsRateLimitingCount.
// The connection should be established when the waitTime is 0.
// It will register a provisional next connection time when the waitTime is 0.
// The provisional time should be updated after the connection with UpdateConnectionTime
func (ps *PeerStore) GetConnectionWaitTime(addr string) (bool, time.Duration, time.Time) {
curTime := time.Now()
info, err := PeerInfoFromDomainPort(addr)
if err != nil {
return false, 0 /* not used */, curTime /* not used */

Check warning on line 173 in network/p2p/peerstore/peerstore.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/peerstore/peerstore.go#L173

Added line #L173 was not covered by tests
}
var timeSince time.Duration
var numElmtsToRemove int
metadata, err := ps.Get(info.ID, "addressData")
if err != nil {
return false, 0 /* not used */, curTime /* not used */
}
ad, ok := metadata.(addressData)
if !ok {
return false, 0 /* not used */, curTime /* not used */

Check warning on line 183 in network/p2p/peerstore/peerstore.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/peerstore/peerstore.go#L183

Added line #L183 was not covered by tests
}
// Remove from recentConnectionTimes the times later than ConnectionsRateLimitingWindowSeconds
for numElmtsToRemove < len(ad.recentConnectionTimes) {
timeSince = curTime.Sub(ad.recentConnectionTimes[numElmtsToRemove])
if timeSince >= ps.connectionsRateLimitingWindow {
numElmtsToRemove++
} else {
break // break the loop. The rest are earlier than 1 second
}
}

// Remove the expired elements from e.data[addr].recentConnectionTimes
ps.popNElements(numElmtsToRemove, peer.ID(addr))
// If there are max number of connections within the time window, wait
metadata, _ = ps.Get(info.ID, "addressData")
ad, ok = metadata.(addressData)
if !ok {
return false, 0 /* not used */, curTime /* not used */

Check warning on line 201 in network/p2p/peerstore/peerstore.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/peerstore/peerstore.go#L201

Added line #L201 was not covered by tests
}
numElts := len(ad.recentConnectionTimes)
if uint(numElts) >= ps.connectionsRateLimitingCount {
return true, /* true */
ps.connectionsRateLimitingWindow - timeSince, curTime /* not used */
}

// Else, there is space in connectionsRateLimitingCount. The
// connection request of the caller will proceed
// Update curTime, since it may have significantly changed if waited
provisionalTime := time.Now()
// Append the provisional time for the next connection request
ps.appendTime(info.ID, provisionalTime)
return true, 0 /* no wait. proceed */, provisionalTime
}

// UpdateConnectionTime updates the connection time for the given address.
func (ps *PeerStore) UpdateConnectionTime(addr string, provisionalTime time.Time) bool {
info, err := PeerInfoFromDomainPort(addr)
if err != nil {
return false

Check warning on line 222 in network/p2p/peerstore/peerstore.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/peerstore/peerstore.go#L222

Added line #L222 was not covered by tests
}
metadata, err := ps.Get(info.ID, "addressData")
if err != nil {
return false
}
ad, ok := metadata.(addressData)
if !ok {
return false

Check warning on line 230 in network/p2p/peerstore/peerstore.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/peerstore/peerstore.go#L230

Added line #L230 was not covered by tests
}
defer func() {
_ = ps.Put(info.ID, "addressData", ad)

}()

// Find the provisionalTime and update it
entry := ad.recentConnectionTimes
for indx, val := range entry {
if provisionalTime == val {
entry[indx] = time.Now()
return true
}
}

// Case where the time is not found: it was removed from the list.
// This may happen when the time expires before the connection was established with the server.
// The time should be added again.
entry = append(entry, time.Now())
ad.recentConnectionTimes = entry

Check warning on line 250 in network/p2p/peerstore/peerstore.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/peerstore/peerstore.go#L249-L250

Added lines #L249 - L250 were not covered by tests

return true

Check warning on line 252 in network/p2p/peerstore/peerstore.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/peerstore/peerstore.go#L252

Added line #L252 was not covered by tests
}

// ReplacePeerList replaces the peer list for the given networkName and role.
func (ps *PeerStore) ReplacePeerList(addressesThey []string, networkName string, role PhoneBookEntryRoles) {
// prepare a map of items we'd like to remove.
removeItems := make(map[peer.ID]bool, 0)
peerIDs := ps.Peers()
for _, pid := range peerIDs {
data, _ := ps.Get(pid, "addressData")
if data != nil {
ad := data.(addressData)
if ad.networkNames[networkName] && ad.role == role && !ad.persistent {
removeItems[pid] = true

Check warning on line 265 in network/p2p/peerstore/peerstore.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/peerstore/peerstore.go#L265

Added line #L265 was not covered by tests
}
}

}
for _, addr := range addressesThey {
info, err := PeerInfoFromDomainPort(addr)
if err != nil {
return

Check warning on line 273 in network/p2p/peerstore/peerstore.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/peerstore/peerstore.go#L273

Added line #L273 was not covered by tests
}
data, _ := ps.Get(info.ID, "addressData")
if data != nil {
// we already have this.
// Update the networkName
ad := data.(addressData)
ad.networkNames[networkName] = true

// do not remove this entry
delete(removeItems, info.ID)
} else {
// we don't have this item. add it.
ps.AddAddrs(info.ID, info.Addrs, libp2p.AddressTTL)
entry := makePhonebookEntryData(networkName, role, false)
_ = ps.Put(info.ID, "addressData", entry)
}
}

// remove items that were missing in addressesThey
for k := range removeItems {
ps.deletePhonebookEntry(k, networkName)

Check warning on line 294 in network/p2p/peerstore/peerstore.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/peerstore/peerstore.go#L294

Added line #L294 was not covered by tests
}
}

// AddPersistentPeers stores addresses of peers which are persistent.
// i.e. they won't be replaced by ReplacePeerList calls
func (ps *PeerStore) AddPersistentPeers(dnsAddresses []string, networkName string, role PhoneBookEntryRoles) {

for _, addr := range dnsAddresses {
info, err := PeerInfoFromDomainPort(addr)
if err != nil {
return

Check warning on line 305 in network/p2p/peerstore/peerstore.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/peerstore/peerstore.go#L305

Added line #L305 was not covered by tests
}
data, _ := ps.Get(info.ID, "addressData")
if data != nil {
// we already have this.
// Make sure the persistence field is set to true
ad := data.(addressData)
ad.persistent = true
_ = ps.Put("peerID", "addressData", data)

} else {
// we don't have this item. add it.
ps.AddAddrs(info.ID, info.Addrs, libp2p.PermanentAddrTTL)
entry := makePhonebookEntryData(networkName, role, true)
_ = ps.Put(info.ID, "addressData", entry)
}
}
}

// Length returns the number of addrs in peerstore
func (ps *PeerStore) Length() int {
return len(ps.Peers())
}

// makePhonebookEntryData creates a new address entry for provided network name and role.
func makePhonebookEntryData(networkName string, role PhoneBookEntryRoles, persistent bool) addressData {
pbData := addressData{
networkNames: make(map[string]bool),
recentConnectionTimes: make([]time.Time, 0),
role: role,
persistent: persistent,
}
pbData.networkNames[networkName] = true
return pbData
}

func (ps *PeerStore) deletePhonebookEntry(peerID peer.ID, networkName string) {
data, err := ps.Get(peerID, "addressData")
if err != nil {
return

Check warning on line 344 in network/p2p/peerstore/peerstore.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/peerstore/peerstore.go#L341-L344

Added lines #L341 - L344 were not covered by tests
}
ad := data.(addressData)
delete(ad.networkNames, networkName)
if 0 == len(ad.networkNames) {
ps.ClearAddrs(peerID)
_ = ps.Put(peerID, "addressData", nil)

Check warning on line 350 in network/p2p/peerstore/peerstore.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/peerstore/peerstore.go#L346-L350

Added lines #L346 - L350 were not covered by tests
}
}

// AppendTime adds the current time to recentConnectionTimes in
// addressData of addr
func (ps *PeerStore) appendTime(peerID peer.ID, t time.Time) {
data, _ := ps.Get(peerID, "addressData")
ad := data.(addressData)
ad.recentConnectionTimes = append(ad.recentConnectionTimes, t)
_ = ps.Put(peerID, "addressData", ad)
}

// PopEarliestTime removes the earliest time from recentConnectionTimes in
// addressData for addr
// It is expected to be later than ConnectionsRateLimitingWindow
func (ps *PeerStore) popNElements(n int, peerID peer.ID) {
data, _ := ps.Get(peerID, "addressData")
ad := data.(addressData)
ad.recentConnectionTimes = ad.recentConnectionTimes[n:]
_ = ps.Put(peerID, "addressData", ad)
}

func (ps *PeerStore) filterRetryTime(t time.Time, role PhoneBookEntryRoles) []string {
o := make([]string, 0, len(ps.Peers()))
for _, peerID := range ps.Peers() {
data, _ := ps.Get(peerID, "addressData")
if data != nil {
ad := data.(addressData)
if t.After(ad.retryAfter) && role == ad.role {
o = append(o, string(peerID))
}
}
}
return o
}

func shuffleSelect(set []string, n int) []string {
if n >= len(set) || n == GetAllAddresses {
// return shuffled copy of everything
out := slices.Clone(set)
shuffleStrings(out)
return out
}
// Pick random indexes from the set
indexSample := make([]int, n)
for i := range indexSample {
indexSample[i] = rand.Intn(len(set)-i) + i
for oi, ois := range indexSample[:i] {
if ois == indexSample[i] {
indexSample[i] = oi
}
}
}
out := make([]string, n)
for i, index := range indexSample {
out[i] = set[index]
}
return out
}

func shuffleStrings(set []string) {
rand.Shuffle(len(set), func(i, j int) { set[i], set[j] = set[j], set[i] })
}
Loading