fix: Make timeouts variable according to interval (#21467)
* fix: Make timeouts variable according to interval

* chore: Bump maximum attempts and initial retry interval

* fix: Pass the interval along to the execute function

* feat: Remove multiplier

We'll deal with timeouts in higher frequency batch exports.

* refactor: Add a 10 min floor to timeout

* fix: Test event generation function now batches inserts to clickhouse

* fix: Add missing type hint

* Update query snapshots

* Update query snapshots

* fix: Account for tests that rely on event name generation

* Update query snapshots

* fix: Event generation in test

* chore: Bump batch size

---------

Co-authored-by: github-actions <41898282+github-actions[bot]@users.noreply.github.com>
tomasfarias and github-actions[bot] authored Apr 23, 2024
1 parent 6183f88 commit 2f6344a
Showing 8 changed files with 47 additions and 22 deletions.
22 changes: 17 additions & 5 deletions posthog/temporal/batch_exports/batch_exports.py
@@ -593,10 +593,10 @@ async def execute_batch_export_insert_activity(
inputs,
non_retryable_error_types: list[str],
finish_inputs: FinishBatchExportRunInputs,
start_to_close_timeout_seconds: int = 3600,
interval: str,
heartbeat_timeout_seconds: int | None = 120,
maximum_attempts: int = 10,
initial_retry_interval_seconds: int = 10,
maximum_attempts: int = 15,
initial_retry_interval_seconds: int = 30,
maximum_retry_interval_seconds: int = 120,
) -> None:
"""Execute the main insert activity of a batch export handling any errors.
@@ -610,7 +610,7 @@ async def execute_batch_export_insert_activity(
inputs: The inputs to the activity.
non_retryable_error_types: A list of errors to not retry on when executing the activity.
finish_inputs: Inputs to the 'finish_batch_export_run' to run at the end.
start_to_close_timeout: A timeout for the 'insert_into_*' activity function.
interval: The interval of the batch export used to set the start to close timeout.
maximum_attempts: Maximum number of retries for the 'insert_into_*' activity function.
Assuming the error that triggered the retry is not in non_retryable_error_types.
initial_retry_interval_seconds: When retrying, seconds until the first retry.
@@ -624,11 +624,23 @@ async def execute_batch_export_insert_activity(
non_retryable_error_types=non_retryable_error_types,
)

if interval == "hour":
start_to_close_timeout = dt.timedelta(hours=1)
elif interval == "day":
start_to_close_timeout = dt.timedelta(days=1)
elif interval.startswith("every"):
_, value, unit = interval.split(" ")
kwargs = {unit: int(value)}
# TODO: Consider removing this 10 minute minimum once we are more confident about hitting 5 minute or lower SLAs.
start_to_close_timeout = max(dt.timedelta(minutes=10), dt.timedelta(**kwargs))
else:
raise ValueError(f"Unsupported interval: '{interval}'")

try:
records_completed = await workflow.execute_activity(
activity,
inputs,
start_to_close_timeout=dt.timedelta(seconds=start_to_close_timeout_seconds),
start_to_close_timeout=start_to_close_timeout,
heartbeat_timeout=dt.timedelta(seconds=heartbeat_timeout_seconds) if heartbeat_timeout_seconds else None,
retry_policy=retry_policy,
)
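For reference, a minimal standalone sketch of the timeout derivation added above, runnable on its own (the helper name interval_to_timeout is illustrative and not part of the PostHog codebase):

import datetime as dt


def interval_to_timeout(interval: str) -> dt.timedelta:
    """Map a batch export interval string to a start-to-close timeout.

    Mirrors the branching added above: "hour" and "day" map to a timeout equal
    to the interval, and "every N <unit>" intervals are floored at 10 minutes.
    """
    if interval == "hour":
        return dt.timedelta(hours=1)
    elif interval == "day":
        return dt.timedelta(days=1)
    elif interval.startswith("every"):
        _, value, unit = interval.split(" ")
        # e.g. "every 5 minutes" -> dt.timedelta(minutes=5), floored at 10 minutes.
        return max(dt.timedelta(minutes=10), dt.timedelta(**{unit: int(value)}))
    else:
        raise ValueError(f"Unsupported interval: '{interval}'")


assert interval_to_timeout("hour") == dt.timedelta(hours=1)
assert interval_to_timeout("every 5 minutes") == dt.timedelta(minutes=10)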
1 change: 1 addition & 0 deletions posthog/temporal/batch_exports/bigquery_batch_export.py
@@ -432,6 +432,7 @@ async def run(self, inputs: BigQueryBatchExportInputs):
await execute_batch_export_insert_activity(
insert_into_bigquery_activity,
insert_inputs,
interval=inputs.interval,
non_retryable_error_types=[
# Raised on missing permissions.
"Forbidden",
1 change: 1 addition & 0 deletions posthog/temporal/batch_exports/http_batch_export.py
@@ -373,6 +373,7 @@ async def run(self, inputs: HttpBatchExportInputs):
await execute_batch_export_insert_activity(
insert_into_http_activity,
insert_inputs,
interval=inputs.interval,
non_retryable_error_types=[
"NonRetryableResponseError",
],
1 change: 1 addition & 0 deletions posthog/temporal/batch_exports/postgres_batch_export.py
@@ -439,6 +439,7 @@ async def run(self, inputs: PostgresBatchExportInputs):
await execute_batch_export_insert_activity(
insert_into_postgres_activity,
insert_inputs,
interval=inputs.interval,
non_retryable_error_types=[
# Raised on errors that are related to database operation.
# For example: unexpected disconnect, database or other object not found.
1 change: 1 addition & 0 deletions posthog/temporal/batch_exports/redshift_batch_export.py
@@ -469,6 +469,7 @@ async def run(self, inputs: RedshiftBatchExportInputs):
await execute_batch_export_insert_activity(
insert_into_redshift_activity,
insert_inputs,
interval=inputs.interval,
non_retryable_error_types=[
# Raised on errors that are related to database operation.
# For example: unexpected disconnect, database or other object not found.
1 change: 1 addition & 0 deletions posthog/temporal/batch_exports/s3_batch_export.py
@@ -687,6 +687,7 @@ async def run(self, inputs: S3BatchExportInputs):
await execute_batch_export_insert_activity(
insert_into_s3_activity,
insert_inputs,
interval=inputs.interval,
non_retryable_error_types=[
# S3 parameter validation failed.
"ParamValidationError",
1 change: 1 addition & 0 deletions posthog/temporal/batch_exports/snowflake_batch_export.py
@@ -631,6 +631,7 @@ async def run(self, inputs: SnowflakeBatchExportInputs):
await execute_batch_export_insert_activity(
insert_into_snowflake_activity,
insert_inputs,
interval=inputs.interval,
non_retryable_error_types=[
# Raised when we cannot connect to Snowflake.
"DatabaseError",
41 changes: 24 additions & 17 deletions posthog/temporal/tests/utils/events.py
@@ -44,6 +44,7 @@ def generate_test_events(
site_url: str | None = "",
set_field: dict | None = None,
set_once: dict | None = None,
start: int = 0,
):
"""Generate a list of events for testing."""
_timestamp = random.choice(possible_datetimes)
@@ -77,7 +78,7 @@
"set": set_field,
"set_once": set_once,
}
for i in range(count)
for i in range(start, count + start)
]

return events
@@ -138,6 +139,7 @@ async def generate_test_events_in_clickhouse(
person_properties: dict | None = None,
inserted_at: str | dt.datetime | None = "_timestamp",
duplicate: bool = False,
batch_size: int = 10000,
) -> tuple[list[EventValues], list[EventValues], list[EventValues]]:
"""Insert test events into the sharded_events table.
@@ -165,20 +167,27 @@
possible_datetimes = list(date_range(start_time, end_time, dt.timedelta(minutes=1)))

# Base events
events = generate_test_events(
count=count,
team_id=team_id,
possible_datetimes=possible_datetimes,
event_name=event_name,
properties=properties,
person_properties=person_properties,
inserted_at=inserted_at,
)
events: list[EventValues] = []
while len(events) < count:
events_to_insert = generate_test_events(
count=min(count - len(events), batch_size),
team_id=team_id,
possible_datetimes=possible_datetimes,
event_name=event_name,
properties=properties,
person_properties=person_properties,
inserted_at=inserted_at,
start=len(events),
)

# Add duplicates if required
duplicate_events = []
if duplicate is True:
duplicate_events = events_to_insert

# Add duplicates if required
duplicate_events = []
if duplicate is True:
duplicate_events = events
await insert_event_values_in_clickhouse(client=client, events=events_to_insert + duplicate_events)

events.extend(events_to_insert)

# Events outside original date range
delta = end_time - start_time
@@ -207,7 +216,5 @@ async def generate_test_events_in_clickhouse(
inserted_at=inserted_at,
)

await insert_event_values_in_clickhouse(
client=client, events=events + events_outside_range + events_from_other_team + duplicate_events
)
await insert_event_values_in_clickhouse(client=client, events=events_outside_range + events_from_other_team)
return (events, events_outside_range, events_from_other_team)
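The reworked helper above no longer builds the full event list and inserts it in a single call; it generates and inserts in batches of at most batch_size, passing the running total as the start offset so index-derived values stay unique across batches. A minimal, self-contained sketch of that pattern (make_records and insert_batch are hypothetical stand-ins for generate_test_events and insert_event_values_in_clickhouse):

import asyncio
import typing


async def generate_in_batches(
    make_records: typing.Callable[[int, int], list[dict]],
    insert_batch: typing.Callable[[list[dict]], typing.Awaitable[None]],
    count: int,
    batch_size: int = 10000,
) -> list[dict]:
    """Generate and insert `count` records in batches of at most `batch_size`.

    Passing the running total as the start offset keeps index-derived values
    (like generated event names) unique across batches.
    """
    records: list[dict] = []
    while len(records) < count:
        batch = make_records(min(count - len(records), batch_size), len(records))
        await insert_batch(batch)
        records.extend(batch)
    return records


async def main() -> None:
    inserted: list[list[dict]] = []

    async def fake_insert(batch: list[dict]) -> None:
        # Stand-in for the ClickHouse insert; just records each batch.
        inserted.append(batch)

    records = await generate_in_batches(
        make_records=lambda n, start: [{"event": f"test-{start + i}"} for i in range(n)],
        insert_batch=fake_insert,
        count=25000,
        batch_size=10000,
    )
    assert len(records) == 25000
    assert [len(b) for b in inserted] == [10000, 10000, 5000]


asyncio.run(main())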
