Skip to content

Commit

Permalink
db-tabulator: show live status of updates on web UI
Browse files Browse the repository at this point in the history
  • Loading branch information
siddharthvp committed Apr 14, 2024
1 parent 37df627 commit 03087ff
Show file tree
Hide file tree
Showing 8 changed files with 237 additions and 60 deletions.
25 changes: 20 additions & 5 deletions db-tabulator/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { formatSummary } from "../reports/commons";
import {MetadataStore} from "./MetadataStore";
import {HybridMetadataStore} from "./HybridMetadataStore";
import {applyJsPreprocessing, processQueriesExternally} from "./preprocess";
import {EventEmitter} from "events";

export const BOT_NAME = 'SDZeroBot';
export const TEMPLATE = 'Database report';
Expand Down Expand Up @@ -45,12 +46,12 @@ export function getQueriesFromText(text: string, title: string): Query[] {
new Query(template, title, idx + 1, !!template.getValue('preprocess_js')?.trim()));
}

export async function processQueries(allQueries: Record<string, Query[]>) {
export async function processQueries(allQueries: Record<string, Query[]>, notifier?: EventEmitter) {
await bot.batchOperation(Object.entries(allQueries), async ([page, queries]) => {
if (queries.filter(q => q.needsExternalRun).length > 0) {
// Needs an external process for security
log(`[+] Processing page ${page} using child process`);
await processQueriesExternally(page);
await processQueriesExternally(page, notifier);
} else {
log(`[+] Processing page ${page}`);
await processQueriesForPage(queries);
Expand Down Expand Up @@ -85,7 +86,7 @@ export async function checkShutoff() {

const queriesLog = createLogStream('queries.log');

export class Query {
export class Query extends EventEmitter {

/// Step 1. Parse the query
/// Step 2. Run the query
Expand Down Expand Up @@ -138,13 +139,19 @@ export class Query {
needsForceKill = false;

constructor(template: Template, page: string, idxOnPage: number, external?: boolean) {
super();
this.page = page;
this.template = template;
this.idx = idxOnPage;
this.needsExternalRun = external;
this.context = getContext();
}

/** Produce events for progress tracking from web UI (if invoked from web endpoint) */
emit(...args) {
return super.emit('message', ...args);
}

toString() {
return this.page + (this.idx !== 1 ? ` (#${this.idx})` : '');
}
Expand All @@ -156,6 +163,7 @@ export class Query {
const resultText = await this.formatResults(result);
await this.save(resultText);
await metadataStore.updateLastTimestamp(this);
this.emit('done-one');
} catch (err) {
if (err instanceof HandledError) return;
emailOnError(err, 'db-tabulator');
Expand Down Expand Up @@ -260,8 +268,10 @@ export class Query {
let query = `SET STATEMENT max_statement_time = ${QUERY_TIMEOUT} FOR ${this.config.sql.trim()}`;
query = this.appendLimit(query);
queriesLog(`Page: [[${this.page}]], context: ${this.context}, query: ${query}`);
this.emit('query-executing', this.config.sql);
return db.timedQuery(query).then(([timeTaken, queryResult]) => {
this.queryRuntime = timeTaken.toFixed(2);
this.emit('query-executed', this.queryRuntime);
log(`[+] ${this}: Took ${this.queryRuntime} seconds`);
return queryResult;
}).catch(async (err: SQLError) => {
Expand Down Expand Up @@ -378,7 +388,6 @@ export class Query {
}

async formatResultSet(result, pageNumber: number) {

let numColumns = Object.keys(result[0]).length;
for (let i = 1; i <= numColumns; i++) {
// Stringify everything
Expand All @@ -391,7 +400,7 @@ export class Query {
if (this.getTemplateValue('preprocess_js')) {
const jsCode = stripOuterNowikis(this.getTemplateValue('preprocess_js'));
try {
result = await applyJsPreprocessing(result, jsCode, this.toString(), this);
result = await applyJsPreprocessing(result, jsCode, this);
} catch (e) {
log(`[E] Error in applyJsPreprocessing`);
log(e);
Expand Down Expand Up @@ -584,6 +593,7 @@ export class Query {
return;
}
let outputPage = this.config.outputPage || this.page;
this.emit('saving', outputPage);
let page = new bot.page(outputPage);
let firstPageResult = Array.isArray(queryResult) ? queryResult[0] : queryResult;
try {
Expand All @@ -595,6 +605,7 @@ export class Query {
summary: this.generateEditSummary(isError)
};
});
this.emit('save-success', outputPage);
} catch (err) {
if (isError) { // error on an error logging attempt, just throw now
throw err;
Expand All @@ -606,18 +617,21 @@ export class Query {
if (err.code === 'protectedpage') {
throw err;
}
this.emit('save-failed', outputPage, err.message);
return this.saveWithError(`Error while saving report: ${err.message}`);
}
if (Array.isArray(queryResult)) { // paginated result (output_page is not applicable in this case)
for (let [idx, resultText] of Object.entries(queryResult)) {
let pageNumber = parseInt(idx) + 1;
if (pageNumber === 1) continue; // already saved above
let subpage = new bot.page(outputPage + '/' + pageNumber);
this.emit('saving', subpage.getPrefixedText());
await subpage.save(
`{{Database report/subpage|page=${pageNumber}|num_pages=${this.numPages}}}\n` +
resultText,
'Updating database report'
);
this.emit('save-success', subpage.getPrefixedText());
}
for (let i = this.numPages + 1; i <= MAX_SUBPAGES; i++) {
let subpage = new bot.page(outputPage + '/' + i);
Expand Down Expand Up @@ -667,6 +681,7 @@ export class Query {
if (endTemplateStartIdx === -1) {
// Still no? Record for edit summary
this.endNotFound = true;
this.emit('end-not-found');
}
}
let textToReplace = text.slice(
Expand Down
13 changes: 12 additions & 1 deletion db-tabulator/external-update.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,21 @@ import {metadataStore, fetchQueriesForPage, processQueriesForPage} from "./app";
]);

const queries = await fetchQueriesForPage(argv.page);

// Send progress events to parent process for display in web UI
for (let query of queries) {
query.on('message', (...args) => {
process.send({
code: args[0],
args: args.slice(1)
});
});
}

await processQueriesForPage(queries);

if (queries.filter(q => q.needsForceKill).length > 0) {
process.send({ code: 'catastrophic-error' });
process.send({ code: 'catastrophic-error', args: [] });
}

})().catch(e => emailOnError(e, 'db-tabulator-child'));
44 changes: 24 additions & 20 deletions db-tabulator/preprocess.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,12 @@
import {argv, log} from "../botbase";
import {sleep} from "../../mwn/build/utils";
import {fork} from "child_process";
import EventEmitter from "events";
import type {Query} from "./app";

const softTimeout = 1000;
const hardTimeout = 1500;
const processTimeout = 30000;

interface PreprocessContext {
warnings: Array<string>;
needsForceKill: boolean;
}

async function timedPromise(timeout: number, promise: Promise<void>, cleanup: () => void) {
let t: NodeJS.Timeout;
await Promise.race([
Expand All @@ -27,7 +23,7 @@ async function timedPromise(timeout: number, promise: Promise<void>, cleanup: ()
});
}

export async function processQueriesExternally(page: string) {
export async function processQueriesExternally(page: string, notifier?: EventEmitter) {
const controller = new AbortController();
await timedPromise(
processTimeout,
Expand All @@ -45,6 +41,9 @@ export async function processQueriesExternally(page: string) {
if (message.code === 'catastrophic-error') {
controller.abort(); // This triggers exit event
}
if (notifier) {
notifier.emit('message', message.code, ...message.args);
}
});
child.on('error', (err) => {
log(`[E] Error from child process`);
Expand All @@ -57,13 +56,14 @@ export async function processQueriesExternally(page: string) {
log(`[E] Aborting child process as it took more than ${processTimeout/1000} seconds`);
// FIXME? looks necessary as some errors in child process cause it to never resolve/reject
controller.abort();
notifier.emit('process-timed-out');
}
);
}

export async function applyJsPreprocessing(rows: Record<string, string>[], jsCode: string, queryId: string,
ctx: PreprocessContext): Promise<Record<string, any>[]> {
log(`[+] Applying JS preprocessing for ${queryId}`);
export async function applyJsPreprocessing(rows: Record<string, string>[], jsCode: string, query: Query): Promise<Record<string, any>[]> {
log(`[+] Applying JS preprocessing for ${query}`);
query.emit('preprocessing');
let startTime = process.hrtime.bigint();

// Import dynamically as this has native dependencies
Expand All @@ -73,7 +73,7 @@ export async function applyJsPreprocessing(rows: Record<string, string>[], jsCod
memoryLimit: 16,
onCatastrophicError(msg) {
log(`[E] Catastrophic error in isolated-vm: ${msg}`);
ctx.needsForceKill = true;
query.needsForceKill = true;
}
});
const context = await isolate.createContext();
Expand All @@ -95,20 +95,23 @@ export async function applyJsPreprocessing(rows: Record<string, string>[], jsCod
if (Array.isArray(userCodeResultParsed)) {
result = userCodeResultParsed;
} else {
log(`[E] JS preprocessing for ${queryId} returned a non-array: ${userCodeResult.slice(0, 100)} ... Ignoring.`);
ctx.warnings.push(`JS preprocessing didn't return an array of rows, will be ignored`);
log(`[E] JS preprocessing for ${query} returned a non-array: ${userCodeResult.slice(0, 100)} ... Ignoring.`);
query.warnings.push(`JS preprocessing didn't return an array of rows, will be ignored`);
query.emit('js-no-array');
}
} else {
log(`[E] JS preprocessing for ${queryId} has an invalid return value: ${userCodeResult}. Ignoring.`);
ctx.warnings.push(`JS preprocessing must have a transferable return value`);
log(`[E] JS preprocessing for ${query} has an invalid return value: ${userCodeResult}. Ignoring.`);
query.warnings.push(`JS preprocessing must have a transferable return value`);
query.emit('js-invalid-return');
}
} catch (e) { // Shouldn't occur as we are the ones doing the JSON.stringify
log(`[E] JS preprocessing for ${queryId} returned a non-JSON: ${userCodeResult.slice(0, 100)}. Ignoring.`);
log(`[E] JS preprocessing for ${query} returned a non-JSON: ${userCodeResult.slice(0, 100)}. Ignoring.`);
}
} catch (e) {
log(`[E] JS preprocessing for ${queryId} failed: ${e.toString()}`);
log(`[E] JS preprocessing for ${query} failed: ${e.toString()}`);
log(e);
ctx.warnings.push(`JS preprocessing failed: ${e.toString()}`);
query.warnings.push(`JS preprocessing failed: ${e.toString()}`);
query.emit('js-failed', e.toString());
}
}

Expand All @@ -123,8 +126,9 @@ export async function applyJsPreprocessing(rows: Record<string, string>[], jsCod
);

let endTime = process.hrtime.bigint();
let timeTaken = Number(endTime - startTime) / 1e9;
log(`[+] JS preprocessing for ${queryId} took ${timeTaken.toFixed(3)} seconds, cpuTime: ${isolate.cpuTime}, wallTime: ${isolate.wallTime}.`);
let timeTaken = (Number(endTime - startTime) / 1e9).toFixed(3);
log(`[+] JS preprocessing for ${query} took ${timeTaken} seconds, cpuTime: ${isolate.cpuTime}, wallTime: ${isolate.wallTime}.`);
query.emit('preprocessing-complete', timeTaken);

return result;
}
2 changes: 1 addition & 1 deletion db-tabulator/test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ describe('db-tabulator', () => {
row.id = parseInt(row.id) + 100;
})
return rows;
}`, 'test', { warnings: [], needsForceKill: false }));
}`, new Query(new Template('{{}}'), '', 1)));
})

});
Loading

0 comments on commit 03087ff

Please sign in to comment.