Skip to content

Commit

Permalink
eventstream-router: add first task: g13-watch
Browse files Browse the repository at this point in the history
  • Loading branch information
siddharthvp committed Nov 24, 2020
1 parent 59de68f commit 829ed76
Show file tree
Hide file tree
Showing 9 changed files with 286 additions and 15 deletions.
4 changes: 3 additions & 1 deletion eventstream-router/EventSource.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
/**
* Forked from https://github.com/EventSource/eventsource
* Forked from npm eventsource module
* https://github.com/EventSource/eventsource
* (MIT license)
*/

var original = require('original');
Expand Down
85 changes: 85 additions & 0 deletions eventstream-router/g13-watch.js

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

86 changes: 86 additions & 0 deletions eventstream-router/g13-watch.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
import {fs, bot, mysql, argv} from '../botbase';
import {streamLog} from './utils';
const {preprocessDraftForExtract} = require('../tasks/commons');
const TextExtractor = require('../TextExtractor')(bot);
const auth = require('../.auth');

let log, pool;

/**
 * One-time setup for the g13-watch route.
 *
 * Opens the route's own append-mode log file, binds `streamLog` to it so that
 * `log(...)` writes there, fetches site info through the bot, and finally
 * creates the database pool used by `worker`.
 */
export async function init() {
	const outStream = fs.createWriteStream('./g13-watch.out', {flags: 'a', encoding: 'utf8'});
	// streamLog writes to `this`, so it has to be bound to the stream before use
	log = streamLog.bind(outStream);
	log(`[S] Started`);

	await bot.getSiteInfo();

	const dbPool = await initDb();
	pool = dbPool;
}

/**
 * Creates the MySQL connection pool and makes sure the `g13` table exists.
 *
 * @returns the pool returned by `mysql.createPool`, after the
 *   CREATE TABLE IF NOT EXISTS statement has been executed on it.
 */
async function initDb() {
	// A pool is used even though, almost all the time, a single connection is enough:
	// every pooled connection is released immediately after use.
	const poolConfig = {
		host: 'tools.db.svc.eqiad.wmflabs',
		user: auth.db_user,
		password: auth.db_password,
		port: 3306,
		database: 's54328__g13watch_p',
		waitForConnections: true,
		connectionLimit: 5
	};
	const dbPool = mysql.createPool(poolConfig);

	// utf8_unicode_ci is used so that MariaDB allows a unique constraint on a varchar(255) field:
	// the max index column size is 767 bytes. 255*3 = 765 bytes with utf8,
	// 255*4 = 1020 bytes with utf8mb4.
	const createTableSql = `
CREATE TABLE IF NOT EXISTS g13(
name VARCHAR(255) UNIQUE,
description VARCHAR(255),
excerpt BLOB,
size INT,
ts TIMESTAMP NOT NULL
) COLLATE 'utf8_unicode_ci'
`;
	await dbPool.execute(createTableSql);

	return dbPool;
}

/**
 * Route predicate: accepts only enwiki "categorize" events whose title is the
 * G13 speedy-deletion category.
 *
 * @param data - event object; its `wiki`, `type` and `title` fields are inspected
 * @returns {boolean} true when the event should be handed to `worker`
 */
export function filter(data) {
	if (data.wiki !== 'enwiki') {
		return false;
	}
	if (data.type !== 'categorize') {
		return false;
	}
	return data.title === 'Category:Candidates for speedy deletion as abandoned drafts or AfC submissions';
}

/**
 * Handles one categorization event accepted by `filter`: records the page that
 * was added to the G13 category in the `g13` table.
 *
 * The page title is parsed out of `data.comment` (expected to start with
 * `[[:Title]] added`); events whose comment does not match are ignored.
 * The page is then read through the bot to obtain its wikitext, size and
 * short description, and a row (title, description, excerpt, size, timestamp)
 * is inserted.
 *
 * Database errors never propagate: a duplicate-key error is logged as a
 * warning, any other error is logged as-is.
 *
 * @param data - event object; `comment` and `timestamp` are read
 * @returns {Promise<void>}
 */
