Skip to content

Commit

Permalink
db-tabulator: overhaul scheduling to be more scalable
Browse files Browse the repository at this point in the history
  • Loading branch information
siddharthvp committed Dec 4, 2023
1 parent ebd9f7c commit 57c79de
Show file tree
Hide file tree
Showing 11 changed files with 321 additions and 67 deletions.
39 changes: 39 additions & 0 deletions db-tabulator/HybridMetadataStore.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import {MetadataStore} from "./MetadataStore";
import {Query} from "./app";
import {MariadbMetadataStore} from "./MariadbMetadataStore";
import {NoMetadataStore} from "./NoMetadataStore";

/**
 * Metadata store that forwards every call to the first backing store which
 * initializes successfully.
 *
 * Candidates are tried in the order they appear in `stores`
 * (MariadbMetadataStore first, NoMetadataStore as the fallback).
 */
export class HybridMetadataStore implements MetadataStore {

    /** Candidate stores, in order of preference. */
    stores: MetadataStore[] = [
        new MariadbMetadataStore(),
        new NoMetadataStore(),
    ];

    /** The store all calls are delegated to. Only set once init() has succeeded. */
    activeStore: MetadataStore;

    /**
     * Initializes the candidate stores one by one and activates the first that succeeds.
     * A failure of a candidate is reported (not swallowed) before moving on to the next one.
     *
     * @throws Error if none of the candidate stores could be initialized.
     */
    async init(): Promise<void> {
        const failures: string[] = [];
        for (const store of this.stores) {
            try {
                await store.init();
                this.activeStore = store;
                return;
            } catch (e) {
                const reason = e instanceof Error ? e.message : String(e);
                failures.push(`${store.constructor.name}: ${reason}`);
                // Make the fallback visible: running on a lower-preference store should not go unnoticed
                console.warn(`[W] ${store.constructor.name} failed to initialize, trying next metadata store:`, e);
            }
        }
        throw new Error(`HybridMetadataStore: no metadata store could be initialized (${failures.join('; ')})`);
    }

    /** Delegates to the active store. */
    getQueriesToRun() {
        return this.requireActiveStore().getQueriesToRun();
    }

    /** Delegates to the active store. */
    removeOthers(pages: Set<string>) {
        return this.requireActiveStore().removeOthers(pages);
    }

    /** Delegates to the active store. */
    updateLastTimestamp(query: Query) {
        return this.requireActiveStore().updateLastTimestamp(query);
    }

    /** Delegates to the active store. */
    updateMetadata(page: string, queries: Query[]) {
        return this.requireActiveStore().updateMetadata(page, queries);
    }

    /**
     * Returns the active store, failing with a descriptive error (instead of a
     * TypeError on `undefined`) when init() has not completed successfully.
     */
    private requireActiveStore(): MetadataStore {
        if (!this.activeStore) {
            throw new Error('HybridMetadataStore: init() must complete successfully before use');
        }
        return this.activeStore;
    }
}
92 changes: 92 additions & 0 deletions db-tabulator/MariadbMetadataStore.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
import {TOOLS_DB_HOST, toolsdb} from "../db";
import {fetchQueriesForPage, Query} from "./app";
import {MetadataStore} from "./MetadataStore";
import {createLocalSSHTunnel, setDifference} from "../utils";
import * as crypto from "crypto";

/**
 * Metadata store backed by the `dbreports` table in the `dbreports_p` tools database.
 *
 * One row is kept per query on a page, identified by (page, templateMd5), along with
 * its position on the page (idx), its update interval in days and the time it was last updated.
 */
export class MariadbMetadataStore implements MetadataStore {
    db: toolsdb;

    /** Opens the database handle and SSH tunnel, and creates the `dbreports` table if it doesn't exist. */
    async init() {
        this.db = new toolsdb('dbreports_p');
        await createLocalSSHTunnel(TOOLS_DB_HOST);
        await this.db.query(`
            CREATE TABLE IF NOT EXISTS dbreports(
                page VARCHAR(255),
                idx SMALLINT UNSIGNED,
                templateMd5 CHAR(32),
                intervalDays SMALLINT UNSIGNED,
                lastUpdate DATETIME
            )
        `); // Primary key?
    }

