Skip to content

Commit

Permalink
feat(batch-export): allow batch export backfills with no end, which u… (
Browse files Browse the repository at this point in the history
  • Loading branch information
bretthoerner authored Mar 6, 2024
1 parent 390a000 commit 021a926
Show file tree
Hide file tree
Showing 11 changed files with 178 additions and 70 deletions.
2 changes: 1 addition & 1 deletion latest_migrations.manifest
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ contenttypes: 0002_remove_content_type_name
ee: 0015_add_verified_properties
otp_static: 0002_throttling
otp_totp: 0002_auto_20190420_0723
posthog: 0394_organization_customer_trust_scores_and_more
posthog: 0395_alter_batchexportbackfill_end_at
sessions: 0001_initial
social_django: 0010_uid_db_index
two_factor: 0007_auto_20201201_1019
42 changes: 19 additions & 23 deletions mypy-baseline.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@ posthog/temporal/common/utils.py:0: error: Argument 2 to "__get__" of "classmeth
posthog/hogql/database/argmax.py:0: error: Argument "chain" to "Field" has incompatible type "list[str]"; expected "list[str | int]" [arg-type]
posthog/hogql/database/argmax.py:0: note: "List" is invariant -- see https://mypy.readthedocs.io/en/stable/common_issues.html#variance
posthog/hogql/database/argmax.py:0: note: Consider using "Sequence" instead, which is covariant
posthog/hogql/database/schema/numbers.py:0: error: Incompatible types in assignment (expression has type "dict[str, IntegerDatabaseField]", variable has type "dict[str, FieldOrTable]") [assignment]
posthog/hogql/database/schema/numbers.py:0: note: "Dict" is invariant -- see https://mypy.readthedocs.io/en/stable/common_issues.html#variance
posthog/hogql/database/schema/numbers.py:0: note: Consider using "Mapping" instead, which is covariant in the value type
posthog/hogql/ast.py:0: error: Argument "chain" to "FieldTraverserType" has incompatible type "list[str]"; expected "list[str | int]" [arg-type]
posthog/hogql/ast.py:0: note: "List" is invariant -- see https://mypy.readthedocs.io/en/stable/common_issues.html#variance
posthog/hogql/ast.py:0: note: Consider using "Sequence" instead, which is covariant
posthog/hogql/ast.py:0: error: Incompatible return value type (got "bool | None", expected "bool") [return-value]
posthog/hogql/database/schema/numbers.py:0: error: Incompatible types in assignment (expression has type "dict[str, IntegerDatabaseField]", variable has type "dict[str, FieldOrTable]") [assignment]
posthog/hogql/database/schema/numbers.py:0: note: "Dict" is invariant -- see https://mypy.readthedocs.io/en/stable/common_issues.html#variance
posthog/hogql/database/schema/numbers.py:0: note: Consider using "Mapping" instead, which is covariant in the value type
posthog/hogql/visitor.py:0: error: Statement is unreachable [unreachable]
posthog/hogql/visitor.py:0: error: Argument 1 to "visit" of "Visitor" has incompatible type "Type | None"; expected "AST" [arg-type]
posthog/hogql/visitor.py:0: error: Argument 1 to "visit" of "Visitor" has incompatible type "Type | None"; expected "AST" [arg-type]
Expand Down Expand Up @@ -174,7 +174,6 @@ posthog/hogql_queries/insights/trends/aggregation_operations.py:0: error: Item "
posthog/hogql_queries/insights/trends/aggregation_operations.py:0: error: Item "SelectUnionQuery" of "SelectQuery | SelectUnionQuery" has no attribute "select" [union-attr]
posthog/hogql_queries/insights/trends/aggregation_operations.py:0: error: Item "SelectUnionQuery" of "SelectQuery | SelectUnionQuery" has no attribute "group_by" [union-attr]
posthog/hogql_queries/insights/trends/aggregation_operations.py:0: error: Item "None" of "list[Expr] | Any | None" has no attribute "append" [union-attr]
posthog/batch_exports/service.py:0: error: Missing positional argument "end_at" in call to "backfill_export" [call-arg]
ee/billing/billing_manager.py:0: error: TypedDict "CustomerInfo" has no key "available_product_features" [typeddict-item]
ee/billing/billing_manager.py:0: note: Did you mean "available_features"?
posthog/hogql/resolver.py:0: error: Argument 1 of "visit" is incompatible with supertype "Visitor"; supertype defines the argument type as "AST" [override]
Expand Down Expand Up @@ -346,11 +345,6 @@ posthog/hogql/autocomplete.py:0: error: Unused "type: ignore" comment [unused-i
posthog/hogql_queries/insights/trends/breakdown_values.py:0: error: Item "SelectUnionQuery" of "SelectQuery | SelectUnionQuery" has no attribute "select" [union-attr]
posthog/hogql_queries/insights/trends/breakdown_values.py:0: error: Value of type "list[Any] | None" is not indexable [index]
posthog/hogql_queries/sessions_timeline_query_runner.py:0: error: Statement is unreachable [unreachable]
posthog/hogql_queries/hogql_query_runner.py:0: error: Statement is unreachable [unreachable]
posthog/hogql_queries/hogql_query_runner.py:0: error: Argument "placeholders" to "parse_select" has incompatible type "dict[str, Constant] | None"; expected "dict[str, Expr] | None" [arg-type]
posthog/hogql_queries/hogql_query_runner.py:0: error: Incompatible types in assignment (expression has type "Expr", variable has type "SelectQuery | SelectUnionQuery") [assignment]
posthog/hogql_queries/hogql_query_runner.py:0: error: Incompatible return value type (got "SelectQuery | SelectUnionQuery", expected "SelectQuery") [return-value]
posthog/hogql_queries/events_query_runner.py:0: error: Statement is unreachable [unreachable]
posthog/hogql_queries/insights/trends/breakdown.py:0: error: Item "None" of "BreakdownFilter | None" has no attribute "breakdown_type" [union-attr]
posthog/hogql_queries/insights/trends/breakdown.py:0: error: Item "None" of "BreakdownFilter | None" has no attribute "breakdown_histogram_bin_count" [union-attr]
posthog/hogql_queries/insights/trends/breakdown.py:0: error: Item "None" of "BreakdownFilter | None" has no attribute "breakdown_type" [union-attr]
Expand All @@ -375,6 +369,11 @@ posthog/hogql_queries/insights/trends/breakdown.py:0: error: Item "None" of "Bre
posthog/hogql_queries/insights/trends/breakdown.py:0: error: Item "None" of "BreakdownFilter | None" has no attribute "breakdown" [union-attr]
posthog/hogql_queries/insights/trends/breakdown.py:0: error: Argument "breakdown_field" to "get_properties_chain" has incompatible type "str | float | list[str | float] | Any | None"; expected "str" [arg-type]
posthog/hogql_queries/insights/trends/breakdown.py:0: error: Item "None" of "BreakdownFilter | None" has no attribute "breakdown_group_type_index" [union-attr]
posthog/hogql_queries/hogql_query_runner.py:0: error: Statement is unreachable [unreachable]
posthog/hogql_queries/hogql_query_runner.py:0: error: Argument "placeholders" to "parse_select" has incompatible type "dict[str, Constant] | None"; expected "dict[str, Expr] | None" [arg-type]
posthog/hogql_queries/hogql_query_runner.py:0: error: Incompatible types in assignment (expression has type "Expr", variable has type "SelectQuery | SelectUnionQuery") [assignment]
posthog/hogql_queries/hogql_query_runner.py:0: error: Incompatible return value type (got "SelectQuery | SelectUnionQuery", expected "SelectQuery") [return-value]
posthog/hogql_queries/events_query_runner.py:0: error: Statement is unreachable [unreachable]
posthog/hogql/metadata.py:0: error: Argument "metadata_source" to "translate_hogql" has incompatible type "SelectQuery | SelectUnionQuery"; expected "SelectQuery | None" [arg-type]
posthog/hogql/metadata.py:0: error: Incompatible types in assignment (expression has type "Expr", variable has type "SelectQuery | SelectUnionQuery") [assignment]
posthog/queries/breakdown_props.py:0: error: Argument 1 to "translate_hogql" has incompatible type "str | int"; expected "str" [arg-type]
Expand All @@ -386,6 +385,17 @@ posthog/api/person.py:0: error: Argument 1 to "loads" has incompatible type "str
posthog/api/person.py:0: error: Argument "user" to "log_activity" has incompatible type "User | AnonymousUser"; expected "User | None" [arg-type]
posthog/api/person.py:0: error: Argument "user" to "log_activity" has incompatible type "User | AnonymousUser"; expected "User | None" [arg-type]
posthog/hogql_queries/web_analytics/web_analytics_query_runner.py:0: error: Argument 1 to "append" of "list" has incompatible type "EventPropertyFilter"; expected "Expr" [arg-type]
posthog/hogql_queries/insights/trends/trends_query_runner.py:0: error: Signature of "to_actors_query" incompatible with supertype "QueryRunner" [override]
posthog/hogql_queries/insights/trends/trends_query_runner.py:0: note: Superclass:
posthog/hogql_queries/insights/trends/trends_query_runner.py:0: note: def to_actors_query(self) -> SelectQuery | SelectUnionQuery
posthog/hogql_queries/insights/trends/trends_query_runner.py:0: note: Subclass:
posthog/hogql_queries/insights/trends/trends_query_runner.py:0: note: def to_actors_query(self, time_frame: str | int | None, series_index: int, breakdown_value: str | int | None = ..., compare: Compare | None = ...) -> SelectQuery | SelectUnionQuery
posthog/hogql_queries/insights/trends/trends_query_runner.py:0: note: Superclass:
posthog/hogql_queries/insights/trends/trends_query_runner.py:0: note: def to_actors_query(self) -> SelectQuery | SelectUnionQuery
posthog/hogql_queries/insights/trends/trends_query_runner.py:0: note: Subclass:
posthog/hogql_queries/insights/trends/trends_query_runner.py:0: note: def to_actors_query(self, time_frame: str | int | None, series_index: int, breakdown_value: str | int | None = ..., compare: Compare | None = ...) -> SelectQuery | SelectUnionQuery
posthog/hogql_queries/insights/trends/trends_query_runner.py:0: error: Statement is unreachable [unreachable]
posthog/hogql_queries/insights/trends/trends_query_runner.py:0: error: Argument 1 to "_event_property" of "TrendsQueryRunner" has incompatible type "str | float | list[str | float] | None"; expected "str" [arg-type]
posthog/hogql_queries/insights/retention_query_runner.py:0: error: Incompatible types in assignment (expression has type "Expr", variable has type "Call") [assignment]
posthog/hogql_queries/insights/retention_query_runner.py:0: error: Incompatible types in assignment (expression has type "Call", variable has type "Field") [assignment]
posthog/hogql_queries/insights/retention_query_runner.py:0: error: Argument "select" to "SelectQuery" has incompatible type "list[Alias]"; expected "list[Expr]" [arg-type]
Expand All @@ -410,17 +420,6 @@ posthog/hogql_queries/insights/lifecycle_query_runner.py:0: note: Consider using
posthog/hogql_queries/insights/lifecycle_query_runner.py:0: error: Argument 1 to "sorted" has incompatible type "list[Any] | None"; expected "Iterable[Any]" [arg-type]
posthog/hogql_queries/insights/lifecycle_query_runner.py:0: error: Item "SelectUnionQuery" of "SelectQuery | SelectUnionQuery" has no attribute "select_from" [union-attr]
posthog/hogql_queries/insights/lifecycle_query_runner.py:0: error: Item "None" of "JoinExpr | Any | None" has no attribute "sample" [union-attr]
posthog/hogql_queries/insights/trends/trends_query_runner.py:0: error: Signature of "to_actors_query" incompatible with supertype "QueryRunner" [override]
posthog/hogql_queries/insights/trends/trends_query_runner.py:0: note: Superclass:
posthog/hogql_queries/insights/trends/trends_query_runner.py:0: note: def to_actors_query(self) -> SelectQuery | SelectUnionQuery
posthog/hogql_queries/insights/trends/trends_query_runner.py:0: note: Subclass:
posthog/hogql_queries/insights/trends/trends_query_runner.py:0: note: def to_actors_query(self, time_frame: str | int | None, series_index: int, breakdown_value: str | int | None = ..., compare: Compare | None = ...) -> SelectQuery | SelectUnionQuery
posthog/hogql_queries/insights/trends/trends_query_runner.py:0: note: Superclass:
posthog/hogql_queries/insights/trends/trends_query_runner.py:0: note: def to_actors_query(self) -> SelectQuery | SelectUnionQuery
posthog/hogql_queries/insights/trends/trends_query_runner.py:0: note: Subclass:
posthog/hogql_queries/insights/trends/trends_query_runner.py:0: note: def to_actors_query(self, time_frame: str | int | None, series_index: int, breakdown_value: str | int | None = ..., compare: Compare | None = ...) -> SelectQuery | SelectUnionQuery
posthog/hogql_queries/insights/trends/trends_query_runner.py:0: error: Statement is unreachable [unreachable]
posthog/hogql_queries/insights/trends/trends_query_runner.py:0: error: Argument 1 to "_event_property" of "TrendsQueryRunner" has incompatible type "str | float | list[str | float] | None"; expected "str" [arg-type]
posthog/hogql_queries/legacy_compatibility/process_insight.py:0: error: Incompatible types in assignment (expression has type "PathFilter", variable has type "RetentionFilter") [assignment]
posthog/hogql_queries/legacy_compatibility/process_insight.py:0: error: Incompatible types in assignment (expression has type "StickinessFilter", variable has type "RetentionFilter") [assignment]
posthog/hogql_queries/legacy_compatibility/process_insight.py:0: error: Incompatible types in assignment (expression has type "Filter", variable has type "RetentionFilter") [assignment]
Expand Down Expand Up @@ -792,9 +791,6 @@ posthog/temporal/tests/batch_exports/test_snowflake_batch_export_workflow.py:0:
posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py:0: error: "tuple[Any, ...]" has no attribute "last_uploaded_part_timestamp" [attr-defined]
posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py:0: error: "tuple[Any, ...]" has no attribute "upload_state" [attr-defined]
posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py:0: error: "tuple[Any, ...]" has no attribute "upload_state" [attr-defined]
posthog/temporal/tests/batch_exports/test_backfill_batch_export.py:0: error: Argument "name" to "acreate_batch_export" has incompatible type "object"; expected "str" [arg-type]
posthog/temporal/tests/batch_exports/test_backfill_batch_export.py:0: error: Argument "destination_data" to "acreate_batch_export" has incompatible type "object"; expected "dict[Any, Any]" [arg-type]
posthog/temporal/tests/batch_exports/test_backfill_batch_export.py:0: error: Argument "interval" to "acreate_batch_export" has incompatible type "object"; expected "str" [arg-type]
posthog/temporal/tests/batch_exports/test_redshift_batch_export_workflow.py:0: error: Incompatible types in assignment (expression has type "str | int", variable has type "int") [assignment]
posthog/api/test/batch_exports/conftest.py:0: error: Argument "activities" to "ThreadedWorker" has incompatible type "list[function]"; expected "Sequence[Callable[..., Any]]" [arg-type]
posthog/management/commands/test/test_create_batch_export_from_app.py:0: error: Incompatible return value type (got "dict[str, Collection[str]]", expected "dict[str, str]") [return-value]
Expand Down
1 change: 1 addition & 0 deletions posthog/api/test/batch_exports/test_pause.py
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,7 @@ def test_unpause_can_trigger_a_backfill(client: HttpClient):
mock_backfill.assert_called_once_with(
ANY,
batch_export["id"],
team.pk,
start_at,
end_at,
)
6 changes: 5 additions & 1 deletion posthog/batch_exports/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
BatchExportServiceError,
BatchExportServiceRPCError,
BatchExportServiceScheduleNotFound,
BatchExportWithNoEndNotAllowedError,
backfill_export,
batch_export_delete_schedule,
cancel_running_batch_export_backfill,
Expand Down Expand Up @@ -371,7 +372,10 @@ def backfill(self, request: request.Request, *args, **kwargs) -> response.Respon

batch_export = self.get_object()
temporal = sync_connect()
backfill_id = backfill_export(temporal, str(batch_export.pk), team_id, start_at, end_at)
try:
backfill_id = backfill_export(temporal, str(batch_export.pk), team_id, start_at, end_at)
except BatchExportWithNoEndNotAllowedError:
raise ValidationError("Backfilling a BatchExport with no end date is not allowed")

return response.Response({"backfill_id": backfill_id})

Expand Down
2 changes: 1 addition & 1 deletion posthog/batch_exports/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ class Status(models.TextChoices):
help_text="The BatchExport this backfill belongs to.",
)
start_at: models.DateTimeField = models.DateTimeField(help_text="The start of the data interval.")
end_at: models.DateTimeField = models.DateTimeField(help_text="The end of the data interval.")
end_at: models.DateTimeField = models.DateTimeField(help_text="The end of the data interval.", null=True)
status: models.CharField = models.CharField(
choices=Status.choices, max_length=64, help_text="The status of this backfill."
)
Expand Down
31 changes: 22 additions & 9 deletions posthog/batch_exports/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from posthog.batch_exports.models import (
BatchExport,
BatchExportBackfill,
BatchExportDestination,
BatchExportRun,
)
from posthog.constants import BATCH_EXPORTS_TASK_QUEUE
Expand Down Expand Up @@ -209,6 +210,10 @@ class BatchExportServiceRPCError(BatchExportServiceError):
"""Exception raised when the underlying Temporal RPC fails."""


class BatchExportWithNoEndNotAllowedError(BatchExportServiceError):
    """Exception raised when a backfill without an end_at is not allowed for a given destination.

    Raised by backfill_export when no end_at is given and the BatchExport's
    destination type is not one of those permitted to backfill without an end
    (currently HTTP, plus NOOP for tests).
    """


class BatchExportServiceScheduleNotFound(BatchExportServiceRPCError):
"""Exception raised when the underlying Temporal RPC fails because a schedule was not found."""

Expand Down Expand Up @@ -283,7 +288,7 @@ def unpause_batch_export(
start_at = batch_export.last_paused_at
end_at = batch_export.last_updated_at

backfill_export(temporal, batch_export_id, start_at, end_at)
backfill_export(temporal, batch_export_id, batch_export.team_id, start_at, end_at)


def batch_export_delete_schedule(temporal: Client, schedule_id: str) -> None:
Expand Down Expand Up @@ -316,7 +321,7 @@ class BackfillBatchExportInputs:
team_id: int
batch_export_id: str
start_at: str
end_at: str
end_at: str | None
buffer_limit: int = 1
wait_delay: float = 5.0

Expand All @@ -326,7 +331,7 @@ def backfill_export(
batch_export_id: str,
team_id: int,
start_at: dt.datetime,
end_at: dt.datetime,
end_at: dt.datetime | None,
) -> str:
"""Starts a backfill for given team and batch export covering given date range.
Expand All @@ -335,18 +340,26 @@ def backfill_export(
batch_export_id: The id of the BatchExport to backfill.
team_id: The id of the Team the BatchExport belongs to.
start_at: From when to backfill.
end_at: Up to when to backfill.
end_at: Up to when to backfill, if None it will backfill until it has caught up with realtime
and then unpause the underlying BatchExport.
"""
try:
BatchExport.objects.get(id=batch_export_id, team_id=team_id)
batch_export = BatchExport.objects.select_related("destination").get(id=batch_export_id, team_id=team_id)
except BatchExport.DoesNotExist:
raise BatchExportIdError(batch_export_id)

# Ensure we don't allow users access to this feature until we are ready.
if not end_at and batch_export.destination.type not in (
BatchExportDestination.Destination.HTTP,
BatchExportDestination.Destination.NOOP, # For tests.
):
raise BatchExportWithNoEndNotAllowedError(f"BatchExport {batch_export_id} has no end_at and is not HTTP")

inputs = BackfillBatchExportInputs(
batch_export_id=batch_export_id,
team_id=team_id,
start_at=start_at.isoformat(),
end_at=end_at.isoformat(),
end_at=end_at.isoformat() if end_at else None,
)
workflow_id = start_backfill_batch_export_workflow(temporal, inputs=inputs)
return workflow_id
Expand All @@ -358,7 +371,7 @@ async def start_backfill_batch_export_workflow(temporal: Client, inputs: Backfil
handle = temporal.get_schedule_handle(inputs.batch_export_id)
description = await handle.describe()

if description.schedule.spec.jitter is not None:
if description.schedule.spec.jitter is not None and inputs.end_at is not None:
# Adjust end_at to account for jitter if present.
inputs.end_at = (dt.datetime.fromisoformat(inputs.end_at) + description.schedule.spec.jitter).isoformat()

Expand Down Expand Up @@ -464,7 +477,7 @@ def create_batch_export_backfill(
batch_export_id: UUID,
team_id: int,
start_at: str,
end_at: str,
end_at: str | None,
status: str = BatchExportRun.Status.RUNNING,
) -> BatchExportBackfill:
"""Create a BatchExportBackfill.
Expand All @@ -481,7 +494,7 @@ def create_batch_export_backfill(
batch_export_id=batch_export_id,
status=status,
start_at=dt.datetime.fromisoformat(start_at),
end_at=dt.datetime.fromisoformat(end_at),
end_at=dt.datetime.fromisoformat(end_at) if end_at else None,
team_id=team_id,
)
backfill.save()
Expand Down
Loading

0 comments on commit 021a926

Please sign in to comment.