From 9ddc874ecbe8671c197619a23bdeac2bee57174e Mon Sep 17 00:00:00 2001 From: alvarius Date: Sun, 27 Oct 2024 15:51:30 +0100 Subject: [PATCH] fix(store-sync): reconnect unresponsive watchLogs socket (#3301) Co-authored-by: Kevin Ingersoll --- .changeset/tiny-parrots-hang.md | 5 + packages/store-sync/src/createStoreSync.ts | 2 +- .../src/{wiresaw.ts => watchLogs.ts} | 92 ++++++++++++++++--- 3 files changed, 85 insertions(+), 14 deletions(-) create mode 100644 .changeset/tiny-parrots-hang.md rename packages/store-sync/src/{wiresaw.ts => watchLogs.ts} (60%) diff --git a/.changeset/tiny-parrots-hang.md b/.changeset/tiny-parrots-hang.md new file mode 100644 index 0000000000..a2dd680013 --- /dev/null +++ b/.changeset/tiny-parrots-hang.md @@ -0,0 +1,5 @@ +--- +"@latticexyz/store-sync": patch +--- + +Experimental pending logs watcher now reconnects if it loses connection or times out. diff --git a/packages/store-sync/src/createStoreSync.ts b/packages/store-sync/src/createStoreSync.ts index b152661591..ab3eb005a6 100644 --- a/packages/store-sync/src/createStoreSync.ts +++ b/packages/store-sync/src/createStoreSync.ts @@ -39,7 +39,7 @@ import { fromEventSource } from "./fromEventSource"; import { fetchAndStoreLogs } from "./fetchAndStoreLogs"; import { isLogsApiResponse } from "./indexer-client/isLogsApiResponse"; import { toStorageAdatperBlock } from "./indexer-client/toStorageAdapterBlock"; -import { watchLogs } from "./wiresaw"; +import { watchLogs } from "./watchLogs"; const debug = parentDebug.extend("createStoreSync"); diff --git a/packages/store-sync/src/wiresaw.ts b/packages/store-sync/src/watchLogs.ts similarity index 60% rename from packages/store-sync/src/wiresaw.ts rename to packages/store-sync/src/watchLogs.ts index 038016a83d..3d393f7ff1 100644 --- a/packages/store-sync/src/wiresaw.ts +++ b/packages/store-sync/src/watchLogs.ts @@ -4,6 +4,9 @@ import { StorageAdapterBlock, StoreEventsLog } from "./common"; import { storeEventsAbi } from "@latticexyz/store"; import { logSort } from "@latticexyz/common"; import { SocketRpcClient, getWebSocketRpcClient } from "viem/utils"; +import { debug as parentDebug } from "./debug"; + +const debug = parentDebug.extend("watchLogs"); type WatchLogsInput = { url: string; @@ -16,21 +19,63 @@ type WatchLogsResult = { }; export function watchLogs({ url, address, fromBlock }: WatchLogsInput): WatchLogsResult { - // Buffer the live logs received until the gap from `startBlock` to `currentBlock` is closed - let caughtUp = false; - const logBuffer: StoreEventsLog[] = []; - const topics = [ storeEventsAbi.flatMap((event) => encodeEventTopics({ abi: [event], eventName: event.name })), ] as LogTopic[]; // https://github.com/wevm/viem/blob/63a5ac86eb9a2962f7323b4cc76ef54f9f5ef7ed/src/actions/public/getLogs.ts#L171 + let resumeBlock = fromBlock; + + let pingTimer: ReturnType | undefined = undefined; + // how often to ping to keep socket alive + const pingInterval = 3000; + // how long to wait for ping response before we attempt to reconnect + const pingTimeout = 5000; + const logs$ = new Observable((subscriber) => { + debug("logs$ subscribed"); + let client: SocketRpcClient; - getWebSocketRpcClient(url, { keepAlive: true, reconnect: true }).then(async (_client) => { - client = _client; - client.socket.addEventListener("error", (error) => - subscriber.error({ code: -32603, message: "WebSocket error", data: error }), - ); + + async function setupClient(): Promise { + debug("setupClient called"); + + // Buffer the live logs received until the gap from `startBlock` to `currentBlock` is closed + let caughtUp = false; + const logBuffer: StoreEventsLog[] = []; + + client = await getWebSocketRpcClient(url, { keepAlive: false }); + debug("got websocket rpc client"); + + async function ping(): Promise { + try { + debug("pinging socket"); + await client.requestAsync({ body: { method: "net_version" }, timeout: pingTimeout }); + } catch (error) { + debug("ping failed, closing...", error); + client.close(); + } + } + + function schedulePing(): void { + debug("scheduling next ping"); + pingTimer = setTimeout(() => ping().then(schedulePing), pingInterval); + } + + schedulePing(); + + client.socket.addEventListener("error", (error) => { + debug("socket error, closing...", error); + client.close(); + }); + + client.socket.addEventListener("close", async () => { + debug("socket close, setting up new client..."); + clearTimeout(pingTimer); + setupClient().catch((error) => { + debug("error trying to setup new client", error); + subscriber.error(error); + }); + }); // Start watching pending logs const subscriptionId: Hex = ( @@ -41,6 +86,7 @@ export function watchLogs({ url, address, fromBlock }: WatchLogsInput): WatchLog }, }) ).result; + debug("got watchLogs subscription", subscriptionId); // Listen for wiresaw_watchLogs subscription // Need to use low level methods since viem's socekt client only handles `eth_subscription` messages. @@ -48,6 +94,7 @@ export function watchLogs({ url, address, fromBlock }: WatchLogsInput): WatchLog client.socket.addEventListener("message", (message) => { const response = JSON.parse(message.data); if ("error" in response) { + debug("was error, returning error to subscriber"); // Return JSON-RPC errors to the subscriber subscriber.error(response.error); return; @@ -55,31 +102,50 @@ export function watchLogs({ url, address, fromBlock }: WatchLogsInput): WatchLog // Parse the logs from wiresaw_watchLogs if ("params" in response && response.params.subscription === subscriptionId) { + debug("parsing logs"); const logs: RpcLog[] = response.params.result; const formattedLogs = logs.map((log) => formatLog(log)); const parsedLogs = parseEventLogs({ abi: storeEventsAbi, logs: formattedLogs }); + debug("got logs", parsedLogs); if (caughtUp) { + debug("handing off logs to subscriber"); const blockNumber = parsedLogs[0].blockNumber; subscriber.next({ blockNumber, logs: parsedLogs }); + resumeBlock = blockNumber + 1n; } else { + debug("buffering logs"); logBuffer.push(...parsedLogs); } + return; } }); // Catch up to the pending logs try { - const initialLogs = await fetchInitialLogs({ client, address, fromBlock, topics }); + debug("fetching initial logs"); + const initialLogs = await fetchInitialLogs({ client, address, fromBlock: resumeBlock, topics }); + debug("got logs", initialLogs); const logs = [...initialLogs, ...logBuffer].sort(logSort); - const blockNumber = logs.at(-1)?.blockNumber ?? fromBlock; + const blockNumber = logs.at(-1)?.blockNumber ?? resumeBlock; subscriber.next({ blockNumber, logs: initialLogs }); + resumeBlock = blockNumber + 1n; caughtUp = true; - } catch (e) { + } catch (error) { + debug("could not get initial logs", error); subscriber.error("Could not fetch initial wiresaw logs"); } + } + + setupClient().catch((error) => { + debug("error setting up initial client", error); + subscriber.error(error); }); - return () => client?.close(); + return () => { + debug("logs$ subscription closed"); + clearTimeout(pingTimer); + client?.close(); + }; }); return { logs$ };