    /**
     * Synchronizes the stored rows of `page` with the queries currently present on it:
     * rows whose template is no longer on the page are deleted, rows of templates still
     * present get their idx and interval refreshed, and new templates are inserted with
     * no lastUpdate.
     *
     * @param page Title of the page the queries were read from.
     * @param queries All queries currently on that page.
     */
    async updateMetadata(page: string, queries: Query[]) {
        // Only this page's rows are relevant. Looking at all pages would make a template that
        // also exists elsewhere look "existing" here, so it would be UPDATEd (matching no row)
        // instead of INSERTed.
        // NOTE(review): assumes toolsdb.query() accepts bound parameters like run() does - confirm.
        const existingRows = await this.db.query('SELECT templateMd5 FROM dbreports WHERE page = ?', [page]);
        const existingQueryMd5s = new Set(existingRows.map(q => q.templateMd5));
        const newQueryMd5s = new Set(queries.map(q => this.makeMd5(q)));

        await this.db.transaction(async conn => {
            // Await each statement so that it completes (or fails) within the transaction
            for (const md5 of setDifference(existingQueryMd5s, newQueryMd5s)) {
                await conn.execute('DELETE FROM dbreports WHERE page = ? AND templateMd5 = ?', [page, md5]);
            }

            // Don't delete lastUpdate values on service restart (or when other reports are added to page)
            // NOTE(review): rows are matched on (page, templateMd5), so byte-identical templates
            // on the same page share one key and end up with the same idx - confirm if that matters.
            for (const query of queries) {
                const md5 = this.makeMd5(query);
                const intervalDays = isNaN(query.config.interval) ? null : query.config.interval;
                if (existingQueryMd5s.has(md5)) {
                    await conn.execute(`
                        UPDATE dbreports SET idx = ?, intervalDays = ?
                        WHERE page = ? AND templateMd5 = ?
                    `, [query.idx, intervalDays, query.page, md5]);
                } else {
                    await conn.execute(`
                        INSERT INTO dbreports(page, idx, templateMd5, intervalDays, lastUpdate)
                        VALUES (?, ?, ?, ?, ?)
                    `, [query.page, query.idx, md5, intervalDays, null]);
                }
            }
        });
    }

    /** Returns the hex MD5 digest of the query's template wikitext. */
    makeMd5(query: Query) {
        return crypto.createHash('md5').update(query.template.wikitext).digest('hex');
    }

    /**
     * Deletes the rows of all pages that are not in `pages`.
     *
     * An empty set is treated as a no-op: `NOT IN ()` is not valid SQL, and deleting every
     * row would discard the lastUpdate of every report.
     */
    async removeOthers(pages: Set<string>) {
        if (pages.size === 0) {
            return;
        }
        const questionMarks = Array(pages.size).fill('?').join(',');
        await this.db.run(
            `DELETE FROM dbreports WHERE page NOT IN (${questionMarks})`,
            [...pages]
        );
    }

    /**
     * Finds the stored queries that have an interval and were either never updated or last
     * updated more than that many days ago, then re-reads the corresponding pages.
     *
     * @returns The due queries, keyed by page title.
     */
    async getQueriesToRun() {
        // Compared against UTC_TIMESTAMP() as that is what updateLastTimestamp() writes
        const rows = await this.db.query(`
            SELECT page, idx FROM dbreports
            WHERE intervalDays IS NOT NULL
            AND (lastUpdate IS NULL OR lastUpdate < UTC_TIMESTAMP() - INTERVAL intervalDays DAY)
        `);

        // Group the due indices by page. A Map is used as page titles are arbitrary strings.
        const dueIndicesByPage = new Map<string, Set<number>>();
        for (const row of rows) {
            const page = row.page as string;
            let indices = dueIndicesByPage.get(page);
            if (!indices) {
                indices = new Set();
                dueIndicesByPage.set(page, indices);
            }
            indices.add(row.idx as number);
        }

        const result: Record<string, Query[]> = {};
        for (const [page, indices] of dueIndicesByPage) {
            const queries = await fetchQueriesForPage(page);
            result[page] = queries.filter(q => indices.has(q.idx));
        }
        return result;
    }

