Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(plugin-server): Add capability to use deferred overrides writer and worker #19112

Merged
merged 20 commits into from
Dec 8, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
347c425
Add the ability to enable deferred writes (off by default.)
tkaemming Dec 6, 2023
14541c5
Add capability and plugin-server mode.
tkaemming Dec 6, 2023
94ef79c
Run the overrides worker.
tkaemming Dec 6, 2023
a4174e6
Just let the server crash on override writer error.
tkaemming Dec 6, 2023
3bc2290
Split DeferredPersonOverride{Worker,Writer}.
tkaemming Dec 6, 2023
e2a7487
The advisory lock key can be static rather than a constructor argument.
tkaemming Dec 6, 2023
7c1ad87
Kafka producer can be provided as a constructor parameter to worker.
tkaemming Dec 6, 2023
dbfb158
Move worker main run logic into worker class.
tkaemming Dec 6, 2023
9a7a729
Clean up names a little bit.
tkaemming Dec 6, 2023
e24f427
Hub is also a configuration object, so just use that.
tkaemming Dec 6, 2023
6c4b2d9
Fix stupid mistake in periodic task wait time calculation.
tkaemming Dec 6, 2023
f239cd9
More sensible defaults for `PeriodicTask`.
tkaemming Dec 6, 2023
9784c7b
Update query snapshots
github-actions[bot] Dec 6, 2023
8d24b2a
Update query snapshots
github-actions[bot] Dec 6, 2023
c812221
Only instantiate override writer once (and prepare for dep injection.)
tkaemming Dec 6, 2023
432bddc
Add `instrumenr` call to periodic task.
tkaemming Dec 7, 2023
fa1fc26
Add count metrics for overrides written and overrides processed.
tkaemming Dec 7, 2023
b7de72b
Move instrumentation down into periodic task.
tkaemming Dec 8, 2023
4d1114c
Make internal logging more consistent (and emojis more consistent wit…
tkaemming Dec 8, 2023
fb43fbd
Add more documentation about the advisory lock ID.
tkaemming Dec 8, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions plugin-server/src/capabilities.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ export function getPluginServerCapabilities(config: PluginsServerConfig): Plugin
processAsyncOnEventHandlers: true,
processAsyncWebhooksHandlers: true,
sessionRecordingBlobIngestion: true,
personOverrides: config.POE_DEFERRED_WRITES_ENABLED,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is off by default, keeping it from inadvertently being enabled for hobby deploys.

transpileFrontendApps: true,
preflightSchedules: true,
...sharedCapabilities,
Expand Down Expand Up @@ -75,5 +76,10 @@ export function getPluginServerCapabilities(config: PluginsServerConfig): Plugin
transpileFrontendApps: true, // TODO: move this away from pod startup, into a graphile job
...sharedCapabilities,
}
case PluginServerMode.person_overrides:
return {
personOverrides: true,
...sharedCapabilities,
}
}
}
1 change: 1 addition & 0 deletions plugin-server/src/config/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ export function getDefaultConfig(): PluginsServerConfig {
EXTERNAL_REQUEST_TIMEOUT_MS: 10 * 1000, // 10 seconds
DROP_EVENTS_BY_TOKEN_DISTINCT_ID: '',
DROP_EVENTS_BY_TOKEN: '',
POE_DEFERRED_WRITES_ENABLED: false,
POE_EMBRACE_JOIN_FOR_TEAMS: '',
RELOAD_PLUGIN_JITTER_MAX_MS: 60000,

Expand Down
22 changes: 22 additions & 0 deletions plugin-server/src/main/pluginsServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@ import { Hub, PluginServerCapabilities, PluginsServerConfig } from '../types'
import { createHub, createKafkaClient, createKafkaProducerWrapper } from '../utils/db/hub'
import { PostgresRouter } from '../utils/db/postgres'
import { cancelAllScheduledJobs } from '../utils/node-schedule'
import { PeriodicTask } from '../utils/periodic-task'
import { PubSub } from '../utils/pubsub'
import { status } from '../utils/status'
import { delay } from '../utils/utils'
import { AppMetrics } from '../worker/ingestion/app-metrics'
import { OrganizationManager } from '../worker/ingestion/organization-manager'
import { DeferredPersonOverrideWriter } from '../worker/ingestion/person-state'
import { TeamManager } from '../worker/ingestion/team-manager'
import Piscina, { makePiscina as defaultMakePiscina } from '../worker/piscina'
import { GraphileWorker } from './graphile-worker/graphile-worker'
Expand Down Expand Up @@ -108,6 +110,8 @@ export async function startPluginsServer(
let jobsConsumer: Consumer | undefined
let schedulerTasksConsumer: Consumer | undefined

let personOverridesWorker: PeriodicTask | undefined

let httpServer: Server | undefined // healthcheck server

let graphileWorker: GraphileWorker | undefined
Expand Down Expand Up @@ -146,6 +150,7 @@ export async function startPluginsServer(
jobsConsumer?.disconnect(),
stopSessionRecordingBlobConsumer?.(),
schedulerTasksConsumer?.disconnect(),
personOverridesWorker?.stop(),
])

if (piscina) {
Expand Down Expand Up @@ -428,6 +433,23 @@ export async function startPluginsServer(
}
}

if (capabilities.personOverrides) {
const postgres = hub?.postgres ?? new PostgresRouter(serverConfig)
const kafkaProducer = hub?.kafkaProducer ?? (await createKafkaProducerWrapper(serverConfig))
const overridesWriter = new DeferredPersonOverrideWriter(postgres)
tkaemming marked this conversation as resolved.
Show resolved Hide resolved

personOverridesWorker = new PeriodicTask(async () => {
tkaemming marked this conversation as resolved.
Show resolved Hide resolved
status.debug('👥', 'Processing pending overrides...')
const overridesCount = await overridesWriter.processPendingOverrides(kafkaProducer)
;(overridesCount > 0 ? status.info : status.debug)(
'👥',
`Processed ${overridesCount} pending overrides.`
)
})
tkaemming marked this conversation as resolved.
Show resolved Hide resolved

healthChecks['person-overrides'] = () => personOverridesWorker!.isRunning()
}

if (capabilities.http) {
httpServer = createHttpServer(serverConfig.HTTP_SERVER_PORT, healthChecks, analyticsEventsIngestionConsumer)
}
Expand Down
3 changes: 3 additions & 0 deletions plugin-server/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ export enum PluginServerMode {
scheduler = 'scheduler',
analytics_ingestion = 'analytics-ingestion',
recordings_blob_ingestion = 'recordings-blob-ingestion',
person_overrides = 'person-overrides',
}

export const stringToPluginServerMode = Object.fromEntries(
Expand Down Expand Up @@ -198,6 +199,7 @@ export interface PluginsServerConfig {
DROP_EVENTS_BY_TOKEN_DISTINCT_ID: string
DROP_EVENTS_BY_TOKEN: string
POE_EMBRACE_JOIN_FOR_TEAMS: string
POE_DEFERRED_WRITES_ENABLED: boolean
RELOAD_PLUGIN_JITTER_MAX_MS: number
SKIP_UPDATE_EVENT_AND_PROPERTIES_STEP: boolean

Expand Down Expand Up @@ -286,6 +288,7 @@ export interface PluginServerCapabilities {
processAsyncOnEventHandlers?: boolean
processAsyncWebhooksHandlers?: boolean
sessionRecordingBlobIngestion?: boolean
personOverrides?: boolean
transpileFrontendApps?: boolean // TODO: move this away from pod startup, into a graphile job
preflightSchedules?: boolean // Used for instance health checks on hobby deploy, not useful on cloud
http?: boolean
Expand Down
47 changes: 47 additions & 0 deletions plugin-server/src/utils/periodic-task.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import { status } from './status'
import { sleep } from './utils'

export class PeriodicTask {
private promise: Promise<void>
private running = true
private abortController: AbortController

constructor(task: () => Promise<void> | void, intervalMs = 1000, minimumWaitMs = 1000) {
tkaemming marked this conversation as resolved.
Show resolved Hide resolved
this.abortController = new AbortController()

const abortRequested = new Promise((resolve) => {
this.abortController.signal.addEventListener('abort', resolve, { once: true })
})

this.promise = new Promise<void>(async (resolve, reject) => {
try {
status.debug('🔄', 'Periodic task starting...', { task })
while (!this.abortController.signal.aborted) {
const startTimeMs = +Date.now()
await task()
const waitTimeMs = Math.max(intervalMs - startTimeMs, minimumWaitMs)
tkaemming marked this conversation as resolved.
Show resolved Hide resolved
status.debug('🔄', `Next evaluation in ${waitTimeMs / 1000}s`, { task })
await Promise.race([sleep(waitTimeMs), abortRequested])
}
status.info('✅', 'Periodic task stopped by request.', { task })
resolve()
} catch (error) {
status.warn('⚠️', 'Error in periodic task!', { task, error })
reject(error)
} finally {
this.running = false
}
})
}

public isRunning(): boolean {
return this.running
}

public async stop(): Promise<void> {
this.abortController.abort()
try {
await this.promise
} catch {}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ import { PluginEvent } from '@posthog/plugin-scaffold'
import { DateTime } from 'luxon'
import { Person } from 'types'

import { defaultConfig } from '../../../config/config'
import { normalizeEvent } from '../../../utils/event'
import { status } from '../../../utils/status'
import { PersonOverrideWriter, PersonState } from '../person-state'
import { DeferredPersonOverrideWriter, PersonOverrideWriter, PersonState } from '../person-state'
import { parseEventTimestamp } from '../timestamps'
import { EventPipelineRunner } from './runner'

Expand All @@ -22,13 +23,22 @@ export async function processPersonsStep(
throw error
}

let overridesWriter: PersonOverrideWriter | DeferredPersonOverrideWriter | undefined = undefined
if (runner.poEEmbraceJoin) {
if (defaultConfig.POE_DEFERRED_WRITES_ENABLED) {
tkaemming marked this conversation as resolved.
Show resolved Hide resolved
overridesWriter = new DeferredPersonOverrideWriter(runner.hub.db.postgres)
} else {
overridesWriter = new PersonOverrideWriter(runner.hub.db.postgres)
}
}

const person = await new PersonState(
event,
event.team_id,
String(event.distinct_id),
timestamp,
runner.hub.db,
runner.poEEmbraceJoin ? new PersonOverrideWriter(runner.hub.db.postgres) : undefined
overridesWriter
).update()

return [event, person]
Expand Down
2 changes: 1 addition & 1 deletion plugin-server/src/worker/ingestion/person-state.ts
Original file line number Diff line number Diff line change
Expand Up @@ -712,7 +712,7 @@ export class DeferredPersonOverrideWriter {
* @param lockId the lock identifier/key used to ensure that only one
* process is updating the overrides at a time
*/
constructor(private postgres: PostgresRouter, private lockId: number) {}
constructor(private postgres: PostgresRouter, private lockId: number = 567) {}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is that intentional? how is this lockId used?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this is intentional; this is for the advisory lock acquired here that ensures only one process at a time is processing pending overrides to avoid race conditions in transitive overrides processing from overlapping transactions (also described on line 712 above):

const {
rows: [{ acquired }],
} = await this.postgres.query(
tx,
SQL`SELECT pg_try_advisory_xact_lock(${this.lockId}) as acquired`,
undefined,
'processPendingOverrides'
)
if (!acquired) {
throw new Error('could not acquire lock')
}

It's annoying that these are numbers and some varchar/text type that could be more descriptive, but that's what the Postgres API provides: https://www.postgresql.org/docs/current/explicit-locking.html#ADVISORY-LOCKS

The value could also be provided via a setting, but I couldn't think of a reason to make that configurable so I avoided that for now. I guess this could be a class property rather than an instance property though, I had initially made it an instance property so that the functional test runner could be running while other unit/integration tests were run — but thinking about it more, those are still sharing the same tables on the test database so they probably should conflict with each other.

It's possible that this could get replaced with a table lock on the overrides (not pending overrides) table but I want to defer that at least until the immediate overrides path is fully removed so that we don't lock the overrides table by accident while it's still being used in the ingest consumer transaction during the changeover.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Made this static with e2a7487.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's minor, but it may be worth putting a "yep, this is how the PG API works, we're just picking a consistent int yadda yadda" line alongside your existing comment. I have to admit I read it and was still head-tilt at the magic number.

Copy link
Contributor Author

@tkaemming tkaemming Dec 8, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How does this look? fb43fbd

// This lock ID is used as an advisory lock identifier/key for a lock that
// ensures only one worker is able to update the overrides table at a time.
// (We do this to make it simpler to ensure that we maintain the consistency
// of transitive updates.) There isn't any special significance to this
// particular value (other than Postgres requires it to be a numeric one),
// it just needs to be consistent across all processes.
public readonly lockId = 567


/**
* Enqueue an override for deferred processing.
Expand Down
24 changes: 24 additions & 0 deletions plugin-server/tests/utils/periodic-task.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import { PeriodicTask } from '../../src/utils/periodic-task'

describe('PeriodicTask', () => {
describe('updates completion status', () => {
it('on success', async () => {
const fn = jest.fn()
const task = new PeriodicTask(fn)
expect(fn).toBeCalled()
expect(task.isRunning()).toEqual(true)
await task.stop()
expect(task.isRunning()).toEqual(false)
})

it('on failure', async () => {
const fn = jest.fn(() => {
throw new Error()
})
const task = new PeriodicTask(fn)
expect(fn).toBeCalled()
await task.stop()
expect(task.isRunning()).toEqual(false)
})
})
})
Loading