Skip to content

Commit

Permalink
refactor: Remove legacy overrides handling from plugin-server (#23616)
Browse files Browse the repository at this point in the history
  • Loading branch information
tkaemming authored Jul 11, 2024
1 parent d205725 commit 1ed9426
Show file tree
Hide file tree
Showing 14 changed files with 853 additions and 1,923 deletions.
8 changes: 1 addition & 7 deletions .github/workflows/ci-plugin-server.yml
Original file line number Diff line number Diff line change
Expand Up @@ -175,24 +175,18 @@ jobs:
run: cd plugin-server && pnpm test -- --runInBand --forceExit tests/ --shard=${{matrix.shard}}

functional-tests:
name: Functional tests (POE=${{matrix.POE_EMBRACE_JOIN_FOR_TEAMS}},RDK=${{matrix.KAFKA_CONSUMPTION_USE_RDKAFKA}})
name: Functional tests
needs: changes
if: needs.changes.outputs.plugin-server == 'true'
runs-on: ubuntu-latest

strategy:
fail-fast: false
matrix:
POE_EMBRACE_JOIN_FOR_TEAMS: ['', '*']

env:
REDIS_URL: 'redis://localhost'
CLICKHOUSE_HOST: 'localhost'
CLICKHOUSE_DATABASE: 'posthog_test'
KAFKA_HOSTS: 'kafka:9092'
DATABASE_URL: 'postgres://posthog:posthog@localhost:5432/posthog'
RELOAD_PLUGIN_JITTER_MAX_MS: 0
POE_EMBRACE_JOIN_FOR_TEAMS: ${{matrix.POE_EMBRACE_JOIN_FOR_TEAMS}}

steps:
- name: Code check out
Expand Down
133 changes: 64 additions & 69 deletions plugin-server/functional_tests/analytics-ingestion/happy-path.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -618,9 +618,7 @@ test.concurrent(`properties still $set even if merge fails`, async () => {
})
})

