Skip to content

Commit

Permalink
eventstream-router: implement in-memory health checks
Browse files Browse the repository at this point in the history
  • Loading branch information
siddharthvp committed May 3, 2024
1 parent 7fd49de commit b0cd12a
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 15 deletions.
48 changes: 33 additions & 15 deletions eventstream-router/app.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import { argv, bot, fs, log } from "../botbase";
import { argv, bot, emailOnError, fs, log } from "../botbase";
import { RecentChangeStreamEvent } from "./RecentChangeStreamEvent";
import { createLogStream, stringifyObject } from "../utils";
import EventSource = require("./EventSource");
import { MINUTE, SECOND } from "../millis";

// TODO: improve logging

Expand Down Expand Up @@ -126,15 +127,22 @@ function addToRouterLog(routeName: string, data: RecentChangeStreamEvent) {
routerLog(`Routing to ${routeName}: ${catNote}${data.title}@${data.wiki}`);
}

async function run(routes: RouteValidator[], lastSeen: LastSeen) {
log('[S] Restarted main');
let stream;

function run(routes: RouteValidator[], lastSeen: LastSeen) {
log('[S] Restarted');

const ts = lastSeen.get();
const tsUsable = ts.isValid() && new bot.date().subtract(7, 'days').isBefore(ts);
log(`[i] lastSeenTs: ${ts}: ${tsUsable}`);

let since = !argv.fromNow && tsUsable ? ts : new bot.date();
let stream = new EventSource(

if (stream) {
// ensure that there aren't two parallel connections
stream.close();
}
stream = new EventSource(
`https://stream.wikimedia.org/v2/stream/recentchange?since=${since.toISOString()}`, {
headers: {
'User-Agent': bot.options.userAgent
Expand All @@ -151,14 +159,14 @@ async function run(routes: RouteValidator[], lastSeen: LastSeen) {
if (evt.type === 'error' && evt.message === undefined) {
// The every 15 minute connection drop?
return; // EventSource automatically reconnects. No unnecessary logging.
// TODO: consider logging this, if this is the source of other kinds of drops as well
}
log(`[W] Event source encountered error:`);
logError(evt);

if (evt.status === 429) { // Too Many Requests, the underlying library doesn't reconnect by itself
stream.close(); // just to be safe that there aren't two parallel connections
bot.sleep(5000).then(() => {
return main(routes, lastSeen); // restart
bot.sleep(5 * SECOND).then(() => {
start(routes, lastSeen); // restart
});
}
// TODO: handle other errors, ensure auto-reconnection
Expand Down Expand Up @@ -188,17 +196,19 @@ async function run(routes: RouteValidator[], lastSeen: LastSeen) {
}
}

async function main(routes: RouteValidator[], lastSeen: LastSeen) {
await run(routes, lastSeen).catch(err => {
logError(err);
main(routes, lastSeen);
});
function start(routes: RouteValidator[], lastSeen: LastSeen) {
try {
run(routes, lastSeen);
} catch (err) { // should never occur
emailOnError(err, 'stream (run)');
}
}

interface StreamAppConfig {
routingLogFile?: string;
lastSeenFile?: string;
lastSeenUpdateInterval?: number
lastSeenUpdateInterval?: number;
healthCheckInterval?: number;
}

export function streamWithRoutes(routes: (new () => Route)[], config: StreamAppConfig = {}) {
Expand All @@ -210,9 +220,17 @@ export function streamWithRoutes(routes: (new () => Route)[], config: StreamAppC
routerLog = createLogStream(config.routingLogFile || './routerlog.out');
const lastSeen = new LastSeen(
config.lastSeenFile || './last-seen.txt',
config.lastSeenUpdateInterval || 1000
config.lastSeenUpdateInterval || SECOND
);
main(validatedRoutes, lastSeen);
start(validatedRoutes, lastSeen);

setInterval(() => {
if (lastSeen.get().add(2, 'minutes').isBefore(new Date())) {
log(`[E] Restarting as no events seen in last two minutes`);
start(validatedRoutes, lastSeen);
}
}, config.healthCheckInterval || 2 * MINUTE);

}

export function logError(err, task?) {
Expand Down
9 changes: 9 additions & 0 deletions millis.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
export const MILLISECOND = 1;

export const SECOND = 1000;

export const MINUTE = 60 * SECOND;

export const HOUR = 60 * MINUTE;

export const DAY = 24 * HOUR;

0 comments on commit b0cd12a

Please sign in to comment.