From 57c79de8c8b6680737ac45c98b97fbb72fa0bb1c Mon Sep 17 00:00:00 2001 From: Siddharth VP Date: Mon, 4 Dec 2023 14:05:28 +0530 Subject: [PATCH] db-tabulator: overhaul scheduling to be more scalable --- db-tabulator/HybridMetadataStore.ts | 39 ++++++++ db-tabulator/MariadbMetadataStore.ts | 92 +++++++++++++++++++ db-tabulator/MetadataStore.ts | 9 ++ db-tabulator/NoMetadataStore.ts | 69 ++++++++++++++ db-tabulator/README.md | 4 +- db-tabulator/app.ts | 61 ++---------- .../eventstream-metadata-maintainer.ts | 75 +++++++++++++++ db-tabulator/main.ts | 5 +- db-tabulator/test.ts | 29 ++++-- eventstream-router/main.ts | 3 +- jobs.yml | 2 +- 11 files changed, 321 insertions(+), 67 deletions(-) create mode 100644 db-tabulator/HybridMetadataStore.ts create mode 100644 db-tabulator/MariadbMetadataStore.ts create mode 100644 db-tabulator/MetadataStore.ts create mode 100644 db-tabulator/NoMetadataStore.ts create mode 100644 db-tabulator/eventstream-metadata-maintainer.ts diff --git a/db-tabulator/HybridMetadataStore.ts b/db-tabulator/HybridMetadataStore.ts new file mode 100644 index 0000000..ee2c3e4 --- /dev/null +++ b/db-tabulator/HybridMetadataStore.ts @@ -0,0 +1,39 @@ +import {MetadataStore} from "./MetadataStore"; +import {Query} from "./app"; +import {MariadbMetadataStore} from "./MariadbMetadataStore"; +import {NoMetadataStore} from "./NoMetadataStore"; + +export class HybridMetadataStore implements MetadataStore { + + stores: MetadataStore[] = [ + new MariadbMetadataStore(), + new NoMetadataStore(), + ]; + activeStore: MetadataStore; + + async init(): Promise { + for (const store of this.stores) { + try { + await store.init(); + this.activeStore = store; + break; + } catch (e) {} + } + } + + getQueriesToRun() { + return this.activeStore.getQueriesToRun(); + } + + removeOthers(pages: Set) { + return this.activeStore.removeOthers(pages); + } + + updateLastTimestamp(query: Query) { + return this.activeStore.updateLastTimestamp(query); + } + + updateMetadata(page: string, queries: Query[]) { + return this.activeStore.updateMetadata(page, queries); + } +} diff --git a/db-tabulator/MariadbMetadataStore.ts b/db-tabulator/MariadbMetadataStore.ts new file mode 100644 index 0000000..5033b73 --- /dev/null +++ b/db-tabulator/MariadbMetadataStore.ts @@ -0,0 +1,92 @@ +import {TOOLS_DB_HOST, toolsdb} from "../db"; +import {fetchQueriesForPage, Query} from "./app"; +import {MetadataStore} from "./MetadataStore"; +import {createLocalSSHTunnel, setDifference} from "../utils"; +import * as crypto from "crypto"; + +export class MariadbMetadataStore implements MetadataStore { + db: toolsdb; + + async init() { + this.db = new toolsdb('dbreports_p'); + await createLocalSSHTunnel(TOOLS_DB_HOST); + await this.db.query(` + CREATE TABLE IF NOT EXISTS dbreports( + page VARCHAR(255), + idx SMALLINT UNSIGNED, + templateMd5 CHAR(32), + intervalDays SMALLINT UNSIGNED, + lastUpdate DATETIME + ) + `); // Primary key? + } + + async updateMetadata(page: string, queries: Query[]) { + const existingQueryMd5s = new Set((await this.db.query('SELECT templateMd5 FROM dbreports')) + .map(q => q.templateMd5)); + const newQueryMd5s = new Set(queries.map(q => this.makeMd5(q))); + + await this.db.transaction(async conn => { + setDifference(existingQueryMd5s, newQueryMd5s).forEach(md5 => { + conn.execute('DELETE FROM dbreports WHERE page = ? AND templateMd5 = ?', [page, md5]); + }); + + // Don't delete lastUpdate values on service restart (or when other reports are added to page) + for (let query of queries) { + const md5 = this.makeMd5(query); + const intervalDays = isNaN(query.config.interval) ? null : query.config.interval; + if (existingQueryMd5s.has(md5)) { + await conn.execute(` + UPDATE dbreports SET idx = ?, intervalDays = ? + WHERE page = ? AND templateMd5 = ? + `, [query.idx, intervalDays, query.page, md5]); + } else { + await conn.execute(` + INSERT INTO dbreports(page, idx, templateMd5, intervalDays, lastUpdate) + VALUES (?, ?, ?, ?, ?) + `, [query.page, query.idx, md5, intervalDays, null]); + } + } + }); + } + + makeMd5(query: Query) { + return crypto.createHash('md5').update(query.template.wikitext).digest('hex'); + } + + async removeOthers(pages: Set) { + const questionMarks = Array(pages.size).fill('?').join(',') + await this.db.run( + `DELETE FROM dbreports WHERE page NOT IN (${questionMarks})`, + [...pages] + ) + } + + async getQueriesToRun() { + const data = await this.db.query(` + SELECT page, idx FROM dbreports + WHERE intervalDays IS NOT NULL + AND (lastUpdate IS NULL OR lastUpdate < NOW() - INTERVAL intervalDays DAY) + `); + let pages: Record> = {}; + data.forEach(row => { + if (!pages[row.page]) { + pages[row.page] = new Set(); + } + pages[row.page].add(row.idx as number); + }); + const result: Record = {}; + for (const [page, indices] of Object.entries(pages)) { + const queries = await fetchQueriesForPage(page); + result[page] = queries.filter(q => indices.has(q.idx)); + } + return result; + } + + async updateLastTimestamp(query: Query): Promise { + const result = await this.db.run( + `UPDATE dbreports SET lastUpdate = UTC_TIMESTAMP() WHERE page = ? AND idx = ?` + , [query.page, query.idx]); + // TODO: log warning if rows affected != 1 + } +} diff --git a/db-tabulator/MetadataStore.ts b/db-tabulator/MetadataStore.ts new file mode 100644 index 0000000..e75de36 --- /dev/null +++ b/db-tabulator/MetadataStore.ts @@ -0,0 +1,9 @@ +import {Query} from "./app"; + +export interface MetadataStore { + init(): Promise; + updateMetadata(page: string, queries: Query[]): Promise; + removeOthers(pages: Set): Promise; + updateLastTimestamp(query: Query): Promise; + getQueriesToRun(): Promise>; +} diff --git a/db-tabulator/NoMetadataStore.ts b/db-tabulator/NoMetadataStore.ts new file mode 100644 index 0000000..84e89d7 --- /dev/null +++ b/db-tabulator/NoMetadataStore.ts @@ -0,0 +1,69 @@ +import {BOT_NAME, Query, getQueriesFromText, SUBSCRIPTIONS_CATEGORY, TEMPLATE} from "./app"; +import {bot, enwikidb, log} from "../botbase"; +import {MwnDate} from "../../mwn/src/date"; +import {MetadataStore} from "./MetadataStore"; + +export class NoMetadataStore implements MetadataStore { + wikidb: enwikidb; + + async init() { + this.wikidb = new enwikidb(); + } + + async updateMetadata(page: string, queries: Query[]) {} + + async removeOthers(pages: Set) {} + + async updateLastTimestamp() {} + + async getQueriesToRun(): Promise> { + // Get the date of the bot's last edit to each of the subscribed pages + // The API doesn't have an efficient query for this, so using the DB instead + let [timeTaken, lastEditsDb] = await this.wikidb.timedQuery(` + SELECT page_namespace, page_title, + (SELECT MAX(rc_timestamp) FROM recentchanges_userindex + JOIN actor_recentchanges ON rc_actor = actor_id AND actor_name = ? + WHERE rc_namespace = page_namespace AND rc_title = page_title + ) AS last_edit + FROM page + JOIN categorylinks ON cl_from = page_id AND cl_to = ? + `, [BOT_NAME, SUBSCRIPTIONS_CATEGORY.replace(/ /g, '_')]); + log(`[i] Retrieved last edits data. DB query took ${timeTaken.toFixed(2)} seconds.`); + + const lastEditsData = Object.fromEntries(lastEditsDb.map((row) => [ + new bot.page(row.page_title as string, row.page_namespace as number).toText(), + row.last_edit && new bot.date(row.last_edit) + ])); + + let allQueries: Record = {}; + let pages = (await new bot.page('Template:' + TEMPLATE).transclusions()); + for await (let pg of bot.readGen(pages)) { + if (pg.ns === 0) { // sanity check: don't work in mainspace + continue; + } + let text = pg.revisions[0].content; + allQueries[pg.title] = getQueriesFromText(text, pg.title).filter(q => { + return this.checkIfUpdateDue(lastEditsData[q.page], q) + }); + } + return allQueries; + } + + checkIfUpdateDue(lastUpdate: MwnDate, query: Query): boolean { + const interval = query.config.interval; + if (isNaN(interval)) { + log(`[+] Skipping ${query} as periodic updates are not configured`); + return false; + } + if (!lastUpdate) { + return true; + } + let daysDiff = (new bot.date().getTime() - lastUpdate.getTime())/8.64e7; + const isUpdateDue = daysDiff >= interval - 0.5; + if (!isUpdateDue) { + log(`[+] Skipping ${query} as update is not due.`); + } + return isUpdateDue; + } + +} diff --git a/db-tabulator/README.md b/db-tabulator/README.md index 2a8edeb..6865cea 100644 --- a/db-tabulator/README.md +++ b/db-tabulator/README.md @@ -1,8 +1,8 @@ ## Database report generator `app.ts` contains all the working logic, but doesn't execute anything by itself. There are 3 entry points: -- `main.ts` - triggered via cron. See entry in `crontab` file. -- `eventstream-trigger.ts` - eventstream hook that does a immediate update for a single page that is edited to newly transclude the triggering template. +- `main.ts` - triggered via cron. See entry in `jobs.yml` file. +- `eventstream-metadata-maintainer.ts` - eventstream hook that updates stored metadata of queries present on pages, used in the cron job. - `web-endpoint.ts` - webservice route that allows users to trigger update on a specific report. Use `--fake` argument for the input to be read from `fake-configs.wikitext` and output to be written to `fake-output.wikitext. \ No newline at end of file diff --git a/db-tabulator/app.ts b/db-tabulator/app.ts index abc5b73..b39cf4d 100644 --- a/db-tabulator/app.ts +++ b/db-tabulator/app.ts @@ -3,8 +3,9 @@ import { enwikidb, SQLError } from "../db"; import { Template } from "../../mwn/build/wikitext"; import { arrayChunk, createLogStream, lowerFirst, readFile, writeFile } from "../utils"; import {NS_CATEGORY, NS_FILE, NS_MAIN} from "../namespaces"; -import { MwnDate } from "../../mwn/build/date"; import { formatSummary } from "../reports/commons"; +import {MetadataStore} from "./MetadataStore"; +import {HybridMetadataStore} from "./HybridMetadataStore"; export const BOT_NAME = 'SDZeroBot'; export const TEMPLATE = 'Database report'; @@ -21,24 +22,17 @@ const db = new enwikidb({ connectionLimit: CONCURRENCY }); +export const metadataStore: MetadataStore = new HybridMetadataStore(); + export async function fetchQueries(): Promise> { if (argv.fake) { let text = readFile(FAKE_INPUT_FILE); return { 'Fake-Configs': getQueriesFromText(text, 'Fake-Configs') }; } - let allQueries: Record = {}; - let pages = (await new bot.page('Template:' + TEMPLATE).transclusions()); - for await (let pg of bot.readGen(pages)) { - if (pg.ns === 0) { // sanity check: don't work in mainspace - continue; - } - let text = pg.revisions[0].content; - allQueries[pg.title] = getQueriesFromText(text, pg.title); - } - return allQueries; + return metadataStore.getQueriesToRun(); } -function getQueriesFromText(text: string, title: string): Query[] { +export function getQueriesFromText(text: string, title: string): Query[] { let templates = bot.wikitext.parseTemplates(text, { namePredicate: name => name === TEMPLATE }); @@ -49,29 +43,8 @@ function getQueriesFromText(text: string, title: string): Query[] { return templates.map((template, idx) => new Query(template, title, idx + 1)); } -let lastEditsData: Record; - // Called from the cronjob export async function processQueries(allQueries: Record) { - await db.getReplagHours(); - // Get the date of the bot's last edit to each of the subscribed pages - // The API doesn't have an efficient query for this, so using the DB instead - let [timeTaken, lastEditsDb] = await db.timedQuery(` - SELECT page_namespace, page_title, - (SELECT MAX(rc_timestamp) FROM recentchanges_userindex - JOIN actor_recentchanges ON rc_actor = actor_id AND actor_name = ? - WHERE rc_namespace = page_namespace AND rc_title = page_title - ) AS last_edit - FROM page - JOIN categorylinks ON cl_from = page_id AND cl_to = ? - `, [BOT_NAME, SUBSCRIPTIONS_CATEGORY.replace(/ /g, '_')]); - log(`[i] Retrieved last edits data. DB query took ${timeTaken.toFixed(2)} seconds.`); - - lastEditsData = Object.fromEntries(lastEditsDb.map((row) => [ - new bot.page(row.page_title as string, row.page_namespace as number).toText(), - row.last_edit && new bot.date(row.last_edit) - ])); - await bot.batchOperation(Object.entries(allQueries), async ([page, queries]) => { log(`[+] Processing page ${page}`); await processQueriesForPage(queries); @@ -132,6 +105,7 @@ export class Query { maxPages?: number; removeUnderscores?: number[]; hiddenColumns?: number[]; + interval?: number; } = {}; /** Warnings generated while template parsing or result formatting, to be added to the page */ @@ -163,6 +137,7 @@ export class Query { const result = await this.runQuery(); const resultText = await this.formatResults(result); await this.save(resultText); + await metadataStore.updateLastTimestamp(this); } catch (err) { if (err instanceof HandledError) return; emailOnError(err, 'db-tabulator'); @@ -174,27 +149,9 @@ export class Query { return this.template.getValue(param)?.replace(//g, '').trim(); } - static checkIfUpdateDue(lastUpdate: MwnDate, interval: number): boolean { - if (!lastUpdate) { - return true; - } - let daysDiff = (new bot.date().getTime() - lastUpdate.getTime())/8.64e7; - return daysDiff >= interval - 0.5; - } - // Errors in configs are reported to user through [[Module:Database report]] in Lua parseQuery() { - if (this.context === 'cron') { - let interval = parseInt(this.getTemplateValue('interval')); - if (isNaN(interval)) { - log(`[+] Skipping ${this} as periodic updates are not configured`); - throw new HandledError(); - } - if (!Query.checkIfUpdateDue(lastEditsData[this.page], interval)) { - log(`[+] Skipping ${this} as update is not due.`); - throw new HandledError(); - } - } + this.config.interval = parseInt(this.getTemplateValue('interval')); // Use of semicolons for multiple statements will be flagged as error at query runtime this.config.sql = this.getTemplateValue('sql') diff --git a/db-tabulator/eventstream-metadata-maintainer.ts b/db-tabulator/eventstream-metadata-maintainer.ts new file mode 100644 index 0000000..d12f6b1 --- /dev/null +++ b/db-tabulator/eventstream-metadata-maintainer.ts @@ -0,0 +1,75 @@ +import {BOT_NAME, fetchQueriesForPage, SUBSCRIPTIONS_CATEGORY, metadataStore} from "./app"; +import {pageFromCategoryEvent, Route} from "../eventstream-router/app"; +import {bot} from "../botbase"; +import {HybridMetadataStore} from "./HybridMetadataStore"; +import {NoMetadataStore} from "./NoMetadataStore"; + +/** + * If there are a large number of reports, we want to identify which reports need updating without reading in the pages. + * Use EventStream to listen to addition/removal of pages from the subscriptions category. Maintain the list in-memory + * (restored from db on service restart). Use EventStream edit events to also keep track of edits made to pages within + * the category. For each such edit, update metadata in the database. + * Poll this database periodically to check for reports to update. + */ +export default class DbTabulatorMetadata extends Route { + name = "db-tabulator-metadata"; + + subscriptions: Set; + + // Store metadata along with last update + + async init() { + super.init(); + this.log('[S] Started'); + this.subscriptions = new Set((await new bot.Category(SUBSCRIPTIONS_CATEGORY).pages()).map(e => e.title)); + await metadataStore.init(); + if (metadataStore instanceof HybridMetadataStore) { + if (metadataStore.activeStore instanceof NoMetadataStore) { + this.log("[E] Active store is NoMetadataStore, which cannot be used for collecting metadata"); + // Retry loops are problematic as until init() completes, all messages are buffered. + // So, just bail out after 10 minutes. + this.log(`[E] Scheduling restart in 10 minutes`) + setTimeout(() => { + // TODO: centralize this in eventstream-router core? + this.log(`[E] Restart triggered due to ${this.name} handler failing to init`) + process.exit(1); + }, 10 * 60 * 1000); + return Promise.reject(); + } + } + await this.refreshExistingMetadata(); + } + + filter(data): boolean { + return data.wiki === 'enwiki' && + ((data.type === 'categorize' && data.title === 'Category:' + SUBSCRIPTIONS_CATEGORY) || + (data.type === 'edit' && this.subscriptions.has(data.title) && data.user !== BOT_NAME)); + } + + async worker(data) { + if (data.type === 'categorize') { + let page = pageFromCategoryEvent(data); + if (page.added) { + this.subscriptions.add(page.title); + } else { + this.subscriptions.delete(page.title); + } + this.updateMetadata(page.title); + } else { + this.updateMetadata(data.title); + } + } + + async updateMetadata(page: string) { + this.log(`[+] Updating metadata for ${page}`); + const queries = await fetchQueriesForPage(page); + queries.forEach(q => q.parseQuery()); + metadataStore.updateMetadata(page, queries); + } + + async refreshExistingMetadata() { + await bot.batchOperation([...this.subscriptions], page => this.updateMetadata(page), 10); + // Remove pre-existing rows in db which are no longer in subscriptions + await metadataStore.removeOthers(this.subscriptions); + } +} diff --git a/db-tabulator/main.ts b/db-tabulator/main.ts index ccf0b90..e049a40 100644 --- a/db-tabulator/main.ts +++ b/db-tabulator/main.ts @@ -1,6 +1,6 @@ import { argv, bot, emailOnError, log } from "../botbase"; import { closeTunnels, createLocalSSHTunnel, writeFile } from "../utils"; -import { checkShutoff, FAKE_OUTPUT_FILE, fetchQueries, processQueries } from "./app"; +import { checkShutoff, FAKE_OUTPUT_FILE, fetchQueries, processQueries, metadataStore } from "./app"; import { ENWIKI_DB_HOST } from "../db"; /** @@ -36,7 +36,8 @@ import { ENWIKI_DB_HOST } from "../db"; await Promise.all([ bot.getTokensAndSiteInfo(), - createLocalSSHTunnel(ENWIKI_DB_HOST) + metadataStore.init(), + createLocalSSHTunnel(ENWIKI_DB_HOST), ]); if (argv.fake) { diff --git a/db-tabulator/test.ts b/db-tabulator/test.ts index aed0587..e69d34d 100644 --- a/db-tabulator/test.ts +++ b/db-tabulator/test.ts @@ -1,19 +1,30 @@ import { Query } from "./app"; import { bot } from "../botbase"; import assert = require("assert"); +import {NoMetadataStore} from "./NoMetadataStore"; +import {Template} from "../../mwn/build/wikitext"; +import {MwnDate} from "../../mwn"; describe('db-tabulator', () => { + const noMetadataStore = new NoMetadataStore(); + + const isUpdateDue = (lastUpdate: MwnDate, interval: number) => { + const query = new Query(new Template(""), "", 1); + query.config.interval = interval; + return noMetadataStore.checkIfUpdateDue(lastUpdate, query); + } + it('checkIfUpdateDue', () => { - assert.strictEqual(Query.checkIfUpdateDue(new bot.date().subtract(1, 'day'), 1), true); - assert.strictEqual(Query.checkIfUpdateDue(new bot.date().subtract(2, 'day'), 1), true); - assert.strictEqual(Query.checkIfUpdateDue(new bot.date().subtract(1, 'hour'), 1), false); - assert.strictEqual(Query.checkIfUpdateDue(new bot.date().subtract(11, 'hour'), 1), false); - assert.strictEqual(Query.checkIfUpdateDue(new bot.date().subtract(13, 'hour'), 1), true); + assert.strictEqual(isUpdateDue(new bot.date().subtract(1, 'day'), 1), true); + assert.strictEqual(isUpdateDue(new bot.date().subtract(2, 'day'), 1), true); + assert.strictEqual(isUpdateDue(new bot.date().subtract(1, 'hour'), 1), false); + assert.strictEqual(isUpdateDue(new bot.date().subtract(11, 'hour'), 1), false); + assert.strictEqual(isUpdateDue(new bot.date().subtract(13, 'hour'), 1), true); - assert.strictEqual(Query.checkIfUpdateDue(new bot.date().subtract(30, 'hour'), 2), false); - assert.strictEqual(Query.checkIfUpdateDue(new bot.date().subtract(36, 'hour'), 2), true); - assert.strictEqual(Query.checkIfUpdateDue(new bot.date().subtract(40, 'hour'), 2), true); + assert.strictEqual(isUpdateDue(new bot.date().subtract(30, 'hour'), 2), false); + assert.strictEqual(isUpdateDue(new bot.date().subtract(36, 'hour'), 2), true); + assert.strictEqual(isUpdateDue(new bot.date().subtract(40, 'hour'), 2), true); }); -}); \ No newline at end of file +}); diff --git a/eventstream-router/main.ts b/eventstream-router/main.ts index ca086cf..2c852d7 100644 --- a/eventstream-router/main.ts +++ b/eventstream-router/main.ts @@ -10,10 +10,11 @@ import g13Watch from "../reports/g13-watch/eventstream-watch"; import gans from "../most-gans/eventstream-updater"; import botActivityMonitor from "../bot-monitor/eventstream-trigger"; import dbTabulator from "../db-tabulator/eventstream-trigger"; +import dbTabulatorMetadata from "../db-tabulator/eventstream-metadata-maintainer"; import shutoffsMonitor from "./routes/shutoffs-monitor"; import dykCountsTask from "./routes/dyk-counts"; -const routeClasses = [gans, dykCountsTask, botActivityMonitor, dbTabulator, shutoffsMonitor]; +const routeClasses = [gans, dykCountsTask, botActivityMonitor, dbTabulatorMetadata, shutoffsMonitor]; // debugging a single route example: -r "./test" streamWithRoutes(argv.r ? [require(argv.r).default] : routeClasses); diff --git a/jobs.yml b/jobs.yml index 9af1257..cc630f4 100644 --- a/jobs.yml +++ b/jobs.yml @@ -21,7 +21,7 @@ - {"schedule": "5 0 * * *", "name": "job-g13watch", mem: "256Mi", "command": "~/SDZeroBot/job reports/g13-watch/g13-watch.js", "image": "node16", "emails": "onfailure"} - {"schedule": "15,45 * * * *", "name": "stream-check", mem: "128Mi", "command": "~/SDZeroBot/job eventstream-router/check.js", "image": "node16", "emails": "onfailure"} - {"schedule": "20 * * * *", "name": "bot-monitor", mem: "256Mi", "command": "~/SDZeroBot/job bot-monitor/main.js", "image": "node16", "emails": "onfailure"} -- {"schedule": "25 4 * * *", "name": "db-tabulator", mem: "256Mi", "command": "~/SDZeroBot/job db-tabulator/main.js", "image": "node16", "emails": "onfailure"} +- {"schedule": "25 1,5,9,13,17,21 * * *", "name": "db-tabulator", mem: "256Mi", "command": "~/SDZeroBot/job db-tabulator/main.js", "image": "node16", "emails": "onfailure"} - {"schedule": "8 16 * * *", "name": "gans-list", mem: "256Mi", "command": "~/SDZeroBot/job most-gans/gans-lister.js", "image": "node16", "emails": "onfailure"} - {"schedule": "0 3 1 * *", "name": "gans-users", mem: "256Mi", "command": "~/SDZeroBot/job most-gans/update-entries.js", "image": "node16", "emails": "onfailure"} - {"schedule": "0 4 * * *", "name": "shells", mem: "128Mi", "command": "~/SDZeroBot/job terminate-shell-pods.js", "image": "node16", "emails": "onfailure"}