-
Notifications
You must be signed in to change notification settings - Fork 1.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(persons): Add bulk deletion endpoint #24790
Changes from all commits
bf66822
d0aa06f
5269bb7
e90f3c7
2cf9516
5f65b12
86b966e
5e25cc7
8d5053d
4196136
5c72dda
380f683
f5791bb
68d7252
49629bf
0da6e88
dacc610
4f2c08d
be8187a
28f3367
91cd660
97e8ec2
682e8f9
b399e5f
e08c403
2dcb74f
c7b854f
af48da5
4507c31
e391da0
39ea30f
9bfb4a7
e8596cb
aca1bae
431d733
338686a
6f17af9
e317d39
be3d8f0
6221da0
bb122d1
3fb562c
5cd21ea
7777500
ec97dd8
a42d0fc
b1c688c
1fb2ec9
46c9a32
5630444
ca766ee
5bb8cc6
7a67664
47e860b
5c875d5
bb05558
dceb26f
b7e8a75
735c969
0ff2391
0bb1f89
93857ed
8cd543b
3c5f554
9a29853
a5d1230
ac40e2a
37af305
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -353,12 +353,15 @@ def list(self, request: request.Request, *args: Any, **kwargs: Any) -> response. | |
OpenApiParameter( | ||
"delete_events", | ||
OpenApiTypes.BOOL, | ||
description="If true, a task to delete all events associated with this person will be created and queued. The task does not run immediately and instead is batched together and at 5AM UTC every Sunday (controlled by environment variable CLEAR_CLICKHOUSE_REMOVED_DATA_SCHEDULE_CRON)", | ||
description="If true, a task to delete all events associated with this person will be created and queued. The task does not run immediately and instead is batched together and at 5AM UTC every Sunday", | ||
default=False, | ||
), | ||
], | ||
) | ||
def destroy(self, request: request.Request, pk=None, **kwargs): | ||
""" | ||
Use this endpoint to delete individual persons. For bulk deletion, use the bulk_delete endpoint instead. | ||
""" | ||
try: | ||
person = self.get_object() | ||
person_id = person.id | ||
|
@@ -391,6 +394,70 @@ def destroy(self, request: request.Request, pk=None, **kwargs): | |
except Person.DoesNotExist: | ||
raise NotFound(detail="Person not found.") | ||
|
||
@extend_schema(
    parameters=[
        OpenApiParameter(
            "delete_events",
            OpenApiTypes.BOOL,
            description="If true, a task to delete all events associated with this person will be created and queued. The task does not run immediately and instead is batched together and at 5AM UTC every Sunday",
            default=False,
        ),
        OpenApiParameter(
            "distinct_ids",
            OpenApiTypes.OBJECT,
            description="A list of distinct IDs, up to 100 of them. We'll delete all persons associated with those distinct IDs.",
        ),
        OpenApiParameter(
            "ids",
            OpenApiTypes.OBJECT,
            description="A list of PostHog person IDs, up to 100 of them. We'll delete all the persons listed.",
        ),
    ],
)
@action(methods=["POST"], detail=False, required_scopes=["person:write"])
def bulk_delete(self, request: request.Request, pk=None, **kwargs):
    """
    This endpoint allows you to bulk delete persons, either by the PostHog person IDs or by distinct IDs. You can pass in a maximum of 100 IDs per call.
    """
    # Request body keys read here:
    #   distinct_ids  - list of distinct IDs; takes precedence over `ids` when both are sent
    #   ids           - list of person UUIDs (matched against Person.uuid)
    #   delete_events - truthy to also queue an AsyncDeletion row per deleted person
    # Raises ValidationError when neither list is given, when a value is not a
    # list, or when a list holds more than 100 entries. Responds with 202.
    max_ids_per_call = 100

    if distinct_ids := request.data.get("distinct_ids"):
        # A bare string would otherwise be treated as a sequence of characters
        # by both len() and the __in lookup.
        if not isinstance(distinct_ids, (list, tuple)):
            raise ValidationError("distinct_ids must be a list")
        if len(distinct_ids) > max_ids_per_call:
            raise ValidationError("You can only pass 100 distinct_ids in one call")
        persons = self.get_queryset().filter(persondistinctid__distinct_id__in=distinct_ids)
    elif ids := request.data.get("ids"):
        if not isinstance(ids, (list, tuple)):
            raise ValidationError("ids must be a list")
        if len(ids) > max_ids_per_call:
            raise ValidationError("You can only pass 100 ids in one call")
        persons = self.get_queryset().filter(uuid__in=ids)
    else:
        raise ValidationError("You need to specify either distinct_ids or ids")

    # Loop invariants, read once instead of once per person.
    delete_events = bool(request.data.get("delete_events"))
    acting_user = cast(User, request.user)

    pending_event_deletions = []
    handled_person_ids = set()
    try:
        # Persons are removed one at a time on purpose: a failure stays
        # contained to a single person instead of leaving the whole batch
        # half-deleted.
        for person in persons:
            # Read the primary key before deleting: Django clears it on the
            # instance once the row is gone. destroy() does the same.
            person_id = person.id

            # Filtering through the distinct-ID join yields the same person
            # once per matching distinct ID; handle each person only once.
            if person_id in handled_person_ids:
                continue
            handled_person_ids.add(person_id)

            delete_person(person=person)
            self.perform_destroy(person)
            log_activity(
                organization_id=self.organization.id,
                team_id=self.team_id,
                user=acting_user,
                was_impersonated=is_impersonated_session(request),
                item_id=person_id,
                scope="Person",
                activity="deleted",
                detail=Detail(name=str(person.uuid)),
            )
            # Once the person is deleted, queue deletion of associated data, if that was requested
            if delete_events:
                pending_event_deletions.append(
                    AsyncDeletion(
                        deletion_type=DeletionType.Person,
                        team_id=self.team_id,
                        key=str(person.uuid),
                        created_by=acting_user,
                    )
                )
    finally:
        # Review feedback on this endpoint: inserting per person means up to
        # 100 INSERTs per request. The rows are written in a single
        # bulk_create instead; doing it in `finally` means persons already
        # deleted before a mid-loop failure still get their deletion queued.
        if pending_event_deletions:
            AsyncDeletion.objects.bulk_create(pending_event_deletions, ignore_conflicts=True)

    return response.Response(status=202)
