Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

p2p: peerstore backed phonebook #5710

Merged
merged 16 commits into from
Aug 31, 2023
327 changes: 326 additions & 1 deletion network/p2p/peerstore/peerstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,52 @@

import (
"fmt"
"math"
"math/rand"
"time"

"github.com/libp2p/go-libp2p/core/peer"
libp2p "github.com/libp2p/go-libp2p/core/peerstore"
mempstore "github.com/libp2p/go-libp2p/p2p/host/peerstore/pstoremem"
"golang.org/x/exp/slices"
)

// when using GetAddresses with getAllAddresses, all the addresses will be retrieved, regardless
// of how many addresses the phonebook actually has. ( with the retry-after logic applied )
const getAllAddresses = math.MaxInt32

// PhoneBookEntryRoles defines the roles that a single entry on the phonebook can take.
// currently, we have two roles : relay role and archiver role, which are mutually exclusive.
//
//msgp:ignore PhoneBookEntryRoles
type PhoneBookEntryRoles int

const addressDataKey string = "addressData"

// PeerStore implements Peerstore and CertifiedAddrBook.
type PeerStore struct {
peerStoreCAB
connectionsRateLimitingCount uint
connectionsRateLimitingWindow time.Duration
}

// addressData: holds the information associated with each phonebook address.
type addressData struct {
// retryAfter is the time to wait before retrying to connect to the address.
retryAfter time.Time

// recentConnectionTimes is the log of connection times used to observe the maximum
// connections to the address in a given time window.
recentConnectionTimes []time.Time

// networkNames: lists the networks to which the given address belongs.
networkNames map[string]bool

// role is the role that this address serves.
role PhoneBookEntryRoles

// persistent is set true for peers whose record should not be removed for the peer list
persistent bool
Eric-Warehime marked this conversation as resolved.
Show resolved Hide resolved
}

// peerStoreCAB combines the libp2p Peerstore and CertifiedAddrBook interfaces.
Expand All @@ -47,6 +84,294 @@
info := addrInfo[i]
ps.AddAddrs(info.ID, info.Addrs, libp2p.AddressTTL)
}
pstore := &PeerStore{ps}
pstore := &PeerStore{peerStoreCAB: ps}
return pstore, nil
}

// MakePhonebook creates a phonebook with the passed configuration values
func MakePhonebook(connectionsRateLimitingCount uint,
connectionsRateLimitingWindow time.Duration) (*PeerStore, error) {
ps, err := mempstore.NewPeerstore()
if err != nil {
return &PeerStore{}, fmt.Errorf("cannot initialize a peerstore: %w", err)

Check warning on line 96 in network/p2p/peerstore/peerstore.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/peerstore/peerstore.go#L96

Added line #L96 was not covered by tests
}
pstore := &PeerStore{peerStoreCAB: ps,
connectionsRateLimitingCount: connectionsRateLimitingCount,
connectionsRateLimitingWindow: connectionsRateLimitingWindow,
}
return pstore, nil
}

// GetAddresses returns up to N addresses, but may return fewer
func (ps *PeerStore) GetAddresses(n int, role PhoneBookEntryRoles) []string {
return shuffleSelect(ps.filterRetryTime(time.Now(), role), n)
}

// UpdateRetryAfter updates the retryAfter time for the given address.
func (ps *PeerStore) UpdateRetryAfter(addr string, retryAfter time.Time) {
info, err := PeerInfoFromDomainPort(addr)
if err != nil {
return

Check warning on line 114 in network/p2p/peerstore/peerstore.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/peerstore/peerstore.go#L111-L114

Added lines #L111 - L114 were not covered by tests
}
metadata, _ := ps.Get(info.ID, addressDataKey)
if metadata != nil {
ad, ok := metadata.(addressData)
if !ok {
return

Check warning on line 120 in network/p2p/peerstore/peerstore.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/peerstore/peerstore.go#L116-L120

Added lines #L116 - L120 were not covered by tests
}
ad.retryAfter = retryAfter
_ = ps.Put(info.ID, addressDataKey, ad)

Check warning on line 123 in network/p2p/peerstore/peerstore.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/peerstore/peerstore.go#L122-L123

Added lines #L122 - L123 were not covered by tests
}

}

