feat: Implement person overrides backfill task with adaptive query range selection. #20495

Closed · wants to merge 2 commits
125 changes: 125 additions & 0 deletions posthog/management/commands/backfill_distinct_id_overrides.py
@@ -0,0 +1,125 @@
from __future__ import annotations

import logging
from dataclasses import dataclass
from typing import Iterator, NamedTuple

import structlog
from clickhouse_driver.errors import ErrorCodes, ServerException
from django.core.management.base import BaseCommand, CommandError

from posthog.clickhouse.client.execute import sync_execute
from posthog.models.team.team import Team


logger = structlog.get_logger(__name__)


class Range(NamedTuple):
    lower: int  # lower bound, inclusive
    upper: int  # upper bound, exclusive

    @property
    def size(self):
        return self.upper - self.lower

    def split(self) -> Iterator[Range]:
        if self.size < 2:
            raise ValueError("cannot split range")

        midpoint = self.lower + (self.upper - self.lower) // 2
        return iter(
            [
                Range(self.lower, midpoint),
                Range(midpoint, self.upper),
            ]
        )

Author comment (on split): Iterator[Range] here is mostly for future-proofing, in case it makes sense to split into a variable number of values instead of a fixed N = 2.
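For illustration, a variable-N version of this could look like the sketch below. `split_n` is a hypothetical helper, not part of this PR; it assumes the same lower-inclusive, upper-exclusive semantics as `Range` above:

```python
from typing import Iterator

def split_n(r: Range, n: int) -> Iterator[Range]:
    # Hypothetical sketch: divide r into n near-equal chunks; the last chunk
    # absorbs any remainder so the full range stays covered.
    if r.size < n:
        raise ValueError("cannot split range")
    step = r.size // n
    bounds = [r.lower + i * step for i in range(n)] + [r.upper]
    return iter(Range(lo, hi) for lo, hi in zip(bounds[:-1], bounds[1:]))
```

For example, `list(split_n(Range(0, 10), 3))` yields `[Range(0, 3), Range(3, 6), Range(6, 10)]`.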


@dataclass
class BackfillQuery:
    team_id: int
    range: Range = Range(0, 2**64)

    def split(self) -> Iterator[BackfillQuery]:
        for chunk in self.range.split():
            yield BackfillQuery(self.team_id, chunk)

    def execute(self, dry_run: bool = False) -> None:
        query = """
            SELECT
                team_id,
                distinct_id,
                pdi.person_id as person_id,
                -1 as version  -- overrides that come in via Kafka will overwrite this
            FROM events
            LEFT JOIN (
                SELECT
                    distinct_id,
                    argMax(person_id, version) as person_id
                FROM person_distinct_id2
                WHERE
                    team_id = %(team_id)s
                    AND %(range_lower)s <= cityHash64(distinct_id)
                    AND cityHash64(distinct_id) < %(range_upper)s
                GROUP BY ALL
            ) pdi
            ON pdi.distinct_id = events.distinct_id
            WHERE
                team_id = %(team_id)s
                AND %(range_lower)s <= cityHash64(distinct_id)
                AND cityHash64(distinct_id) < %(range_upper)s
                AND events.person_id != pdi.person_id
            GROUP BY ALL
        """

        parameters = {
            "team_id": self.team_id,
            "range_lower": self.range.lower,
            "range_upper": self.range.upper,
        }

        if dry_run:
            [(count,)] = sync_execute(f"SELECT count() FROM ({query})", parameters)
            logger.info("%r would have inserted %r records.", self, count)
        else:
            # XXX: Nothing useful to report here, unfortunately... all that is
            # returned is an empty result set.
            sync_execute(
                f"""
                INSERT INTO person_distinct_id_overrides
                (team_id, distinct_id, person_id, version)
                {query}
                """,
                parameters,
            )

Author comment (on the "nothing useful to report" note): 😾
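A usage sketch (the team id and range are illustrative, not from the diff): a dry run only counts the rows the INSERT would produce, so it is safe to probe a slice of the keyspace first:

```python
# Illustrative only: count would-be override rows for the lower half of the
# cityHash64 keyspace, for a made-up team id.
query = BackfillQuery(team_id=1, range=Range(0, 2**63))
query.execute(dry_run=True)  # logs "... would have inserted N records."
```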


def execute_backfill(query: BackfillQuery, dry_run: bool = False) -> None:
    logger.info("Executing %r...", query)
    try:
        query.execute(dry_run=dry_run)
    except ServerException as e:
        if e.code not in {ErrorCodes.TOO_SLOW, ErrorCodes.TOO_MANY_ROWS}:
            raise e
        logger.warning("Caught %s when running %r! Trying smaller ranges...", e, query)
        for chunk in query.split():
            execute_backfill(chunk, dry_run=dry_run)
    else:
        logger.info("Successfully executed %r.", query)

Author comment (on the error-code check): I'm not sure this set is exhaustive; I'm just going based on what I saw when poking around with the equivalent of the dry-run query a few weeks ago.

Author comment (on the recursive split): There's probably a kinder and more intelligent way to do this than just recursively dividing the keyspace into smaller chunks until it doesn't error, but this seems good enough for now?

Author comment (follow-up): After some ad hoc testing, I think it might be more effective to set a target size for the right-hand side table and split into however many chunks are needed to achieve that target size, i.e. count() / target_size, on the presumption that cityHash64(distinct_id) is uniformly distributed. I'm not sure what that target size should be, though: figuring that out might require some numberwanging.

As implemented, this has the drawback that queries can be too slow either because the right-hand side table is too big (the join is too complex) or because the left-hand side table requires scanning too many rows. Taking care of the size of the left-hand side likely means breaking the job down by partition when the size is unwieldy.

Going to put this back as a draft while I look into doing that.

Author comment (on where this function lives): This could be part of BackfillQuery.execute, or even some other higher-level class; there didn't seem to be an obvious best place to put it right now, so I just left it kind of dangling here.
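The target-size idea from the comment above might look something like this sketch. `TARGET_SIZE` and `plan_chunks` are hypothetical names, and the row-count estimate is assumed to come from a separate count query; none of this is in the PR:

```python
import math

TARGET_SIZE = 10_000_000  # assumed placeholder; the comment notes this needs tuning

def plan_chunks(query: BackfillQuery, estimated_rows: int) -> list[BackfillQuery]:
    # Hypothetical: pick the chunk count from an estimated right-hand side row
    # count, assuming cityHash64(distinct_id) is uniformly distributed.
    n = max(1, math.ceil(estimated_rows / TARGET_SIZE))
    step = max(1, query.range.size // n)
    chunks = []
    lower = query.range.lower
    for i in range(n):
        upper = query.range.upper if i == n - 1 else lower + step
        chunks.append(BackfillQuery(query.team_id, Range(lower, upper)))
        lower = upper
    return chunks
```

Unlike the recursive split, this would pick the fan-out up front instead of discovering it through repeated TOO_MANY_ROWS errors.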



class Command(BaseCommand):
    help = "Backfill the person_distinct_id_overrides table for a team."

    def add_arguments(self, parser):
        parser.add_argument("--team-id", required=True, type=int, help="team to backfill for")
        parser.add_argument("--live-run", action="store_true", help="execute INSERT queries (default is dry-run)")

    def handle(self, *, team_id: int, live_run: bool, **options):
        logger.setLevel(logging.INFO)

        if not Team.objects.filter(id=team_id).exists():
            raise CommandError(f"Team with id={team_id!r} does not exist")

        execute_backfill(BackfillQuery(team_id), dry_run=not live_run)

Author comment: We'll eventually want to store if/when a team was backfilled, but that can wait until we've got a sense of whether or not this is a viable approach in general. We'll also likely eventually want to be able to provide a cursor value (a Range) to resume backfilling in case of interruption.
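A usage sketch for the command itself (the team id is made up). Via Django's call_command, options map to the argparse dests defined above:

```python
from django.core.management import call_command

# Dry run (the default): logs how many rows each range would insert.
call_command("backfill_distinct_id_overrides", team_id=1)

# Live run: actually INSERTs into person_distinct_id_overrides.
call_command("backfill_distinct_id_overrides", team_id=1, live_run=True)
```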

@@ -0,0 +1,74 @@
import itertools
import uuid
from unittest import mock

from clickhouse_driver.client import Client
from clickhouse_driver.errors import ErrorCodes, ServerException

from posthog.client import sync_execute
from posthog.management.commands.backfill_distinct_id_overrides import BackfillQuery, execute_backfill
from posthog.models.person.util import create_person_distinct_id
from posthog.test.base import BaseTest, ClickhouseTestMixin, _create_event, flush_persons_and_events


class ExecuteBackfillTestCase(ClickhouseTestMixin, BaseTest):
    def __run_test_backfill(self, dry_run: bool) -> None:
        distinct_ids_to_person_id = {
            "no-override": uuid.uuid4(),
            "needs-override": uuid.uuid4(),
        }

        for _ in range(3):
            _create_event(
                team=self.team,
                event="invalid",
                distinct_id="no-override",
                person_id=distinct_ids_to_person_id["no-override"],  # keep correct person id
            )

        for _ in range(3):
            _create_event(
                team=self.team,
                event="invalid",
                distinct_id="needs-override",
                person_id=str(uuid.uuid4()),  # mismatched value causes a backfill row
            )

        flush_persons_and_events()

        for distinct_id, person_id in distinct_ids_to_person_id.items():
            create_person_distinct_id(
                team_id=self.team.pk,
                distinct_id=distinct_id,
                person_id=str(person_id),
                version=1,
            )
Author comment (on the create_person_distinct_id calls): This will need to detach the materialized view once #20349 is ready, otherwise this test will start failing or flaking because the version 1 written to overrides will override the version -1.


        execute_backfill(BackfillQuery(self.team.id), dry_run=dry_run)

        backfill_rows = sync_execute(
            """
                SELECT distinct_id, person_id, version
                FROM person_distinct_id_overrides
                WHERE team_id = %(team_id)s
            """,
            {"team_id": self.team.id},
        )

        assert backfill_rows == (
            [("needs-override", distinct_ids_to_person_id["needs-override"], -1)] if not dry_run else []
        )

    def test_execute_backfill(self) -> None:
        self.__run_test_backfill(dry_run=False)

    def test_execute_backfill_dry_run(self) -> None:
        self.__run_test_backfill(dry_run=True)
Author comment (on the two test methods): Doesn't seem like pytest.mark.parametrize works on methods, just plain functions? (… at least I couldn't figure it out quickly.)
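For what it's worth, unittest's own subTest does work on TestCase methods and gives a parametrize-like loop. A generic sketch, not from the PR (whether it fits here depends on the dry-run and live-run cases tolerating a shared fixture, which they likely don't, since both write events):

```python
import unittest

class ExampleTestCase(unittest.TestCase):
    def test_square(self) -> None:
        # Each parameter set is reported as its own subtest on failure.
        for value, expected in [(2, 4), (3, 9)]:
            with self.subTest(value=value):
                self.assertEqual(value * value, expected)
```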


    def test_execute_backfill_split_query(self) -> None:
        with mock.patch.object(Client, "execute") as mock_execute:
            mock_execute.side_effect = itertools.chain(
                [ServerException("(error)", code=ErrorCodes.TOO_MANY_ROWS)],
                itertools.repeat(mock.DEFAULT),
            )
            execute_backfill(BackfillQuery(self.team.id))
        assert mock_execute.call_count == 3  # initial query (error), then two queries after splitting
Author comment (on this test): This is pretty clumsy, but it does the important thing. (I guess if I also mocked execute_backfill I could validate that the correct split ranges were being passed, which would help here…)
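A sketch of that idea (hypothetical, not in the PR): spy on the module-level execute_backfill so each query's range is recorded; the recursive call resolves through the patched module global, so the split ranges are captured too.

```python
import posthog.management.commands.backfill_distinct_id_overrides as backfill

def test_execute_backfill_split_ranges(self) -> None:  # hypothetical test method
    seen_ranges = []
    real = backfill.execute_backfill

    def spy(query, dry_run=False):
        # Record the range, then delegate to the real implementation.
        seen_ranges.append(query.range)
        return real(query, dry_run=dry_run)

    with mock.patch.object(Client, "execute") as mock_execute:
        mock_execute.side_effect = itertools.chain(
            [ServerException("(error)", code=ErrorCodes.TOO_MANY_ROWS)],
            itertools.repeat(mock.DEFAULT),
        )
        with mock.patch.object(backfill, "execute_backfill", side_effect=spy):
            backfill.execute_backfill(BackfillQuery(self.team.id))

    full = backfill.Range(0, 2**64)
    assert seen_ranges == [full, *full.split()]
```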
