From 7bb782de5a2d6390b6def47bc84554805f3be778 Mon Sep 17 00:00:00 2001 From: alvrs Date: Wed, 16 Oct 2024 20:43:26 +0200 Subject: [PATCH 1/7] fix(store-sync): detect and reconnect unresponsive websocket --- packages/store-sync/src/wiresaw.ts | 134 ++++++++++++++++++----------- 1 file changed, 82 insertions(+), 52 deletions(-) diff --git a/packages/store-sync/src/wiresaw.ts b/packages/store-sync/src/wiresaw.ts index 038016a83d..d2417d8283 100644 --- a/packages/store-sync/src/wiresaw.ts +++ b/packages/store-sync/src/wiresaw.ts @@ -4,6 +4,8 @@ import { StorageAdapterBlock, StoreEventsLog } from "./common"; import { storeEventsAbi } from "@latticexyz/store"; import { logSort } from "@latticexyz/common"; import { SocketRpcClient, getWebSocketRpcClient } from "viem/utils"; +import { sleep } from "@latticexyz/utils"; +import { debug } from "./debug"; type WatchLogsInput = { url: string; @@ -16,70 +18,98 @@ type WatchLogsResult = { }; export function watchLogs({ url, address, fromBlock }: WatchLogsInput): WatchLogsResult { - // Buffer the live logs received until the gap from `startBlock` to `currentBlock` is closed - let caughtUp = false; - const logBuffer: StoreEventsLog[] = []; - const topics = [ storeEventsAbi.flatMap((event) => encodeEventTopics({ abi: [event], eventName: event.name })), ] as LogTopic[]; // https://github.com/wevm/viem/blob/63a5ac86eb9a2962f7323b4cc76ef54f9f5ef7ed/src/actions/public/getLogs.ts#L171 + let resumeBlock = fromBlock; + let keepAliveInterval: ReturnType | undefined = undefined; const logs$ = new Observable((subscriber) => { let client: SocketRpcClient; - getWebSocketRpcClient(url, { keepAlive: true, reconnect: true }).then(async (_client) => { - client = _client; - client.socket.addEventListener("error", (error) => - subscriber.error({ code: -32603, message: "WebSocket error", data: error }), - ); - - // Start watching pending logs - const subscriptionId: Hex = ( - await client.requestAsync({ - body: { - method: "wiresaw_watchLogs", - params: [{ address, topics }], - }, - }) - ).result; - - // Listen for wiresaw_watchLogs subscription - // Need to use low level methods since viem's socekt client only handles `eth_subscription` messages. - // (https://github.com/wevm/viem/blob/f81d497f2afc11b9b81a79057d1f797694b69793/src/utils/rpc/socket.ts#L178) - client.socket.addEventListener("message", (message) => { - const response = JSON.parse(message.data); - if ("error" in response) { - // Return JSON-RPC errors to the subscriber - subscriber.error(response.error); - return; - } - // Parse the logs from wiresaw_watchLogs - if ("params" in response && response.params.subscription === subscriptionId) { - const logs: RpcLog[] = response.params.result; - const formattedLogs = logs.map((log) => formatLog(log)); - const parsedLogs = parseEventLogs({ abi: storeEventsAbi, logs: formattedLogs }); - if (caughtUp) { - const blockNumber = parsedLogs[0].blockNumber; - subscriber.next({ blockNumber, logs: parsedLogs }); - } else { - logBuffer.push(...parsedLogs); + function setupClient(): void { + // Buffer the live logs received until the gap from `startBlock` to `currentBlock` is closed + let caughtUp = false; + const logBuffer: StoreEventsLog[] = []; + + getWebSocketRpcClient(url).then(async (_client) => { + client = _client; + client.socket.addEventListener("error", (error) => + subscriber.error({ code: -32603, message: "WebSocket error", data: error }), + ); + + // Start watching pending logs + const subscriptionId: Hex = ( + await client.requestAsync({ + body: { + method: "wiresaw_watchLogs", + params: [{ address, topics }], + }, + }) + ).result; + + // Listen for wiresaw_watchLogs subscription + // Need to use low level methods since viem's socekt client only handles `eth_subscription` messages. + // (https://github.com/wevm/viem/blob/f81d497f2afc11b9b81a79057d1f797694b69793/src/utils/rpc/socket.ts#L178) + client.socket.addEventListener("message", (message) => { + const response = JSON.parse(message.data); + if ("error" in response) { + // Return JSON-RPC errors to the subscriber + subscriber.error(response.error); + return; + } + + // Parse the logs from wiresaw_watchLogs + if ("params" in response && response.params.subscription === subscriptionId) { + const logs: RpcLog[] = response.params.result; + const formattedLogs = logs.map((log) => formatLog(log)); + const parsedLogs = parseEventLogs({ abi: storeEventsAbi, logs: formattedLogs }); + if (caughtUp) { + const blockNumber = parsedLogs[0].blockNumber; + subscriber.next({ blockNumber, logs: parsedLogs }); + resumeBlock = blockNumber + 1n; + } else { + logBuffer.push(...parsedLogs); + } } + }); + + // Catch up to the pending logs + try { + const initialLogs = await fetchInitialLogs({ client, address, fromBlock: resumeBlock, topics }); + const logs = [...initialLogs, ...logBuffer].sort(logSort); + const blockNumber = logs.at(-1)?.blockNumber ?? resumeBlock; + subscriber.next({ blockNumber, logs: initialLogs }); + resumeBlock = blockNumber + 1n; + caughtUp = true; + } catch (e) { + subscriber.error("Could not fetch initial wiresaw logs"); } + + // Keep websocket alive and reconnect if it's not alive anymore + keepAliveInterval = setInterval(async () => { + const result = await Promise.race([ + client.requestAsync({ body: { method: "net_version" } }), + sleep(1000, null), + ]); + if (!result) { + debug("Detected unresponsive websocket, reconnecting..."); + clearInterval(keepAliveInterval); + client.close(); + setupClient(); + } + }, 3000); }); + } - // Catch up to the pending logs - try { - const initialLogs = await fetchInitialLogs({ client, address, fromBlock, topics }); - const logs = [...initialLogs, ...logBuffer].sort(logSort); - const blockNumber = logs.at(-1)?.blockNumber ?? fromBlock; - subscriber.next({ blockNumber, logs: initialLogs }); - caughtUp = true; - } catch (e) { - subscriber.error("Could not fetch initial wiresaw logs"); - } - }); + setupClient(); - return () => client?.close(); + return () => { + client?.close(); + if (keepAliveInterval != null) { + clearInterval(keepAliveInterval); + } + }; }); return { logs$ }; From 909685be8efd40a54be6679b339a2e9a171d8b65 Mon Sep 17 00:00:00 2001 From: alvrs Date: Wed, 16 Oct 2024 21:03:29 +0200 Subject: [PATCH 2/7] set keepalive to false, replace sleep with promise --- packages/store-sync/src/wiresaw.ts | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/packages/store-sync/src/wiresaw.ts b/packages/store-sync/src/wiresaw.ts index d2417d8283..5682c32b86 100644 --- a/packages/store-sync/src/wiresaw.ts +++ b/packages/store-sync/src/wiresaw.ts @@ -4,7 +4,6 @@ import { StorageAdapterBlock, StoreEventsLog } from "./common"; import { storeEventsAbi } from "@latticexyz/store"; import { logSort } from "@latticexyz/common"; import { SocketRpcClient, getWebSocketRpcClient } from "viem/utils"; -import { sleep } from "@latticexyz/utils"; import { debug } from "./debug"; type WatchLogsInput = { @@ -32,7 +31,9 @@ export function watchLogs({ url, address, fromBlock }: WatchLogsInput): WatchLog let caughtUp = false; const logBuffer: StoreEventsLog[] = []; - getWebSocketRpcClient(url).then(async (_client) => { + getWebSocketRpcClient(url, { + keepAlive: false, // keepAlive is handled below + }).then(async (_client) => { client = _client; client.socket.addEventListener("error", (error) => subscriber.error({ code: -32603, message: "WebSocket error", data: error }), @@ -90,7 +91,9 @@ export function watchLogs({ url, address, fromBlock }: WatchLogsInput): WatchLog keepAliveInterval = setInterval(async () => { const result = await Promise.race([ client.requestAsync({ body: { method: "net_version" } }), - sleep(1000, null), + new Promise((resolve) => { + setTimeout(resolve, 2000); + }), ]); if (!result) { debug("Detected unresponsive websocket, reconnecting..."); From 15eaf635b94a22808ab68853bd0ee4f980788e98 Mon Sep 17 00:00:00 2001 From: alvrs Date: Wed, 16 Oct 2024 23:12:01 +0200 Subject: [PATCH 3/7] use reject --- packages/store-sync/src/wiresaw.ts | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/packages/store-sync/src/wiresaw.ts b/packages/store-sync/src/wiresaw.ts index 5682c32b86..1e5206d978 100644 --- a/packages/store-sync/src/wiresaw.ts +++ b/packages/store-sync/src/wiresaw.ts @@ -89,13 +89,14 @@ export function watchLogs({ url, address, fromBlock }: WatchLogsInput): WatchLog // Keep websocket alive and reconnect if it's not alive anymore keepAliveInterval = setInterval(async () => { - const result = await Promise.race([ - client.requestAsync({ body: { method: "net_version" } }), - new Promise((resolve) => { - setTimeout(resolve, 2000); - }), - ]); - if (!result) { + try { + await Promise.race([ + client.requestAsync({ body: { method: "net_version" } }), + new Promise((_, reject) => { + setTimeout(reject, 2000); + }), + ]); + } catch { debug("Detected unresponsive websocket, reconnecting..."); clearInterval(keepAliveInterval); client.close(); From cc64e692f84e6260b761244767a66d344a6286d7 Mon Sep 17 00:00:00 2001 From: Kevin Ingersoll Date: Sun, 27 Oct 2024 14:19:49 +0000 Subject: [PATCH 4/7] bring over changes --- packages/store-sync/src/wiresaw.ts | 178 +++++++++++++++++------------ 1 file changed, 105 insertions(+), 73 deletions(-) diff --git a/packages/store-sync/src/wiresaw.ts b/packages/store-sync/src/wiresaw.ts index 1e5206d978..3d393f7ff1 100644 --- a/packages/store-sync/src/wiresaw.ts +++ b/packages/store-sync/src/wiresaw.ts @@ -4,7 +4,9 @@ import { StorageAdapterBlock, StoreEventsLog } from "./common"; import { storeEventsAbi } from "@latticexyz/store"; import { logSort } from "@latticexyz/common"; import { SocketRpcClient, getWebSocketRpcClient } from "viem/utils"; -import { debug } from "./debug"; +import { debug as parentDebug } from "./debug"; + +const debug = parentDebug.extend("watchLogs"); type WatchLogsInput = { url: string; @@ -22,97 +24,127 @@ export function watchLogs({ url, address, fromBlock }: WatchLogsInput): WatchLog ] as LogTopic[]; // https://github.com/wevm/viem/blob/63a5ac86eb9a2962f7323b4cc76ef54f9f5ef7ed/src/actions/public/getLogs.ts#L171 let resumeBlock = fromBlock; - let keepAliveInterval: ReturnType | undefined = undefined; + + let pingTimer: ReturnType | undefined = undefined; + // how often to ping to keep socket alive + const pingInterval = 3000; + // how long to wait for ping response before we attempt to reconnect + const pingTimeout = 5000; + const logs$ = new Observable((subscriber) => { + debug("logs$ subscribed"); + let client: SocketRpcClient; - function setupClient(): void { + async function setupClient(): Promise { + debug("setupClient called"); + // Buffer the live logs received until the gap from `startBlock` to `currentBlock` is closed let caughtUp = false; const logBuffer: StoreEventsLog[] = []; - getWebSocketRpcClient(url, { - keepAlive: false, // keepAlive is handled below - }).then(async (_client) => { - client = _client; - client.socket.addEventListener("error", (error) => - subscriber.error({ code: -32603, message: "WebSocket error", data: error }), - ); - - // Start watching pending logs - const subscriptionId: Hex = ( - await client.requestAsync({ - body: { - method: "wiresaw_watchLogs", - params: [{ address, topics }], - }, - }) - ).result; - - // Listen for wiresaw_watchLogs subscription - // Need to use low level methods since viem's socekt client only handles `eth_subscription` messages. - // (https://github.com/wevm/viem/blob/f81d497f2afc11b9b81a79057d1f797694b69793/src/utils/rpc/socket.ts#L178) - client.socket.addEventListener("message", (message) => { - const response = JSON.parse(message.data); - if ("error" in response) { - // Return JSON-RPC errors to the subscriber - subscriber.error(response.error); - return; - } + client = await getWebSocketRpcClient(url, { keepAlive: false }); + debug("got websocket rpc client"); - // Parse the logs from wiresaw_watchLogs - if ("params" in response && response.params.subscription === subscriptionId) { - const logs: RpcLog[] = response.params.result; - const formattedLogs = logs.map((log) => formatLog(log)); - const parsedLogs = parseEventLogs({ abi: storeEventsAbi, logs: formattedLogs }); - if (caughtUp) { - const blockNumber = parsedLogs[0].blockNumber; - subscriber.next({ blockNumber, logs: parsedLogs }); - resumeBlock = blockNumber + 1n; - } else { - logBuffer.push(...parsedLogs); - } - } + async function ping(): Promise { + try { + debug("pinging socket"); + await client.requestAsync({ body: { method: "net_version" }, timeout: pingTimeout }); + } catch (error) { + debug("ping failed, closing...", error); + client.close(); + } + } + + function schedulePing(): void { + debug("scheduling next ping"); + pingTimer = setTimeout(() => ping().then(schedulePing), pingInterval); + } + + schedulePing(); + + client.socket.addEventListener("error", (error) => { + debug("socket error, closing...", error); + client.close(); + }); + + client.socket.addEventListener("close", async () => { + debug("socket close, setting up new client..."); + clearTimeout(pingTimer); + setupClient().catch((error) => { + debug("error trying to setup new client", error); + subscriber.error(error); }); + }); - // Catch up to the pending logs - try { - const initialLogs = await fetchInitialLogs({ client, address, fromBlock: resumeBlock, topics }); - const logs = [...initialLogs, ...logBuffer].sort(logSort); - const blockNumber = logs.at(-1)?.blockNumber ?? resumeBlock; - subscriber.next({ blockNumber, logs: initialLogs }); - resumeBlock = blockNumber + 1n; - caughtUp = true; - } catch (e) { - subscriber.error("Could not fetch initial wiresaw logs"); + // Start watching pending logs + const subscriptionId: Hex = ( + await client.requestAsync({ + body: { + method: "wiresaw_watchLogs", + params: [{ address, topics }], + }, + }) + ).result; + debug("got watchLogs subscription", subscriptionId); + + // Listen for wiresaw_watchLogs subscription + // Need to use low level methods since viem's socekt client only handles `eth_subscription` messages. + // (https://github.com/wevm/viem/blob/f81d497f2afc11b9b81a79057d1f797694b69793/src/utils/rpc/socket.ts#L178) + client.socket.addEventListener("message", (message) => { + const response = JSON.parse(message.data); + if ("error" in response) { + debug("was error, returning error to subscriber"); + // Return JSON-RPC errors to the subscriber + subscriber.error(response.error); + return; } - // Keep websocket alive and reconnect if it's not alive anymore - keepAliveInterval = setInterval(async () => { - try { - await Promise.race([ - client.requestAsync({ body: { method: "net_version" } }), - new Promise((_, reject) => { - setTimeout(reject, 2000); - }), - ]); - } catch { - debug("Detected unresponsive websocket, reconnecting..."); - clearInterval(keepAliveInterval); - client.close(); - setupClient(); + // Parse the logs from wiresaw_watchLogs + if ("params" in response && response.params.subscription === subscriptionId) { + debug("parsing logs"); + const logs: RpcLog[] = response.params.result; + const formattedLogs = logs.map((log) => formatLog(log)); + const parsedLogs = parseEventLogs({ abi: storeEventsAbi, logs: formattedLogs }); + debug("got logs", parsedLogs); + if (caughtUp) { + debug("handing off logs to subscriber"); + const blockNumber = parsedLogs[0].blockNumber; + subscriber.next({ blockNumber, logs: parsedLogs }); + resumeBlock = blockNumber + 1n; + } else { + debug("buffering logs"); + logBuffer.push(...parsedLogs); } - }, 3000); + return; + } }); + + // Catch up to the pending logs + try { + debug("fetching initial logs"); + const initialLogs = await fetchInitialLogs({ client, address, fromBlock: resumeBlock, topics }); + debug("got logs", initialLogs); + const logs = [...initialLogs, ...logBuffer].sort(logSort); + const blockNumber = logs.at(-1)?.blockNumber ?? resumeBlock; + subscriber.next({ blockNumber, logs: initialLogs }); + resumeBlock = blockNumber + 1n; + caughtUp = true; + } catch (error) { + debug("could not get initial logs", error); + subscriber.error("Could not fetch initial wiresaw logs"); + } } - setupClient(); + setupClient().catch((error) => { + debug("error setting up initial client", error); + subscriber.error(error); + }); return () => { + debug("logs$ subscription closed"); + clearTimeout(pingTimer); client?.close(); - if (keepAliveInterval != null) { - clearInterval(keepAliveInterval); - } }; }); From 15d4640e7414a8becc76a096df20aec9ff6f5000 Mon Sep 17 00:00:00 2001 From: Kevin Ingersoll Date: Sun, 27 Oct 2024 14:20:16 +0000 Subject: [PATCH 5/7] rename --- packages/store-sync/src/createStoreSync.ts | 2 +- packages/store-sync/src/{wiresaw.ts => watchLogs.ts} | 0 2 files changed, 1 insertion(+), 1 deletion(-) rename packages/store-sync/src/{wiresaw.ts => watchLogs.ts} (100%) diff --git a/packages/store-sync/src/createStoreSync.ts b/packages/store-sync/src/createStoreSync.ts index b152661591..ab3eb005a6 100644 --- a/packages/store-sync/src/createStoreSync.ts +++ b/packages/store-sync/src/createStoreSync.ts @@ -39,7 +39,7 @@ import { fromEventSource } from "./fromEventSource"; import { fetchAndStoreLogs } from "./fetchAndStoreLogs"; import { isLogsApiResponse } from "./indexer-client/isLogsApiResponse"; import { toStorageAdatperBlock } from "./indexer-client/toStorageAdapterBlock"; -import { watchLogs } from "./wiresaw"; +import { watchLogs } from "./watchLogs"; const debug = parentDebug.extend("createStoreSync"); diff --git a/packages/store-sync/src/wiresaw.ts b/packages/store-sync/src/watchLogs.ts similarity index 100% rename from packages/store-sync/src/wiresaw.ts rename to packages/store-sync/src/watchLogs.ts From ddc7c7b5dd75fa4c15a68157d98b38023ea6fda6 Mon Sep 17 00:00:00 2001 From: Kevin Ingersoll Date: Sun, 27 Oct 2024 07:23:04 -0700 Subject: [PATCH 6/7] Create tiny-parrots-hang.md --- .changeset/tiny-parrots-hang.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changeset/tiny-parrots-hang.md diff --git a/.changeset/tiny-parrots-hang.md b/.changeset/tiny-parrots-hang.md new file mode 100644 index 0000000000..cf5643010d --- /dev/null +++ b/.changeset/tiny-parrots-hang.md @@ -0,0 +1,5 @@ +--- +"@latticexyz/store-sync": patch +--- + +Experimental pending logs watcher now reconnects if it loses connection or ping times out. From 1a9a5292fdb05b9b41bfb4c479ec6bfac8eb2217 Mon Sep 17 00:00:00 2001 From: Kevin Ingersoll Date: Sun, 27 Oct 2024 07:23:33 -0700 Subject: [PATCH 7/7] Update tiny-parrots-hang.md --- .changeset/tiny-parrots-hang.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.changeset/tiny-parrots-hang.md b/.changeset/tiny-parrots-hang.md index cf5643010d..a2dd680013 100644 --- a/.changeset/tiny-parrots-hang.md +++ b/.changeset/tiny-parrots-hang.md @@ -2,4 +2,4 @@ "@latticexyz/store-sync": patch --- -Experimental pending logs watcher now reconnects if it loses connection or ping times out. +Experimental pending logs watcher now reconnects if it loses connection or times out.