From 9842e97b587ae1378831f817d3445915f1f1fbb3 Mon Sep 17 00:00:00 2001 From: Tom Owers Date: Wed, 4 Sep 2024 15:21:00 +0100 Subject: [PATCH] chore(data-warehouse): Updated billing limits to work per sync (#24752) --- ee/billing/quota_limiting.py | 26 ++--- ee/billing/test/test_quota_limiting.py | 6 +- .../DataWarehouseManagedSourcesTable.tsx | 1 + .../settings/source/Schemas.tsx | 1 + .../data-warehouse/settings/source/Syncs.tsx | 2 +- frontend/src/types.ts | 2 +- mypy-baseline.txt | 67 +++++------ posthog/tasks/scheduled.py | 7 -- posthog/tasks/tasks.py | 10 -- posthog/tasks/test/test_warehouse.py | 104 +---------------- posthog/tasks/warehouse.py | 92 +-------------- posthog/temporal/data_imports/__init__.py | 2 + .../data_imports/external_data_job.py | 43 ++++--- .../data_imports/pipelines/helpers.py | 9 -- .../pipelines/rest_source/__init__.py | 7 -- .../check_billing_limits.py | 41 +++++++ .../workflow_activities/create_job_model.py | 4 +- .../tests/data_imports/test_end_to_end.py | 43 +++++++ .../external_data/test_external_data_job.py | 109 +----------------- posthog/warehouse/api/external_data_schema.py | 7 ++ posthog/warehouse/api/external_data_source.py | 9 +- .../warehouse/external_data_source/jobs.py | 13 ++- 22 files changed, 190 insertions(+), 415 deletions(-) create mode 100644 posthog/temporal/data_imports/workflow_activities/check_billing_limits.py diff --git a/ee/billing/quota_limiting.py b/ee/billing/quota_limiting.py index fbf35a09e2dd3..f91811d866a40 100644 --- a/ee/billing/quota_limiting.py +++ b/ee/billing/quota_limiting.py @@ -122,7 +122,7 @@ def org_quota_limited_until( if organization.never_drop_data or trust_score == 15: return None - team_tokens = get_team_attribute_by_quota_resource(organization, resource) + team_tokens = get_team_attribute_by_quota_resource(organization) team_being_limited = any(x in previously_quota_limited_team_tokens for x in team_tokens) if team_being_limited: @@ -237,7 +237,7 @@ def sync_org_quota_limits(organization: Organization): previously_quota_limited_team_tokens = list_limited_team_attributes( resource, QuotaLimitingCaches.QUOTA_LIMITER_CACHE_KEY ) - team_attributes = get_team_attribute_by_quota_resource(organization, resource) + team_attributes = get_team_attribute_by_quota_resource(organization) result = org_quota_limited_until(organization, resource, previously_quota_limited_team_tokens) if result: @@ -264,24 +264,14 @@ def sync_org_quota_limits(organization: Organization): remove_limited_team_tokens(resource, team_attributes, QuotaLimitingCaches.QUOTA_LIMITING_SUSPENDED_KEY) -def get_team_attribute_by_quota_resource(organization: Organization, resource: QuotaResource): - if resource in [QuotaResource.EVENTS, QuotaResource.RECORDINGS]: - team_tokens: list[str] = [x for x in list(organization.teams.values_list("api_token", flat=True)) if x] +def get_team_attribute_by_quota_resource(organization: Organization): + team_tokens: list[str] = [x for x in list(organization.teams.values_list("api_token", flat=True)) if x] - if not team_tokens: - capture_exception(Exception(f"quota_limiting: No team tokens found for organization: {organization.id}")) - return + if not team_tokens: + capture_exception(Exception(f"quota_limiting: No team tokens found for organization: {organization.id}")) + return - return team_tokens - - if resource == QuotaResource.ROWS_SYNCED: - team_ids: list[str] = [x for x in list(organization.teams.values_list("id", flat=True)) if x] - - if not team_ids: - capture_exception(Exception(f"quota_limiting: No team ids found for organization: {organization.id}")) - return - - return team_ids + return team_tokens def set_org_usage_summary( diff --git a/ee/billing/test/test_quota_limiting.py b/ee/billing/test/test_quota_limiting.py index 3e8b5105767d3..926e3441c4f73 100644 --- a/ee/billing/test/test_quota_limiting.py +++ b/ee/billing/test/test_quota_limiting.py @@ -92,7 +92,7 @@ def test_quota_limiting_feature_flag_enabled(self, patch_feature_enabled, patch_ patch_capture.reset_mock() # Add this org to the redis cache. - team_tokens = get_team_attribute_by_quota_resource(self.organization, QuotaResource.EVENTS) + team_tokens = get_team_attribute_by_quota_resource(self.organization) add_limited_team_tokens( QuotaResource.EVENTS, {x: 1612137599 for x in team_tokens}, @@ -715,7 +715,7 @@ def test_sync_org_quota_limits(self): # rows_synced uses teams, not tokens assert sorted( list_limited_team_attributes(QuotaResource.ROWS_SYNCED, QuotaLimitingCaches.QUOTA_LIMITER_CACHE_KEY) - ) == sorted(["1337", str(self.team.pk), str(other_team.pk)]) + ) == sorted(["1337", str(self.team.api_token), str(other_team.api_token)]) self.organization.usage["events"]["usage"] = 80 self.organization.usage["rows_synced"]["usage"] = 36 @@ -748,7 +748,7 @@ def test_sync_org_quota_limits(self): list_limited_team_attributes( QuotaResource.ROWS_SYNCED, QuotaLimitingCaches.QUOTA_LIMITING_SUSPENDED_KEY ) - ) == sorted([str(self.team.pk), str(other_team.pk)]) + ) == sorted([str(self.team.api_token), str(other_team.api_token)]) self.organization.usage["events"]["usage"] = 80 self.organization.usage["rows_synced"]["usage"] = 36 diff --git a/frontend/src/scenes/data-warehouse/settings/DataWarehouseManagedSourcesTable.tsx b/frontend/src/scenes/data-warehouse/settings/DataWarehouseManagedSourcesTable.tsx index e983ce363bd95..d18657892fd22 100644 --- a/frontend/src/scenes/data-warehouse/settings/DataWarehouseManagedSourcesTable.tsx +++ b/frontend/src/scenes/data-warehouse/settings/DataWarehouseManagedSourcesTable.tsx @@ -27,6 +27,7 @@ const StatusTagSetting = { Completed: 'success', Error: 'danger', Failed: 'danger', + 'Billing limits': 'danger', } export function DataWarehouseManagedSourcesTable(): JSX.Element { diff --git a/frontend/src/scenes/data-warehouse/settings/source/Schemas.tsx b/frontend/src/scenes/data-warehouse/settings/source/Schemas.tsx index 2a3fa469ac658..56c5e12963dd8 100644 --- a/frontend/src/scenes/data-warehouse/settings/source/Schemas.tsx +++ b/frontend/src/scenes/data-warehouse/settings/source/Schemas.tsx @@ -47,6 +47,7 @@ const StatusTagSetting = { Completed: 'success', Error: 'danger', Failed: 'danger', + 'Billing limits': 'danger', } export const SchemaTable = ({ schemas, isLoading }: SchemaTableProps): JSX.Element => { diff --git a/frontend/src/scenes/data-warehouse/settings/source/Syncs.tsx b/frontend/src/scenes/data-warehouse/settings/source/Syncs.tsx index a86a41ec867ae..c283e32c7b54b 100644 --- a/frontend/src/scenes/data-warehouse/settings/source/Syncs.tsx +++ b/frontend/src/scenes/data-warehouse/settings/source/Syncs.tsx @@ -11,7 +11,7 @@ const StatusTagSetting: Record = { Running: 'primary', Completed: 'success', Failed: 'danger', - Cancelled: 'default', + 'Billing limits': 'danger', } interface SyncsProps { diff --git a/frontend/src/types.ts b/frontend/src/types.ts index cbc91482a2bae..a4e8187d755de 100644 --- a/frontend/src/types.ts +++ b/frontend/src/types.ts @@ -3924,7 +3924,7 @@ export interface ExternalDataSourceSchema extends SimpleExternalDataSourceSchema export interface ExternalDataJob { id: string created_at: string - status: 'Running' | 'Failed' | 'Completed' | 'Cancelled' + status: 'Running' | 'Failed' | 'Completed' | 'Billing limits' schema: SimpleExternalDataSourceSchema rows_synced: number latest_error: string diff --git a/mypy-baseline.txt b/mypy-baseline.txt index 2a1b0092f3f1b..9e25498b357dd 100644 --- a/mypy-baseline.txt +++ b/mypy-baseline.txt @@ -27,6 +27,21 @@ posthog/temporal/data_imports/pipelines/rest_source/config_setup.py:0: error: Un posthog/temporal/data_imports/pipelines/rest_source/config_setup.py:0: error: Unpacked dict entry 1 has incompatible type "dict[str, Any] | None"; expected "SupportsKeysAndGetItem[str, Any]" [dict-item] posthog/temporal/data_imports/pipelines/rest_source/config_setup.py:0: error: Unpacked dict entry 0 has incompatible type "dict[str, Any] | None"; expected "SupportsKeysAndGetItem[str, ResolveParamConfig | IncrementalParamConfig | Any]" [dict-item] posthog/temporal/data_imports/pipelines/rest_source/config_setup.py:0: error: Unpacked dict entry 1 has incompatible type "dict[str, ResolveParamConfig | IncrementalParamConfig | Any] | None"; expected "SupportsKeysAndGetItem[str, ResolveParamConfig | IncrementalParamConfig | Any]" [dict-item] +posthog/temporal/data_imports/pipelines/rest_source/__init__.py:0: error: Not all union combinations were tried because there are too many unions [misc] +posthog/temporal/data_imports/pipelines/rest_source/__init__.py:0: error: Argument 2 to "source" has incompatible type "str | None"; expected "str" [arg-type] +posthog/temporal/data_imports/pipelines/rest_source/__init__.py:0: error: Argument 3 to "source" has incompatible type "str | None"; expected "str" [arg-type] +posthog/temporal/data_imports/pipelines/rest_source/__init__.py:0: error: Argument 4 to "source" has incompatible type "int | None"; expected "int" [arg-type] +posthog/temporal/data_imports/pipelines/rest_source/__init__.py:0: error: Argument 6 to "source" has incompatible type "Schema | None"; expected "Schema" [arg-type] +posthog/temporal/data_imports/pipelines/rest_source/__init__.py:0: error: Argument 7 to "source" has incompatible type "Literal['evolve', 'discard_value', 'freeze', 'discard_row'] | TSchemaContractDict | None"; expected "Literal['evolve', 'discard_value', 'freeze', 'discard_row'] | TSchemaContractDict" [arg-type] +posthog/temporal/data_imports/pipelines/rest_source/__init__.py:0: error: Argument 8 to "source" has incompatible type "type[BaseConfiguration] | None"; expected "type[BaseConfiguration]" [arg-type] +posthog/temporal/data_imports/pipelines/rest_source/__init__.py:0: error: Argument 1 to "build_resource_dependency_graph" has incompatible type "EndpointResourceBase | None"; expected "EndpointResourceBase" [arg-type] +posthog/temporal/data_imports/pipelines/rest_source/__init__.py:0: error: Incompatible types in assignment (expression has type "list[str] | None", variable has type "list[str]") [assignment] +posthog/temporal/data_imports/pipelines/rest_source/__init__.py:0: error: Argument 1 to "setup_incremental_object" has incompatible type "dict[str, ResolveParamConfig | IncrementalParamConfig | Any] | None"; expected "dict[str, Any]" [arg-type] +posthog/temporal/data_imports/pipelines/rest_source/__init__.py:0: error: Argument "base_url" to "RESTClient" has incompatible type "str | None"; expected "str" [arg-type] +posthog/temporal/data_imports/pipelines/rest_source/__init__.py:0: error: Argument 1 to "exclude_keys" has incompatible type "dict[str, ResolveParamConfig | IncrementalParamConfig | Any] | None"; expected "Mapping[str, Any]" [arg-type] +posthog/temporal/data_imports/pipelines/rest_source/__init__.py:0: error: Incompatible default for argument "resolved_param" (default has type "ResolvedParam | None", argument has type "ResolvedParam") [assignment] +posthog/temporal/data_imports/pipelines/rest_source/__init__.py:0: error: Unused "type: ignore" comment [unused-ignore] +posthog/temporal/data_imports/pipelines/rest_source/__init__.py:0: error: Argument "module" to "SourceInfo" has incompatible type Module | None; expected Module [arg-type] posthog/utils.py:0: error: No overload variant of "asdict" matches argument type "type[DataclassInstance]" [call-overload] posthog/utils.py:0: note: Possible overload variants: posthog/utils.py:0: note: def asdict(obj: DataclassInstance) -> dict[str, Any] @@ -239,7 +254,6 @@ posthog/user_permissions.py:0: error: Key expression in dictionary comprehension posthog/user_permissions.py:0: error: Incompatible return value type (got "int", expected "Level | None") [return-value] posthog/user_permissions.py:0: error: Incompatible return value type (got "int", expected "Level | None") [return-value] posthog/user_permissions.py:0: error: Incompatible return value type (got "int", expected "RestrictionLevel") [return-value] -posthog/tasks/warehouse.py:0: error: Argument 1 to "cancel_external_data_workflow" has incompatible type "str | None"; expected "str" [arg-type] posthog/tasks/update_survey_iteration.py:0: error: Incompatible types in assignment (expression has type "ForeignKey[Any, _ST] | Any", variable has type "FeatureFlag | Combinable | None") [assignment] posthog/tasks/update_survey_iteration.py:0: error: Item "None" of "FeatureFlag | None" has no attribute "filters" [union-attr] posthog/tasks/update_survey_iteration.py:0: error: Item "None" of "FeatureFlag | None" has no attribute "filters" [union-attr] @@ -266,7 +280,6 @@ posthog/demo/matrix/matrix.py:0: error: Name "timezone.datetime" is not defined posthog/demo/matrix/matrix.py:0: error: Name "timezone.datetime" is not defined [name-defined] posthog/api/shared.py:0: error: Incompatible return value type (got "int | None", expected "Level | None") [return-value] ee/billing/quota_limiting.py:0: error: Argument 2 to "feature_enabled" has incompatible type "UUID"; expected "str" [arg-type] -ee/billing/quota_limiting.py:0: error: List comprehension has incompatible type List[int]; expected List[str] [misc] ee/billing/quota_limiting.py:0: error: Unsupported target for indexed assignment ("object") [index] ee/billing/quota_limiting.py:0: error: "object" has no attribute "get" [attr-defined] ee/billing/quota_limiting.py:0: error: Unsupported target for indexed assignment ("object") [index] @@ -412,7 +425,6 @@ posthog/api/user.py:0: error: "User" has no attribute "social_auth" [attr-defin ee/clickhouse/queries/related_actors_query.py:0: error: Argument 1 to "_query_related_groups" of "RelatedActorsQuery" has incompatible type "int"; expected "Literal[0, 1, 2, 3, 4]" [arg-type] ee/api/test/base.py:0: error: Incompatible types in assignment (expression has type "None", variable has type "License") [assignment] ee/api/test/base.py:0: error: "setUpTestData" undefined in superclass [misc] -posthog/warehouse/external_data_source/jobs.py:0: error: Incompatible types in assignment (expression has type "str", variable has type "Status") [assignment] posthog/warehouse/external_data_source/jobs.py:0: error: Incompatible type for lookup 'id': (got "UUID | None", expected "UUID | str") [misc] posthog/warehouse/api/test/test_table.py:0: error: Item "None" of "DataWarehouseCredential | None" has no attribute "access_key" [union-attr] posthog/warehouse/api/test/test_table.py:0: error: Item "None" of "DataWarehouseCredential | None" has no attribute "access_secret" [union-attr] @@ -436,6 +448,23 @@ posthog/test/test_feature_flag_analytics.py:0: error: Item "None" of "Dashboard posthog/test/activity_logging/test_activity_logging.py:0: error: Argument "organization_id" to "log_activity" has incompatible type "UUID"; expected "UUIDT | None" [arg-type] posthog/test/activity_logging/test_activity_logging.py:0: error: Argument "organization_id" to "log_activity" has incompatible type "UUID"; expected "UUIDT | None" [arg-type] posthog/test/activity_logging/test_activity_logging.py:0: error: Argument "organization_id" to "log_activity" has incompatible type "UUID"; expected "UUIDT | None" [arg-type] +posthog/temporal/data_imports/pipelines/zendesk/__init__.py:0: error: Unused "type: ignore" comment [unused-ignore] +posthog/temporal/data_imports/pipelines/zendesk/__init__.py:0: error: Unused "type: ignore" comment [unused-ignore] +posthog/temporal/data_imports/pipelines/zendesk/__init__.py:0: error: Unused "type: ignore" comment [unused-ignore] +posthog/temporal/data_imports/pipelines/zendesk/__init__.py:0: error: Unused "type: ignore" comment [unused-ignore] +posthog/temporal/data_imports/pipelines/zendesk/__init__.py:0: error: Unused "type: ignore" comment [unused-ignore] +posthog/temporal/data_imports/pipelines/zendesk/__init__.py:0: error: Unused "type: ignore" comment [unused-ignore] +posthog/temporal/data_imports/pipelines/zendesk/__init__.py:0: error: Unused "type: ignore" comment [unused-ignore] +posthog/temporal/data_imports/pipelines/zendesk/__init__.py:0: error: Unused "type: ignore" comment [unused-ignore] +posthog/temporal/data_imports/pipelines/zendesk/__init__.py:0: error: Unused "type: ignore" comment [unused-ignore] +posthog/temporal/data_imports/pipelines/stripe/__init__.py:0: error: Unused "type: ignore" comment [unused-ignore] +posthog/temporal/data_imports/pipelines/stripe/__init__.py:0: error: Unused "type: ignore" comment [unused-ignore] +posthog/temporal/data_imports/pipelines/stripe/__init__.py:0: error: Unused "type: ignore" comment [unused-ignore] +posthog/temporal/data_imports/pipelines/stripe/__init__.py:0: error: Unused "type: ignore" comment [unused-ignore] +posthog/temporal/data_imports/pipelines/stripe/__init__.py:0: error: Unused "type: ignore" comment [unused-ignore] +posthog/temporal/data_imports/pipelines/stripe/__init__.py:0: error: Unused "type: ignore" comment [unused-ignore] +posthog/temporal/data_imports/pipelines/stripe/__init__.py:0: error: Unused "type: ignore" comment [unused-ignore] +posthog/temporal/data_imports/pipelines/stripe/__init__.py:0: error: Unused "type: ignore" comment [unused-ignore] posthog/tasks/test/test_update_survey_iteration.py:0: error: Item "None" of "FeatureFlag | None" has no attribute "filters" [union-attr] posthog/tasks/test/test_stop_surveys_reached_target.py:0: error: No overload variant of "__sub__" of "datetime" matches argument type "None" [operator] posthog/tasks/test/test_stop_surveys_reached_target.py:0: note: Possible overload variants: @@ -591,21 +620,6 @@ posthog/temporal/data_imports/workflow_activities/create_job_model.py:0: note: d posthog/temporal/data_imports/workflow_activities/create_job_model.py:0: note: def get(self, Type, Sequence[str], /) -> Sequence[str] posthog/temporal/data_imports/workflow_activities/create_job_model.py:0: note: def [_T] get(self, Type, _T, /) -> Sequence[str] | _T posthog/temporal/data_imports/workflow_activities/create_job_model.py:0: error: Argument 1 has incompatible type "dict[str, list[tuple[str, str]]]"; expected "list[Any]" [arg-type] -posthog/temporal/data_imports/pipelines/rest_source/__init__.py:0: error: Not all union combinations were tried because there are too many unions [misc] -posthog/temporal/data_imports/pipelines/rest_source/__init__.py:0: error: Argument 2 to "source" has incompatible type "str | None"; expected "str" [arg-type] -posthog/temporal/data_imports/pipelines/rest_source/__init__.py:0: error: Argument 3 to "source" has incompatible type "str | None"; expected "str" [arg-type] -posthog/temporal/data_imports/pipelines/rest_source/__init__.py:0: error: Argument 4 to "source" has incompatible type "int | None"; expected "int" [arg-type] -posthog/temporal/data_imports/pipelines/rest_source/__init__.py:0: error: Argument 6 to "source" has incompatible type "Schema | None"; expected "Schema" [arg-type] -posthog/temporal/data_imports/pipelines/rest_source/__init__.py:0: error: Argument 7 to "source" has incompatible type "Literal['evolve', 'discard_value', 'freeze', 'discard_row'] | TSchemaContractDict | None"; expected "Literal['evolve', 'discard_value', 'freeze', 'discard_row'] | TSchemaContractDict" [arg-type] -posthog/temporal/data_imports/pipelines/rest_source/__init__.py:0: error: Argument 8 to "source" has incompatible type "type[BaseConfiguration] | None"; expected "type[BaseConfiguration]" [arg-type] -posthog/temporal/data_imports/pipelines/rest_source/__init__.py:0: error: Argument 1 to "build_resource_dependency_graph" has incompatible type "EndpointResourceBase | None"; expected "EndpointResourceBase" [arg-type] -posthog/temporal/data_imports/pipelines/rest_source/__init__.py:0: error: Incompatible types in assignment (expression has type "list[str] | None", variable has type "list[str]") [assignment] -posthog/temporal/data_imports/pipelines/rest_source/__init__.py:0: error: Argument 1 to "setup_incremental_object" has incompatible type "dict[str, ResolveParamConfig | IncrementalParamConfig | Any] | None"; expected "dict[str, Any]" [arg-type] -posthog/temporal/data_imports/pipelines/rest_source/__init__.py:0: error: Argument "base_url" to "RESTClient" has incompatible type "str | None"; expected "str" [arg-type] -posthog/temporal/data_imports/pipelines/rest_source/__init__.py:0: error: Argument 1 to "exclude_keys" has incompatible type "dict[str, ResolveParamConfig | IncrementalParamConfig | Any] | None"; expected "Mapping[str, Any]" [arg-type] -posthog/temporal/data_imports/pipelines/rest_source/__init__.py:0: error: Incompatible default for argument "resolved_param" (default has type "ResolvedParam | None", argument has type "ResolvedParam") [assignment] -posthog/temporal/data_imports/pipelines/rest_source/__init__.py:0: error: Unused "type: ignore" comment [unused-ignore] -posthog/temporal/data_imports/pipelines/rest_source/__init__.py:0: error: Argument "module" to "SourceInfo" has incompatible type Module | None; expected Module [arg-type] posthog/tasks/exports/test/test_csv_exporter.py:0: error: Function is missing a return type annotation [no-untyped-def] posthog/tasks/exports/test/test_csv_exporter.py:0: error: Function is missing a type annotation [no-untyped-def] posthog/tasks/exports/test/test_csv_exporter.py:0: error: Function is missing a type annotation for one or more arguments [no-untyped-def] @@ -764,23 +778,6 @@ posthog/temporal/tests/batch_exports/test_run_updates.py:0: error: Unused "type: posthog/temporal/tests/batch_exports/test_run_updates.py:0: error: Unused "type: ignore" comment [unused-ignore] posthog/temporal/tests/batch_exports/test_run_updates.py:0: error: Unused "type: ignore" comment [unused-ignore] posthog/temporal/tests/batch_exports/test_batch_exports.py:0: error: TypedDict key must be a string literal; expected one of ("_timestamp", "created_at", "distinct_id", "elements", "elements_chain", ...) [literal-required] -posthog/temporal/data_imports/pipelines/zendesk/__init__.py:0: error: Unused "type: ignore" comment [unused-ignore] -posthog/temporal/data_imports/pipelines/zendesk/__init__.py:0: error: Unused "type: ignore" comment [unused-ignore] -posthog/temporal/data_imports/pipelines/zendesk/__init__.py:0: error: Unused "type: ignore" comment [unused-ignore] -posthog/temporal/data_imports/pipelines/zendesk/__init__.py:0: error: Unused "type: ignore" comment [unused-ignore] -posthog/temporal/data_imports/pipelines/zendesk/__init__.py:0: error: Unused "type: ignore" comment [unused-ignore] -posthog/temporal/data_imports/pipelines/zendesk/__init__.py:0: error: Unused "type: ignore" comment [unused-ignore] -posthog/temporal/data_imports/pipelines/zendesk/__init__.py:0: error: Unused "type: ignore" comment [unused-ignore] -posthog/temporal/data_imports/pipelines/zendesk/__init__.py:0: error: Unused "type: ignore" comment [unused-ignore] -posthog/temporal/data_imports/pipelines/zendesk/__init__.py:0: error: Unused "type: ignore" comment [unused-ignore] -posthog/temporal/data_imports/pipelines/stripe/__init__.py:0: error: Unused "type: ignore" comment [unused-ignore] -posthog/temporal/data_imports/pipelines/stripe/__init__.py:0: error: Unused "type: ignore" comment [unused-ignore] -posthog/temporal/data_imports/pipelines/stripe/__init__.py:0: error: Unused "type: ignore" comment [unused-ignore] -posthog/temporal/data_imports/pipelines/stripe/__init__.py:0: error: Unused "type: ignore" comment [unused-ignore] -posthog/temporal/data_imports/pipelines/stripe/__init__.py:0: error: Unused "type: ignore" comment [unused-ignore] -posthog/temporal/data_imports/pipelines/stripe/__init__.py:0: error: Unused "type: ignore" comment [unused-ignore] -posthog/temporal/data_imports/pipelines/stripe/__init__.py:0: error: Unused "type: ignore" comment [unused-ignore] -posthog/temporal/data_imports/pipelines/stripe/__init__.py:0: error: Unused "type: ignore" comment [unused-ignore] posthog/tasks/test/test_email.py:0: error: Argument 1 to "send_batch_export_run_failure" has incompatible type "UUID"; expected "str" [arg-type] posthog/tasks/test/test_email.py:0: error: Argument 1 to "send_batch_export_run_failure" has incompatible type "UUID"; expected "str" [arg-type] posthog/tasks/test/test_email.py:0: error: Argument 1 to "send_batch_export_run_failure" has incompatible type "UUID"; expected "str" [arg-type] diff --git a/posthog/tasks/scheduled.py b/posthog/tasks/scheduled.py index df765689fb362..149a00eb11831 100644 --- a/posthog/tasks/scheduled.py +++ b/posthog/tasks/scheduled.py @@ -15,7 +15,6 @@ calculate_decide_usage, calculate_replay_embeddings, check_async_migration_health, - check_data_import_row_limits, check_flags_to_rollback, clean_stale_partials, clear_clickhouse_deleted_person, @@ -315,13 +314,7 @@ def setup_periodic_tasks(sender: Celery, **kwargs: Any) -> None: name="delete expired exported assets", ) - sender.add_periodic_task( - crontab(minute="*/20"), - check_data_import_row_limits.s(), - name="check external data rows synced", - ) # Every 20 minutes try to retrieve and calculate total rows synced in period - sender.add_periodic_task( crontab(minute="*/20"), calculate_external_data_rows_synced.s(), diff --git a/posthog/tasks/tasks.py b/posthog/tasks/tasks.py index 5c4b085072287..f2bf138177a35 100644 --- a/posthog/tasks/tasks.py +++ b/posthog/tasks/tasks.py @@ -890,16 +890,6 @@ def ee_persist_finished_recordings() -> None: persist_finished_recordings() -@shared_task(ignore_result=True) -def check_data_import_row_limits() -> None: - try: - from posthog.tasks.warehouse import check_synced_row_limits - except ImportError: - pass - else: - check_synced_row_limits() - - # this task runs a CH query and triggers other tasks # it can run on the default queue @shared_task(ignore_result=True) diff --git a/posthog/tasks/test/test_warehouse.py b/posthog/tasks/test/test_warehouse.py index 8a83cad234d88..c6150ef565336 100644 --- a/posthog/tasks/test/test_warehouse.py +++ b/posthog/tasks/test/test_warehouse.py @@ -1,12 +1,11 @@ from posthog.test.base import APIBaseTest from unittest.mock import patch, MagicMock from posthog.tasks.warehouse import ( - check_synced_row_limits_of_team, capture_workspace_rows_synced_by_team, validate_data_warehouse_table_columns, capture_external_data_rows_synced, ) -from posthog.warehouse.models import ExternalDataSource, ExternalDataJob, ExternalDataSchema +from posthog.warehouse.models import ExternalDataSource, ExternalDataJob from freezegun import freeze_time import datetime @@ -14,107 +13,6 @@ class TestWarehouse(APIBaseTest): - @patch("posthog.tasks.warehouse.MONTHLY_LIMIT", 100) - @patch("posthog.tasks.warehouse.cancel_external_data_workflow") - @patch("posthog.tasks.warehouse.pause_external_data_schedule") - @patch("ee.billing.quota_limiting.list_limited_team_attributes") - def test_check_synced_row_limits_of_team_monthly_limit( - self, - list_limited_team_attributes_mock: MagicMock, - pause_schedule_mock: MagicMock, - cancel_workflow_mock: MagicMock, - ) -> None: - list_limited_team_attributes_mock.return_value = [] - - source = ExternalDataSource.objects.create( - source_id="test_id", - connection_id="fake connectino_id", - destination_id="fake destination_id", - team=self.team, - status="Running", - source_type="Stripe", - ) - - schema = ExternalDataSchema.objects.create( - source=source, - name="test_schema", - team=self.team, - status="Running", - ) - - job = ExternalDataJob.objects.create( - pipeline=source, - workflow_id="fake_workflow_id", - team=self.team, - status="Running", - rows_synced=100000, - schema=schema, - ) - - check_synced_row_limits_of_team(self.team.pk) - - source.refresh_from_db() - self.assertEqual(source.status, ExternalDataSource.Status.PAUSED) - - schema.refresh_from_db() - self.assertEqual(schema.status, ExternalDataSchema.Status.PAUSED) - - job.refresh_from_db() - self.assertEqual(job.status, ExternalDataJob.Status.CANCELLED) - - self.assertEqual(pause_schedule_mock.call_count, 1) - self.assertEqual(cancel_workflow_mock.call_count, 1) - - @patch("posthog.tasks.warehouse.cancel_external_data_workflow") - @patch("posthog.tasks.warehouse.pause_external_data_schedule") - @patch("ee.billing.quota_limiting.list_limited_team_attributes") - def test_check_synced_row_limits_of_team( - self, - list_limited_team_attributes_mock: MagicMock, - pause_schedule_mock: MagicMock, - cancel_workflow_mock: MagicMock, - ) -> None: - list_limited_team_attributes_mock.return_value = [self.team.api_token] - - source = ExternalDataSource.objects.create( - source_id="test_id", - connection_id="fake connectino_id", - destination_id="fake destination_id", - team=self.team, - status="Running", - source_type="Stripe", - ) - - schema = ExternalDataSchema.objects.create( - source=source, - name="test_schema", - team=self.team, - status="Running", - ) - - job = ExternalDataJob.objects.create( - pipeline=source, - workflow_id="fake_workflow_id", - team=self.team, - status="Running", - rows_synced=100000, - schema=schema, - ) - - check_synced_row_limits_of_team(self.team.pk) - - source.refresh_from_db() - self.assertEqual(source.status, ExternalDataSource.Status.PAUSED) - - schema.refresh_from_db() - self.assertEqual(schema.status, ExternalDataSchema.Status.PAUSED) - - job.refresh_from_db() - self.assertEqual(job.status, ExternalDataJob.Status.CANCELLED) - - self.assertEqual(pause_schedule_mock.call_count, 1) - self.assertEqual(cancel_workflow_mock.call_count, 1) - @patch("posthog.tasks.warehouse.get_ph_client") @patch( "posthog.tasks.warehouse.DEFAULT_DATE_TIME", diff --git a/posthog/tasks/warehouse.py b/posthog/tasks/warehouse.py index 518d4fc3027c4..3e75017ca81b4 100644 --- a/posthog/tasks/warehouse.py +++ b/posthog/tasks/warehouse.py @@ -3,20 +3,13 @@ import structlog from celery import shared_task -from posthog.warehouse.data_load.service import ( - cancel_external_data_workflow, - pause_external_data_schedule, - unpause_external_data_schedule, -) -from posthog.warehouse.models import ExternalDataJob, ExternalDataSource, ExternalDataSchema +from posthog.warehouse.models import ExternalDataJob, ExternalDataSource from posthog.ph_client import get_ph_client from posthog.models import Team from django.db.models import Q logger = structlog.get_logger(__name__) -MONTHLY_LIMIT = 500_000_000 - # TODO: adjust to whenever billing officially starts DEFAULT_DATE_TIME = datetime.datetime(2024, 6, 1, tzinfo=datetime.UTC) @@ -32,89 +25,6 @@ def capture_external_data_rows_synced() -> None: capture_workspace_rows_synced_by_team.delay(team_id) -def check_synced_row_limits() -> None: - team_ids = ExternalDataSource.objects.values_list("team", flat=True) - for team_id in team_ids: - check_synced_row_limits_of_team.delay(team_id) - - -@shared_task(ignore_result=True) -def check_synced_row_limits_of_team(team_id: int) -> None: - logger.info("Checking synced row limits of team", team_id=team_id) - team_model = Team.objects.get(pk=team_id) - - from ee.billing.quota_limiting import list_limited_team_attributes, QuotaResource, QuotaLimitingCaches - - # TODO: temp workaround. Should use team ids directly instead of tokens - limited_team_tokens_rows_synced = list_limited_team_attributes( - QuotaResource.ROWS_SYNCED, QuotaLimitingCaches.QUOTA_LIMITER_CACHE_KEY - ) - - # TODO: Remove once billing logic is fully through - start_of_month = datetime.datetime.now().replace(day=1, hour=0, minute=0, second=0, microsecond=0) - rows_synced_list = [ - x - for x in ExternalDataJob.objects.filter(team_id=team_id, created_at__gte=start_of_month).values_list( - "rows_synced", flat=True - ) - if x - ] - total_rows_synced = sum(rows_synced_list) - - if team_model.api_token in limited_team_tokens_rows_synced or total_rows_synced > MONTHLY_LIMIT: - # stop active jobs - running_jobs = ExternalDataJob.objects.filter(team_id=team_id, status=ExternalDataJob.Status.RUNNING) - for job in running_jobs: - try: - cancel_external_data_workflow(job.workflow_id) - except Exception as e: - logger.exception("Could not cancel external data workflow", exc_info=e) - - try: - pause_external_data_schedule(str(job.pipeline.id)) - except Exception as e: - logger.exception("Could not pause external data schedule", exc_info=e) - - job.status = ExternalDataJob.Status.CANCELLED - job.save() - - job.pipeline.status = ExternalDataSource.Status.PAUSED - job.pipeline.save() - - if job.schema: - job.schema.status = ExternalDataSchema.Status.PAUSED - job.schema.save() - - # pause active schemas - all_schemas = ExternalDataSchema.objects.filter( - team_id=team_id, status__in=[ExternalDataSchema.Status.COMPLETED, ExternalDataSchema.Status.RUNNING] - ) - for schema in all_schemas: - try: - pause_external_data_schedule(str(schema.id)) - except Exception as e: - logger.exception("Could not pause external data schedule", exc_info=e) - - schema.status = ExternalDataSchema.Status.PAUSED - schema.save() - - schema.source.status = ExternalDataSource.Status.PAUSED - schema.source.save() - else: - all_schemas = ExternalDataSchema.objects.filter(team_id=team_id, status=ExternalDataSchema.Status.PAUSED) - for schema in all_schemas: - try: - unpause_external_data_schedule(str(schema.id)) - except Exception as e: - logger.exception("Could not unpause external data schedule", exc_info=e) - - schema.status = ExternalDataSchema.Status.COMPLETED - schema.save() - - schema.source.status = ExternalDataSource.Status.RUNNING - schema.source.save() - - @shared_task(ignore_result=True) def capture_workspace_rows_synced_by_team(team_id: int) -> None: ph_client = get_ph_client() diff --git a/posthog/temporal/data_imports/__init__.py b/posthog/temporal/data_imports/__init__.py index 2b162efa4c538..e945b483d8e95 100644 --- a/posthog/temporal/data_imports/__init__.py +++ b/posthog/temporal/data_imports/__init__.py @@ -5,6 +5,7 @@ import_data_activity, update_external_data_job_model, check_schedule_activity, + check_billing_limits_activity, ) WORKFLOWS = [ExternalDataJobWorkflow] @@ -15,4 +16,5 @@ import_data_activity, create_source_templates, check_schedule_activity, + check_billing_limits_activity, ] diff --git a/posthog/temporal/data_imports/external_data_job.py b/posthog/temporal/data_imports/external_data_job.py index 76ca85db9be5f..15b0e6a08db1c 100644 --- a/posthog/temporal/data_imports/external_data_job.py +++ b/posthog/temporal/data_imports/external_data_job.py @@ -1,14 +1,16 @@ import dataclasses import datetime as dt import json -import uuid -from asgiref.sync import sync_to_async from temporalio import activity, exceptions, workflow from temporalio.common import RetryPolicy # TODO: remove dependency from posthog.temporal.batch_exports.base import PostHogWorkflow +from posthog.temporal.data_imports.workflow_activities.check_billing_limits import ( + CheckBillingLimitsActivityInputs, + check_billing_limits_activity, +) from posthog.temporal.utils import ExternalDataWorkflowInputs from posthog.temporal.data_imports.workflow_activities.create_job_model import ( CreateExternalDataJobModelActivityInputs, @@ -24,7 +26,7 @@ from posthog.warehouse.data_load.source_templates import create_warehouse_templates_for_source from posthog.warehouse.external_data_source.jobs import ( - update_external_job_status, + aupdate_external_job_status, ) from posthog.warehouse.models import ( ExternalDataJob, @@ -67,8 +69,8 @@ async def update_external_data_job_model(inputs: UpdateExternalDataJobStatusInpu logger.info("Schema has a non-retryable error - turning off syncing") await aupdate_should_sync(schema_id=inputs.schema_id, team_id=inputs.team_id, should_sync=False) - await sync_to_async(update_external_job_status)( - run_id=uuid.UUID(inputs.id), + await aupdate_external_job_status( + job_id=inputs.id, status=inputs.status, latest_error=inputs.latest_error, team_id=inputs.team_id, @@ -151,7 +153,7 @@ async def run(self, inputs: ExternalDataWorkflowInputs): ) # TODO: split out the creation of the external data job model from schema getting to seperate out exception handling - run_id, incremental = await workflow.execute_activity( + job_id, incremental = await workflow.execute_activity( create_external_data_job_model_activity, create_external_data_job_inputs, start_to_close_timeout=dt.timedelta(minutes=1), @@ -163,9 +165,24 @@ async def run(self, inputs: ExternalDataWorkflowInputs): ), ) + # Check billing limits + hit_billing_limit = await workflow.execute_activity( + check_billing_limits_activity, + CheckBillingLimitsActivityInputs(job_id=job_id, team_id=inputs.team_id), + start_to_close_timeout=dt.timedelta(minutes=1), + retry_policy=RetryPolicy( + initial_interval=dt.timedelta(seconds=10), + maximum_interval=dt.timedelta(seconds=60), + maximum_attempts=3, + ), + ) + + if hit_billing_limit: + return + update_inputs = UpdateExternalDataJobStatusInputs( - id=run_id, - run_id=run_id, + id=job_id, + run_id=job_id, status=ExternalDataJob.Status.COMPLETED, latest_error=None, internal_error=None, @@ -176,7 +193,7 @@ async def run(self, inputs: ExternalDataWorkflowInputs): try: job_inputs = ImportDataActivityInputs( team_id=inputs.team_id, - run_id=run_id, + run_id=job_id, schema_id=inputs.external_data_schema_id, source_id=inputs.external_data_source_id, ) @@ -197,17 +214,13 @@ async def run(self, inputs: ExternalDataWorkflowInputs): # Create source templates await workflow.execute_activity( create_source_templates, - CreateSourceTemplateInputs(team_id=inputs.team_id, run_id=run_id), + CreateSourceTemplateInputs(team_id=inputs.team_id, run_id=job_id), start_to_close_timeout=dt.timedelta(minutes=10), retry_policy=RetryPolicy(maximum_attempts=2), ) except exceptions.ActivityError as e: - if isinstance(e.cause, exceptions.CancelledError): - update_inputs.status = ExternalDataJob.Status.CANCELLED - else: - update_inputs.status = ExternalDataJob.Status.FAILED - + update_inputs.status = ExternalDataJob.Status.FAILED update_inputs.internal_error = str(e.cause) update_inputs.latest_error = str(e.cause) raise diff --git a/posthog/temporal/data_imports/pipelines/helpers.py b/posthog/temporal/data_imports/pipelines/helpers.py index 776b7f8dd0582..d0cc153f4e11d 100644 --- a/posthog/temporal/data_imports/pipelines/helpers.py +++ b/posthog/temporal/data_imports/pipelines/helpers.py @@ -5,15 +5,6 @@ from posthog.warehouse.util import database_sync_to_async -async def is_job_cancelled( - team_id: int, - job_id: str, -) -> bool: - model = await aget_external_data_job(team_id, job_id) - - return model.status == ExternalDataJob.Status.CANCELLED - - @database_sync_to_async def aget_external_data_job(team_id, job_id): return ExternalDataJob.objects.get(id=job_id, team_id=team_id) diff --git a/posthog/temporal/data_imports/pipelines/rest_source/__init__.py b/posthog/temporal/data_imports/pipelines/rest_source/__init__.py index 5a5d8e8d09ab0..5dceafd1d2aec 100644 --- a/posthog/temporal/data_imports/pipelines/rest_source/__init__.py +++ b/posthog/temporal/data_imports/pipelines/rest_source/__init__.py @@ -23,7 +23,6 @@ from dlt.sources.helpers.rest_client.paginators import BasePaginator from dlt.sources.helpers.rest_client.typing import HTTPMethodBasic -from posthog.temporal.data_imports.pipelines.helpers import is_job_cancelled from .typing import ( ClientConfig, ResolvedParam, @@ -259,9 +258,6 @@ async def paginate_resource( ) -> AsyncGenerator[Iterator[Any], Any]: yield dlt.mark.materialize_table_schema() # type: ignore - if await is_job_cancelled(team_id=team_id, job_id=job_id): - return - if incremental_object: params = _set_incremental_params( params, @@ -315,9 +311,6 @@ async def paginate_dependent_resource( ) -> AsyncGenerator[Any, Any]: yield dlt.mark.materialize_table_schema() # type: ignore - if await is_job_cancelled(team_id=team_id, job_id=job_id): - return - if incremental_object: params = _set_incremental_params( params, diff --git a/posthog/temporal/data_imports/workflow_activities/check_billing_limits.py b/posthog/temporal/data_imports/workflow_activities/check_billing_limits.py new file mode 100644 index 0000000000000..3bce57ef74891 --- /dev/null +++ b/posthog/temporal/data_imports/workflow_activities/check_billing_limits.py @@ -0,0 +1,41 @@ +import dataclasses +from temporalio import activity + +from asgiref.sync import sync_to_async + +from ee.billing.quota_limiting import QuotaLimitingCaches, QuotaResource, list_limited_team_attributes +from posthog.models.team.team import Team +from posthog.temporal.common.logger import bind_temporal_worker_logger +from posthog.warehouse.external_data_source.jobs import aupdate_external_job_status +from posthog.warehouse.models.external_data_job import ExternalDataJob + + +@dataclasses.dataclass +class CheckBillingLimitsActivityInputs: + team_id: int + job_id: str + + +@activity.defn +async def check_billing_limits_activity(inputs: CheckBillingLimitsActivityInputs) -> bool: + logger = await bind_temporal_worker_logger(team_id=inputs.team_id) + + team: Team = await sync_to_async(Team.objects.get)(id=inputs.team_id) + + limited_team_tokens_rows_synced = list_limited_team_attributes( + QuotaResource.ROWS_SYNCED, QuotaLimitingCaches.QUOTA_LIMITER_CACHE_KEY + ) + + if team.api_token in limited_team_tokens_rows_synced: + logger.info("Billing limits hit. Canceling sync") + + await aupdate_external_job_status( + job_id=inputs.job_id, + status=ExternalDataJob.Status.CANCELLED, + latest_error=None, + team_id=inputs.team_id, + ) + + return True + + return False diff --git a/posthog/temporal/data_imports/workflow_activities/create_job_model.py b/posthog/temporal/data_imports/workflow_activities/create_job_model.py index 21f5e046d1a28..5ddcf16160a2c 100644 --- a/posthog/temporal/data_imports/workflow_activities/create_job_model.py +++ b/posthog/temporal/data_imports/workflow_activities/create_job_model.py @@ -32,7 +32,7 @@ async def create_external_data_job_model_activity(inputs: CreateExternalDataJobM logger = await bind_temporal_worker_logger(team_id=inputs.team_id) try: - run = await sync_to_async(create_external_data_job)( + job = await sync_to_async(create_external_data_job)( team_id=inputs.team_id, external_data_source_id=inputs.source_id, external_data_schema_id=inputs.schema_id, @@ -108,7 +108,7 @@ async def create_external_data_job_model_activity(inputs: CreateExternalDataJobM if schema_model is None: raise ValueError(f"Schema with ID {inputs.schema_id} not found") - return str(run.id), schema_model.is_incremental + return str(job.id), schema_model.is_incremental except Exception as e: logger.exception( f"External data job failed on create_external_data_job_model_activity for {str(inputs.source_id)} with error: {e}" diff --git a/posthog/temporal/tests/data_imports/test_end_to_end.py b/posthog/temporal/tests/data_imports/test_end_to_end.py index 0292fe2d83f52..054f5f4a3c471 100644 --- a/posthog/temporal/tests/data_imports/test_end_to_end.py +++ b/posthog/temporal/tests/data_imports/test_end_to_end.py @@ -685,3 +685,46 @@ async def test_postgres_schema_evolution(team, postgres_config, postgres_connect assert any(x == "new_col" for x in columns) assert any(x == "_dlt_id" for x in columns) assert any(x == "_dlt_load_id" for x in columns) + + +@pytest.mark.django_db(transaction=True) +@pytest.mark.asyncio +async def test_billing_limits(team, stripe_customer): + source = await sync_to_async(ExternalDataSource.objects.create)( + source_id=uuid.uuid4(), + connection_id=uuid.uuid4(), + destination_id=uuid.uuid4(), + team=team, + status="running", + source_type="Stripe", + job_inputs={"stripe_secret_key": "test-key", "stripe_account_id": "acct_id"}, + ) + + schema = await sync_to_async(ExternalDataSchema.objects.create)( + name="Customer", + team_id=team.pk, + source_id=source.pk, + sync_type=ExternalDataSchema.SyncType.FULL_REFRESH, + sync_type_config={}, + ) + + workflow_id = str(uuid.uuid4()) + inputs = ExternalDataWorkflowInputs( + team_id=team.id, + external_data_source_id=source.pk, + external_data_schema_id=schema.id, + ) + + with mock.patch( + "posthog.temporal.data_imports.workflow_activities.check_billing_limits.list_limited_team_attributes", + ) as mock_list_limited_team_attributes: + mock_list_limited_team_attributes.return_value = [team.api_token] + + await _execute_run(workflow_id, inputs, stripe_customer["data"]) + + job: ExternalDataJob = await sync_to_async(ExternalDataJob.objects.get)(team_id=team.id, schema_id=schema.pk) + + assert job.status == ExternalDataJob.Status.CANCELLED + + with pytest.raises(Exception): + await sync_to_async(execute_hogql_query)("SELECT * FROM stripe_customer", team) diff --git a/posthog/temporal/tests/external_data/test_external_data_job.py b/posthog/temporal/tests/external_data/test_external_data_job.py index 0c3e1b0001e57..93630571c3a7a 100644 --- a/posthog/temporal/tests/external_data/test_external_data_job.py +++ b/posthog/temporal/tests/external_data/test_external_data_job.py @@ -15,12 +15,12 @@ ExternalDataJobWorkflow, ExternalDataWorkflowInputs, ) +from posthog.temporal.data_imports.workflow_activities.check_billing_limits import check_billing_limits_activity from posthog.temporal.data_imports.workflow_activities.create_job_model import ( CreateExternalDataJobModelActivityInputs, create_external_data_job_model_activity, ) from posthog.temporal.data_imports.workflow_activities.import_data import ImportDataActivityInputs, import_data_activity -from posthog.temporal.tests.data_imports.conftest import stripe_customer from posthog.warehouse.external_data_source.jobs import create_external_data_job from posthog.warehouse.models import ( get_latest_run_if_exists, @@ -539,112 +539,6 @@ def mock_to_object_store_rs_credentials(class_self): assert len(job_2_charge_objects["Contents"]) == 2 -@pytest.mark.django_db(transaction=True) -@pytest.mark.asyncio -async def test_run_stripe_job_cancelled(activity_environment, team, minio_client, **kwargs): - async def setup_job_1(): - new_source = await sync_to_async(ExternalDataSource.objects.create)( - source_id=uuid.uuid4(), - connection_id=uuid.uuid4(), - destination_id=uuid.uuid4(), - team=team, - status="running", - source_type="Stripe", - job_inputs={"stripe_secret_key": "test-key", "stripe_account_id": "acct_id"}, - ) - - customer_schema = await _create_schema("Customer", new_source, team) - - # Already canceled so it should only run once - # This imitates if the job was canceled mid run - new_job: ExternalDataJob = await sync_to_async(ExternalDataJob.objects.create)( - team_id=team.id, - pipeline_id=new_source.pk, - status=ExternalDataJob.Status.CANCELLED, - rows_synced=0, - schema=customer_schema, - ) - - new_job = await sync_to_async( - ExternalDataJob.objects.filter(id=new_job.id).prefetch_related("pipeline").prefetch_related("schema").get - )() - - inputs = ImportDataActivityInputs( - team_id=team.id, - run_id=str(new_job.pk), - source_id=new_source.pk, - schema_id=customer_schema.id, - ) - - return new_job, inputs - - job_1, job_1_inputs = await setup_job_1() - - def mock_customers_paginate( - class_self, - path: str = "", - method: Any = "GET", - params: Optional[dict[str, Any]] = None, - json: Optional[dict[str, Any]] = None, - auth: Optional[Any] = None, - paginator: Optional[Any] = None, - data_selector: Optional[Any] = None, - hooks: Optional[Any] = None, - ): - return iter(stripe_customer()["data"]) - - def mock_to_session_credentials(class_self): - return { - "aws_access_key_id": settings.OBJECT_STORAGE_ACCESS_KEY_ID, - "aws_secret_access_key": settings.OBJECT_STORAGE_SECRET_ACCESS_KEY, - "endpoint_url": settings.OBJECT_STORAGE_ENDPOINT, - "aws_session_token": None, - "AWS_ALLOW_HTTP": "true", - "AWS_S3_ALLOW_UNSAFE_RENAME": "true", - } - - def mock_to_object_store_rs_credentials(class_self): - return { - "aws_access_key_id": settings.OBJECT_STORAGE_ACCESS_KEY_ID, - "aws_secret_access_key": settings.OBJECT_STORAGE_SECRET_ACCESS_KEY, - "endpoint_url": settings.OBJECT_STORAGE_ENDPOINT, - "region": "us-east-1", - "AWS_ALLOW_HTTP": "true", - "AWS_S3_ALLOW_UNSAFE_RENAME": "true", - } - - with ( - mock.patch.object(RESTClient, "paginate", mock_customers_paginate), - override_settings( - BUCKET_URL=f"s3://{BUCKET_NAME}", - AIRBYTE_BUCKET_KEY=settings.OBJECT_STORAGE_ACCESS_KEY_ID, - AIRBYTE_BUCKET_SECRET=settings.OBJECT_STORAGE_SECRET_ACCESS_KEY, - AIRBYTE_BUCKET_REGION="us-east-1", - BUCKET_NAME=BUCKET_NAME, - ), - mock.patch( - "posthog.warehouse.models.table.DataWarehouseTable.get_columns", - return_value={"clickhouse": {"id": "string", "name": "string"}}, - ), - mock.patch.object(AwsCredentials, "to_session_credentials", mock_to_session_credentials), - mock.patch.object(AwsCredentials, "to_object_store_rs_credentials", mock_to_object_store_rs_credentials), - ): - await asyncio.gather( - activity_environment.run(import_data_activity, job_1_inputs), - ) - - folder_path = await sync_to_async(job_1.folder_path)() - job_1_customer_objects = await minio_client.list_objects_v2( - Bucket=BUCKET_NAME, Prefix=f"{folder_path}/customer/" - ) - - # if job was not canceled, this job would run indefinitely - assert len(job_1_customer_objects.get("Contents", [])) == 1 - - await sync_to_async(job_1.refresh_from_db)() - assert job_1.rows_synced == 0 - - @pytest.mark.django_db(transaction=True) @pytest.mark.asyncio async def test_run_stripe_job_row_count_update(activity_environment, team, minio_client, **kwargs): @@ -803,6 +697,7 @@ async def mock_async_func(inputs): update_external_data_job_model, import_data_activity, create_source_templates, + check_billing_limits_activity, ], workflow_runner=UnsandboxedWorkflowRunner(), ): diff --git a/posthog/warehouse/api/external_data_schema.py b/posthog/warehouse/api/external_data_schema.py index 154fd848ff524..3f98702db64f7 100644 --- a/posthog/warehouse/api/external_data_schema.py +++ b/posthog/warehouse/api/external_data_schema.py @@ -46,6 +46,7 @@ class ExternalDataSchemaSerializer(serializers.ModelSerializer): incremental_field = serializers.SerializerMethodField(read_only=True) incremental_field_type = serializers.SerializerMethodField(read_only=True) sync_frequency = serializers.SerializerMethodField(read_only=True) + status = serializers.SerializerMethodField(read_only=True) class Meta: model = ExternalDataSchema @@ -74,6 +75,12 @@ class Meta: "status", ] + def get_status(self, schema: ExternalDataSchema) -> str | None: + if schema.status == ExternalDataSchema.Status.CANCELLED: + return "Billing limits" + + return schema.status + def get_incremental(self, schema: ExternalDataSchema) -> bool: return schema.is_incremental diff --git a/posthog/warehouse/api/external_data_source.py b/posthog/warehouse/api/external_data_source.py index 54a3960f0960a..2e3f66de9c630 100644 --- a/posthog/warehouse/api/external_data_source.py +++ b/posthog/warehouse/api/external_data_source.py @@ -89,6 +89,7 @@ def get_generic_sql_error(source_type: ExternalDataSource.Type): class ExternalDataJobSerializers(serializers.ModelSerializer): schema = serializers.SerializerMethodField(read_only=True) + status = serializers.SerializerMethodField(read_only=True) class Meta: model = ExternalDataJob @@ -113,6 +114,12 @@ class Meta: "workflow_run_id", ] + def get_status(self, instance: ExternalDataJob): + if instance.status == ExternalDataJob.Status.CANCELLED: + return "Billing limits" + + return instance.status + def get_schema(self, instance: ExternalDataJob): return SimpleExternalDataSchemaSerializer( instance.schema, many=False, read_only=True, context=self.context @@ -167,7 +174,7 @@ def get_status(self, instance: ExternalDataSource) -> str: if any_failures: return ExternalDataSchema.Status.ERROR elif any_cancelled: - return ExternalDataSchema.Status.CANCELLED + return "Billing limits" elif any_paused: return ExternalDataSchema.Status.PAUSED elif any_running: diff --git a/posthog/warehouse/external_data_source/jobs.py b/posthog/warehouse/external_data_source/jobs.py index 0bfbd95760391..27d2ae22b6b8d 100644 --- a/posthog/warehouse/external_data_source/jobs.py +++ b/posthog/warehouse/external_data_source/jobs.py @@ -1,5 +1,5 @@ from uuid import UUID - +from posthog.warehouse.util import database_sync_to_async from posthog.warehouse.models.external_data_job import ExternalDataJob from posthog.warehouse.models.external_data_schema import ExternalDataSchema from posthog.warehouse.models.external_data_source import ExternalDataSource @@ -29,16 +29,19 @@ def create_external_data_job( return job -def update_external_job_status(run_id: UUID, team_id: int, status: str, latest_error: str | None) -> ExternalDataJob: - model = ExternalDataJob.objects.get(id=run_id, team_id=team_id) +@database_sync_to_async +def aupdate_external_job_status( + job_id: str, team_id: int, status: ExternalDataJob.Status, latest_error: str | None +) -> ExternalDataJob: + model = ExternalDataJob.objects.get(id=job_id, team_id=team_id) model.status = status model.latest_error = latest_error model.save() if status == ExternalDataJob.Status.FAILED: - schema_status = ExternalDataSchema.Status.ERROR + schema_status: ExternalDataSchema.Status = ExternalDataSchema.Status.ERROR else: - schema_status = status + schema_status = status # type: ignore schema = ExternalDataSchema.objects.get(id=model.schema_id, team_id=team_id) schema.status = schema_status