Skip to content

Commit

Permalink
Use browser metrics package in peer (#322)
Browse files Browse the repository at this point in the history
* Use browser metrics package

* Get latency from heartbeat check

* Use metrics in nodejs CLI

* Avoid disconnect from primary relay on reaching limit

* Fix relay connections count
  • Loading branch information
nikugogoi authored Feb 20, 2023
1 parent 83ad5d8 commit 054600c
Show file tree
Hide file tree
Showing 4 changed files with 107 additions and 35 deletions.
1 change: 1 addition & 0 deletions packages/peer/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
"dependencies": {
"@cerc-io/libp2p": "0.42.2-laconic-0.1.1",
"@cerc-io/webrtc-direct": "^5.0.0-laconic-0.1.3",
"@cerc-io/prometheus-metrics": "1.1.4",
"@chainsafe/libp2p-noise": "^11.0.0",
"@libp2p/floodsub": "^6.0.0",
"@libp2p/mplex": "^7.1.1",
Expand Down
99 changes: 69 additions & 30 deletions packages/peer/src/peer-heartbeat-checker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,17 @@ import type { PeerId } from '@libp2p/interface-peer-id';

import { CONN_CHECK_INTERVAL } from './constants.js';

interface PeerData {
intervalId: NodeJS.Timer;
latencyValues: Array<number>;
}

/**
* Used for tracking heartbeat of connected remote peers
*/
export class PeerHearbeatChecker {
_node: Libp2p;
_peerHeartbeatIntervalIdsMap: Map<string, NodeJS.Timer> = new Map();
_peerMap: Map<string, PeerData> = new Map()

constructor (node: Libp2p) {
this._node = node;
Expand All @@ -24,29 +29,68 @@ export class PeerHearbeatChecker {
* @param handleDisconnect
*/
async start (peerId: PeerId, handleDisconnect: () => Promise<void>): Promise<void> {
if (this._peerHeartbeatIntervalIdsMap.has(peerId.toString())) {
const peerIdString = peerId.toString();

if (this._peerMap.has(peerIdString)) {
// Do not start connection check interval if already present
return;
}

const handlePingDisconnect = async () => {
// Check if connection check interval for peer is already cleared
if (!this._peerMap.get(peerIdString)) {
return;
}

// Clear and remove check interval for remote peer if not connected
this.stop(peerId);

await handleDisconnect();
};

const intervalId = setInterval(async () => {
await this._validatePing(
peerId,
async () => {
// Check if connection check interval for peer is already cleared
if (!this._peerHeartbeatIntervalIdsMap.has(peerId.toString())) {
return;
}

// Clear and remove check interval for remote peer if not connected
this.stop(peerId);

await handleDisconnect();
}
handlePingDisconnect
);
}, CONN_CHECK_INTERVAL);

this._peerHeartbeatIntervalIdsMap.set(peerId.toString(), intervalId);
this._peerMap.set(
peerIdString,
{
intervalId,
latencyValues: []
}
);

await this._validatePing(
peerId,
handlePingDisconnect
);
}

/**
* Method to stop heartbeat checks for a peer
* @param peerId
*/
stop (peerId: PeerId): void {
// Clear check interval for disconnected peer
const peerData = this._peerMap.get(peerId.toString());

if (peerData) {
clearInterval(peerData.intervalId);
}

this._peerMap.delete(peerId.toString());
}

/**
* Get latency data for peer
*/
getLatencyData (peerId: PeerId): Array<number> {
const latencyValues = this._peerMap.get(peerId.toString())?.latencyValues;

return latencyValues ?? [];
}

/**
Expand All @@ -57,27 +101,22 @@ export class PeerHearbeatChecker {
async _validatePing (peerId: PeerId, handleDisconnect: () => Promise<void>): Promise<void> {
try {
// Ping remote peer
await this._node.ping(peerId);
const latency = await this._node.ping(peerId);

const latencyValues = this._peerMap.get(peerId.toString())?.latencyValues;

if (latencyValues) {
const length = latencyValues.unshift(latency);

if (length > 5) {
latencyValues.pop();
}
}
} catch (err) {
// On error i.e. no pong
console.log(`Not connected to peer ${peerId.toString()}`);

await handleDisconnect();
}
}

/**
* Method to stop heartbeat checks for a peer
* @param peerId
*/
stop (peerId: PeerId): void {
// Clear check interval for disconnected peer
const intervalId = this._peerHeartbeatIntervalIdsMap.get(peerId.toString());

if (intervalId) {
clearInterval(intervalId);
}

this._peerHeartbeatIntervalIdsMap.delete(peerId.toString());
}
}
23 changes: 19 additions & 4 deletions packages/peer/src/peer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import { createFromJSON, createEd25519PeerId } from '@libp2p/peer-id-factory';
import { multiaddr, Multiaddr } from '@multiformats/multiaddr';
import { floodsub } from '@libp2p/floodsub';
import { pubsubPeerDiscovery } from '@libp2p/pubsub-peer-discovery';
import { PrometheusMetrics } from '@cerc-io/prometheus-metrics';

import {
MAX_CONCURRENT_DIALS_PER_PEER,
Expand Down Expand Up @@ -63,6 +64,7 @@ export class Peer {
_peerStreamMap: Map<string, Pushable<any>> = new Map()
_messageHandlers: Array<(peerId: PeerId, message: any) => void> = []
_topicHandlers: Map<string, Array<(peerId: PeerId, data: any) => void>> = new Map()
_metrics = new PrometheusMetrics()

constructor (relayNodeURL: string, nodejs?: boolean) {
this._relayNodeMultiaddr = multiaddr(relayNodeURL);
Expand Down Expand Up @@ -91,6 +93,10 @@ export class Peer {
return this._relayNodeMultiaddr;
}

get metrics (): PrometheusMetrics {
return this._metrics;
}

async init (
peerIdObj?: PeerIdObj,
maxRelayConnections = DEFAULT_MAX_RELAY_CONNECTIONS
Expand Down Expand Up @@ -135,7 +141,8 @@ export class Peer {
},
ping: {
timeout: PING_TIMEOUT
}
},
metrics: () => this._metrics
});
} catch (err: any) {
console.log('Could not initialize a libp2p node', err);
Expand Down Expand Up @@ -279,6 +286,14 @@ export class Peer {
return multiaddrString === this._relayNodeMultiaddr.toString();
}

getLatencyData (peerId: PeerId): Array<number> {
if (this._peerHeartbeatChecker) {
return this._peerHeartbeatChecker.getLatencyData(peerId);
}

return [];
}

async _handleChangeProtocols ({ peerId, protocols }: { peerId: PeerId, protocols: string[] }) {
assert(this._node);

Expand Down Expand Up @@ -369,14 +384,14 @@ export class Peer {
console.log(`Connected to ${remotePeerIdString} using multiaddr ${remoteAddrString}`);

if (this.isRelayPeerMultiaddr(remoteAddrString)) {
this._numRelayConnections++;

// Check if relay connections limit has already been reached
if (this._numRelayConnections >= maxRelayConnections) {
if (this._numRelayConnections > maxRelayConnections && !this.isPrimaryRelay(remoteAddrString)) {
console.log(`Closing connection to relay ${remotePeerIdString} as max relay connections limit reached`);
await connection.close();
return;
}

this._numRelayConnections++;
}

// Manage connections and streams
Expand Down
19 changes: 18 additions & 1 deletion yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,18 @@
wherearewe "^2.0.0"
xsalsa20 "^1.1.0"

"@cerc-io/[email protected]":
version "1.1.4"
resolved "https://git.vdb.to/api/packages/cerc-io/npm/%40cerc-io%2Fprometheus-metrics/-/1.1.4/prometheus-metrics-1.1.4.tgz#51006b0b5bf6168394390c78072a1c0bb2b02f28"
integrity sha512-Mqg7o1Wer8zKv3/0NWB1sCMmW8hyYI0Fw58d/MR62+5EDZ2yPhwMUrLZUhyqdo3qXJzxMylAPSVx8URDcthmKA==
dependencies:
"@libp2p/interface-connection" "^3.0.2"
"@libp2p/interface-metrics" "^4.0.2"
"@libp2p/logger" "^2.0.2"
it-foreach "^1.0.0"
it-stream-types "^1.0.4"
promjs "^0.4.2"

"@cerc-io/webrtc-direct@^5.0.0-laconic-0.1.3":
version "5.0.0-laconic-0.1.3"
resolved "https://git.vdb.to/api/packages/cerc-io/npm/%40cerc-io%2Fwebrtc-direct/-/5.0.0-laconic-0.1.3/webrtc-direct-5.0.0-laconic-0.1.3.tgz#14802ba88899c904bddc327082d96cb541523ffb"
Expand Down Expand Up @@ -2620,7 +2632,7 @@
interface-datastore "^7.0.0"
multiformats "^10.0.0"

"@libp2p/logger@^2.0.1", "@libp2p/logger@^2.0.5":
"@libp2p/logger@^2.0.1", "@libp2p/logger@^2.0.2", "@libp2p/logger@^2.0.5":
version "2.0.5"
resolved "https://registry.yarnpkg.com/@libp2p/logger/-/logger-2.0.5.tgz#cf0ee695ba21471fd085a7fda3e534e03946ad20"
integrity sha512-WEhxsc7+gsfuTcljI4vSgW/H2f18aBaC+JiO01FcX841Wxe9szjzHdBLDh9eqygUlzoK0LEeIBfctN7ibzus5A==
Expand Down Expand Up @@ -13505,6 +13517,11 @@ promise-to-callback@^1.0.0:
is-fn "^1.0.0"
set-immediate-shim "^1.0.1"

promjs@^0.4.2:
version "0.4.2"
resolved "https://registry.yarnpkg.com/promjs/-/promjs-0.4.2.tgz#9c2b4a60e00c1a0ecb69a3c1c322d1cfb47a300d"
integrity sha512-qvHcTU9xwEieFOf2Qnf5JYPKkdJU2lRbJfJvJspw6XpnoH7VPmNfnJJnOLPfN8ODJMBLRt8wEPVjxyyn0Or6RQ==

promzard@^0.3.0:
version "0.3.0"
resolved "https://registry.npmjs.org/promzard/-/promzard-0.3.0.tgz"
Expand Down

0 comments on commit 054600c

Please sign in to comment.