From 2f6344ab776ba93ae7afd30a5b085d67ef1f0f41 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Far=C3=ADas=20Santana?= Date: Tue, 23 Apr 2024 09:51:38 +0200 Subject: [PATCH] fix: Make timeouts variable according to interval (#21467) * fix: Make timeouts variable according to interval * chore: Bump maximum attempts and initial retry interval * fix: Pass the interval along to the execute function * feat: Remove multiplier We'll deal with timeouts in higher frequency batch exports. * refactor: Add a 10 min floor to timeout * fix: Test event generation function now batches inserts to clickhouse * fix: Add missing type hint * Update query snapshots * Update query snapshots * fix: Account for tests that rely on event name generation * Update query snapshots * fix: Event generation in test * chore: Bump batch size --------- Co-authored-by: github-actions <41898282+github-actions[bot]@users.noreply.github.com> --- .../temporal/batch_exports/batch_exports.py | 22 +++++++--- .../batch_exports/bigquery_batch_export.py | 1 + .../batch_exports/http_batch_export.py | 1 + .../batch_exports/postgres_batch_export.py | 1 + .../batch_exports/redshift_batch_export.py | 1 + .../temporal/batch_exports/s3_batch_export.py | 1 + .../batch_exports/snowflake_batch_export.py | 1 + posthog/temporal/tests/utils/events.py | 41 +++++++++++-------- 8 files changed, 47 insertions(+), 22 deletions(-) diff --git a/posthog/temporal/batch_exports/batch_exports.py b/posthog/temporal/batch_exports/batch_exports.py index 66279ccd7183e..c522a75bce2c5 100644 --- a/posthog/temporal/batch_exports/batch_exports.py +++ b/posthog/temporal/batch_exports/batch_exports.py @@ -593,10 +593,10 @@ async def execute_batch_export_insert_activity( inputs, non_retryable_error_types: list[str], finish_inputs: FinishBatchExportRunInputs, - start_to_close_timeout_seconds: int = 3600, + interval: str, heartbeat_timeout_seconds: int | None = 120, - maximum_attempts: int = 10, - initial_retry_interval_seconds: int = 10, + maximum_attempts: int = 15, + initial_retry_interval_seconds: int = 30, maximum_retry_interval_seconds: int = 120, ) -> None: """Execute the main insert activity of a batch export handling any errors. @@ -610,7 +610,7 @@ async def execute_batch_export_insert_activity( inputs: The inputs to the activity. non_retryable_error_types: A list of errors to not retry on when executing the activity. finish_inputs: Inputs to the 'finish_batch_export_run' to run at the end. - start_to_close_timeout: A timeout for the 'insert_into_*' activity function. + interval: The interval of the batch export used to set the start to close timeout. maximum_attempts: Maximum number of retries for the 'insert_into_*' activity function. Assuming the error that triggered the retry is not in non_retryable_error_types. initial_retry_interval_seconds: When retrying, seconds until the first retry. @@ -624,11 +624,23 @@ async def execute_batch_export_insert_activity( non_retryable_error_types=non_retryable_error_types, ) + if interval == "hour": + start_to_close_timeout = dt.timedelta(hours=1) + elif interval == "day": + start_to_close_timeout = dt.timedelta(days=1) + elif interval.startswith("every"): + _, value, unit = interval.split(" ") + kwargs = {unit: int(value)} + # TODO: Consider removing this 10 minute minimum once we are more confident about hitting 5 minute or lower SLAs. + start_to_close_timeout = max(dt.timedelta(minutes=10), dt.timedelta(**kwargs)) + else: + raise ValueError(f"Unsupported interval: '{interval}'") + try: records_completed = await workflow.execute_activity( activity, inputs, - start_to_close_timeout=dt.timedelta(seconds=start_to_close_timeout_seconds), + start_to_close_timeout=start_to_close_timeout, heartbeat_timeout=dt.timedelta(seconds=heartbeat_timeout_seconds) if heartbeat_timeout_seconds else None, retry_policy=retry_policy, ) diff --git a/posthog/temporal/batch_exports/bigquery_batch_export.py b/posthog/temporal/batch_exports/bigquery_batch_export.py index 93a2e522e1e7f..9e81aafe13883 100644 --- a/posthog/temporal/batch_exports/bigquery_batch_export.py +++ b/posthog/temporal/batch_exports/bigquery_batch_export.py @@ -432,6 +432,7 @@ async def run(self, inputs: BigQueryBatchExportInputs): await execute_batch_export_insert_activity( insert_into_bigquery_activity, insert_inputs, + interval=inputs.interval, non_retryable_error_types=[ # Raised on missing permissions. "Forbidden", diff --git a/posthog/temporal/batch_exports/http_batch_export.py b/posthog/temporal/batch_exports/http_batch_export.py index f86703f3cf792..623cc53bed622 100644 --- a/posthog/temporal/batch_exports/http_batch_export.py +++ b/posthog/temporal/batch_exports/http_batch_export.py @@ -373,6 +373,7 @@ async def run(self, inputs: HttpBatchExportInputs): await execute_batch_export_insert_activity( insert_into_http_activity, insert_inputs, + interval=inputs.interval, non_retryable_error_types=[ "NonRetryableResponseError", ], diff --git a/posthog/temporal/batch_exports/postgres_batch_export.py b/posthog/temporal/batch_exports/postgres_batch_export.py index 6281862a72f21..6ebede565bc35 100644 --- a/posthog/temporal/batch_exports/postgres_batch_export.py +++ b/posthog/temporal/batch_exports/postgres_batch_export.py @@ -439,6 +439,7 @@ async def run(self, inputs: PostgresBatchExportInputs): await execute_batch_export_insert_activity( insert_into_postgres_activity, insert_inputs, + interval=inputs.interval, non_retryable_error_types=[ # Raised on errors that are related to database operation. # For example: unexpected disconnect, database or other object not found. diff --git a/posthog/temporal/batch_exports/redshift_batch_export.py b/posthog/temporal/batch_exports/redshift_batch_export.py index e98fa9106c15f..61e5ad7c02655 100644 --- a/posthog/temporal/batch_exports/redshift_batch_export.py +++ b/posthog/temporal/batch_exports/redshift_batch_export.py @@ -469,6 +469,7 @@ async def run(self, inputs: RedshiftBatchExportInputs): await execute_batch_export_insert_activity( insert_into_redshift_activity, insert_inputs, + interval=inputs.interval, non_retryable_error_types=[ # Raised on errors that are related to database operation. # For example: unexpected disconnect, database or other object not found. diff --git a/posthog/temporal/batch_exports/s3_batch_export.py b/posthog/temporal/batch_exports/s3_batch_export.py index febdac88b45cd..39a2755a72139 100644 --- a/posthog/temporal/batch_exports/s3_batch_export.py +++ b/posthog/temporal/batch_exports/s3_batch_export.py @@ -687,6 +687,7 @@ async def run(self, inputs: S3BatchExportInputs): await execute_batch_export_insert_activity( insert_into_s3_activity, insert_inputs, + interval=inputs.interval, non_retryable_error_types=[ # S3 parameter validation failed. "ParamValidationError", diff --git a/posthog/temporal/batch_exports/snowflake_batch_export.py b/posthog/temporal/batch_exports/snowflake_batch_export.py index 2d782c1f94d5c..c769862af96f1 100644 --- a/posthog/temporal/batch_exports/snowflake_batch_export.py +++ b/posthog/temporal/batch_exports/snowflake_batch_export.py @@ -631,6 +631,7 @@ async def run(self, inputs: SnowflakeBatchExportInputs): await execute_batch_export_insert_activity( insert_into_snowflake_activity, insert_inputs, + interval=inputs.interval, non_retryable_error_types=[ # Raised when we cannot connect to Snowflake. "DatabaseError", diff --git a/posthog/temporal/tests/utils/events.py b/posthog/temporal/tests/utils/events.py index 71ce7f7f61615..ce48257381801 100644 --- a/posthog/temporal/tests/utils/events.py +++ b/posthog/temporal/tests/utils/events.py @@ -44,6 +44,7 @@ def generate_test_events( site_url: str | None = "", set_field: dict | None = None, set_once: dict | None = None, + start: int = 0, ): """Generate a list of events for testing.""" _timestamp = random.choice(possible_datetimes) @@ -77,7 +78,7 @@ def generate_test_events( "set": set_field, "set_once": set_once, } - for i in range(count) + for i in range(start, count + start) ] return events @@ -138,6 +139,7 @@ async def generate_test_events_in_clickhouse( person_properties: dict | None = None, inserted_at: str | dt.datetime | None = "_timestamp", duplicate: bool = False, + batch_size: int = 10000, ) -> tuple[list[EventValues], list[EventValues], list[EventValues]]: """Insert test events into the sharded_events table. @@ -165,20 +167,27 @@ async def generate_test_events_in_clickhouse( possible_datetimes = list(date_range(start_time, end_time, dt.timedelta(minutes=1))) # Base events - events = generate_test_events( - count=count, - team_id=team_id, - possible_datetimes=possible_datetimes, - event_name=event_name, - properties=properties, - person_properties=person_properties, - inserted_at=inserted_at, - ) + events: list[EventValues] = [] + while len(events) < count: + events_to_insert = generate_test_events( + count=min(count - len(events), batch_size), + team_id=team_id, + possible_datetimes=possible_datetimes, + event_name=event_name, + properties=properties, + person_properties=person_properties, + inserted_at=inserted_at, + start=len(events), + ) + + # Add duplicates if required + duplicate_events = [] + if duplicate is True: + duplicate_events = events_to_insert - # Add duplicates if required - duplicate_events = [] - if duplicate is True: - duplicate_events = events + await insert_event_values_in_clickhouse(client=client, events=events_to_insert + duplicate_events) + + events.extend(events_to_insert) # Events outside original date range delta = end_time - start_time @@ -207,7 +216,5 @@ async def generate_test_events_in_clickhouse( inserted_at=inserted_at, ) - await insert_event_values_in_clickhouse( - client=client, events=events + events_outside_range + events_from_other_team + duplicate_events - ) + await insert_event_values_in_clickhouse(client=client, events=events_outside_range + events_from_other_team) return (events, events_outside_range, events_from_other_team)