Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

p2p: peerstore backed phonebook #5710

Merged
merged 16 commits into from
Aug 31, 2023
305 changes: 304 additions & 1 deletion network/p2p/peerstore/peerstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,41 @@

import (
"fmt"
"math/rand"
"time"

"github.com/libp2p/go-libp2p/core/peer"
libp2p "github.com/libp2p/go-libp2p/core/peerstore"
mempstore "github.com/libp2p/go-libp2p/p2p/host/peerstore/pstoremem"
"golang.org/x/exp/slices"

"github.com/algorand/go-algorand/network"
shiqizng marked this conversation as resolved.
Show resolved Hide resolved
)

// PeerStore implements Peerstore and CertifiedAddrBook.
type PeerStore struct {
peerStoreCAB
connectionsRateLimitingCount uint
connectionsRateLimitingWindow time.Duration
}

// addressData: holds the information associated with each phonebook address.
type addressData struct {
// retryAfter is the time to wait before retrying to connect to the address.
retryAfter time.Time

// recentConnectionTimes: is the log of connection times used to observe the maximum
// connections to the address in a given time window.
shiqizng marked this conversation as resolved.
Show resolved Hide resolved
recentConnectionTimes []time.Time

// networkNames: lists the networks to which the given address belongs.
networkNames map[string]bool

// role is the role that this address serves.
role network.PhoneBookEntryRoles

// persistent is set true for peers whose record should not be removed for the peer list
persistent bool
Eric-Warehime marked this conversation as resolved.
Show resolved Hide resolved
}

// peerStoreCAB combines the libp2p Peerstore and CertifiedAddrBook interfaces.
Expand All @@ -47,6 +73,283 @@
info := addrInfo[i]
ps.AddAddrs(info.ID, info.Addrs, libp2p.AddressTTL)
}
pstore := &PeerStore{ps}
pstore := &PeerStore{peerStoreCAB: ps}
return pstore, nil
}

// MakePhonebook creates a phonebook with the passed configuration values
func MakePhonebook(connectionsRateLimitingCount uint,
connectionsRateLimitingWindow time.Duration) (*PeerStore, error) {
ps, err := mempstore.NewPeerstore()
if err != nil {
return &PeerStore{}, fmt.Errorf("cannot initialize a peerstore: %w", err)

Check warning on line 85 in network/p2p/peerstore/peerstore.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/peerstore/peerstore.go#L85

Added line #L85 was not covered by tests
}
pstore := &PeerStore{peerStoreCAB: ps,
connectionsRateLimitingCount: connectionsRateLimitingCount,
connectionsRateLimitingWindow: connectionsRateLimitingWindow,
}
return pstore, nil
}

// GetAddresses returns up to N addresses, but may return fewer
func (ps *PeerStore) GetAddresses(n int, role network.PhoneBookEntryRoles) []string {
return shuffleSelect(ps.filterRetryTime(time.Now(), role), n)
}

// UpdateRetryAfter updates the retryAfter time for the given address.
func (ps *PeerStore) UpdateRetryAfter(addr string, retryAfter time.Time) {
info, err := PeerInfoFromDomainPort(addr)
if err != nil {
return

Check warning on line 103 in network/p2p/peerstore/peerstore.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/peerstore/peerstore.go#L100-L103

Added lines #L100 - L103 were not covered by tests
}
metadata, _ := ps.Get(info.ID, "addressData")
if metadata != nil {
ad := metadata.(addressData)
ad.retryAfter = retryAfter
_ = ps.Put(info.ID, "addressData", ad)

Check warning on line 109 in network/p2p/peerstore/peerstore.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/peerstore/peerstore.go#L105-L109

Added lines #L105 - L109 were not covered by tests
}

}

