
feat(plugin-server): Add capability to use deferred overrides writer and worker #19112

Merged Dec 8, 2023 · 20 commits

Changes from 15 commits

Commits (20)
347c425
Add the ability to enable deferred writes (off by default.)
tkaemming Dec 6, 2023
14541c5
Add capability and plugin-server mode.
tkaemming Dec 6, 2023
94ef79c
Run the overrides worker.
tkaemming Dec 6, 2023
a4174e6
Just let the server crash on override writer error.
tkaemming Dec 6, 2023
3bc2290
Split DeferredPersonOverride{Worker,Writer}.
tkaemming Dec 6, 2023
e2a7487
The advisory lock key can be static rather than a constructor argument.
tkaemming Dec 6, 2023
7c1ad87
Kafka producer can be provided as a constructor parameter to worker.
tkaemming Dec 6, 2023
dbfb158
Move worker main run logic into worker class.
tkaemming Dec 6, 2023
9a7a729
Clean up names a little bit.
tkaemming Dec 6, 2023
e24f427
Hub is also a configuration object, so just use that.
tkaemming Dec 6, 2023
6c4b2d9
Fix stupid mistake in periodic task wait time calculation.
tkaemming Dec 6, 2023
f239cd9
More sensible defaults for `PeriodicTask`.
tkaemming Dec 6, 2023
9784c7b
Update query snapshots
github-actions[bot] Dec 6, 2023
8d24b2a
Update query snapshots
github-actions[bot] Dec 6, 2023
c812221
Only instantiate override writer once (and prepare for dep injection.)
tkaemming Dec 6, 2023
432bddc
Add `instrument` call to periodic task.
tkaemming Dec 7, 2023
fa1fc26
Add count metrics for overrides written and overrides processed.
tkaemming Dec 7, 2023
b7de72b
Move instrumentation down into periodic task.
tkaemming Dec 8, 2023
4d1114c
Make internal logging more consistent (and emojis more consistent wit…
tkaemming Dec 8, 2023
fb43fbd
Add more documentation about the advisory lock ID.
tkaemming Dec 8, 2023
6 changes: 6 additions & 0 deletions plugin-server/src/capabilities.ts
@@ -19,6 +19,7 @@ export function getPluginServerCapabilities(config: PluginsServerConfig): Plugin
processAsyncOnEventHandlers: true,
processAsyncWebhooksHandlers: true,
sessionRecordingBlobIngestion: true,
personOverrides: config.POE_DEFERRED_WRITES_ENABLED,
[Review comment from tkaemming (author): This is off by default, keeping it from inadvertently being enabled for hobby deploys.]

transpileFrontendApps: true,
preflightSchedules: true,
...sharedCapabilities,
@@ -75,5 +76,10 @@ export function getPluginServerCapabilities(config: PluginsServerConfig): Plugin
transpileFrontendApps: true, // TODO: move this away from pod startup, into a graphile job
...sharedCapabilities,
}
case PluginServerMode.person_overrides:
return {
personOverrides: true,
...sharedCapabilities,
}
}
}
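The gating above can be condensed into a small standalone sketch (simplified, hypothetical names; `mode` stands in for the parsed `PLUGIN_SERVER_MODE`): the dedicated person-overrides mode always enables the capability, while the all-in-one default follows the `POE_DEFERRED_WRITES_ENABLED` flag, which defaults to `false`.

```typescript
// Minimal sketch of the capability gating in the diff above.
// Only the fields relevant to person overrides are modeled.
type SketchConfig = { POE_DEFERRED_WRITES_ENABLED: boolean; mode?: 'person-overrides' }

function resolvePersonOverridesCapability(config: SketchConfig): boolean {
    // Dedicated mode: the overrides worker always runs.
    if (config.mode === 'person-overrides') {
        return true
    }
    // All-in-one mode: gated by the flag, which defaults to false so that
    // hobby deploys do not inadvertently enable deferred writes.
    return config.POE_DEFERRED_WRITES_ENABLED
}

resolvePersonOverridesCapability({ POE_DEFERRED_WRITES_ENABLED: false }) // false
resolvePersonOverridesCapability({ POE_DEFERRED_WRITES_ENABLED: false, mode: 'person-overrides' }) // true
```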
1 change: 1 addition & 0 deletions plugin-server/src/config/config.ts
@@ -128,6 +128,7 @@ export function getDefaultConfig(): PluginsServerConfig {
EXTERNAL_REQUEST_TIMEOUT_MS: 10 * 1000, // 10 seconds
DROP_EVENTS_BY_TOKEN_DISTINCT_ID: '',
DROP_EVENTS_BY_TOKEN: '',
POE_DEFERRED_WRITES_ENABLED: false,
POE_EMBRACE_JOIN_FOR_TEAMS: '',
RELOAD_PLUGIN_JITTER_MAX_MS: 60000,

17 changes: 17 additions & 0 deletions plugin-server/src/main/pluginsServer.ts
@@ -15,11 +15,13 @@ import { Hub, PluginServerCapabilities, PluginsServerConfig } from '../types'
import { createHub, createKafkaClient, createKafkaProducerWrapper } from '../utils/db/hub'
import { PostgresRouter } from '../utils/db/postgres'
import { cancelAllScheduledJobs } from '../utils/node-schedule'
import { PeriodicTask } from '../utils/periodic-task'
import { PubSub } from '../utils/pubsub'
import { status } from '../utils/status'
import { delay } from '../utils/utils'
import { AppMetrics } from '../worker/ingestion/app-metrics'
import { OrganizationManager } from '../worker/ingestion/organization-manager'
import { DeferredPersonOverrideWorker } from '../worker/ingestion/person-state'
import { TeamManager } from '../worker/ingestion/team-manager'
import Piscina, { makePiscina as defaultMakePiscina } from '../worker/piscina'
import { GraphileWorker } from './graphile-worker/graphile-worker'
@@ -108,6 +110,8 @@ export async function startPluginsServer(
let jobsConsumer: Consumer | undefined
let schedulerTasksConsumer: Consumer | undefined

let personOverridesPeriodicTask: PeriodicTask | undefined

let httpServer: Server | undefined // healthcheck server

let graphileWorker: GraphileWorker | undefined
@@ -146,6 +150,7 @@
jobsConsumer?.disconnect(),
stopSessionRecordingBlobConsumer?.(),
schedulerTasksConsumer?.disconnect(),
personOverridesPeriodicTask?.stop(),
])

if (piscina) {
@@ -428,6 +433,18 @@
}
}

if (capabilities.personOverrides) {
const postgres = hub?.postgres ?? new PostgresRouter(serverConfig)
const kafkaProducer = hub?.kafkaProducer ?? (await createKafkaProducerWrapper(serverConfig))

personOverridesPeriodicTask = new DeferredPersonOverrideWorker(postgres, kafkaProducer).runTask(5000)
personOverridesPeriodicTask.promise.catch(async () => {
[Review comment from tkaemming (author): Exposing the promise here seems kind of leaky, but not so much that I want to forward `catch` through as `PeriodicTask.catch` right now.]

status.error('⚠️', 'Person override worker task crashed! Requesting shutdown...')
await closeJobs()
process.exit(1)
})
}

if (capabilities.http) {
httpServer = createHttpServer(serverConfig.HTTP_SERVER_PORT, healthChecks, analyticsEventsIngestionConsumer)
}
3 changes: 3 additions & 0 deletions plugin-server/src/types.ts
@@ -76,6 +76,7 @@ export enum PluginServerMode {
scheduler = 'scheduler',
analytics_ingestion = 'analytics-ingestion',
recordings_blob_ingestion = 'recordings-blob-ingestion',
person_overrides = 'person-overrides',
}

export const stringToPluginServerMode = Object.fromEntries(
@@ -198,6 +199,7 @@ export interface PluginsServerConfig {
DROP_EVENTS_BY_TOKEN_DISTINCT_ID: string
DROP_EVENTS_BY_TOKEN: string
POE_EMBRACE_JOIN_FOR_TEAMS: string
POE_DEFERRED_WRITES_ENABLED: boolean
RELOAD_PLUGIN_JITTER_MAX_MS: number
SKIP_UPDATE_EVENT_AND_PROPERTIES_STEP: boolean

@@ -286,6 +288,7 @@ export interface PluginServerCapabilities {
processAsyncOnEventHandlers?: boolean
processAsyncWebhooksHandlers?: boolean
sessionRecordingBlobIngestion?: boolean
personOverrides?: boolean
transpileFrontendApps?: boolean // TODO: move this away from pod startup, into a graphile job
preflightSchedules?: boolean // Used for instance health checks on hobby deploy, not useful on cloud
http?: boolean
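The new `person_overrides = 'person-overrides'` enum member plugs into the existing `stringToPluginServerMode` lookup, which can be sketched as below (only two members reproduced; the inversion mirrors the `Object.fromEntries` call visible in `types.ts`).

```typescript
// Sketch of stringToPluginServerMode from types.ts: the enum's kebab-case
// string values are inverted into a lookup table so a mode name (e.g. from
// an environment variable) can be parsed back into the enum.
enum PluginServerMode {
    analytics_ingestion = 'analytics-ingestion',
    person_overrides = 'person-overrides',
}

const stringToPluginServerMode = Object.fromEntries(
    Object.entries(PluginServerMode).map(([key, value]) => [
        value,
        PluginServerMode[key as keyof typeof PluginServerMode],
    ])
) as Record<string, PluginServerMode>

stringToPluginServerMode['person-overrides'] // PluginServerMode.person_overrides
```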
52 changes: 52 additions & 0 deletions plugin-server/src/utils/periodic-task.ts
@@ -0,0 +1,52 @@
import { status } from './status'
import { sleep } from './utils'

export class PeriodicTask {
public readonly promise: Promise<void>
private running = true
private abortController: AbortController

constructor(task: () => Promise<void> | void, intervalMs: number, minimumWaitMs = 0) {
this.abortController = new AbortController()

const abortRequested = new Promise((resolve) => {
this.abortController.signal.addEventListener('abort', resolve, { once: true })
})

this.promise = new Promise<void>(async (resolve, reject) => {
try {
status.debug('🔄', 'Periodic task starting...', { task })
while (!this.abortController.signal.aborted) {
const startTimeMs = Date.now()
await task()
const durationMs = Date.now() - startTimeMs
const waitTimeMs = Math.max(intervalMs - durationMs, minimumWaitMs)
status.debug(
'🔄',
`Task completed in ${durationMs / 1000}s, next evaluation in ${waitTimeMs / 1000}s`,
{ task }
)
await Promise.race([sleep(waitTimeMs), abortRequested])
}
status.info('✅', 'Periodic task stopped by request.', { task })
resolve()
} catch (error) {
status.warn('⚠️', 'Error in periodic task!', { task, error })
reject(error)
} finally {
this.running = false
}
})
}

public isRunning(): boolean {
return this.running
}

public async stop(): Promise<void> {
this.abortController.abort()
try {
await this.promise
} catch {}
}
}
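The scheduling rule inside the loop above (and the target of the "Fix stupid mistake in periodic task wait time calculation" commit) reduces to one clamped subtraction: the next run is delayed by the configured interval minus the time the task itself took, never less than `minimumWaitMs`. Extracted as a standalone helper for illustration:

```typescript
// Sketch of the wait-time calculation used by PeriodicTask: a fast task
// waits out the remainder of the interval, while a task that overruns the
// interval re-runs after only the minimum wait.
function nextWaitMs(intervalMs: number, durationMs: number, minimumWaitMs = 0): number {
    return Math.max(intervalMs - durationMs, minimumWaitMs)
}

nextWaitMs(5000, 200) // 4800 — task took 200ms of a 5s interval
nextWaitMs(5000, 7000) // 0 — task overran the interval entirely
nextWaitMs(5000, 7000, 1000) // 1000 — but never below the minimum wait
```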
@@ -4,7 +4,7 @@ import { Person } from 'types'

import { normalizeEvent } from '../../../utils/event'
import { status } from '../../../utils/status'
import { PersonOverrideWriter, PersonState } from '../person-state'
import { DeferredPersonOverrideWriter, PersonOverrideWriter, PersonState } from '../person-state'
import { parseEventTimestamp } from '../timestamps'
import { EventPipelineRunner } from './runner'

@@ -22,13 +22,22 @@ export async function processPersonsStep(
throw error
}

let overridesWriter: PersonOverrideWriter | DeferredPersonOverrideWriter | undefined = undefined
if (runner.poEEmbraceJoin) {
if (runner.hub.POE_DEFERRED_WRITES_ENABLED) {
overridesWriter = new DeferredPersonOverrideWriter(runner.hub.db.postgres)
} else {
overridesWriter = new PersonOverrideWriter(runner.hub.db.postgres)
}
}

const person = await new PersonState(
event,
event.team_id,
String(event.distinct_id),
timestamp,
runner.hub.db,
runner.poEEmbraceJoin ? new PersonOverrideWriter(runner.hub.db.postgres) : undefined
overridesWriter
).update()

return [event, person]
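The writer selection added to `processPersonsStep` is a two-level decision: no override writer at all unless "embrace join" is on, and the deferred writer only when `POE_DEFERRED_WRITES_ENABLED` is also set. A reduced sketch (string kinds stand in for the `PersonOverrideWriter` / `DeferredPersonOverrideWriter` classes):

```typescript
// Sketch of the overridesWriter selection in processPersonsStep above.
type WriterKind = 'immediate' | 'deferred' | undefined

function selectOverridesWriter(poEEmbraceJoin: boolean, deferredWritesEnabled: boolean): WriterKind {
    if (!poEEmbraceJoin) {
        return undefined // no override writes at all
    }
    // Deferred writes only queue the override; a separate worker applies it.
    return deferredWritesEnabled ? 'deferred' : 'immediate'
}
```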
35 changes: 25 additions & 10 deletions plugin-server/src/worker/ingestion/person-state.ts
@@ -10,6 +10,7 @@ import { Person, PropertyUpdateOperation, TimestampFormat } from '../../types'
import { DB } from '../../utils/db/db'
import { PostgresRouter, PostgresUse, TransactionClient } from '../../utils/db/postgres'
import { timeoutGuard } from '../../utils/db/utils'
import { PeriodicTask } from '../../utils/periodic-task'
import { promiseRetry } from '../../utils/retries'
import { status } from '../../utils/status'
import { castTimestampOrNow, UUIDT } from '../../utils/utils'
@@ -708,11 +709,7 @@ export class PersonOverrideWriter {
}

export class DeferredPersonOverrideWriter {
/**
* @param lockId the lock identifier/key used to ensure that only one
* process is updating the overrides at a time
*/
constructor(private postgres: PostgresRouter, private lockId: number) {}
constructor(private postgres: PostgresRouter) {}

/**
* Enqueue an override for deferred processing.
@@ -741,6 +738,18 @@

return []
}
}

export class DeferredPersonOverrideWorker {
// The advisory lock identifier/key used to ensure that only one process is
// updating the overrides at a time.
public readonly lockId = 567

private writer: PersonOverrideWriter

constructor(private postgres: PostgresRouter, private kafkaProducer: KafkaProducerWrapper) {
this.writer = new PersonOverrideWriter(this.postgres)
[Review comment from tkaemming (author): The next step will be to make this PersonOverrideWriter substitutable with one that doesn't require the mapping table and constraints, and to prepare the process for moving data from the existing table to the new one.]

}

/**
* Process all (or up to the given limit) pending overrides.
@@ -751,9 +760,7 @@
*
* @returns the number of overrides processed
*/
public async processPendingOverrides(kafkaProducer: KafkaProducerWrapper, limit?: number): Promise<number> {
const writer = new PersonOverrideWriter(this.postgres)

public async processPendingOverrides(limit?: number): Promise<number> {
return await this.postgres.transaction(PostgresUse.COMMON_WRITE, 'processPendingOverrides', async (tx) => {
const {
rows: [{ acquired }],
@@ -778,7 +785,7 @@

const messages: ProducerRecord[] = []
for (const { id, ...mergeOperation } of rows) {
messages.push(...(await writer.addPersonOverride(tx, mergeOperation)))
messages.push(...(await this.writer.addPersonOverride(tx, mergeOperation)))
[Review comment from tkaemming (author), Dec 7, 2023: Something important I hadn't considered before: team on the override model is a foreign key (although on the mapping table it's just a bigint, not a foreign key). I hadn't defined it as a foreign key on the pending override table, so this is either going to need to gracefully handle foreign key constraint violations, or those schemas are going to need to be brought into alignment. We'll also need to decide what to use on the new table. My initial assumption is that it's probably fine to use a non-foreign key on the new table; the worst case is we end up storing some extra overrides for longer than needed and doing some unnecessary squashing.]

[Reply from a contributor: We'll have to split persons into a separate dataset eventually (another Postgres instance or a non-PG data store), so it's fine to start decoupling ourselves from the non-person tables 👍 You're right in your analysis that the cost of non-cascading is one unnecessary squash at the end, which is probably not that bad. But FYI, we don't cascade on team deletion anymore because there are too many tables — see delete_bulky_postgres_data. You can add the new table there and get proper deletions on team deletion.]

await this.postgres.query(
tx,
SQL`DELETE FROM posthog_pendingpersonoverride WHERE id = ${id}`,
@@ -797,11 +804,19 @@
// Postgres for some reason -- the same row state should be
// generated each call, and the receiving ReplacingMergeTree will
// ensure we keep only the latest version after all writes settle.)
await kafkaProducer.queueMessages(messages, true)
await this.kafkaProducer.queueMessages(messages, true)

return rows.length
})
}

public runTask(intervalMs: number): PeriodicTask {
return new PeriodicTask(async () => {
status.debug('👥', 'Processing pending overrides...')
const overridesCount = await this.processPendingOverrides()
;(overridesCount > 0 ? status.info : status.debug)('👥', `Processed ${overridesCount} pending overrides.`)
}, intervalMs)
}
}

function SQL(sqlParts: TemplateStringsArray, ...args: any[]): { text: string; values: any[] } {
24 changes: 24 additions & 0 deletions plugin-server/tests/utils/periodic-task.test.ts
@@ -0,0 +1,24 @@
import { PeriodicTask } from '../../src/utils/periodic-task'

describe('PeriodicTask', () => {
describe('updates completion status', () => {
it('on success', async () => {
const fn = jest.fn()
const task = new PeriodicTask(fn, 1000)
expect(fn).toBeCalled()
expect(task.isRunning()).toEqual(true)
await task.stop()
expect(task.isRunning()).toEqual(false)
})

it('on failure', async () => {
const fn = jest.fn(() => {
throw new Error()
})
const task = new PeriodicTask(fn, 1000)
expect(fn).toBeCalled()
await task.stop()
expect(task.isRunning()).toEqual(false)
})
})
})