Skip to content
This repository has been archived by the owner on Apr 22, 2024. It is now read-only.

Commit

Permalink
[indexer] await listeners and improve health check
Browse files Browse the repository at this point in the history
  • Loading branch information
YBadiss committed Apr 10, 2024
1 parent 1cd4ec9 commit 5f669cc
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 41 deletions.
3 changes: 3 additions & 0 deletions fc-community-indexer/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,8 @@ BASE_MAINNET_RPC_URL=<quicknode_url>

```bash
yarn install
# Running pull synchroniser
yarn run sync-dev
# Running listener
yarn run dev
```
79 changes: 45 additions & 34 deletions fc-community-indexer/src/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1791,47 +1791,58 @@ const getEvents = async (
return allLogs.flat();
};

export const listenToEvents = (
export const listenToEvents = async (
handler: (factchainEvent: FactchainEvent) => void,
) => {
supportedNetworks.map((network) => {
listenToEventsForNetwork(network, handler);
});
): Promise<ethers.Contract[]> => {
return (await Promise.all(
supportedNetworks.map(async (network): Promise<ethers.Contract[]> => {
return await listenToEventsForNetwork(network, handler);
})
)).flat();
};

const listenToEventsForNetwork = (
// TODO:
// - check number of listeners in health check, 500 if not okay
// - maybe call process.exit(0) ?


const listenToEventsForNetwork = async (
network: any,
handler: (factchainEvent: FactchainEvent) => void,
) => {
): Promise<ethers.Contract[]> => {
const provider = new ethers.WebSocketProvider(
network.rpcUrl.replace("https", "wss"),
);
network.contracts.map((factchainContract: FactchainContract) => {
const contract = new ethers.Contract(
factchainContract.address,
factchainContract.abi,
provider,
);
const listeningContracts: ethers.Contract[] = await Promise.all(
network.contracts.map(async (factchainContract: FactchainContract) => {
const contract = new ethers.Contract(
factchainContract.address,
factchainContract.abi,
provider,
);

console.log(
`Listening to events for contract ${factchainContract.address} on network ${network.name}`,
);
contract.on("*", (newEvent) => {
try {
const event = newEvent.log;
console.log("New event received", event);
const factchainEvent = {
networkName: network.name,
contractAddress: event.address,
eventName: event.eventName,
blockTimestamp: Math.floor(Date.now() / 1000),
blockNumber: event.blockNumber,
eventArgs: factchainContract.parseEvent(event),
};
handler(factchainEvent);
} catch (e) {
console.error("Error processing event", e);
}
});
});
console.log(
`Listening to events for contract ${factchainContract.address} on network ${network.name}`,
);
await contract.on("*", (newEvent) => {
try {
const event = newEvent.log;
console.log("New event received", event);
const factchainEvent = {
networkName: network.name,
contractAddress: event.address,
eventName: event.eventName,
blockTimestamp: Math.floor(Date.now() / 1000),
blockNumber: event.blockNumber,
eventArgs: factchainContract.parseEvent(event),
};
handler(factchainEvent);
} catch (e) {
console.error("Error processing event", e);
}
});
return contract
})
);
return listeningContracts;
};
26 changes: 19 additions & 7 deletions fc-community-indexer/src/listener.ts
Original file line number Diff line number Diff line change
@@ -1,22 +1,34 @@
import { ethers } from "ethers";
import { listenToEvents } from "./events";
import { writeEvent } from "./mongo";
import express from "express";

async function run() {
listenToEvents((factchainEvent) => {
console.log("Created factchain event", factchainEvent);
async function run(): Promise<ethers.Contract[]> {
return await listenToEvents((factchainEvent) => {
writeEvent(factchainEvent);
});
}

const app = express();
const port = process.env.PORT;
let listeningContracts: ethers.Contract[] = [];

app.get("/", (req, res) => {
res.send("OK");
app.get("/", async (req, res) => {
const contractsHaveListeners = await Promise.all(
listeningContracts.map(async (contract) => {
const listenerCount = await contract.listenerCount();
console.log(`Listening to ${contract.address} events, with ${listenerCount} listeners`);
return listenerCount > 0;
})
);
if (contractsHaveListeners.every((hasListeners) => hasListeners)) {
res.send("OK");
} else {
res.status(500).send("Not all contracts have listeners");
}
});

app.listen(port, () => {
app.listen(port, async () => {
console.log(`synchroniser listening on port ${port}`);
run().catch(console.dir);
listeningContracts = await run();
});

0 comments on commit 5f669cc

Please sign in to comment.