From 021a926adc55f04d01adcf0e902c84ab9abb0899 Mon Sep 17 00:00:00 2001 From: Brett Hoerner Date: Wed, 6 Mar 2024 09:07:33 -0700 Subject: [PATCH] =?UTF-8?q?feat(batch-export):=20allow=20batch=20export=20?= =?UTF-8?q?backfills=20with=20no=20end,=20which=20u=E2=80=A6=20(#20717)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- latest_migrations.manifest | 2 +- mypy-baseline.txt | 42 +++++----- posthog/api/test/batch_exports/test_pause.py | 1 + posthog/batch_exports/http.py | 6 +- posthog/batch_exports/models.py | 2 +- posthog/batch_exports/service.py | 31 +++++-- .../0395_alter_batchexportbackfill_end_at.py | 17 ++++ .../batch_exports/backfill_batch_export.py | 48 ++++++++--- .../temporal/batch_exports/batch_exports.py | 2 +- .../test_backfill_batch_export.py | 82 +++++++++++++++---- posthog/temporal/tests/utils/models.py | 15 +++- 11 files changed, 178 insertions(+), 70 deletions(-) create mode 100644 posthog/migrations/0395_alter_batchexportbackfill_end_at.py diff --git a/latest_migrations.manifest b/latest_migrations.manifest index 73888cbff5227..9ade46300e425 100644 --- a/latest_migrations.manifest +++ b/latest_migrations.manifest @@ -5,7 +5,7 @@ contenttypes: 0002_remove_content_type_name ee: 0015_add_verified_properties otp_static: 0002_throttling otp_totp: 0002_auto_20190420_0723 -posthog: 0394_organization_customer_trust_scores_and_more +posthog: 0395_alter_batchexportbackfill_end_at sessions: 0001_initial social_django: 0010_uid_db_index two_factor: 0007_auto_20201201_1019 diff --git a/mypy-baseline.txt b/mypy-baseline.txt index 174d617986a20..d4beb524d4186 100644 --- a/mypy-baseline.txt +++ b/mypy-baseline.txt @@ -4,13 +4,13 @@ posthog/temporal/common/utils.py:0: error: Argument 2 to "__get__" of "classmeth posthog/hogql/database/argmax.py:0: error: Argument "chain" to "Field" has incompatible type "list[str]"; expected "list[str | int]" [arg-type] posthog/hogql/database/argmax.py:0: note: "List" is invariant -- see https://mypy.readthedocs.io/en/stable/common_issues.html#variance posthog/hogql/database/argmax.py:0: note: Consider using "Sequence" instead, which is covariant +posthog/hogql/database/schema/numbers.py:0: error: Incompatible types in assignment (expression has type "dict[str, IntegerDatabaseField]", variable has type "dict[str, FieldOrTable]") [assignment] +posthog/hogql/database/schema/numbers.py:0: note: "Dict" is invariant -- see https://mypy.readthedocs.io/en/stable/common_issues.html#variance +posthog/hogql/database/schema/numbers.py:0: note: Consider using "Mapping" instead, which is covariant in the value type posthog/hogql/ast.py:0: error: Argument "chain" to "FieldTraverserType" has incompatible type "list[str]"; expected "list[str | int]" [arg-type] posthog/hogql/ast.py:0: note: "List" is invariant -- see https://mypy.readthedocs.io/en/stable/common_issues.html#variance posthog/hogql/ast.py:0: note: Consider using "Sequence" instead, which is covariant posthog/hogql/ast.py:0: error: Incompatible return value type (got "bool | None", expected "bool") [return-value] -posthog/hogql/database/schema/numbers.py:0: error: Incompatible types in assignment (expression has type "dict[str, IntegerDatabaseField]", variable has type "dict[str, FieldOrTable]") [assignment] -posthog/hogql/database/schema/numbers.py:0: note: "Dict" is invariant -- see https://mypy.readthedocs.io/en/stable/common_issues.html#variance -posthog/hogql/database/schema/numbers.py:0: note: Consider using "Mapping" instead, which is covariant in the value type posthog/hogql/visitor.py:0: error: Statement is unreachable [unreachable] posthog/hogql/visitor.py:0: error: Argument 1 to "visit" of "Visitor" has incompatible type "Type | None"; expected "AST" [arg-type] posthog/hogql/visitor.py:0: error: Argument 1 to "visit" of "Visitor" has incompatible type "Type | None"; expected "AST" [arg-type] @@ -174,7 +174,6 @@ posthog/hogql_queries/insights/trends/aggregation_operations.py:0: error: Item " posthog/hogql_queries/insights/trends/aggregation_operations.py:0: error: Item "SelectUnionQuery" of "SelectQuery | SelectUnionQuery" has no attribute "select" [union-attr] posthog/hogql_queries/insights/trends/aggregation_operations.py:0: error: Item "SelectUnionQuery" of "SelectQuery | SelectUnionQuery" has no attribute "group_by" [union-attr] posthog/hogql_queries/insights/trends/aggregation_operations.py:0: error: Item "None" of "list[Expr] | Any | None" has no attribute "append" [union-attr] -posthog/batch_exports/service.py:0: error: Missing positional argument "end_at" in call to "backfill_export" [call-arg] ee/billing/billing_manager.py:0: error: TypedDict "CustomerInfo" has no key "available_product_features" [typeddict-item] ee/billing/billing_manager.py:0: note: Did you mean "available_features"? posthog/hogql/resolver.py:0: error: Argument 1 of "visit" is incompatible with supertype "Visitor"; supertype defines the argument type as "AST" [override] @@ -346,11 +345,6 @@ posthog/hogql/autocomplete.py:0: error: Unused "type: ignore" comment [unused-i posthog/hogql_queries/insights/trends/breakdown_values.py:0: error: Item "SelectUnionQuery" of "SelectQuery | SelectUnionQuery" has no attribute "select" [union-attr] posthog/hogql_queries/insights/trends/breakdown_values.py:0: error: Value of type "list[Any] | None" is not indexable [index] posthog/hogql_queries/sessions_timeline_query_runner.py:0: error: Statement is unreachable [unreachable] -posthog/hogql_queries/hogql_query_runner.py:0: error: Statement is unreachable [unreachable] -posthog/hogql_queries/hogql_query_runner.py:0: error: Argument "placeholders" to "parse_select" has incompatible type "dict[str, Constant] | None"; expected "dict[str, Expr] | None" [arg-type] -posthog/hogql_queries/hogql_query_runner.py:0: error: Incompatible types in assignment (expression has type "Expr", variable has type "SelectQuery | SelectUnionQuery") [assignment] -posthog/hogql_queries/hogql_query_runner.py:0: error: Incompatible return value type (got "SelectQuery | SelectUnionQuery", expected "SelectQuery") [return-value] -posthog/hogql_queries/events_query_runner.py:0: error: Statement is unreachable [unreachable] posthog/hogql_queries/insights/trends/breakdown.py:0: error: Item "None" of "BreakdownFilter | None" has no attribute "breakdown_type" [union-attr] posthog/hogql_queries/insights/trends/breakdown.py:0: error: Item "None" of "BreakdownFilter | None" has no attribute "breakdown_histogram_bin_count" [union-attr] posthog/hogql_queries/insights/trends/breakdown.py:0: error: Item "None" of "BreakdownFilter | None" has no attribute "breakdown_type" [union-attr] @@ -375,6 +369,11 @@ posthog/hogql_queries/insights/trends/breakdown.py:0: error: Item "None" of "Bre posthog/hogql_queries/insights/trends/breakdown.py:0: error: Item "None" of "BreakdownFilter | None" has no attribute "breakdown" [union-attr] posthog/hogql_queries/insights/trends/breakdown.py:0: error: Argument "breakdown_field" to "get_properties_chain" has incompatible type "str | float | list[str | float] | Any | None"; expected "str" [arg-type] posthog/hogql_queries/insights/trends/breakdown.py:0: error: Item "None" of "BreakdownFilter | None" has no attribute "breakdown_group_type_index" [union-attr] +posthog/hogql_queries/hogql_query_runner.py:0: error: Statement is unreachable [unreachable] +posthog/hogql_queries/hogql_query_runner.py:0: error: Argument "placeholders" to "parse_select" has incompatible type "dict[str, Constant] | None"; expected "dict[str, Expr] | None" [arg-type] +posthog/hogql_queries/hogql_query_runner.py:0: error: Incompatible types in assignment (expression has type "Expr", variable has type "SelectQuery | SelectUnionQuery") [assignment] +posthog/hogql_queries/hogql_query_runner.py:0: error: Incompatible return value type (got "SelectQuery | SelectUnionQuery", expected "SelectQuery") [return-value] +posthog/hogql_queries/events_query_runner.py:0: error: Statement is unreachable [unreachable] posthog/hogql/metadata.py:0: error: Argument "metadata_source" to "translate_hogql" has incompatible type "SelectQuery | SelectUnionQuery"; expected "SelectQuery | None" [arg-type] posthog/hogql/metadata.py:0: error: Incompatible types in assignment (expression has type "Expr", variable has type "SelectQuery | SelectUnionQuery") [assignment] posthog/queries/breakdown_props.py:0: error: Argument 1 to "translate_hogql" has incompatible type "str | int"; expected "str" [arg-type] @@ -386,6 +385,17 @@ posthog/api/person.py:0: error: Argument 1 to "loads" has incompatible type "str posthog/api/person.py:0: error: Argument "user" to "log_activity" has incompatible type "User | AnonymousUser"; expected "User | None" [arg-type] posthog/api/person.py:0: error: Argument "user" to "log_activity" has incompatible type "User | AnonymousUser"; expected "User | None" [arg-type] posthog/hogql_queries/web_analytics/web_analytics_query_runner.py:0: error: Argument 1 to "append" of "list" has incompatible type "EventPropertyFilter"; expected "Expr" [arg-type] +posthog/hogql_queries/insights/trends/trends_query_runner.py:0: error: Signature of "to_actors_query" incompatible with supertype "QueryRunner" [override] +posthog/hogql_queries/insights/trends/trends_query_runner.py:0: note: Superclass: +posthog/hogql_queries/insights/trends/trends_query_runner.py:0: note: def to_actors_query(self) -> SelectQuery | SelectUnionQuery +posthog/hogql_queries/insights/trends/trends_query_runner.py:0: note: Subclass: +posthog/hogql_queries/insights/trends/trends_query_runner.py:0: note: def to_actors_query(self, time_frame: str | int | None, series_index: int, breakdown_value: str | int | None = ..., compare: Compare | None = ...) -> SelectQuery | SelectUnionQuery +posthog/hogql_queries/insights/trends/trends_query_runner.py:0: note: Superclass: +posthog/hogql_queries/insights/trends/trends_query_runner.py:0: note: def to_actors_query(self) -> SelectQuery | SelectUnionQuery +posthog/hogql_queries/insights/trends/trends_query_runner.py:0: note: Subclass: +posthog/hogql_queries/insights/trends/trends_query_runner.py:0: note: def to_actors_query(self, time_frame: str | int | None, series_index: int, breakdown_value: str | int | None = ..., compare: Compare | None = ...) -> SelectQuery | SelectUnionQuery +posthog/hogql_queries/insights/trends/trends_query_runner.py:0: error: Statement is unreachable [unreachable] +posthog/hogql_queries/insights/trends/trends_query_runner.py:0: error: Argument 1 to "_event_property" of "TrendsQueryRunner" has incompatible type "str | float | list[str | float] | None"; expected "str" [arg-type] posthog/hogql_queries/insights/retention_query_runner.py:0: error: Incompatible types in assignment (expression has type "Expr", variable has type "Call") [assignment] posthog/hogql_queries/insights/retention_query_runner.py:0: error: Incompatible types in assignment (expression has type "Call", variable has type "Field") [assignment] posthog/hogql_queries/insights/retention_query_runner.py:0: error: Argument "select" to "SelectQuery" has incompatible type "list[Alias]"; expected "list[Expr]" [arg-type] @@ -410,17 +420,6 @@ posthog/hogql_queries/insights/lifecycle_query_runner.py:0: note: Consider using posthog/hogql_queries/insights/lifecycle_query_runner.py:0: error: Argument 1 to "sorted" has incompatible type "list[Any] | None"; expected "Iterable[Any]" [arg-type] posthog/hogql_queries/insights/lifecycle_query_runner.py:0: error: Item "SelectUnionQuery" of "SelectQuery | SelectUnionQuery" has no attribute "select_from" [union-attr] posthog/hogql_queries/insights/lifecycle_query_runner.py:0: error: Item "None" of "JoinExpr | Any | None" has no attribute "sample" [union-attr] -posthog/hogql_queries/insights/trends/trends_query_runner.py:0: error: Signature of "to_actors_query" incompatible with supertype "QueryRunner" [override] -posthog/hogql_queries/insights/trends/trends_query_runner.py:0: note: Superclass: -posthog/hogql_queries/insights/trends/trends_query_runner.py:0: note: def to_actors_query(self) -> SelectQuery | SelectUnionQuery -posthog/hogql_queries/insights/trends/trends_query_runner.py:0: note: Subclass: -posthog/hogql_queries/insights/trends/trends_query_runner.py:0: note: def to_actors_query(self, time_frame: str | int | None, series_index: int, breakdown_value: str | int | None = ..., compare: Compare | None = ...) -> SelectQuery | SelectUnionQuery -posthog/hogql_queries/insights/trends/trends_query_runner.py:0: note: Superclass: -posthog/hogql_queries/insights/trends/trends_query_runner.py:0: note: def to_actors_query(self) -> SelectQuery | SelectUnionQuery -posthog/hogql_queries/insights/trends/trends_query_runner.py:0: note: Subclass: -posthog/hogql_queries/insights/trends/trends_query_runner.py:0: note: def to_actors_query(self, time_frame: str | int | None, series_index: int, breakdown_value: str | int | None = ..., compare: Compare | None = ...) -> SelectQuery | SelectUnionQuery -posthog/hogql_queries/insights/trends/trends_query_runner.py:0: error: Statement is unreachable [unreachable] -posthog/hogql_queries/insights/trends/trends_query_runner.py:0: error: Argument 1 to "_event_property" of "TrendsQueryRunner" has incompatible type "str | float | list[str | float] | None"; expected "str" [arg-type] posthog/hogql_queries/legacy_compatibility/process_insight.py:0: error: Incompatible types in assignment (expression has type "PathFilter", variable has type "RetentionFilter") [assignment] posthog/hogql_queries/legacy_compatibility/process_insight.py:0: error: Incompatible types in assignment (expression has type "StickinessFilter", variable has type "RetentionFilter") [assignment] posthog/hogql_queries/legacy_compatibility/process_insight.py:0: error: Incompatible types in assignment (expression has type "Filter", variable has type "RetentionFilter") [assignment] @@ -792,9 +791,6 @@ posthog/temporal/tests/batch_exports/test_snowflake_batch_export_workflow.py:0: posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py:0: error: "tuple[Any, ...]" has no attribute "last_uploaded_part_timestamp" [attr-defined] posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py:0: error: "tuple[Any, ...]" has no attribute "upload_state" [attr-defined] posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py:0: error: "tuple[Any, ...]" has no attribute "upload_state" [attr-defined] -posthog/temporal/tests/batch_exports/test_backfill_batch_export.py:0: error: Argument "name" to "acreate_batch_export" has incompatible type "object"; expected "str" [arg-type] -posthog/temporal/tests/batch_exports/test_backfill_batch_export.py:0: error: Argument "destination_data" to "acreate_batch_export" has incompatible type "object"; expected "dict[Any, Any]" [arg-type] -posthog/temporal/tests/batch_exports/test_backfill_batch_export.py:0: error: Argument "interval" to "acreate_batch_export" has incompatible type "object"; expected "str" [arg-type] posthog/temporal/tests/batch_exports/test_redshift_batch_export_workflow.py:0: error: Incompatible types in assignment (expression has type "str | int", variable has type "int") [assignment] posthog/api/test/batch_exports/conftest.py:0: error: Argument "activities" to "ThreadedWorker" has incompatible type "list[function]"; expected "Sequence[Callable[..., Any]]" [arg-type] posthog/management/commands/test/test_create_batch_export_from_app.py:0: error: Incompatible return value type (got "dict[str, Collection[str]]", expected "dict[str, str]") [return-value] diff --git a/posthog/api/test/batch_exports/test_pause.py b/posthog/api/test/batch_exports/test_pause.py index 2c92a00af94ee..7db786347e09c 100644 --- a/posthog/api/test/batch_exports/test_pause.py +++ b/posthog/api/test/batch_exports/test_pause.py @@ -402,6 +402,7 @@ def test_unpause_can_trigger_a_backfill(client: HttpClient): mock_backfill.assert_called_once_with( ANY, batch_export["id"], + team.pk, start_at, end_at, ) diff --git a/posthog/batch_exports/http.py b/posthog/batch_exports/http.py index 9570f23c0902b..5e84d7f446b3d 100644 --- a/posthog/batch_exports/http.py +++ b/posthog/batch_exports/http.py @@ -29,6 +29,7 @@ BatchExportServiceError, BatchExportServiceRPCError, BatchExportServiceScheduleNotFound, + BatchExportWithNoEndNotAllowedError, backfill_export, batch_export_delete_schedule, cancel_running_batch_export_backfill, @@ -371,7 +372,10 @@ def backfill(self, request: request.Request, *args, **kwargs) -> response.Respon batch_export = self.get_object() temporal = sync_connect() - backfill_id = backfill_export(temporal, str(batch_export.pk), team_id, start_at, end_at) + try: + backfill_id = backfill_export(temporal, str(batch_export.pk), team_id, start_at, end_at) + except BatchExportWithNoEndNotAllowedError: + raise ValidationError("Backfilling a BatchExport with no end date is not allowed") return response.Response({"backfill_id": backfill_id}) diff --git a/posthog/batch_exports/models.py b/posthog/batch_exports/models.py index 9af64f1c2733d..35fa6e8ba4754 100644 --- a/posthog/batch_exports/models.py +++ b/posthog/batch_exports/models.py @@ -285,7 +285,7 @@ class Status(models.TextChoices): help_text="The BatchExport this backfill belongs to.", ) start_at: models.DateTimeField = models.DateTimeField(help_text="The start of the data interval.") - end_at: models.DateTimeField = models.DateTimeField(help_text="The end of the data interval.") + end_at: models.DateTimeField = models.DateTimeField(help_text="The end of the data interval.", null=True) status: models.CharField = models.CharField( choices=Status.choices, max_length=64, help_text="The status of this backfill." ) diff --git a/posthog/batch_exports/service.py b/posthog/batch_exports/service.py index ad0812031d2a7..4930665d13f6d 100644 --- a/posthog/batch_exports/service.py +++ b/posthog/batch_exports/service.py @@ -19,6 +19,7 @@ from posthog.batch_exports.models import ( BatchExport, BatchExportBackfill, + BatchExportDestination, BatchExportRun, ) from posthog.constants import BATCH_EXPORTS_TASK_QUEUE @@ -209,6 +210,10 @@ class BatchExportServiceRPCError(BatchExportServiceError): """Exception raised when the underlying Temporal RPC fails.""" +class BatchExportWithNoEndNotAllowedError(BatchExportServiceError): + """Exception raised when a BatchExport without an end_at is not allowed for a given destination.""" + + class BatchExportServiceScheduleNotFound(BatchExportServiceRPCError): """Exception raised when the underlying Temporal RPC fails because a schedule was not found.""" @@ -283,7 +288,7 @@ def unpause_batch_export( start_at = batch_export.last_paused_at end_at = batch_export.last_updated_at - backfill_export(temporal, batch_export_id, start_at, end_at) + backfill_export(temporal, batch_export_id, batch_export.team_id, start_at, end_at) def batch_export_delete_schedule(temporal: Client, schedule_id: str) -> None: @@ -316,7 +321,7 @@ class BackfillBatchExportInputs: team_id: int batch_export_id: str start_at: str - end_at: str + end_at: str | None buffer_limit: int = 1 wait_delay: float = 5.0 @@ -326,7 +331,7 @@ def backfill_export( batch_export_id: str, team_id: int, start_at: dt.datetime, - end_at: dt.datetime, + end_at: dt.datetime | None, ) -> str: """Starts a backfill for given team and batch export covering given date range. @@ -335,18 +340,26 @@ def backfill_export( batch_export_id: The id of the BatchExport to backfill. team_id: The id of the Team the BatchExport belongs to. start_at: From when to backfill. - end_at: Up to when to backfill. + end_at: Up to when to backfill, if None it will backfill until it has caught up with realtime + and then unpause the underlying BatchExport. """ try: - BatchExport.objects.get(id=batch_export_id, team_id=team_id) + batch_export = BatchExport.objects.select_related("destination").get(id=batch_export_id, team_id=team_id) except BatchExport.DoesNotExist: raise BatchExportIdError(batch_export_id) + # Ensure we don't allow users access to this feature until we are ready. + if not end_at and batch_export.destination.type not in ( + BatchExportDestination.Destination.HTTP, + BatchExportDestination.Destination.NOOP, # For tests. + ): + raise BatchExportWithNoEndNotAllowedError(f"BatchExport {batch_export_id} has no end_at and is not HTTP") + inputs = BackfillBatchExportInputs( batch_export_id=batch_export_id, team_id=team_id, start_at=start_at.isoformat(), - end_at=end_at.isoformat(), + end_at=end_at.isoformat() if end_at else None, ) workflow_id = start_backfill_batch_export_workflow(temporal, inputs=inputs) return workflow_id @@ -358,7 +371,7 @@ async def start_backfill_batch_export_workflow(temporal: Client, inputs: Backfil handle = temporal.get_schedule_handle(inputs.batch_export_id) description = await handle.describe() - if description.schedule.spec.jitter is not None: + if description.schedule.spec.jitter is not None and inputs.end_at is not None: # Adjust end_at to account for jitter if present. inputs.end_at = (dt.datetime.fromisoformat(inputs.end_at) + description.schedule.spec.jitter).isoformat() @@ -464,7 +477,7 @@ def create_batch_export_backfill( batch_export_id: UUID, team_id: int, start_at: str, - end_at: str, + end_at: str | None, status: str = BatchExportRun.Status.RUNNING, ) -> BatchExportBackfill: """Create a BatchExportBackfill. @@ -481,7 +494,7 @@ def create_batch_export_backfill( batch_export_id=batch_export_id, status=status, start_at=dt.datetime.fromisoformat(start_at), - end_at=dt.datetime.fromisoformat(end_at), + end_at=dt.datetime.fromisoformat(end_at) if end_at else None, team_id=team_id, ) backfill.save() diff --git a/posthog/migrations/0395_alter_batchexportbackfill_end_at.py b/posthog/migrations/0395_alter_batchexportbackfill_end_at.py new file mode 100644 index 0000000000000..b28d8dee0a5bc --- /dev/null +++ b/posthog/migrations/0395_alter_batchexportbackfill_end_at.py @@ -0,0 +1,17 @@ +# Generated by Django 4.1.13 on 2024-03-05 19:53 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + dependencies = [ + ("posthog", "0394_organization_customer_trust_scores_and_more"), + ] + + operations = [ + migrations.AlterField( + model_name="batchexportbackfill", + name="end_at", + field=models.DateTimeField(help_text="The end of the data interval.", null=True), + ), + ] diff --git a/posthog/temporal/batch_exports/backfill_batch_export.py b/posthog/temporal/batch_exports/backfill_batch_export.py index d6cbf2b553338..789857462c2c0 100644 --- a/posthog/temporal/batch_exports/backfill_batch_export.py +++ b/posthog/temporal/batch_exports/backfill_batch_export.py @@ -5,6 +5,7 @@ import json import typing +from asgiref.sync import sync_to_async import temporalio import temporalio.activity import temporalio.client @@ -14,7 +15,7 @@ from django.conf import settings from posthog.batch_exports.models import BatchExportBackfill -from posthog.batch_exports.service import BackfillBatchExportInputs +from posthog.batch_exports.service import BackfillBatchExportInputs, unpause_batch_export from posthog.temporal.batch_exports.base import PostHogWorkflow from posthog.temporal.batch_exports.batch_exports import ( CreateBatchExportBackfillInputs, @@ -37,6 +38,8 @@ class HeartbeatDetails(typing.NamedTuple): schedule_id: str start_at: str + # Note that this `end_at` is not optional, because heartbeats details describe the last concrete + # period of time we were waiting to backfill, and not the entire backfill job itself. end_at: str wait_start_at: str @@ -106,12 +109,18 @@ class BackfillScheduleInputs: schedule_id: str start_at: str - end_at: str + end_at: str | None frequency_seconds: float buffer_limit: int = 1 wait_delay: float = 5.0 +def get_utcnow(): + """Return the current time in UTC. This function is only required for mocking during tests, + because mocking the global datetime breaks Temporal.""" + return dt.datetime.now(dt.timezone.utc) + + @temporalio.activity.defn async def backfill_schedule(inputs: BackfillScheduleInputs) -> None: """Temporal Activity to backfill a Temporal Schedule. @@ -122,7 +131,7 @@ async def backfill_schedule(inputs: BackfillScheduleInputs) -> None: This activity heartbeats while waiting to allow cancelling an ongoing backfill. """ start_at = dt.datetime.fromisoformat(inputs.start_at) - end_at = dt.datetime.fromisoformat(inputs.end_at) + end_at = dt.datetime.fromisoformat(inputs.end_at) if inputs.end_at else None client = await connect( settings.TEMPORAL_HOST, @@ -163,7 +172,13 @@ async def backfill_schedule(inputs: BackfillScheduleInputs) -> None: full_backfill_range = backfill_range(start_at, end_at, frequency * inputs.buffer_limit) for backfill_start_at, backfill_end_at in full_backfill_range: - utcnow = dt.datetime.now(dt.timezone.utc) + utcnow = get_utcnow() + + if end_at is None and backfill_end_at >= utcnow: + # This backfill (with no `end_at`) has caught up with real time and should unpause the + # underlying batch export and exit. + await sync_to_async(unpause_batch_export)(client, inputs.schedule_id) + return if jitter is not None: backfill_end_at = backfill_end_at + jitter @@ -286,15 +301,15 @@ async def check_temporal_schedule_exists(client: temporalio.client.Client, sched def backfill_range( - start_at: dt.datetime, end_at: dt.datetime, step: dt.timedelta + start_at: dt.datetime, end_at: dt.datetime | None, step: dt.timedelta ) -> typing.Generator[tuple[dt.datetime, dt.datetime], None, None]: """Generate range of dates between start_at and end_at.""" current = start_at - while current < end_at: + while end_at is None or current < end_at: current_end = current + step - if current_end > end_at: + if end_at and current_end > end_at: # Do not yield a range that is less than step. # Same as built-in range. break @@ -354,8 +369,18 @@ async def run(self, inputs: BackfillBatchExportInputs) -> None: retry_policy=temporalio.common.RetryPolicy(maximum_attempts=0), ) - backfill_duration = dt.datetime.fromisoformat(inputs.end_at) - dt.datetime.fromisoformat(inputs.start_at) - number_of_expected_runs = backfill_duration / dt.timedelta(seconds=frequency_seconds) + # Temporal requires that we set a timeout. + if inputs.end_at is None: + # Set timeout to a month for now, as unending backfills are an internal feature we are + # testing for HTTP-based migrations. We'll need to pick a more realistic timeout + # if we release this to customers. + start_to_close_timeout = dt.timedelta(days=31) + else: + # Allocate 5 minutes per expected number of runs to backfill as a timeout. + # The 5 minutes are just an assumption and we may tweak this in the future + backfill_duration = dt.datetime.fromisoformat(inputs.end_at) - dt.datetime.fromisoformat(inputs.start_at) + number_of_expected_runs = backfill_duration / dt.timedelta(seconds=frequency_seconds) + start_to_close_timeout = dt.timedelta(minutes=5 * number_of_expected_runs) backfill_schedule_inputs = BackfillScheduleInputs( schedule_id=inputs.batch_export_id, @@ -374,10 +399,7 @@ async def run(self, inputs: BackfillBatchExportInputs) -> None: maximum_interval=dt.timedelta(seconds=60), non_retryable_error_types=["TemporalScheduleNotFoundError"], ), - # Temporal requires that we set a timeout. - # Allocate 5 minutes per expected number of runs to backfill as a timeout. - # The 5 minutes are just an assumption and we may tweak this in the future - start_to_close_timeout=dt.timedelta(minutes=5 * number_of_expected_runs), + start_to_close_timeout=start_to_close_timeout, heartbeat_timeout=dt.timedelta(minutes=2), ) diff --git a/posthog/temporal/batch_exports/batch_exports.py b/posthog/temporal/batch_exports/batch_exports.py index 3f89769b78ef1..8fa61a370d5c3 100644 --- a/posthog/temporal/batch_exports/batch_exports.py +++ b/posthog/temporal/batch_exports/batch_exports.py @@ -566,7 +566,7 @@ class CreateBatchExportBackfillInputs: team_id: int batch_export_id: str start_at: str - end_at: str + end_at: str | None status: str diff --git a/posthog/temporal/tests/batch_exports/test_backfill_batch_export.py b/posthog/temporal/tests/batch_exports/test_backfill_batch_export.py index dc5ef36c0b5f0..fcb2021975e57 100644 --- a/posthog/temporal/tests/batch_exports/test_backfill_batch_export.py +++ b/posthog/temporal/tests/batch_exports/test_backfill_batch_export.py @@ -1,4 +1,5 @@ import datetime as dt +from unittest import mock import uuid import pytest @@ -24,6 +25,7 @@ acreate_batch_export, adelete_batch_export, afetch_batch_export_backfills, + afetch_batch_export, ) pytestmark = [pytest.mark.asyncio] @@ -31,25 +33,16 @@ @pytest_asyncio.fixture async def temporal_schedule(temporal_client, team): - """Manage a test Temopral Schedule yielding its handle.""" - destination_data = { - "type": "NoOp", - "config": {}, - } - - interval = "every 1 minutes" - batch_export_data = { - "name": "no-op-export", - "destination": destination_data, - "interval": interval, - "paused": True, - } - + """Manage a test Temporal Schedule yielding its handle.""" batch_export = await acreate_batch_export( team_id=team.pk, - name=batch_export_data["name"], - destination_data=batch_export_data["destination"], - interval=batch_export_data["interval"], + name="no-op-export", + destination_data={ + "type": "NoOp", + "config": {}, + }, + interval="every 1 minutes", + paused=True, ) handle = temporal_client.get_schedule_handle(str(batch_export.id)) @@ -211,6 +204,61 @@ async def test_backfill_batch_export_workflow(temporal_worker, temporal_schedule assert backfill.status == "Completed" +@pytest.mark.django_db(transaction=True) +@mock.patch("posthog.temporal.batch_exports.backfill_batch_export.get_utcnow") +async def test_backfill_batch_export_workflow_no_end_at( + mock_utcnow, temporal_worker, temporal_schedule, temporal_client, team +): + """Test BackfillBatchExportWorkflow executes all backfill runs and updates model.""" + + # Note the mocked time here, we should stop backfilling at 8 minutes and unpause the job. + mock_utcnow.return_value = dt.datetime(2023, 1, 1, 0, 8, 12, tzinfo=dt.timezone.utc) + + start_at = dt.datetime(2023, 1, 1, 0, 0, 0, tzinfo=dt.timezone.utc) + end_at = None + + desc = await temporal_schedule.describe() + + workflow_id = str(uuid.uuid4()) + inputs = BackfillBatchExportInputs( + team_id=team.pk, + batch_export_id=desc.id, + start_at=start_at.isoformat(), + end_at=end_at, + buffer_limit=2, + wait_delay=0.1, + ) + + batch_export = await afetch_batch_export(desc.id) + assert batch_export.paused is True + + handle = await temporal_client.start_workflow( + BackfillBatchExportWorkflow.run, + inputs, + id=workflow_id, + task_queue=settings.TEMPORAL_TASK_QUEUE, + execution_timeout=dt.timedelta(minutes=1), + retry_policy=temporalio.common.RetryPolicy(maximum_attempts=1), + ) + await handle.result() + + desc = await temporal_schedule.describe() + result = desc.info.num_actions + expected = 8 + + assert result == expected + + backfills = await afetch_batch_export_backfills(batch_export_id=desc.id) + + assert len(backfills) == 1, "Expected one backfill to have been created" + + backfill = backfills.pop() + assert backfill.status == "Completed" + + batch_export = await afetch_batch_export(desc.id) + assert batch_export.paused is False + + @pytest.mark.django_db(transaction=True) async def test_backfill_batch_export_workflow_fails_when_schedule_deleted( temporal_worker, temporal_schedule, temporal_client, team diff --git a/posthog/temporal/tests/utils/models.py b/posthog/temporal/tests/utils/models.py index 65b40e91332c6..04da6fe21b0fb 100644 --- a/posthog/temporal/tests/utils/models.py +++ b/posthog/temporal/tests/utils/models.py @@ -13,11 +13,11 @@ from posthog.batch_exports.service import sync_batch_export -def create_batch_export(team_id: int, interval: str, name: str, destination_data: dict) -> BatchExport: +def create_batch_export(team_id: int, interval: str, name: str, destination_data: dict, paused: bool) -> BatchExport: """Create a BatchExport and its underlying Schedule.""" destination = BatchExportDestination(**destination_data) - batch_export = BatchExport(team_id=team_id, destination=destination, interval=interval, name=name) + batch_export = BatchExport(team_id=team_id, destination=destination, interval=interval, name=name, paused=paused) sync_batch_export(batch_export, created=True) @@ -27,9 +27,11 @@ def create_batch_export(team_id: int, interval: str, name: str, destination_data return batch_export -async def acreate_batch_export(team_id: int, interval: str, name: str, destination_data: dict) -> BatchExport: +async def acreate_batch_export( + team_id: int, interval: str, name: str, destination_data: dict, paused: bool = False +) -> BatchExport: """Async create a BatchExport and its underlying Schedule.""" - return await sync_to_async(create_batch_export)(team_id, interval, name, destination_data) + return await sync_to_async(create_batch_export)(team_id, interval, name, destination_data, paused) async def adelete_batch_export(batch_export: BatchExport, temporal_client: temporalio.client.Client) -> None: @@ -45,6 +47,11 @@ async def adelete_batch_export(batch_export: BatchExport, temporal_client: tempo await sync_to_async(batch_export.delete)() +async def afetch_batch_export(batch_export_id: uuid.UUID) -> BatchExport: + """Fetch a BatchExport by its ID.""" + return await sync_to_async(BatchExport.objects.get)(id=batch_export_id) + + def fetch_batch_export_runs(batch_export_id: uuid.UUID, limit: int = 100) -> list[BatchExportRun]: """Fetch the BatchExportRuns for a given BatchExport.""" return list(BatchExportRun.objects.filter(batch_export_id=batch_export_id).order_by("-created_at")[:limit])