-
Notifications
You must be signed in to change notification settings - Fork 1.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(plugin-server): Add capability to use deferred overrides writer and worker #19112
Changes from 17 commits
347c425
14541c5
94ef79c
a4174e6
3bc2290
e2a7487
7c1ad87
dbfb158
9a7a729
e24f427
6c4b2d9
f239cd9
9784c7b
8d24b2a
c812221
432bddc
fa1fc26
b7de72b
4d1114c
fb43fbd
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -15,11 +15,13 @@ import { Hub, PluginServerCapabilities, PluginsServerConfig } from '../types' | |
import { createHub, createKafkaClient, createKafkaProducerWrapper } from '../utils/db/hub' | ||
import { PostgresRouter } from '../utils/db/postgres' | ||
import { cancelAllScheduledJobs } from '../utils/node-schedule' | ||
import { PeriodicTask } from '../utils/periodic-task' | ||
import { PubSub } from '../utils/pubsub' | ||
import { status } from '../utils/status' | ||
import { delay } from '../utils/utils' | ||
import { AppMetrics } from '../worker/ingestion/app-metrics' | ||
import { OrganizationManager } from '../worker/ingestion/organization-manager' | ||
import { DeferredPersonOverrideWorker } from '../worker/ingestion/person-state' | ||
import { TeamManager } from '../worker/ingestion/team-manager' | ||
import Piscina, { makePiscina as defaultMakePiscina } from '../worker/piscina' | ||
import { GraphileWorker } from './graphile-worker/graphile-worker' | ||
|
@@ -108,6 +110,8 @@ export async function startPluginsServer( | |
let jobsConsumer: Consumer | undefined | ||
let schedulerTasksConsumer: Consumer | undefined | ||
|
||
let personOverridesPeriodicTask: PeriodicTask | undefined | ||
|
||
let httpServer: Server | undefined // healthcheck server | ||
|
||
let graphileWorker: GraphileWorker | undefined | ||
|
@@ -146,6 +150,7 @@ export async function startPluginsServer( | |
jobsConsumer?.disconnect(), | ||
stopSessionRecordingBlobConsumer?.(), | ||
schedulerTasksConsumer?.disconnect(), | ||
personOverridesPeriodicTask?.stop(), | ||
]) | ||
|
||
if (piscina) { | ||
|
@@ -428,6 +433,18 @@ export async function startPluginsServer( | |
} | ||
} | ||
|
||
if (capabilities.personOverrides) { | ||
const postgres = hub?.postgres ?? new PostgresRouter(serverConfig) | ||
const kafkaProducer = hub?.kafkaProducer ?? (await createKafkaProducerWrapper(serverConfig)) | ||
|
||
personOverridesPeriodicTask = new DeferredPersonOverrideWorker(postgres, kafkaProducer).runTask(5000) | ||
personOverridesPeriodicTask.promise.catch(async () => { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Exposing the |
||
status.error('⚠️', 'Person override worker task crashed! Requesting shutdown...') | ||
await closeJobs() | ||
process.exit(1) | ||
}) | ||
} | ||
|
||
if (capabilities.http) { | ||
httpServer = createHttpServer(serverConfig.HTTP_SERVER_PORT, healthChecks, analyticsEventsIngestionConsumer) | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,52 @@ | ||
import { status } from './status' | ||
import { sleep } from './utils' | ||
|
||
/**
 * Repeatedly invokes a task until `stop()` is called or the task throws.
 *
 * The loop starts as soon as the instance is constructed. `promise` resolves
 * once the loop has exited after a stop request, and rejects with the task's
 * error if an invocation throws (or returns a rejected promise).
 */
export class PeriodicTask {
    /** Settles when the loop exits: resolved after a stop request, rejected if the task failed. */
    public readonly promise: Promise<void>
    private running = true
    private readonly abortController = new AbortController()

    /**
     * @param task the function to invoke on every iteration; may be sync or async
     * @param intervalMs target time between the *starts* of two consecutive invocations
     * @param minimumWaitMs lower bound on the pause after an invocation, applied even
     * when the invocation itself took longer than `intervalMs`
     */
    constructor(task: () => Promise<void> | void, intervalMs: number, minimumWaitMs = 0) {
        // n.b.: The loop is an async method rather than an async Promise
        // executor, so that *any* error raised while running (including one
        // thrown from the error-handling path itself) settles `promise`
        // instead of leaving it pending forever.
        this.promise = this.run(task, intervalMs, minimumWaitMs)
    }

    /** @returns true until the loop has exited, whether by stop request or by failure */
    public isRunning(): boolean {
        return this.running
    }

    /**
     * Requests that the loop stop and waits for it to exit. An invocation that
     * is already in flight is allowed to finish; only the pause between
     * invocations is cut short.
     *
     * Never rejects: a task failure is reported (and logged) through `promise`,
     * so it is deliberately not re-thrown here.
     */
    public async stop(): Promise<void> {
        this.abortController.abort()
        try {
            await this.promise
        } catch {
            // Already logged by the loop and observable via `promise`.
        }
    }

    /** Runs the invoke/pause loop until an abort is requested or the task throws. */
    private async run(task: () => Promise<void> | void, intervalMs: number, minimumWaitMs: number): Promise<void> {
        const { signal } = this.abortController

        // Resolves when `stop()` aborts the controller; raced against the
        // pause so that a stop request does not have to wait out the interval.
        const abortRequested = new Promise<void>((resolve) => {
            signal.addEventListener('abort', () => resolve(), { once: true })
        })

        try {
            status.debug('🔄', 'Periodic task starting...', { task })
            while (!signal.aborted) {
                const startTimeMs = Date.now()
                await task()
                const durationMs = Date.now() - startTimeMs
                // Subtract the time the task took so invocations start roughly
                // every `intervalMs`, but never pause for less than `minimumWaitMs`.
                const waitTimeMs = Math.max(intervalMs - durationMs, minimumWaitMs)
                status.debug(
                    '🔄',
                    `Task completed in ${durationMs / 1000}s, next evaluation in ${waitTimeMs / 1000}s`,
                    { task }
                )
                await Promise.race([sleep(waitTimeMs), abortRequested])
            }
            status.info('✅', 'Periodic task stopped by request.', { task })
        } catch (error) {
            status.warn('⚠️', 'Error in periodic task!', { task, error })
            throw error
        } finally {
            this.running = false
        }
    }
}
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -10,6 +10,8 @@ import { Person, PropertyUpdateOperation, TimestampFormat } from '../../types' | |
import { DB } from '../../utils/db/db' | ||
import { PostgresRouter, PostgresUse, TransactionClient } from '../../utils/db/postgres' | ||
import { timeoutGuard } from '../../utils/db/utils' | ||
import { instrument } from '../../utils/metrics' | ||
import { PeriodicTask } from '../../utils/periodic-task' | ||
import { promiseRetry } from '../../utils/retries' | ||
import { status } from '../../utils/status' | ||
import { castTimestampOrNow, UUIDT } from '../../utils/utils' | ||
|
@@ -707,12 +709,13 @@ export class PersonOverrideWriter { | |
} | ||
} | ||
|
||
const deferredPersonOverridesWrittenCounter = new Counter({ | ||
name: 'deferred_person_overrides_written', | ||
help: 'Number of person overrides that have been written as pending', | ||
}) | ||
|
||
export class DeferredPersonOverrideWriter { | ||
/** | ||
* @param lockId the lock identifier/key used to ensure that only one | ||
* process is updating the overrides at a time | ||
*/ | ||
constructor(private postgres: PostgresRouter, private lockId: number) {} | ||
constructor(private postgres: PostgresRouter) {} | ||
|
||
/** | ||
* Enqueue an override for deferred processing. | ||
|
@@ -738,9 +741,26 @@ export class DeferredPersonOverrideWriter { | |
undefined, | ||
'pendingPersonOverride' | ||
) | ||
|
||
deferredPersonOverridesWrittenCounter.inc() | ||
return [] | ||
} | ||
} | ||
|
||
const deferredPersonOverridesProcessedCounter = new Counter({ | ||
name: 'deferred_person_overrides_processed', | ||
help: 'Number of pending person overrides that have been successfully processed', | ||
}) | ||
|
||
export class DeferredPersonOverrideWorker { | ||
// The advisory lock identifier/key used to ensure that only one process is | ||
// updating the overrides at a time. | ||
public readonly lockId = 567 | ||
|
||
private writer: PersonOverrideWriter | ||
|
||
constructor(private postgres: PostgresRouter, private kafkaProducer: KafkaProducerWrapper) { | ||
this.writer = new PersonOverrideWriter(this.postgres) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Next step is going to be making this |
||
} | ||
|
||
/** | ||
* Process all (or up to the given limit) pending overrides. | ||
|
@@ -751,56 +771,72 @@ export class DeferredPersonOverrideWriter { | |
* | ||
* @returns the number of overrides processed | ||
*/ | ||
public async processPendingOverrides(kafkaProducer: KafkaProducerWrapper, limit?: number): Promise<number> { | ||
const writer = new PersonOverrideWriter(this.postgres) | ||
|
||
return await this.postgres.transaction(PostgresUse.COMMON_WRITE, 'processPendingOverrides', async (tx) => { | ||
const { | ||
rows: [{ acquired }], | ||
} = await this.postgres.query( | ||
tx, | ||
SQL`SELECT pg_try_advisory_xact_lock(${this.lockId}) as acquired`, | ||
undefined, | ||
'processPendingOverrides' | ||
) | ||
if (!acquired) { | ||
throw new Error('could not acquire lock') | ||
} | ||
|
||
// n.b.: Ordering by id ensures we are processing in (roughly) FIFO order | ||
const { rows } = await this.postgres.query( | ||
tx, | ||
`SELECT * FROM posthog_pendingpersonoverride ORDER BY id` + | ||
(limit !== undefined ? ` LIMIT ${limit}` : ''), | ||
undefined, | ||
'processPendingOverrides' | ||
) | ||
public async processPendingOverrides(limit?: number): Promise<number> { | ||
const overridesCount = await this.postgres.transaction( | ||
PostgresUse.COMMON_WRITE, | ||
'processPendingOverrides', | ||
async (tx) => { | ||
const { | ||
rows: [{ acquired }], | ||
} = await this.postgres.query( | ||
tx, | ||
SQL`SELECT pg_try_advisory_xact_lock(${this.lockId}) as acquired`, | ||
undefined, | ||
'processPendingOverrides' | ||
) | ||
if (!acquired) { | ||
throw new Error('could not acquire lock') | ||
} | ||
|
||
const messages: ProducerRecord[] = [] | ||
for (const { id, ...mergeOperation } of rows) { | ||
messages.push(...(await writer.addPersonOverride(tx, mergeOperation))) | ||
await this.postgres.query( | ||
// n.b.: Ordering by id ensures we are processing in (roughly) FIFO order | ||
const { rows } = await this.postgres.query( | ||
tx, | ||
SQL`DELETE FROM posthog_pendingpersonoverride WHERE id = ${id}`, | ||
`SELECT * FROM posthog_pendingpersonoverride ORDER BY id` + | ||
(limit !== undefined ? ` LIMIT ${limit}` : ''), | ||
undefined, | ||
'processPendingOverrides' | ||
) | ||
|
||
const messages: ProducerRecord[] = [] | ||
for (const { id, ...mergeOperation } of rows) { | ||
messages.push(...(await this.writer.addPersonOverride(tx, mergeOperation))) | ||
await this.postgres.query( | ||
tx, | ||
SQL`DELETE FROM posthog_pendingpersonoverride WHERE id = ${id}`, | ||
undefined, | ||
'processPendingOverrides' | ||
) | ||
} | ||
|
||
// n.b.: We publish the messages here (and wait for acks) to ensure | ||
// that all of our override updates are sent to Kafka prior to | ||
// committing the transaction. If we're unable to publish, we should | ||
// discard updates and try again later when it's available -- not | ||
// doing so would cause the copy of this data in ClickHouse to | ||
// slowly drift out of sync with the copy in Postgres. This write is | ||
// safe to retry if we write to Kafka but then fail to commit to | ||
// Postgres for some reason -- the same row state should be | ||
// generated each call, and the receiving ReplacingMergeTree will | ||
// ensure we keep only the latest version after all writes settle.) | ||
await this.kafkaProducer.queueMessages(messages, true) | ||
|
||
return rows.length | ||
} | ||
) | ||
|
||
// n.b.: We publish the messages here (and wait for acks) to ensure | ||
// that all of our override updates are sent to Kafka prior to | ||
// committing the transaction. If we're unable to publish, we should | ||
// discard updates and try again later when it's available -- not | ||
// doing so would cause the copy of this data in ClickHouse to | ||
// slowly drift out of sync with the copy in Postgres. This write is | ||
// safe to retry if we write to Kafka but then fail to commit to | ||
// Postgres for some reason -- the same row state should be | ||
// generated each call, and the receiving ReplacingMergeTree will | ||
// ensure we keep only the latest version after all writes settle.) | ||
await kafkaProducer.queueMessages(messages, true) | ||
|
||
return rows.length | ||
}) | ||
deferredPersonOverridesProcessedCounter.inc(overridesCount) | ||
|
||
return overridesCount | ||
} | ||
|
||
public runTask(intervalMs: number): PeriodicTask { | ||
return new PeriodicTask(async () => { | ||
status.debug('👥', 'Processing pending overrides...') | ||
const overridesCount = await instrument({ metricName: 'processPendingOverrides' }, () => | ||
this.processPendingOverrides() | ||
) | ||
tkaemming marked this conversation as resolved.
Show resolved
Hide resolved
|
||
;(overridesCount > 0 ? status.info : status.debug)('👥', `Processed ${overridesCount} pending overrides.`) | ||
}, intervalMs) | ||
} | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,24 @@ | ||
import { PeriodicTask } from '../../src/utils/periodic-task' | ||
|
||
describe('PeriodicTask', () => {
    describe('updates completion status', () => {
        it('on success', async () => {
            const fn = jest.fn()
            const task = new PeriodicTask(fn, 1000)
            // The first invocation happens synchronously during construction.
            expect(fn).toBeCalled()
            expect(task.isRunning()).toEqual(true)
            await task.stop()
            expect(task.isRunning()).toEqual(false)
        })

        it('on failure', async () => {
            const failure = new Error('task failed')
            const fn = jest.fn(() => {
                throw failure
            })
            const task = new PeriodicTask(fn, 1000)
            expect(fn).toBeCalled()
            // Callers (e.g. the server's crash handler) observe failures via
            // `promise`, so make sure the task's error is actually surfaced
            // there rather than being swallowed.
            await expect(task.promise).rejects.toBe(failure)
            // Stopping an already-failed task must not throw.
            await task.stop()
            expect(task.isRunning()).toEqual(false)
        })
    })
})
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is off by default, keeping it from inadvertently being enabled for hobby deploys.