feat: Implement person overrides backfill task with adaptive query range selection. #20495
Conversation
def size(self):
    return self.upper - self.lower

def split(self) -> Iterator[Range]:
The `Iterator[Range]` return type here is mostly for future-proofing, in case it makes sense to split into a variable number of values instead of a fixed N = 2.
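A minimal sketch of what that shape might look like (the `Range` fields and the fixed two-way split are assumptions for illustration, not the actual implementation):

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Iterator


@dataclass(frozen=True)
class Range:
    """Half-open interval [lower, upper) over the hash keyspace (hypothetical)."""

    lower: int
    upper: int

    def size(self) -> int:
        return self.upper - self.lower

    def split(self) -> Iterator[Range]:
        # Yields a fixed N = 2 halves today; returning an Iterator leaves
        # room for splitting into a variable number of chunks later.
        midpoint = self.lower + self.size() // 2
        yield Range(self.lower, midpoint)
        yield Range(midpoint, self.upper)
```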
for distinct_id, person_id in distinct_ids_to_person_id.items():
    create_person_distinct_id(
        team_id=self.team.pk,
        distinct_id=distinct_id,
        person_id=str(person_id),
        version=1,
    )
This will need to detach the materialized view once #20349 is ready; otherwise this test will start failing or flaking, because the version 1 written to overrides will override the version -1.
if not Team.objects.filter(id=team_id).exists():
    raise CommandError(f"Team with id={team_id!r} does not exist")

execute_backfill(BackfillQuery(team_id), dry_run=not live_run)
We'll eventually want to store if/when a team was backfilled, but that can wait until later, once we've got a sense of whether or not this is a viable approach in general.
We'll also likely eventually want to be able to provide a cursor value (a `Range`) to resume backfilling in case of interruption.
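One possible shape for cursor-based resumption, assuming the cursor is simply the upper bound of the last completed range (all names here are hypothetical, not part of the PR):

```python
from typing import Iterable, Iterator, Optional, Tuple

# Hypothetical representation: (lower, upper) bounds of a keyspace range.
Bounds = Tuple[int, int]


def resume_from(chunks: Iterable[Bounds], cursor: Optional[int]) -> Iterator[Bounds]:
    """Skip any chunk fully covered by a run that completed before interruption."""
    for lower, upper in chunks:
        if cursor is not None and upper <= cursor:
            continue  # already backfilled before the interruption
        yield (lower, upper)
```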
def execute_backfill(query: BackfillQuery, dry_run: bool = False) -> None:
    logger.info("Executing %r...", query)
    try:
        query.execute(dry_run=dry_run)
    except ServerException as e:
        if e.code not in {ErrorCodes.TOO_SLOW, ErrorCodes.TOO_MANY_ROWS}:
            raise
        logger.warning("Caught %s when running %r! Trying smaller ranges...", e, query)
        for chunk in query.split():
            execute_backfill(chunk, dry_run=dry_run)
    else:
        logger.info("Successfully executed %r.", query)
This could be part of `BackfillQuery.execute`, or even some other higher-level class; there didn't seem to be an obvious best place to put it right now, so I just left it dangling here.
try:
    query.execute(dry_run=dry_run)
except ServerException as e:
    if e.code not in {ErrorCodes.TOO_SLOW, ErrorCodes.TOO_MANY_ROWS}:
I'm not sure this set is exhaustive; I'm just going based on what I saw when poking around with the equivalent of the dry-run query a few weeks ago.
itertools.repeat(mock.DEFAULT),
)
execute_backfill(BackfillQuery(self.team.id))
assert mock_execute.call_count == 3  # initial query (error), then two queries after splitting
This is pretty clumsy, but does the important thing. (I guess if I also mocked `execute_backfill`, I could validate that the correct split ranges were being passed, which would help here…)
def test_execute_backfill(self) -> None:
    self.__run_test_backfill(dry_run=False)

def test_execute_backfill_dry_run(self) -> None:
    self.__run_test_backfill(dry_run=True)
It doesn't seem like `pytest.mark.parametrize` works on `unittest.TestCase` methods, just plain functions? (At least I couldn't figure it out quickly.)
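That's right: `pytest.mark.parametrize` is not supported on `unittest.TestCase` methods. `TestCase.subTest` is one workaround that keeps both variants in a single method; a sketch with a stand-in helper (the assertion body is assumed, not the real test):

```python
import unittest


class BackfillTest(unittest.TestCase):
    # Stand-in for the shared helper; the real one would run the backfill
    # and assert on the resulting overrides.
    def _run_test_backfill(self, dry_run: bool) -> None:
        self.assertIsInstance(dry_run, bool)

    def test_execute_backfill_variants(self) -> None:
        # subTest reports each variant separately on failure, which gets
        # most of the benefit of parametrize without leaving unittest.
        for dry_run in (False, True):
            with self.subTest(dry_run=dry_run):
                self._run_test_backfill(dry_run=dry_run)
```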
# XXX: Nothing useful to report here, unfortunately... all that is
# returned is an empty result set.
😾
try:
    query.execute(dry_run=dry_run)
except ServerException as e:
    if e.code not in {ErrorCodes.TOO_SLOW, ErrorCodes.TOO_MANY_ROWS}:
        raise
    logger.warning("Caught %s when running %r! Trying smaller ranges...", e, query)
    for chunk in query.split():
        execute_backfill(chunk, dry_run=dry_run)
There's probably a kinder and more intelligent way to do this than just recursively dividing the keyspace into smaller chunks until it doesn't error, but this seems good enough for now?
After some ad hoc testing, I think it might be more effective to try to set a target size for the right-hand side table, and split into however many chunks are needed to achieve that target size by using the value of `count() / target_size`, with the presumption that `cityHash64(distinct_id)` is uniformly distributed. I'm not sure what that target size should be, though: figuring that out might require some numberwanging.
As implemented, this has the drawback that queries can be too slow either because the right-hand side table is too big (the join is too complex) or because the left-hand side table requires scanning too many rows. Taking care of the size of the left-hand side likely means breaking the job down by partitions when the size is unwieldy.
Going to put this back as a draft while I look into doing that.
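The target-size idea above could be sketched roughly as follows (`plan_chunks` and its parameters are hypothetical; it presumes hash values are uniform over the keyspace, as the comment does for `cityHash64`):

```python
import math
from typing import List, Tuple


def plan_chunks(total_rows: int, target_size: int, keyspace: int = 2**64) -> List[Tuple[int, int]]:
    """Split the hash keyspace into ceil(total_rows / target_size) ranges.

    If the hash is uniformly distributed, each range should hold roughly
    target_size rows, keeping the right-hand side of the join bounded.
    """
    n = max(1, math.ceil(total_rows / target_size))
    step = keyspace // n
    # The final bound is pinned to the keyspace end so rounding from the
    # integer division never leaves a gap at the top.
    bounds = [i * step for i in range(n)] + [keyspace]
    return list(zip(bounds[:-1], bounds[1:]))
```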
Is this still a draft, or would you like us to review?
It was still a draft, but since we'll likely change direction to simplify even further (based on this conversation), I'm just going to close this.
(Superseded by #20562.)
Problem
This adds a basic management command for backfilling the `person_distinct_id_overrides` table in ClickHouse for any events where the row's `person_id` column does not match the latest stored value in the `person_distinct_id2` table.

This approach is based on the general design described in the (internal) RFC, with a couple of minor changes.

This will likely need some iteration for optimization and robustness, but should be enough to give us a good sense of how realistic this method of backfilling will be.

(This is part of #20460, obviously.)
How did you test this code?
Added tests to cover the basic functionality.