diff --git a/plugin-server/src/main/pluginsServer.ts b/plugin-server/src/main/pluginsServer.ts index 2c3a5983d84faa..9e132db6d9d74a 100644 --- a/plugin-server/src/main/pluginsServer.ts +++ b/plugin-server/src/main/pluginsServer.ts @@ -110,7 +110,7 @@ export async function startPluginsServer( let jobsConsumer: Consumer | undefined let schedulerTasksConsumer: Consumer | undefined - let personOverridesWorker: PeriodicTask | undefined + let personOverridesPeriodicTask: PeriodicTask | undefined let httpServer: Server | undefined // healthcheck server @@ -150,7 +150,7 @@ export async function startPluginsServer( jobsConsumer?.disconnect(), stopSessionRecordingBlobConsumer?.(), schedulerTasksConsumer?.disconnect(), - personOverridesWorker?.stop(), + personOverridesPeriodicTask?.stop(), ]) if (piscina) { @@ -437,9 +437,9 @@ export async function startPluginsServer( const postgres = hub?.postgres ?? new PostgresRouter(serverConfig) const kafkaProducer = hub?.kafkaProducer ?? (await createKafkaProducerWrapper(serverConfig)) - personOverridesWorker = new DeferredPersonOverrideWorker(postgres, kafkaProducer).run() - personOverridesWorker.promise.catch(async () => { - status.error('⚠️', 'Person override writer crashed! Requesting shutdown...') + personOverridesPeriodicTask = new DeferredPersonOverrideWorker(postgres, kafkaProducer).runTask() + personOverridesPeriodicTask.promise.catch(async () => { + status.error('⚠️', 'Person override worker task crashed! Requesting shutdown...') await closeJobs() process.exit(1) }) diff --git a/plugin-server/src/worker/ingestion/person-state.ts b/plugin-server/src/worker/ingestion/person-state.ts index 89464a7a43f4e7..8616e612649195 100644 --- a/plugin-server/src/worker/ingestion/person-state.ts +++ b/plugin-server/src/worker/ingestion/person-state.ts @@ -808,7 +808,7 @@ export class DeferredPersonOverrideWorker { }) } - public run(): PeriodicTask { + public runTask(): PeriodicTask { return new PeriodicTask(async () => { status.debug('👥', 'Processing pending overrides...') const overridesCount = await this.processPendingOverrides()