Skip to content

Commit

Permalink
feat: offline state recovery for Filter subscription (#2049)
Browse files Browse the repository at this point in the history
* up

* fix window reference

* add tests

* up

* add e2e renew test

* address comments

* remove unused

* add test

* try

* remove only

* up test

* up

* remove only

* add tmp logs, use before/after hooks

* up

* fix check

* remove only

* fix test

* up
  • Loading branch information
weboko authored Aug 28, 2024
1 parent 71384df commit eadb85a
Show file tree
Hide file tree
Showing 4 changed files with 285 additions and 77 deletions.
98 changes: 71 additions & 27 deletions packages/core/src/lib/connection_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,32 +38,15 @@ export class ConnectionManager

private currentActiveParallelDialCount = 0;
private pendingPeerDialQueue: Array<PeerId> = [];
private online: boolean = false;

public isConnected(): boolean {
return this.online;
}
private isP2PNetworkConnected: boolean = false;

private toggleOnline(): void {
if (!this.online) {
this.online = true;
this.dispatchEvent(
new CustomEvent<boolean>(EConnectionStateEvents.CONNECTION_STATUS, {
detail: this.online
})
);
public isConnected(): boolean {
if (globalThis?.navigator && !globalThis?.navigator?.onLine) {
return false;
}
}

private toggleOffline(): void {
if (this.online && this.libp2p.getConnections().length == 0) {
this.online = false;
this.dispatchEvent(
new CustomEvent<boolean>(EConnectionStateEvents.CONNECTION_STATUS, {
detail: this.online
})
);
}
return this.isP2PNetworkConnected;
}

public static create(
Expand Down Expand Up @@ -103,6 +86,7 @@ export class ConnectionManager
"peer:discovery",
this.onEventHandlers["peer:discovery"]
);
this.stopNetworkStatusListener();
}

public async dropConnection(peerId: PeerId): Promise<void> {
Expand Down Expand Up @@ -193,7 +177,7 @@ export class ConnectionManager
options: keepAliveOptions
});

this.run()
this.startEventListeners()
.then(() => log.info(`Connection Manager is now running`))
.catch((error) =>
log.error(`Unexpected error while running service`, error)
Expand Down Expand Up @@ -225,11 +209,12 @@ export class ConnectionManager
}
}

private async run(): Promise<void> {
// start event listeners
private async startEventListeners(): Promise<void> {
this.startPeerDiscoveryListener();
this.startPeerConnectionListener();
this.startPeerDisconnectionListener();

this.startNetworkStatusListener();
}

private async dialPeer(peerId: PeerId): Promise<void> {
Expand Down Expand Up @@ -428,14 +413,18 @@ export class ConnectionManager
)
);
}
this.toggleOnline();

this.setP2PNetworkConnected();
})();
},
"peer:disconnect": (evt: CustomEvent<PeerId>): void => {
void (async () => {
this.keepAliveManager.stop(evt.detail);
this.toggleOffline();
this.setP2PNetworkDisconnected();
})();
},
"browser:network": (): void => {
this.dispatchWakuConnectionEvent();
}
};

Expand Down Expand Up @@ -572,4 +561,59 @@ export class ConnectionManager
if (!shardInfoBytes) return undefined;
return decodeRelayShard(shardInfoBytes);
}

private startNetworkStatusListener(): void {
try {
globalThis.addEventListener(
"online",
this.onEventHandlers["browser:network"]
);
globalThis.addEventListener(
"offline",
this.onEventHandlers["browser:network"]
);
} catch (err) {
log.error(`Failed to start network listener: ${err}`);
}
}

private stopNetworkStatusListener(): void {
try {
globalThis.removeEventListener(
"online",
this.onEventHandlers["browser:network"]
);
globalThis.removeEventListener(
"offline",
this.onEventHandlers["browser:network"]
);
} catch (err) {
log.error(`Failed to stop network listener: ${err}`);
}
}

private setP2PNetworkConnected(): void {
if (!this.isP2PNetworkConnected) {
this.isP2PNetworkConnected = true;
this.dispatchWakuConnectionEvent();
}
}

private setP2PNetworkDisconnected(): void {
if (
this.isP2PNetworkConnected &&
this.libp2p.getConnections().length === 0
) {
this.isP2PNetworkConnected = false;
this.dispatchWakuConnectionEvent();
}
}