    /** Sets lastUpdate of the row matching the query's page and idx to the current UTC time. */
    async updateLastTimestamp(query: Query): Promise<void> {
        await this.db.run(
            `UPDATE dbreports SET lastUpdate = UTC_TIMESTAMP() WHERE page = ? AND idx = ?`
            , [query.page, query.idx]);
        // TODO: log warning if rows affected != 1
    }
}
9 changes: 9 additions & 0 deletions db-tabulator/MetadataStore.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import {Query} from "./app";

/**
 * Storage for scheduling metadata about the queries present on pages.
 * Implemented by MariadbMetadataStore, NoMetadataStore and HybridMetadataStore.
 */
export interface MetadataStore {
/** Prepares the store for use. HybridMetadataStore treats a rejection as "store unavailable" and tries its next store. */
init(): Promise<void>;
/** Synchronizes the stored metadata of `page` with `queries`, the queries currently present on that page. */
updateMetadata(page: string, queries: Query[]): Promise<void>;
/** Removes the stored metadata of all pages other than the ones in `pages`. */
removeOthers(pages: Set<string>): Promise<void>;
/** Records that `query` has just been updated. */
updateLastTimestamp(query: Query): Promise<void>;
/** Returns the queries that are due to run, keyed by page title. */
getQueriesToRun(): Promise<Record<string, Query[]>>;
}
69 changes: 69 additions & 0 deletions db-tabulator/NoMetadataStore.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
import {BOT_NAME, Query, getQueriesFromText, SUBSCRIPTIONS_CATEGORY, TEMPLATE} from "./app";
import {bot, enwikidb, log} from "../botbase";
import {MwnDate} from "../../mwn/src/date";
import {MetadataStore} from "./MetadataStore";

/**
 * MetadataStore implementation that stores nothing. The queries to run are worked out on
 * every call from the pages transcluding the template and the dates of the bot's last edit
 * to the subscribed pages.
 */
export class NoMetadataStore implements MetadataStore {
    wikidb: enwikidb;

    /** Creates the wiki database handle used by getQueriesToRun(). */
    async init() {
        this.wikidb = new enwikidb();
    }

    /** No-op: there is no metadata to maintain. */
    async updateMetadata(page: string, queries: Query[]) {}

    /** No-op: there is no metadata to maintain. */
    async removeOthers(pages: Set<string>) {}

    /** No-op: there is no metadata to maintain. */
    async updateLastTimestamp() {}

    /**
     * Reads every non-mainspace page transcluding the template and keeps the queries on it
     * for which an update is due as per checkIfUpdateDue().
     *
     * @returns The due queries, keyed by page title. Pages on which no query is due are
     * present with an empty array.
     */
    async getQueriesToRun(): Promise<Record<string, Query[]>> {
        // Get the date of the bot's last edit to each of the subscribed pages
        // The API doesn't have an efficient query for this, so using the DB instead
        const [timeTaken, lastEditsDb] = await this.wikidb.timedQuery(`
            SELECT page_namespace, page_title,
                (SELECT MAX(rc_timestamp) FROM recentchanges_userindex
                 JOIN actor_recentchanges ON rc_actor = actor_id AND actor_name = ?
                 WHERE rc_namespace = page_namespace AND rc_title = page_title
                ) AS last_edit
            FROM page
            JOIN categorylinks ON cl_from = page_id AND cl_to = ?
        `, [BOT_NAME, SUBSCRIPTIONS_CATEGORY.replace(/ /g, '_')]);
        log(`[i] Retrieved last edits data. DB query took ${timeTaken.toFixed(2)} seconds.`);

        // Page title => date of the bot's last edit (falsy if no such edit was found)
        const lastEditsData = Object.fromEntries(lastEditsDb.map((row) => [
            new bot.page(row.page_title as string, row.page_namespace as number).toText(),
            row.last_edit && new bot.date(row.last_edit)
        ]));

        const allQueries: Record<string, Query[]> = {};
        const pages = (await new bot.page('Template:' + TEMPLATE).transclusions());
        for await (const pg of bot.readGen(pages)) {
            if (pg.ns === 0) { // sanity check: don't work in mainspace
                continue;
            }
            // A page can come back without revisions (e.g. if it got deleted after being
            // listed). Skip it rather than let one page abort the whole run.
            const text = pg.revisions?.[0]?.content;
            if (typeof text !== 'string') {
                log(`[W] Skipping ${pg.title} as its content could not be retrieved`);
                continue;
            }
            allQueries[pg.title] = getQueriesFromText(text, pg.title).filter(q => {
                return this.checkIfUpdateDue(lastEditsData[q.page], q)
            });
        }
        return allQueries;
    }