// GetConnectionWaitTime will calculate and return the wait
// time to prevent exceeding connectionsRateLimitingCount.
// The connection should be established when the waitTime is 0.
// It will register a provisional next connection time when the waitTime is 0.
// The provisional time should be updated after the connection with UpdateConnectionTime
func (ps *PeerStore) GetConnectionWaitTime(addr string) (bool, time.Duration, time.Time) {
curTime := time.Now()
info, err := PeerInfoFromDomainPort(addr)
if err != nil {
return false, 0 /* not used */, curTime /* not used */

Check warning on line 137 in network/p2p/peerstore/peerstore.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/peerstore/peerstore.go#L137

Added line #L137 was not covered by tests
}
var timeSince time.Duration
var numElmtsToRemove int
metadata, err := ps.Get(info.ID, addressDataKey)
if err != nil {
return false, 0 /* not used */, curTime /* not used */
}
ad, ok := metadata.(addressData)
if !ok {
return false, 0 /* not used */, curTime /* not used */

Check warning on line 147 in network/p2p/peerstore/peerstore.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/peerstore/peerstore.go#L147

Added line #L147 was not covered by tests
}
// Remove from recentConnectionTimes the times later than ConnectionsRateLimitingWindowSeconds
for numElmtsToRemove < len(ad.recentConnectionTimes) {
timeSince = curTime.Sub(ad.recentConnectionTimes[numElmtsToRemove])
if timeSince >= ps.connectionsRateLimitingWindow {
numElmtsToRemove++
} else {
break // break the loop. The rest are earlier than 1 second
}
}

// Remove the expired elements from e.data[addr].recentConnectionTimes
ps.popNElements(numElmtsToRemove, peer.ID(addr))
// If there are max number of connections within the time window, wait
metadata, _ = ps.Get(info.ID, addressDataKey)
ad, ok = metadata.(addressData)
if !ok {
return false, 0 /* not used */, curTime /* not used */

Check warning on line 165 in network/p2p/peerstore/peerstore.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/peerstore/peerstore.go#L165

Added line #L165 was not covered by tests
}
numElts := len(ad.recentConnectionTimes)
if uint(numElts) >= ps.connectionsRateLimitingCount {
return true, /* true */
ps.connectionsRateLimitingWindow - timeSince, curTime /* not used */
}

// Else, there is space in connectionsRateLimitingCount. The
// connection request of the caller will proceed
// Update curTime, since it may have significantly changed if waited
provisionalTime := time.Now()
// Append the provisional time for the next connection request
ps.appendTime(info.ID, provisionalTime)
return true, 0 /* no wait. proceed */, provisionalTime
}

// UpdateConnectionTime updates the connection time for the given address.
func (ps *PeerStore) UpdateConnectionTime(addr string, provisionalTime time.Time) bool {
info, err := PeerInfoFromDomainPort(addr)
if err != nil {
return false

Check warning on line 186 in network/p2p/peerstore/peerstore.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/peerstore/peerstore.go#L186

Added line #L186 was not covered by tests
}
metadata, err := ps.Get(info.ID, addressDataKey)
if err != nil {
return false
}
ad, ok := metadata.(addressData)
if !ok {
return false

Check warning on line 194 in network/p2p/peerstore/peerstore.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/peerstore/peerstore.go#L194

Added line #L194 was not covered by tests
}
defer func() {
_ = ps.Put(info.ID, addressDataKey, ad)

}()

// Find the provisionalTime and update it
entry := ad.recentConnectionTimes
for indx, val := range entry {
if provisionalTime == val {
entry[indx] = time.Now()
return true
}
}

// Case where the time is not found: it was removed from the list.
// This may happen when the time expires before the connection was established with the server.
// The time should be added again.
entry = append(entry, time.Now())
ad.recentConnectionTimes = entry

Check warning on line 214 in network/p2p/peerstore/peerstore.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/peerstore/peerstore.go#L213-L214

Added lines #L213 - L214 were not covered by tests

return true

Check warning on line 216 in network/p2p/peerstore/peerstore.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/peerstore/peerstore.go#L216

Added line #L216 was not covered by tests
}

// ReplacePeerList replaces the peer list for the given networkName and role.
func (ps *PeerStore) ReplacePeerList(addressesThey []string, networkName string, role PhoneBookEntryRoles) {
// prepare a map of items we'd like to remove.
removeItems := make(map[peer.ID]bool, 0)
peerIDs := ps.Peers()
for _, pid := range peerIDs {
data, _ := ps.Get(pid, addressDataKey)
if data != nil {
ad := data.(addressData)
if ad.networkNames[networkName] && ad.role == role && !ad.persistent {
removeItems[pid] = true

Check warning on line 229 in network/p2p/peerstore/peerstore.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/peerstore/peerstore.go#L229

Added line #L229 was not covered by tests
}
}

}
for _, addr := range addressesThey {
info, err := PeerInfoFromDomainPort(addr)
if err != nil {
return

Check warning on line 237 in network/p2p/peerstore/peerstore.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/peerstore/peerstore.go#L237

Added line #L237 was not covered by tests
}
data, _ := ps.Get(info.ID, addressDataKey)
if data != nil {
// we already have this.
// Update the networkName
ad := data.(addressData)
ad.networkNames[networkName] = true

// do not remove this entry
delete(removeItems, info.ID)
} else {
// we don't have this item. add it.
ps.AddAddrs(info.ID, info.Addrs, libp2p.AddressTTL)
entry := makePhonebookEntryData(networkName, role, false)
_ = ps.Put(info.ID, addressDataKey, entry)
}
}

// remove items that were missing in addressesThey
for k := range removeItems {
ps.deletePhonebookEntry(k, networkName)

Check warning on line 258 in network/p2p/peerstore/peerstore.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/peerstore/peerstore.go#L258

Added line #L258 was not covered by tests
}
}