private dispatchWakuConnectionEvent(): void {
this.dispatchEvent(
new CustomEvent<boolean>(EConnectionStateEvents.CONNECTION_STATUS, {
detail: this.isConnected()
})
);
}
}
92 changes: 70 additions & 22 deletions packages/sdk/src/protocols/filter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import {
type ContentTopic,
type CoreProtocolResult,
type CreateSubscriptionResult,
EConnectionStateEvents,
type IAsyncIterator,
type IDecodedMessage,
type IDecoder,
Expand Down Expand Up @@ -65,20 +66,22 @@ export class SubscriptionManager implements ISubscriptionSDK {
private missedMessagesByPeer: Map<string, number> = new Map();
private maxPingFailures: number = DEFAULT_MAX_PINGS;
private maxMissedMessagesThreshold = DEFAULT_MAX_MISSED_MESSAGES_THRESHOLD;
private subscribeOptions: SubscribeOptions = DEFAULT_SUBSCRIBE_OPTIONS;

private subscriptionCallbacks: Map<
ContentTopic,
SubscriptionCallback<IDecodedMessage>
>;
> = new Map();

public constructor(
private readonly pubsubTopic: PubsubTopic,
private protocol: FilterCore,
private getPeers: () => Peer[],
private readonly protocol: FilterCore,
private readonly connectionManager: ConnectionManager,
private readonly getPeers: () => Peer[],
private readonly renewPeer: (peerToDisconnect: PeerId) => Promise<Peer>
) {
this.pubsubTopic = pubsubTopic;
this.subscriptionCallbacks = new Map();

const allPeerIdStr = this.getPeers().map((p) => p.id.toString());
this.receivedMessagesHashes = {
all: new Set(),
Expand All @@ -89,10 +92,6 @@ export class SubscriptionManager implements ISubscriptionSDK {
allPeerIdStr.forEach((peerId) => this.missedMessagesByPeer.set(peerId, 0));
}

public get messageHashes(): string[] {
return [...this.receivedMessagesHashes.all];
}

private addHash(hash: string, peerIdStr?: string): void {
this.receivedMessagesHashes.all.add(hash);

Expand Down Expand Up @@ -155,9 +154,8 @@ export class SubscriptionManager implements ISubscriptionSDK {
this.subscriptionCallbacks.set(contentTopic, subscriptionCallback);
});

if (options.keepAlive) {
this.startKeepAlivePings(options);
}
this.subscribeOptions = options;
this.startSubscriptionsMaintenance(options);

return finalResult;
}
Expand All @@ -183,9 +181,7 @@ export class SubscriptionManager implements ISubscriptionSDK {
const finalResult = this.handleResult(results, "unsubscribe");

if (this.subscriptionCallbacks.size === 0) {
if (this.keepAliveTimer) {
this.stopKeepAlivePings();
}
this.stopSubscriptionsMaintenance();
}

return finalResult;
Expand All @@ -211,9 +207,7 @@ export class SubscriptionManager implements ISubscriptionSDK {

const finalResult = this.handleResult(results, "unsubscribeAll");

if (this.keepAliveTimer) {
this.stopKeepAlivePings();
}
this.stopSubscriptionsMaintenance();

return finalResult;
}
Expand Down Expand Up @@ -378,8 +372,19 @@ export class SubscriptionManager implements ISubscriptionSDK {
}
}

private startKeepAlivePings(options: SubscribeOptions): void {
const { keepAlive } = options;
private startSubscriptionsMaintenance(options: SubscribeOptions): void {
if (options?.keepAlive) {
this.startKeepAlivePings(options.keepAlive);
}
this.startConnectionListener();
}

private stopSubscriptionsMaintenance(): void {
this.stopKeepAlivePings();
this.stopConnectionListener();
}

private startKeepAlivePings(interval: number): void {
if (this.keepAliveTimer) {
log.info("Recurring pings already set up.");
return;
Expand All @@ -389,7 +394,7 @@ export class SubscriptionManager implements ISubscriptionSDK {
void this.ping().catch((error) => {
log.error("Error in keep-alive ping cycle:", error);
});
}, keepAlive) as unknown as number;
}, interval) as unknown as number;
}

private stopKeepAlivePings(): void {
Expand All @@ -403,6 +408,48 @@ export class SubscriptionManager implements ISubscriptionSDK {
this.keepAliveTimer = null;
}

private startConnectionListener(): void {
this.connectionManager.addEventListener(
EConnectionStateEvents.CONNECTION_STATUS,
this.connectionListener.bind(this) as (v: CustomEvent<boolean>) => void
);
}

private stopConnectionListener(): void {
this.connectionManager.removeEventListener(
EConnectionStateEvents.CONNECTION_STATUS,
this.connectionListener.bind(this) as (v: CustomEvent<boolean>) => void
);
}

private async connectionListener({
detail: isConnected
}: CustomEvent<boolean>): Promise<void> {
if (!isConnected) {
this.stopKeepAlivePings();
return;
}

try {
const result = await this.ping();
const renewPeerPromises = result.failures.map(
async (v): Promise<void> => {
if (v.peerId) {
await this.renewAndSubscribePeer(v.peerId);
}
}
);

await Promise.all(renewPeerPromises);
} catch (err) {
log.error(`networkStateListener failed to recover: ${err}`);
}

this.startKeepAlivePings(
this.subscribeOptions?.keepAlive || DEFAULT_SUBSCRIBE_OPTIONS.keepAlive
);
}

private incrementMissedMessageCount(peerIdStr: string): void {
const currentCount = this.missedMessagesByPeer.get(peerIdStr) || 0;
this.missedMessagesByPeer.set(peerIdStr, currentCount + 1);
Expand All @@ -416,6 +463,7 @@ export class SubscriptionManager implements ISubscriptionSDK {

class FilterSDK extends BaseProtocolSDK implements IFilterSDK {
public readonly protocol: FilterCore;
private readonly _connectionManager: ConnectionManager;

private activeSubscriptions = new Map<string, SubscriptionManager>();

Expand Down Expand Up @@ -445,8 +493,7 @@ class FilterSDK extends BaseProtocolSDK implements IFilterSDK {
);

this.protocol = this.core as FilterCore;

this.activeSubscriptions = new Map();
this._connectionManager = connectionManager;
}

/**
Expand Down Expand Up @@ -576,6 +623,7 @@ class FilterSDK extends BaseProtocolSDK implements IFilterSDK {
new SubscriptionManager(
pubsubTopic,
this.protocol,
this._connectionManager,
() => this.connectedPeers,
this.renewPeer.bind(this)
)
Expand Down
Loading

0 comments on commit eadb85a

Please sign in to comment.