Skip to content

Commit

Permalink
Consolidate override data structures
Browse files Browse the repository at this point in the history
  • Loading branch information
tkaemming committed Dec 15, 2023
1 parent 7696cf9 commit f85cadf
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 56 deletions.
35 changes: 19 additions & 16 deletions posthog/temporal/batch_exports/squash_person_overrides.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from collections.abc import Iterator
from dataclasses import asdict, dataclass, field
from datetime import datetime, timedelta, timezone
from typing import AsyncIterator, Iterable, NamedTuple
from typing import AsyncIterator, Iterable, NamedTuple, Sequence
from uuid import UUID

import psycopg2
Expand Down Expand Up @@ -132,16 +132,20 @@ class SerializablePersonOverrideToDelete(NamedTuple):
oldest_event_at: str


class PersonOverrideTuple(NamedTuple):
old_person_id: UUID
override_person_id: UUID


class PostgresPersonOverridesManager:
def __init__(self, connection):
self.connection = connection

def fetchall(self, team_id: int):
def fetchall(self, team_id: int) -> Sequence[PersonOverrideTuple]:
with self.connection.cursor() as cursor:
cursor.execute(
"""
SELECT
override.team_id,
old_person.uuid,
override_person.uuid
FROM posthog_personoverride override
Expand All @@ -155,12 +159,12 @@ def fetchall(self, team_id: int):
""",
{"team_id": team_id},
)
return cursor.fetchall()
return [PersonOverrideTuple(*row) for row in cursor.fetchall()]

def insert(self, team_id: int, old_person_id: UUID, override_person_id: UUID) -> None:
def insert(self, team_id: int, override: PersonOverrideTuple) -> None:
with self.connection.cursor() as cursor:
person_ids = []
for person_uuid in (override_person_id, old_person_id):
mapping_ids = []
for person_uuid in (override.override_person_id, override.old_person_id):
cursor.execute(
"""
INSERT INTO posthog_personoverridemapping(
Expand All @@ -176,7 +180,7 @@ def insert(self, team_id: int, old_person_id: UUID, override_person_id: UUID) ->
""",
{"team_id": team_id, "uuid": person_uuid},
)
person_ids.append(cursor.fetchone())
mapping_ids.append(cursor.fetchone())

cursor.execute(
"""
Expand All @@ -197,8 +201,8 @@ def insert(self, team_id: int, old_person_id: UUID, override_person_id: UUID) ->
""",
{
"team_id": team_id,
"old_person_id": person_ids[1],
"override_person_id": person_ids[0],
"old_person_id": mapping_ids[1],
"override_person_id": mapping_ids[0],
},
)

Expand Down Expand Up @@ -307,22 +311,21 @@ class FlatPostgresPersonOverridesManager:
def __init__(self, connection):
self.connection = connection

def fetchall(self, team_id: int):
def fetchall(self, team_id: int) -> Sequence[PersonOverrideTuple]:
with self.connection.cursor() as cursor:
cursor.execute(
"""
SELECT
team_id,
old_person_id,
override_person_id
FROM posthog_flatpersonoverride
WHERE team_id = %(team_id)s
""",
{"team_id": team_id},
)
return cursor.fetchall()
return [PersonOverrideTuple(*row) for row in cursor.fetchall()]

def insert(self, team_id: int, old_person_id: UUID, override_person_id: UUID) -> None:
def insert(self, team_id: int, override: PersonOverrideTuple) -> None:
with self.connection.cursor() as cursor:
cursor.execute(
"""
Expand All @@ -343,8 +346,8 @@ def insert(self, team_id: int, old_person_id: UUID, override_person_id: UUID) ->
""",
{
"team_id": team_id,
"old_person_id": old_person_id,
"override_person_id": override_person_id,
"old_person_id": override.old_person_id,
"override_person_id": override.override_person_id,
},
)