    /**
     * Tells whether a periodic update of the query is due.
     *
     * @param lastUpdate Date of the last update; a falsy value means no previous update is known.
     * @param query The query, whose `config.interval` is the update interval in days.
     * @returns false if the interval is NaN; true if lastUpdate is falsy; otherwise whether
     * at least (interval - 0.5) days have passed since lastUpdate.
     */
    checkIfUpdateDue(lastUpdate: MwnDate, query: Query): boolean {
        const interval = query.config.interval;
        if (isNaN(interval)) {
            log(`[+] Skipping ${query} as periodic updates are not configured`);
            return false;
        }
        if (!lastUpdate) {
            return true;
        }
        // 8.64e7 is the number of milliseconds in a day
        const daysDiff = (new bot.date().getTime() - lastUpdate.getTime())/8.64e7;
        const isUpdateDue = daysDiff >= interval - 0.5;
        if (!isUpdateDue) {
            log(`[+] Skipping ${query} as update is not due.`);
        }
        return isUpdateDue;
    }

}
4 changes: 2 additions & 2 deletions db-tabulator/README.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
## Database report generator

`app.ts` contains all the working logic, but doesn't execute anything by itself. There are 3 entry points:
- `main.ts` - triggered via cron. See entry in `crontab` file.
- `eventstream-trigger.ts` - eventstream hook that does a immediate update for a single page that is edited to newly transclude the triggering template.
- `main.ts` - triggered via cron. See entry in `jobs.yml` file.
- `eventstream-metadata-maintainer.ts` - eventstream hook that updates stored metadata of queries present on pages, used in the cron job.
- `web-endpoint.ts` - webservice route that allows users to trigger update on a specific report.

Use the `--fake` argument to have the input read from `fake-configs.wikitext` and the output written to `fake-output.wikitext`.
61 changes: 9 additions & 52 deletions db-tabulator/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ import { enwikidb, SQLError } from "../db";
import { Template } from "../../mwn/build/wikitext";
import { arrayChunk, createLogStream, lowerFirst, readFile, writeFile } from "../utils";
import {NS_CATEGORY, NS_FILE, NS_MAIN} from "../namespaces";
import { MwnDate } from "../../mwn/build/date";
import { formatSummary } from "../reports/commons";
import {MetadataStore} from "./MetadataStore";
import {HybridMetadataStore} from "./HybridMetadataStore";

export const BOT_NAME = 'SDZeroBot';
export const TEMPLATE = 'Database report';
Expand All @@ -21,24 +22,17 @@ const db = new enwikidb({
connectionLimit: CONCURRENCY
});

export const metadataStore: MetadataStore = new HybridMetadataStore();

/**
 * Fetches the queries to be processed.
 *
 * With the `fake` argument set, the queries are read from FAKE_INPUT_FILE and returned
 * under the 'Fake-Configs' key. Otherwise the metadata store decides which queries are to
 * be run.
 *
 * @returns The queries, keyed by page title.
 */