|
||
@action(methods=["GET"], detail=False, required_scopes=["person:read"]) | ||
def values(self, request: request.Request, **kwargs) -> response.Response: | ||
key = request.GET.get("key") | ||
|
@@ -636,16 +703,17 @@ def _set_properties(self, properties, user): | |
}, | ||
) | ||
|
||
log_activity( | ||
organization_id=self.organization.id, | ||
team_id=self.team.id, | ||
user=user, | ||
was_impersonated=is_impersonated_session(self.request), | ||
item_id=instance.pk, | ||
scope="Person", | ||
activity="updated", | ||
detail=Detail(changes=[Change(type="Person", action="changed", field="properties")]), | ||
) | ||
if self.organization.id: # should always be true, but mypy... | ||
Twixes marked this conversation as resolved.
Show resolved
Hide resolved
|
||
log_activity( | ||
organization_id=self.organization.id, | ||
team_id=self.team.id, | ||
user=user, | ||
was_impersonated=is_impersonated_session(self.request), | ||
item_id=instance.pk, | ||
scope="Person", | ||
activity="updated", | ||
detail=Detail(changes=[Change(type="Person", action="changed", field="properties")]), | ||
) | ||
|
||
# PRAGMA: Methods for getting Persons via clickhouse queries | ||
def _respond_with_cached_results( | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not bulk `persons.objects.delete()`?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was worried about deletions failing halfway through, because we can't do transactions in ClickHouse. So if we first bulk-deleted all the persons in PostHog, but failed to delete half of them in ClickHouse, we'd end up in a weird state. By doing it sequentially, at least the failure will be contained to one person, rather than potentially up to 100.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hm, I don't think we can tell if a ClickHouse-side delete fails anyway, because all `delete_person()` does is queue a deletion row into Kafka, which is extremely unlikely to fail. So if we first bulk-delete in Postgres, and then emit deletion rows to CH, that should be the highest level of integrity possible in this situation.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My point is we may not even emit those rows if we fail halfway through.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's definitely a tradeoff – in this explicit bulk delete case it doesn't feel great to put this O(n) load on Postgres, but I see what you mean. For maximum integrity with this route, we should swap the `delete_person(person=person)` and `self.perform_destroy(person)` lines though.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hm, this doesn't work, as the person gets deleted before ClickHouse gets a chance to delete it. I also think it's more likely for ClickHouse to fail, thus the current order does make sense.