// GetConnectionWaitTime will calculate and return the wait
// time to prevent exceeding connectionsRateLimitingCount.
// The connection should be established when the waitTime is 0.
// It will register a provisional next connection time when the waitTime is 0.
// The provisional time should be updated after the connection with UpdateConnectionTime
func (ps *PeerStore) GetConnectionWaitTime(addr string) (bool, time.Duration, time.Time) {
curTime := time.Now()
info, err := PeerInfoFromDomainPort(addr)
if err != nil {
return false, 0 /* not used */, curTime /* not used */

Check warning on line 123 in network/p2p/peerstore/peerstore.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/peerstore/peerstore.go#L119-L123

Added lines #L119 - L123 were not covered by tests
}
var timeSince time.Duration
var numElmtsToRemove int
metadata, err := ps.Get(info.ID, "addressData")
if err != nil {
return false, 0 /* not used */, curTime /* not used */

Check warning on line 129 in network/p2p/peerstore/peerstore.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/peerstore/peerstore.go#L125-L129

Added lines #L125 - L129 were not covered by tests
}
ad := metadata.(addressData)

Check warning on line 131 in network/p2p/peerstore/peerstore.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/peerstore/peerstore.go#L131

Added line #L131 was not covered by tests
// Remove from recentConnectionTimes the times later than ConnectionsRateLimitingWindowSeconds
for numElmtsToRemove < len(ad.recentConnectionTimes) {
timeSince = curTime.Sub(ad.recentConnectionTimes[numElmtsToRemove])
if timeSince >= ps.connectionsRateLimitingWindow {
numElmtsToRemove++
} else {
break // break the loop. The rest are earlier than 1 second

Check warning on line 138 in network/p2p/peerstore/peerstore.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/peerstore/peerstore.go#L133-L138

Added lines #L133 - L138 were not covered by tests
}
}

// Remove the expired elements from e.data[addr].recentConnectionTimes
ps.popNElements(numElmtsToRemove, peer.ID(addr))

Check warning on line 143 in network/p2p/peerstore/peerstore.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/peerstore/peerstore.go#L143

Added line #L143 was not covered by tests
// If there are max number of connections within the time window, wait
connectionsRateLimitingCount, err := ps.Get("peerID", "connectionsRateLimitingCount")
if err != nil {
return false, 0 /* not used */, curTime /* not used */

Check warning on line 147 in network/p2p/peerstore/peerstore.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/peerstore/peerstore.go#L145-L147

Added lines #L145 - L147 were not covered by tests
}
numElts := len(ad.recentConnectionTimes)
if uint(numElts) >= connectionsRateLimitingCount.(uint) {
return true, /* true */
ps.connectionsRateLimitingWindow - timeSince, curTime /* not used */

Check warning on line 152 in network/p2p/peerstore/peerstore.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/peerstore/peerstore.go#L149-L152

Added lines #L149 - L152 were not covered by tests
}

// Else, there is space in connectionsRateLimitingCount. The
// connection request of the caller will proceed
// Update curTime, since it may have significantly changed if waited
provisionalTime := time.Now()

Check warning on line 158 in network/p2p/peerstore/peerstore.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/peerstore/peerstore.go#L158

Added line #L158 was not covered by tests
// Append the provisional time for the next connection request
ps.appendTime(info.ID, provisionalTime)
return true, 0 /* no wait. proceed */, provisionalTime

Check warning on line 161 in network/p2p/peerstore/peerstore.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/peerstore/peerstore.go#L160-L161

Added lines #L160 - L161 were not covered by tests
}

