From 9f0ee911810e31864d43850cedf0a4e4aff3022e Mon Sep 17 00:00:00 2001 From: Kevin Ingersoll Date: Fri, 25 Oct 2024 15:21:04 +0100 Subject: [PATCH] lots more logs --- packages/store-sync/src/wiresaw.ts | 70 ++++++++++++++++++++---------- 1 file changed, 48 insertions(+), 22 deletions(-) diff --git a/packages/store-sync/src/wiresaw.ts b/packages/store-sync/src/wiresaw.ts index 9cb02cfccf..cbbb432d1a 100644 --- a/packages/store-sync/src/wiresaw.ts +++ b/packages/store-sync/src/wiresaw.ts @@ -4,7 +4,9 @@ import { StorageAdapterBlock, StoreEventsLog } from "./common"; import { storeEventsAbi } from "@latticexyz/store"; import { logSort } from "@latticexyz/common"; import { SocketRpcClient, getWebSocketRpcClient } from "viem/utils"; -import { wait } from "@latticexyz/common/utils"; +import { debug as parentDebug } from "./debug"; + +const debug = parentDebug.extend("watchLogs"); type WatchLogsInput = { url: string; @@ -25,9 +27,13 @@ export function watchLogs({ url, address, fromBlock }: WatchLogsInput): WatchLog let keepAliveInterval: ReturnType | undefined = undefined; const logs$ = new Observable((subscriber) => { + debug("logs$ subscribed"); + let client: SocketRpcClient; function setupClient(): void { + debug("setupClient called"); + // Buffer the live logs received until the gap from `startBlock` to `currentBlock` is closed let caughtUp = false; const logBuffer: StoreEventsLog[] = []; @@ -35,14 +41,35 @@ export function watchLogs({ url, address, fromBlock }: WatchLogsInput): WatchLog getWebSocketRpcClient(url, { keepAlive: false, // keepAlive is handled below }).then(async (_client) => { + debug("got websocket rpc client"); client = _client; - client.socket.addEventListener("error", (error) => - subscriber.error({ code: -32603, message: "WebSocket error", data: error }), - ); + // Keep websocket alive and reconnect if it's not alive anymore + keepAliveInterval = setInterval(async () => { + if (client.socket.readyState !== client.socket.OPEN) { + debug("wanted to keep socket alive, but socket not open", client.socket.readyState); + return; + } + + try { + debug("keeping socket alive"); + client.requestAsync({ body: { method: "net_version" }, timeout: 2000 }); + } catch (error) { + debug("no response to keep alive, closing...", error); + clearInterval(keepAliveInterval); + client.close(); + } + }, 3000); + + client.socket.addEventListener("error", (error) => { + debug("socket error", error); + clearInterval(keepAliveInterval); + subscriber.error({ code: -32603, message: "WebSocket error", data: error }); + }); client.socket.addEventListener("close", () => { - console.log("Websocket closed, reconnecting..."); + debug("socket closed, trying to setup again..."); + clearInterval(keepAliveInterval); setupClient(); }); @@ -55,13 +82,17 @@ export function watchLogs({ url, address, fromBlock }: WatchLogsInput): WatchLog }, }) ).result; + debug("got watchLogs subscription", subscriptionId); // Listen for wiresaw_watchLogs subscription // Need to use low level methods since viem's socekt client only handles `eth_subscription` messages. // (https://github.com/wevm/viem/blob/f81d497f2afc11b9b81a79057d1f797694b69793/src/utils/rpc/socket.ts#L178) client.socket.addEventListener("message", (message) => { + debug("got socket message", message); + const response = JSON.parse(message.data); if ("error" in response) { + debug("was error, returning error to subscriber"); // Return JSON-RPC errors to the subscriber subscriber.error(response.error); return; @@ -69,52 +100,47 @@ export function watchLogs({ url, address, fromBlock }: WatchLogsInput): WatchLog // Parse the logs from wiresaw_watchLogs if ("params" in response && response.params.subscription === subscriptionId) { + debug("parsing logs"); const logs: RpcLog[] = response.params.result; const formattedLogs = logs.map((log) => formatLog(log)); const parsedLogs = parseEventLogs({ abi: storeEventsAbi, logs: formattedLogs }); + debug("got logs", parsedLogs); if (caughtUp) { + debug("handing off logs to subscriber"); const blockNumber = parsedLogs[0].blockNumber; subscriber.next({ blockNumber, logs: parsedLogs }); resumeBlock = blockNumber + 1n; } else { + debug("buffering logs"); logBuffer.push(...parsedLogs); } + return; } + + debug("unknown message, skipping"); }); // Catch up to the pending logs try { + debug("fetching initial logs"); const initialLogs = await fetchInitialLogs({ client, address, fromBlock: resumeBlock, topics }); + debug("got logs", initialLogs); const logs = [...initialLogs, ...logBuffer].sort(logSort); const blockNumber = logs.at(-1)?.blockNumber ?? resumeBlock; subscriber.next({ blockNumber, logs: initialLogs }); resumeBlock = blockNumber + 1n; caughtUp = true; - } catch (e) { + } catch (error) { + debug("could not get initial logs", error); subscriber.error("Could not fetch initial wiresaw logs"); } - - // Keep websocket alive and reconnect if it's not alive anymore - keepAliveInterval = setInterval(async () => { - try { - await Promise.race([ - client.requestAsync({ body: { method: "net_version" } }), - wait(2000).then(() => { - throw new Error("Timed out waiting for websocket RPC ping"); - }), - ]); - } catch { - console.log("Detected unresponsive websocket, closing..."); - clearInterval(keepAliveInterval); - client.close(); - } - }, 3000); }); } setupClient(); return () => { + debug("logs$ subscription closed"); clearInterval(keepAliveInterval); client?.close(); };