diff --git a/catchup/ledgerFetcher.go b/catchup/ledgerFetcher.go index 916627db8f..ae4c720108 100644 --- a/catchup/ledgerFetcher.go +++ b/catchup/ledgerFetcher.go @@ -81,7 +81,11 @@ func (lf *ledgerFetcher) requestLedger(ctx context.Context, peer network.HTTPPee } network.SetUserAgentHeader(request.Header) - return peer.GetHTTPClient().Do(request) + httpClient := peer.GetHTTPClient() + if httpClient == nil { + return nil, fmt.Errorf("requestLedger: HTTPPeer %s has no http client", peer.GetAddress()) + } + return httpClient.Do(request) } func (lf *ledgerFetcher) headLedger(ctx context.Context, peer network.Peer, round basics.Round) error { diff --git a/catchup/universalFetcher.go b/catchup/universalFetcher.go index 27b970fc26..c7a8a9a4cf 100644 --- a/catchup/universalFetcher.go +++ b/catchup/universalFetcher.go @@ -69,11 +69,15 @@ func (uf *universalBlockFetcher) fetchBlock(ctx context.Context, round basics.Ro } address = fetcherClient.address() } else if httpPeer, validHTTPPeer := peer.(network.HTTPPeer); validHTTPPeer { + httpClient := httpPeer.GetHTTPClient() + if httpClient == nil { + return nil, nil, time.Duration(0), fmt.Errorf("fetchBlock: HTTPPeer %s has no http client", httpPeer.GetAddress()) + } fetcherClient := &HTTPFetcher{ peer: httpPeer, rootURL: httpPeer.GetAddress(), net: uf.net, - client: httpPeer.GetHTTPClient(), + client: httpClient, log: uf.log, config: &uf.config} fetchedBuf, err = fetcherClient.getBlockBytes(ctx, round) diff --git a/cmd/updater/systemd-setup-user.sh b/cmd/updater/systemd-setup-user.sh index fa17a1db2b..c5b145eff7 100755 --- a/cmd/updater/systemd-setup-user.sh +++ b/cmd/updater/systemd-setup-user.sh @@ -21,9 +21,27 @@ setup_user() { sed -e s,@@BINDIR@@,"$bindir", "${SCRIPTPATH}/algorand@.service.template-user" \ > "$homedir/.config/systemd/user/algorand@.service" + if [[ ${HOSTMODE} == true ]]; then + echo "[INFO] Hosted mode - replacing algod with algoh" + sed -i 's/algod/algoh/g' "$homedir/.config/systemd/user/algorand@.service" + fi + systemctl --user daemon-reload } +HOSTMODE=false +while getopts H opt; do + case $opt in + H) + HOSTMODE=true + ;; + ?) + echo "Invalid option: -${OPTARG}" + exit 1 + ;; + esac +done +shift $((OPTIND-1)) if [ "$#" != 1 ]; then echo "Usage: $0 username" diff --git a/cmd/updater/systemd-setup.sh b/cmd/updater/systemd-setup.sh index bad8745137..fc27fd209c 100755 --- a/cmd/updater/systemd-setup.sh +++ b/cmd/updater/systemd-setup.sh @@ -14,9 +14,28 @@ setup_root() { sed ${sedargs} "${SCRIPTPATH}/algorand@.service.template" \ > /lib/systemd/system/algorand@.service + if [[ ${HOSTMODE} == true ]]; then + echo "[INFO] Hosted mode - replacing algod with algoh" + sed -i 's/algod/algoh/g' /lib/systemd/system/algorand@.service + fi + systemctl daemon-reload } +HOSTMODE=false +while getopts H opt; do + case $opt in + H) + HOSTMODE=true + ;; + ?) + echo "Invalid option: -${OPTARG}" + exit 1 + ;; + esac +done +shift $((OPTIND-1)) + if [ "$#" != 2 ] && [ "$#" != 3 ]; then echo "Usage: $0 username group [bindir]" exit 1 diff --git a/network/README-P2P.md b/network/README-P2P.md new file mode 100644 index 0000000000..c67bd53273 --- /dev/null +++ b/network/README-P2P.md @@ -0,0 +1,149 @@ +# P2P Network implementation overview + +Refer to [p2p sub-package overview](./p2p/README.md) for details about p2p sub-components. + +`P2PNetwork` implements the `GossipNode` interface similarly to `WsNetwork`. Both use +the same peer connection management and message broadcast functions but different +transport: lip2p-managed connections and HTTP + WebSocket, respectively. +`P2PNetwork` and `WsNetwork` require `config.NetAddress` to be set in order to start a server. + +In addition, `HybridNetwork` is an aggregate of `P2PNetwork` and `WsNetwork` allowing a node +to interact over both networks. In the case of hybrid operation, both `config.P2PNetAddress` and +`config.NetAddress` are used. + +## General design + +`P2PNetwork` follows the `WsNetwork` approach for peers management and message handling: + - `msgHandler` used process or route the network protocol messages to external handlers + (for example, transaction handler or agreement service) + - `broadcaster` implementing the broadcast functionality (see below) + - mesh thread to maintain `GossipFanout` number of outgoing peers + - HTTP Server for external HTTP services (block, catchpoints) + - `OnNetworkAdvance` listener to react on round advancing + +A key difference is that `P2PNetwork` uses `go-libp2p-pubsub` for TX message handling. +Upon start it subscribes to `/algo/tx/0.1.0` topic and publishes TX messages as needed. +The `pubsub` library divides message handling into two stages: validation and processing. Based on +the validation result, a message is either discarded or accepted for further +broadcasting to other peers. This necessitates having separate handlers for TX messages +in `TxHandler`, as we must synchronously determine whether a transaction group is valid: + - can't ignore fast and broadcast later - will be rejected as a seen message + - can't accept fast to prevent invalid/expired transactions broadcasting + +## Major Components + +### HTTP Services + +`P2PNetwork` uses libp2p's `http` submodule to handle HTTP traffic over libp2p-managed connection. +It is `http.Handler`-compatible so that service handlers are registered the same way as for `WsNetwork`. + +### Phonebook and Peerstore and peer classes + +Originally phonebook was designed as an address registry holding permanent (`-p` cli option +or `phonebook.json` extra configuration file) and dynamic (SRV DNS records) entries. +These entries later can be later retrieved by a peer role +(`PhoneBookEntryRelayRole` or `PhoneBookEntryArchivalRole`). +A new `PeerStore` (built on top of `libp2p.Peerstore`) resembles the original `Phonebook` +by strictly implementing some of its methods and has the remaining `Phonebook`'s methods +with a slightly different signature - `string` vs `peer.AddrInfo` for address representation. +The main issue is that entries in `PeerStore` are identified by `PeerID` +and each peer might have multiple addresses (versus the original WS peers with the only one +`host:port` connectivity option.) + +Both P2PNetwork and WsNetwork have an extra level of peer classification on top of two phonebook's +classes: `PeersConnectedOut`, `PeersConnectedIn`, `PeersPhonebookRelays`, `PeersPhonebookArchivalNodes`. +This allows network clients to be more precise on peers set they want to work with. For example, +ledger service wants `PeersPhonebookArchivalNodes`, and transaction syncer - `PeersConnectedOut`. + + +### wsPeer + +Peers are created in `wsStreamHandler` that is called for both incoming and outgoing connections +(and streams). `incoming` flag is set to true for incoming connection. +At the very beginning of the `wsStreamHandler` one byte read/write happens in order to make sure: + - Stream is operable + - A placeholder for a handshake where some meta-data can be exchanged + +Each peer gets a read channel `handler.readBuffer` where it enqueues incoming messages for routing +to appropriate handler. + +Connected peers are maintained as a `wsPeers` map similarly to the `WsNetwork`. +The main difference between `P2PNetwork` and `WsNetwork` is `http.Client`. Because wsPeers operate +over the multiplexed streams in libp2p-managed connection, a plain `http.Client` would not be able +to connect to a p2p HTTP server. This requires the `wsPeer` constructed in `P2PNetwork` to have a special +libp2p-streams compatible `http.Client` produced by `MakeHTTPClientWithRateLimit` helper method. +It implements a rate-limiting approach similar to the regular http clients from `WsNetwork`. + +### Broadcaster + +`msgBroadcaster` encapsulates a shared broadcasting logic: priority vs bulk messages (and queues), +data preparation, peers retrieving. Broadcast requests eventually hits +`peer.writeNonBlockMsgs` -> `peer.writeLoopSendMsg` -> `conn.WriteMessage`. +See the diagram denoting the broadcast data flow. + +```mermaid +graph LR + + p2pnet[P2PNetwork] + wsnet[WsNetwork] + B[broadcaster] + + p2pnet & wsnet --> B + + subgraph "wsPeer" + direction LR + writeNonBlockMsgs + Conn[conn.WriteMessage] + + subgraph "writeLoop" + writeLoopSendMsg + end + + writeNonBlockMsgs --> writeLoop + writeLoopSendMsg --> Conn + end + + B --> writeNonBlockMsgs + + Conn --> WMP2P & WMWS + + subgraph "wsPeerConnP2P" + WMP2P[WriteMessage] + end + + subgraph "websocket" + WMWS[WriteMessage] + end + + subgraph "libp2p" + stream.Write + end + + WMP2P --> libp2p +``` + +### DHT and Capabilities discovery + +DHT is controlled by the `EnableDHTProviders` configuration option and the capabilities +exposed by a node. These capabilities include: + - `archival`: a listening node with `Archival` config flag set + - `catchpointStoring`: a listening node configured to store catchpoints + - `gossip`: a listening node with `EnableGossipService` config flag set + +When the `P2PNetwork` starts, the node begins advertising its capabilities by running +a background goroutine. By default, the underlying DHT implementation pulls bootstrap nodes from +a peer store and attempts to connect immediately, which is not how go-algorand services operate. +To address this, a new `bootstrapper` abstraction has been added to control bootstrap peer +access using the DHT's `BootstrapFunc` mechanism. The callback function returns empty bootstrap +peers until the `P2PNetwork` starts. + +### Net identity based peers deduplication + +`WsNetwork` net identity was slightly extended to allow ws and p2p nodes cross-check +when running in a hybrid mode: + - `identityTracker` instance is shared between `WsNetwork` and `P2PNetwork` + - identity schema supplied to the `WsNetwork` uses a p2p-node private key based message signer + - `PublicAddress` must be set for hybrid nodes in order to operate properly + +Using the changes above `identityTracker` is able to deduplicate `WsNetwork` peer if it ends up +to be hybrid node already connected to via `P2PNetwork` and other way around. diff --git a/network/limitcaller/rateLimitingTransport.go b/network/limitcaller/rateLimitingTransport.go index 45bc0725ed..de68c9b371 100644 --- a/network/limitcaller/rateLimitingTransport.go +++ b/network/limitcaller/rateLimitingTransport.go @@ -50,19 +50,16 @@ var ErrConnectionQueueingTimeout = errors.New("rateLimitingTransport: queueing t // according to the entries in the phonebook. func MakeRateLimitingTransport(phonebook ConnectionTimeStore, queueingTimeout time.Duration, dialer *Dialer, maxIdleConnsPerHost int) RateLimitingTransport { defaultTransport := http.DefaultTransport.(*http.Transport) - return RateLimitingTransport{ - phonebook: phonebook, - innerTransport: &http.Transport{ - Proxy: defaultTransport.Proxy, - DialContext: dialer.innerDialContext, - MaxIdleConns: defaultTransport.MaxIdleConns, - IdleConnTimeout: defaultTransport.IdleConnTimeout, - TLSHandshakeTimeout: defaultTransport.TLSHandshakeTimeout, - ExpectContinueTimeout: defaultTransport.ExpectContinueTimeout, - MaxIdleConnsPerHost: maxIdleConnsPerHost, - }, - queueingTimeout: queueingTimeout, + innerTransport := &http.Transport{ + Proxy: defaultTransport.Proxy, + DialContext: dialer.innerDialContext, + MaxIdleConns: defaultTransport.MaxIdleConns, + IdleConnTimeout: defaultTransport.IdleConnTimeout, + TLSHandshakeTimeout: defaultTransport.TLSHandshakeTimeout, + ExpectContinueTimeout: defaultTransport.ExpectContinueTimeout, + MaxIdleConnsPerHost: maxIdleConnsPerHost, } + return MakeRateLimitingTransportWithRoundTripper(phonebook, queueingTimeout, innerTransport, nil, maxIdleConnsPerHost) } // MakeRateLimitingTransportWithRoundTripper creates a rate limiting http transport that would limit the requests rate diff --git a/network/netidentity.go b/network/netidentity.go index 30755f0648..74f9b09e62 100644 --- a/network/netidentity.go +++ b/network/netidentity.go @@ -23,6 +23,7 @@ import ( "github.com/algorand/go-algorand/crypto" "github.com/algorand/go-algorand/protocol" + "github.com/algorand/go-deadlock" ) // netidentity.go implements functionality to participate in an "Identity Challenge Exchange" @@ -461,12 +462,14 @@ func (noopIdentityTracker) removeIdentity(p *wsPeer) {} // mapping from PublicKeys exchanged in identity challenges to a peer // this structure is not thread-safe; it is protected by wn.peersLock or p2p.wsPeersLock type publicKeyIdentTracker struct { + mu deadlock.Mutex peersByID map[crypto.PublicKey]*wsPeer } // NewIdentityTracker returns a new publicKeyIdentTracker func NewIdentityTracker() *publicKeyIdentTracker { return &publicKeyIdentTracker{ + mu: deadlock.Mutex{}, peersByID: make(map[crypto.PublicKey]*wsPeer), } } @@ -475,6 +478,8 @@ func NewIdentityTracker() *publicKeyIdentTracker { // returns false if it was unable to load the peer into the given identity // or true otherwise (if the peer was already there, or if it was added) func (t *publicKeyIdentTracker) setIdentity(p *wsPeer) bool { + t.mu.Lock() + defer t.mu.Unlock() existingPeer, exists := t.peersByID[p.identity] if !exists { // the identity is not occupied, so set it and return true @@ -489,6 +494,8 @@ func (t *publicKeyIdentTracker) setIdentity(p *wsPeer) bool { // removeIdentity removes the entry in the peersByID map if it exists // and is occupied by the given peer func (t *publicKeyIdentTracker) removeIdentity(p *wsPeer) { + t.mu.Lock() + defer t.mu.Unlock() if t.peersByID[p.identity] == p { delete(t.peersByID, p.identity) } diff --git a/network/p2p/README.md b/network/p2p/README.md index 8490e391b6..b95e5be32f 100644 --- a/network/p2p/README.md +++ b/network/p2p/README.md @@ -23,7 +23,7 @@ Libp2p also provides an implementation of a message-based gossip protocol, Gossi Algorand's current network protocol sends messages between peers over bidirectional WebSocket connections. Nodes that are configured to enable message-forwarding (including -nodes currently called "relays") validate incoming messages, then selectively forward +nodes currently called "relays") validate incoming messages, then selectively forward messages to other connected peers. This network implementation (`WebsocketNetwork`) sits behind the `GossipNode` interface in the network package. @@ -36,8 +36,8 @@ via peer connections managed by libp2p. The `P2PNetwork` implementation uses and [peer IDs](https://docs.libp2p.io/concepts/fundamentals/peers/#peer-ids-in-multiaddrs) to establish connections and identify peers. -Currently transactions (protocol tag `TX`) are distributed using the GossipSub protocol, -while all other messages are forwarded over a custom message protocol `/algorand-ws/1.0.0` +Currently transactions (protocol tag `TX`) are distributed using the GossipSub protocol (see [pubsub.go](./pubsub.go)), +while all other messages are forwarded over the pre-existing custom message protocol `/algorand-ws/1.0.0` (see [streams.go](./streams.go)) that uses the same message serialization as the existing `WebsocketNetwork` implementation. These two protocols are multiplexed over a single connection using libp2p streams. @@ -63,3 +63,85 @@ graph LR AW --> WS S --> T ``` + +The underlying libp2p implementation is abstracted as `p2p.Service` and is initialized in two steps: +1. Creating a p2p `Host` +2. Creating a service `serviceImpl` object + +`Host` is also used for p2p HTTP server and DHT Discovery service creation. It is also useful for unit testing. Note, `Host` is created with `NoListenAddrs` options that prevents automatic listening and networking until the `Service.Start()` is called. This follows the designs of Algod services (including the WsNetwork service). + +### Connection limiting + +libp2p's `ResourceManager` is used to limit the number of connections up to `cfg.P2PIncomingConnectionsLimit`. + +### DHT and capabilities + +Provides helper methods to construct DHT discovery service using `go-libp2p-kad-dht` library. +High level [CapabilitiesDiscovery](./capabilities.go) class supports retrieving (`PeersForCapability`) +peers by a given capability(-ies) or advertising own capabilities (`AdvertiseCapabilities`). + +Note, by default private and non-routable addresses are filtered (see `AddrsFactory`), +libp2p's `ObservedAddrManager` can track its own public address and makes it available +(and so that discoverable with DHT) if it was observed at least 4 times in 30 minutes (as of libp2p@v0.33.2). + +```mermaid +graph LR + + subgraph "node" + Cap[Capabilities] + end + + subgraph "P2P Implementation" + P2P[P2PNetwork] + AdvCap[AdvertiseCapabilities] + end + + P2P --> AdvCap + Cap -.-> P2P + + subgraph "libp2p" + Adv[Advertise] + Addr[Addrs] + OAM[ObservedAddrManager] + AF[AddrFactory] + KAD["/kad/1.0.0"] + end + + OAM -.-> Addr + AF -.-> Addr + AdvCap --> Adv + + subgraph "libp2p-kad-dht" + Pro[Provide] + end + + Addr -.-> Pro + Adv --> Pro + Pro --> KAD +``` + +### HTTP over libp2p connection + +libp2p@0.33 added ability to multiplex HTTP traffic in p2p connection. +A custom `/algorand-http/1.0.0` stream is utilized to expose HTTP server and allow +network service clients (catchup, catchpoint, txsync) to register its own handlers +similarly to the legacy ws-net implementation. + +### Peerstore + +In-memory peerstore implements `libp2p.Peerstore` and go-algorand `Phonebook` interfaces. +Peer classes (relays, archival, etc) and persistent peers (i.e. peers from command line or phonebook.json) +are supported. Possible enhancement is to save/load peerstore to/from disk to tolerate bootstrap nodes failures. + +### Logging + +lip2p uses zap logger as a separate `ipfs/go-log/v2` module. `EnableP2PLogging` helper adds +go-algorand's `logrus` as a custom zap core so that all libp2p logs go through go-algorand logging facility. +Unfortunately `ipfs/go-log/v2` has a primary logging core as module variable that makes impossible +to have custom `logrus` sub-loggers in unit tests. + +### Metrics + +`go-libp2p` uses Prometheus as a metrics library, `go-libp2p-kad-dht` relies on OpenCensus library. +go-algorand has two collectors (see `util/metrics`) for both Prometheus and OpenCensus for +counters and gauges with labels. Other types (summary, histogram, distribution) are not supported at the moment. \ No newline at end of file diff --git a/network/p2p/capabilities.go b/network/p2p/capabilities.go index e5781aa389..7a418767d1 100644 --- a/network/p2p/capabilities.go +++ b/network/p2p/capabilities.go @@ -56,13 +56,13 @@ type CapabilitiesDiscovery struct { wg sync.WaitGroup } -// Advertise implements the discovery.Discovery/discovery.Advertiser interface -func (c *CapabilitiesDiscovery) Advertise(ctx context.Context, ns string, opts ...discovery.Option) (time.Duration, error) { +// advertise implements the discovery.Discovery/discovery.Advertiser interface +func (c *CapabilitiesDiscovery) advertise(ctx context.Context, ns string, opts ...discovery.Option) (time.Duration, error) { return c.disc.Advertise(ctx, ns, opts...) } -// FindPeers implements the discovery.Discovery/discovery.Discoverer interface -func (c *CapabilitiesDiscovery) FindPeers(ctx context.Context, ns string, opts ...discovery.Option) (<-chan peer.AddrInfo, error) { +// findPeers implements the discovery.Discovery/discovery.Discoverer interface +func (c *CapabilitiesDiscovery) findPeers(ctx context.Context, ns string, opts ...discovery.Option) (<-chan peer.AddrInfo, error) { return c.disc.FindPeers(ctx, ns, opts...) } @@ -78,8 +78,8 @@ func (c *CapabilitiesDiscovery) Host() host.Host { return c.dht.Host() } -// AddPeer adds a given peer.AddrInfo to the Host's Peerstore, and the DHT's routing table -func (c *CapabilitiesDiscovery) AddPeer(p peer.AddrInfo) (bool, error) { +// addPeer adds a given peer.AddrInfo to the Host's Peerstore, and the DHT's routing table +func (c *CapabilitiesDiscovery) addPeer(p peer.AddrInfo) (bool, error) { c.Host().Peerstore().AddAddrs(p.ID, p.Addrs, libpeerstore.AddressTTL) return c.dht.RoutingTable().TryAddPeer(p.ID, true, true) } @@ -93,7 +93,7 @@ func (c *CapabilitiesDiscovery) PeersForCapability(capability Capability, n int) var peers []peer.AddrInfo // +1 because it can include self but we exclude self from the returned list // that might confuse the caller (and tests assertions) - peersChan, err := c.FindPeers(ctx, string(capability), discovery.Limit(n+1)) + peersChan, err := c.findPeers(ctx, string(capability), discovery.Limit(n+1)) if err != nil { return nil, err } @@ -128,7 +128,7 @@ func (c *CapabilitiesDiscovery) AdvertiseCapabilities(capabilities ...Capability var err error advertisementInterval := maxAdvertisementInterval for _, capa := range capabilities { - ttl, err0 := c.Advertise(c.dht.Context(), string(capa)) + ttl, err0 := c.advertise(c.dht.Context(), string(capa)) if err0 != nil { err = err0 c.log.Errorf("failed to advertise for capability %s: %v", capa, err0) diff --git a/network/p2p/capabilities_test.go b/network/p2p/capabilities_test.go index 881860f647..7057eca017 100644 --- a/network/p2p/capabilities_test.go +++ b/network/p2p/capabilities_test.go @@ -62,7 +62,7 @@ func TestCapabilities_Discovery(t *testing.T) { for _, capD := range caps { peersAdded := 0 for _, addr := range addrs { - added, err := capD.AddPeer(addr) + added, err := capD.addPeer(addr) require.NoError(t, err) require.True(t, added) peersAdded++ @@ -83,7 +83,7 @@ func setupDHTHosts(t *testing.T, numHosts int) []*dht.IpfsDHT { tmpdir := t.TempDir() pk, err := GetPrivKey(cfg, tmpdir) require.NoError(t, err) - ps, err := peerstore.NewPeerStore([]*peer.AddrInfo{}, "") + ps, err := peerstore.NewPeerStore(nil, "") require.NoError(t, err) h, err := libp2p.New( libp2p.ListenAddrStrings("/dns4/localhost/tcp/0"), @@ -134,7 +134,7 @@ func setupCapDiscovery(t *testing.T, numHosts int, numBootstrapPeers int) []*Cap tmpdir := t.TempDir() pk, err := GetPrivKey(cfg, tmpdir) require.NoError(t, err) - ps, err := peerstore.NewPeerStore([]*peer.AddrInfo{}, "") + ps, err := peerstore.NewPeerStore(nil, "") require.NoError(t, err) h, err := libp2p.New( libp2p.ListenAddrStrings("/dns4/localhost/tcp/0"), diff --git a/network/p2p/p2p.go b/network/p2p/p2p.go index e908f148d8..3b467b0b27 100644 --- a/network/p2p/p2p.go +++ b/network/p2p/p2p.go @@ -61,7 +61,6 @@ type Service interface { IDSigner() *PeerIDChallengeSigner AddrInfo() peer.AddrInfo // return addrInfo for self - DialNode(context.Context, *peer.AddrInfo) error DialPeersUntilTargetCount(targetConnCount int) ClosePeer(peer.ID) error @@ -177,7 +176,7 @@ func configureResourceManager(cfg config.Local) (network.ResourceManager, error) } // MakeService creates a P2P service instance -func MakeService(ctx context.Context, log logging.Logger, cfg config.Local, h host.Host, listenAddr string, wsStreamHandler StreamHandler, bootstrapPeers []*peer.AddrInfo) (*serviceImpl, error) { +func MakeService(ctx context.Context, log logging.Logger, cfg config.Local, h host.Host, listenAddr string, wsStreamHandler StreamHandler) (*serviceImpl, error) { sm := makeStreamManager(ctx, log, h, wsStreamHandler, cfg.EnableGossipService) h.Network().Notify(sm) @@ -239,7 +238,7 @@ func (s *serviceImpl) IDSigner() *PeerIDChallengeSigner { // DialPeersUntilTargetCount attempts to establish connections to the provided phonebook addresses func (s *serviceImpl) DialPeersUntilTargetCount(targetConnCount int) { ps := s.host.Peerstore().(*pstore.PeerStore) - peerIDs := ps.GetAddresses(targetConnCount, phonebook.PhoneBookEntryRelayRole) + addrInfos := ps.GetAddresses(targetConnCount, phonebook.PhoneBookEntryRelayRole) conns := s.host.Network().Conns() var numOutgoingConns int for _, conn := range conns { @@ -247,8 +246,7 @@ func (s *serviceImpl) DialPeersUntilTargetCount(targetConnCount int) { numOutgoingConns++ } } - for _, peerInfo := range peerIDs { - peerInfo := peerInfo.(*peer.AddrInfo) + for _, peerInfo := range addrInfos { // if we are at our target count stop trying to connect if numOutgoingConns >= targetConnCount { return @@ -257,15 +255,15 @@ func (s *serviceImpl) DialPeersUntilTargetCount(targetConnCount int) { if len(s.host.Network().ConnsToPeer(peerInfo.ID)) > 0 { continue } - err := s.DialNode(context.Background(), peerInfo) // leaving the calls as blocking for now, to not over-connect beyond fanout + err := s.dialNode(context.Background(), peerInfo) // leaving the calls as blocking for now, to not over-connect beyond fanout if err != nil { s.log.Warnf("failed to connect to peer %s: %v", peerInfo.ID, err) } } } -// DialNode attempts to establish a connection to the provided peer -func (s *serviceImpl) DialNode(ctx context.Context, peer *peer.AddrInfo) error { +// dialNode attempts to establish a connection to the provided peer +func (s *serviceImpl) dialNode(ctx context.Context, peer *peer.AddrInfo) error { // don't try connecting to ourselves if peer.ID == s.host.ID() { return nil diff --git a/network/p2p/peerstore/peerstore.go b/network/p2p/peerstore/peerstore.go index 3eda0d3686..5ae9c6aa04 100644 --- a/network/p2p/peerstore/peerstore.go +++ b/network/p2p/peerstore/peerstore.go @@ -22,12 +22,13 @@ import ( "math/rand" "time" - "github.com/algorand/go-algorand/network/phonebook" - "github.com/algorand/go-deadlock" "github.com/libp2p/go-libp2p/core/peer" libp2p "github.com/libp2p/go-libp2p/core/peerstore" mempstore "github.com/libp2p/go-libp2p/p2p/host/peerstore/pstoremem" "golang.org/x/exp/slices" + + "github.com/algorand/go-algorand/network/phonebook" + "github.com/algorand/go-deadlock" ) // when using GetAddresses with getAllAddresses, all the addresses will be retrieved, regardless @@ -76,14 +77,8 @@ func NewPeerStore(addrInfo []*peer.AddrInfo, network string) (*PeerStore, error) return nil, fmt.Errorf("cannot initialize a peerstore: %w", err) } - // initialize peerstore with addresses - peers := make([]interface{}, len(addrInfo)) - for i := 0; i < len(addrInfo); i++ { - peers[i] = addrInfo[i] - } - pstore := &PeerStore{peerStoreCAB: ps} - pstore.AddPersistentPeers(peers, network, phonebook.PhoneBookEntryRelayRole) + pstore.AddPersistentPeers(addrInfo, network, phonebook.PhoneBookEntryRelayRole) return pstore, nil } @@ -102,7 +97,7 @@ func MakePhonebook(connectionsRateLimitingCount uint, } // GetAddresses returns up to N addresses, but may return fewer -func (ps *PeerStore) GetAddresses(n int, role phonebook.PhoneBookEntryRoles) []interface{} { +func (ps *PeerStore) GetAddresses(n int, role phonebook.PhoneBookEntryRoles) []*peer.AddrInfo { return shuffleSelect(ps.filterRetryTime(time.Now(), role), n) } @@ -210,7 +205,7 @@ func (ps *PeerStore) UpdateConnectionTime(addrOrPeerID string, provisionalTime t } // ReplacePeerList replaces the peer list for the given networkName and role. -func (ps *PeerStore) ReplacePeerList(addressesThey []interface{}, networkName string, role phonebook.PhoneBookEntryRoles) { +func (ps *PeerStore) ReplacePeerList(addressesThey []*peer.AddrInfo, networkName string, role phonebook.PhoneBookEntryRoles) { // prepare a map of items we'd like to remove. removeItems := make(map[peer.ID]bool, 0) peerIDs := ps.Peers() @@ -226,8 +221,7 @@ func (ps *PeerStore) ReplacePeerList(addressesThey []interface{}, networkName st } } - for _, addr := range addressesThey { - info := addr.(*peer.AddrInfo) + for _, info := range addressesThey { data, _ := ps.Get(info.ID, addressDataKey) if data != nil { // we already have this. @@ -255,17 +249,15 @@ func (ps *PeerStore) ReplacePeerList(addressesThey []interface{}, networkName st // AddPersistentPeers stores addresses of peers which are persistent. // i.e. they won't be replaced by ReplacePeerList calls -func (ps *PeerStore) AddPersistentPeers(dnsAddresses []interface{}, networkName string, role phonebook.PhoneBookEntryRoles) { - for _, addr := range dnsAddresses { - info := addr.(*peer.AddrInfo) +func (ps *PeerStore) AddPersistentPeers(addrInfo []*peer.AddrInfo, networkName string, role phonebook.PhoneBookEntryRoles) { + for _, info := range addrInfo { data, _ := ps.Get(info.ID, addressDataKey) if data != nil { // we already have this. // Make sure the persistence field is set to true ad := data.(addressData) ad.persistent = true - _ = ps.Put(info.ID, addressDataKey, data) - + _ = ps.Put(info.ID, addressDataKey, ad) } else { // we don't have this item. add it. ps.AddAddrs(info.ID, info.Addrs, libp2p.PermanentAddrTTL) @@ -328,8 +320,8 @@ func (ps *PeerStore) popNElements(n int, peerID peer.ID) { _ = ps.Put(peerID, addressDataKey, ad) } -func (ps *PeerStore) filterRetryTime(t time.Time, role phonebook.PhoneBookEntryRoles) []interface{} { - o := make([]interface{}, 0, len(ps.Peers())) +func (ps *PeerStore) filterRetryTime(t time.Time, role phonebook.PhoneBookEntryRoles) []*peer.AddrInfo { + o := make([]*peer.AddrInfo, 0, len(ps.Peers())) for _, peerID := range ps.Peers() { data, _ := ps.Get(peerID, addressDataKey) if data != nil { @@ -344,11 +336,11 @@ func (ps *PeerStore) filterRetryTime(t time.Time, role phonebook.PhoneBookEntryR return o } -func shuffleSelect(set []interface{}, n int) []interface{} { +func shuffleSelect(set []*peer.AddrInfo, n int) []*peer.AddrInfo { if n >= len(set) || n == getAllAddresses { // return shuffled copy of everything out := slices.Clone(set) - shuffleStrings(out) + shuffleAddrInfos(out) return out } // Pick random indexes from the set @@ -361,13 +353,13 @@ func shuffleSelect(set []interface{}, n int) []interface{} { } } } - out := make([]interface{}, n) + out := make([]*peer.AddrInfo, n) for i, index := range indexSample { out[i] = set[index] } return out } -func shuffleStrings(set []interface{}) { +func shuffleAddrInfos(set []*peer.AddrInfo) { rand.Shuffle(len(set), func(i, j int) { set[i], set[j] = set[j], set[i] }) } diff --git a/network/p2p/peerstore/peerstore_test.go b/network/p2p/peerstore/peerstore_test.go index e855013d76..d82b34595d 100644 --- a/network/p2p/peerstore/peerstore_test.go +++ b/network/p2p/peerstore/peerstore_test.go @@ -91,8 +91,7 @@ func TestPeerstore(t *testing.T) { func testPhonebookAll(t *testing.T, set []*peer.AddrInfo, ph *PeerStore) { actual := ph.GetAddresses(len(set), PhoneBookEntryRelayRole) - for _, got := range actual { - info := got.(*peer.AddrInfo) + for _, info := range actual { ok := false for _, known := range set { if info.ID == known.ID { @@ -101,13 +100,12 @@ func testPhonebookAll(t *testing.T, set []*peer.AddrInfo, ph *PeerStore) { } } if !ok { - t.Errorf("get returned junk %#v", got) + t.Errorf("get returned junk %#v", info) } } for _, known := range set { ok := false - for _, got := range actual { - info := got.(*peer.AddrInfo) + for _, info := range actual { if info.ID == known.ID { ok = true break @@ -128,8 +126,7 @@ func testPhonebookUniform(t *testing.T, set []*peer.AddrInfo, ph *PeerStore, get } for i := 0; i < uniformityTestLength; i++ { actual := ph.GetAddresses(getsize, PhoneBookEntryRelayRole) - for _, xa := range actual { - info := xa.(*peer.AddrInfo) + for _, info := range actual { if _, ok := counts[info.ID.String()]; ok { counts[info.ID.String()]++ } @@ -226,11 +223,11 @@ func TestMultiPhonebook(t *testing.T) { require.NoError(t, err) infoSet = append(infoSet, info) } - pha := make([]interface{}, 0) + pha := make([]*peer.AddrInfo, 0) for _, e := range infoSet[:5] { pha = append(pha, e) } - phb := make([]interface{}, 0) + phb := make([]*peer.AddrInfo, 0) for _, e := range infoSet[5:] { phb = append(phb, e) } @@ -252,7 +249,7 @@ func TestMultiPhonebookPersistentPeers(t *testing.T) { info, err := peerInfoFromDomainPort("a:4041") require.NoError(t, err) - persistentPeers := []interface{}{info} + persistentPeers := []*peer.AddrInfo{info} set := []string{"b:4042", "c:4043", "d:4044", "e:4045", "f:4046", "g:4047", "h:4048", "i:4049", "j:4010"} infoSet := make([]*peer.AddrInfo, 0) for _, addr := range set { @@ -261,11 +258,11 @@ func TestMultiPhonebookPersistentPeers(t *testing.T) { infoSet = append(infoSet, info) } - pha := make([]interface{}, 0) + pha := make([]*peer.AddrInfo, 0) for _, e := range infoSet[:5] { pha = append(pha, e) } - phb := make([]interface{}, 0) + phb := make([]*peer.AddrInfo, 0) for _, e := range infoSet[5:] { phb = append(phb, e) } @@ -279,10 +276,8 @@ func TestMultiPhonebookPersistentPeers(t *testing.T) { testPhonebookAll(t, append(infoSet, info), ph) allAddresses := ph.GetAddresses(len(set)+len(persistentPeers), PhoneBookEntryRelayRole) for _, pp := range persistentPeers { - pp := pp.(*peer.AddrInfo) found := false for _, addr := range allAddresses { - addr := addr.(*peer.AddrInfo) if addr.ID == pp.ID { found = true break @@ -303,11 +298,11 @@ func TestMultiPhonebookDuplicateFiltering(t *testing.T) { infoSet = append(infoSet, info) } - pha := make([]interface{}, 0) + pha := make([]*peer.AddrInfo, 0) for _, e := range infoSet[:7] { pha = append(pha, e) } - phb := make([]interface{}, 0) + phb := make([]*peer.AddrInfo, 0) for _, e := range infoSet[3:] { phb = append(phb, e) } @@ -343,7 +338,7 @@ func TestWaitAndAddConnectionTimeLongtWindow(t *testing.T) { // Test the addresses are populated in the phonebook and a // time can be added to one of them - entries.ReplacePeerList([]interface{}{info1, info2}, "default", PhoneBookEntryRelayRole) + entries.ReplacePeerList([]*peer.AddrInfo{info1, info2}, "default", PhoneBookEntryRelayRole) addrInPhonebook, waitTime, provisionalTime := entries.GetConnectionWaitTime(string(info1.ID)) require.Equal(t, true, addrInPhonebook) require.Equal(t, time.Duration(0), waitTime) @@ -458,14 +453,14 @@ func TestPhonebookRoles(t *testing.T) { relaysSet := []string{"relay1:4040", "relay2:4041", "relay3:4042"} archiverSet := []string{"archiver1:1111", "archiver2:1112", "archiver3:1113"} - infoRelaySet := make([]interface{}, 0) + infoRelaySet := make([]*peer.AddrInfo, 0) for _, addr := range relaysSet { info, err := peerInfoFromDomainPort(addr) require.NoError(t, err) infoRelaySet = append(infoRelaySet, info) } - infoArchiverSet := make([]interface{}, 0) + infoArchiverSet := make([]*peer.AddrInfo, 0) for _, addr := range archiverSet { info, err := peerInfoFromDomainPort(addr) require.NoError(t, err) @@ -485,12 +480,10 @@ func TestPhonebookRoles(t *testing.T) { entries := ph.GetAddresses(l, role) if role == PhoneBookEntryRelayRole { for _, entry := range entries { - entry := entry.(*peer.AddrInfo) require.Contains(t, string(entry.ID), "relay") } } else if role == PhoneBookEntryArchiverRole { for _, entry := range entries { - entry := entry.(*peer.AddrInfo) require.Contains(t, string(entry.ID), "archiver") } } diff --git a/network/p2pNetwork.go b/network/p2pNetwork.go index 37c6cfcd52..f9dc04b785 100644 --- a/network/p2pNetwork.go +++ b/network/p2pNetwork.go @@ -261,15 +261,21 @@ func NewP2PNetwork(log logging.Logger, cfg config.Local, datadir string, phonebo } log.Infof("P2P host created: peer ID %s addrs %s", h.ID(), h.Addrs()) - net.service, err = p2p.MakeService(net.ctx, log, cfg, h, la, net.wsStreamHandler, addrInfo) + net.service, err = p2p.MakeService(net.ctx, log, cfg, h, la, net.wsStreamHandler) if err != nil { return nil, err } + peerIDs := pstore.Peers() + addrInfos := make([]*peer.AddrInfo, 0, len(peerIDs)) + for _, peerID := range peerIDs { + addrInfo := pstore.PeerInfo(peerID) + addrInfos = append(addrInfos, &addrInfo) + } bootstrapper := &bootstrapper{ cfg: cfg, networkID: networkID, - phonebookPeers: addrInfo, + phonebookPeers: addrInfos, resolveController: dnsaddr.NewMultiaddrDNSResolveController(cfg.DNSSecurityTXTEnforced(), ""), log: net.log, } @@ -426,7 +432,7 @@ func (n *P2PNetwork) meshThreadInner() int { } peers := mergeP2PAddrInfoResolvedAddresses(dnsPeers, dhtPeers) - replace := make([]interface{}, 0, len(peers)) + replace := make([]*peer.AddrInfo, 0, len(peers)) for i := range peers { replace = append(replace, &peers[i]) } @@ -631,9 +637,8 @@ func (n *P2PNetwork) GetPeers(options ...PeerOption) []Peer { n.wsPeersLock.RUnlock() case PeersPhonebookRelays: const maxNodes = 100 - peerIDs := n.pstore.GetAddresses(maxNodes, phonebook.PhoneBookEntryRelayRole) - for _, peerInfo := range peerIDs { - peerInfo := peerInfo.(*peer.AddrInfo) + addrInfos := n.pstore.GetAddresses(maxNodes, phonebook.PhoneBookEntryRelayRole) + for _, peerInfo := range addrInfos { if peerCore, ok := addrInfoToWsPeerCore(n, peerInfo); ok { peers = append(peers, &peerCore) } @@ -767,6 +772,7 @@ func (n *P2PNetwork) wsStreamHandler(ctx context.Context, p2pPeer peer.ID, strea maxIdleConnsPerHost := int(n.config.ConnectionsRateLimitingCount) client, err := p2p.MakeHTTPClientWithRateLimit(addrInfo, n.pstore, limitcaller.DefaultQueueingTimeout, maxIdleConnsPerHost) if err != nil { + n.log.Warnf("Cannot construct HTTP Client for %s: %v", p2pPeer, err) client = nil } var netIdentPeerID algocrypto.PublicKey @@ -782,7 +788,7 @@ func (n *P2PNetwork) wsStreamHandler(ctx context.Context, p2pPeer peer.ID, strea peerCore := makePeerCore(ctx, n, n.log, n.handler.readBuffer, addr, client, addr) wsp := &wsPeer{ wsPeerCore: peerCore, - conn: &wsPeerConnP2PImpl{stream: stream}, + conn: &wsPeerConnP2P{stream: stream}, outgoing: !incoming, identity: netIdentPeerID, } @@ -844,7 +850,7 @@ func (n *P2PNetwork) wsStreamHandler(ctx context.Context, p2pPeer peer.ID, strea // peerRemoteClose called from wsPeer to report that it has closed func (n *P2PNetwork) peerRemoteClose(peer *wsPeer, reason disconnectReason) { - remotePeerID := peer.conn.(*wsPeerConnP2PImpl).stream.Conn().RemotePeer() + remotePeerID := peer.conn.(*wsPeerConnP2P).stream.Conn().RemotePeer() n.wsPeersLock.Lock() n.identityTracker.removeIdentity(peer) delete(n.wsPeers, remotePeerID) diff --git a/network/p2pNetwork_test.go b/network/p2pNetwork_test.go index 7cb35a0e82..302aa76147 100644 --- a/network/p2pNetwork_test.go +++ b/network/p2pNetwork_test.go @@ -345,11 +345,6 @@ func (s *mockService) AddrInfo() peer.AddrInfo { } } -func (s *mockService) DialNode(ctx context.Context, peer *peer.AddrInfo) error { - s.peers[peer.ID] = *peer - return nil -} - func (s *mockService) DialPeersUntilTargetCount(targetConnCount int) { } @@ -787,7 +782,7 @@ func TestP2PHTTPHandler(t *testing.T) { // zero clients allowed, rate limiting window (10s) is greater than queue deadline (1s) pstore, err := peerstore.MakePhonebook(0, 10*time.Second) require.NoError(t, err) - pstore.AddPersistentPeers([]interface{}{&peerInfoA}, "net", phonebook.PhoneBookEntryRelayRole) + pstore.AddPersistentPeers([]*peer.AddrInfo{&peerInfoA}, "net", phonebook.PhoneBookEntryRelayRole) httpClient, err = p2p.MakeHTTPClientWithRateLimit(&peerInfoA, pstore, 1*time.Second, 1) require.NoError(t, err) _, err = httpClient.Get("/test") diff --git a/network/p2pPeer.go b/network/p2pPeer.go index a5065f01bb..9a0ce2699d 100644 --- a/network/p2pPeer.go +++ b/network/p2pPeer.go @@ -31,15 +31,15 @@ import ( mnet "github.com/multiformats/go-multiaddr/net" ) -type wsPeerConnP2PImpl struct { +type wsPeerConnP2P struct { stream network.Stream } -func (c *wsPeerConnP2PImpl) RemoteAddrString() string { +func (c *wsPeerConnP2P) RemoteAddrString() string { return c.stream.Conn().RemoteMultiaddr().String() } -func (c *wsPeerConnP2PImpl) NextReader() (int, io.Reader, error) { +func (c *wsPeerConnP2P) NextReader() (int, io.Reader, error) { // read length var lenbuf [4]byte _, err := io.ReadFull(c.stream, lenbuf[:]) @@ -54,7 +54,7 @@ func (c *wsPeerConnP2PImpl) NextReader() (int, io.Reader, error) { return websocket.BinaryMessage, io.LimitReader(c.stream, int64(msglen)), nil } -func (c *wsPeerConnP2PImpl) WriteMessage(_ int, buf []byte) error { +func (c *wsPeerConnP2P) WriteMessage(_ int, buf []byte) error { // simple message framing: // 1. write encoding of the length var lenbuf [4]byte @@ -69,13 +69,13 @@ func (c *wsPeerConnP2PImpl) WriteMessage(_ int, buf []byte) error { } // Do nothing for now since this doesn't actually close the connection just sends the close message -func (c *wsPeerConnP2PImpl) CloseWithMessage([]byte, time.Time) error { +func (c *wsPeerConnP2P) CloseWithMessage([]byte, time.Time) error { return nil } -func (c *wsPeerConnP2PImpl) SetReadLimit(int64) {} +func (c *wsPeerConnP2P) SetReadLimit(int64) {} -func (c *wsPeerConnP2PImpl) CloseWithoutFlush() error { +func (c *wsPeerConnP2P) CloseWithoutFlush() error { err := c.stream.Close() if err != nil && err != yamux.ErrStreamClosed && err != yamux.ErrSessionShutdown && err != yamux.ErrStreamReset { return err @@ -83,9 +83,9 @@ func (c *wsPeerConnP2PImpl) CloseWithoutFlush() error { return nil } -func (c *wsPeerConnP2PImpl) UnderlyingConn() net.Conn { return nil } +func (c *wsPeerConnP2P) UnderlyingConn() net.Conn { return nil } -func (c *wsPeerConnP2PImpl) RemoteAddr() net.Addr { +func (c *wsPeerConnP2P) RemoteAddr() net.Addr { netaddr, err := mnet.ToNetAddr(c.stream.Conn().RemoteMultiaddr()) if err != nil { logging.Base().Errorf("Error converting multiaddr to netaddr: %v", err) diff --git a/network/phonebook/phonebook.go b/network/phonebook/phonebook.go index 634ca9c16c..b3aeafb0fa 100644 --- a/network/phonebook/phonebook.go +++ b/network/phonebook/phonebook.go @@ -204,7 +204,7 @@ func (e *phonebookImpl) AddPersistentPeers(dnsAddresses []string, networkName st // we already have this. // Make sure the persistence field is set to true pbData.persistent = true - + e.data[addr] = pbData } else { // we don't have this item. add it. e.data[addr] = makePhonebookEntryData(networkName, role, true) diff --git a/node/node.go b/node/node.go index b6118aadc0..04d2ced84c 100644 --- a/node/node.go +++ b/node/node.go @@ -393,10 +393,10 @@ func (node *AlgorandFullNode) Start() error { // Capabilities returns the node's capabilities for advertising to other nodes. func (node *AlgorandFullNode) Capabilities() []p2p.Capability { var caps []p2p.Capability - if node.config.Archival { + if node.config.Archival && node.config.IsGossipServer() { caps = append(caps, p2p.Archival) } - if node.config.StoresCatchpoints() { + if node.config.StoresCatchpoints() && node.config.IsGossipServer() { caps = append(caps, p2p.Catchpoints) } if node.config.EnableGossipService && node.config.IsGossipServer() {