// UpdateConnectionTime updates the connection time for the given address.
func (ps *PeerStore) UpdateConnectionTime(addr string, provisionalTime time.Time) bool {
info, err := PeerInfoFromDomainPort(addr)
if err != nil {
return false

Check warning on line 168 in network/p2p/peerstore/peerstore.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/peerstore/peerstore.go#L165-L168

Added lines #L165 - L168 were not covered by tests
}
metadata, err := ps.Get(info.ID, "addressData")
if err != nil {
return false

Check warning on line 172 in network/p2p/peerstore/peerstore.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/peerstore/peerstore.go#L170-L172

Added lines #L170 - L172 were not covered by tests
}
ad := metadata.(addressData)
shiqizng marked this conversation as resolved.
Show resolved Hide resolved
entry := ad.recentConnectionTimes
defer func() {
_ = ps.Put(info.ID, "addressData", entry)

Check warning on line 177 in network/p2p/peerstore/peerstore.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/peerstore/peerstore.go#L174-L177

Added lines #L174 - L177 were not covered by tests

}()

Check warning on line 179 in network/p2p/peerstore/peerstore.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/peerstore/peerstore.go#L179

Added line #L179 was not covered by tests

// Find the provisionalTime and update it
for indx, val := range entry {
if provisionalTime == val {
entry[indx] = time.Now()
return true

Check warning on line 185 in network/p2p/peerstore/peerstore.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/peerstore/peerstore.go#L182-L185

Added lines #L182 - L185 were not covered by tests
}
}

// Case where the time is not found: it was removed from the list.
// This may happen when the time expires before the connection was established with the server.
// The time should be added again.
entry = append(entry, time.Now())

Check warning on line 192 in network/p2p/peerstore/peerstore.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/peerstore/peerstore.go#L192

Added line #L192 was not covered by tests

return true

Check warning on line 194 in network/p2p/peerstore/peerstore.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/peerstore/peerstore.go#L194

Added line #L194 was not covered by tests
}

// ReplacePeerList replaces the peer list for the given networkName and role.
func (ps *PeerStore) ReplacePeerList(addressesThey []string, networkName string, role network.PhoneBookEntryRoles) {
// prepare a map of items we'd like to remove.
removeItems := make(map[peer.ID]bool, 0)
peerIDs := ps.Peers()
for _, pid := range peerIDs {
data, _ := ps.Get(pid, "addressData")
if data != nil {
ad := data.(addressData)
if ad.networkNames[networkName] && ad.role == role && !ad.persistent {
removeItems[pid] = true

Check warning on line 207 in network/p2p/peerstore/peerstore.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/peerstore/peerstore.go#L207

Added line #L207 was not covered by tests
}
}

}
for _, addr := range addressesThey {
info, err := PeerInfoFromDomainPort(addr)
if err != nil {
return

Check warning on line 215 in network/p2p/peerstore/peerstore.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/peerstore/peerstore.go#L215

Added line #L215 was not covered by tests
}
data, _ := ps.Get(info.ID, "addressData")
if data != nil {
// we already have this.
// Update the networkName
ad := data.(addressData)
ad.networkNames[networkName] = true

Check warning on line 222 in network/p2p/peerstore/peerstore.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/peerstore/peerstore.go#L221-L222

Added lines #L221 - L222 were not covered by tests

// do not remove this entry
delete(removeItems, info.ID)

Check warning on line 225 in network/p2p/peerstore/peerstore.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/peerstore/peerstore.go#L225

Added line #L225 was not covered by tests
} else {
// we don't have this item. add it.
ps.AddAddrs(info.ID, info.Addrs, libp2p.AddressTTL)
entry := makePhonebookEntryData(networkName, role, false)
_ = ps.Put(info.ID, "addressData", entry)
}
}

// remove items that were missing in addressesThey
for k := range removeItems {
ps.deletePhonebookEntry(k, networkName)

Check warning on line 236 in network/p2p/peerstore/peerstore.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/peerstore/peerstore.go#L236

Added line #L236 was not covered by tests
}
}

// AddPersistentPeers stores addresses of peers which are persistent.
// i.e. they won't be replaced by ReplacePeerList calls
func (ps *PeerStore) AddPersistentPeers(dnsAddresses []string, networkName string, role network.PhoneBookEntryRoles) {

for _, addr := range dnsAddresses {
info, err := PeerInfoFromDomainPort(addr)
if err != nil {
return

Check warning on line 247 in network/p2p/peerstore/peerstore.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/peerstore/peerstore.go#L247

Added line #L247 was not covered by tests
}
data, _ := ps.Get(info.ID, "addressData")
if data != nil {
// we already have this.
// Make sure the persistence field is set to true
ad := data.(addressData)
ad.persistent = true
_ = ps.Put("peerID", "addressData", data)

} else {
// we don't have this item. add it.
ps.AddAddrs(info.ID, info.Addrs, libp2p.PermanentAddrTTL)
entry := makePhonebookEntryData(networkName, role, true)
_ = ps.Put(info.ID, "addressData", entry)
}
}
}

