diff --git a/posthog/celery.py b/posthog/celery.py index 1ec34f9e60ff1..cee6342c5ede7 100644 --- a/posthog/celery.py +++ b/posthog/celery.py @@ -27,7 +27,8 @@ from posthog.cloud_utils import is_cloud from posthog.metrics import pushed_metrics_registry from posthog.redis import get_client -from posthog.utils import get_crontab, get_instance_region +from posthog.utils import get_crontab +from posthog.ph_client import get_ph_client # set the default Django settings module for the 'celery' program. os.environ.setdefault("DJANGO_SETTINGS_MODULE", "posthog.settings") @@ -333,6 +334,13 @@ def setup_periodic_tasks(sender: Celery, **kwargs): name="sync datawarehouse sources that have settled in s3 bucket", ) + # Every 2 hours try to retrieve and calculate total rows synced in period + sender.add_periodic_task( + crontab(hour="*/2"), + calculate_external_data_rows_synced.s(), + name="calculate external data rows synced", + ) + # Set up clickhouse query instrumentation @task_prerun.connect @@ -900,38 +908,13 @@ def debug_task(self): print(f"Request: {self.request!r}") -def _get_ph_client(): - from posthoganalytics import Posthog - - if not is_cloud(): - return - - # send EU data to EU, US data to US - api_key = None - host = None - region = get_instance_region() - if region == "EU": - api_key = "phc_dZ4GK1LRjhB97XozMSkEwPXx7OVANaJEwLErkY1phUF" - host = "https://eu.posthog.com" - elif region == "US": - api_key = "sTMFPsFhdP1Ssg" - host = "https://app.posthog.com" - - if not api_key: - return - - ph_client = Posthog(api_key, host=host) - - return ph_client - - @app.task(ignore_result=True) def calculate_decide_usage() -> None: from django.db.models import Q from posthog.models import Team from posthog.models.feature_flag.flag_analytics import capture_team_decide_usage - ph_client = _get_ph_client() + ph_client = get_ph_client() for team in Team.objects.select_related("organization").exclude( Q(organization__for_internal_metrics=True) | Q(is_demo=True) @@ -947,14 +930,10 @@ def calculate_external_data_rows_synced() -> None: from posthog.models import Team from posthog.tasks.warehouse import calculate_workspace_rows_synced_by_team - ph_client = _get_ph_client() - for team in Team.objects.select_related("organization").exclude( Q(organization__for_internal_metrics=True) | Q(is_demo=True) | Q(external_data_workspace_id__isnull=True) ): - calculate_workspace_rows_synced_by_team(ph_client, team.pk) - - ph_client.shutdown() + calculate_workspace_rows_synced_by_team.delay(team.pk) @app.task(ignore_result=True) diff --git a/posthog/ph_client.py b/posthog/ph_client.py new file mode 100644 index 0000000000000..e81161a59d470 --- /dev/null +++ b/posthog/ph_client.py @@ -0,0 +1,27 @@ +from posthog.utils import get_instance_region +from posthog.cloud_utils import is_cloud + + +def get_ph_client(): + from posthoganalytics import Posthog + + if not is_cloud(): + return + + # send EU data to EU, US data to US + api_key = None + host = None + region = get_instance_region() + if region == "EU": + api_key = "phc_dZ4GK1LRjhB97XozMSkEwPXx7OVANaJEwLErkY1phUF" + host = "https://eu.posthog.com" + elif region == "US": + api_key = "sTMFPsFhdP1Ssg" + host = "https://app.posthog.com" + + if not api_key: + return + + ph_client = Posthog(api_key, host=host) + + return ph_client diff --git a/posthog/tasks/test/test_warehouse.py b/posthog/tasks/test/test_warehouse.py index 0f936995b4c50..dad42a0b95675 100644 --- a/posthog/tasks/test/test_warehouse.py +++ b/posthog/tasks/test/test_warehouse.py @@ -52,15 +52,15 @@ def test_traverse_jobs_by_field(self, send_request_mock): ) @patch("posthog.tasks.warehouse._traverse_jobs_by_field") + @patch("posthog.tasks.warehouse.get_ph_client") @freeze_time("2023-11-07") - def test_calculate_workspace_rows_synced_by_team(self, traverse_jobs_mock): + def test_calculate_workspace_rows_synced_by_team(self, mock_capture, traverse_jobs_mock): traverse_jobs_mock.return_value = [ {"count": 97747, "startTime": "2023-11-05T18:32:41Z"}, {"count": 93353, "startTime": "2023-11-07T16:50:49Z"}, ] - mock_capture = MagicMock() - calculate_workspace_rows_synced_by_team(mock_capture, self.team.pk) + calculate_workspace_rows_synced_by_team(self.team.pk) self.team.refresh_from_db() self.assertEqual(self.team.external_data_workspace_rows_synced_in_month, 191100) @@ -70,16 +70,16 @@ def test_calculate_workspace_rows_synced_by_team(self, traverse_jobs_mock): ) @patch("posthog.tasks.warehouse._traverse_jobs_by_field") + @patch("posthog.tasks.warehouse.get_ph_client") @freeze_time("2023-11-07") - def test_calculate_workspace_rows_synced_by_team_month_cutoff(self, traverse_jobs_mock): + def test_calculate_workspace_rows_synced_by_team_month_cutoff(self, mock_capture, traverse_jobs_mock): # external_data_workspace_last_synced unset traverse_jobs_mock.return_value = [ {"count": 97747, "startTime": "2023-10-30T18:32:41Z"}, {"count": 93353, "startTime": "2023-11-07T16:50:49Z"}, ] - mock_capture = MagicMock() - calculate_workspace_rows_synced_by_team(mock_capture, self.team.pk) + calculate_workspace_rows_synced_by_team(self.team.pk) self.team.refresh_from_db() self.assertEqual(self.team.external_data_workspace_rows_synced_in_month, 93353) @@ -89,8 +89,9 @@ def test_calculate_workspace_rows_synced_by_team_month_cutoff(self, traverse_job ) @patch("posthog.tasks.warehouse._traverse_jobs_by_field") + @patch("posthog.tasks.warehouse.get_ph_client") @freeze_time("2023-11-07") - def test_calculate_workspace_rows_synced_by_team_month_cutoff_field_set(self, traverse_jobs_mock): + def test_calculate_workspace_rows_synced_by_team_month_cutoff_field_set(self, mock_capture, traverse_jobs_mock): self.team.external_data_workspace_last_synced = datetime.datetime( 2023, 10, 29, 18, 32, 41, tzinfo=datetime.timezone.utc ) @@ -99,8 +100,7 @@ def test_calculate_workspace_rows_synced_by_team_month_cutoff_field_set(self, tr {"count": 93353, "startTime": "2023-11-07T16:50:49Z"}, ] - mock_capture = MagicMock() - calculate_workspace_rows_synced_by_team(mock_capture, self.team.pk) + calculate_workspace_rows_synced_by_team(self.team.pk) self.team.refresh_from_db() self.assertEqual(self.team.external_data_workspace_rows_synced_in_month, 93353) diff --git a/posthog/tasks/warehouse.py b/posthog/tasks/warehouse.py index 9d6e42168ee7f..903e6cd335d1c 100644 --- a/posthog/tasks/warehouse.py +++ b/posthog/tasks/warehouse.py @@ -2,14 +2,18 @@ from posthog.models import Team from posthog.warehouse.external_data_source.client import send_request from urllib.parse import urlencode +from posthog.ph_client import get_ph_client + +from posthog.celery import app AIRBYTE_JOBS_URL = "https://api.airbyte.com/v1/jobs" DEFAULT_DATE_TIME = datetime.datetime(2023, 11, 7, tzinfo=datetime.timezone.utc) -# TODO: split these into their own tasks -def calculate_workspace_rows_synced_by_team(ph_client, team_id): +@app.task(ignore_result=True, max_retries=2) +def calculate_workspace_rows_synced_by_team(team_id): + ph_client = get_ph_client() team = Team.objects.get(pk=team_id) now = datetime.datetime.now(datetime.timezone.utc) begin = team.external_data_workspace_last_synced or now.replace(day=1, hour=0, minute=0, second=0, microsecond=0) @@ -51,6 +55,8 @@ def calculate_workspace_rows_synced_by_team(ph_client, team_id): team.external_data_workspace_rows_synced_in_month = total team.save() + ph_client.shutdown() + def _traverse_jobs_by_field(ph_client, team, url, field, acc=[]): response = send_request(url, method="GET")