const testIfPoEEmbraceJoinEnabled =
process.env.POE_EMBRACE_JOIN_FOR_TEAMS === '*' ? test.concurrent : test.concurrent.skip
testIfPoEEmbraceJoinEnabled(`single merge results in all events resolving to the same person id`, async () => {
test.concurrent(`single merge results in all events resolving to the same person id`, async () => {
const teamId = await createTeam(organizationId)
const initialDistinctId = new UUIDT().toString()
const secondDistinctId = new UUIDT().toString()
Expand Down Expand Up @@ -680,7 +678,7 @@ testIfPoEEmbraceJoinEnabled(`single merge results in all events resolving to the
}, 10000)
})

testIfPoEEmbraceJoinEnabled(`chained merge results in all events resolving to the same person id`, async () => {
test.concurrent(`chained merge results in all events resolving to the same person id`, async () => {
const teamId = await createTeam(organizationId)
const initialDistinctId = new UUIDT().toString()
const secondDistinctId = new UUIDT().toString()
Expand Down Expand Up @@ -735,76 +733,73 @@ testIfPoEEmbraceJoinEnabled(`chained merge results in all events resolving to th
}, 20000)
})

testIfPoEEmbraceJoinEnabled(
`complex chained merge adds results in all events resolving to the same person id`,
async () => {
// let's assume we have 4 persons 1234, we'll first merge 1-2 & 3-4, then we'll merge 2-3
// this should still result in all events having the same person_id or override[person_id]
test.concurrent(`complex chained merge adds results in all events resolving to the same person id`, async () => {
// let's assume we have 4 persons 1234, we'll first merge 1-2 & 3-4, then we'll merge 2-3
// this should still result in all events having the same person_id or override[person_id]

const teamId = await createTeam(organizationId)
const initialDistinctId = new UUIDT().toString()
const secondDistinctId = new UUIDT().toString()
const thirdDistinctId = new UUIDT().toString()
const forthDistinctId = new UUIDT().toString()

// First we emit anonymous events and wait for the persons to be created.
await capture({ teamId, distinctId: initialDistinctId, uuid: new UUIDT().toString(), event: 'custom event' })
await capture({ teamId, distinctId: secondDistinctId, uuid: new UUIDT().toString(), event: 'custom event 2' })
await capture({ teamId, distinctId: thirdDistinctId, uuid: new UUIDT().toString(), event: 'custom event 3' })
await capture({ teamId, distinctId: forthDistinctId, uuid: new UUIDT().toString(), event: 'custom event 3' })
await waitForExpect(async () => {
const persons = await fetchPersons(teamId)
expect(persons.length).toBe(4)
}, 10000)
const teamId = await createTeam(organizationId)
const initialDistinctId = new UUIDT().toString()
const secondDistinctId = new UUIDT().toString()
const thirdDistinctId = new UUIDT().toString()
const forthDistinctId = new UUIDT().toString()

// Then we identify 1-2 and 3-4
await capture({
teamId,
distinctId: initialDistinctId,
uuid: new UUIDT().toString(),
event: '$identify',
properties: {
distinct_id: initialDistinctId,
$anon_distinct_id: secondDistinctId,
},
})
await capture({
teamId,
distinctId: thirdDistinctId,
uuid: new UUIDT().toString(),
event: '$identify',
properties: {
distinct_id: thirdDistinctId,
$anon_distinct_id: forthDistinctId,
},
})
// First we emit anonymous events and wait for the persons to be created.
await capture({ teamId, distinctId: initialDistinctId, uuid: new UUIDT().toString(), event: 'custom event' })
await capture({ teamId, distinctId: secondDistinctId, uuid: new UUIDT().toString(), event: 'custom event 2' })
await capture({ teamId, distinctId: thirdDistinctId, uuid: new UUIDT().toString(), event: 'custom event 3' })
await capture({ teamId, distinctId: forthDistinctId, uuid: new UUIDT().toString(), event: 'custom event 3' })
await waitForExpect(async () => {
const persons = await fetchPersons(teamId)
expect(persons.length).toBe(4)
}, 10000)

await waitForExpect(async () => {
const events = await fetchEvents(teamId)
expect(events.length).toBe(6)
}, 10000)
// Then we identify 1-2 and 3-4
await capture({
teamId,
distinctId: initialDistinctId,
uuid: new UUIDT().toString(),
event: '$identify',
properties: {
distinct_id: initialDistinctId,
$anon_distinct_id: secondDistinctId,
},
})
await capture({
teamId,
distinctId: thirdDistinctId,
uuid: new UUIDT().toString(),
event: '$identify',
properties: {
distinct_id: thirdDistinctId,
$anon_distinct_id: forthDistinctId,
},
})

// Then we merge 2-3
await capture({
teamId,
distinctId: initialDistinctId,
uuid: new UUIDT().toString(),
event: '$merge_dangerously',
properties: {
distinct_id: secondDistinctId,
alias: thirdDistinctId,
},
})
await waitForExpect(async () => {
const events = await fetchEvents(teamId)
expect(events.length).toBe(6)
}, 10000)

await waitForExpect(async () => {
const events = await fetchEvents(teamId)
expect(events.length).toBe(7)
expect(events[0].person_id).toBeDefined()
expect(events[0].person_id).not.toBe('00000000-0000-0000-0000-000000000000')
expect(new Set(events.map((event) => event.person_id)).size).toBe(1)
}, 20000)
}
)
// Then we merge 2-3
await capture({
teamId,
distinctId: initialDistinctId,
uuid: new UUIDT().toString(),
event: '$merge_dangerously',
properties: {
distinct_id: secondDistinctId,
alias: thirdDistinctId,
},
})

await waitForExpect(async () => {
const events = await fetchEvents(teamId)
expect(events.length).toBe(7)
expect(events[0].person_id).toBeDefined()
expect(events[0].person_id).not.toBe('00000000-0000-0000-0000-000000000000')
expect(new Set(events.map((event) => event.person_id)).size).toBe(1)
}, 20000)
})

// TODO: adjust this test to poEEmbraceJoin
test.skip(`person properties don't see properties from descendents`, async () => {
Expand Down
12 changes: 7 additions & 5 deletions plugin-server/functional_tests/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -271,12 +271,14 @@ export const fetchEvents = async (teamId: number, uuid?: string) => {
SELECT *,
if(notEmpty(overrides.person_id), overrides.person_id, e.person_id) as person_id
FROM events e
LEFT OUTER JOIN
(SELECT argMax(override_person_id, version) as person_id,
old_person_id
FROM person_overrides
LEFT OUTER JOIN (
SELECT
distinct_id,
argMax(person_id, version) as person_id
FROM person_distinct_id_overrides
WHERE team_id = ${teamId}
GROUP BY old_person_id) AS overrides ON e.person_id = overrides.old_person_id
GROUP BY distinct_id
) AS overrides USING distinct_id
WHERE team_id = ${teamId} ${uuid ? `AND uuid = '${uuid}'` : ``}
ORDER BY timestamp ASC
`)) as unknown as ClickHouse.ObjectQueryResult<RawClickHouseEvent>
Expand Down
7 changes: 0 additions & 7 deletions plugin-server/src/capabilities.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ export function getPluginServerCapabilities(config: PluginsServerConfig): Plugin
processAsyncWebhooksHandlers: true,
sessionRecordingBlobIngestion: true,
sessionRecordingBlobOverflowIngestion: config.SESSION_RECORDING_OVERFLOW_ENABLED,
personOverrides: true,
appManagementSingleton: true,
preflightSchedules: true,
cdpProcessedEvents: true,
Expand Down Expand Up @@ -85,12 +84,6 @@ export function getPluginServerCapabilities(config: PluginsServerConfig): Plugin
appManagementSingleton: true,
...sharedCapabilities,
}
case PluginServerMode.person_overrides:
return {
personOverrides: true,
...sharedCapabilities,
}

case PluginServerMode.cdp_processed_events:
return {
cdpProcessedEvents: true,
Expand Down
3 changes: 0 additions & 3 deletions plugin-server/src/config/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,6 @@ export function getDefaultConfig(): PluginsServerConfig {
EXTERNAL_REQUEST_TIMEOUT_MS: 10 * 1000, // 10 seconds
DROP_EVENTS_BY_TOKEN_DISTINCT_ID: '',
DROP_EVENTS_BY_TOKEN: '',
POE_EMBRACE_JOIN_FOR_TEAMS: '',
POE_WRITES_ENABLED_MAX_TEAM_ID: 0,
POE_WRITES_EXCLUDE_TEAMS: '',
PIPELINE_STEP_STALLED_LOG_TIMEOUT: 30,
RELOAD_PLUGIN_JITTER_MAX_MS: 60000,
RUSTY_HOOK_FOR_TEAMS: '',
Expand Down
1 change: 0 additions & 1 deletion plugin-server/src/config/kafka-topics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ export const KAFKA_JOBS_DLQ = `${prefix}jobs_dlq${suffix}`
export const KAFKA_SCHEDULED_TASKS = `${prefix}scheduled_tasks${suffix}`
export const KAFKA_SCHEDULED_TASKS_DLQ = `${prefix}scheduled_tasks_dlq${suffix}`
export const KAFKA_METRICS_TIME_TO_SEE_DATA = `${prefix}clickhouse_metrics_time_to_see_data${suffix}`
export const KAFKA_PERSON_OVERRIDE = `${prefix}clickhouse_person_override${suffix}`

// read session recording snapshot items
export const KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS = `${prefix}session_recording_snapshot_item_events${suffix}`
Expand Down
21 changes: 0 additions & 21 deletions plugin-server/src/main/pluginsServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import { Hub, PluginServerCapabilities, PluginsServerConfig } from '../types'
import { createHub, createKafkaClient, createKafkaProducerWrapper } from '../utils/db/hub'
import { PostgresRouter } from '../utils/db/postgres'
import { cancelAllScheduledJobs } from '../utils/node-schedule'
import { PeriodicTask } from '../utils/periodic-task'
import { PubSub } from '../utils/pubsub'
import { status } from '../utils/status'
import { createRedisClient, delay } from '../utils/utils'
Expand All @@ -26,7 +25,6 @@ import { ActionMatcher } from '../worker/ingestion/action-matcher'
import { AppMetrics } from '../worker/ingestion/app-metrics'
import { GroupTypeManager } from '../worker/ingestion/group-type-manager'
import { OrganizationManager } from '../worker/ingestion/organization-manager'
import { DeferredPersonOverrideWorker, FlatPersonOverrideWriter } from '../worker/ingestion/person-state'
import { TeamManager } from '../worker/ingestion/team-manager'
import Piscina, { makePiscina as defaultMakePiscina } from '../worker/piscina'
import { RustyHook } from '../worker/rusty-hook'
Expand Down Expand Up @@ -119,8 +117,6 @@ export async function startPluginsServer(
let jobsConsumer: Consumer | undefined
let schedulerTasksConsumer: Consumer | undefined

let personOverridesPeriodicTask: PeriodicTask | undefined

let httpServer: Server | undefined // server

let graphileWorker: GraphileWorker | undefined
Expand Down Expand Up @@ -160,7 +156,6 @@ export async function startPluginsServer(
stopSessionRecordingBlobConsumer?.(),
stopSessionRecordingBlobOverflowConsumer?.(),
schedulerTasksConsumer?.disconnect(),
personOverridesPeriodicTask?.stop(),
...shutdownCallbacks.map((cb) => cb()),
])

Expand Down Expand Up @@ -529,22 +524,6 @@ export async function startPluginsServer(
healthChecks['cdp-overflow'] = () => consumer.isHealthy() ?? false
}

if (capabilities.personOverrides) {
const postgres = hub?.postgres ?? new PostgresRouter(serverConfig)
const kafkaProducer = hub?.kafkaProducer ?? (await createKafkaProducerWrapper(serverConfig))

personOverridesPeriodicTask = new DeferredPersonOverrideWorker(
postgres,
kafkaProducer,
new FlatPersonOverrideWriter(postgres)
).runTask(5000)
personOverridesPeriodicTask.promise.catch(async () => {
status.error('⚠️', 'Person override worker task crashed! Requesting shutdown...')
await closeJobs()
process.exit(1)
})
}

if (capabilities.http) {
const app = setupCommonRoutes(healthChecks, analyticsEventsIngestionConsumer)

Expand Down
7 changes: 0 additions & 7 deletions plugin-server/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@ export enum PluginServerMode {
analytics_ingestion = 'analytics-ingestion',
recordings_blob_ingestion = 'recordings-blob-ingestion',
recordings_blob_ingestion_overflow = 'recordings-blob-ingestion-overflow',
person_overrides = 'person-overrides',
cdp_processed_events = 'cdp-processed-events',
cdp_function_callbacks = 'cdp-function-callbacks',
cdp_function_overflow = 'cdp-function-overflow',
Expand Down Expand Up @@ -216,9 +215,6 @@ export interface PluginsServerConfig extends CdpConfig {
EXTERNAL_REQUEST_TIMEOUT_MS: number
DROP_EVENTS_BY_TOKEN_DISTINCT_ID: string
DROP_EVENTS_BY_TOKEN: string
POE_EMBRACE_JOIN_FOR_TEAMS: string
POE_WRITES_ENABLED_MAX_TEAM_ID: number
POE_WRITES_EXCLUDE_TEAMS: string
RELOAD_PLUGIN_JITTER_MAX_MS: number
RUSTY_HOOK_FOR_TEAMS: string
RUSTY_HOOK_ROLLOUT_PERCENTAGE: number
Expand Down Expand Up @@ -316,8 +312,6 @@ export interface Hub extends PluginsServerConfig {
enqueuePluginJob: (job: EnqueuedPluginJob) => Promise<void>
// ValueMatchers used for various opt-in/out features
pluginConfigsToSkipElementsParsing: ValueMatcher<number>
poeEmbraceJoinForTeams: ValueMatcher<number>
poeWritesExcludeTeams: ValueMatcher<number>
// lookups
eventsToDropByToken: Map<string, string[]>
}
Expand All @@ -337,7 +331,6 @@ export interface PluginServerCapabilities {
cdpProcessedEvents?: boolean
cdpFunctionCallbacks?: boolean
cdpFunctionOverflow?: boolean
personOverrides?: boolean
appManagementSingleton?: boolean
preflightSchedules?: boolean // Used for instance health checks on hobby deploy, not useful on cloud
http?: boolean
Expand Down
2 changes: 0 additions & 2 deletions plugin-server/src/utils/db/hub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -205,8 +205,6 @@ export async function createHub(
actionManager,
conversionBufferEnabledTeams,
pluginConfigsToSkipElementsParsing: buildIntegerMatcher(process.env.SKIP_ELEMENTS_PARSING_PLUGINS, true),
poeEmbraceJoinForTeams: buildIntegerMatcher(process.env.POE_EMBRACE_JOIN_FOR_TEAMS, true),
poeWritesExcludeTeams: buildIntegerMatcher(process.env.POE_WRITES_EXCLUDE_TEAMS, false),
eventsToDropByToken: createEventsToDropByToken(process.env.DROP_EVENTS_BY_TOKEN_DISTINCT_ID),
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { PluginEvent } from '@posthog/plugin-scaffold'
import { DateTime } from 'luxon'
import { Person } from 'types'

import { DeferredPersonOverrideWriter, PersonState } from '../person-state'
import { PersonState } from '../person-state'
import { EventPipelineRunner } from './runner'

export async function processPersonsStep(
Expand All @@ -11,19 +11,13 @@ export async function processPersonsStep(
timestamp: DateTime,
processPerson: boolean
): Promise<[PluginEvent, Person, Promise<void>]> {
let overridesWriter: DeferredPersonOverrideWriter | undefined = undefined
if (runner.poEEmbraceJoin) {
overridesWriter = new DeferredPersonOverrideWriter(runner.hub.db.postgres)
}

const [person, kafkaAck] = await new PersonState(
event,
event.team_id,
String(event.distinct_id),
timestamp,
processPerson,
runner.hub.db,
overridesWriter
runner.hub.db
).update()

return [event, person, kafkaAck]
Expand Down
Loading

0 comments on commit 1ed9426

Please sign in to comment.