feat(plugin-server): Add capability to use deferred overrides writer and worker #19112

Merged
merged 20 commits into from
Dec 8, 2023
Commits
347c425
Add the ability to enable deferred writes (off by default.)
tkaemming Dec 6, 2023
14541c5
Add capability and plugin-server mode.
tkaemming Dec 6, 2023
94ef79c
Run the overrides worker.
tkaemming Dec 6, 2023
a4174e6
Just let the server crash on override writer error.
tkaemming Dec 6, 2023
3bc2290
Split DeferredPersonOverride{Worker,Writer}.
tkaemming Dec 6, 2023
e2a7487
The advisory lock key can be static rather than a constructor argument.
tkaemming Dec 6, 2023
7c1ad87
Kafka producer can be provided as a constructor parameter to worker.
tkaemming Dec 6, 2023
dbfb158
Move worker main run logic into worker class.
tkaemming Dec 6, 2023
9a7a729
Clean up names a little bit.
tkaemming Dec 6, 2023
e24f427
Hub is also a configuration object, so just use that.
tkaemming Dec 6, 2023
6c4b2d9
Fix stupid mistake in periodic task wait time calculation.
tkaemming Dec 6, 2023
f239cd9
More sensible defaults for `PeriodicTask`.
tkaemming Dec 6, 2023
9784c7b
Update query snapshots
github-actions[bot] Dec 6, 2023
8d24b2a
Update query snapshots
github-actions[bot] Dec 6, 2023
c812221
Only instantiate override writer once (and prepare for dep injection.)
tkaemming Dec 6, 2023
432bddc
Add `instrument` call to periodic task.
tkaemming Dec 7, 2023
fa1fc26
Add count metrics for overrides written and overrides processed.
tkaemming Dec 7, 2023
b7de72b
Move instrumentation down into periodic task.
tkaemming Dec 8, 2023
4d1114c
Make internal logging more consistent (and emojis more consistent wit…
tkaemming Dec 8, 2023
fb43fbd
Add more documentation about the advisory lock ID.
tkaemming Dec 8, 2023
6 changes: 6 additions & 0 deletions plugin-server/src/capabilities.ts
@@ -19,6 +19,7 @@ export function getPluginServerCapabilities(config: PluginsServerConfig): Plugin
processAsyncOnEventHandlers: true,
processAsyncWebhooksHandlers: true,
sessionRecordingBlobIngestion: true,
personOverrides: config.POE_DEFERRED_WRITES_ENABLED,
Review comment (Contributor, Author): This is off by default, which keeps it from being inadvertently enabled for hobby deploys.

transpileFrontendApps: true,
preflightSchedules: true,
...sharedCapabilities,
@@ -75,5 +76,10 @@ export function getPluginServerCapabilities(config: PluginsServerConfig): Plugin
transpileFrontendApps: true, // TODO: move this away from pod startup, into a graphile job
...sharedCapabilities,
}
case PluginServerMode.person_overrides:
return {
personOverrides: true,
...sharedCapabilities,
}
}
}
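
The gating in this file can be illustrated with a reduced model. This is a minimal sketch rather than the plugin-server's actual function: `Mode`, `Config`, `Capabilities`, and `resolveCapabilities` are hypothetical names, only the `personOverrides` flag is modelled, and it assumes the first hunk belongs to the branch that runs when no dedicated mode is selected.

```typescript
// Hypothetical, reduced stand-ins for PluginServerMode, PluginsServerConfig and
// PluginServerCapabilities. Only the fields touched by this PR are modelled.
type Mode = 'person-overrides' | null // null: no dedicated mode selected (assumption)

interface Config {
    POE_DEFERRED_WRITES_ENABLED: boolean
}

interface Capabilities {
    personOverrides?: boolean
}

function resolveCapabilities(mode: Mode, config: Config): Capabilities {
    if (mode === null) {
        // All-in-one process: the worker follows the flag, which defaults to false.
        return { personOverrides: config.POE_DEFERRED_WRITES_ENABLED }
    }
    // Dedicated person-overrides mode: the worker always runs.
    return { personOverrides: true }
}

const defaults: Config = { POE_DEFERRED_WRITES_ENABLED: false }
const allInOne = resolveCapabilities(null, defaults)
const dedicated = resolveCapabilities('person-overrides', defaults)
```

Under these assumptions a default all-in-one deployment never starts the worker, while a process started in the dedicated mode runs it regardless of the flag.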
1 change: 1 addition & 0 deletions plugin-server/src/config/config.ts
@@ -128,6 +128,7 @@ export function getDefaultConfig(): PluginsServerConfig {
EXTERNAL_REQUEST_TIMEOUT_MS: 10 * 1000, // 10 seconds
DROP_EVENTS_BY_TOKEN_DISTINCT_ID: '',
DROP_EVENTS_BY_TOKEN: '',
POE_DEFERRED_WRITES_ENABLED: false,
POE_EMBRACE_JOIN_FOR_TEAMS: '',
RELOAD_PLUGIN_JITTER_MAX_MS: 60000,

17 changes: 17 additions & 0 deletions plugin-server/src/main/pluginsServer.ts
@@ -15,11 +15,13 @@ import { Hub, PluginServerCapabilities, PluginsServerConfig } from '../types'
import { createHub, createKafkaClient, createKafkaProducerWrapper } from '../utils/db/hub'
import { PostgresRouter } from '../utils/db/postgres'
import { cancelAllScheduledJobs } from '../utils/node-schedule'
import { PeriodicTask } from '../utils/periodic-task'
import { PubSub } from '../utils/pubsub'
import { status } from '../utils/status'
import { delay } from '../utils/utils'
import { AppMetrics } from '../worker/ingestion/app-metrics'
import { OrganizationManager } from '../worker/ingestion/organization-manager'
import { DeferredPersonOverrideWorker } from '../worker/ingestion/person-state'
import { TeamManager } from '../worker/ingestion/team-manager'
import Piscina, { makePiscina as defaultMakePiscina } from '../worker/piscina'
import { GraphileWorker } from './graphile-worker/graphile-worker'
@@ -108,6 +110,8 @@ export async function startPluginsServer(
let jobsConsumer: Consumer | undefined
let schedulerTasksConsumer: Consumer | undefined

let personOverridesPeriodicTask: PeriodicTask | undefined

let httpServer: Server | undefined // healthcheck server

let graphileWorker: GraphileWorker | undefined
@@ -146,6 +150,7 @@ export async function startPluginsServer(
jobsConsumer?.disconnect(),
stopSessionRecordingBlobConsumer?.(),
schedulerTasksConsumer?.disconnect(),
personOverridesPeriodicTask?.stop(),
])

if (piscina) {
@@ -428,6 +433,18 @@ export async function startPluginsServer(
}
}

if (capabilities.personOverrides) {
const postgres = hub?.postgres ?? new PostgresRouter(serverConfig)
const kafkaProducer = hub?.kafkaProducer ?? (await createKafkaProducerWrapper(serverConfig))

personOverridesPeriodicTask = new DeferredPersonOverrideWorker(postgres, kafkaProducer).runTask(5000)
personOverridesPeriodicTask.promise.catch(async () => {
Review comment (Contributor, Author): Exposing the promise here seems somewhat leaky, but not enough that I want to forward `catch` through as `PeriodicTask.catch` right now.

status.error('⚠️', 'Person override worker task crashed! Requesting shutdown...')
await closeJobs()
process.exit(1)
})
}

if (capabilities.http) {
httpServer = createHttpServer(serverConfig.HTTP_SERVER_PORT, healthChecks, analyticsEventsIngestionConsumer)
}
3 changes: 3 additions & 0 deletions plugin-server/src/types.ts
@@ -76,6 +76,7 @@ export enum PluginServerMode {
scheduler = 'scheduler',
analytics_ingestion = 'analytics-ingestion',
recordings_blob_ingestion = 'recordings-blob-ingestion',
person_overrides = 'person-overrides',
}

export const stringToPluginServerMode = Object.fromEntries(
@@ -198,6 +199,7 @@ export interface PluginsServerConfig {
DROP_EVENTS_BY_TOKEN_DISTINCT_ID: string
DROP_EVENTS_BY_TOKEN: string
POE_EMBRACE_JOIN_FOR_TEAMS: string
POE_DEFERRED_WRITES_ENABLED: boolean
RELOAD_PLUGIN_JITTER_MAX_MS: number
SKIP_UPDATE_EVENT_AND_PROPERTIES_STEP: boolean

@@ -286,6 +288,7 @@ export interface PluginServerCapabilities {
processAsyncOnEventHandlers?: boolean
processAsyncWebhooksHandlers?: boolean
sessionRecordingBlobIngestion?: boolean
personOverrides?: boolean
transpileFrontendApps?: boolean // TODO: move this away from pod startup, into a graphile job
preflightSchedules?: boolean // Used for instance health checks on hobby deploy, not useful on cloud
http?: boolean
57 changes: 57 additions & 0 deletions plugin-server/src/utils/periodic-task.ts
@@ -0,0 +1,57 @@
import { instrument } from './metrics'
import { status } from './status'
import { sleep } from './utils'

export class PeriodicTask {
public readonly promise: Promise<void>
private running = true
private abortController: AbortController

constructor(public name: string, task: () => Promise<void>, intervalMs: number, minimumWaitMs = 0) {
this.abortController = new AbortController()

const abortRequested = new Promise((resolve) => {
this.abortController.signal.addEventListener('abort', resolve, { once: true })
})

this.promise = new Promise(async (resolve, reject) => {
try {
status.debug('🔄', `${this}: Starting...`)
while (!this.abortController.signal.aborted) {
const startTimeMs = Date.now()
await instrument({ metricName: this.name }, task)
const durationMs = Date.now() - startTimeMs
const waitTimeMs = Math.max(intervalMs - durationMs, minimumWaitMs)
status.debug(
'🔄',
`${this}: Task completed in ${durationMs / 1000}s, next evaluation in ${waitTimeMs / 1000}s`
)
await Promise.race([sleep(waitTimeMs), abortRequested])
}
status.info('🔴', `${this}: Stopped by request.`)
resolve()
} catch (error) {
status.warn('⚠️', `${this}: Unexpected error!`, { error })
reject(error)
} finally {
this.running = false
}
})
}

public toString(): string {
return `Periodic Task (${this.name})`
}

public isRunning(): boolean {
return this.running
}

public async stop(): Promise<void> {
status.info(`⏳`, `${this}: Stop requested...`)
this.abortController.abort()
try {
await this.promise
} catch {}
}
}
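
The wait-time calculation in the loop above can be checked in isolation. The sketch below lifts the expression `Math.max(intervalMs - durationMs, minimumWaitMs)` into a standalone function; `nextWaitMs` is a hypothetical name used only for illustration and is not part of the PR.

```typescript
// Same expression as in PeriodicTask: the interval is measured from the start
// of one run to the start of the next, so the time the task took is subtracted.
function nextWaitMs(intervalMs: number, durationMs: number, minimumWaitMs = 0): number {
    return Math.max(intervalMs - durationMs, minimumWaitMs)
}

// Task finished well inside the interval: sleep for the remainder.
const fastRun = nextWaitMs(5000, 1200) // 3800

// Task overran the interval: with the default minimum, run again immediately.
const slowRun = nextWaitMs(5000, 7000) // 0

// Task overran, but a minimum wait keeps the loop from spinning.
const slowRunWithFloor = nextWaitMs(5000, 7000, 250) // 250
```

With the default `minimumWaitMs` of 0, a task that consistently takes longer than its interval runs back to back, which is why the optional floor exists.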
@@ -4,7 +4,7 @@ import { Person } from 'types'

import { normalizeEvent } from '../../../utils/event'
import { status } from '../../../utils/status'
- import { PersonOverrideWriter, PersonState } from '../person-state'
+ import { DeferredPersonOverrideWriter, PersonOverrideWriter, PersonState } from '../person-state'
import { parseEventTimestamp } from '../timestamps'
import { EventPipelineRunner } from './runner'

@@ -22,13 +22,22 @@ export async function processPersonsStep(
throw error
}

let overridesWriter: PersonOverrideWriter | DeferredPersonOverrideWriter | undefined = undefined
if (runner.poEEmbraceJoin) {
if (runner.hub.POE_DEFERRED_WRITES_ENABLED) {
overridesWriter = new DeferredPersonOverrideWriter(runner.hub.db.postgres)
} else {
overridesWriter = new PersonOverrideWriter(runner.hub.db.postgres)
}
}

const person = await new PersonState(
event,
event.team_id,
String(event.distinct_id),
timestamp,
runner.hub.db,
- runner.poEEmbraceJoin ? new PersonOverrideWriter(runner.hub.db.postgres) : undefined
+ overridesWriter
).update()

return [event, person]
142 changes: 93 additions & 49 deletions plugin-server/src/worker/ingestion/person-state.ts
@@ -10,6 +10,7 @@ import { Person, PropertyUpdateOperation, TimestampFormat } from '../../types'
import { DB } from '../../utils/db/db'
import { PostgresRouter, PostgresUse, TransactionClient } from '../../utils/db/postgres'
import { timeoutGuard } from '../../utils/db/utils'
import { PeriodicTask } from '../../utils/periodic-task'
import { promiseRetry } from '../../utils/retries'
import { status } from '../../utils/status'
import { castTimestampOrNow, UUIDT } from '../../utils/utils'
@@ -707,12 +708,13 @@ export class PersonOverrideWriter {
}
}

const deferredPersonOverridesWrittenCounter = new Counter({
name: 'deferred_person_overrides_written',
help: 'Number of person overrides that have been written as pending',
})

export class DeferredPersonOverrideWriter {
- /**
- * @param lockId the lock identifier/key used to ensure that only one
- * process is updating the overrides at a time
- */
- constructor(private postgres: PostgresRouter, private lockId: number) {}
+ constructor(private postgres: PostgresRouter) {}

/**
* Enqueue an override for deferred processing.
@@ -738,9 +740,30 @@ export class DeferredPersonOverrideWriter {
undefined,
'pendingPersonOverride'
)

deferredPersonOverridesWrittenCounter.inc()
return []
}
}

const deferredPersonOverridesProcessedCounter = new Counter({
name: 'deferred_person_overrides_processed',
help: 'Number of pending person overrides that have been successfully processed',
})

export class DeferredPersonOverrideWorker {
// This lock ID is used as an advisory lock identifier/key for a lock that
// ensures only one worker is able to update the overrides table at a time.
// (We do this to make it simpler to ensure that we maintain the consistency
// of transitive updates.) There isn't any special significance to this
// particular value (other than Postgres requires it to be a numeric one),
// it just needs to be consistent across all processes.
public readonly lockId = 567

private writer: PersonOverrideWriter

constructor(private postgres: PostgresRouter, private kafkaProducer: KafkaProducerWrapper) {
this.writer = new PersonOverrideWriter(this.postgres)
Review comment (Contributor, Author): The next step is to make this `PersonOverrideWriter` substitutable with one that doesn't require the mapping table and constraints, and to prepare the process for moving data from the existing table to the new one.

}

/**
* Process all (or up to the given limit) pending overrides.
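
The `lockId` comment above relies on the behaviour of transaction-level advisory locks: `pg_try_advisory_xact_lock` does not block, it reports success as a boolean, and the lock is released automatically when the transaction ends. The sketch below is an in-memory model of that behaviour, not a Postgres client; `AdvisoryLockTable` and the transaction ids are hypothetical names used only for illustration.

```typescript
// In-memory imitation of transaction-scoped advisory locks. It talks to no
// database; it only models "try, never wait" and "release at transaction end".
class AdvisoryLockTable {
    private holders: { [key: number]: string } = {} // lock key -> transaction id

    // Like pg_try_advisory_xact_lock: returns immediately with true or false.
    tryLock(key: number, txId: string): boolean {
        const holder = this.holders[key]
        if (holder !== undefined && holder !== txId) {
            return false
        }
        this.holders[key] = txId
        return true
    }

    // Commit or rollback: every lock held by the transaction is released.
    endTransaction(txId: string): void {
        for (const key of Object.keys(this.holders)) {
            if (this.holders[Number(key)] === txId) {
                delete this.holders[Number(key)]
            }
        }
    }
}

const LOCK_ID = 567 // same static value as lockId in the diff
const locks = new AdvisoryLockTable()

const workerA = locks.tryLock(LOCK_ID, 'tx-a') // first worker gets the lock
const workerB = locks.tryLock(LOCK_ID, 'tx-b') // second worker is refused
locks.endTransaction('tx-a') // first worker commits
const workerBRetry = locks.tryLock(LOCK_ID, 'tx-b') // a later attempt succeeds
```

In the diff, a refused attempt surfaces as the `could not acquire lock` error. Because the key only has to match across processes, a static class property is sufficient and the constructor argument could be dropped.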
@@ -751,56 +774,77 @@ export class DeferredPersonOverrideWriter {
*
* @returns the number of overrides processed
*/
- public async processPendingOverrides(kafkaProducer: KafkaProducerWrapper, limit?: number): Promise<number> {
- const writer = new PersonOverrideWriter(this.postgres)
-
- return await this.postgres.transaction(PostgresUse.COMMON_WRITE, 'processPendingOverrides', async (tx) => {
- const {
- rows: [{ acquired }],
- } = await this.postgres.query(
- tx,
- SQL`SELECT pg_try_advisory_xact_lock(${this.lockId}) as acquired`,
- undefined,
- 'processPendingOverrides'
- )
- if (!acquired) {
- throw new Error('could not acquire lock')
- }
+ public async processPendingOverrides(limit?: number): Promise<number> {
+ const overridesCount = await this.postgres.transaction(
+ PostgresUse.COMMON_WRITE,
+ 'processPendingOverrides',
+ async (tx) => {
+ const {
+ rows: [{ acquired }],
+ } = await this.postgres.query(
+ tx,
+ SQL`SELECT pg_try_advisory_xact_lock(${this.lockId}) as acquired`,
+ undefined,
+ 'processPendingOverrides'
+ )
+ if (!acquired) {
+ throw new Error('could not acquire lock')
+ }

- // n.b.: Ordering by id ensures we are processing in (roughly) FIFO order
- const { rows } = await this.postgres.query(
- tx,
- `SELECT * FROM posthog_pendingpersonoverride ORDER BY id` +
- (limit !== undefined ? ` LIMIT ${limit}` : ''),
- undefined,
- 'processPendingOverrides'
- )
-
- const messages: ProducerRecord[] = []
- for (const { id, ...mergeOperation } of rows) {
- messages.push(...(await writer.addPersonOverride(tx, mergeOperation)))
- await this.postgres.query(
+ // n.b.: Ordering by id ensures we are processing in (roughly) FIFO order
+ const { rows } = await this.postgres.query(
  tx,
- SQL`DELETE FROM posthog_pendingpersonoverride WHERE id = ${id}`,
+ `SELECT * FROM posthog_pendingpersonoverride ORDER BY id` +
+ (limit !== undefined ? ` LIMIT ${limit}` : ''),
  undefined,
  'processPendingOverrides'
  )
+
+ const messages: ProducerRecord[] = []
+ for (const { id, ...mergeOperation } of rows) {
+ messages.push(...(await this.writer.addPersonOverride(tx, mergeOperation)))
+ await this.postgres.query(
+ tx,
+ SQL`DELETE FROM posthog_pendingpersonoverride WHERE id = ${id}`,
+ undefined,
+ 'processPendingOverrides'
+ )
+ }
+
+ // n.b.: We publish the messages here (and wait for acks) to ensure
+ // that all of our override updates are sent to Kafka prior to
+ // committing the transaction. If we're unable to publish, we should
+ // discard updates and try again later when it's available -- not
+ // doing so would cause the copy of this data in ClickHouse to
+ // slowly drift out of sync with the copy in Postgres. This write is
+ // safe to retry if we write to Kafka but then fail to commit to
+ // Postgres for some reason -- the same row state should be
+ // generated each call, and the receiving ReplacingMergeTree will
+ // ensure we keep only the latest version after all writes settle.)
+ await this.kafkaProducer.queueMessages(messages, true)
+
+ return rows.length
  }
+ )

- // n.b.: We publish the messages here (and wait for acks) to ensure
- // that all of our override updates are sent to Kafka prior to
- // committing the transaction. If we're unable to publish, we should
- // discard updates and try again later when it's available -- not
- // doing so would cause the copy of this data in ClickHouse to
- // slowly drift out of sync with the copy in Postgres. This write is
- // safe to retry if we write to Kafka but then fail to commit to
- // Postgres for some reason -- the same row state should be
- // generated each call, and the receiving ReplacingMergeTree will
- // ensure we keep only the latest version after all writes settle.)
- await kafkaProducer.queueMessages(messages, true)
-
- return rows.length
- })
+ deferredPersonOverridesProcessedCounter.inc(overridesCount)
+
+ return overridesCount
  }
+
+ public runTask(intervalMs: number): PeriodicTask {
+ return new PeriodicTask(
+ 'processPendingOverrides',
+ async () => {
+ status.debug('👥', 'Processing pending overrides...')
+ const overridesCount = await this.processPendingOverrides()
+ ;(overridesCount > 0 ? status.info : status.debug)(
+ '👥',
+ `Processed ${overridesCount} pending overrides.`
+ )
+ },
+ intervalMs
+ )
+ }
}
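
The code comment in `processPendingOverrides` argues that republishing a batch is safe because the receiving ReplacingMergeTree keeps only the latest version of each row. The sketch below models that claim in memory. It is an illustration under stated assumptions, not ClickHouse code: `OverrideRow`, its fields, and `settle` are hypothetical, and the real table's columns are not shown in this diff.

```typescript
// Hypothetical row shape. `key` stands in for the table's sorting key and
// `version` for its version column.
interface OverrideRow {
    key: string
    value: string
    version: number
}

// Imitates the state a ReplacingMergeTree converges to once merges settle:
// one row per sorting key, the one with the highest version (ties go to the
// row inserted last).
function settle(rows: OverrideRow[]): OverrideRow[] {
    const latest: { [key: string]: OverrideRow } = {}
    for (const row of rows) {
        const current = latest[row.key]
        if (current === undefined || row.version >= current.version) {
            latest[row.key] = row
        }
    }
    return Object.keys(latest)
        .sort()
        .map((key) => latest[key])
}

const batch: OverrideRow[] = [
    { key: 'team1:old-a', value: 'new-x', version: 1 },
    { key: 'team1:old-b', value: 'new-x', version: 1 },
]

// Published once, then the Postgres commit fails and the same batch is
// regenerated and published again on the next run.
const publishedOnce = settle(batch)
const publishedTwice = settle(batch.concat(batch))
const sameOutcome = JSON.stringify(publishedOnce) === JSON.stringify(publishedTwice)

// A later run that produces a newer version replaces the older row.
const afterUpdate = settle(batch.concat([{ key: 'team1:old-a', value: 'new-y', version: 2 }]))
```

The guarantee only holds if each retry regenerates the same row state, which is the condition the code comment states.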

24 changes: 24 additions & 0 deletions plugin-server/tests/utils/periodic-task.test.ts
@@ -0,0 +1,24 @@
import { PeriodicTask } from '../../src/utils/periodic-task'

describe('PeriodicTask', () => {
describe('updates completion status', () => {
it('on success', async () => {
const fn = jest.fn()
const task = new PeriodicTask('test', fn, 1000)
expect(fn).toBeCalled()
expect(task.isRunning()).toEqual(true)
await task.stop()
expect(task.isRunning()).toEqual(false)
})

it('on failure', async () => {
const fn = jest.fn(() => {
throw new Error()
})
const task = new PeriodicTask('test', fn, 1000)
expect(fn).toBeCalled()
await task.stop()
expect(task.isRunning()).toEqual(false)
})
})
})