Skip to content

Commit

Permalink
Ensure connection with peers after going offline (#297)
Browse files Browse the repository at this point in the history
* Redial relay node on failure and after going offline

* Redial only relay node if not connected

* Refactor and rename methods

* Only close existing connection to relay node

* Hearbeat check only for relay node

* Refactor startHeartbeatCheck method
  • Loading branch information
nikugogoi authored Jan 19, 2023
1 parent 9d38306 commit b07e288
Show file tree
Hide file tree
Showing 3 changed files with 129 additions and 21 deletions.
4 changes: 4 additions & 0 deletions packages/peer-test-app/src/App.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ declare global {
interface Window {
broadcast: (message: string) => void;
flood: (message: string) => void;
peer: Peer;
}
}

Expand All @@ -28,6 +29,9 @@ function App() {
return
}

// For debugging
window.peer = peer;

// Subscribe to messages from remote peers
const unsubscribeMessage = peer.subscribeMessage((peerId, message) => {
console.log(`${peerId.toString()} > ${message}`)
Expand Down
7 changes: 7 additions & 0 deletions packages/peer/src/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,13 @@ export const RELAY_TAG = {
value: 100
};

// Interval time in ms to check connection with ping for connected peer
// Currently only checking for relay node
export const CONN_CHECK_INTERVAL = 10000; // 10 seconds

// Delay time in ms to redial relay node on failing to connect
export const RELAY_REDIAL_DELAY = 5000; // 5 sconds

// Peer connection manager config constants

// Number of max concurrent dials per peer
Expand Down
139 changes: 118 additions & 21 deletions packages/peer/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import { multiaddr, Multiaddr } from '@multiformats/multiaddr';
import { floodsub } from '@libp2p/floodsub';
import { pubsubPeerDiscovery } from '@libp2p/pubsub-peer-discovery';

import { MAX_CONCURRENT_DIALS_PER_PEER, MAX_CONNECTIONS, MIN_CONNECTIONS, PUBSUB_DISCOVERY_INTERVAL, PUBSUB_SIGNATURE_POLICY, RELAY_TAG } from './constants.js';
import { MAX_CONCURRENT_DIALS_PER_PEER, MAX_CONNECTIONS, MIN_CONNECTIONS, PUBSUB_DISCOVERY_INTERVAL, PUBSUB_SIGNATURE_POLICY, RELAY_TAG, RELAY_REDIAL_DELAY, CONN_CHECK_INTERVAL } from './constants.js';

export const CHAT_PROTOCOL = '/chat/1.0.0';
export const DEFAULT_SIGNAL_SERVER_URL = '/ip4/127.0.0.1/tcp/13579/wss/p2p-webrtc-star';
Expand All @@ -36,10 +36,11 @@ export class Peer {
_wrtcStar: WebRTCStarTuple
_relayNodeMultiaddr?: Multiaddr

_remotePeerIds: PeerId[] = []
_remotePeerIds: Set<PeerId> = new Set()
_peerStreamMap: Map<string, Pushable<any>> = new Map()
_messageHandlers: Array<(peerId: PeerId, message: any) => void> = []
_topicHandlers: Map<string, Array<(peerId: PeerId, data: any) => void>> = new Map()
_peerHeartbeatIntervalIdsMap: Map<string, NodeJS.Timer> = new Map();

constructor (nodejs?: boolean) {
// Instantiation in nodejs.
Expand Down Expand Up @@ -110,18 +111,7 @@ export class Peer {

// Dial to the HOP enabled relay node if available
if (this._relayNodeMultiaddr) {
const relayMultiaddr = this._relayNodeMultiaddr;

console.log(`Dialling relay node ${relayMultiaddr.getPeerId()} using multiaddr ${relayMultiaddr.toString()}`);
await this._node.dial(relayMultiaddr);

// Tag the relay node with a high value to prioritize it's connection
// in connection pruning on crossing peer's maxConnections limit
const relayPeerId = this._node.getPeers().find(
peerId => peerId.toString() === relayMultiaddr.getPeerId()
);
assert(relayPeerId);
this._node.peerStore.tagPeer(relayPeerId, RELAY_TAG.tag, { value: RELAY_TAG.value });
await this._dialRelay();
}

// Listen for change in stored multiaddrs
Expand All @@ -144,9 +134,9 @@ export class Peer {
});

// Listen for peers connection
this._node.connectionManager.addEventListener('peer:connect', (evt) => {
this._node.connectionManager.addEventListener('peer:connect', async (evt) => {
console.log('event peer:connect', evt);
this._handleConnect(evt.detail);
await this._handleConnect(evt.detail);
});

// Listen for peers disconnecting
Expand Down Expand Up @@ -175,7 +165,8 @@ export class Peer {
this._node.pubsub.removeEventListener('message');

await this._node.unhandle(CHAT_PROTOCOL);
const hangUpPromises = this._remotePeerIds.map(async peerId => this._node?.hangUp(peerId));
this._remotePeerIds.forEach(remotePeerId => this._stopHeartbeatChecks(remotePeerId));
const hangUpPromises = [...this._remotePeerIds].map(async peerId => this._node?.hangUp(peerId));
await Promise.all(hangUpPromises);
}

Expand Down Expand Up @@ -232,32 +223,138 @@ export class Peer {
return unsubscribe;
}

async _dialRelay (): Promise<void> {
assert(this._relayNodeMultiaddr);
assert(this._node);
const relayMultiaddr = this._relayNodeMultiaddr;

// Keep dialling relay node until it connects
while (true) {
try {
console.log(`Dialling relay node ${relayMultiaddr.getPeerId()} using multiaddr ${relayMultiaddr.toString()}`);
const connection = await this._node.dial(relayMultiaddr);
const relayPeerId = connection.remotePeer;

// TODO: Check if tag already exists. When checking tags issue with relay node connect event
// Tag the relay node with a high value to prioritize it's connection
// in connection pruning on crossing peer's maxConnections limit
this._node.peerStore.tagPeer(relayPeerId, RELAY_TAG.tag, { value: RELAY_TAG.value });

// Start heartbeat check for relay node
await this._startHeartbeatChecks(
relayPeerId,
async () => await this._handleRelayDisconnect(relayPeerId)
);

break;
} catch (err) {
console.log(`Could not dial relay ${relayMultiaddr.toString()}`, err);

// TODO: Use wait method from util package.
// Issue using util package in react app.
await new Promise(resolve => setTimeout(resolve, RELAY_REDIAL_DELAY));
}
}
}

async _handleRelayDisconnect (relayPeerId: PeerId): Promise<void> {
assert(this._node);

// Close existing connection of relay node
console.log(`closing connections for ${relayPeerId}`);
await this._node.hangUp(relayPeerId);
console.log('closed');

// Reconnect to relay node
await this._dialRelay();
}

_handleDiscovery (peer: PeerInfo): void {
// Check connected peers as they are discovered repeatedly.
if (!this._remotePeerIds.some(remotePeerId => remotePeerId.toString() === peer.id.toString())) {
if (![...this._remotePeerIds].some(remotePeerId => remotePeerId.toString() === peer.id.toString())) {
console.log('Discovered peer multiaddrs', peer.multiaddrs.map(addr => addr.toString()));
this._connectPeer(peer);
}
}

_handleConnect (connection: Connection): void {
async _handleConnect (connection: Connection): Promise<void> {
const remotePeerId = connection.remotePeer;
this._remotePeerIds.push(remotePeerId);
this._remotePeerIds.add(remotePeerId);

// Log connected peer
console.log(`Connected to ${remotePeerId.toString()} using multiaddr ${connection.remoteAddr.toString()}`);
console.log(`Current number of peers connected: ${this._node?.getPeers().length}`);
}

async _startHeartbeatChecks (peerId: PeerId, handleDisconnect: () => Promise<void>): Promise<void> {
if (this._peerHeartbeatIntervalIdsMap.has(peerId.toString())) {
// Do not start connection check interval if already present
return;
}

const intervalId = setInterval(async () => {
await this._validatePing(
peerId,
async () => {
// Check if connection check interval for peer is already cleared
if (!this._peerHeartbeatIntervalIdsMap.has(peerId.toString())) {
return;
}

// Clear and remove check interval for remote peer if not connected
this._stopHeartbeatChecks(peerId);

await handleDisconnect();
}
);
}, CONN_CHECK_INTERVAL);

this._peerHeartbeatIntervalIdsMap.set(peerId.toString(), intervalId);
}

async _validatePing (peerId: PeerId, handleDisconnect: () => Promise<void>): Promise<void> {
assert(this._node);

try {
// Ping remote peer
await this._node.ping(peerId);
} catch (err) {
// On error i.e. no pong
console.log(`Not connected to peer ${peerId.toString()}`);

await handleDisconnect();
}
}

_handleDisconnect (connection: Connection): void {
assert(this._node);
const disconnectedPeerId = connection.remotePeer;
this._remotePeerIds = this._remotePeerIds.filter(remotePeerId => remotePeerId.toString() !== disconnectedPeerId.toString());
const peerConnections = this._node.getConnections(disconnectedPeerId);

if (!peerConnections.length) {
// Remove peer if no remaining connections
this._remotePeerIds = new Set([...this._remotePeerIds].filter(remotePeerId => remotePeerId.toString() !== disconnectedPeerId.toString()));

// Stop connection check for disconnected peer
this._stopHeartbeatChecks(disconnectedPeerId);
}

// Log disconnected peer
console.log(`Disconnected from ${disconnectedPeerId.toString()} using multiaddr ${connection.remoteAddr.toString()}`);
console.log(`Current number of peers connected: ${this._node?.getPeers().length}`);
}

_stopHeartbeatChecks (peerId: PeerId): void {
// Clear check interval for disconnected peer
const intervalId = this._peerHeartbeatIntervalIdsMap.get(peerId.toString());

if (intervalId) {
clearInterval(intervalId);
}

this._peerHeartbeatIntervalIdsMap.delete(peerId.toString());
}

async _connectPeer (peer: PeerInfo): Promise<void> {
assert(this._node);

Expand Down

0 comments on commit b07e288

Please sign in to comment.