diff --git a/plugin-server/src/capabilities.ts b/plugin-server/src/capabilities.ts index d5cbe7f05cf5a..c9b444067eeda 100644 --- a/plugin-server/src/capabilities.ts +++ b/plugin-server/src/capabilities.ts @@ -19,7 +19,7 @@ export function getPluginServerCapabilities(config: PluginsServerConfig): Plugin processAsyncOnEventHandlers: true, processAsyncWebhooksHandlers: true, sessionRecordingBlobIngestion: true, - personOverrides: config.POE_DEFERRED_WRITES_ENABLED, + personOverrides: true, appManagementSingleton: true, preflightSchedules: true, ...sharedCapabilities, diff --git a/plugin-server/src/config/config.ts b/plugin-server/src/config/config.ts index b78ca7713a675..fa1b290c0793a 100644 --- a/plugin-server/src/config/config.ts +++ b/plugin-server/src/config/config.ts @@ -128,8 +128,6 @@ export function getDefaultConfig(): PluginsServerConfig { EXTERNAL_REQUEST_TIMEOUT_MS: 10 * 1000, // 10 seconds DROP_EVENTS_BY_TOKEN_DISTINCT_ID: '', DROP_EVENTS_BY_TOKEN: '', - POE_DEFERRED_WRITES_ENABLED: false, - POE_DEFERRED_WRITES_USE_FLAT_OVERRIDES: false, POE_EMBRACE_JOIN_FOR_TEAMS: '', POE_WRITES_ENABLED_MAX_TEAM_ID: 0, POE_WRITES_EXCLUDE_TEAMS: '', diff --git a/plugin-server/src/main/pluginsServer.ts b/plugin-server/src/main/pluginsServer.ts index edf8a2f787833..f44567183e144 100644 --- a/plugin-server/src/main/pluginsServer.ts +++ b/plugin-server/src/main/pluginsServer.ts @@ -21,11 +21,7 @@ import { status } from '../utils/status' import { delay } from '../utils/utils' import { AppMetrics } from '../worker/ingestion/app-metrics' import { OrganizationManager } from '../worker/ingestion/organization-manager' -import { - DeferredPersonOverrideWorker, - FlatPersonOverrideWriter, - PersonOverrideWriter, -} from '../worker/ingestion/person-state' +import { DeferredPersonOverrideWorker, FlatPersonOverrideWriter } from '../worker/ingestion/person-state' import { TeamManager } from '../worker/ingestion/team-manager' import Piscina, { makePiscina as defaultMakePiscina } from '../worker/piscina' import { GraphileWorker } from './graphile-worker/graphile-worker' @@ -450,9 +446,7 @@ export async function startPluginsServer( personOverridesPeriodicTask = new DeferredPersonOverrideWorker( postgres, kafkaProducer, - serverConfig.POE_DEFERRED_WRITES_USE_FLAT_OVERRIDES - ? new FlatPersonOverrideWriter(postgres) - : new PersonOverrideWriter(postgres) + new FlatPersonOverrideWriter(postgres) ).runTask(5000) personOverridesPeriodicTask.promise.catch(async () => { status.error('⚠️', 'Person override worker task crashed! Requesting shutdown...') diff --git a/plugin-server/src/types.ts b/plugin-server/src/types.ts index 2601a3068d2ab..208e8cb48b43f 100644 --- a/plugin-server/src/types.ts +++ b/plugin-server/src/types.ts @@ -199,8 +199,6 @@ export interface PluginsServerConfig { DROP_EVENTS_BY_TOKEN_DISTINCT_ID: string DROP_EVENTS_BY_TOKEN: string POE_EMBRACE_JOIN_FOR_TEAMS: string - POE_DEFERRED_WRITES_ENABLED: boolean - POE_DEFERRED_WRITES_USE_FLAT_OVERRIDES: boolean POE_WRITES_ENABLED_MAX_TEAM_ID: number POE_WRITES_EXCLUDE_TEAMS: string RELOAD_PLUGIN_JITTER_MAX_MS: number diff --git a/plugin-server/src/worker/ingestion/event-pipeline/processPersonsStep.ts b/plugin-server/src/worker/ingestion/event-pipeline/processPersonsStep.ts index e01133f8c615f..91bf6f10f27bd 100644 --- a/plugin-server/src/worker/ingestion/event-pipeline/processPersonsStep.ts +++ b/plugin-server/src/worker/ingestion/event-pipeline/processPersonsStep.ts @@ -4,7 +4,7 @@ import { Person } from 'types' import { normalizeEvent } from '../../../utils/event' import { status } from '../../../utils/status' -import { DeferredPersonOverrideWriter, PersonOverrideWriter, PersonState } from '../person-state' +import { DeferredPersonOverrideWriter, PersonState } from '../person-state' import { parseEventTimestamp } from '../timestamps' import { EventPipelineRunner } from './runner' @@ -22,13 +22,9 @@ export async function processPersonsStep( throw error } - let overridesWriter: PersonOverrideWriter | DeferredPersonOverrideWriter | undefined = undefined + let overridesWriter: DeferredPersonOverrideWriter | undefined = undefined if (runner.poEEmbraceJoin) { - if (runner.hub.POE_DEFERRED_WRITES_ENABLED) { - overridesWriter = new DeferredPersonOverrideWriter(runner.hub.db.postgres) - } else { - overridesWriter = new PersonOverrideWriter(runner.hub.db.postgres) - } + overridesWriter = new DeferredPersonOverrideWriter(runner.hub.db.postgres) } const person = await new PersonState( diff --git a/plugin-server/src/worker/ingestion/person-state.ts b/plugin-server/src/worker/ingestion/person-state.ts index f49ee1ad334e9..958cc6f09f598 100644 --- a/plugin-server/src/worker/ingestion/person-state.ts +++ b/plugin-server/src/worker/ingestion/person-state.ts @@ -99,7 +99,7 @@ export class PersonState { distinctId: string, timestamp: DateTime, db: DB, - private personOverrideWriter?: PersonOverrideWriter | DeferredPersonOverrideWriter, + private personOverrideWriter?: DeferredPersonOverrideWriter, uuid: UUIDT | undefined = undefined, maxMergeAttempts: number = MAX_FAILED_PERSON_MERGE_ATTEMPTS ) { @@ -496,23 +496,14 @@ export class PersonState { const deletePersonMessages = await this.db.deletePerson(otherPerson, tx) - let personOverrideMessages: ProducerRecord[] = [] if (this.personOverrideWriter) { - personOverrideMessages = await this.personOverrideWriter.addPersonOverride( + await this.personOverrideWriter.addPersonOverride( tx, getPersonOverrideDetails(this.teamId, otherPerson, mergeInto) ) } - return [ - [ - ...personOverrideMessages, - ...updatePersonMessages, - ...distinctIdMessages, - ...deletePersonMessages, - ], - person, - ] + return [[...updatePersonMessages, ...distinctIdMessages, ...deletePersonMessages], person] } ) @@ -554,185 +545,6 @@ function getPersonOverrideDetails(teamId: number, oldPerson: Person, overridePer } } -export class PersonOverrideWriter { - constructor(private postgres: PostgresRouter) {} - - public async addPersonOverride( - tx: TransactionClient, - overrideDetails: PersonOverrideDetails - ): Promise { - const mergedAt = DateTime.now() - /** - We'll need to do 4 updates: - - 1. Add the persons involved to the helper table (2 of them) - 2. Add an override from oldPerson to override person - 3. Update any entries that have oldPerson as the override person to now also point to the new override person. Note that we don't update `oldest_event`, because it's a heuristic (used to optimise squashing) tied to the old_person and nothing changed about the old_person who's events need to get squashed. - */ - const oldPersonMappingId = await this.addPersonOverrideMapping( - tx, - overrideDetails.team_id, - overrideDetails.old_person_id - ) - const overridePersonMappingId = await this.addPersonOverrideMapping( - tx, - overrideDetails.team_id, - overrideDetails.override_person_id - ) - - await this.postgres.query( - tx, - SQL` - INSERT INTO posthog_personoverride ( - team_id, - old_person_id, - override_person_id, - oldest_event, - version - ) VALUES ( - ${overrideDetails.team_id}, - ${oldPersonMappingId}, - ${overridePersonMappingId}, - ${overrideDetails.oldest_event}, - 0 - ) - `, - undefined, - 'personOverride' - ) - - // The follow-up JOIN is required as ClickHouse requires UUIDs, so we need to fetch the UUIDs - // of the IDs we updated from the mapping table. - const { rows: transitiveUpdates } = await this.postgres.query( - tx, - SQL` - WITH updated_ids AS ( - UPDATE - posthog_personoverride - SET - override_person_id = ${overridePersonMappingId}, version = COALESCE(version, 0)::numeric + 1 - WHERE - team_id = ${overrideDetails.team_id} AND override_person_id = ${oldPersonMappingId} - RETURNING - old_person_id, - version, - oldest_event - ) - SELECT - helper.uuid as old_person_id, - updated_ids.version, - updated_ids.oldest_event - FROM - updated_ids - JOIN - posthog_personoverridemapping helper - ON - helper.id = updated_ids.old_person_id; - `, - undefined, - 'transitivePersonOverrides' - ) - - status.debug('🔁', 'person_overrides_updated', { transitiveUpdates }) - - const personOverrideMessages: ProducerRecord[] = [ - { - topic: KAFKA_PERSON_OVERRIDE, - messages: [ - { - value: JSON.stringify({ - team_id: overrideDetails.team_id, - old_person_id: overrideDetails.old_person_id, - override_person_id: overrideDetails.override_person_id, - oldest_event: castTimestampOrNow(overrideDetails.oldest_event, TimestampFormat.ClickHouse), - merged_at: castTimestampOrNow(mergedAt, TimestampFormat.ClickHouse), - version: 0, - }), - }, - ...transitiveUpdates.map(({ old_person_id, version, oldest_event }) => ({ - value: JSON.stringify({ - team_id: overrideDetails.team_id, - old_person_id: old_person_id, - override_person_id: overrideDetails.override_person_id, - oldest_event: castTimestampOrNow(oldest_event, TimestampFormat.ClickHouse), - merged_at: castTimestampOrNow(mergedAt, TimestampFormat.ClickHouse), - version: version, - }), - })), - ], - }, - ] - - return personOverrideMessages - } - - private async addPersonOverrideMapping(tx: TransactionClient, teamId: number, personId: string): Promise { - /** - Update the helper table that serves as a mapping between a serial ID and a Person UUID. - - This mapping is used to enable an exclusion constraint in the personoverrides table, which - requires int[], while avoiding any constraints on "hotter" tables, like person. - **/ - - // ON CONFLICT nothing is returned, so we get the id in the second SELECT statement below. - // Fear not, the constraints on personoverride will handle any inconsistencies. - // This mapping table is really nothing more than a mapping to support exclusion constraints - // as we map int ids to UUIDs (the latter not supported in exclusion contraints). - const { - rows: [{ id }], - } = await this.postgres.query( - tx, - `WITH insert_id AS ( - INSERT INTO posthog_personoverridemapping( - team_id, - uuid - ) - VALUES ( - ${teamId}, - '${personId}' - ) - ON CONFLICT("team_id", "uuid") DO NOTHING - RETURNING id - ) - SELECT * FROM insert_id - UNION ALL - SELECT id - FROM posthog_personoverridemapping - WHERE team_id = ${teamId} AND uuid = '${personId}' - `, - undefined, - 'personOverrideMapping' - ) - - return id - } - - public async getPersonOverrides(teamId: number): Promise { - const { rows } = await this.postgres.query( - PostgresUse.COMMON_WRITE, - SQL` - SELECT - override.team_id, - old_person.uuid as old_person_id, - override_person.uuid as override_person_id, - oldest_event - FROM posthog_personoverride override - LEFT OUTER JOIN posthog_personoverridemapping old_person - ON override.team_id = old_person.team_id AND override.old_person_id = old_person.id - LEFT OUTER JOIN posthog_personoverridemapping override_person - ON override.team_id = override_person.team_id AND override.override_person_id = override_person.id - WHERE override.team_id = ${teamId} - `, - undefined, - 'getPersonOverrides' - ) - return rows.map((row) => ({ - ...row, - oldest_event: DateTime.fromISO(row.oldest_event), - })) - } -} - export class FlatPersonOverrideWriter { constructor(private postgres: PostgresRouter) {} @@ -848,10 +660,7 @@ export class DeferredPersonOverrideWriter { /** * Enqueue an override for deferred processing. */ - public async addPersonOverride( - tx: TransactionClient, - overrideDetails: PersonOverrideDetails - ): Promise { + public async addPersonOverride(tx: TransactionClient, overrideDetails: PersonOverrideDetails): Promise { await this.postgres.query( tx, SQL` @@ -870,7 +679,6 @@ export class DeferredPersonOverrideWriter { 'pendingPersonOverride' ) deferredPersonOverridesWrittenCounter.inc() - return [] } } @@ -891,7 +699,7 @@ export class DeferredPersonOverrideWorker { constructor( private postgres: PostgresRouter, private kafkaProducer: KafkaProducerWrapper, - private writer: PersonOverrideWriter | FlatPersonOverrideWriter + private writer: FlatPersonOverrideWriter ) {} /** diff --git a/plugin-server/tests/worker/ingestion/person-state.test.ts b/plugin-server/tests/worker/ingestion/person-state.test.ts index 318c3504d3e98..df847a5112f73 100644 --- a/plugin-server/tests/worker/ingestion/person-state.test.ts +++ b/plugin-server/tests/worker/ingestion/person-state.test.ts @@ -12,7 +12,6 @@ import { DeferredPersonOverrideWorker, DeferredPersonOverrideWriter, FlatPersonOverrideWriter, - PersonOverrideWriter, PersonState, } from '../../../src/worker/ingestion/person-state' import { delayUntilEventIngested } from '../../helpers/clickhouse' @@ -27,7 +26,7 @@ const timestampch = '2020-01-01 12:00:05.000' interface PersonOverridesMode { supportsSyncTransaction: boolean - getWriter(hub: Hub): PersonOverrideWriter | DeferredPersonOverrideWriter + getWriter(hub: Hub): DeferredPersonOverrideWriter fetchPostgresPersonIdOverrides( hub: Hub, teamId: number @@ -36,37 +35,6 @@ interface PersonOverridesMode { const PersonOverridesModes: Record = { disabled: undefined, - 'immediate, with mappings': { - supportsSyncTransaction: true, - getWriter: (hub) => new PersonOverrideWriter(hub.db.postgres), - fetchPostgresPersonIdOverrides: async (hub, teamId) => { - const writer = new PersonOverrideWriter(hub.db.postgres) // XXX: ideally would reference ``this``, not new instance - return new Set( - (await writer.getPersonOverrides(teamId)).map(({ old_person_id, override_person_id }) => ({ - old_person_id, - override_person_id, - })) - ) - }, - }, - 'deferred, with mappings': { - supportsSyncTransaction: false, - getWriter: (hub) => new DeferredPersonOverrideWriter(hub.db.postgres), - fetchPostgresPersonIdOverrides: async (hub, teamId) => { - const syncWriter = new PersonOverrideWriter(hub.db.postgres) - await new DeferredPersonOverrideWorker( - hub.db.postgres, - hub.db.kafkaProducer, - syncWriter - ).processPendingOverrides() - return new Set( - (await syncWriter.getPersonOverrides(teamId)).map(({ old_person_id, override_person_id }) => ({ - old_person_id, - override_person_id, - })) - ) - }, - }, 'deferred, without mappings (flat)': { supportsSyncTransaction: false, getWriter: (hub) => new DeferredPersonOverrideWriter(hub.db.postgres), @@ -2107,23 +2075,18 @@ describe('PersonState.update()', () => { }) }) -const PersonOverridesWriterMode = { - mapping: (hub: Hub) => new PersonOverrideWriter(hub.db.postgres), - flat: (hub: Hub) => new FlatPersonOverrideWriter(hub.db.postgres), -} - -describe.each(Object.keys(PersonOverridesWriterMode))('person overrides writer: %s', (mode) => { +describe('flat person overrides writer', () => { let hub: Hub let closeHub: () => Promise let organizationId: string let teamId: number - let writer: PersonOverrideWriter | FlatPersonOverrideWriter + let writer: FlatPersonOverrideWriter beforeAll(async () => { ;[hub, closeHub] = await createHub({}) organizationId = await createOrganization(hub.db.postgres) - writer = PersonOverridesWriterMode[mode](hub) + writer = new FlatPersonOverrideWriter(hub.db.postgres) }) beforeEach(async () => { @@ -2201,14 +2164,14 @@ describe('deferred person overrides', () => { let teamId: number let writer: DeferredPersonOverrideWriter - let syncWriter: PersonOverrideWriter + let syncWriter: FlatPersonOverrideWriter let worker: DeferredPersonOverrideWorker beforeAll(async () => { ;[hub, closeHub] = await createHub({}) organizationId = await createOrganization(hub.db.postgres) writer = new DeferredPersonOverrideWriter(hub.db.postgres) - syncWriter = new PersonOverrideWriter(hub.db.postgres) + syncWriter = new FlatPersonOverrideWriter(hub.db.postgres) worker = new DeferredPersonOverrideWorker(hub.db.postgres, hub.db.kafkaProducer, syncWriter) }) diff --git a/posthog/temporal/batch_exports/squash_person_overrides.py b/posthog/temporal/batch_exports/squash_person_overrides.py index 6843131c60333..446bcc0777a57 100644 --- a/posthog/temporal/batch_exports/squash_person_overrides.py +++ b/posthog/temporal/batch_exports/squash_person_overrides.py @@ -137,176 +137,6 @@ class PersonOverrideTuple(NamedTuple): override_person_id: UUID -class PostgresPersonOverridesManager: - def __init__(self, connection): - self.connection = connection - - def fetchall(self, team_id: int) -> Sequence[PersonOverrideTuple]: - with self.connection.cursor() as cursor: - cursor.execute( - """ - SELECT - old_person.uuid, - override_person.uuid - FROM posthog_personoverride override - LEFT OUTER JOIN - posthog_personoverridemapping old_person - ON override.team_id = old_person.team_id AND override.old_person_id = old_person.id - LEFT OUTER JOIN - posthog_personoverridemapping override_person - ON override.team_id = override_person.team_id AND override.override_person_id = override_person.id - WHERE override.team_id = %(team_id)s - """, - {"team_id": team_id}, - ) - return [PersonOverrideTuple(*row) for row in cursor.fetchall()] - - def insert(self, team_id: int, override: PersonOverrideTuple) -> None: - with self.connection.cursor() as cursor: - mapping_ids = [] - for person_uuid in (override.override_person_id, override.old_person_id): - cursor.execute( - """ - INSERT INTO posthog_personoverridemapping( - team_id, - uuid - ) - VALUES ( - %(team_id)s, - %(uuid)s - ) - ON CONFLICT("team_id", "uuid") DO NOTHING - RETURNING id - """, - {"team_id": team_id, "uuid": person_uuid}, - ) - mapping_ids.append(cursor.fetchone()) - - cursor.execute( - """ - INSERT INTO posthog_personoverride( - team_id, - old_person_id, - override_person_id, - oldest_event, - version - ) - VALUES ( - %(team_id)s, - %(old_person_id)s, - %(override_person_id)s, - NOW(), - 1 - ); - """, - { - "team_id": team_id, - "old_person_id": mapping_ids[1], - "override_person_id": mapping_ids[0], - }, - ) - - def delete(self, person_override: SerializablePersonOverrideToDelete, dry_run: bool = False) -> None: - with self.connection.cursor() as cursor: - SELECT_ID_FROM_OVERRIDE_UUID = """ - SELECT - id - FROM - posthog_personoverridemapping - WHERE - team_id = %(team_id)s - AND uuid = %(uuid)s; - """ - - cursor.execute( - SELECT_ID_FROM_OVERRIDE_UUID, - { - "team_id": person_override.team_id, - "uuid": person_override.old_person_id, - }, - ) - row = cursor.fetchone() - if not row: - return - - old_person_id = row[0] - - cursor.execute( - SELECT_ID_FROM_OVERRIDE_UUID, - { - "team_id": person_override.team_id, - "uuid": person_override.override_person_id, - }, - ) - row = cursor.fetchone() - if not row: - return - - override_person_id = row[0] - - DELETE_FROM_PERSON_OVERRIDES = """ - DELETE FROM - posthog_personoverride - WHERE - team_id = %(team_id)s - AND old_person_id = %(old_person_id)s - AND override_person_id = %(override_person_id)s - AND version = %(latest_version)s - RETURNING - old_person_id; - """ - - parameters = { - "team_id": person_override.team_id, - "old_person_id": old_person_id, - "override_person_id": override_person_id, - "latest_version": person_override.latest_version, - } - - if dry_run is True: - activity.logger.info("This is a DRY RUN so nothing will be deleted.") - activity.logger.info( - "Would have run query: %s with parameters %s", - DELETE_FROM_PERSON_OVERRIDES, - parameters, - ) - return - - cursor.execute(DELETE_FROM_PERSON_OVERRIDES, parameters) - row = cursor.fetchone() - if not row: - # There is no existing mapping for this (old_person_id, override_person_id) pair. - # It could be that a newer one was added (with a later version). - return - - deleted_id = row[0] - - DELETE_FROM_PERSON_OVERRIDE_MAPPINGS = """ - DELETE FROM - posthog_personoverridemapping - WHERE - id = %(deleted_id)s; - """ - - cursor.execute( - DELETE_FROM_PERSON_OVERRIDE_MAPPINGS, - { - "deleted_id": deleted_id, - }, - ) - - def clear(self, team_id: int) -> None: - with self.connection.cursor() as cursor: - cursor.execute( - "DELETE FROM posthog_personoverride WHERE team_id = %s", - [team_id], - ) - cursor.execute( - "DELETE FROM posthog_personoverridemapping WHERE team_id = %s", - [team_id], - ) - - class FlatPostgresPersonOverridesManager: def __init__(self, connection): self.connection = connection @@ -389,15 +219,6 @@ def clear(self, team_id: int) -> None: ) -POSTGRES_PERSON_OVERRIDES_MANAGERS = { - "mappings": PostgresPersonOverridesManager, - "flat": FlatPostgresPersonOverridesManager, -} - -DEFAULT_POSTGRES_PERSON_OVERRIDES_MANAGER = "flat" -assert DEFAULT_POSTGRES_PERSON_OVERRIDES_MANAGER in POSTGRES_PERSON_OVERRIDES_MANAGERS - - @dataclass class QueryInputs: """Inputs for activities that run queries in the SquashPersonOverrides workflow. @@ -421,7 +242,6 @@ class QueryInputs: dictionary_name: str = "person_overrides_join_dict" dry_run: bool = True _latest_created_at: str | datetime | None = None - postgres_person_overrides_manager: str = DEFAULT_POSTGRES_PERSON_OVERRIDES_MANAGER def __post_init__(self) -> None: if isinstance(self._latest_created_at, datetime): @@ -450,9 +270,6 @@ def iter_person_overides_to_delete(self) -> Iterable[SerializablePersonOverrideT for person_override_to_delete in self.person_overrides_to_delete: yield SerializablePersonOverrideToDelete(*person_override_to_delete) - def get_postgres_person_overrides_manager(self, connection): - return POSTGRES_PERSON_OVERRIDES_MANAGERS[self.postgres_person_overrides_manager](connection) - @activity.defn async def prepare_person_overrides(inputs: QueryInputs) -> None: @@ -695,7 +512,7 @@ async def delete_squashed_person_overrides_from_postgres(inputs: QueryInputs) -> port=settings.DATABASES["default"]["PORT"], **settings.DATABASES["default"].get("SSL_OPTIONS", {}), ) as connection: - overrides_manager = inputs.get_postgres_person_overrides_manager(connection) + overrides_manager = FlatPostgresPersonOverridesManager(connection) for person_override_to_delete in inputs.iter_person_overides_to_delete(): activity.logger.debug("%s", person_override_to_delete) overrides_manager.delete(person_override_to_delete, inputs.dry_run) @@ -762,7 +579,6 @@ class SquashPersonOverridesInputs: dictionary_name: str = "person_overrides_join_dict" last_n_months: int = 1 dry_run: bool = True - postgres_person_overrides_manager: str = DEFAULT_POSTGRES_PERSON_OVERRIDES_MANAGER def iter_partition_ids(self) -> Iterator[str]: """Iterate over configured partition ids. @@ -797,66 +613,21 @@ def iter_last_n_months(self) -> Iterator[datetime]: class SquashPersonOverridesWorkflow(PostHogWorkflow): """Workflow to squash outstanding person overrides into events. - Squashing refers to the process of updating the person_id associated with an event - to match the new id assigned via a person override. This process must be done - regularly to control the size of the person_overrides table. - - For example, let's imagine the initial state of tables as: - - posthog_personoverridesmapping - - | id | uuid | - | ------- + -------------------------------------- | - | 1 | '179bed4d-0cf9-49a5-8826-b4c36348fae4' | - | 2 | 'ced21432-7528-4045-bc22-855cbe69a6c1' | - - posthog_personoverride - - | old_person_id | override_person_id | - | ------------- + ------------------ | - | 1 | 2 | - - The activity select_persons_to_delete will select the uuid with id 1 as safe to delete - as its the only old_person_id at the time of starting. - - While executing this job, a new override (2->3) may be inserted, leaving both tables as: - - posthog_personoverridesmapping - - | id | uuid | - | ------- + -------------------------------------- | - | 1 | '179bed4d-0cf9-49a5-8826-b4c36348fae4' | - | 2 | 'ced21432-7528-4045-bc22-855cbe69a6c1' | - | 3 | 'b57de46b-55ad-4126-9a92-966fac570ec4' | - - posthog_personoverride - - | old_person_id | override_person_id | - | ------------- + ------------------ | - | 1 | 3 | - | 2 | 3 | - - Upon executing the squash_events_partition events with person_id 1 or 2 will be correctly - updated to reference person_id 3. - - At the end, we'll cleanup the tables by deleting the old_person_ids we deemed safe to do - so (1) from both tables: - - posthog_personoverridesmapping - - | id | uuid | - | ------- + -------------------------------------- | - | 2 | 'ced21432-7528-4045-bc22-855cbe69a6c1' | - | 3 | 'b57de46b-55ad-4126-9a92-966fac570ec4' | - - posthog_personoverride - - | old_person_id | override_person_id | - | ------------- + ------------------ | - | 2 | 3 | - - Any overrides that arrived during the job will be left there for the next job run to clean - up. These will be a no-op for the next job run as the override will already have been applied. + Squashing refers to the process of updating the person ID of existing + ClickHouse event records on disk to reflect their most up-to-date person ID. + + The persons associated with existing events can change as a result of + actions such as person merges. To account for this, we keep a record of what + new person ID should be used in place of (or "override") a previously used + person ID. The ``posthog_flatpersonoverride`` table is the primary + representation of this data in Postgres. The ``person_overrides`` table in + ClickHouse contains a replica of the data stored in Postgres, and can be + joined onto the events table to get the most up-to-date person for an event. + + This process must be done regularly to control the size of the person + overrides tables -- both to reduce the amount of storage required for these + tables, as well as ensuring that the join mentioned previously does not + become prohibitively large to evaluate. """ @staticmethod @@ -882,7 +653,6 @@ async def run(self, inputs: SquashPersonOverridesInputs): dictionary_name=inputs.dictionary_name, team_ids=inputs.team_ids, dry_run=inputs.dry_run, - postgres_person_overrides_manager=inputs.postgres_person_overrides_manager, ) async with person_overrides_dictionary( diff --git a/posthog/temporal/tests/test_squash_person_overrides_workflow.py b/posthog/temporal/tests/test_squash_person_overrides_workflow.py index 4e90610914ef4..795e05879f99c 100644 --- a/posthog/temporal/tests/test_squash_person_overrides_workflow.py +++ b/posthog/temporal/tests/test_squash_person_overrides_workflow.py @@ -2,7 +2,7 @@ import random from collections import defaultdict from datetime import datetime, timedelta -from typing import Iterator, NamedTuple, TypedDict +from typing import Iterator, TypedDict from uuid import UUID, uuid4 import psycopg2 @@ -23,9 +23,8 @@ PERSON_OVERRIDES_CREATE_TABLE_SQL, ) from posthog.temporal.batch_exports.squash_person_overrides import ( - POSTGRES_PERSON_OVERRIDES_MANAGERS, + FlatPostgresPersonOverridesManager, PersonOverrideTuple, - PostgresPersonOverridesManager, QueryInputs, SerializablePersonOverrideToDelete, SquashPersonOverridesInputs, @@ -935,39 +934,23 @@ def team_id(query_inputs, organization_uuid, pg_connection): cursor.execute("DELETE FROM posthog_team WHERE id = %s", [team_id]) -class PostgresPersonOverrideFixtures(NamedTuple): - manager: str - override: PersonOverrideTuple - - -@pytest.fixture(params=POSTGRES_PERSON_OVERRIDES_MANAGERS.keys()) -def postgres_person_override_fixtures( - request, query_inputs: QueryInputs, team_id, pg_connection -) -> Iterator[PostgresPersonOverrideFixtures]: +@pytest.fixture +def postgres_person_override(team_id, pg_connection) -> Iterator[PersonOverrideTuple]: """Create a PersonOverrideMapping and a PersonOverride. We cannot use the Django ORM safely in an async context, so we INSERT INTO directly on the database. This means we need to clean up after ourselves, which we do after yielding. """ - # XXX: Several activity-based tests use this person overrides fixture and - # should vary their behavior to ensure that they work with both the old - # (mappings) and new (flat) approaches, but not all tests that use - # `query_inputs` need to be vary on the overrides manager type as many of - # them don't use Postgres overrides at all. To ensure that whenever Postgres - # overrides *are* used, we need to update the fixture here. This indirection - # isn't good, but this code should be short-lived, right? (... right???) - query_inputs.postgres_person_overrides_manager = request.param - override = PersonOverrideTuple(uuid4(), uuid4()) with pg_connection: - query_inputs.get_postgres_person_overrides_manager(pg_connection).insert(team_id, override) + FlatPostgresPersonOverridesManager(pg_connection).insert(team_id, override) - yield PostgresPersonOverrideFixtures(request.param, override) + yield override with pg_connection: - query_inputs.get_postgres_person_overrides_manager(pg_connection).clear(team_id) + FlatPostgresPersonOverridesManager(pg_connection).clear(team_id) @pytest.mark.django_db @@ -976,7 +959,7 @@ async def test_delete_squashed_person_overrides_from_postgres( query_inputs, activity_environment, team_id, - postgres_person_override_fixtures: PostgresPersonOverrideFixtures, + postgres_person_override: PersonOverrideTuple, pg_connection, ): """Test we can delete person overrides that have already been squashed. @@ -984,18 +967,16 @@ async def test_delete_squashed_person_overrides_from_postgres( For the purposes of this unit test, we take the person overrides as given. A comprehensive test will cover the entire worflow end-to-end. """ - override = postgres_person_override_fixtures.override - # These are sanity checks to ensure the fixtures are working properly. # If any assertions fail here, its likely a test setup issue. with pg_connection: - assert query_inputs.get_postgres_person_overrides_manager(pg_connection).fetchall(team_id) == [override] + assert FlatPostgresPersonOverridesManager(pg_connection).fetchall(team_id) == [postgres_person_override] person_overrides_to_delete = [ SerializablePersonOverrideToDelete( team_id, - override.old_person_id, - override.override_person_id, + postgres_person_override.old_person_id, + postgres_person_override.override_person_id, OVERRIDES_CREATED_AT.isoformat(), 1, OLDEST_EVENT_AT.isoformat(), @@ -1007,7 +988,7 @@ async def test_delete_squashed_person_overrides_from_postgres( await activity_environment.run(delete_squashed_person_overrides_from_postgres, query_inputs) with pg_connection: - assert query_inputs.get_postgres_person_overrides_manager(pg_connection).fetchall(team_id) == [] + assert FlatPostgresPersonOverridesManager(pg_connection).fetchall(team_id) == [] @pytest.mark.django_db @@ -1016,22 +997,20 @@ async def test_delete_squashed_person_overrides_from_postgres_dry_run( query_inputs, activity_environment, team_id, - postgres_person_override_fixtures: PostgresPersonOverrideFixtures, + postgres_person_override: PersonOverrideTuple, pg_connection, ): """Test we do not delete person overrides when dry_run=True.""" - override = postgres_person_override_fixtures.override - # These are sanity checks to ensure the fixtures are working properly. # If any assertions fail here, its likely a test setup issue. with pg_connection: - assert query_inputs.get_postgres_person_overrides_manager(pg_connection).fetchall(team_id) == [override] + assert FlatPostgresPersonOverridesManager(pg_connection).fetchall(team_id) == [postgres_person_override] person_overrides_to_delete = [ SerializablePersonOverrideToDelete( team_id, - override.old_person_id, - override.override_person_id, + postgres_person_override.old_person_id, + postgres_person_override.override_person_id, OVERRIDES_CREATED_AT.isoformat(), 1, OLDEST_EVENT_AT.isoformat(), @@ -1043,99 +1022,7 @@ async def test_delete_squashed_person_overrides_from_postgres_dry_run( await activity_environment.run(delete_squashed_person_overrides_from_postgres, query_inputs) with pg_connection: - assert query_inputs.get_postgres_person_overrides_manager(pg_connection).fetchall(team_id) == [override] - - -@pytest.mark.django_db -@pytest.mark.asyncio -async def test_delete_squashed_person_overrides_from_postgres_with_newer_override( - query_inputs, - activity_environment, - team_id, - postgres_person_override_fixtures: PostgresPersonOverrideFixtures, - pg_connection, -): - """Test we do not delete a newer mapping from Postgres. - - For the purposes of this unit test, we take the person overrides as given. A - comprehensive test will cover the entire worflow end-to-end. - """ - override = postgres_person_override_fixtures.override - - # These are sanity checks to ensure the fixtures are working properly. - # If any assertions fail here, its likely a test setup issue. - with pg_connection: - overrides_manager = query_inputs.get_postgres_person_overrides_manager(pg_connection) - if not isinstance(overrides_manager, PostgresPersonOverridesManager): - pytest.xfail(f"{overrides_manager!r} does not support mappings") - - with pg_connection.cursor() as cursor: - cursor.execute("SELECT id, team_id, uuid FROM posthog_personoverridemapping") - mappings = cursor.fetchall() - assert len(mappings) == 2 - - cursor.execute("SELECT team_id, old_person_id, override_person_id FROM posthog_personoverride") - overrides = cursor.fetchall() - assert len(overrides) == 1 - - with pg_connection: - with pg_connection.cursor() as cursor: - # Let's insert a newer mapping that arrives while we are running the squash job. - # Since only one mapping can exist per old_person_id, we'll bump the version number. - cursor.execute( - """ - UPDATE posthog_personoverride - SET version = version + 1 - WHERE - team_id = %(team_id)s - AND old_person_id = %(old_person_id)s - """, - { - "team_id": team_id, - "old_person_id": [mapping[0] for mapping in mappings if mapping[2] == override.old_person_id][0], - }, - ) - - person_overrides_to_delete = [ - # We are schedulling for deletion an override with lower version number, so nothing should happen. - SerializablePersonOverrideToDelete( - team_id, - override.old_person_id, - override.override_person_id, - OVERRIDES_CREATED_AT.isoformat(), - 1, - OLDEST_EVENT_AT.isoformat(), - ) - ] - query_inputs.person_overrides_to_delete = person_overrides_to_delete - query_inputs.dry_run = False - - await activity_environment.run(delete_squashed_person_overrides_from_postgres, query_inputs) - - with pg_connection: - with pg_connection.cursor() as cursor: - cursor.execute("SELECT id, team_id, uuid FROM posthog_personoverridemapping") - mappings = cursor.fetchall() - - # Nothing was deleted from mappings table - assert len(mappings) == 2 - assert override.override_person_id in [mapping[2] for mapping in mappings] - assert override.old_person_id in [mapping[2] for mapping in mappings] - - cursor.execute("SELECT team_id, old_person_id, override_person_id, version FROM posthog_personoverride") - overrides = cursor.fetchall() - - # And nothing was deleted from overrides table - assert len(overrides) == 1 - - team_id, old_person_id, override_person_id, version = overrides[0] - assert team_id == team_id - assert old_person_id == [mapping[0] for mapping in mappings if mapping[2] == override.old_person_id][0] - assert ( - override_person_id - == [mapping[0] for mapping in mappings if mapping[2] == override.override_person_id][0] - ) - assert version == 2 + assert FlatPostgresPersonOverridesManager(pg_connection).fetchall(team_id) == [postgres_person_override] @pytest.mark.django_db @@ -1143,7 +1030,7 @@ async def test_delete_squashed_person_overrides_from_postgres_with_newer_overrid async def test_squash_person_overrides_workflow( events_to_override, person_overrides_data, - postgres_person_override_fixtures: PostgresPersonOverrideFixtures, + postgres_person_override: PersonOverrideTuple, person_overrides_table, ): """Test the squash_person_overrides workflow end-to-end.""" @@ -1156,7 +1043,6 @@ async def test_squash_person_overrides_workflow( inputs = SquashPersonOverridesInputs( partition_ids=["202001"], dry_run=False, - postgres_person_overrides_manager=postgres_person_override_fixtures.manager, ) async with Worker( @@ -1192,7 +1078,7 @@ async def test_squash_person_overrides_workflow( async def test_squash_person_overrides_workflow_with_newer_overrides( events_to_override, person_overrides_data, - postgres_person_override_fixtures: PostgresPersonOverrideFixtures, + postgres_person_override: PersonOverrideTuple, newer_overrides, ): """Test the squash_person_overrides workflow end-to-end with newer overrides.""" @@ -1205,7 +1091,6 @@ async def test_squash_person_overrides_workflow_with_newer_overrides( inputs = SquashPersonOverridesInputs( partition_ids=["202001"], dry_run=False, - postgres_person_overrides_manager=postgres_person_override_fixtures.manager, ) async with Worker( @@ -1238,7 +1123,7 @@ async def test_squash_person_overrides_workflow_with_newer_overrides( async def test_squash_person_overrides_workflow_with_limited_team_ids( events_to_override, person_overrides_data, - postgres_person_override_fixtures: PostgresPersonOverrideFixtures, + postgres_person_override: PersonOverrideTuple, ): """Test the squash_person_overrides workflow end-to-end.""" client = await Client.connect( @@ -1251,7 +1136,6 @@ async def test_squash_person_overrides_workflow_with_limited_team_ids( inputs = SquashPersonOverridesInputs( partition_ids=["202001"], team_ids=[random_team], - postgres_person_overrides_manager=postgres_person_override_fixtures.manager, dry_run=False, )