Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: node and protocols health #2080

Merged
merged 14 commits into from
Jul 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions packages/core/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ export { waitForRemotePeer } from "./lib/wait_for_remote_peer.js";

export { ConnectionManager } from "./lib/connection_manager.js";

export { getHealthManager } from "./lib/health_manager.js";

export { KeepAliveManager } from "./lib/keep_alive_manager.js";
export { StreamManager } from "./lib/stream_manager/index.js";

Expand Down
90 changes: 90 additions & 0 deletions packages/core/src/lib/health_manager.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
import {
HealthStatus,
type IHealthManager,
NodeHealth,
type ProtocolHealth,
Protocols
} from "@waku/interfaces";

class HealthManager implements IHealthManager {
public static instance: HealthManager;
private readonly health: NodeHealth;

private constructor() {
this.health = {
overallStatus: HealthStatus.Unhealthy,
protocolStatuses: new Map()
};
}

public static getInstance(): HealthManager {
if (!HealthManager.instance) {
HealthManager.instance = new HealthManager();
}
return HealthManager.instance;
}

public getHealthStatus(): HealthStatus {
return this.health.overallStatus;
}

public getProtocolStatus(protocol: Protocols): ProtocolHealth | undefined {
return this.health.protocolStatuses.get(protocol);
}

public updateProtocolHealth(
multicodec: string,
connectedPeers: number
): void {
const protocol = this.getNameFromMulticodec(multicodec);

let status: HealthStatus = HealthStatus.Unhealthy;
if (connectedPeers == 1) {
status = HealthStatus.MinimallyHealthy;
} else if (connectedPeers >= 2) {
status = HealthStatus.SufficientlyHealthy;
}

this.health.protocolStatuses.set(protocol, {
name: protocol,
status: status,
lastUpdate: new Date()
});

this.updateOverallHealth();
}

private getNameFromMulticodec(multicodec: string): Protocols {
let name: Protocols;
if (multicodec.includes("filter")) {
name = Protocols.Filter;
} else if (multicodec.includes("lightpush")) {
name = Protocols.LightPush;
} else if (multicodec.includes("store")) {
name = Protocols.Store;
} else {
throw new Error(`Unknown protocol: ${multicodec}`);
}
return name;
}

private updateOverallHealth(): void {
const relevantProtocols = [Protocols.LightPush, Protocols.Filter];
const statuses = relevantProtocols.map(
(p) => this.getProtocolStatus(p)?.status
);

if (statuses.some((status) => status === HealthStatus.Unhealthy)) {
this.health.overallStatus = HealthStatus.Unhealthy;
} else if (
statuses.some((status) => status === HealthStatus.MinimallyHealthy)
) {
this.health.overallStatus = HealthStatus.MinimallyHealthy;
} else {
this.health.overallStatus = HealthStatus.SufficientlyHealthy;
}
}
}

export const getHealthManager = (): HealthManager =>
danisharora099 marked this conversation as resolved.
Show resolved Hide resolved
HealthManager.getInstance();
26 changes: 26 additions & 0 deletions packages/interfaces/src/health_manager.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import { Protocols } from "./protocols";

export enum HealthStatus {
Unhealthy = "Unhealthy",
MinimallyHealthy = "MinimallyHealthy",
SufficientlyHealthy = "SufficientlyHealthy"
}

export interface IHealthManager {
getHealthStatus: () => HealthStatus;
getProtocolStatus: (protocol: Protocols) => ProtocolHealth | undefined;
updateProtocolHealth: (multicodec: string, connectedPeers: number) => void;
}

export type NodeHealth = {
overallStatus: HealthStatus;
protocolStatuses: ProtocolsHealthStatus;
};

export type ProtocolHealth = {
name: Protocols;
status: HealthStatus;
lastUpdate: Date;
};

export type ProtocolsHealthStatus = Map<Protocols, ProtocolHealth>;
1 change: 1 addition & 0 deletions packages/interfaces/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,4 @@ export * from "./dns_discovery.js";
export * from "./metadata.js";
export * from "./constants.js";
export * from "./local_storage.js";
export * from "./health_manager.js";
2 changes: 1 addition & 1 deletion packages/interfaces/src/protocols.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ export type IBaseProtocolCore = {
};