Expand Down
62 changes: 22 additions & 40 deletions posthog/temporal/tests/test_squash_person_overrides_workflow.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import operator
import random
from collections import defaultdict, namedtuple
from collections import defaultdict
from datetime import datetime, timedelta
from typing import NamedTuple, TypedDict
from uuid import UUID, uuid4
Expand All @@ -24,6 +24,7 @@
)
from posthog.temporal.batch_exports.squash_person_overrides import (
POSTGRES_PERSON_OVERRIDES_MANAGERS,
PersonOverrideTuple,
PostgresPersonOverridesManager,
QueryInputs,
SerializablePersonOverrideToDelete,
Expand Down Expand Up @@ -85,9 +86,6 @@ def person_overrides_table(query_inputs):
sync_execute("DROP TABLE person_overrides")


PersonOverrideTuple = namedtuple("PersonOverrideTuple", ("old_person_id", "override_person_id"))


OVERRIDES_CREATED_AT = datetime.fromisoformat("2020-01-02T00:00:00.123123+00:00")
OLDEST_EVENT_AT = OVERRIDES_CREATED_AT - timedelta(days=1)

Expand Down Expand Up @@ -961,18 +959,12 @@ def person_override_fixtures(request, query_inputs: QueryInputs, team_id, pg_con
# isn't good, but this code should be short-lived, right? (... right???)
query_inputs.postgres_person_overrides_manager = request.param

old_person_id = uuid4()
override_person_id = uuid4()
person_override = PersonOverrideTuple(old_person_id, override_person_id)
override = PersonOverrideTuple(uuid4(), uuid4())

with pg_connection:
query_inputs.get_postgres_person_overrides_manager(pg_connection).insert(
team_id,
old_person_id=person_override.old_person_id,
override_person_id=person_override.override_person_id,
)
query_inputs.get_postgres_person_overrides_manager(pg_connection).insert(team_id, override)

yield PostgresPersonOverrideFixtures(request.param, person_override)
yield PostgresPersonOverrideFixtures(request.param, override)

with pg_connection:
query_inputs.get_postgres_person_overrides_manager(pg_connection).clear(team_id)
Expand All @@ -988,20 +980,18 @@ async def test_delete_squashed_person_overrides_from_postgres(
For the purposes of this unit test, we take the person overrides as given. A
comprehensive test will cover the entire worflow end-to-end.
"""
person_override = person_override_fixtures.override
override = person_override_fixtures.override

# These are sanity checks to ensure the fixtures are working properly.
# If any assertions fail here, its likely a test setup issue.
with pg_connection:
assert query_inputs.get_postgres_person_overrides_manager(pg_connection).fetchall(team_id) == [
(team_id, person_override.old_person_id, person_override.override_person_id)
]
assert query_inputs.get_postgres_person_overrides_manager(pg_connection).fetchall(team_id) == [override]

person_overrides_to_delete = [
SerializablePersonOverrideToDelete(
team_id,
person_override.old_person_id,
person_override.override_person_id,
override.old_person_id,
override.override_person_id,
OVERRIDES_CREATED_AT.isoformat(),
1,
OLDEST_EVENT_AT.isoformat(),
Expand All @@ -1022,20 +1012,18 @@ async def test_delete_squashed_person_overrides_from_postgres_dry_run(
query_inputs, activity_environment, team_id, person_override_fixtures: PostgresPersonOverrideFixtures, pg_connection
):
"""Test we do not delete person overrides when dry_run=True."""
person_override = person_override_fixtures.override
override = person_override_fixtures.override

# These are sanity checks to ensure the fixtures are working properly.
# If any assertions fail here, its likely a test setup issue.
with pg_connection:
assert query_inputs.get_postgres_person_overrides_manager(pg_connection).fetchall(team_id) == [
(team_id, person_override.old_person_id, person_override.override_person_id)
]
assert query_inputs.get_postgres_person_overrides_manager(pg_connection).fetchall(team_id) == [override]

person_overrides_to_delete = [
SerializablePersonOverrideToDelete(
team_id,
person_override.old_person_id,
person_override.override_person_id,
override.old_person_id,
override.override_person_id,
OVERRIDES_CREATED_AT.isoformat(),
1,
OLDEST_EVENT_AT.isoformat(),
Expand All @@ -1047,9 +1035,7 @@ async def test_delete_squashed_person_overrides_from_postgres_dry_run(
await activity_environment.run(delete_squashed_person_overrides_from_postgres, query_inputs)

with pg_connection:
assert query_inputs.get_postgres_person_overrides_manager(pg_connection).fetchall(team_id) == [
(team_id, person_override.old_person_id, person_override.override_person_id)
]
assert query_inputs.get_postgres_person_overrides_manager(pg_connection).fetchall(team_id) == [override]


@pytest.mark.django_db
Expand All @@ -1062,7 +1048,7 @@ async def test_delete_squashed_person_overrides_from_postgres_with_newer_overrid
For the purposes of this unit test, we take the person overrides as given. A
comprehensive test will cover the entire worflow end-to-end.
"""
person_override = person_override_fixtures.override
override = person_override_fixtures.override

# These are sanity checks to ensure the fixtures are working properly.
# If any assertions fail here, its likely a test setup issue.
Expand Down Expand Up @@ -1094,18 +1080,16 @@ async def test_delete_squashed_person_overrides_from_postgres_with_newer_overrid
""",
{
"team_id": team_id,
"old_person_id": [
mapping[0] for mapping in mappings if mapping[2] == person_override.old_person_id
][0],
"old_person_id": [mapping[0] for mapping in mappings if mapping[2] == override.old_person_id][0],
},
)

person_overrides_to_delete = [
# We are schedulling for deletion an override with lower version number, so nothing should happen.
SerializablePersonOverrideToDelete(
team_id,
person_override.old_person_id,
person_override.override_person_id,
override.old_person_id,
override.override_person_id,
OVERRIDES_CREATED_AT.isoformat(),
1,
OLDEST_EVENT_AT.isoformat(),
Expand All @@ -1123,8 +1107,8 @@ async def test_delete_squashed_person_overrides_from_postgres_with_newer_overrid

# Nothing was deleted from mappings table
assert len(mappings) == 2
assert person_override.override_person_id in [mapping[2] for mapping in mappings]
assert person_override.old_person_id in [mapping[2] for mapping in mappings]
assert override.override_person_id in [mapping[2] for mapping in mappings]
assert override.old_person_id in [mapping[2] for mapping in mappings]

cursor.execute("SELECT team_id, old_person_id, override_person_id, version FROM posthog_personoverride")
overrides = cursor.fetchall()
Expand All @@ -1134,12 +1118,10 @@ async def test_delete_squashed_person_overrides_from_postgres_with_newer_overrid

team_id, old_person_id, override_person_id, version = overrides[0]
assert team_id == team_id
assert (
old_person_id == [mapping[0] for mapping in mappings if mapping[2] == person_override.old_person_id][0]
)
assert old_person_id == [mapping[0] for mapping in mappings if mapping[2] == override.old_person_id][0]
assert (
override_person_id
== [mapping[0] for mapping in mappings if mapping[2] == person_override.override_person_id][0]
== [mapping[0] for mapping in mappings if mapping[2] == override.override_person_id][0]
)
assert version == 2

Expand Down

0 comments on commit f85cadf

Please sign in to comment.