Skip to content

Commit

Permalink
Add the ability to enable deferred writes (off by default.)
Browse files Browse the repository at this point in the history
  • Loading branch information
tkaemming committed Dec 6, 2023
1 parent e5e803a commit 347c425
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 3 deletions.
1 change: 1 addition & 0 deletions plugin-server/src/config/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ export function getDefaultConfig(): PluginsServerConfig {
EXTERNAL_REQUEST_TIMEOUT_MS: 10 * 1000, // 10 seconds
DROP_EVENTS_BY_TOKEN_DISTINCT_ID: '',
DROP_EVENTS_BY_TOKEN: '',
POE_DEFERRED_WRITES_ENABLED: false,
POE_EMBRACE_JOIN_FOR_TEAMS: '',
RELOAD_PLUGIN_JITTER_MAX_MS: 60000,

Expand Down
1 change: 1 addition & 0 deletions plugin-server/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ export interface PluginsServerConfig {
DROP_EVENTS_BY_TOKEN_DISTINCT_ID: string
DROP_EVENTS_BY_TOKEN: string
POE_EMBRACE_JOIN_FOR_TEAMS: string
POE_DEFERRED_WRITES_ENABLED: boolean
RELOAD_PLUGIN_JITTER_MAX_MS: number
SKIP_UPDATE_EVENT_AND_PROPERTIES_STEP: boolean

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ import { PluginEvent } from '@posthog/plugin-scaffold'
import { DateTime } from 'luxon'
import { Person } from 'types'

import { defaultConfig } from '../../../config/config'
import { normalizeEvent } from '../../../utils/event'
import { status } from '../../../utils/status'
import { PersonOverrideWriter, PersonState } from '../person-state'
import { DeferredPersonOverrideWriter, PersonOverrideWriter, PersonState } from '../person-state'
import { parseEventTimestamp } from '../timestamps'
import { EventPipelineRunner } from './runner'

Expand All @@ -22,13 +23,22 @@ export async function processPersonsStep(
throw error
}

let overridesWriter: PersonOverrideWriter | DeferredPersonOverrideWriter | undefined = undefined
if (runner.poEEmbraceJoin) {
if (defaultConfig.POE_DEFERRED_WRITES_ENABLED) {
overridesWriter = new DeferredPersonOverrideWriter(runner.hub.db.postgres)
} else {
overridesWriter = new PersonOverrideWriter(runner.hub.db.postgres)
}
}

const person = await new PersonState(
event,
event.team_id,
String(event.distinct_id),
timestamp,
runner.hub.db,
runner.poEEmbraceJoin ? new PersonOverrideWriter(runner.hub.db.postgres) : undefined
overridesWriter
).update()

return [event, person]
Expand Down
2 changes: 1 addition & 1 deletion plugin-server/src/worker/ingestion/person-state.ts
Original file line number Diff line number Diff line change
Expand Up @@ -712,7 +712,7 @@ export class DeferredPersonOverrideWriter {
* @param lockId the lock identifier/key used to ensure that only one
* process is updating the overrides at a time
*/
constructor(private postgres: PostgresRouter, private lockId: number) {}
constructor(private postgres: PostgresRouter, private lockId: number = 567) {}

/**
* Enqueue an override for deferred processing.
Expand Down

0 comments on commit 347c425

Please sign in to comment.