export async function fetchQueries(): Promise<Record<string, Query[]>> {
    if (argv.fake) {
        const text = readFile(FAKE_INPUT_FILE);
        return { 'Fake-Configs': getQueriesFromText(text, 'Fake-Configs') };
    }
    // Reading all transcluding pages is the job of the metadata store now; a loop doing
    // that here would return first and leave this call unreachable.
    return metadataStore.getQueriesToRun();
}

function getQueriesFromText(text: string, title: string): Query[] {
export function getQueriesFromText(text: string, title: string): Query[] {
let templates = bot.wikitext.parseTemplates(text, {
namePredicate: name => name === TEMPLATE
});
Expand All @@ -49,29 +43,8 @@ function getQueriesFromText(text: string, title: string): Query[] {
return templates.map((template, idx) => new Query(template, title, idx + 1));
}

let lastEditsData: Record<string, MwnDate>;

// Called from the cronjob
export async function processQueries(allQueries: Record<string, Query[]>) {
await db.getReplagHours();
// Get the date of the bot's last edit to each of the subscribed pages
// The API doesn't have an efficient query for this, so using the DB instead
let [timeTaken, lastEditsDb] = await db.timedQuery(`
SELECT page_namespace, page_title,
(SELECT MAX(rc_timestamp) FROM recentchanges_userindex
JOIN actor_recentchanges ON rc_actor = actor_id AND actor_name = ?
WHERE rc_namespace = page_namespace AND rc_title = page_title
) AS last_edit
FROM page
JOIN categorylinks ON cl_from = page_id AND cl_to = ?
`, [BOT_NAME, SUBSCRIPTIONS_CATEGORY.replace(/ /g, '_')]);
log(`[i] Retrieved last edits data. DB query took ${timeTaken.toFixed(2)} seconds.`);

lastEditsData = Object.fromEntries(lastEditsDb.map((row) => [
new bot.page(row.page_title as string, row.page_namespace as number).toText(),
row.last_edit && new bot.date(row.last_edit)
]));

await bot.batchOperation(Object.entries(allQueries), async ([page, queries]) => {
log(`[+] Processing page ${page}`);
await processQueriesForPage(queries);
Expand Down Expand Up @@ -132,6 +105,7 @@ export class Query {
maxPages?: number;
removeUnderscores?: number[];
hiddenColumns?: number[];
interval?: number;
} = {};

/** Warnings generated while template parsing or result formatting, to be added to the page */
Expand Down Expand Up @@ -163,6 +137,7 @@ export class Query {
const result = await this.runQuery();
const resultText = await this.formatResults(result);
await this.save(resultText);
await metadataStore.updateLastTimestamp(this);
} catch (err) {
if (err instanceof HandledError) return;
emailOnError(err, 'db-tabulator');
Expand All @@ -174,27 +149,9 @@ export class Query {
return this.template.getValue(param)?.replace(/<!--.*?-->/g, '').trim();
}

/**
 * Tells whether an update is due: always when `lastUpdate` is falsy, otherwise when at
 * least (interval - 0.5) days have passed since `lastUpdate`.
 */
static checkIfUpdateDue(lastUpdate: MwnDate, interval: number): boolean {
    if (lastUpdate) {
        const elapsedMs = new bot.date().getTime() - lastUpdate.getTime();
        // 8.64e7 is the number of milliseconds in a day
        const elapsedDays = elapsedMs/8.64e7;
        return elapsedDays >= interval - 0.5;
    }
    return true;
}

// Errors in configs are reported to user through [[Module:Database report]] in Lua
parseQuery() {
if (this.context === 'cron') {
let interval = parseInt(this.getTemplateValue('interval'));
if (isNaN(interval)) {
log(`[+] Skipping ${this} as periodic updates are not configured`);
throw new HandledError();
}
if (!Query.checkIfUpdateDue(lastEditsData[this.page], interval)) {
log(`[+] Skipping ${this} as update is not due.`);
throw new HandledError();
}
}
this.config.interval = parseInt(this.getTemplateValue('interval'));

// Use of semicolons for multiple statements will be flagged as error at query runtime
this.config.sql = this.getTemplateValue('sql')
Expand Down
Loading

0 comments on commit 57c79de

Please sign in to comment.