-
Notifications
You must be signed in to change notification settings - Fork 1.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
refactor(temporal/squash): Support flat person override table in squash workflow #19347
Conversation
f24a57d
to
7d04c33
Compare
SELECT_ID_FROM_OVERRIDE_UUID = """ | ||
SELECT | ||
id | ||
FROM | ||
posthog_personoverridemapping | ||
WHERE | ||
team_id = %(team_id)s | ||
AND uuid = %(uuid)s; | ||
""" | ||
|
||
DELETE_FROM_PERSON_OVERRIDES = """ | ||
DELETE FROM | ||
posthog_personoverride | ||
WHERE | ||
team_id = %(team_id)s | ||
AND old_person_id = %(old_person_id)s | ||
AND override_person_id = %(override_person_id)s | ||
AND version = %(latest_version)s | ||
RETURNING | ||
old_person_id; | ||
""" | ||
|
||
DELETE_FROM_PERSON_OVERRIDE_MAPPINGS = """ | ||
DELETE FROM | ||
posthog_personoverridemapping | ||
WHERE | ||
id = %(id)s; | ||
""" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Moved these to their call site because they are now only used conditionally based on which override manager is used. (I also find them a little easier to read that way, too.)
def delete(self, person_override: SerializablePersonOverrideToDelete, dry_run: bool = False) -> None: | ||
with self.connection.cursor() as cursor: | ||
SELECT_ID_FROM_OVERRIDE_UUID = """ | ||
SELECT | ||
id | ||
FROM | ||
posthog_personoverridemapping | ||
WHERE | ||
team_id = %(team_id)s | ||
AND uuid = %(uuid)s; | ||
""" | ||
|
||
cursor.execute( | ||
SELECT_ID_FROM_OVERRIDE_UUID, | ||
{ | ||
"team_id": person_override.team_id, | ||
"uuid": person_override.old_person_id, | ||
}, | ||
) | ||
row = cursor.fetchone() | ||
if not row: | ||
return | ||
|
||
old_person_id = row[0] | ||
|
||
cursor.execute( | ||
SELECT_ID_FROM_OVERRIDE_UUID, | ||
{ | ||
"team_id": person_override.team_id, | ||
"uuid": person_override.override_person_id, | ||
}, | ||
) | ||
row = cursor.fetchone() | ||
if not row: | ||
return | ||
|
||
override_person_id = row[0] | ||
|
||
DELETE_FROM_PERSON_OVERRIDES = """ | ||
DELETE FROM | ||
posthog_personoverride | ||
WHERE | ||
team_id = %(team_id)s | ||
AND old_person_id = %(old_person_id)s | ||
AND override_person_id = %(override_person_id)s | ||
AND version = %(latest_version)s | ||
RETURNING | ||
old_person_id; | ||
""" | ||
|
||
parameters = { | ||
"team_id": person_override.team_id, | ||
"old_person_id": old_person_id, | ||
"override_person_id": override_person_id, | ||
"latest_version": person_override.latest_version, | ||
} | ||
|
||
if dry_run is True: | ||
activity.logger.info("This is a DRY RUN so nothing will be deleted.") | ||
activity.logger.info( | ||
"Would have run query: %s with parameters %s", | ||
DELETE_FROM_PERSON_OVERRIDES, | ||
parameters, | ||
) | ||
return | ||
|
||
cursor.execute(DELETE_FROM_PERSON_OVERRIDES, parameters) | ||
row = cursor.fetchone() | ||
if not row: | ||
# There is no existing mapping for this (old_person_id, override_person_id) pair. | ||
# It could be that a newer one was added (with a later version). | ||
return | ||
|
||
deleted_id = row[0] | ||
|
||
DELETE_FROM_PERSON_OVERRIDE_MAPPINGS = """ | ||
DELETE FROM | ||
posthog_personoverridemapping | ||
WHERE | ||
id = %(deleted_id)s; | ||
""" | ||
|
||
cursor.execute( | ||
DELETE_FROM_PERSON_OVERRIDE_MAPPINGS, | ||
{ | ||
"deleted_id": deleted_id, | ||
}, | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Kind of a similar story to #19220 (comment): the rest of this class is mostly here to support easier substitution during tests — this is the only method that gets used on production paths.
This particular block was just copied in from the original call site.
There's also a lot of duplication here with that (JS/TS) implementation as well. Unfortunate consequence of cross-runtime logic, I guess.
override_person_id: UUID | ||
|
||
|
||
class PostgresPersonOverridesManager: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not married to this name (kind of actively dislike it) — not sure if something like PostgresPersonOverrideStrategy
would make sense? Or something else completely?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No strong opinion here. PostgresPersonOverrideStrategy
is good too. 🤷♂️
POSTGRES_PERSON_OVERRIDES_MANAGERS = { | ||
"mappings": PostgresPersonOverridesManager, | ||
"flat": FlatPostgresPersonOverridesManager, | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These technically could/should support an abstract base but I don't think that's worth the investment right now.
DEFAULT_POSTGRES_PERSON_OVERRIDES_MANAGER = "mappings" | ||
assert DEFAULT_POSTGRES_PERSON_OVERRIDES_MANAGER in POSTGRES_PERSON_OVERRIDES_MANAGERS |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also clunky, but similar to above I don't think it's worth wrapping this in an abstraction.
@@ -184,6 +421,7 @@ class QueryInputs: | |||
dictionary_name: str = "person_overrides_join_dict" | |||
dry_run: bool = True | |||
_latest_created_at: str | datetime | None = None | |||
postgres_person_overrides_manager: str = DEFAULT_POSTGRES_PERSON_OVERRIDES_MANAGER |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't really like tacking this on here as it's only used in one activity within the workflow.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You could have a class holding all the workflow activities and also the manager. This class and the manager would be initialized once when starting the temporal worker. Here's a sample of the pattern: https://github.com/temporalio/samples-python/blob/main/hello/hello_activity_method.py
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not saying it would be better, just wanted to give a suggestion given your comment 👍
@@ -928,7 +926,7 @@ def team_id(query_inputs, organization_uuid, pg_connection): | |||
""", | |||
{"organization_uuid": organization_uuid}, | |||
) | |||
team_id = cursor.fetchone() | |||
[team_id] = cursor.fetchone() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Was honestly kind of surprised this was working at all — this is passed back into psycopg2.execute
as parameters later on in situations where I'd assume it'd expect a scalar, but I guess this works?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Weird...
# XXX: Several activity-based tests use this person overrides fixture and | ||
# should vary their behavior to ensure that they work with both the old | ||
# (mappings) and new (flat) approaches, but not all tests that use | ||
# `query_inputs` need to be vary on the overrides manager type as many of | ||
# them don't use Postgres overrides at all. To ensure that whenever Postgres | ||
# overrides *are* used, we need to update the fixture here. This indirection | ||
# isn't good, but this code should be short-lived, right? (... right???) | ||
query_inputs.postgres_person_overrides_manager = request.param |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are definitely better approaches here, but I couldn't think of any that wouldn't require a bunch of refactoring. Second opinions welcomed!
overrides_manager = inputs.get_postgres_person_overrides_manager(connection) | ||
for person_override_to_delete in inputs.iter_person_overides_to_delete(): | ||
activity.logger.debug("%s", person_override_to_delete) | ||
overrides_manager.delete(person_override_to_delete, inputs.dry_run) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@bretthoerner brought up a good question here about whether we should bother with deleting this data at all: #19220 (comment)
I'm not sure I have a strong opinion one way or another.
In the case of data corruption, our best source of truth for the event to person relationship would be by looking up the current person associated with an event's distinct_id1, so I'm not sure we gain a lot by keeping these records around. (I figure that if we break persons somehow, we likely break them by accidentally writing bad data to this table.)
If we expect to need to rebuild the ClickHouse overrides table for some reason though (schema change, mostly, or the unlikely scenario that the Kafka engine goes haywire), I could see the benefit of keeping this around, but I'm not sure how likely that would be.
Footnotes
%(team_id)s, | ||
%(old_person_id)s, | ||
%(override_person_id)s, | ||
NOW(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because this is only for tests, right? Otherwise we'd use something from one of the users?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep!
%(team_id)s, | ||
%(old_person_id)s, | ||
%(override_person_id)s, | ||
NOW(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same question here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep, same here.
@@ -928,7 +926,7 @@ def team_id(query_inputs, organization_uuid, pg_connection): | |||
""", | |||
{"organization_uuid": organization_uuid}, | |||
) | |||
team_id = cursor.fetchone() | |||
[team_id] = cursor.fetchone() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Weird...
override_person_id: UUID | ||
|
||
|
||
class PostgresPersonOverridesManager: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No strong opinion here. PostgresPersonOverrideStrategy
is good too. 🤷♂️
This PR hasn't seen activity in a week! Should it be merged, closed, or further worked on? If you want to keep it open, post a comment or remove the |
* master: (94 commits) feat: Add cta on billing for >20k annual spend customers (#19508) refactor(temporal/squash): Support flat person override table in squash workflow (#19347) fix(surveys): remove link in user interview template (#19584) feat: populate plugin capabilities on install and edit (#19188) chore: Make plugin-server ignore deleted plugin configs (#18444) chore: Add Flutter feature flags snippets (#19563) fix(bi): fixed some of the query duplications (#19573) fix: assume typeless series nodes are of type events node (#19550) chore(deps): bump chromaui/action from 1 to 10 (#19560) fix(trends): fix breakdowns persons label (#19534) fix(trends): fix breakdown legend (#19533) feat: incremental updates for mobile transformer (#19567) chore(deps): bump peter-evans/find-comment from 1 to 2 (#19559) chore(data-warehouse): cleanup unused celery code and extend time (#19568) Revert "feat(data-warehouse): hubspot integration" (#19569) feat(data-warehouse): hubspot integration (#19529) feat: Feature gate session replay controls using available_product_features (#19401) fix: Padding on bullet lists (#19565) chore: Post 3000 LemonButtton cleanup (#19540) chore(deps): bump aws-actions/amazon-ecr-login from 1 to 2 (#19558) ...
Problem
Building on top of #19220, this adds support for the "flat" person overrides model/table in the squash workflow.
Whether the existing path or flat path is used can be controlled with the
postgres_person_overrides_manager
workflow input parameter key. Values can either bemappings
(existing path; the default) orflat
(new table.)This can be merged prior to the flat overrides write path being turned on, as it supports both the old and new paths. Once the migration occurs to use that new write path,
DEFAULT_POSTGRES_PERSON_OVERRIDES_MANAGER
should be updated to use flat overrides as default: otherwise, it'd try to delete from the wrong table.How did you test this code?