diff --git a/plugin-server/src/capabilities.ts b/plugin-server/src/capabilities.ts index bbb8a49823ed7..7a30b46438b36 100644 --- a/plugin-server/src/capabilities.ts +++ b/plugin-server/src/capabilities.ts @@ -19,6 +19,7 @@ export function getPluginServerCapabilities(config: PluginsServerConfig): Plugin processAsyncOnEventHandlers: true, processAsyncWebhooksHandlers: true, sessionRecordingBlobIngestion: true, + personOverrides: config.POE_DEFERRED_WRITES_ENABLED, transpileFrontendApps: true, preflightSchedules: true, ...sharedCapabilities, @@ -75,5 +76,10 @@ export function getPluginServerCapabilities(config: PluginsServerConfig): Plugin transpileFrontendApps: true, // TODO: move this away from pod startup, into a graphile job ...sharedCapabilities, } + case PluginServerMode.person_overrides: + return { + personOverrides: true, + ...sharedCapabilities, + } } } diff --git a/plugin-server/src/config/config.ts b/plugin-server/src/config/config.ts index 5f7f064122542..5b277bc9766c5 100644 --- a/plugin-server/src/config/config.ts +++ b/plugin-server/src/config/config.ts @@ -128,6 +128,7 @@ export function getDefaultConfig(): PluginsServerConfig { EXTERNAL_REQUEST_TIMEOUT_MS: 10 * 1000, // 10 seconds DROP_EVENTS_BY_TOKEN_DISTINCT_ID: '', DROP_EVENTS_BY_TOKEN: '', + POE_DEFERRED_WRITES_ENABLED: false, POE_EMBRACE_JOIN_FOR_TEAMS: '', RELOAD_PLUGIN_JITTER_MAX_MS: 60000, diff --git a/plugin-server/src/main/pluginsServer.ts b/plugin-server/src/main/pluginsServer.ts index cf51b7713eb8e..f59d196e93d24 100644 --- a/plugin-server/src/main/pluginsServer.ts +++ b/plugin-server/src/main/pluginsServer.ts @@ -15,11 +15,13 @@ import { Hub, PluginServerCapabilities, PluginsServerConfig } from '../types' import { createHub, createKafkaClient, createKafkaProducerWrapper } from '../utils/db/hub' import { PostgresRouter } from '../utils/db/postgres' import { cancelAllScheduledJobs } from '../utils/node-schedule' +import { PeriodicTask } from '../utils/periodic-task' import { PubSub } from 
'../utils/pubsub' import { status } from '../utils/status' import { delay } from '../utils/utils' import { AppMetrics } from '../worker/ingestion/app-metrics' import { OrganizationManager } from '../worker/ingestion/organization-manager' +import { DeferredPersonOverrideWorker } from '../worker/ingestion/person-state' import { TeamManager } from '../worker/ingestion/team-manager' import Piscina, { makePiscina as defaultMakePiscina } from '../worker/piscina' import { GraphileWorker } from './graphile-worker/graphile-worker' @@ -108,6 +110,8 @@ export async function startPluginsServer( let jobsConsumer: Consumer | undefined let schedulerTasksConsumer: Consumer | undefined + let personOverridesPeriodicTask: PeriodicTask | undefined + let httpServer: Server | undefined // healthcheck server let graphileWorker: GraphileWorker | undefined @@ -146,6 +150,7 @@ export async function startPluginsServer( jobsConsumer?.disconnect(), stopSessionRecordingBlobConsumer?.(), schedulerTasksConsumer?.disconnect(), + personOverridesPeriodicTask?.stop(), ]) if (piscina) { @@ -428,6 +433,18 @@ export async function startPluginsServer( } } + if (capabilities.personOverrides) { + const postgres = hub?.postgres ?? new PostgresRouter(serverConfig) + const kafkaProducer = hub?.kafkaProducer ?? (await createKafkaProducerWrapper(serverConfig)) + + personOverridesPeriodicTask = new DeferredPersonOverrideWorker(postgres, kafkaProducer).runTask(5000) + personOverridesPeriodicTask.promise.catch(async () => { + status.error('⚠️', 'Person override worker task crashed! 
/**
 * Repeatedly runs an async task until it is asked to stop or the task fails.
 *
 * The first run is kicked off from the constructor. After each run the loop
 * waits `max(intervalMs - runDuration, minimumWaitMs)` before the next one, so
 * `intervalMs` is a target period between run *starts* and `minimumWaitMs` is
 * a floor on the pause between runs.
 *
 * `promise` settles when the loop ends: it resolves after `stop()` has been
 * honoured and rejects with the task's error if a run throws. Callers that
 * never call `stop()` should attach their own rejection handler to `promise`.
 */
export class PeriodicTask {
    /** Resolves once the loop has stopped by request; rejects if a run fails. */
    public readonly promise: Promise<void>
    private running = true
    private readonly abortController = new AbortController()

    /**
     * @param name used in log messages and as the metric name passed to `instrument`
     * @param task the work to run on every iteration
     * @param intervalMs target time between the start of consecutive runs
     * @param minimumWaitMs lower bound on the pause between two runs
     */
    constructor(public name: string, task: () => Promise<void>, intervalMs: number, minimumWaitMs = 0) {
        // An async method (rather than an async Promise executor) lets the
        // language map any throw to a rejection of `promise`. The method body
        // still runs synchronously up to its first `await`, so the first task
        // invocation happens before the constructor returns.
        this.promise = this.run(task, intervalMs, minimumWaitMs)
    }

    /** The run loop backing `promise`. */
    private async run(task: () => Promise<void>, intervalMs: number, minimumWaitMs: number): Promise<void> {
        const { signal } = this.abortController

        // Settles as soon as stop() aborts the controller, so that a pending
        // sleep between runs can be cut short.
        const abortRequested = new Promise<void>((resolve) => {
            signal.addEventListener('abort', () => resolve(), { once: true })
        })

        try {
            status.debug('🔄', `${this}: Starting...`)
            while (!signal.aborted) {
                const startTimeMs = Date.now()
                await instrument({ metricName: this.name }, task)
                const durationMs = Date.now() - startTimeMs
                const waitTimeMs = Math.max(intervalMs - durationMs, minimumWaitMs)
                status.debug(
                    '🔄',
                    `${this}: Task completed in ${durationMs / 1000}s, next evaluation in ${waitTimeMs / 1000}s`
                )
                await Promise.race([sleep(waitTimeMs), abortRequested])
            }
            status.info('🔴', `${this}: Stopped by request.`)
        } catch (error) {
            status.warn('⚠️', `${this}: Unexpected error!`, { error })
            throw error
        } finally {
            this.running = false
        }
    }

    public toString(): string {
        return `Periodic Task (${this.name})`
    }

    /** True until the loop has exited, whether by request or by failure. */
    public isRunning(): boolean {
        return this.running
    }

    /**
     * Requests the loop to stop and waits for it to wind down. A run that is
     * already in progress is allowed to finish. Never rejects.
     */
    public async stop(): Promise<void> {
        status.info(`⏳`, `${this}: Stop requested...`)
        this.abortController.abort()
        try {
            await this.promise
        } catch {
            // Deliberately ignored: the failure was already logged by the run
            // loop and remains observable through `promise`.
        }
    }
}
processPersonsStep( throw error } + let overridesWriter: PersonOverrideWriter | DeferredPersonOverrideWriter | undefined = undefined + if (runner.poEEmbraceJoin) { + if (runner.hub.POE_DEFERRED_WRITES_ENABLED) { + overridesWriter = new DeferredPersonOverrideWriter(runner.hub.db.postgres) + } else { + overridesWriter = new PersonOverrideWriter(runner.hub.db.postgres) + } + } + const person = await new PersonState( event, event.team_id, String(event.distinct_id), timestamp, runner.hub.db, - runner.poEEmbraceJoin ? new PersonOverrideWriter(runner.hub.db.postgres) : undefined + overridesWriter ).update() return [event, person] diff --git a/plugin-server/src/worker/ingestion/person-state.ts b/plugin-server/src/worker/ingestion/person-state.ts index 340fbfcb97d6e..df9377a7df425 100644 --- a/plugin-server/src/worker/ingestion/person-state.ts +++ b/plugin-server/src/worker/ingestion/person-state.ts @@ -10,6 +10,7 @@ import { Person, PropertyUpdateOperation, TimestampFormat } from '../../types' import { DB } from '../../utils/db/db' import { PostgresRouter, PostgresUse, TransactionClient } from '../../utils/db/postgres' import { timeoutGuard } from '../../utils/db/utils' +import { PeriodicTask } from '../../utils/periodic-task' import { promiseRetry } from '../../utils/retries' import { status } from '../../utils/status' import { castTimestampOrNow, UUIDT } from '../../utils/utils' @@ -707,12 +708,13 @@ export class PersonOverrideWriter { } } +const deferredPersonOverridesWrittenCounter = new Counter({ + name: 'deferred_person_overrides_written', + help: 'Number of person overrides that have been written as pending', +}) + export class DeferredPersonOverrideWriter { - /** - * @param lockId the lock identifier/key used to ensure that only one - * process is updating the overrides at a time - */ - constructor(private postgres: PostgresRouter, private lockId: number) {} + constructor(private postgres: PostgresRouter) {} /** * Enqueue an override for deferred processing. 
const deferredPersonOverridesProcessedCounter = new Counter({
    name: 'deferred_person_overrides_processed',
    help: 'Number of pending person overrides that have been successfully processed',
})

/**
 * Drains rows from `posthog_pendingpersonoverride`, applying each one through
 * a `PersonOverrideWriter` and publishing the resulting messages to Kafka.
 */
export class DeferredPersonOverrideWorker {
    // This lock ID is used as an advisory lock identifier/key for a lock that
    // ensures only one worker is able to update the overrides table at a time.
    // (We do this to make it simpler to ensure that we maintain the consistency
    // of transitive updates.) There isn't any special significance to this
    // particular value (other than Postgres requires it to be a numeric one),
    // it just needs to be consistent across all processes.
    public readonly lockId = 567

    private writer: PersonOverrideWriter

    constructor(private postgres: PostgresRouter, private kafkaProducer: KafkaProducerWrapper) {
        this.writer = new PersonOverrideWriter(this.postgres)
    }

    /**
     * Process all (or up to the given limit) pending overrides.
     *
     * Runs in a single Postgres transaction: takes the transaction-scoped
     * advisory lock, reads pending rows ordered by id, applies and deletes
     * each one, then hands the generated messages to the Kafka producer
     * before the transaction callback returns.
     *
     * @param limit maximum number of pending rows to handle in this call;
     * all pending rows are handled when omitted
     * @returns the number of overrides processed
     * @throws Error('could not acquire lock') when the advisory lock is
     * already held elsewhere; errors from the queries or from the Kafka
     * producer also propagate
     */
    public async processPendingOverrides(limit?: number): Promise<number> {
        const overridesCount = await this.postgres.transaction(
            PostgresUse.COMMON_WRITE,
            'processPendingOverrides',
            async (tx) => {
                const {
                    rows: [{ acquired }],
                } = await this.postgres.query(
                    tx,
                    SQL`SELECT pg_try_advisory_xact_lock(${this.lockId}) as acquired`,
                    undefined,
                    'processPendingOverrides'
                )
                if (!acquired) {
                    throw new Error('could not acquire lock')
                }

                // n.b.: Ordering by id ensures we are processing in (roughly) FIFO order
                const { rows } = await this.postgres.query(
                    tx,
                    `SELECT * FROM posthog_pendingpersonoverride ORDER BY id` +
                        (limit !== undefined ? ` LIMIT ${limit}` : ''),
                    undefined,
                    'processPendingOverrides'
                )

                const messages: ProducerRecord[] = []
                for (const { id, ...mergeOperation } of rows) {
                    messages.push(...(await this.writer.addPersonOverride(tx, mergeOperation)))
                    await this.postgres.query(
                        tx,
                        SQL`DELETE FROM posthog_pendingpersonoverride WHERE id = ${id}`,
                        undefined,
                        'processPendingOverrides'
                    )
                }

                // n.b.: We publish the messages here (and wait for acks) to ensure
                // that all of our override updates are sent to Kafka prior to
                // committing the transaction. If we're unable to publish, we should
                // discard updates and try again later when it's available -- not
                // doing so would cause the copy of this data in ClickHouse to
                // slowly drift out of sync with the copy in Postgres. This write is
                // safe to retry if we write to Kafka but then fail to commit to
                // Postgres for some reason -- the same row state should be
                // generated each call, and the receiving ReplacingMergeTree will
                // ensure we keep only the latest version after all writes settle.
                await this.kafkaProducer.queueMessages(messages, true)

                return rows.length
            }
        )

        // Only counted once the transaction callback has completed without throwing.
        deferredPersonOverridesProcessedCounter.inc(overridesCount)

        return overridesCount
    }

    /**
     * Wraps `processPendingOverrides` (without a limit) in a `PeriodicTask`.
     *
     * NOTE(review): any error from `processPendingOverrides`, including a
     * failed lock acquisition, propagates out of the task callback -- confirm
     * that is the intended handling when several workers run concurrently.
     *
     * @param intervalMs interval handed to the `PeriodicTask`
     */
    public runTask(intervalMs: number): PeriodicTask {
        return new PeriodicTask(
            'processPendingOverrides',
            async () => {
                status.debug('👥', 'Processing pending overrides...')
                const overridesCount = await this.processPendingOverrides()
                const message = `Processed ${overridesCount} pending overrides.`
                // Call the logger methods on `status` directly: picking one with
                // a ternary and invoking the bare function reference would drop
                // the `this` binding.
                if (overridesCount > 0) {
                    status.info('👥', message)
                } else {
                    status.debug('👥', message)
                }
            },
            intervalMs
        )
    }
}
PersonOverrideWriter, PersonState, @@ -70,11 +71,9 @@ const PersonOverridesModes: Record = { fetchPostgresPersonIdOverrides: (hub, teamId) => fetchPostgresPersonIdOverrides(hub, teamId), }, deferred: { - // XXX: This is kind of a mess -- ideally it'd be preferable to just - // instantiate the writer once and share it - getWriter: (hub) => new DeferredPersonOverrideWriter(hub.db.postgres, 456), + getWriter: (hub) => new DeferredPersonOverrideWriter(hub.db.postgres), fetchPostgresPersonIdOverrides: async (hub, teamId) => { - await new DeferredPersonOverrideWriter(hub.db.postgres, 456).processPendingOverrides(hub.db.kafkaProducer) + await new DeferredPersonOverrideWorker(hub.db.postgres, hub.db.kafkaProducer).processPendingOverrides() return await fetchPostgresPersonIdOverrides(hub, teamId) }, }, @@ -2096,7 +2095,7 @@ describe('PersonState.update()', () => { }) }) -describe('DeferredPersonOverrideWriter', () => { +describe('deferred person overrides', () => { let hub: Hub let closeHub: () => Promise @@ -2104,13 +2103,14 @@ describe('DeferredPersonOverrideWriter', () => { let organizationId: string let teamId: number - const lockId = 456 let writer: DeferredPersonOverrideWriter + let worker: DeferredPersonOverrideWorker beforeAll(async () => { ;[hub, closeHub] = await createHub({}) organizationId = await createOrganization(hub.db.postgres) - writer = new DeferredPersonOverrideWriter(hub.db.postgres, lockId) + writer = new DeferredPersonOverrideWriter(hub.db.postgres) + worker = new DeferredPersonOverrideWorker(hub.db.postgres, hub.db.kafkaProducer) }) beforeEach(async () => { @@ -2144,7 +2144,7 @@ describe('DeferredPersonOverrideWriter', () => { } it('moves overrides from the pending table to the overrides table', async () => { - const { postgres, kafkaProducer } = hub.db + const { postgres } = hub.db const override = { old_person_id: new UUIDT().toString(), @@ -2157,7 +2157,7 @@ describe('DeferredPersonOverrideWriter', () => { expect(await 
getPendingPersonOverrides()).toEqual([override]) - expect(await writer.processPendingOverrides(kafkaProducer)).toEqual(1) + expect(await worker.processPendingOverrides()).toEqual(1) expect(await getPendingPersonOverrides()).toMatchObject([]) @@ -2181,7 +2181,7 @@ describe('DeferredPersonOverrideWriter', () => { }) it('rolls back on kafka producer error', async () => { - const { postgres, kafkaProducer } = hub.db + const { postgres } = hub.db const override = { old_person_id: new UUIDT().toString(), @@ -2194,17 +2194,17 @@ describe('DeferredPersonOverrideWriter', () => { expect(await getPendingPersonOverrides()).toEqual([override]) - jest.spyOn(kafkaProducer, 'queueMessages').mockImplementation(() => { + jest.spyOn(hub.db.kafkaProducer, 'queueMessages').mockImplementation(() => { throw new Error('something bad happened') }) - await expect(writer.processPendingOverrides(kafkaProducer)).rejects.toThrow() + await expect(worker.processPendingOverrides()).rejects.toThrow() expect(await getPendingPersonOverrides()).toEqual([override]) }) it('ensures advisory lock is held before processing', async () => { - const { postgres, kafkaProducer } = hub.db + const { postgres } = hub.db let acquiredLock: boolean const tryLockComplete = new WaitEvent() @@ -2214,7 +2214,7 @@ describe('DeferredPersonOverrideWriter', () => { .transaction(PostgresUse.COMMON_WRITE, '', async (tx) => { const { rows } = await postgres.query( tx, - `SELECT pg_try_advisory_lock(${lockId}) as acquired, pg_backend_pid()`, + `SELECT pg_try_advisory_lock(${worker.lockId}) as acquired, pg_backend_pid()`, undefined, '' ) @@ -2229,18 +2229,18 @@ describe('DeferredPersonOverrideWriter', () => { try { await tryLockComplete.wait() expect(acquiredLock!).toBe(true) - await expect(writer.processPendingOverrides(kafkaProducer)).rejects.toThrow(Error('could not acquire lock')) + await expect(worker.processPendingOverrides()).rejects.toThrow(Error('could not acquire lock')) } finally { readyToReleaseLock.set() await 
transactionHolder } expect(acquiredLock!).toBe(false) - await expect(writer.processPendingOverrides(kafkaProducer)).resolves.toEqual(0) + await expect(worker.processPendingOverrides()).resolves.toEqual(0) }) it('respects limit if provided', async () => { - const { postgres, kafkaProducer } = hub.db + const { postgres } = hub.db const overrides = [...Array(3)].map(() => ({ old_person_id: new UUIDT().toString(), @@ -2262,10 +2262,10 @@ describe('DeferredPersonOverrideWriter', () => { expect(await getPendingPersonOverrides()).toEqual(overrides) - expect(await writer.processPendingOverrides(kafkaProducer, 2)).toEqual(2) + expect(await worker.processPendingOverrides(2)).toEqual(2) expect(await getPendingPersonOverrides()).toMatchObject(overrides.slice(-1)) - expect(await writer.processPendingOverrides(kafkaProducer, 2)).toEqual(1) + expect(await worker.processPendingOverrides(2)).toEqual(1) expect(await getPendingPersonOverrides()).toEqual([]) }) })