Skip to content

Commit

Permalink
lots more logs
Browse files Browse the repository at this point in the history
  • Loading branch information
holic committed Oct 25, 2024
1 parent 91970a7 commit 9f0ee91
Showing 1 changed file with 48 additions and 22 deletions.
70 changes: 48 additions & 22 deletions packages/store-sync/src/wiresaw.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ import { StorageAdapterBlock, StoreEventsLog } from "./common";
import { storeEventsAbi } from "@latticexyz/store";
import { logSort } from "@latticexyz/common";
import { SocketRpcClient, getWebSocketRpcClient } from "viem/utils";
import { wait } from "@latticexyz/common/utils";
import { debug as parentDebug } from "./debug";

const debug = parentDebug.extend("watchLogs");

type WatchLogsInput = {
url: string;
Expand All @@ -25,24 +27,49 @@ export function watchLogs({ url, address, fromBlock }: WatchLogsInput): WatchLog
let keepAliveInterval: ReturnType<typeof setTimeout> | undefined = undefined;

const logs$ = new Observable<StorageAdapterBlock>((subscriber) => {
debug("logs$ subscribed");

let client: SocketRpcClient<WebSocket>;

function setupClient(): void {
debug("setupClient called");

// Buffer the live logs received until the gap from `startBlock` to `currentBlock` is closed
let caughtUp = false;
const logBuffer: StoreEventsLog[] = [];

getWebSocketRpcClient(url, {
keepAlive: false, // keepAlive is handled below
}).then(async (_client) => {
debug("got websocket rpc client");
client = _client;

client.socket.addEventListener("error", (error) =>
subscriber.error({ code: -32603, message: "WebSocket error", data: error }),
);
// Keep websocket alive and reconnect if it's not alive anymore
keepAliveInterval = setInterval(async () => {
if (client.socket.readyState !== client.socket.OPEN) {
debug("wanted to keep socket alive, but socket not open", client.socket.readyState);
return;
}

try {
debug("keeping socket alive");
client.requestAsync({ body: { method: "net_version" }, timeout: 2000 });
} catch (error) {
debug("no response to keep alive, closing...", error);
clearInterval(keepAliveInterval);
client.close();
}
}, 3000);

client.socket.addEventListener("error", (error) => {
debug("socket error", error);
clearInterval(keepAliveInterval);
subscriber.error({ code: -32603, message: "WebSocket error", data: error });
});

client.socket.addEventListener("close", () => {
console.log("Websocket closed, reconnecting...");
debug("socket closed, trying to setup again...");
clearInterval(keepAliveInterval);
setupClient();
});

Expand All @@ -55,66 +82,65 @@ export function watchLogs({ url, address, fromBlock }: WatchLogsInput): WatchLog
},
})
).result;
debug("got watchLogs subscription", subscriptionId);

// Listen for wiresaw_watchLogs subscription
// Need to use low level methods since viem's socekt client only handles `eth_subscription` messages.
// (https://github.com/wevm/viem/blob/f81d497f2afc11b9b81a79057d1f797694b69793/src/utils/rpc/socket.ts#L178)
client.socket.addEventListener("message", (message) => {
debug("got socket message", message);

const response = JSON.parse(message.data);
if ("error" in response) {
debug("was error, returning error to subscriber");
// Return JSON-RPC errors to the subscriber
subscriber.error(response.error);
return;
}

// Parse the logs from wiresaw_watchLogs
if ("params" in response && response.params.subscription === subscriptionId) {
debug("parsing logs");
const logs: RpcLog[] = response.params.result;
const formattedLogs = logs.map((log) => formatLog(log));
const parsedLogs = parseEventLogs({ abi: storeEventsAbi, logs: formattedLogs });
debug("got logs", parsedLogs);
if (caughtUp) {
debug("handing off logs to subscriber");
const blockNumber = parsedLogs[0].blockNumber;
subscriber.next({ blockNumber, logs: parsedLogs });
resumeBlock = blockNumber + 1n;
} else {
debug("buffering logs");
logBuffer.push(...parsedLogs);
}
return;
}

debug("unknown message, skipping");
});

// Catch up to the pending logs
try {
debug("fetching initial logs");
const initialLogs = await fetchInitialLogs({ client, address, fromBlock: resumeBlock, topics });
debug("got logs", initialLogs);
const logs = [...initialLogs, ...logBuffer].sort(logSort);
const blockNumber = logs.at(-1)?.blockNumber ?? resumeBlock;
subscriber.next({ blockNumber, logs: initialLogs });
resumeBlock = blockNumber + 1n;
caughtUp = true;
} catch (e) {
} catch (error) {
debug("could not get initial logs", error);
subscriber.error("Could not fetch initial wiresaw logs");
}

// Keep websocket alive and reconnect if it's not alive anymore
keepAliveInterval = setInterval(async () => {
try {
await Promise.race([
client.requestAsync({ body: { method: "net_version" } }),
wait(2000).then(() => {
throw new Error("Timed out waiting for websocket RPC ping");
}),
]);
} catch {
console.log("Detected unresponsive websocket, closing...");
clearInterval(keepAliveInterval);
client.close();
}
}, 3000);
});
}

setupClient();

return () => {
debug("logs$ subscription closed");
clearInterval(keepAliveInterval);
client?.close();
};
Expand Down

0 comments on commit 9f0ee91

Please sign in to comment.