export type IBaseProtocolSDK = {
renewPeer: (peerToDisconnect: PeerId) => Promise<Peer>;
readonly connectedPeers: Peer[];
renewPeer: (peerToDisconnect: PeerId) => Promise<Peer>;
readonly numPeersToUse: number;
};

Expand Down
3 changes: 3 additions & 0 deletions packages/interfaces/src/waku.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import type { MultiaddrInput } from "@multiformats/multiaddr";

import { IConnectionManager } from "./connection_manager.js";
import type { IFilterSDK } from "./filter.js";
import { IHealthManager } from "./health_manager.js";
import type { Libp2p } from "./libp2p.js";
import type { ILightPushSDK } from "./light_push.js";
import { Protocols } from "./protocols.js";
Expand All @@ -27,6 +28,8 @@ export interface Waku {
isStarted(): boolean;

isConnected(): boolean;

health: IHealthManager;
}

export interface LightNode extends Waku {
Expand Down
30 changes: 26 additions & 4 deletions packages/sdk/src/protocols/base_protocol.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
import type { Peer, PeerId } from "@libp2p/interface";
import { ConnectionManager } from "@waku/core";
import { ConnectionManager, getHealthManager } from "@waku/core";
import { BaseProtocol } from "@waku/core/lib/base_protocol";
import { IBaseProtocolSDK, ProtocolUseOptions } from "@waku/interfaces";
import {
IBaseProtocolSDK,
IHealthManager,
ProtocolUseOptions
} from "@waku/interfaces";
import { delay, Logger } from "@waku/utils";

interface Options {
Expand All @@ -14,6 +18,7 @@ const DEFAULT_NUM_PEERS_TO_USE = 3;
const DEFAULT_MAINTAIN_PEERS_INTERVAL = 30_000;

export class BaseProtocolSDK implements IBaseProtocolSDK {
private healthManager: IHealthManager;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should understand if we want to support waku.lightpush.health as well as waku.health API

I don't have any preferences here

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Having it on the Waku object, where a consumer can fetch health for both node and protocol is a good start. We can definitely expand into exposing health on the protocol SDK as well: waku.lightpush.health

public readonly numPeersToUse: number;
private peers: Peer[] = [];
private maintainPeersIntervalId: ReturnType<
Expand All @@ -32,6 +37,9 @@ export class BaseProtocolSDK implements IBaseProtocolSDK {
options: Options
) {
this.log = new Logger(`sdk:${core.multicodec}`);

this.healthManager = getHealthManager();

this.numPeersToUse = options?.numPeersToUse ?? DEFAULT_NUM_PEERS_TO_USE;
const maintainPeersInterval =
options?.maintainPeersInterval ?? DEFAULT_MAINTAIN_PEERS_INTERVAL;
Expand Down Expand Up @@ -60,7 +68,11 @@ export class BaseProtocolSDK implements IBaseProtocolSDK {
);
}

this.peers = this.peers.filter((peer) => !peer.id.equals(peerToDisconnect));
const updatedPeers = this.peers.filter(
(peer) => !peer.id.equals(peerToDisconnect)
);
this.updatePeers(updatedPeers);

this.log.info(
`Peer ${peerToDisconnect} disconnected and removed from the peer list`
);
Expand Down Expand Up @@ -192,7 +204,9 @@ export class BaseProtocolSDK implements IBaseProtocolSDK {

await Promise.all(dials);

this.peers = [...this.peers, ...additionalPeers];
const updatedPeers = [...this.peers, ...additionalPeers];
this.updatePeers(updatedPeers);

this.log.info(
`Added ${additionalPeers.length} new peers, total peers: ${this.peers.length}`
);
Expand Down Expand Up @@ -232,6 +246,14 @@ export class BaseProtocolSDK implements IBaseProtocolSDK {
throw error;
}
}

private updatePeers(peers: Peer[]): void {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: not to change everything else - we can introduce private set peers and do update inside

this.peers = peers;
this.healthManager.updateProtocolHealth(
this.core.multicodec,
this.peers.length
);
}
}

class RenewPeerLocker {
Expand Down
6 changes: 5 additions & 1 deletion packages/sdk/src/waku.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import type { Stream } from "@libp2p/interface";
import { isPeerId, PeerId } from "@libp2p/interface";
import { multiaddr, Multiaddr, MultiaddrInput } from "@multiformats/multiaddr";
import { ConnectionManager } from "@waku/core";
import { ConnectionManager, getHealthManager } from "@waku/core";
import type {
IFilterSDK,
IHealthManager,
ILightPushSDK,
IRelay,
IStoreSDK,
Expand Down Expand Up @@ -68,6 +69,7 @@ export class WakuNode implements Waku {
public lightPush?: ILightPushSDK;
public connectionManager: ConnectionManager;
public readonly pubsubTopics: PubsubTopic[];
public readonly health: IHealthManager;

public constructor(
options: WakuOptions,
Expand Down Expand Up @@ -105,6 +107,8 @@ export class WakuNode implements Waku {
this.relay
);

this.health = getHealthManager();
weboko marked this conversation as resolved.
Show resolved Hide resolved

if (protocolsEnabled.store) {
const store = wakuStore(this.connectionManager, options);
this.store = store(libp2p);
Expand Down
1 change: 1 addition & 0 deletions packages/tests/src/utils/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@ export * from "./base64_utf8.js";
export * from "./waitForConnections.js";
export * from "./custom_mocha_hooks.js";
export * from "./waku_versions_utils.js";
export * from "./nodes.js";
115 changes: 115 additions & 0 deletions packages/tests/src/utils/nodes.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
import { waitForRemotePeer } from "@waku/core";
danisharora099 marked this conversation as resolved.
Show resolved Hide resolved
import {
LightNode,
ProtocolCreateOptions,
Protocols,
ShardingParams,
Waku
} from "@waku/interfaces";
import { createLightNode } from "@waku/sdk";
import { isDefined, shardInfoToPubsubTopics } from "@waku/utils";
import { Context } from "mocha";
import pRetry from "p-retry";

import { DefaultTestPubsubTopic, NOISE_KEY_1 } from "../constants";
import { ServiceNodesFleet } from "../lib";
import { Args } from "../types";

import { waitForConnections } from "./waitForConnections";

export async function runMultipleNodes(
context: Context,
shardInfo?: ShardingParams,
customArgs?: Args,
strictChecking: boolean = false,
numServiceNodes = 3,
withoutFilter = false
): Promise<[ServiceNodesFleet, LightNode]> {
const pubsubTopics = shardInfo
? shardInfoToPubsubTopics(shardInfo)
: [DefaultTestPubsubTopic];
// create numServiceNodes nodes
const serviceNodes = await ServiceNodesFleet.createAndRun(
context,
pubsubTopics,
numServiceNodes,
strictChecking,
shardInfo,
customArgs,
withoutFilter
);

const wakuOptions: ProtocolCreateOptions = {
staticNoiseKey: NOISE_KEY_1,
libp2p: {
addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] }
}
};

if (shardInfo) {
wakuOptions.shardInfo = shardInfo;
} else {
wakuOptions.pubsubTopics = pubsubTopics;
}

const waku = await createLightNode(wakuOptions);
await waku.start();

if (!waku) {
throw new Error("Failed to initialize waku");
}

for (const node of serviceNodes.nodes) {
await waku.dial(await node.getMultiaddrWithId());
await waitForRemotePeer(
waku,
[
!customArgs?.filter ? undefined : Protocols.Filter,
!customArgs?.lightpush ? undefined : Protocols.LightPush
].filter(isDefined)
);
await node.ensureSubscriptions(pubsubTopics);

const wakuConnections = waku.libp2p.getConnections();
const nodePeers = await node.peers();

if (wakuConnections.length < 1 || nodePeers.length < 1) {
throw new Error(
`Expected at least 1 peer in each node. Got waku connections: ${wakuConnections.length} and service nodes: ${nodePeers.length}`
);
}
}

await waitForConnections(numServiceNodes, waku);

return [serviceNodes, waku];
}

export async function teardownNodesWithRedundancy(
serviceNodes: ServiceNodesFleet,
wakuNodes: Waku | Waku[]
): Promise<void> {
const wNodes = Array.isArray(wakuNodes) ? wakuNodes : [wakuNodes];

const stopNwakuNodes = serviceNodes.nodes.map(async (node) => {
await pRetry(
async () => {
await node.stop();
},
{ retries: 3 }
);
});

const stopWakuNodes = wNodes.map(async (waku) => {
if (waku) {
await pRetry(
async () => {
await waku.stop();
},
{ retries: 3 }
);
}
});

await Promise.all([...stopNwakuNodes, ...stopWakuNodes]);
}
Loading
Loading