From d98386f32a5719d25ed25032c13d8d3a13cd6766 Mon Sep 17 00:00:00 2001 From: Ted Kaemming <65315+tkaemming@users.noreply.github.com> Date: Tue, 5 Dec 2023 17:31:31 -0800 Subject: [PATCH] Run the overrides worker. --- plugin-server/src/main/pluginsServer.ts | 22 +++++++++ plugin-server/src/utils/periodic-task.ts | 47 +++++++++++++++++++ .../tests/utils/periodic-task.test.ts | 24 ++++++++++ 3 files changed, 93 insertions(+) create mode 100644 plugin-server/src/utils/periodic-task.ts create mode 100644 plugin-server/tests/utils/periodic-task.test.ts diff --git a/plugin-server/src/main/pluginsServer.ts b/plugin-server/src/main/pluginsServer.ts index cf51b7713eb8e9..5b39b17cdbdfa2 100644 --- a/plugin-server/src/main/pluginsServer.ts +++ b/plugin-server/src/main/pluginsServer.ts @@ -15,11 +15,13 @@ import { Hub, PluginServerCapabilities, PluginsServerConfig } from '../types' import { createHub, createKafkaClient, createKafkaProducerWrapper } from '../utils/db/hub' import { PostgresRouter } from '../utils/db/postgres' import { cancelAllScheduledJobs } from '../utils/node-schedule' +import { PeriodicTask } from '../utils/periodic-task' import { PubSub } from '../utils/pubsub' import { status } from '../utils/status' import { delay } from '../utils/utils' import { AppMetrics } from '../worker/ingestion/app-metrics' import { OrganizationManager } from '../worker/ingestion/organization-manager' +import { DeferredPersonOverrideWriter } from '../worker/ingestion/person-state' import { TeamManager } from '../worker/ingestion/team-manager' import Piscina, { makePiscina as defaultMakePiscina } from '../worker/piscina' import { GraphileWorker } from './graphile-worker/graphile-worker' @@ -108,6 +110,8 @@ export async function startPluginsServer( let jobsConsumer: Consumer | undefined let schedulerTasksConsumer: Consumer | undefined + let personOverridesWorker: PeriodicTask + let httpServer: Server | undefined // healthcheck server let graphileWorker: GraphileWorker | undefined @@ -146,6 +150,7 @@ export async function startPluginsServer( jobsConsumer?.disconnect(), stopSessionRecordingBlobConsumer?.(), schedulerTasksConsumer?.disconnect(), + personOverridesWorker?.stop(), ]) if (piscina) { @@ -428,6 +433,23 @@ export async function startPluginsServer( } } + if (capabilities.personOverrides) { + const postgres = hub?.postgres ?? new PostgresRouter(serverConfig) + const kafkaProducer = hub?.kafkaProducer ?? (await createKafkaProducerWrapper(serverConfig)) + const overridesWriter = new DeferredPersonOverrideWriter(postgres) + + personOverridesWorker = new PeriodicTask(async () => { + status.debug('👥', 'Processing pending overrides...') + const overridesCount = await overridesWriter.processPendingOverrides(kafkaProducer) + ;(overridesCount > 0 ? status.info : status.debug)( + '👥', + `Processed ${overridesCount} pending overrides.` + ) + }) + + healthChecks['person-overrides'] = () => personOverridesWorker.isRunning() + } + if (capabilities.http) { httpServer = createHttpServer(serverConfig.HTTP_SERVER_PORT, healthChecks, analyticsEventsIngestionConsumer) } diff --git a/plugin-server/src/utils/periodic-task.ts b/plugin-server/src/utils/periodic-task.ts new file mode 100644 index 00000000000000..13ee3fcba90055 --- /dev/null +++ b/plugin-server/src/utils/periodic-task.ts @@ -0,0 +1,47 @@ +import { status } from './status' +import { sleep } from './utils' + +export class PeriodicTask { + private promise: Promise + private running = true + private abortController: AbortController + + constructor(task: () => Promise | void, intervalMs = 1000, minimumWaitMs = 1000) { + this.abortController = new AbortController() + + const abortRequested = new Promise((resolve) => { + this.abortController.signal.addEventListener('abort', resolve, { once: true }) + }) + + this.promise = new Promise(async (resolve, reject) => { + try { + status.debug('🔄', 'Periodic task starting...', { task }) + while (!this.abortController.signal.aborted) { + const startTimeMs = +Date.now() + await task() + const waitTimeMs = Math.max(intervalMs - startTimeMs, minimumWaitMs) + status.debug('🔄', `Next evaluation in ${waitTimeMs / 1000}s`, { task }) + await Promise.race([sleep(waitTimeMs), abortRequested]) + } + status.info('✅', 'Periodic task stopped by request.', { task }) + resolve() + } catch (error) { + status.warn('⚠️', 'Error in periodic task!', { task, error }) + reject(error) + } finally { + this.running = false + } + }) + } + + public isRunning(): boolean { + return this.running + } + + public async stop(): Promise { + this.abortController.abort() + try { + await this.promise + } catch {} + } +} diff --git a/plugin-server/tests/utils/periodic-task.test.ts b/plugin-server/tests/utils/periodic-task.test.ts new file mode 100644 index 00000000000000..805517485adac4 --- /dev/null +++ b/plugin-server/tests/utils/periodic-task.test.ts @@ -0,0 +1,24 @@ +import { PeriodicTask } from '../../src/utils/periodic-task' + +describe('PeriodicTask', () => { + describe('updates completion status', () => { + it('on success', async () => { + const fn = jest.fn() + const task = new PeriodicTask(fn) + expect(fn).toBeCalled() + expect(task.isRunning()).toEqual(true) + await task.stop() + expect(task.isRunning()).toEqual(false) + }) + + it('on failure', async () => { + const fn = jest.fn(() => { + throw new Error() + }) + const task = new PeriodicTask(fn) + expect(fn).toBeCalled() + await task.stop() + expect(task.isRunning()).toEqual(false) + }) + }) +})