From 829ed7690dd0c10b275f23d6e2a414321ab6a84f Mon Sep 17 00:00:00 2001 From: Siddharth VP Date: Tue, 24 Nov 2020 22:26:27 +0530 Subject: [PATCH] eventstream-router: add first task: g13-watch --- eventstream-router/EventSource.js | 4 +- eventstream-router/g13-watch.js | 85 ++++++++++++++++++++++++++++++ eventstream-router/g13-watch.ts | 86 +++++++++++++++++++++++++++++++ eventstream-router/index.d.ts | 49 ++++++++++++++++++ eventstream-router/main.js | 26 +++++----- eventstream-router/package.json | 7 +++ eventstream-router/tsconfig.json | 14 +++++ eventstream-router/utils.js | 16 +++++- eventstream-router/utils.ts | 14 ++++- 9 files changed, 286 insertions(+), 15 deletions(-) create mode 100644 eventstream-router/g13-watch.js create mode 100644 eventstream-router/g13-watch.ts create mode 100644 eventstream-router/index.d.ts create mode 100644 eventstream-router/package.json create mode 100644 eventstream-router/tsconfig.json diff --git a/eventstream-router/EventSource.js b/eventstream-router/EventSource.js index 62cb2b1..47699cc 100644 --- a/eventstream-router/EventSource.js +++ b/eventstream-router/EventSource.js @@ -1,5 +1,7 @@ /** - * Forked from https://github.com/EventSource/eventsource + * Forked from npm eventsource module + * https://github.com/EventSource/eventsource + * (MIT license) */ var original = require('original'); diff --git a/eventstream-router/g13-watch.js b/eventstream-router/g13-watch.js new file mode 100644 index 0000000..ef69708 --- /dev/null +++ b/eventstream-router/g13-watch.js @@ -0,0 +1,85 @@ +"use strict"; +Object.defineProperty(exports, "__esModule", { value: true }); +exports.worker = exports.filter = exports.init = void 0; +const botbase_1 = require("../botbase"); +const utils_1 = require("./utils"); +const { preprocessDraftForExtract } = require('../tasks/commons'); +const TextExtractor = require('../TextExtractor')(botbase_1.bot); +const auth = require('../.auth'); +let log, pool; +async function init() { + log = utils_1.streamLog.bind(botbase_1.fs.createWriteStream('./g13-watch.out', { flags: 'a', encoding: 'utf8' })); + log(`[S] Started`); + await botbase_1.bot.getSiteInfo(); + pool = await initDb(); +} +exports.init = init; +async function initDb() { + // Create a pool, but almost all the time only one connection will be used + // Each pool connection is released immediately after use + const pool = botbase_1.mysql.createPool({ + host: 'tools.db.svc.eqiad.wmflabs', + user: auth.db_user, + password: auth.db_password, + port: 3306, + database: 's54328__g13watch_p', + waitForConnections: true, + connectionLimit: 5 + }); + await pool.execute(` + CREATE TABLE IF NOT EXISTS g13( + name VARCHAR(255) UNIQUE, + description VARCHAR(255), + excerpt BLOB, + size INT, + ts TIMESTAMP NOT NULL + ) COLLATE 'utf8_unicode_ci' + `); // use utf8_unicode_ci so that MariaDb allows a varchar(255) field to have unique constraint + // max index column size is 767 bytes. 255*3 = 765 bytes with utf8, 255*4 = 1020 bytes with utf8mb4 + return pool; +} +function filter(data) { + return data.wiki === 'enwiki' && + data.type === 'categorize' && + data.title === 'Category:Candidates for speedy deletion as abandoned drafts or AfC submissions'; +} +exports.filter = filter; +async function worker(data) { + let match = /^\[\[:(.*?)\]\] added/.exec(data.comment); + if (!match) { + return; + } + let title = match[1]; + // data.timestamp is *seconds* since epoch + // This date object will be passed to db + let ts = data.timestamp ? new botbase_1.bot.date(data.timestamp * 1000) : null; + log(`[+] Page ${title} at ${ts}`); + let pagedata = await botbase_1.bot.read(title, { + prop: 'revisions|description', + rvprop: 'content|size' + }); + let text = pagedata?.revisions?.[0]?.content ?? null; + let size = pagedata?.revisions?.[0].size ?? null; + let desc = pagedata?.description ?? null; + if (desc && desc.size > 255) { + desc = desc.slice(0, 250) + ' ...'; + } + let extract = TextExtractor.getExtract(text, 300, 550, preprocessDraftForExtract); + let conn; + try { + conn = await pool.getConnection(); + await conn.execute(`INSERT INTO g13 VALUES(?, ?, ?, ?, ?)`, [title, desc, extract, size, ts]); + } + catch (err) { + if (err.code === 'ER_DUP_ENTRY') { + log(`[W] ${title} entered category more than once`); + return; + } + log(err); + } + finally { + await conn.release(); + } +} +exports.worker = worker; +//# sourceMappingURL=g13-watch.js.map \ No newline at end of file diff --git a/eventstream-router/g13-watch.ts b/eventstream-router/g13-watch.ts new file mode 100644 index 0000000..817e4a3 --- /dev/null +++ b/eventstream-router/g13-watch.ts @@ -0,0 +1,86 @@ +import {fs, bot, mysql, argv} from '../botbase'; +import {streamLog} from './utils'; +const {preprocessDraftForExtract} = require('../tasks/commons'); +const TextExtractor = require('../TextExtractor')(bot); +const auth = require('../.auth'); + +let log, pool; + +export async function init() { + log = streamLog.bind(fs.createWriteStream('./g13-watch.out', {flags: 'a', encoding: 'utf8'})); + + log(`[S] Started`); + await bot.getSiteInfo(); + pool = await initDb(); +} + +async function initDb() { + // Create a pool, but almost all the time only one connection will be used + // Each pool connection is released immediately after use + const pool = mysql.createPool({ + host: 'tools.db.svc.eqiad.wmflabs', + user: auth.db_user, + password: auth.db_password, + port: 3306, + database: 's54328__g13watch_p', + waitForConnections: true, + connectionLimit: 5 + }); + + await pool.execute(` + CREATE TABLE IF NOT EXISTS g13( + name VARCHAR(255) UNIQUE, + description VARCHAR(255), + excerpt BLOB, + size INT, + ts TIMESTAMP NOT NULL + ) COLLATE 'utf8_unicode_ci' + `); // use utf8_unicode_ci so that MariaDb allows a varchar(255) field to have unique constraint + // max index column size is 767 bytes. 255*3 = 765 bytes with utf8, 255*4 = 1020 bytes with utf8mb4 + + return pool; +} + +export function filter(data) { + return data.wiki === 'enwiki' && + data.type === 'categorize' && + data.title === 'Category:Candidates for speedy deletion as abandoned drafts or AfC submissions'; +} + +export async function worker(data) { + let match = /^\[\[:(.*?)\]\] added/.exec(data.comment); + if (!match) { + return; + } + + let title = match[1]; + // data.timestamp is *seconds* since epoch + // This date object will be passed to db + let ts = data.timestamp ? new bot.date(data.timestamp * 1000) : null; + log(`[+] Page ${title} at ${ts}`); + let pagedata = await bot.read(title, { + prop: 'revisions|description', + rvprop: 'content|size' + }); + let text = pagedata?.revisions?.[0]?.content ?? null; + let size = pagedata?.revisions?.[0].size ?? null; + let desc = pagedata?.description ?? null; + if (desc && desc.size > 255) { + desc = desc.slice(0, 250) + ' ...'; + } + let extract = TextExtractor.getExtract(text, 300, 550, preprocessDraftForExtract); + + let conn; + try { + conn = await pool.getConnection(); + await conn.execute(`INSERT INTO g13 VALUES(?, ?, ?, ?, ?)`, [title, desc, extract, size, ts]); + } catch (err) { + if (err.code === 'ER_DUP_ENTRY') { + log(`[W] ${title} entered category more than once`); + return; + } + log(err); + } finally { + await conn.release(); + } +} diff --git a/eventstream-router/index.d.ts b/eventstream-router/index.d.ts new file mode 100644 index 0000000..4cdc906 --- /dev/null +++ b/eventstream-router/index.d.ts @@ -0,0 +1,49 @@ +// Type definitions for eventsource 1.1 +// Project: http://github.com/EventSource/eventsource +// Definitions by: Scott Lee Davis +// Ali Afroozeh +// Pedro Gámez +// Akuukis +// Definitions: https://github.com/DefinitelyTyped/DefinitelyTyped + +// eventsource uses DOM dependencies which are absent in a browserless environment like node.js. +// to avoid compiler errors this monkey patch is used. See more details in: +// - sinon: https://github.com/DefinitelyTyped/DefinitelyTyped/issues/11351 +// - rxjs: https://github.com/ReactiveX/rxjs/issues/1986 +/// + +declare class EventSource { + static readonly CLOSED: number; + static readonly CONNECTING: number; + static readonly OPEN: number; + + constructor(url: string, eventSourceInitDict?: EventSource.EventSourceInitDict); + + readonly CLOSED: number; + readonly CONNECTING: number; + readonly OPEN: number; + readonly url: string; + readonly readyState: number; + readonly withCredentials: boolean; + onopen: (evt: MessageEvent) => any; + onmessage: (evt: MessageEvent) => any; + onerror: (evt: MessageEvent) => any; + addEventListener(type: string, listener: EventListener): void; + dispatchEvent(evt: Event): boolean; + removeEventListener(type: string, listener?: EventListener): void; + close(): void; +} + +declare namespace EventSource { + enum ReadyState { CONNECTING = 0, OPEN = 1, CLOSED = 2 } + + interface EventSourceInitDict { + withCredentials?: boolean; + headers?: object; + proxy?: string; + https?: object; + rejectUnauthorized?: boolean; + } +} + +export = EventSource; diff --git a/eventstream-router/main.js b/eventstream-router/main.js index 99d1aa0..76b691f 100644 --- a/eventstream-router/main.js +++ b/eventstream-router/main.js @@ -1,8 +1,6 @@ const {fs, bot, log, argv} = require('../botbase'); const EventSource = require('./EventSource'); -// TODO: separate main.js logs with route worker logs - function logError(err, task) { let taskFmt = task ? `[${task}]` : ''; let stringified; @@ -57,8 +55,8 @@ class Route { try { exported = require('./' + file); } catch (e) { - console.error(`Invalid route ${this.name}: require() failed`); - console.error(e); + log(`Invalid route ${this.name}: require() failed`); + log(e); this.isValid = false; return; } @@ -68,7 +66,7 @@ class Route { this.isValid = typeof this.filter === 'function' && typeof this.worker === 'function'; if (!this.isValid) { - console.error(`Ignoring ${this.name}: filter or worker is not a function`); + log(`Ignoring ${this.name}: filter or worker is not a function`); return; } this.ready = new Promise((resolve, reject) => { @@ -107,7 +105,7 @@ setInterval(function () { }, lastSeenUpdateInterval); async function main() { - debug('[S] Restarted main'); + log('[S] Restarted main'); let lastTs; try { @@ -128,7 +126,7 @@ async function main() { stream.onopen = function () { // EventStreams API drops connection every 15 minutes ([[phab:T242767]]) // So this will be invoked every time that happens. - log(`[i] Connected`); + log(`[i] Reconnected`); } stream.onerror = function (evt) { if (evt.type === 'error' && evt.message === undefined) { @@ -138,11 +136,15 @@ async function main() { log(`[W] Event source encountered error:`); console.log(evt); logError(evt); - // if (evt.status === 429) { // Too Many Requests, the underlying library doesn't restart by itself - // bot.sleep(5000).then(() => { - // return go(); // restart - // }); - // } + + // TODO: handle other errors, ensure auto-reconnection + + if (evt.status === 429) { // Too Many Requests, the underlying library doesn't reconnect by itself + stream.close(); // just to be safe that there aren't two parallel connections + bot.sleep(5000).then(() => { + return go(); // restart + }); + } } stream.onmessage = function (event) { let data = JSON.parse(event.data); diff --git a/eventstream-router/package.json b/eventstream-router/package.json new file mode 100644 index 0000000..991431c --- /dev/null +++ b/eventstream-router/package.json @@ -0,0 +1,7 @@ +{ + "scripts": { + "start": "jstart -N stream -mem 2g ~/bin/node ~/SDZeroBot/eventstream-router/main.js", + "stop": "jstop stream", + "restart": "npm run stop && sleep 10 && npm run start" + } +} diff --git a/eventstream-router/tsconfig.json b/eventstream-router/tsconfig.json new file mode 100644 index 0000000..3ab4da3 --- /dev/null +++ b/eventstream-router/tsconfig.json @@ -0,0 +1,14 @@ +{ + "compilerOptions": { + "target": "ES2020", + "lib": [ + "ES2020" + ], + "module": "CommonJS", + "types": ["node", "mocha"], + "sourceMap": true + }, + "ts-node": { + "logError": true + } +} diff --git a/eventstream-router/utils.js b/eventstream-router/utils.js index f67b097..05de362 100644 --- a/eventstream-router/utils.js +++ b/eventstream-router/utils.js @@ -6,11 +6,25 @@ const botbase_1 = require("../botbase"); // before use function streamLog(msg) { let ts = new botbase_1.bot.date().format('YYYY-MM-DD HH:mm:ss'); + let stringified; if (typeof msg === 'string') { this.write(`[${ts}] ${msg}\n`); } + else if (stringified = stringifyObject(msg)) { + this.write(`[${ts}] ${stringified}\n`); + } else { - this.write(`[${ts}] ${JSON.stringify(msg)}\n`); + this.write(`[${ts}] [Non-stringifiable object!]\n`); } } exports.streamLog = streamLog; +// JSON.stringify throws on a cyclic object +function stringifyObject(obj) { + try { + return JSON.stringify(obj, null, 2); + } + catch (e) { + return null; + } +} +//# sourceMappingURL=utils.js.map \ No newline at end of file diff --git a/eventstream-router/utils.ts b/eventstream-router/utils.ts index 6f7ca41..1ac1df7 100644 --- a/eventstream-router/utils.ts +++ b/eventstream-router/utils.ts @@ -4,9 +4,21 @@ import {bot} from '../botbase'; // before use export function streamLog(msg) { let ts = new bot.date().format('YYYY-MM-DD HH:mm:ss'); + let stringified; if (typeof msg === 'string') { this.write(`[${ts}] ${msg}\n`); + } else if (stringified = stringifyObject(msg)) { + this.write(`[${ts}] ${stringified}\n`); } else { - this.write(`[${ts}] ${JSON.stringify(msg)}\n`); + this.write(`[${ts}] [Non-stringifiable object!]\n`); + } +} + +// JSON.stringify throws on a cyclic object +function stringifyObject(obj) { + try { + return JSON.stringify(obj, null, 2); + } catch (e) { + return null; } }