diff --git a/posthog/celery.py b/posthog/celery.py
index 5256cee72bfad7..69d961edf8ffdb 100644
--- a/posthog/celery.py
+++ b/posthog/celery.py
@@ -342,12 +342,6 @@ def setup_periodic_tasks(sender: Celery, **kwargs):
         name="delete expired exported assets",
     )
 
-    sender.add_periodic_task(
-        crontab(minute="*/10"),
-        sync_datawarehouse_sources.s(),
-        name="sync datawarehouse sources that have settled in s3 bucket",
-    )
-
     sender.add_periodic_task(
         crontab(minute="23", hour="*"),
         check_data_import_row_limits.s(),
@@ -938,23 +932,6 @@ def calculate_decide_usage() -> None:
         ph_client.shutdown()
 
 
-@app.task(ignore_result=True)
-def calculate_external_data_rows_synced() -> None:
-    from django.db.models import Q
-
-    from posthog.models import Team
-    from posthog.tasks.warehouse import (
-        capture_workspace_rows_synced_by_team,
-        check_external_data_source_billing_limit_by_team,
-    )
-
-    for team in Team.objects.select_related("organization").exclude(
-        Q(organization__for_internal_metrics=True) | Q(is_demo=True) | Q(external_data_workspace_id__isnull=True)
-    ):
-        capture_workspace_rows_synced_by_team.delay(team.pk)
-        check_external_data_source_billing_limit_by_team.delay(team.pk)
-
-
 @app.task(ignore_result=True)
 def find_flags_with_enriched_analytics():
     from datetime import datetime, timedelta
@@ -1109,16 +1086,6 @@ def ee_persist_finished_recordings():
         persist_finished_recordings()
 
 
-@app.task(ignore_result=True)
-def sync_datawarehouse_sources():
-    try:
-        from posthog.tasks.warehouse import sync_resources
-    except ImportError:
-        pass
-    else:
-        sync_resources()
-
-
 @app.task(ignore_result=True)
 def check_data_import_row_limits():
     try:
diff --git a/posthog/tasks/test/test_warehouse.py b/posthog/tasks/test/test_warehouse.py
index 01b5ac561f5dd2..9581e5af0284c8 100644
--- a/posthog/tasks/test/test_warehouse.py
+++ b/posthog/tasks/test/test_warehouse.py
@@ -1,172 +1,12 @@
 from posthog.test.base import APIBaseTest
-import datetime
 from unittest.mock import patch, MagicMock
 from posthog.tasks.warehouse import (
-    _traverse_jobs_by_field,
-    capture_workspace_rows_synced_by_team,
-    check_external_data_source_billing_limit_by_team,
     check_synced_row_limits_of_team,
 )
 from posthog.warehouse.models import ExternalDataSource, ExternalDataJob
-from freezegun import freeze_time
 
 
 class TestWarehouse(APIBaseTest):
-    @patch("posthog.tasks.warehouse.send_request")
-    @freeze_time("2023-11-07")
-    def test_traverse_jobs_by_field(self, send_request_mock: MagicMock) -> None:
-        send_request_mock.return_value = {
-            "data": [
-                {
-                    "jobId": 5827835,
-                    "status": "succeeded",
-                    "jobType": "sync",
-                    "startTime": "2023-11-07T16:50:49Z",
-                    "connectionId": "fake",
-                    "lastUpdatedAt": "2023-11-07T16:52:54Z",
-                    "duration": "PT2M5S",
-                    "rowsSynced": 93353,
-                },
-                {
-                    "jobId": 5783573,
-                    "status": "succeeded",
-                    "jobType": "sync",
-                    "startTime": "2023-11-05T18:32:41Z",
-                    "connectionId": "fake-2",
-                    "lastUpdatedAt": "2023-11-05T18:35:11Z",
-                    "duration": "PT2M30S",
-                    "rowsSynced": 97747,
-                },
-            ]
-        }
-        mock_capture = MagicMock()
-        response = _traverse_jobs_by_field(mock_capture, self.team, "fake-url", "rowsSynced")
-
-        self.assertEqual(
-            response,
-            [
-                {"count": 93353, "startTime": "2023-11-07T16:50:49Z"},
-                {"count": 97747, "startTime": "2023-11-05T18:32:41Z"},
-            ],
-        )
-
-        self.assertEqual(mock_capture.capture.call_count, 2)
-        mock_capture.capture.assert_called_with(
-            self.team.pk,
-            "external data sync job",
-            {
-                "count": 97747,
-                "workspace_id": self.team.external_data_workspace_id,
-                "team_id": self.team.pk,
-                "team_uuid": self.team.uuid,
-                "startTime": "2023-11-05T18:32:41Z",
-                "job_id": "5783573",
-            },
-        )
-
-    @patch("posthog.tasks.warehouse._traverse_jobs_by_field")
-    @patch("posthog.tasks.warehouse.get_ph_client")
-    @freeze_time("2023-11-07")
-    def test_capture_workspace_rows_synced_by_team(
-        self, mock_capture: MagicMock, traverse_jobs_mock: MagicMock
-    ) -> None:
-        traverse_jobs_mock.return_value = [
-            {"count": 97747, "startTime": "2023-11-05T18:32:41Z"},
-            {"count": 93353, "startTime": "2023-11-07T16:50:49Z"},
-        ]
-
-        capture_workspace_rows_synced_by_team(self.team.pk)
-
-        self.team.refresh_from_db()
-        self.assertEqual(
-            self.team.external_data_workspace_last_synced_at,
-            datetime.datetime(2023, 11, 7, 16, 50, 49, tzinfo=datetime.timezone.utc),
-        )
-
-    @patch("posthog.tasks.warehouse._traverse_jobs_by_field")
-    @patch("posthog.tasks.warehouse.get_ph_client")
-    @freeze_time("2023-11-07")
-    def test_capture_workspace_rows_synced_by_team_month_cutoff(
-        self, mock_capture: MagicMock, traverse_jobs_mock: MagicMock
-    ) -> None:
-        # external_data_workspace_last_synced_at unset
-        traverse_jobs_mock.return_value = [
-            {"count": 93353, "startTime": "2023-11-07T16:50:49Z"},
-        ]
-
-        capture_workspace_rows_synced_by_team(self.team.pk)
-
-        self.team.refresh_from_db()
-        self.assertEqual(
-            self.team.external_data_workspace_last_synced_at,
-            datetime.datetime(2023, 11, 7, 16, 50, 49, tzinfo=datetime.timezone.utc),
-        )
-
-    @patch("posthog.tasks.warehouse._traverse_jobs_by_field")
-    @patch("posthog.tasks.warehouse.get_ph_client")
-    @freeze_time("2023-11-07")
-    def test_capture_workspace_rows_synced_by_team_month_cutoff_field_set(
-        self, mock_capture: MagicMock, traverse_jobs_mock: MagicMock
-    ) -> None:
-        self.team.external_data_workspace_last_synced_at = datetime.datetime(
-            2023, 10, 29, 18, 32, 41, tzinfo=datetime.timezone.utc
-        )
-        self.team.save()
-        traverse_jobs_mock.return_value = [
-            {"count": 97747, "startTime": "2023-10-30T18:32:41Z"},
-            {"count": 93353, "startTime": "2023-11-07T16:50:49Z"},
-        ]
-
-        capture_workspace_rows_synced_by_team(self.team.pk)
-
-        self.team.refresh_from_db()
-        self.assertEqual(
-            self.team.external_data_workspace_last_synced_at,
-            datetime.datetime(2023, 11, 7, 16, 50, 49, tzinfo=datetime.timezone.utc),
-        )
-
-    @patch("posthog.warehouse.external_data_source.connection.send_request")
-    @patch("ee.billing.quota_limiting.list_limited_team_attributes")
-    def test_external_data_source_billing_limit_deactivate(
-        self, usage_limit_mock: MagicMock, send_request_mock: MagicMock
-    ) -> None:
-        usage_limit_mock.return_value = [self.team.pk]
-
-        external_source = ExternalDataSource.objects.create(
-            source_id="test_id",
-            connection_id="fake connectino_id",
-            destination_id="fake destination_id",
-            team=self.team,
-            status="running",
-            source_type="Stripe",
-        )
-
-        check_external_data_source_billing_limit_by_team(self.team.pk)
-
-        external_source.refresh_from_db()
-        self.assertEqual(external_source.status, "inactive")
-
-    @patch("posthog.warehouse.external_data_source.connection.send_request")
-    @patch("ee.billing.quota_limiting.list_limited_team_attributes")
-    def test_external_data_source_billing_limit_activate(
-        self, usage_limit_mock: MagicMock, send_request_mock: MagicMock
-    ) -> None:
-        usage_limit_mock.return_value = []
-
-        external_source = ExternalDataSource.objects.create(
-            source_id="test_id",
-            connection_id="fake connectino_id",
-            destination_id="fake destination_id",
-            team=self.team,
-            status="inactive",
-            source_type="Stripe",
-        )
-
-        check_external_data_source_billing_limit_by_team(self.team.pk)
-
-        external_source.refresh_from_db()
-        self.assertEqual(external_source.status, "running")
-
     @patch("posthog.tasks.warehouse.MONTHLY_LIMIT", 100)
     @patch("posthog.tasks.warehouse.cancel_external_data_workflow")
     @patch("posthog.tasks.warehouse.pause_external_data_schedule")
diff --git a/posthog/tasks/warehouse.py b/posthog/tasks/warehouse.py
index 5ab889fcd54a10..de48d10a28bdc6 100644
--- a/posthog/tasks/warehouse.py
+++ b/posthog/tasks/warehouse.py
@@ -1,176 +1,15 @@
-from django.conf import settings
 import datetime
-from posthog.models import Team
-from posthog.warehouse.external_data_source.client import send_request
 from posthog.warehouse.data_load.service import (
     cancel_external_data_workflow,
     pause_external_data_schedule,
     unpause_external_data_schedule,
 )
-from posthog.warehouse.models import DataWarehouseCredential, DataWarehouseTable, ExternalDataSource, ExternalDataJob
-from posthog.warehouse.external_data_source.connection import retrieve_sync
-from urllib.parse import urlencode
-from posthog.ph_client import get_ph_client
-from typing import Any, Dict, List, TYPE_CHECKING
+from posthog.warehouse.models import ExternalDataSource, ExternalDataJob
 from posthog.celery import app
 
 import structlog
 
 logger = structlog.get_logger(__name__)
 
-AIRBYTE_JOBS_URL = "https://api.airbyte.com/v1/jobs"
-DEFAULT_DATE_TIME = datetime.datetime(2023, 11, 7, tzinfo=datetime.timezone.utc)
-
-if TYPE_CHECKING:
-    from posthoganalytics import Posthog
-
-
-def sync_resources() -> None:
-    resources = ExternalDataSource.objects.filter(are_tables_created=False, status__in=["running", "error"])
-
-    for resource in resources:
-        sync_resource.delay(resource.pk)
-
-
-@app.task(ignore_result=True)
-def sync_resource(resource_id: str) -> None:
-    resource = ExternalDataSource.objects.get(pk=resource_id)
-
-    try:
-        job = retrieve_sync(resource.connection_id)
-    except Exception as e:
-        logger.exception("Data Warehouse: Sync Resource failed with an unexpected exception.", exc_info=e)
-        resource.status = "error"
-        resource.save()
-        return
-
-    if job is None:
-        logger.error(f"Data Warehouse: No jobs found for connection: {resource.connection_id}")
-        resource.status = "error"
-        resource.save()
-        return
-
-    if job["status"] == "succeeded":
-        resource = ExternalDataSource.objects.get(pk=resource_id)
-        credential, _ = DataWarehouseCredential.objects.get_or_create(
-            team_id=resource.team.pk,
-            access_key=settings.AIRBYTE_BUCKET_KEY,
-            access_secret=settings.AIRBYTE_BUCKET_SECRET,
-        )
-
-        data = {
-            "credential": credential,
-            "name": "stripe_customers",
-            "format": "Parquet",
-            "url_pattern": f"https://{settings.AIRBYTE_BUCKET_DOMAIN}/airbyte/{resource.team.pk}/customers/*.parquet",
-            "team_id": resource.team.pk,
-        }
-
-        table = DataWarehouseTable(**data)
-        try:
-            table.columns = table.get_columns()
-        except Exception as e:
-            logger.exception(
-                f"Data Warehouse: Sync Resource failed with an unexpected exception for connection: {resource.connection_id}",
-                exc_info=e,
-            )
-        else:
-            table.save()
-
-        resource.are_tables_created = True
-        resource.status = job["status"]
-        resource.save()
-
-    else:
-        resource.status = job["status"]
-        resource.save()
-
-
-DEFAULT_USAGE_LIMIT = 1000000
-ROWS_PER_DOLLAR = 66666  # 1 million rows per $15
-
-
-@app.task(ignore_result=True, max_retries=2)
-def check_external_data_source_billing_limit_by_team(team_id: int) -> None:
-    from posthog.warehouse.external_data_source.connection import deactivate_connection_by_id, activate_connection_by_id
-    from ee.billing.quota_limiting import list_limited_team_attributes, QuotaResource
-
-    limited_teams_rows_synced = list_limited_team_attributes(QuotaResource.ROWS_SYNCED)
-
-    team = Team.objects.get(pk=team_id)
-    all_active_connections = ExternalDataSource.objects.filter(team=team, status__in=["running", "succeeded"])
-    all_inactive_connections = ExternalDataSource.objects.filter(team=team, status="inactive")
-
-    # TODO: consider more boundaries
-    if team_id in limited_teams_rows_synced:
-        for connection in all_active_connections:
-            deactivate_connection_by_id(connection.connection_id)
-            connection.status = "inactive"
-            connection.save()
-    else:
-        for connection in all_inactive_connections:
-            activate_connection_by_id(connection.connection_id)
-            connection.status = "running"
-            connection.save()
-
-
-@app.task(ignore_result=True, max_retries=2)
-def capture_workspace_rows_synced_by_team(team_id: int) -> None:
-    ph_client = get_ph_client()
-    team = Team.objects.get(pk=team_id)
-    now = datetime.datetime.now(datetime.timezone.utc)
-    begin = team.external_data_workspace_last_synced_at or DEFAULT_DATE_TIME
-
-    params = {
-        "workspaceIds": team.external_data_workspace_id,
-        "limit": 100,
-        "offset": 0,
-        "status": "succeeded",
-        "orderBy": "createdAt|ASC",
-        "updatedAtStart": begin.strftime("%Y-%m-%dT%H:%M:%SZ"),
-        "updatedAtEnd": now.strftime("%Y-%m-%dT%H:%M:%SZ"),
-    }
-    result_totals = _traverse_jobs_by_field(ph_client, team, AIRBYTE_JOBS_URL + "?" + urlencode(params), "rowsSynced")
-
-    # TODO: check assumption that ordering is possible with API
-    team.external_data_workspace_last_synced_at = result_totals[-1]["startTime"] if result_totals else now
-    team.save()
-
-    ph_client.shutdown()
-
-
-def _traverse_jobs_by_field(
-    ph_client: "Posthog", team: Team, url: str, field: str, acc: List[Dict[str, Any]] = []
-) -> List[Dict[str, Any]]:
-    response = send_request(url, method="GET")
-    response_data = response.get("data", [])
-    response_next = response.get("next", None)
-
-    for job in response_data:
-        acc.append(
-            {
-                "count": job[field],
-                "startTime": job["startTime"],
-            }
-        )
-        ph_client.capture(
-            team.pk,
-            "external data sync job",
-            {
-                "count": job[field],
-                "workspace_id": team.external_data_workspace_id,
-                "team_id": team.pk,
-                "team_uuid": team.uuid,
-                "startTime": job["startTime"],
-                "job_id": str(job["jobId"]),
-            },
-        )
-
-    if response_next:
-        return _traverse_jobs_by_field(ph_client, team, response_next, field, acc)
-
-    return acc
-
-
 MONTHLY_LIMIT = 1_000_000
diff --git a/posthog/temporal/data_imports/external_data_job.py b/posthog/temporal/data_imports/external_data_job.py
index cdb218c0cce31f..97c05ee2c4b168 100644
--- a/posthog/temporal/data_imports/external_data_job.py
+++ b/posthog/temporal/data_imports/external_data_job.py
@@ -218,7 +218,7 @@ async def run(self, inputs: ExternalDataWorkflowInputs):
         await workflow.execute_activity(
             run_external_data_job,
             job_inputs,
-            start_to_close_timeout=dt.timedelta(minutes=90),
+            start_to_close_timeout=dt.timedelta(hours=4),
             retry_policy=RetryPolicy(maximum_attempts=5),
             heartbeat_timeout=dt.timedelta(minutes=1),
         )