diff --git a/db-tabulator/app.ts b/db-tabulator/app.ts
index d0b56f3..393c6cc 100644
--- a/db-tabulator/app.ts
+++ b/db-tabulator/app.ts
@@ -7,6 +7,7 @@ import { formatSummary } from "../reports/commons";
 import {MetadataStore} from "./MetadataStore";
 import {HybridMetadataStore} from "./HybridMetadataStore";
 import {applyJsPreprocessing, processQueriesExternally} from "./preprocess";
+import {EventEmitter} from "events";
 
 export const BOT_NAME = 'SDZeroBot';
 export const TEMPLATE = 'Database report';
@@ -45,12 +46,12 @@ export function getQueriesFromText(text: string, title: string): Query[] {
 		new Query(template, title, idx + 1, !!template.getValue('preprocess_js')?.trim()));
 }
 
-export async function processQueries(allQueries: Record<string, Query[]>) {
+export async function processQueries(allQueries: Record<string, Query[]>, notifier?: EventEmitter) {
 	await bot.batchOperation(Object.entries(allQueries), async ([page, queries]) => {
 		if (queries.filter(q => q.needsExternalRun).length > 0) {
 			// Needs an external process for security
 			log(`[+] Processing page ${page} using child process`);
-			await processQueriesExternally(page);
+			await processQueriesExternally(page, notifier);
 		} else {
 			log(`[+] Processing page ${page}`);
 			await processQueriesForPage(queries);
@@ -85,7 +86,7 @@ export async function checkShutoff() {
 
 const queriesLog = createLogStream('queries.log');
 
-export class Query {
+export class Query extends EventEmitter {
 
 	/// Step 1. Parse the query
 	/// Step 2. Run the query
@@ -138,6 +139,7 @@ export class Query {
 	needsForceKill = false;
 
 	constructor(template: Template, page: string, idxOnPage: number, external?: boolean) {
+		super();
 		this.page = page;
 		this.template = template;
 		this.idx = idxOnPage;
@@ -145,6 +147,11 @@ export class Query {
 		this.context = getContext();
 	}
 
+	/** Produce events for progress tracking from web UI (if invoked from web endpoint) */
+	emit(...args) {
+		return super.emit('message', ...args);
+	}
+
 	toString() {
 		return this.page + (this.idx !== 1 ? ` (#${this.idx})` : '');
 	}
@@ -156,6 +163,7 @@ export class Query {
 			const resultText = await this.formatResults(result);
 			await this.save(resultText);
 			await metadataStore.updateLastTimestamp(this);
+			this.emit('done-one');
 		} catch (err) {
 			if (err instanceof HandledError) return;
 			emailOnError(err, 'db-tabulator');
@@ -260,8 +268,10 @@ export class Query {
 		let query = `SET STATEMENT max_statement_time = ${QUERY_TIMEOUT} FOR ${this.config.sql.trim()}`;
 		query = this.appendLimit(query);
 		queriesLog(`Page: [[${this.page}]], context: ${this.context}, query: ${query}`);
+		this.emit('query-executing', this.config.sql);
 		return db.timedQuery(query).then(([timeTaken, queryResult]) => {
 			this.queryRuntime = timeTaken.toFixed(2);
+			this.emit('query-executed', this.queryRuntime);
 			log(`[+] ${this}: Took ${this.queryRuntime} seconds`);
 			return queryResult;
 		}).catch(async (err: SQLError) => {
@@ -378,7 +388,6 @@ export class Query {
 	}
 
 	async formatResultSet(result, pageNumber: number) {
-
 		let numColumns = Object.keys(result[0]).length;
 		for (let i = 1; i <= numColumns; i++) {
 			// Stringify everything
@@ -391,7 +400,7 @@ export class Query {
 		if (this.getTemplateValue('preprocess_js')) {
 			const jsCode = stripOuterNowikis(this.getTemplateValue('preprocess_js'));
 			try {
-				result = await applyJsPreprocessing(result, jsCode, this.toString(), this);
+				result = await applyJsPreprocessing(result, jsCode, this);
 			} catch (e) {
 				log(`[E] Error in applyJsPreprocessing`);
 				log(e);
@@ -584,6 +593,7 @@ export class Query {
 			return;
 		}
 		let outputPage = this.config.outputPage || this.page;
+		this.emit('saving', outputPage);
 		let page = new bot.page(outputPage);
 		let firstPageResult = Array.isArray(queryResult) ? queryResult[0] : queryResult;
 		try {
@@ -595,6 +605,7 @@ export class Query {
 					summary: this.generateEditSummary(isError)
 				};
 			});
+			this.emit('save-success', outputPage);
 		} catch (err) {
 			if (isError) { // error on an error logging attempt, just throw now
 				throw err;
 			}
@@ -606,6 +617,7 @@ export class Query {
 			if (err.code === 'protectedpage') {
 				throw err;
 			}
+			this.emit('save-failed', outputPage, err.message);
 			return this.saveWithError(`Error while saving report: ${err.message}`);
 		}
 		if (Array.isArray(queryResult)) { // paginated result (output_page is not applicable in this case)
@@ -613,11 +625,13 @@
 			let pageNumber = parseInt(idx) + 1;
 			if (pageNumber === 1) continue; // already saved above
 			let subpage = new bot.page(outputPage + '/' + pageNumber);
+			this.emit('saving', subpage.getPrefixedText());
 			await subpage.save(
 				`{{Database report/subpage|page=${pageNumber}|num_pages=${this.numPages}}}\n` +
 				resultText,
 				'Updating database report'
 			);
+			this.emit('save-success', subpage.getPrefixedText());
 		}
 		for (let i = this.numPages + 1; i <= MAX_SUBPAGES; i++) {
 			let subpage = new bot.page(outputPage + '/' + i);
@@ -667,6 +681,7 @@ export class Query {
 		if (endTemplateStartIdx === -1) { // Still no? Record for edit summary
 			this.endNotFound = true;
+			this.emit('end-not-found');
 		}
 	}
 
 	let textToReplace = text.slice(
diff --git a/db-tabulator/external-update.ts b/db-tabulator/external-update.ts
index e8da10b..3306fa7 100644
--- a/db-tabulator/external-update.ts
+++ b/db-tabulator/external-update.ts
@@ -15,10 +15,21 @@ import {metadataStore, fetchQueriesForPage, processQueriesForPage} from "./app";
 	]);
 
 	const queries = await fetchQueriesForPage(argv.page);
+
+	// Send progress events to parent process for display in web UI
+	for (let query of queries) {
+		query.on('message', (...args) => {
+			process.send({
+				code: args[0],
+				args: args.slice(1)
+			});
+		});
+	}
+
 	await processQueriesForPage(queries);
 
 	if (queries.filter(q => q.needsForceKill).length > 0) {
-		process.send({ code: 'catastrophic-error' });
+		process.send({ code: 'catastrophic-error', args: [] });
 	}
 
 })().catch(e => emailOnError(e, 'db-tabulator-child'));
diff --git a/db-tabulator/preprocess.ts b/db-tabulator/preprocess.ts
index 83f36f9..5035653 100644
--- a/db-tabulator/preprocess.ts
+++ b/db-tabulator/preprocess.ts
@@ -1,16 +1,12 @@
 import {argv, log} from "../botbase";
-import {sleep} from "../../mwn/build/utils";
 import {fork} from "child_process";
+import EventEmitter from "events";
+import type {Query} from "./app";
 
 const softTimeout = 1000;
 const hardTimeout = 1500;
 const processTimeout = 30000;
 
-interface PreprocessContext {
-	warnings: Array<string>;
-	needsForceKill: boolean;
-}
-
 async function timedPromise(timeout: number, promise: Promise<void>, cleanup: () => void) {
 	let t: NodeJS.Timeout;
 	await Promise.race([
@@ -27,7 +23,7 @@ async function timedPromise(timeout: number, promise: Promise<void>, cleanup: () => void) {
 	});
 }
 
-export async function processQueriesExternally(page: string) {
+export async function processQueriesExternally(page: string, notifier?: EventEmitter) {
 	const controller = new AbortController();
 	await timedPromise(
 		processTimeout,
@@ -45,6 +41,9 @@ export async function processQueriesExternally(page: string) {
 				if (message.code === 'catastrophic-error') {
 					controller.abort(); // This triggers exit event
 				}
+				if (notifier) {
+					notifier.emit('message', message.code, ...message.args);
+				}
 			});
 			child.on('error', (err) => {
 				log(`[E] Error from child process`);
@@ -57,13 +56,14 @@ export async function processQueriesExternally(page: string) {
 			log(`[E] Aborting child process as it took more than ${processTimeout/1000} seconds`);
 			// FIXME? looks necessary as some errors in child process cause it to never resolve/reject
 			controller.abort();
+			notifier.emit('process-timed-out');
 		}
 	);
 }
 
-export async function applyJsPreprocessing(rows: Record<string, string>[], jsCode: string, queryId: string,
-                                            ctx: PreprocessContext): Promise<Record<string, string>[]> {
-	log(`[+] Applying JS preprocessing for ${queryId}`);
+export async function applyJsPreprocessing(rows: Record<string, string>[], jsCode: string, query: Query): Promise<Record<string, string>[]> {
+	log(`[+] Applying JS preprocessing for ${query}`);
+	query.emit('preprocessing');
 	let startTime = process.hrtime.bigint();
 
 	// Import dynamically as this has native dependencies
@@ -73,7 +73,7 @@ export async function applyJsPreprocessing(rows: Record<string, string>[], jsCod
 		memoryLimit: 16,
 		onCatastrophicError(msg) {
 			log(`[E] Catastrophic error in isolated-vm: ${msg}`);
-			ctx.needsForceKill = true;
+			query.needsForceKill = true;
 		}
 	});
 	const context = await isolate.createContext();
@@ -95,20 +95,23 @@ export async function applyJsPreprocessing(rows: Record<string, string>[], jsCod
 				if (Array.isArray(userCodeResultParsed)) {
 					result = userCodeResultParsed;
 				} else {
-					log(`[E] JS preprocessing for ${queryId} returned a non-array: ${userCodeResult.slice(0, 100)} ... Ignoring.`);
-					ctx.warnings.push(`JS preprocessing didn't return an array of rows, will be ignored`);
+					log(`[E] JS preprocessing for ${query} returned a non-array: ${userCodeResult.slice(0, 100)} ... Ignoring.`);
+					query.warnings.push(`JS preprocessing didn't return an array of rows, will be ignored`);
+					query.emit('js-no-array');
 				}
 			} else {
-				log(`[E] JS preprocessing for ${queryId} has an invalid return value: ${userCodeResult}. Ignoring.`);
-				ctx.warnings.push(`JS preprocessing must have a transferable return value`);
+				log(`[E] JS preprocessing for ${query} has an invalid return value: ${userCodeResult}. Ignoring.`);
+				query.warnings.push(`JS preprocessing must have a transferable return value`);
+				query.emit('js-invalid-return');
 			}
 		} catch (e) { // Shouldn't occur as we are the ones doing the JSON.stringify
-			log(`[E] JS preprocessing for ${queryId} returned a non-JSON: ${userCodeResult.slice(0, 100)}. Ignoring.`);
+			log(`[E] JS preprocessing for ${query} returned a non-JSON: ${userCodeResult.slice(0, 100)}. Ignoring.`);
 		}
 	} catch (e) {
-		log(`[E] JS preprocessing for ${queryId} failed: ${e.toString()}`);
+		log(`[E] JS preprocessing for ${query} failed: ${e.toString()}`);
 		log(e);
-		ctx.warnings.push(`JS preprocessing failed: ${e.toString()}`);
+		query.warnings.push(`JS preprocessing failed: ${e.toString()}`);
+		query.emit('js-failed', e.toString());
 	}
@@ -123,8 +126,9 @@ export async function applyJsPreprocessing(rows: Record<string, string>[], jsCod
 	);
 
 	let endTime = process.hrtime.bigint();
-	let timeTaken = Number(endTime - startTime) / 1e9;
-	log(`[+] JS preprocessing for ${queryId} took ${timeTaken.toFixed(3)} seconds, cpuTime: ${isolate.cpuTime}, wallTime: ${isolate.wallTime}.`);
+	let timeTaken = (Number(endTime - startTime) / 1e9).toFixed(3);
+	log(`[+] JS preprocessing for ${query} took ${timeTaken} seconds, cpuTime: ${isolate.cpuTime}, wallTime: ${isolate.wallTime}.`);
+	query.emit('preprocessing-complete', timeTaken);
 
 	return result;
 }
diff --git a/db-tabulator/test.ts b/db-tabulator/test.ts
index edaeeda..290dd49 100644
--- a/db-tabulator/test.ts
+++ b/db-tabulator/test.ts
@@ -36,7 +36,7 @@ describe('db-tabulator', () => {
 			row.id = parseInt(row.id) + 100;
 		})
 		return rows;
-	}`, 'test', { warnings: [], needsForceKill: false }));
+	}`, new Query(new Template('{{}}'), '', 1)));
 	})
 
 });
diff --git a/db-tabulator/web-endpoint.ts b/db-tabulator/web-endpoint.ts
index f7d3f4b..021a091 100644
--- a/db-tabulator/web-endpoint.ts
+++ b/db-tabulator/web-endpoint.ts
@@ -4,13 +4,14 @@
 import {
 	fetchQueriesForPage,
 	metadataStore,
 	SHUTOFF_PAGE,
-	TEMPLATE,
 	SUBSCRIPTIONS_CATEGORY,
-	processQueries
+	processQueries,
+	BOT_NAME
 } from "./app";
 import { createLogStream, mapPath } from "../utils";
 import {bot, enwikidb} from "../botbase";
 import {getRedisInstance} from "../redis";
+import {EventEmitter} from "events";
 
 const router = express.Router();
@@ -22,16 +23,33 @@ const redisKey = 'web-db-tabulator-pages';
 
 const db = new enwikidb();
 
-// TODO: show status of requested updates on web UI, with JS polling
+router.get('/stream', async (req, res) => {
+	const {page} = req.query as Record<string, string>;
 
-router.get('/', async function (req, res, next) {
-	let {page} = req.query as {page: string};
+	res.writeHead(200, {
+		"Connection": "keep-alive",
+		"Cache-Control": "no-cache",
+		"Content-Type": "text/event-stream",
+	});
+
+	res.on('close', () => {
+		log(`[W] Client closed the connection`);
+		res.end();
+	});
+
+	function stream(code: string, args?: Record<string, any>) {
+		res.write(`data: ${JSON.stringify(Object.assign( {}, { code }, args || {}))}\n\n`);
+	}
+	function endStream() {
+		stream('end');
+	}
 
 	let [shutoffText, queries, revId] = await Promise.all([
 		checkShutoff(),
 		fetchQueriesForPage(page),
 		getLastNonBotRevId(page).catch(err => {
-			res.status(err.code || 500).render('oneline', { text: err.message });
+			stream('failed-get-last-revid', { code: err.code, message: err.message });
+			endStream();
 		}),
 		metadataStore.init(),
 	]);
@@ -39,23 +57,23 @@
 	if (!revId) return;
 
 	if (shutoffText) {
-		log(`[E] Refused run on ${page} as task is shut off. Shutoff page content: ${shutoffText}`);
-		return res.status(422).render('oneline', {
-			text: `Bot is current shut off via ${SHUTOFF_PAGE}. The page should be blank for it to work.`
-		});
+		stream('shutoff', { SHUTOFF_PAGE });
+		return endStream();
+	} else {
+		stream('shutoff-checked');
 	}
 
 	const pgKey = page + ':' + revId;
 	if (await redis.sismember(redisKey, pgKey).catch(handleRedisError)) {
-		return res.status(409).render('oneline', {
-			text: `An update is already in progress for report(s) on page ${page} (revid ${revId})`
-		});
+		stream('already-in-progress', { page, revId });
+		return endStream();
 	}
 	redis.sadd(redisKey, pgKey).catch(handleRedisError);
 
 	// If no queries found, link clicked was probably from a transcluded report.
 	// Check if any transclusion(s) are in SUBSCRIPTION_CATEGORY and update them.
 	if (queries.length === 0) {
+		stream('looking-up-transclusions');
 		const title = bot.Title.newFromText(page);
 		try {
 			// FIXME: use the web replica here as this is a blocking call?
@@ -79,20 +97,36 @@
 		}
 	}
 
-	res.status(queries.length ? 202 : 400).render('database-report', {
-		page,
-		template: TEMPLATE,
-		noQueries: queries.length === 0
-	});
-	if (queries) {
-		log(`Started processing ${page}`);
-		try {
-			await processQueries({[page]: queries});
-		} finally {
-			redis.srem(redisKey, pgKey).catch(handleRedisError);
-		}
-		log(`Finished processing ${page}`);
+	if (queries.length) {
+		stream('started', { numQueries: queries.length });
+	} else {
+		stream('no-queries');
+		redis.srem(redisKey, pgKey).catch(handleRedisError);
+		return endStream();
+	}
+
+	let handleMessage = (...args) => {
+		stream(args[0], { args: args.slice(1) })
+	};
+
+	const notifier = new EventEmitter();
+	notifier.on('message', handleMessage); // If custom JS is enabled
+	queries.forEach(q => q.on('message', handleMessage)); // If custom JS is not enabled
+
+	log(`Started processing ${page}`);
+	try {
+		await processQueries({[page]: queries}, notifier);
+	} finally {
+		redis.srem(redisKey, pgKey).catch(handleRedisError);
 	}
+	log(`Finished processing ${page}`);
+	stream('completed');
+	return endStream();
+});
+
+router.get('/', async function (req, res, next) {
+	const {page} = req.query as Record<string, string>;
+	res.status(200).render('database-report', { page });
 });
 
 async function getLastNonBotRevId(page: string) {
@@ -101,7 +135,7 @@
 		titles: page,
 		rvprop: 'ids',
 		rvlimit: 1,
-		rvexcludeuser: 'SDZeroBot'
+		rvexcludeuser: BOT_NAME
 	});
 	let pg = response?.query?.pages?.[0];
 	if (!pg) {
diff --git a/webservice/public/images/loading.gif b/webservice/public/images/loading.gif
new file mode 100644
index 0000000..e1b07ea
Binary files /dev/null and b/webservice/public/images/loading.gif differ
diff --git a/webservice/views/database-report.hbs b/webservice/views/database-report.hbs
index b9bc4fc..5d1372f 100644
--- a/webservice/views/database-report.hbs
+++ b/webservice/views/database-report.hbs
@@ -1,6 +1,119 @@
-{{#if noQueries}}
-Couldn't find any transclusions of \{{{{{wikilink template}}}}} on the page {{{wikilink page}}} :(
-{{else}}
-Your request for regenerating the database report on {{{wikilink page}}} has been registered. It may take the bot a few minutes to make the edit, depending on the efficiency of the query provided.
-{{/if}}
+Live update: {{page}}
+
+loading
+
diff --git a/webservice/views/layout.hbs b/webservice/views/layout.hbs
index 97ffc1b..93a5f46 100644
--- a/webservice/views/layout.hbs
+++ b/webservice/views/layout.hbs
@@ -1,5 +1,5 @@
-
+
 	{{title}}