Skip to content

Commit

Permalink
bring over changes
Browse files Browse the repository at this point in the history
  • Loading branch information
holic committed Oct 27, 2024
1 parent 42af66f commit cc64e69
Showing 1 changed file with 105 additions and 73 deletions.
178 changes: 105 additions & 73 deletions packages/store-sync/src/wiresaw.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ import { StorageAdapterBlock, StoreEventsLog } from "./common";
import { storeEventsAbi } from "@latticexyz/store";
import { logSort } from "@latticexyz/common";
import { SocketRpcClient, getWebSocketRpcClient } from "viem/utils";
import { debug } from "./debug";
import { debug as parentDebug } from "./debug";

const debug = parentDebug.extend("watchLogs");

type WatchLogsInput = {
url: string;
Expand All @@ -22,97 +24,127 @@ export function watchLogs({ url, address, fromBlock }: WatchLogsInput): WatchLog
] as LogTopic[]; // https://github.com/wevm/viem/blob/63a5ac86eb9a2962f7323b4cc76ef54f9f5ef7ed/src/actions/public/getLogs.ts#L171

let resumeBlock = fromBlock;
let keepAliveInterval: ReturnType<typeof setTimeout> | undefined = undefined;

let pingTimer: ReturnType<typeof setTimeout> | undefined = undefined;
// how often to ping to keep socket alive
const pingInterval = 3000;
// how long to wait for ping response before we attempt to reconnect
const pingTimeout = 5000;

const logs$ = new Observable<StorageAdapterBlock>((subscriber) => {
debug("logs$ subscribed");

let client: SocketRpcClient<WebSocket>;

function setupClient(): void {
async function setupClient(): Promise<void> {
debug("setupClient called");

// Buffer the live logs received until the gap from `startBlock` to `currentBlock` is closed
let caughtUp = false;
const logBuffer: StoreEventsLog[] = [];

getWebSocketRpcClient(url, {
keepAlive: false, // keepAlive is handled below
}).then(async (_client) => {
client = _client;
client.socket.addEventListener("error", (error) =>
subscriber.error({ code: -32603, message: "WebSocket error", data: error }),
);

// Start watching pending logs
const subscriptionId: Hex = (
await client.requestAsync({
body: {
method: "wiresaw_watchLogs",
params: [{ address, topics }],
},
})
).result;

// Listen for wiresaw_watchLogs subscription
// Need to use low level methods since viem's socekt client only handles `eth_subscription` messages.
// (https://github.com/wevm/viem/blob/f81d497f2afc11b9b81a79057d1f797694b69793/src/utils/rpc/socket.ts#L178)
client.socket.addEventListener("message", (message) => {
const response = JSON.parse(message.data);
if ("error" in response) {
// Return JSON-RPC errors to the subscriber
subscriber.error(response.error);
return;
}
client = await getWebSocketRpcClient(url, { keepAlive: false });
debug("got websocket rpc client");

// Parse the logs from wiresaw_watchLogs
if ("params" in response && response.params.subscription === subscriptionId) {
const logs: RpcLog[] = response.params.result;
const formattedLogs = logs.map((log) => formatLog(log));
const parsedLogs = parseEventLogs({ abi: storeEventsAbi, logs: formattedLogs });
if (caughtUp) {
const blockNumber = parsedLogs[0].blockNumber;
subscriber.next({ blockNumber, logs: parsedLogs });
resumeBlock = blockNumber + 1n;
} else {
logBuffer.push(...parsedLogs);
}
}
async function ping(): Promise<void> {
try {
debug("pinging socket");
await client.requestAsync({ body: { method: "net_version" }, timeout: pingTimeout });
} catch (error) {
debug("ping failed, closing...", error);
client.close();
}
}

function schedulePing(): void {
debug("scheduling next ping");
pingTimer = setTimeout(() => ping().then(schedulePing), pingInterval);
}

schedulePing();

client.socket.addEventListener("error", (error) => {
debug("socket error, closing...", error);
client.close();
});

client.socket.addEventListener("close", async () => {
debug("socket close, setting up new client...");
clearTimeout(pingTimer);
setupClient().catch((error) => {
debug("error trying to setup new client", error);
subscriber.error(error);
});
});

// Catch up to the pending logs
try {
const initialLogs = await fetchInitialLogs({ client, address, fromBlock: resumeBlock, topics });
const logs = [...initialLogs, ...logBuffer].sort(logSort);
const blockNumber = logs.at(-1)?.blockNumber ?? resumeBlock;
subscriber.next({ blockNumber, logs: initialLogs });
resumeBlock = blockNumber + 1n;
caughtUp = true;
} catch (e) {
subscriber.error("Could not fetch initial wiresaw logs");
// Start watching pending logs
const subscriptionId: Hex = (
await client.requestAsync({
body: {
method: "wiresaw_watchLogs",
params: [{ address, topics }],
},
})
).result;
debug("got watchLogs subscription", subscriptionId);

// Listen for wiresaw_watchLogs subscription
// Need to use low level methods since viem's socekt client only handles `eth_subscription` messages.
// (https://github.com/wevm/viem/blob/f81d497f2afc11b9b81a79057d1f797694b69793/src/utils/rpc/socket.ts#L178)
client.socket.addEventListener("message", (message) => {
const response = JSON.parse(message.data);
if ("error" in response) {
debug("was error, returning error to subscriber");
// Return JSON-RPC errors to the subscriber
subscriber.error(response.error);
return;
}

// Keep websocket alive and reconnect if it's not alive anymore
keepAliveInterval = setInterval(async () => {
try {
await Promise.race([
client.requestAsync({ body: { method: "net_version" } }),
new Promise<void>((_, reject) => {
setTimeout(reject, 2000);
}),
]);
} catch {
debug("Detected unresponsive websocket, reconnecting...");
clearInterval(keepAliveInterval);
client.close();
setupClient();
// Parse the logs from wiresaw_watchLogs
if ("params" in response && response.params.subscription === subscriptionId) {
debug("parsing logs");
const logs: RpcLog[] = response.params.result;
const formattedLogs = logs.map((log) => formatLog(log));
const parsedLogs = parseEventLogs({ abi: storeEventsAbi, logs: formattedLogs });
debug("got logs", parsedLogs);
if (caughtUp) {
debug("handing off logs to subscriber");
const blockNumber = parsedLogs[0].blockNumber;
subscriber.next({ blockNumber, logs: parsedLogs });
resumeBlock = blockNumber + 1n;
} else {
debug("buffering logs");
logBuffer.push(...parsedLogs);
}
}, 3000);
return;
}
});

// Catch up to the pending logs
try {
debug("fetching initial logs");
const initialLogs = await fetchInitialLogs({ client, address, fromBlock: resumeBlock, topics });
debug("got logs", initialLogs);
const logs = [...initialLogs, ...logBuffer].sort(logSort);
const blockNumber = logs.at(-1)?.blockNumber ?? resumeBlock;
subscriber.next({ blockNumber, logs: initialLogs });
resumeBlock = blockNumber + 1n;
caughtUp = true;
} catch (error) {
debug("could not get initial logs", error);
subscriber.error("Could not fetch initial wiresaw logs");
}
}

setupClient();
setupClient().catch((error) => {
debug("error setting up initial client", error);
subscriber.error(error);
});

return () => {
debug("logs$ subscription closed");
clearTimeout(pingTimer);
client?.close();
if (keepAliveInterval != null) {
clearInterval(keepAliveInterval);
}
};
});

Expand Down

0 comments on commit cc64e69

Please sign in to comment.