Skip to content

Commit

Permalink
Run the overrides worker.
Browse files Browse the repository at this point in the history
  • Loading branch information
tkaemming committed Dec 6, 2023
1 parent 14541c5 commit d98386f
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 0 deletions.
22 changes: 22 additions & 0 deletions plugin-server/src/main/pluginsServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@ import { Hub, PluginServerCapabilities, PluginsServerConfig } from '../types'
import { createHub, createKafkaClient, createKafkaProducerWrapper } from '../utils/db/hub'
import { PostgresRouter } from '../utils/db/postgres'
import { cancelAllScheduledJobs } from '../utils/node-schedule'
import { PeriodicTask } from '../utils/periodic-task'
import { PubSub } from '../utils/pubsub'
import { status } from '../utils/status'
import { delay } from '../utils/utils'
import { AppMetrics } from '../worker/ingestion/app-metrics'
import { OrganizationManager } from '../worker/ingestion/organization-manager'
import { DeferredPersonOverrideWriter } from '../worker/ingestion/person-state'
import { TeamManager } from '../worker/ingestion/team-manager'
import Piscina, { makePiscina as defaultMakePiscina } from '../worker/piscina'
import { GraphileWorker } from './graphile-worker/graphile-worker'
Expand Down Expand Up @@ -108,6 +110,8 @@ export async function startPluginsServer(
let jobsConsumer: Consumer | undefined
let schedulerTasksConsumer: Consumer | undefined

let personOverridesWorker: PeriodicTask

let httpServer: Server | undefined // healthcheck server

let graphileWorker: GraphileWorker | undefined
Expand Down Expand Up @@ -146,6 +150,7 @@ export async function startPluginsServer(
jobsConsumer?.disconnect(),
stopSessionRecordingBlobConsumer?.(),
schedulerTasksConsumer?.disconnect(),
personOverridesWorker?.stop(),
])

if (piscina) {
Expand Down Expand Up @@ -428,6 +433,23 @@ export async function startPluginsServer(
}
}

if (capabilities.personOverrides) {
const postgres = hub?.postgres ?? new PostgresRouter(serverConfig)
const kafkaProducer = hub?.kafkaProducer ?? (await createKafkaProducerWrapper(serverConfig))
const overridesWriter = new DeferredPersonOverrideWriter(postgres)

personOverridesWorker = new PeriodicTask(async () => {
status.debug('👥', 'Processing pending overrides...')
const overridesCount = await overridesWriter.processPendingOverrides(kafkaProducer)
;(overridesCount > 0 ? status.info : status.debug)(
'👥',
`Processed ${overridesCount} pending overrides.`
)
})

healthChecks['person-overrides'] = () => personOverridesWorker.isRunning()
}

if (capabilities.http) {
httpServer = createHttpServer(serverConfig.HTTP_SERVER_PORT, healthChecks, analyticsEventsIngestionConsumer)
}
Expand Down
47 changes: 47 additions & 0 deletions plugin-server/src/utils/periodic-task.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import { status } from './status'
import { sleep } from './utils'

export class PeriodicTask {
private promise: Promise<void>
private running = true
private abortController: AbortController

constructor(task: () => Promise<void> | void, intervalMs = 1000, minimumWaitMs = 1000) {
this.abortController = new AbortController()

const abortRequested = new Promise((resolve) => {
this.abortController.signal.addEventListener('abort', resolve, { once: true })
})

this.promise = new Promise<void>(async (resolve, reject) => {
try {
status.debug('🔄', 'Periodic task starting...', { task })
while (!this.abortController.signal.aborted) {
const startTimeMs = +Date.now()
await task()
const waitTimeMs = Math.max(intervalMs - startTimeMs, minimumWaitMs)
status.debug('🔄', `Next evaluation in ${waitTimeMs / 1000}s`, { task })
await Promise.race([sleep(waitTimeMs), abortRequested])
}
status.info('✅', 'Periodic task stopped by request.', { task })
resolve()
} catch (error) {
status.warn('⚠️', 'Error in periodic task!', { task, error })
reject(error)
} finally {
this.running = false
}
})
}

public isRunning(): boolean {
return this.running
}

public async stop(): Promise<void> {
this.abortController.abort()
try {
await this.promise
} catch {}
}
}
24 changes: 24 additions & 0 deletions plugin-server/tests/utils/periodic-task.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import { PeriodicTask } from '../../src/utils/periodic-task'

describe('PeriodicTask', () => {
describe('updates completion status', () => {
it('on success', async () => {
const fn = jest.fn()
const task = new PeriodicTask(fn)
expect(fn).toBeCalled()
expect(task.isRunning()).toEqual(true)
await task.stop()
expect(task.isRunning()).toEqual(false)
})

it('on failure', async () => {
const fn = jest.fn(() => {
throw new Error()
})
const task = new PeriodicTask(fn)
expect(fn).toBeCalled()
await task.stop()
expect(task.isRunning()).toEqual(false)
})
})
})

0 comments on commit d98386f

Please sign in to comment.