diff --git a/posthog/temporal/batch_exports/heartbeat.py b/posthog/temporal/batch_exports/heartbeat.py index fdd21d0613eee..f0d3227a9f655 100644 --- a/posthog/temporal/batch_exports/heartbeat.py +++ b/posthog/temporal/batch_exports/heartbeat.py @@ -1,14 +1,14 @@ -import typing -import datetime as dt import collections.abc import dataclasses +import datetime as dt +import typing import structlog from posthog.temporal.common.heartbeat import ( + EmptyHeartbeatError, HeartbeatDetails, HeartbeatParseError, - EmptyHeartbeatError, NotEnoughHeartbeatValuesError, ) @@ -99,9 +99,9 @@ def track_done_range( done_range = (data_interval_start, done_range[1]) - self.insert_done_range(done_range, merge=merge) + self.insert_done_range(done_range) - def insert_done_range(self, done_range: DateRange, merge: bool = True): + def insert_done_range(self, done_range: DateRange): """Insert a date range into `self.done_ranges` in order.""" for index, range in enumerate(self.done_ranges, start=0): if done_range[0] > range[1]: @@ -117,35 +117,6 @@ def insert_done_range(self, done_range: DateRange, merge: bool = True): # Date range should go at the end self.done_ranges.append(done_range) - if merge: - self.merge_done_ranges() - - def merge_done_ranges(self): - """Merge as many date ranges together as possible in `self.done_ranges`. - - This method looks for ranges whose opposite ends are touching and merges - them together. Notice that this method does not have enough information - to merge ranges that are not touching. - """ - marked_for_deletion = set() - for index, range in enumerate(self.done_ranges, start=0): - if index in marked_for_deletion: - continue - try: - next_range = self.done_ranges[index + 1] - except IndexError: - continue - - if next_range[0] == range[1]: - # Touching start of next range with end of range. - # End of next range set as end of existing range. - # Next range marked for deletion as it's now covered by range. - self.done_ranges[index] = (range[0], next_range[1]) - marked_for_deletion.add(index + 1) - - for index in marked_for_deletion: - self.done_ranges.pop(index) - def complete_done_ranges(self, data_interval_end_input: str | dt.datetime): """Complete the entire range covered by the batch export. diff --git a/posthog/temporal/tests/batch_exports/test_heartbeat.py b/posthog/temporal/tests/batch_exports/test_heartbeat.py index d09863befa5c3..054b4d11b3f49 100644 --- a/posthog/temporal/tests/batch_exports/test_heartbeat.py +++ b/posthog/temporal/tests/batch_exports/test_heartbeat.py @@ -53,52 +53,7 @@ def test_insert_done_range(initial_done_ranges, done_range, expected_index): """ heartbeat_details = BatchExportRangeHeartbeatDetails() heartbeat_details.done_ranges.extend(initial_done_ranges) - heartbeat_details.insert_done_range(done_range, merge=False) + heartbeat_details.insert_done_range(done_range) assert len(heartbeat_details.done_ranges) == len(initial_done_ranges) + 1 assert heartbeat_details.done_ranges.index(done_range) == expected_index - - -@pytest.mark.parametrize( - "initial_done_ranges,expected_done_ranges", - [ - # Case 1: Disconnected ranges are not merged. - ( - [ - (dt.datetime.fromtimestamp(0), dt.datetime.fromtimestamp(5)), - (dt.datetime.fromtimestamp(6), dt.datetime.fromtimestamp(10)), - ], - [ - (dt.datetime.fromtimestamp(0), dt.datetime.fromtimestamp(5)), - (dt.datetime.fromtimestamp(6), dt.datetime.fromtimestamp(10)), - ], - ), - # Case 2: Connected ranges are merged. - ( - [ - (dt.datetime.fromtimestamp(0), dt.datetime.fromtimestamp(5)), - (dt.datetime.fromtimestamp(5), dt.datetime.fromtimestamp(10)), - ], - [(dt.datetime.fromtimestamp(0), dt.datetime.fromtimestamp(10))], - ), - # Case 3: Connected ranges are merged, but disconnected are not. - ( - [ - (dt.datetime.fromtimestamp(0), dt.datetime.fromtimestamp(5)), - (dt.datetime.fromtimestamp(5), dt.datetime.fromtimestamp(10)), - (dt.datetime.fromtimestamp(11), dt.datetime.fromtimestamp(12)), - ], - [ - (dt.datetime.fromtimestamp(0), dt.datetime.fromtimestamp(10)), - (dt.datetime.fromtimestamp(11), dt.datetime.fromtimestamp(12)), - ], - ), - ], -) -def test_merge_done_ranges(initial_done_ranges, expected_done_ranges): - """Test `BatchExportRangeHeartbeatDetails` merges done ranges.""" - heartbeat_details = BatchExportRangeHeartbeatDetails() - heartbeat_details.done_ranges.extend(initial_done_ranges) - heartbeat_details.merge_done_ranges() - - assert heartbeat_details.done_ranges == expected_done_ranges