Skip to content

Commit

Permalink
fix: attempt to fix some of the Filter issues (#2183)
Browse files Browse the repository at this point in the history
* feat: lighten retry logic for LightPush

* update tests

* remove base protocol sdk from light push, add unit tests for light push

* remove replaced test

* ensure numPeersToUse is respected

* turn off check for missing messages

* fix recurring ping

* add useful logs

* skip tests

* remove comment

* feat: check filter subscriptions against lightPush (#2185)
  • Loading branch information
weboko authored Oct 16, 2024
1 parent 4049123 commit ded994f
Show file tree
Hide file tree
Showing 8 changed files with 218 additions and 146 deletions.
2 changes: 1 addition & 1 deletion packages/interfaces/src/filter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ export type SubscriptionCallback<T extends IDecodedMessage> = {
export type SubscribeOptions = {
keepAlive?: number;
pingsBeforePeerRenewed?: number;
maxMissedMessagesThreshold?: number;
enableLightPushFilterCheck?: boolean;
};

export interface ISubscription {
Expand Down
4 changes: 2 additions & 2 deletions packages/message-hash/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { sha256 } from "@noble/hashes/sha256";
import type { IDecodedMessage, IProtoMessage } from "@waku/interfaces";
import { isDefined } from "@waku/utils";
import {
bytesToUtf8,
bytesToHex,
concat,
numberToBytes,
utf8ToBytes
Expand Down Expand Up @@ -56,6 +56,6 @@ export function messageHashStr(
message: IProtoMessage | IDecodedMessage
): string {
const hash = messageHash(pubsubTopic, message);
const hashStr = bytesToUtf8(hash);
const hashStr = bytesToHex(hash);
return hashStr;
}
5 changes: 4 additions & 1 deletion packages/sdk/src/protocols/filter/constants.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
export const DEFAULT_KEEP_ALIVE = 60_000;
export const DEFAULT_LIGHT_PUSH_FILTER_CHECK = false;
export const DEFAULT_LIGHT_PUSH_FILTER_CHECK_INTERVAL = 10_000;

export const DEFAULT_SUBSCRIBE_OPTIONS = {
keepAlive: DEFAULT_KEEP_ALIVE
keepAlive: DEFAULT_KEEP_ALIVE,
enableLightPushFilterCheck: DEFAULT_LIGHT_PUSH_FILTER_CHECK
};
12 changes: 9 additions & 3 deletions packages/sdk/src/protocols/filter/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import {
type IDecodedMessage,
type IDecoder,
type IFilter,
type ILightPush,
type Libp2p,
NetworkConfig,
type ProtocolCreateOptions,
Expand Down Expand Up @@ -38,7 +39,8 @@ class Filter extends BaseProtocolSDK implements IFilter {

public constructor(
connectionManager: ConnectionManager,
libp2p: Libp2p,
private libp2p: Libp2p,
private lightPush?: ILightPush,
options?: ProtocolCreateOptions
) {
super(
Expand Down Expand Up @@ -195,7 +197,9 @@ class Filter extends BaseProtocolSDK implements IFilter {
this.protocol,
this.connectionManager,
() => this.connectedPeers,
this.renewPeer.bind(this)
this.renewPeer.bind(this),
this.libp2p,
this.lightPush
)
);

Expand Down Expand Up @@ -300,7 +304,9 @@ class Filter extends BaseProtocolSDK implements IFilter {

export function wakuFilter(
connectionManager: ConnectionManager,
lightPush?: ILightPush,
init?: ProtocolCreateOptions
): (libp2p: Libp2p) => IFilter {
return (libp2p: Libp2p) => new Filter(connectionManager, libp2p, init);
return (libp2p: Libp2p) =>
new Filter(connectionManager, libp2p, lightPush, init);
}
150 changes: 118 additions & 32 deletions packages/sdk/src/protocols/filter/subscription_manager.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,23 @@
import type { Peer } from "@libp2p/interface";
import type { PeerId } from "@libp2p/interface";
import { ConnectionManager, FilterCore } from "@waku/core";
import {
ConnectionManager,
createDecoder,
createEncoder,
FilterCore,
LightPushCore
} from "@waku/core";
import {
type Callback,
type ContentTopic,
type CoreProtocolResult,
EConnectionStateEvents,
type IDecodedMessage,
type IDecoder,
type ILightPush,
type IProtoMessage,
type ISubscription,
type Libp2p,
type PeerIdStr,
ProtocolError,
type PubsubTopic,
Expand All @@ -23,14 +31,23 @@ import { groupByContentTopic, Logger } from "@waku/utils";
import { ReliabilityMonitorManager } from "../../reliability_monitor/index.js";
import { ReceiverReliabilityMonitor } from "../../reliability_monitor/receiver.js";

import { DEFAULT_KEEP_ALIVE, DEFAULT_SUBSCRIBE_OPTIONS } from "./constants.js";
import {
DEFAULT_KEEP_ALIVE,
DEFAULT_LIGHT_PUSH_FILTER_CHECK,
DEFAULT_LIGHT_PUSH_FILTER_CHECK_INTERVAL,
DEFAULT_SUBSCRIBE_OPTIONS
} from "./constants.js";

const log = new Logger("sdk:filter:subscription_manager");

export class SubscriptionManager implements ISubscription {
private reliabilityMonitor: ReceiverReliabilityMonitor;

private keepAliveTimer: number | null = null;
private keepAliveTimeout: number = DEFAULT_KEEP_ALIVE;
private keepAliveInterval: ReturnType<typeof setInterval> | null = null;

private enableLightPushFilterCheck = DEFAULT_LIGHT_PUSH_FILTER_CHECK;

private subscriptionCallbacks: Map<
ContentTopic,
SubscriptionCallback<IDecodedMessage>
Expand All @@ -43,7 +60,9 @@ export class SubscriptionManager implements ISubscription {
private readonly getPeers: () => Peer[],
private readonly renewPeer: (
peerToDisconnect: PeerId
) => Promise<Peer | undefined>
) => Promise<Peer | undefined>,
private readonly libp2p: Libp2p,
private readonly lightPush?: ILightPush
) {
this.pubsubTopic = pubsubTopic;
this.subscriptionCallbacks = new Map();
Expand All @@ -54,7 +73,8 @@ export class SubscriptionManager implements ISubscription {
this.renewPeer.bind(this),
() => Array.from(this.subscriptionCallbacks.keys()),
this.protocol.subscribe.bind(this.protocol),
this.protocol.addLibp2pEventListener.bind(this.protocol)
this.protocol.addLibp2pEventListener.bind(this.protocol),
this.sendLightPushCheckMessage.bind(this)
);
}

Expand All @@ -63,11 +83,10 @@ export class SubscriptionManager implements ISubscription {
callback: Callback<T>,
options: SubscribeOptions = DEFAULT_SUBSCRIBE_OPTIONS
): Promise<SDKProtocolResult> {
this.reliabilityMonitor.setMaxMissedMessagesThreshold(
options.maxMissedMessagesThreshold
);
this.reliabilityMonitor.setMaxPingFailures(options.pingsBeforePeerRenewed);
this.keepAliveTimer = options.keepAlive || DEFAULT_KEEP_ALIVE;
this.keepAliveTimeout = options.keepAlive || DEFAULT_KEEP_ALIVE;
this.enableLightPushFilterCheck =
options?.enableLightPushFilterCheck || DEFAULT_LIGHT_PUSH_FILTER_CHECK;

const decodersArray = Array.isArray(decoders) ? decoders : [decoders];

Expand All @@ -85,11 +104,20 @@ export class SubscriptionManager implements ISubscription {
}
}

if (this.enableLightPushFilterCheck) {
decodersArray.push(
createDecoder(
this.buildLightPushContentTopic(),
this.pubsubTopic
) as IDecoder<T>
);
}

const decodersGroupedByCT = groupByContentTopic(decodersArray);
const contentTopics = Array.from(decodersGroupedByCT.keys());

const promises = this.getPeers().map(async (peer) =>
this.protocol.subscribe(this.pubsubTopic, peer, contentTopics)
this.subscribeWithPeerVerification(peer, contentTopics)
);

const results = await Promise.allSettled(promises);
Expand All @@ -107,12 +135,17 @@ export class SubscriptionManager implements ISubscription {
callback
} as unknown as SubscriptionCallback<IDecodedMessage>;

// don't handle case of internal content topic
if (contentTopic === this.buildLightPushContentTopic()) {
return;
}

// The callback and decoder may override previous values, this is on
// purpose as the user may call `subscribe` to refresh the subscription
this.subscriptionCallbacks.set(contentTopic, subscriptionCallback);
});

this.startSubscriptionsMaintenance(this.keepAliveTimer);
this.startSubscriptionsMaintenance(this.keepAliveTimeout);

return finalResult;
}
Expand Down Expand Up @@ -174,10 +207,9 @@ export class SubscriptionManager implements ISubscription {
message: WakuMessage,
peerIdStr: PeerIdStr
): Promise<void> {
const alreadyReceived = this.reliabilityMonitor.processIncomingMessage(
message,
this.pubsubTopic,
peerIdStr
const alreadyReceived = this.reliabilityMonitor.notifyMessageReceived(
peerIdStr,
message as IProtoMessage
);

if (alreadyReceived) {
Expand All @@ -200,6 +232,19 @@ export class SubscriptionManager implements ISubscription {
await pushMessage(subscriptionCallback, this.pubsubTopic, message);
}

private async subscribeWithPeerVerification(
peer: Peer,
contentTopics: string[]
): Promise<CoreProtocolResult> {
const result = await this.protocol.subscribe(
this.pubsubTopic,
peer,
contentTopics
);
await this.sendLightPushCheckMessage(peer);
return result;
}

private handleResult(
results: PromiseSettledResult<CoreProtocolResult>[],
type: "ping" | "subscribe" | "unsubscribe" | "unsubscribeAll"
Expand Down Expand Up @@ -240,23 +285,26 @@ export class SubscriptionManager implements ISubscription {
let result;
try {
result = await this.protocol.ping(peer);
return result;
} catch (error) {
return {
result = {
success: null,
failure: {
peerId,
error: ProtocolError.GENERIC_FAIL
}
};
} finally {
void this.reliabilityMonitor.handlePingResult(peerId, result);
}

log.info(
`Received result from filter ping peerId:${peerId.toString()}\tsuccess:${result.success?.toString()}\tfailure:${result.failure?.error}`
);
await this.reliabilityMonitor.handlePingResult(peerId, result);
return result;
}

private startSubscriptionsMaintenance(interval: number): void {
private startSubscriptionsMaintenance(timeout: number): void {
log.info("Starting subscriptions maintenance");
this.startKeepAlivePings(interval);
this.startKeepAlivePings(timeout);
this.startConnectionListener();
}

Expand Down Expand Up @@ -295,31 +343,69 @@ export class SubscriptionManager implements ISubscription {
log.error(`networkStateListener failed to recover: ${err}`);
}

this.startKeepAlivePings(this.keepAliveTimer || DEFAULT_KEEP_ALIVE);
this.startKeepAlivePings(this.keepAliveTimeout);
}

private startKeepAlivePings(interval: number): void {
if (this.keepAliveTimer) {
private startKeepAlivePings(timeout: number): void {
if (this.keepAliveInterval) {
log.info("Recurring pings already set up.");
return;
}

this.keepAliveTimer = setInterval(() => {
void this.ping()
.then(() => log.info("Keep-alive ping successful"))
.catch((error) => log.error("Error in keep-alive ping cycle:", error));
}, interval) as unknown as number;
this.keepAliveInterval = setInterval(() => {
void this.ping();
}, timeout);
}

private stopKeepAlivePings(): void {
if (!this.keepAliveTimer) {
if (!this.keepAliveInterval) {
log.info("Already stopped recurring pings.");
return;
}

log.info("Stopping recurring pings.");
clearInterval(this.keepAliveTimer);
this.keepAliveTimer = null;
clearInterval(this.keepAliveInterval);
this.keepAliveInterval = null;
}

private async sendLightPushCheckMessage(peer: Peer): Promise<void> {
if (
this.lightPush &&
this.libp2p &&
this.reliabilityMonitor.shouldVerifyPeer(peer.id)
) {
const encoder = createEncoder({
contentTopic: this.buildLightPushContentTopic(),
pubsubTopic: this.pubsubTopic,
ephemeral: true
});

const message = { payload: new Uint8Array(1) };
const protoMessage = await encoder.toProtoObj(message);

// make a delay to be sure message is send when subscription is in place
setTimeout(
(async () => {
const result = await (this.lightPush!.protocol as LightPushCore).send(
encoder,
message,
peer
);
this.reliabilityMonitor.notifyMessageSent(peer.id, protoMessage);
if (result.failure) {
log.error(
`failed to send lightPush ping message to peer:${peer.id.toString()}\t${result.failure.error}`
);
return;
}
}) as () => void,
DEFAULT_LIGHT_PUSH_FILTER_CHECK_INTERVAL
);
}
}

private buildLightPushContentTopic(): string {
return `/js-waku-subscription-ping/1/${this.libp2p.peerId.toString()}/utf8`;
}
}

Expand Down
7 changes: 4 additions & 3 deletions packages/sdk/src/reliability_monitor/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ export class ReliabilityMonitorManager {
peer: Peer,
contentTopics: ContentTopic[]
) => Promise<CoreProtocolResult>,
addLibp2pEventListener: Libp2p["addEventListener"]
addLibp2pEventListener: Libp2p["addEventListener"],
sendLightPushMessage: (peer: Peer) => Promise<void>
): ReceiverReliabilityMonitor {
if (ReliabilityMonitorManager.receiverMonitors.has(pubsubTopic)) {
return ReliabilityMonitorManager.receiverMonitors.get(pubsubTopic)!;
Expand All @@ -36,7 +37,8 @@ export class ReliabilityMonitorManager {
renewPeer,
getContentTopics,
protocolSubscribe,
addLibp2pEventListener
addLibp2pEventListener,
sendLightPushMessage
);
ReliabilityMonitorManager.receiverMonitors.set(pubsubTopic, monitor);
return monitor;
Expand All @@ -50,7 +52,6 @@ export class ReliabilityMonitorManager {

public static stopAll(): void {
for (const [pubsubTopic, monitor] of this.receiverMonitors) {
monitor.setMaxMissedMessagesThreshold(undefined);
monitor.setMaxPingFailures(undefined);
this.receiverMonitors.delete(pubsubTopic);
}
Expand Down
Loading

0 comments on commit ded994f

Please sign in to comment.