export async function worker(data) {
	const match = /^\[\[:(.*?)\]\] added/.exec(data.comment);
	if (!match) {
		return;
	}

	const title = match[1];
	// data.timestamp is *seconds* since epoch
	// This date object will be passed to db
	const ts = data.timestamp ? new bot.date(data.timestamp * 1000) : null;
	log(`[+] Page ${title} at ${ts}`);

	const pagedata = await bot.read(title, {
		prop: 'revisions|description',
		rvprop: 'content|size'
	});
	// `revisions` may be missing or empty, so every step of the chain is optional
	const revision = pagedata?.revisions?.[0];
	const text = revision?.content ?? null;
	const size = revision?.size ?? null;

	let desc = pagedata?.description ?? null;
	// The description column is VARCHAR(255); strings expose `length`, not `size`
	if (desc && desc.length > 255) {
		desc = desc.slice(0, 250) + ' ...';
	}

	// NOTE(review): `text` is null when no revision content came back; this relies on
	// TextExtractor.getExtract tolerating that — confirm against TextExtractor.
	const extract = TextExtractor.getExtract(text, 300, 550, preprocessDraftForExtract);

	let conn;
	try {
		conn = await pool.getConnection();
		await conn.execute(`INSERT INTO g13 VALUES(?, ?, ?, ?, ?)`, [title, desc, extract, size, ts]);
	} catch (err) {
		if (err.code === 'ER_DUP_ENTRY') {
			log(`[W] ${title} entered category more than once`);
			return;
		}
		log(err);
	} finally {
		// conn stays undefined when getConnection() itself failed; releasing
		// unconditionally would throw a TypeError and hide the original error
		await conn?.release();
	}
}
49 changes: 49 additions & 0 deletions eventstream-router/index.d.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Type definitions for eventsource 1.1
// Project: http://github.com/EventSource/eventsource
// Definitions by: Scott Lee Davis <https://github.com/scottleedavis>
// Ali Afroozeh <https://github.com/afroozeh>
// Pedro Gámez <https://github.com/snakedrak>
// Akuukis <https://github.com/Akuukis>
// Definitions: https://github.com/DefinitelyTyped/DefinitelyTyped

// eventsource uses DOM dependencies which are absent in a browserless environment like node.js.
// to avoid compiler errors this monkey patch is used. See more details in:
// - sinon: https://github.com/DefinitelyTyped/DefinitelyTyped/issues/11351
// - rxjs: https://github.com/ReactiveX/rxjs/issues/1986
/// <reference path="./dom-monkeypatch.d.ts" />

/**
 * Ambient declaration of the EventSource class that this declaration file
 * exports (via `export =`).
 */
declare class EventSource {
// Ready-state constants exposed on the class itself.
static readonly CLOSED: number;
static readonly CONNECTING: number;
static readonly OPEN: number;

/**
 * @param url - address to connect to
 * @param eventSourceInitDict - optional settings, see EventSource.EventSourceInitDict
 */
constructor(url: string, eventSourceInitDict?: EventSource.EventSourceInitDict);

// The same ready-state constants, exposed on each instance.
readonly CLOSED: number;
readonly CONNECTING: number;
readonly OPEN: number;
readonly url: string;
// NOTE(review): presumably one of the CONNECTING / OPEN / CLOSED values — confirm against the implementation
readonly readyState: number;
readonly withCredentials: boolean;
// Assignable handler properties; each is declared to receive a MessageEvent.
onopen: (evt: MessageEvent) => any;
onmessage: (evt: MessageEvent) => any;
onerror: (evt: MessageEvent) => any;
addEventListener(type: string, listener: EventListener): void;
dispatchEvent(evt: Event): boolean;
removeEventListener(type: string, listener?: EventListener): void;
close(): void;
}

/**
 * Types that accompany the EventSource class (declaration-merged with it).
 */
declare namespace EventSource {
// Numeric ready states: CONNECTING = 0, OPEN = 1, CLOSED = 2.
enum ReadyState { CONNECTING = 0, OPEN = 1, CLOSED = 2 }

/**
 * Optional settings accepted as the second constructor argument.
 * Every field is optional.
 */
interface EventSourceInitDict {
withCredentials?: boolean;
// NOTE(review): presumably extra HTTP headers sent with the request — confirm against EventSource.js
headers?: object;
// NOTE(review): presumably a proxy URL — confirm against EventSource.js
proxy?: string;
// NOTE(review): presumably options passed to the HTTPS request — confirm against EventSource.js
https?: object;
rejectUnauthorized?: boolean;
}
}

export = EventSource;
26 changes: 14 additions & 12 deletions eventstream-router/main.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
const {fs, bot, log, argv} = require('../botbase');
const EventSource = require('./EventSource');

// TODO: separate main.js logs with route worker logs

