Skip to content

Commit

Permalink
fix(store-sync): reconnect unresponsive watchLogs socket (#3301)
Browse files Browse the repository at this point in the history
Co-authored-by: Kevin Ingersoll <[email protected]>
  • Loading branch information
alvrs and holic authored Oct 27, 2024
1 parent ee388ed commit 9ddc874
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 14 deletions.
5 changes: 5 additions & 0 deletions .changeset/tiny-parrots-hang.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@latticexyz/store-sync": patch
---

Experimental pending logs watcher now reconnects if it loses connection or times out.
2 changes: 1 addition & 1 deletion packages/store-sync/src/createStoreSync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ import { fromEventSource } from "./fromEventSource";
import { fetchAndStoreLogs } from "./fetchAndStoreLogs";
import { isLogsApiResponse } from "./indexer-client/isLogsApiResponse";
import { toStorageAdatperBlock } from "./indexer-client/toStorageAdapterBlock";
import { watchLogs } from "./wiresaw";
import { watchLogs } from "./watchLogs";

const debug = parentDebug.extend("createStoreSync");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ import { StorageAdapterBlock, StoreEventsLog } from "./common";
import { storeEventsAbi } from "@latticexyz/store";
import { logSort } from "@latticexyz/common";
import { SocketRpcClient, getWebSocketRpcClient } from "viem/utils";
import { debug as parentDebug } from "./debug";

const debug = parentDebug.extend("watchLogs");

type WatchLogsInput = {
url: string;
Expand All @@ -16,21 +19,63 @@ type WatchLogsResult = {
};

export function watchLogs({ url, address, fromBlock }: WatchLogsInput): WatchLogsResult {
// Buffer the live logs received until the gap from `startBlock` to `currentBlock` is closed
let caughtUp = false;
const logBuffer: StoreEventsLog[] = [];

const topics = [
storeEventsAbi.flatMap((event) => encodeEventTopics({ abi: [event], eventName: event.name })),
] as LogTopic[]; // https://github.com/wevm/viem/blob/63a5ac86eb9a2962f7323b4cc76ef54f9f5ef7ed/src/actions/public/getLogs.ts#L171

let resumeBlock = fromBlock;

let pingTimer: ReturnType<typeof setTimeout> | undefined = undefined;
// how often to ping to keep socket alive
const pingInterval = 3000;
// how long to wait for ping response before we attempt to reconnect
const pingTimeout = 5000;

const logs$ = new Observable<StorageAdapterBlock>((subscriber) => {
debug("logs$ subscribed");

let client: SocketRpcClient<WebSocket>;
getWebSocketRpcClient(url, { keepAlive: true, reconnect: true }).then(async (_client) => {
client = _client;
client.socket.addEventListener("error", (error) =>
subscriber.error({ code: -32603, message: "WebSocket error", data: error }),
);

async function setupClient(): Promise<void> {
debug("setupClient called");

// Buffer the live logs received until the gap from `startBlock` to `currentBlock` is closed
let caughtUp = false;
const logBuffer: StoreEventsLog[] = [];

client = await getWebSocketRpcClient(url, { keepAlive: false });
debug("got websocket rpc client");

async function ping(): Promise<void> {
try {
debug("pinging socket");
await client.requestAsync({ body: { method: "net_version" }, timeout: pingTimeout });
} catch (error) {
debug("ping failed, closing...", error);
client.close();
}
}

function schedulePing(): void {
debug("scheduling next ping");
pingTimer = setTimeout(() => ping().then(schedulePing), pingInterval);
}

schedulePing();

client.socket.addEventListener("error", (error) => {
debug("socket error, closing...", error);
client.close();
});

client.socket.addEventListener("close", async () => {
debug("socket close, setting up new client...");
clearTimeout(pingTimer);
setupClient().catch((error) => {
debug("error trying to setup new client", error);
subscriber.error(error);
});
});

// Start watching pending logs
const subscriptionId: Hex = (
Expand All @@ -41,45 +86,66 @@ export function watchLogs({ url, address, fromBlock }: WatchLogsInput): WatchLog
},
})
).result;
debug("got watchLogs subscription", subscriptionId);

// Listen for wiresaw_watchLogs subscription
// Need to use low level methods since viem's socekt client only handles `eth_subscription` messages.
// (https://github.com/wevm/viem/blob/f81d497f2afc11b9b81a79057d1f797694b69793/src/utils/rpc/socket.ts#L178)
client.socket.addEventListener("message", (message) => {
const response = JSON.parse(message.data);
if ("error" in response) {
debug("was error, returning error to subscriber");
// Return JSON-RPC errors to the subscriber
subscriber.error(response.error);
return;
}

// Parse the logs from wiresaw_watchLogs
if ("params" in response && response.params.subscription === subscriptionId) {
debug("parsing logs");
const logs: RpcLog[] = response.params.result;
const formattedLogs = logs.map((log) => formatLog(log));
const parsedLogs = parseEventLogs({ abi: storeEventsAbi, logs: formattedLogs });
debug("got logs", parsedLogs);
if (caughtUp) {
debug("handing off logs to subscriber");
const blockNumber = parsedLogs[0].blockNumber;
subscriber.next({ blockNumber, logs: parsedLogs });
resumeBlock = blockNumber + 1n;
} else {
debug("buffering logs");
logBuffer.push(...parsedLogs);
}
return;
}
});

// Catch up to the pending logs
try {
const initialLogs = await fetchInitialLogs({ client, address, fromBlock, topics });
debug("fetching initial logs");
const initialLogs = await fetchInitialLogs({ client, address, fromBlock: resumeBlock, topics });
debug("got logs", initialLogs);
const logs = [...initialLogs, ...logBuffer].sort(logSort);
const blockNumber = logs.at(-1)?.blockNumber ?? fromBlock;
const blockNumber = logs.at(-1)?.blockNumber ?? resumeBlock;
subscriber.next({ blockNumber, logs: initialLogs });
resumeBlock = blockNumber + 1n;
caughtUp = true;
} catch (e) {
} catch (error) {
debug("could not get initial logs", error);
subscriber.error("Could not fetch initial wiresaw logs");
}
}

setupClient().catch((error) => {
debug("error setting up initial client", error);
subscriber.error(error);
});

return () => client?.close();
return () => {
debug("logs$ subscription closed");
clearTimeout(pingTimer);
client?.close();
};
});

return { logs$ };
Expand Down

0 comments on commit 9ddc874

Please sign in to comment.