diff --git a/packages/peer-test-app/src/App.tsx b/packages/peer-test-app/src/App.tsx index 0fcd2d688..170c5f9de 100644 --- a/packages/peer-test-app/src/App.tsx +++ b/packages/peer-test-app/src/App.tsx @@ -14,6 +14,7 @@ declare global { interface Window { broadcast: (message: string) => void; flood: (message: string) => void; + peer: Peer; } } @@ -28,6 +29,9 @@ function App() { return } + // For debugging + window.peer = peer; + // Subscribe to messages from remote peers const unsubscribeMessage = peer.subscribeMessage((peerId, message) => { console.log(`${peerId.toString()} > ${message}`) diff --git a/packages/peer/src/constants.ts b/packages/peer/src/constants.ts index 180ca3607..b7f995deb 100644 --- a/packages/peer/src/constants.ts +++ b/packages/peer/src/constants.ts @@ -21,6 +21,13 @@ export const RELAY_TAG = { value: 100 }; +// Interval time in ms to check connection with ping for connected peer +// Currently only checking for relay node +export const CONN_CHECK_INTERVAL = 10000; // 10 seconds + +// Delay time in ms to redial relay node on failing to connect +export const RELAY_REDIAL_DELAY = 5000; // 5 sconds + // Peer connection manager config constants // Number of max concurrent dials per peer diff --git a/packages/peer/src/index.ts b/packages/peer/src/index.ts index 684624661..ae68317b6 100644 --- a/packages/peer/src/index.ts +++ b/packages/peer/src/index.ts @@ -24,7 +24,7 @@ import { multiaddr, Multiaddr } from '@multiformats/multiaddr'; import { floodsub } from '@libp2p/floodsub'; import { pubsubPeerDiscovery } from '@libp2p/pubsub-peer-discovery'; -import { MAX_CONCURRENT_DIALS_PER_PEER, MAX_CONNECTIONS, MIN_CONNECTIONS, PUBSUB_DISCOVERY_INTERVAL, PUBSUB_SIGNATURE_POLICY, RELAY_TAG } from './constants.js'; +import { MAX_CONCURRENT_DIALS_PER_PEER, MAX_CONNECTIONS, MIN_CONNECTIONS, PUBSUB_DISCOVERY_INTERVAL, PUBSUB_SIGNATURE_POLICY, RELAY_TAG, RELAY_REDIAL_DELAY, CONN_CHECK_INTERVAL } from './constants.js'; export const CHAT_PROTOCOL = '/chat/1.0.0'; export const DEFAULT_SIGNAL_SERVER_URL = '/ip4/127.0.0.1/tcp/13579/wss/p2p-webrtc-star'; @@ -36,10 +36,11 @@ export class Peer { _wrtcStar: WebRTCStarTuple _relayNodeMultiaddr?: Multiaddr - _remotePeerIds: PeerId[] = [] + _remotePeerIds: Set = new Set() _peerStreamMap: Map> = new Map() _messageHandlers: Array<(peerId: PeerId, message: any) => void> = [] _topicHandlers: Map void>> = new Map() + _peerHeartbeatIntervalIdsMap: Map = new Map(); constructor (nodejs?: boolean) { // Instantiation in nodejs. @@ -110,18 +111,7 @@ export class Peer { // Dial to the HOP enabled relay node if available if (this._relayNodeMultiaddr) { - const relayMultiaddr = this._relayNodeMultiaddr; - - console.log(`Dialling relay node ${relayMultiaddr.getPeerId()} using multiaddr ${relayMultiaddr.toString()}`); - await this._node.dial(relayMultiaddr); - - // Tag the relay node with a high value to prioritize it's connection - // in connection pruning on crossing peer's maxConnections limit - const relayPeerId = this._node.getPeers().find( - peerId => peerId.toString() === relayMultiaddr.getPeerId() - ); - assert(relayPeerId); - this._node.peerStore.tagPeer(relayPeerId, RELAY_TAG.tag, { value: RELAY_TAG.value }); + await this._dialRelay(); } // Listen for change in stored multiaddrs @@ -144,9 +134,9 @@ export class Peer { }); // Listen for peers connection - this._node.connectionManager.addEventListener('peer:connect', (evt) => { + this._node.connectionManager.addEventListener('peer:connect', async (evt) => { console.log('event peer:connect', evt); - this._handleConnect(evt.detail); + await this._handleConnect(evt.detail); }); // Listen for peers disconnecting @@ -175,7 +165,8 @@ export class Peer { this._node.pubsub.removeEventListener('message'); await this._node.unhandle(CHAT_PROTOCOL); - const hangUpPromises = this._remotePeerIds.map(async peerId => this._node?.hangUp(peerId)); + this._remotePeerIds.forEach(remotePeerId => this._stopHeartbeatChecks(remotePeerId)); + const hangUpPromises = [...this._remotePeerIds].map(async peerId => this._node?.hangUp(peerId)); await Promise.all(hangUpPromises); } @@ -232,32 +223,138 @@ export class Peer { return unsubscribe; } + async _dialRelay (): Promise { + assert(this._relayNodeMultiaddr); + assert(this._node); + const relayMultiaddr = this._relayNodeMultiaddr; + + // Keep dialling relay node until it connects + while (true) { + try { + console.log(`Dialling relay node ${relayMultiaddr.getPeerId()} using multiaddr ${relayMultiaddr.toString()}`); + const connection = await this._node.dial(relayMultiaddr); + const relayPeerId = connection.remotePeer; + + // TODO: Check if tag already exists. When checking tags issue with relay node connect event + // Tag the relay node with a high value to prioritize it's connection + // in connection pruning on crossing peer's maxConnections limit + this._node.peerStore.tagPeer(relayPeerId, RELAY_TAG.tag, { value: RELAY_TAG.value }); + + // Start heartbeat check for relay node + await this._startHeartbeatChecks( + relayPeerId, + async () => await this._handleRelayDisconnect(relayPeerId) + ); + + break; + } catch (err) { + console.log(`Could not dial relay ${relayMultiaddr.toString()}`, err); + + // TODO: Use wait method from util package. + // Issue using util package in react app. + await new Promise(resolve => setTimeout(resolve, RELAY_REDIAL_DELAY)); + } + } + } + + async _handleRelayDisconnect (relayPeerId: PeerId): Promise { + assert(this._node); + + // Close existing connection of relay node + console.log(`closing connections for ${relayPeerId}`); + await this._node.hangUp(relayPeerId); + console.log('closed'); + + // Reconnect to relay node + await this._dialRelay(); + } + _handleDiscovery (peer: PeerInfo): void { // Check connected peers as they are discovered repeatedly. - if (!this._remotePeerIds.some(remotePeerId => remotePeerId.toString() === peer.id.toString())) { + if (![...this._remotePeerIds].some(remotePeerId => remotePeerId.toString() === peer.id.toString())) { console.log('Discovered peer multiaddrs', peer.multiaddrs.map(addr => addr.toString())); this._connectPeer(peer); } } - _handleConnect (connection: Connection): void { + async _handleConnect (connection: Connection): Promise { const remotePeerId = connection.remotePeer; - this._remotePeerIds.push(remotePeerId); + this._remotePeerIds.add(remotePeerId); // Log connected peer console.log(`Connected to ${remotePeerId.toString()} using multiaddr ${connection.remoteAddr.toString()}`); console.log(`Current number of peers connected: ${this._node?.getPeers().length}`); } + async _startHeartbeatChecks (peerId: PeerId, handleDisconnect: () => Promise): Promise { + if (this._peerHeartbeatIntervalIdsMap.has(peerId.toString())) { + // Do not start connection check interval if already present + return; + } + + const intervalId = setInterval(async () => { + await this._validatePing( + peerId, + async () => { + // Check if connection check interval for peer is already cleared + if (!this._peerHeartbeatIntervalIdsMap.has(peerId.toString())) { + return; + } + + // Clear and remove check interval for remote peer if not connected + this._stopHeartbeatChecks(peerId); + + await handleDisconnect(); + } + ); + }, CONN_CHECK_INTERVAL); + + this._peerHeartbeatIntervalIdsMap.set(peerId.toString(), intervalId); + } + + async _validatePing (peerId: PeerId, handleDisconnect: () => Promise): Promise { + assert(this._node); + + try { + // Ping remote peer + await this._node.ping(peerId); + } catch (err) { + // On error i.e. no pong + console.log(`Not connected to peer ${peerId.toString()}`); + + await handleDisconnect(); + } + } + _handleDisconnect (connection: Connection): void { + assert(this._node); const disconnectedPeerId = connection.remotePeer; - this._remotePeerIds = this._remotePeerIds.filter(remotePeerId => remotePeerId.toString() !== disconnectedPeerId.toString()); + const peerConnections = this._node.getConnections(disconnectedPeerId); + + if (!peerConnections.length) { + // Remove peer if no remaining connections + this._remotePeerIds = new Set([...this._remotePeerIds].filter(remotePeerId => remotePeerId.toString() !== disconnectedPeerId.toString())); + + // Stop connection check for disconnected peer + this._stopHeartbeatChecks(disconnectedPeerId); + } // Log disconnected peer console.log(`Disconnected from ${disconnectedPeerId.toString()} using multiaddr ${connection.remoteAddr.toString()}`); console.log(`Current number of peers connected: ${this._node?.getPeers().length}`); } + _stopHeartbeatChecks (peerId: PeerId): void { + // Clear check interval for disconnected peer + const intervalId = this._peerHeartbeatIntervalIdsMap.get(peerId.toString()); + + if (intervalId) { + clearInterval(intervalId); + } + + this._peerHeartbeatIntervalIdsMap.delete(peerId.toString()); + } + async _connectPeer (peer: PeerInfo): Promise { assert(this._node);