diff --git a/posthog/temporal/batch_exports/heartbeat.py b/posthog/temporal/batch_exports/heartbeat.py index fdd21d0613eee..f0d3227a9f655 100644 --- a/posthog/temporal/batch_exports/heartbeat.py +++ b/posthog/temporal/batch_exports/heartbeat.py @@ -1,14 +1,14 @@ -import typing -import datetime as dt import collections.abc import dataclasses +import datetime as dt +import typing import structlog from posthog.temporal.common.heartbeat import ( + EmptyHeartbeatError, HeartbeatDetails, HeartbeatParseError, - EmptyHeartbeatError, NotEnoughHeartbeatValuesError, ) @@ -99,9 +99,9 @@ def track_done_range( done_range = (data_interval_start, done_range[1]) - self.insert_done_range(done_range, merge=merge) + self.insert_done_range(done_range) - def insert_done_range(self, done_range: DateRange, merge: bool = True): + def insert_done_range(self, done_range: DateRange): """Insert a date range into `self.done_ranges` in order.""" for index, range in enumerate(self.done_ranges, start=0): if done_range[0] > range[1]: @@ -117,35 +117,6 @@ def insert_done_range(self, done_range: DateRange, merge: bool = True): # Date range should go at the end self.done_ranges.append(done_range) - if merge: - self.merge_done_ranges() - - def merge_done_ranges(self): - """Merge as many date ranges together as possible in `self.done_ranges`. - - This method looks for ranges whose opposite ends are touching and merges - them together. Notice that this method does not have enough information - to merge ranges that are not touching. - """ - marked_for_deletion = set() - for index, range in enumerate(self.done_ranges, start=0): - if index in marked_for_deletion: - continue - try: - next_range = self.done_ranges[index + 1] - except IndexError: - continue - - if next_range[0] == range[1]: - # Touching start of next range with end of range. - # End of next range set as end of existing range. - # Next range marked for deletion as it's now covered by range. - self.done_ranges[index] = (range[0], next_range[1]) - marked_for_deletion.add(index + 1) - - for index in marked_for_deletion: - self.done_ranges.pop(index) - def complete_done_ranges(self, data_interval_end_input: str | dt.datetime): """Complete the entire range covered by the batch export. diff --git a/posthog/temporal/batch_exports/redshift_batch_export.py b/posthog/temporal/batch_exports/redshift_batch_export.py index 3b02efddb5a0b..40778b5e097d5 100644 --- a/posthog/temporal/batch_exports/redshift_batch_export.py +++ b/posthog/temporal/batch_exports/redshift_batch_export.py @@ -53,6 +53,23 @@ from posthog.temporal.common.heartbeat import Heartbeater from posthog.temporal.common.logger import configure_temporal_worker_logger +NON_RETRYABLE_ERROR_TYPES = [ + # Raised on errors that are related to database operation. + # For example: unexpected disconnect, database or other object not found. + "OperationalError", + # The schema name provided is invalid (usually because it doesn't exist). + "InvalidSchemaName", + # Missing permissions to, e.g., insert into table. + "InsufficientPrivilege", + # A column, usually properties, exceeds the limit for a VARCHAR field, + # usually the max of 65535 bytes + "StringDataRightTruncation", + # Raised by our PostgreSQL client when failing to connect after several attempts. + "PostgreSQLConnectionError", + # Column missing in Redshift, likely the schema was altered. + "UndefinedColumn", +] + def remove_escaped_whitespace_recursive(value): """Remove all escaped whitespace characters from given value. @@ -593,7 +610,7 @@ async def record_generator() -> ( ) return else: - await asyncio.sleep(0.1) + await asyncio.sleep(0) continue for record in record_batch.to_pylist(): @@ -702,21 +719,6 @@ async def run(self, inputs: RedshiftBatchExportInputs): insert_into_redshift_activity, insert_inputs, interval=inputs.interval, - non_retryable_error_types=[ - # Raised on errors that are related to database operation. - # For example: unexpected disconnect, database or other object not found. - "OperationalError", - # The schema name provided is invalid (usually because it doesn't exist). - "InvalidSchemaName", - # Missing permissions to, e.g., insert into table. - "InsufficientPrivilege", - # A column, usually properties, exceeds the limit for a VARCHAR field, - # usually the max of 65535 bytes - "StringDataRightTruncation", - # Raised by our PostgreSQL client when failing to connect after several attempts. - "PostgreSQLConnectionError", - # Column missing in Redshift, likely the schema was altered. - "UndefinedColumn", - ], + non_retryable_error_types=NON_RETRYABLE_ERROR_TYPES, finish_inputs=finish_inputs, ) diff --git a/posthog/temporal/tests/batch_exports/test_heartbeat.py b/posthog/temporal/tests/batch_exports/test_heartbeat.py index d09863befa5c3..054b4d11b3f49 100644 --- a/posthog/temporal/tests/batch_exports/test_heartbeat.py +++ b/posthog/temporal/tests/batch_exports/test_heartbeat.py @@ -53,52 +53,7 @@ def test_insert_done_range(initial_done_ranges, done_range, expected_index): """ heartbeat_details = BatchExportRangeHeartbeatDetails() heartbeat_details.done_ranges.extend(initial_done_ranges) - heartbeat_details.insert_done_range(done_range, merge=False) + heartbeat_details.insert_done_range(done_range) assert len(heartbeat_details.done_ranges) == len(initial_done_ranges) + 1 assert heartbeat_details.done_ranges.index(done_range) == expected_index - - -@pytest.mark.parametrize( - "initial_done_ranges,expected_done_ranges", - [ - # Case 1: Disconnected ranges are not merged. - ( - [ - (dt.datetime.fromtimestamp(0), dt.datetime.fromtimestamp(5)), - (dt.datetime.fromtimestamp(6), dt.datetime.fromtimestamp(10)), - ], - [ - (dt.datetime.fromtimestamp(0), dt.datetime.fromtimestamp(5)), - (dt.datetime.fromtimestamp(6), dt.datetime.fromtimestamp(10)), - ], - ), - # Case 2: Connected ranges are merged. - ( - [ - (dt.datetime.fromtimestamp(0), dt.datetime.fromtimestamp(5)), - (dt.datetime.fromtimestamp(5), dt.datetime.fromtimestamp(10)), - ], - [(dt.datetime.fromtimestamp(0), dt.datetime.fromtimestamp(10))], - ), - # Case 3: Connected ranges are merged, but disconnected are not. - ( - [ - (dt.datetime.fromtimestamp(0), dt.datetime.fromtimestamp(5)), - (dt.datetime.fromtimestamp(5), dt.datetime.fromtimestamp(10)), - (dt.datetime.fromtimestamp(11), dt.datetime.fromtimestamp(12)), - ], - [ - (dt.datetime.fromtimestamp(0), dt.datetime.fromtimestamp(10)), - (dt.datetime.fromtimestamp(11), dt.datetime.fromtimestamp(12)), - ], - ), - ], -) -def test_merge_done_ranges(initial_done_ranges, expected_done_ranges): - """Test `BatchExportRangeHeartbeatDetails` merges done ranges.""" - heartbeat_details = BatchExportRangeHeartbeatDetails() - heartbeat_details.done_ranges.extend(initial_done_ranges) - heartbeat_details.merge_done_ranges() - - assert heartbeat_details.done_ranges == expected_done_ranges