function logError(err, task) {
let taskFmt = task ? `[${task}]` : '';
let stringified;
Expand Down Expand Up @@ -57,8 +55,8 @@ class Route {
try {
exported = require('./' + file);
} catch (e) {
console.error(`Invalid route ${this.name}: require() failed`);
console.error(e);
log(`Invalid route ${this.name}: require() failed`);
log(e);
this.isValid = false;
return;
}
Expand All @@ -68,7 +66,7 @@ class Route {

this.isValid = typeof this.filter === 'function' && typeof this.worker === 'function';
if (!this.isValid) {
console.error(`Ignoring ${this.name}: filter or worker is not a function`);
log(`Ignoring ${this.name}: filter or worker is not a function`);
return;
}
this.ready = new Promise((resolve, reject) => {
Expand Down Expand Up @@ -107,7 +105,7 @@ setInterval(function () {
}, lastSeenUpdateInterval);

async function main() {
debug('[S] Restarted main');
log('[S] Restarted main');

let lastTs;
try {
Expand All @@ -128,7 +126,7 @@ async function main() {
stream.onopen = function () {
// EventStreams API drops connection every 15 minutes ([[phab:T242767]])
// So this will be invoked every time that happens.
log(`[i] Connected`);
log(`[i] Reconnected`);
}
stream.onerror = function (evt) {
if (evt.type === 'error' && evt.message === undefined) {
Expand All @@ -138,11 +136,15 @@ async function main() {
log(`[W] Event source encountered error:`);
console.log(evt);
logError(evt);
// if (evt.status === 429) { // Too Many Requests, the underlying library doesn't restart by itself
// bot.sleep(5000).then(() => {
// return go(); // restart
// });
// }

// TODO: handle other errors, ensure auto-reconnection

if (evt.status === 429) { // Too Many Requests, the underlying library doesn't reconnect by itself
stream.close(); // just to be safe that there aren't two parallel connections
bot.sleep(5000).then(() => {
return go(); // restart
});
}
}
stream.onmessage = function (event) {
let data = JSON.parse(event.data);
Expand Down
7 changes: 7 additions & 0 deletions eventstream-router/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{
"scripts": {
"start": "jstart -N stream -mem 2g ~/bin/node ~/SDZeroBot/eventstream-router/main.js",
"stop": "jstop stream",
"restart": "npm run stop && sleep 10 && npm run start"
}
}
14 changes: 14 additions & 0 deletions eventstream-router/tsconfig.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
{
"compilerOptions": {
"target": "ES2020",
"lib": [
"ES2020"
],
"module": "CommonJS",
"types": ["node", "mocha"],
"sourceMap": true
},
"ts-node": {
"logError": true
}
}
16 changes: 15 additions & 1 deletion eventstream-router/utils.js

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 13 additions & 1 deletion eventstream-router/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,21 @@ import {bot} from '../botbase';
// before use
/**
 * Writes one timestamped line to the stream this function is bound to.
 * Must be bound (or `.call`ed) with a writable stream as `this` before use.
 *
 * - strings are written verbatim;
 * - Error objects are written as their stack trace (JSON.stringify would
 *   reduce them to `{}` because their properties are not enumerable);
 * - anything else is written as pretty-printed JSON, or as a fixed
 *   placeholder when it cannot be serialized (e.g. a cyclic object).
 *
 * Exactly one line is written per call, and serialization failures never throw.
 *
 * @param msg - the message or object to log
 */
export function streamLog(msg) {
	const ts = new bot.date().format('YYYY-MM-DD HH:mm:ss');

	let text;
	if (typeof msg === 'string') {
		text = msg;
	} else if (msg instanceof Error) {
		text = msg.stack || String(msg);
	} else {
		// stringifyObject yields a falsy value when serialization is impossible;
		// JSON.stringify must not be retried here, as it would throw on the same input
		text = stringifyObject(msg) || '[Non-stringifiable object!]';
	}

	this.write(`[${ts}] ${text}\n`);
}

/**
 * Serializes a value as pretty-printed JSON (2-space indent) without throwing.
 * JSON.stringify throws on a cyclic object; in that case null is returned.
 *
 * @param obj - value to serialize
 * @returns the JSON.stringify result, or null if serialization threw
 */
function stringifyObject(obj) {
	let serialized = null;
	try {
		serialized = JSON.stringify(obj, null, 2);
	} catch (e) {
		// not serializable: keep the null default
	}
	return serialized;
}

0 comments on commit 829ed76

Please sign in to comment.