From c51a9daeeeba57c22fb1db7ef50bfe8f54677ed8 Mon Sep 17 00:00:00 2001 From: eric Date: Mon, 5 Aug 2024 15:26:39 -0400 Subject: [PATCH 1/3] use rows_synced from pg --- posthog/tasks/test/test_usage_report.py | 54 +++++++------------------ posthog/tasks/usage_report.py | 30 ++++---------- 2 files changed, 22 insertions(+), 62 deletions(-) diff --git a/posthog/tasks/test/test_usage_report.py b/posthog/tasks/test/test_usage_report.py index acee43d00d19f..9aebf9d05ea76 100644 --- a/posthog/tasks/test/test_usage_report.py +++ b/posthog/tasks/test/test_usage_report.py @@ -54,6 +54,7 @@ snapshot_clickhouse_queries, ) from posthog.utils import get_machine_id, get_previous_day +from posthog.warehouse.models import ExternalDataJob, ExternalDataSource logger = structlog.get_logger(__name__) @@ -1095,46 +1096,21 @@ def test_external_data_rows_synced_response( ) -> None: self._setup_teams() + source = ExternalDataSource.objects.create( + team=self.analytics_team, + source_id="source_id", + connection_id="connection_id", + status=ExternalDataSource.Status.COMPLETED, + source_type=ExternalDataSource.Type.STRIPE, + ) + for i in range(5): start_time = (now() - relativedelta(hours=i)).strftime("%Y-%m-%dT%H:%M:%SZ") - _create_event( - distinct_id="3", - event="$data_sync_job_completed", - properties={ - "count": 10, - "job_id": 10924, - "start_time": start_time, - }, - timestamp=now() - relativedelta(hours=i), - team=self.analytics_team, - ) - # identical job id should be deduped and not counted - _create_event( - distinct_id="3", - event="$data_sync_job_completed", - properties={ - "count": 10, - "job_id": 10924, - "start_time": start_time, - }, - timestamp=now() - relativedelta(hours=i, minutes=i), - team=self.analytics_team, - ) + ExternalDataJob.objects.create(team_id=3, created_at=start_time, rows_synced=10, pipeline=source) for i in range(5): - _create_event( - distinct_id="4", - event="$data_sync_job_completed", - properties={ - "count": 10, - "job_id": 10924, - "start_time": (now() - relativedelta(hours=i)).strftime("%Y-%m-%dT%H:%M:%SZ"), - }, - timestamp=now() - relativedelta(hours=i), - team=self.analytics_team, - ) - - flush_persons_and_events() + start_time = (now() - relativedelta(hours=i)).strftime("%Y-%m-%dT%H:%M:%SZ") + ExternalDataJob.objects.create(team_id=4, created_at=start_time, rows_synced=10, pipeline=source) period = get_previous_day(at=now() + relativedelta(days=1)) period_start, period_end = period @@ -1151,10 +1127,10 @@ def test_external_data_rows_synced_response( ) assert org_1_report["organization_name"] == "Org 1" - assert org_1_report["rows_synced_in_period"] == 20 + assert org_1_report["rows_synced_in_period"] == 100 - assert org_1_report["teams"]["3"]["rows_synced_in_period"] == 10 - assert org_1_report["teams"]["4"]["rows_synced_in_period"] == 10 + assert org_1_report["teams"]["3"]["rows_synced_in_period"] == 50 + assert org_1_report["teams"]["4"]["rows_synced_in_period"] == 50 assert org_2_report["organization_name"] == "Org 2" assert org_2_report["rows_synced_in_period"] == 0 diff --git a/posthog/tasks/usage_report.py b/posthog/tasks/usage_report.py index fd6598cd7ca3b..66b3693bed444 100644 --- a/posthog/tasks/usage_report.py +++ b/posthog/tasks/usage_report.py @@ -11,7 +11,7 @@ from dateutil import parser from django.conf import settings from django.db import connection -from django.db.models import Count, Q +from django.db.models import Count, Q, Sum from posthoganalytics.client import Client from psycopg import sql from retry import retry @@ -40,6 +40,8 @@ get_previous_day, ) +from posthog.warehouse.models import ExternalDataJob + logger = structlog.get_logger(__name__) @@ -603,30 +605,12 @@ def get_teams_with_survey_responses_count_in_period( @timed_log() @retry(tries=QUERY_RETRIES, delay=QUERY_RETRY_DELAY, backoff=QUERY_RETRY_BACKOFF) def get_teams_with_rows_synced_in_period(begin: datetime, end: datetime) -> list[tuple[int, int]]: - team_to_query = 1 if get_instance_region() == "EU" else 2 - - # dedup by job id incase there were duplicates sent - results = sync_execute( - """ - SELECT team, sum(rows_synced) FROM ( - SELECT JSONExtractString(properties, 'job_id') AS job_id, distinct_id AS team, any(JSONExtractInt(properties, 'count')) AS rows_synced - FROM events - WHERE team_id = %(team_to_query)s AND event = '$data_sync_job_completed' AND JSONExtractString(properties, 'start_time') != '' AND parseDateTimeBestEffort(JSONExtractString(properties, 'start_time')) BETWEEN %(begin)s AND %(end)s - GROUP BY job_id, team - ) - GROUP BY team - """, - { - "begin": begin, - "end": end, - "team_to_query": team_to_query, - }, - workload=Workload.OFFLINE, - settings=CH_BILLING_SETTINGS, + return list( + ExternalDataJob.objects.filter(created_at__gte=begin, created_at__lte=end) + .values("team_id") + .annotate(total=Sum("rows_synced")) ) - return results - @shared_task(**USAGE_REPORT_TASK_KWARGS, max_retries=0) def capture_report( From 03672630e91e79565d0b89775143b4c0fdd7c750 Mon Sep 17 00:00:00 2001 From: eric Date: Mon, 5 Aug 2024 15:55:19 -0400 Subject: [PATCH 2/3] update test --- ee/billing/test/test_quota_limiting.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ee/billing/test/test_quota_limiting.py b/ee/billing/test/test_quota_limiting.py index 020c0979ccc91..3e8b5105767d3 100644 --- a/ee/billing/test/test_quota_limiting.py +++ b/ee/billing/test/test_quota_limiting.py @@ -134,7 +134,7 @@ def test_quota_limit_feature_flag_not_on(self, patch_feature_enabled, patch_capt self.organization.save() time.sleep(1) - with self.assertNumQueries(2): + with self.assertNumQueries(3): quota_limited_orgs, quota_limiting_suspended_orgs = update_all_org_billing_quotas() # Shouldn't be called due to lazy evaluation of the conditional patch_feature_enabled.assert_not_called() From 2f75effbb7b5a95f26dfc9eacbbfc7dcfe4b4e24 Mon Sep 17 00:00:00 2001 From: eric Date: Mon, 5 Aug 2024 15:58:14 -0400 Subject: [PATCH 3/3] types --- posthog/tasks/usage_report.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/posthog/tasks/usage_report.py b/posthog/tasks/usage_report.py index 66b3693bed444..fd08f1a1f6e7f 100644 --- a/posthog/tasks/usage_report.py +++ b/posthog/tasks/usage_report.py @@ -604,7 +604,7 @@ def get_teams_with_survey_responses_count_in_period( @timed_log() @retry(tries=QUERY_RETRIES, delay=QUERY_RETRY_DELAY, backoff=QUERY_RETRY_BACKOFF) -def get_teams_with_rows_synced_in_period(begin: datetime, end: datetime) -> list[tuple[int, int]]: +def get_teams_with_rows_synced_in_period(begin: datetime, end: datetime) -> list: return list( ExternalDataJob.objects.filter(created_at__gte=begin, created_at__lte=end) .values("team_id")