diff --git a/.github/workflows/restart-services.yml b/.github/workflows/restart-services.yml
index 576a4b0..b6b7f1e 100644
--- a/.github/workflows/restart-services.yml
+++ b/.github/workflows/restart-services.yml
@@ -7,7 +7,7 @@ jobs:
   update:
     runs-on: ubuntu-latest
     steps:
-      - uses: actions/checkout@v3
+      - uses: actions/checkout@v4
       - uses: garygrossgarten/github-action-ssh@915e492551885a89131e21d85f2e043c96abff80
         with:
           command: >-
diff --git a/.github/workflows/toolforge-deploy.yml b/.github/workflows/toolforge-deploy.yml
index 4fa4b39..df4ce55 100644
--- a/.github/workflows/toolforge-deploy.yml
+++ b/.github/workflows/toolforge-deploy.yml
@@ -14,7 +14,7 @@ jobs:
   deploy:
     runs-on: ubuntu-latest
    steps:
-      - uses: actions/checkout@v3
+      - uses: actions/checkout@v4
       - uses: garygrossgarten/github-action-ssh@915e492551885a89131e21d85f2e043c96abff80
         with:
           command: >-
diff --git a/eventstream-router/main.ts b/eventstream-router/main.ts
index 4e20589..ef6b64d 100644
--- a/eventstream-router/main.ts
+++ b/eventstream-router/main.ts
@@ -13,6 +13,7 @@ import dbTabulator from "../db-tabulator/eventstream-trigger";
 import dbTabulatorMetadata from "../db-tabulator/eventstream-metadata-maintainer";
 import shutoffsMonitor from "./routes/shutoffs-monitor";
 import dykCountsTask from "./routes/dyk-counts";
+import purger from "./routes/purger";
 
 const routeClasses = [
 	gans,
@@ -20,6 +21,7 @@ const routeClasses = [
 	botActivityMonitor,
 	dbTabulatorMetadata,
 	shutoffsMonitor,
+	purger,
 ];
 
 // debugging a single route example: -r "./test"
diff --git a/eventstream-router/routes/purger.ts b/eventstream-router/routes/purger.ts
new file mode 100644
index 0000000..f511325
--- /dev/null
+++ b/eventstream-router/routes/purger.ts
@@ -0,0 +1,127 @@
+import {Route} from "../app";
+import {RecentChangeStreamEvent} from "../RecentChangeStreamEvent";
+import {bot} from "../../botbase";
+import {arrayChunk, setDifference} from "../../utils";
+import {Template} from "mwn/build/wikitext";
+import {ActionQueue, BufferedQueue} from "../../queue";
+import {ApiPurgeParams} from "mwn/build/api_params";
+
+export default class Purger extends Route {
+	readonly name = "purger";
+
+	readonly CONF_PAGE = "User:SDZeroBot/Purge list";
+
+	scheduledPurges: Map<PurgeEntry, NodeJS.Timeout> = new Map();
+
+	purgeBatchQueue = new BufferedQueue<PurgeEntry>(2000, this.queuePurgeRequest.bind(this));
+	purgeRequestQueue = new ActionQueue<ApiPurgeParams>(1, this.executePurgeRequest.bind(this));
+
+	async init() {
+		super.init();
+		const entries = await this.parseEntries();
+		this.registerChanges(entries, new Set<PurgeEntry>(), true);
+	}
+
+	filter(data: RecentChangeStreamEvent): boolean {
+		return data.wiki === 'enwiki' && data.title === this.CONF_PAGE;
+	}
+
+	async worker(data: RecentChangeStreamEvent) {
+		const entries = await this.parseEntries();
+
+		const addedEntries = setDifference(entries, new Set(this.scheduledPurges.keys()));
+		const removedEntries = setDifference(new Set(this.scheduledPurges.keys()), entries);
+
+		this.registerChanges(addedEntries, removedEntries);
+	}
+
+	registerChanges(addedEntries: Set<PurgeEntry>, removedEntries: Set<PurgeEntry>, onRestart = false) {
+		for (let entry of removedEntries) {
+			clearInterval(this.scheduledPurges.get(entry));
+			this.scheduledPurges.delete(entry);
+		}
+		for (let entry of addedEntries) {
+			if (!Number.isNaN(entry.intervalDays)) {
+				const interval = entry.intervalDays * 8.64e7; // days to milliseconds
+				this.scheduledPurges.set(entry, setInterval(() => this.purgeBatchQueue.push(entry), interval));
+			} else {
+				if (!onRestart) {
+					// no interval, so trigger a one-off purge
+					this.purgeBatchQueue.push(entry);
+				}
+			}
+		}
+		// XXX: if there are multiple {{database report}}s on a page, update of one would trigger unnecessary
+		// one-off purges of pages in other query results.
+		// If we purge only newly added links, we may miss pages which actually need to be re-purged.
+	}
+
+	async queuePurgeRequest(entries: Array<PurgeEntry>) {
+		// 4 permutations of (forcelinkupdate, forcerecursivelinkupdate)
+		[
+			entries.filter(e => e.forceLinkUpdate && e.forceRecursiveLinkUpdate),
+			entries.filter(e => e.forceLinkUpdate && !e.forceRecursiveLinkUpdate),
+			entries.filter(e => !e.forceLinkUpdate && !e.forceRecursiveLinkUpdate),
+			entries.filter(e => !e.forceLinkUpdate && e.forceRecursiveLinkUpdate),
+		].forEach(batch => {
+			const subBatches = arrayChunk(batch, 100);
+			subBatches.forEach(subBatch => {
+				this.purgeRequestQueue.push({
+					action: 'purge',
+					titles: subBatch.map(e => e.page),
+					forcelinkupdate: subBatch[0].forceLinkUpdate,
+					forcerecursivelinkupdate: subBatch[0].forceRecursiveLinkUpdate
+				});
+			});
+		});
+	}
+
+	async executePurgeRequest(purgeParams: ApiPurgeParams) {
+		try {
+			await bot.request(purgeParams);
+			this.log(`[V] Purged titles ${purgeParams.titles}`);
+			this.log(`[+] Purged batch of ${purgeParams.titles.length} pages`);
+			await bot.sleep(2000); // Sleep interval between successive purges
+		} catch (e) {
+			this.log(`[V] Failed to purge titles ${purgeParams.titles}`);
+			this.log(`[E] Failed to purge batch of ${purgeParams.titles.length} pages`);
+			this.log(e);
+		}
+	}
+
+	async parseEntries() {
+		const text = (await bot.read(this.CONF_PAGE)).revisions[0].content;
+		const entries = bot.Wikitext.parseTemplates(text, {
+			namePredicate: name => name === '/purge'
+		});
+		this.log(`[V] Parsed ${entries.length} titles from ${this.CONF_PAGE}`);
+
+		const existingEntries = Object.fromEntries(
+			[...this.scheduledPurges.keys()].map(e => [e.serialize(), e])
+		);
+		return new Set(entries.map(e => {
+			const entry = new PurgeEntry(e);
+			// return reference to existing entry if present, as that facilitates easy setDifference
+			return existingEntries[entry.serialize()] ?? entry;
+		}));
+	}
+
+}
+
+class PurgeEntry {
+	page: string
+	intervalDays: number
+	forceLinkUpdate: boolean
+	forceRecursiveLinkUpdate: boolean
+	constructor(entry: Template) {
+		this.page = entry.getParam(1).value;
+		this.intervalDays = parseInt(entry.getParam('interval')?.value);
+
+		// any non-empty value represents true!
+		this.forceLinkUpdate = Boolean(entry.getParam('forcelinkupdate')?.value);
+		this.forceRecursiveLinkUpdate = Boolean(entry.getParam('forcerecursivelinkupdate')?.value);
+	}
+	serialize() {
+		return `${this.page}__${this.intervalDays}__${this.forceLinkUpdate}__${this.forceRecursiveLinkUpdate}`;
+	}
+}
diff --git a/reports/afc-draft-purger.ts b/experiments/afc-draft-purger.ts
similarity index 100%
rename from reports/afc-draft-purger.ts
rename to experiments/afc-draft-purger.ts
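For reference, entries on User:SDZeroBot/Purge list are picked up as {{/purge}} template transclusions (per the namePredicate above). A hypothetical entry such as {{/purge|Wikipedia:Sandbox|interval=7|forcelinkupdate=1}} would parse into a PurgeEntry with page "Wikipedia:Sandbox", intervalDays 7, forceLinkUpdate true and forceRecursiveLinkUpdate false, so that page gets a forcelinkupdate purge every 7 days. Omitting |interval= leaves intervalDays as NaN, which triggers a single one-off purge when the entry is added rather than a recurring schedule.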
diff --git a/queue.ts b/queue.ts
new file mode 100644
index 0000000..71d1283
--- /dev/null
+++ b/queue.ts
@@ -0,0 +1,60 @@
+/**
+ * Queue for actions to be performed with a limited concurrency.
+ */
+export class ActionQueue<T> {
+	action: (e: T) => Promise<void>;
+	parallelism: number;
+	pendingQueue: Array<T> = [];
+	running = 0;
+
+	constructor(parallelism: number, action: (e: T) => Promise<void>) {
+		this.parallelism = parallelism;
+		this.action = action;
+	}
+
+	push(e: T) {
+		this.pendingQueue.push(e);
+		this.trigger();
+	}
+
+	trigger() {
+		while (this.running < this.parallelism && this.pendingQueue.length) {
+			const element = this.pendingQueue.shift();
+			this.running++;
+			Promise.resolve(this.action(element)).finally(() => {
+				this.running--;
+				this.trigger();
+			});
+		}
+	}
+
+}
+
+/**
+ * Queue for items occurring together in time to be grouped into batches.
+ */
+export class BufferedQueue<T> {
+	duration: number;
+	currentBatch: Array<T> = [];
+	currentBatchTimeout: NodeJS.Timeout;
+	batchConsumer: (batch: Array<T>) => Promise<void>;
+
+	constructor(duration: number, batchConsumer: (batch: Array<T>) => Promise<void>) {
+		this.duration = duration;
+		this.batchConsumer = batchConsumer;
+	}
+
+	push(e: T) {
+		this.currentBatch.push(e);
+		if (this.currentBatchTimeout) {
+			clearTimeout(this.currentBatchTimeout);
+		}
+		this.currentBatchTimeout = setTimeout(this.finalizeBatch.bind(this), this.duration);
+	}
+
+	finalizeBatch() {
+		this.batchConsumer(this.currentBatch);
+		this.currentBatch = [];
+		clearTimeout(this.currentBatchTimeout);
+	}
+}
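A minimal usage sketch of the two queue classes added above, mirroring how Purger chains its purgeBatchQueue into its purgeRequestQueue. The import path, variable names, item type, and logging are illustrative assumptions, not part of this change:

import { ActionQueue, BufferedQueue } from "./queue";

// Bounded-concurrency consumer: handles one batch at a time (parallelism = 1)
const processor = new ActionQueue<string[]>(1, async (batch) => {
	console.log(`processing ${batch.length} titles`);
});

// Group titles arriving close together in time into a single batch,
// then hand each completed batch to the ActionQueue
const collector = new BufferedQueue<string>(2000, async (batch) => {
	processor.push(batch);
});

collector.push("Title A");
collector.push("Title B"); // lands in the same batch as "Title A"

Since BufferedQueue restarts its timer on every push, a batch is flushed only after no new item has arrived for the configured duration (2 seconds here); ActionQueue then drains its pending queue with at most parallelism actions in flight at once.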