// AddPersistentPeers stores addresses of peers which are persistent.
// i.e. they won't be replaced by ReplacePeerList calls
func (ps *PeerStore) AddPersistentPeers(dnsAddresses []string, networkName string, role PhoneBookEntryRoles) {

for _, addr := range dnsAddresses {
info, err := PeerInfoFromDomainPort(addr)
if err != nil {
return

Check warning on line 269 in network/p2p/peerstore/peerstore.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/peerstore/peerstore.go#L269

Added line #L269 was not covered by tests
}
data, _ := ps.Get(info.ID, addressDataKey)
if data != nil {
// we already have this.
// Make sure the persistence field is set to true
ad := data.(addressData)
ad.persistent = true
_ = ps.Put(info.ID, addressDataKey, data)

} else {
// we don't have this item. add it.
ps.AddAddrs(info.ID, info.Addrs, libp2p.PermanentAddrTTL)
entry := makePhonebookEntryData(networkName, role, true)
_ = ps.Put(info.ID, addressDataKey, entry)
}
}
}

// Length returns the number of addrs in peerstore
func (ps *PeerStore) Length() int {
return len(ps.Peers())
}

// makePhonebookEntryData creates a new address entry for provided network name and role.
func makePhonebookEntryData(networkName string, role PhoneBookEntryRoles, persistent bool) addressData {
pbData := addressData{
networkNames: make(map[string]bool),
recentConnectionTimes: make([]time.Time, 0),
role: role,
persistent: persistent,
}
pbData.networkNames[networkName] = true
return pbData
}

func (ps *PeerStore) deletePhonebookEntry(peerID peer.ID, networkName string) {
data, err := ps.Get(peerID, addressDataKey)
if err != nil {
return

Check warning on line 308 in network/p2p/peerstore/peerstore.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/peerstore/peerstore.go#L305-L308

Added lines #L305 - L308 were not covered by tests
}
ad := data.(addressData)
delete(ad.networkNames, networkName)
if 0 == len(ad.networkNames) {
ps.ClearAddrs(peerID)
_ = ps.Put(peerID, addressDataKey, nil)

Check warning on line 314 in network/p2p/peerstore/peerstore.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/peerstore/peerstore.go#L310-L314

Added lines #L310 - L314 were not covered by tests
}
}

// AppendTime adds the current time to recentConnectionTimes in
// addressData of addr
func (ps *PeerStore) appendTime(peerID peer.ID, t time.Time) {
data, _ := ps.Get(peerID, addressDataKey)
ad := data.(addressData)
ad.recentConnectionTimes = append(ad.recentConnectionTimes, t)
_ = ps.Put(peerID, addressDataKey, ad)
}

// PopEarliestTime removes the earliest time from recentConnectionTimes in
// addressData for addr
// It is expected to be later than ConnectionsRateLimitingWindow
func (ps *PeerStore) popNElements(n int, peerID peer.ID) {
data, _ := ps.Get(peerID, addressDataKey)
ad := data.(addressData)
ad.recentConnectionTimes = ad.recentConnectionTimes[n:]
_ = ps.Put(peerID, addressDataKey, ad)
}

func (ps *PeerStore) filterRetryTime(t time.Time, role PhoneBookEntryRoles) []string {
o := make([]string, 0, len(ps.Peers()))
for _, peerID := range ps.Peers() {
data, _ := ps.Get(peerID, addressDataKey)
if data != nil {
ad := data.(addressData)
if t.After(ad.retryAfter) && role == ad.role {
o = append(o, string(peerID))
}
}
}
return o
}

func shuffleSelect(set []string, n int) []string {
if n >= len(set) || n == getAllAddresses {
// return shuffled copy of everything
out := slices.Clone(set)
shuffleStrings(out)
return out
}
// Pick random indexes from the set
indexSample := make([]int, n)
for i := range indexSample {
indexSample[i] = rand.Intn(len(set)-i) + i
for oi, ois := range indexSample[:i] {
if ois == indexSample[i] {
indexSample[i] = oi
}
}
}
out := make([]string, n)
for i, index := range indexSample {
out[i] = set[index]
}
return out
}

func shuffleStrings(set []string) {
rand.Shuffle(len(set), func(i, j int) { set[i], set[j] = set[j], set[i] })
}
Loading