// Length returns the number of addrs in peerstore
func (ps *PeerStore) Length() int {
return len(ps.PeersWithKeys())

Check warning on line 268 in network/p2p/peerstore/peerstore.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/peerstore/peerstore.go#L267-L268

Added lines #L267 - L268 were not covered by tests
}

// makePhonebookEntryData creates a new address entry for provided network name and role.
func makePhonebookEntryData(networkName string, role network.PhoneBookEntryRoles, persistent bool) addressData {
pbData := addressData{
networkNames: make(map[string]bool),
recentConnectionTimes: make([]time.Time, 0),
role: role,
persistent: persistent,
}
pbData.networkNames[networkName] = true
return pbData
}

func (ps *PeerStore) deletePhonebookEntry(peerID peer.ID, networkName string) {
data, err := ps.Get(peerID, "addressData")
if err != nil {
return

Check warning on line 286 in network/p2p/peerstore/peerstore.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/peerstore/peerstore.go#L283-L286

Added lines #L283 - L286 were not covered by tests
}
ad := data.(addressData)
delete(ad.networkNames, networkName)
if 0 == len(ad.networkNames) {
ps.ClearAddrs(peerID)
_ = ps.Put(peerID, "addressData", nil)

Check warning on line 292 in network/p2p/peerstore/peerstore.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/peerstore/peerstore.go#L288-L292

Added lines #L288 - L292 were not covered by tests
}
}

// AppendTime adds the current time to recentConnectionTimes in
// addressData of addr
func (ps *PeerStore) appendTime(peerID peer.ID, t time.Time) {
data, _ := ps.Get(peerID, "addressData")
ad := data.(addressData)
ad.recentConnectionTimes = append(ad.recentConnectionTimes, t)
_ = ps.Put(peerID, "addressData", ad)

Check warning on line 302 in network/p2p/peerstore/peerstore.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/peerstore/peerstore.go#L298-L302

Added lines #L298 - L302 were not covered by tests
}

// PopEarliestTime removes the earliest time from recentConnectionTimes in
// addressData for addr
// It is expected to be later than ConnectionsRateLimitingWindow
func (ps *PeerStore) popNElements(n int, peerID peer.ID) {
data, _ := ps.Get(peerID, "addressData")
ad := data.(addressData)
ad.recentConnectionTimes = ad.recentConnectionTimes[n:]
_ = ps.Put(peerID, "addressData", ad)

Check warning on line 312 in network/p2p/peerstore/peerstore.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/peerstore/peerstore.go#L308-L312

Added lines #L308 - L312 were not covered by tests
}

func (ps *PeerStore) filterRetryTime(t time.Time, role network.PhoneBookEntryRoles) []string {
o := make([]string, 0, len(ps.Peers()))
for _, peerID := range ps.Peers() {
data, _ := ps.Get(peerID, "addressData")
if data != nil {
ad := data.(addressData)
if t.After(ad.retryAfter) && role == ad.role {
o = append(o, peerID.String())
}
}
}
return o
}

func shuffleSelect(set []string, n int) []string {
if n >= len(set) || n == network.GetAllAddresses {
// return shuffled copy of everything
out := slices.Clone(set)
shuffleStrings(out)
return out
}
tzaffi marked this conversation as resolved.
Show resolved Hide resolved
// Pick random indexes from the set
indexSample := make([]int, n)
for i := range indexSample {
indexSample[i] = rand.Intn(len(set)-i) + i
for oi, ois := range indexSample[:i] {
if ois == indexSample[i] {
indexSample[i] = oi
}
}
}
out := make([]string, n)
for i, index := range indexSample {
out[i] = set[index]
}
return out
}

func shuffleStrings(set []string) {
rand.Shuffle(len(set), func(i, j int) { t := set[i]; set[i] = set[j]; set[j] = t })
shiqizng marked this conversation as resolved.
Show resolved Hide resolved
}
Loading