
Commit

fix(data-warehouse): use rows synced from pg (#24203)
EDsCODE authored Aug 6, 2024
1 parent bd1be16 commit 0e64d85
Showing 3 changed files with 24 additions and 64 deletions.
ee/billing/test/test_quota_limiting.py (2 changes: 1 addition & 1 deletion)
@@ -134,7 +134,7 @@ def test_quota_limit_feature_flag_not_on(self, patch_feature_enabled, patch_capt
         self.organization.save()
 
         time.sleep(1)
-        with self.assertNumQueries(2):
+        with self.assertNumQueries(3):
             quota_limited_orgs, quota_limiting_suspended_orgs = update_all_org_billing_quotas()
         # Shouldn't be called due to lazy evaluation of the conditional
         patch_feature_enabled.assert_not_called()
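For context, Django's assertNumQueries fails if the wrapped block executes a different number of SQL statements than expected; because the usage-report path invoked by update_all_org_billing_quotas now reads rows-synced totals from Postgres instead of ClickHouse, the expected count rises from 2 to 3. A minimal sketch of that mechanism, using the stock django.contrib.auth User model rather than PostHog's models:

```python
from django.contrib.auth.models import User
from django.test import TestCase


class QueryCountExample(TestCase):
    def test_expected_query_count(self) -> None:
        # Each ORM statement below hits the database once; adding a third read
        # (analogous to the new ExternalDataJob aggregation) means the expected
        # count has to be bumped from 2 to 3.
        with self.assertNumQueries(3):
            list(User.objects.all())
            list(User.objects.filter(is_active=True))
            User.objects.count()
```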
posthog/tasks/test/test_usage_report.py (54 changes: 15 additions & 39 deletions)
@@ -54,6 +54,7 @@
     snapshot_clickhouse_queries,
 )
 from posthog.utils import get_machine_id, get_previous_day
+from posthog.warehouse.models import ExternalDataJob, ExternalDataSource
 
 logger = structlog.get_logger(__name__)
 
@@ -1095,46 +1096,21 @@ def test_external_data_rows_synced_response(
     ) -> None:
         self._setup_teams()
 
+        source = ExternalDataSource.objects.create(
+            team=self.analytics_team,
+            source_id="source_id",
+            connection_id="connection_id",
+            status=ExternalDataSource.Status.COMPLETED,
+            source_type=ExternalDataSource.Type.STRIPE,
+        )
+
         for i in range(5):
             start_time = (now() - relativedelta(hours=i)).strftime("%Y-%m-%dT%H:%M:%SZ")
-            _create_event(
-                distinct_id="3",
-                event="$data_sync_job_completed",
-                properties={
-                    "count": 10,
-                    "job_id": 10924,
-                    "start_time": start_time,
-                },
-                timestamp=now() - relativedelta(hours=i),
-                team=self.analytics_team,
-            )
-            # identical job id should be deduped and not counted
-            _create_event(
-                distinct_id="3",
-                event="$data_sync_job_completed",
-                properties={
-                    "count": 10,
-                    "job_id": 10924,
-                    "start_time": start_time,
-                },
-                timestamp=now() - relativedelta(hours=i, minutes=i),
-                team=self.analytics_team,
-            )
+            ExternalDataJob.objects.create(team_id=3, created_at=start_time, rows_synced=10, pipeline=source)
 
         for i in range(5):
-            _create_event(
-                distinct_id="4",
-                event="$data_sync_job_completed",
-                properties={
-                    "count": 10,
-                    "job_id": 10924,
-                    "start_time": (now() - relativedelta(hours=i)).strftime("%Y-%m-%dT%H:%M:%SZ"),
-                },
-                timestamp=now() - relativedelta(hours=i),
-                team=self.analytics_team,
-            )
-
-        flush_persons_and_events()
+            start_time = (now() - relativedelta(hours=i)).strftime("%Y-%m-%dT%H:%M:%SZ")
+            ExternalDataJob.objects.create(team_id=4, created_at=start_time, rows_synced=10, pipeline=source)
 
         period = get_previous_day(at=now() + relativedelta(days=1))
         period_start, period_end = period
@@ -1151,10 +1127,10 @@
         )
 
         assert org_1_report["organization_name"] == "Org 1"
-        assert org_1_report["rows_synced_in_period"] == 20
+        assert org_1_report["rows_synced_in_period"] == 100
 
-        assert org_1_report["teams"]["3"]["rows_synced_in_period"] == 10
-        assert org_1_report["teams"]["4"]["rows_synced_in_period"] == 10
+        assert org_1_report["teams"]["3"]["rows_synced_in_period"] == 50
+        assert org_1_report["teams"]["4"]["rows_synced_in_period"] == 50
 
         assert org_2_report["organization_name"] == "Org 2"
         assert org_2_report["rows_synced_in_period"] == 0
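The expected totals change because the Postgres-backed path sums rows_synced across job rows, with no job_id dedup step: each team now gets 5 ExternalDataJob rows of 10 rows each, so 50 per team and 100 for the org. A rough, illustrative check of that arithmetic (not part of the commit):

```python
# Five jobs of 10 rows per team, for teams 3 and 4.
jobs = [{"team_id": team, "rows_synced": 10} for team in (3, 4) for _ in range(5)]

# Group-by-team sum, mirroring what the updated test asserts.
per_team: dict[int, int] = {}
for job in jobs:
    per_team[job["team_id"]] = per_team.get(job["team_id"], 0) + job["rows_synced"]

assert per_team == {3: 50, 4: 50}
assert sum(per_team.values()) == 100  # org-level total across both teams
```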
posthog/tasks/usage_report.py (32 changes: 8 additions & 24 deletions)
@@ -11,7 +11,7 @@
 from dateutil import parser
 from django.conf import settings
 from django.db import connection
-from django.db.models import Count, Q
+from django.db.models import Count, Q, Sum
 from posthoganalytics.client import Client
 from psycopg import sql
 from retry import retry
@@ -40,6 +40,8 @@
     get_previous_day,
 )
 
+from posthog.warehouse.models import ExternalDataJob
+
 logger = structlog.get_logger(__name__)
 
 
@@ -602,31 +604,13 @@ def get_teams_with_survey_responses_count_in_period(
 
 @timed_log()
 @retry(tries=QUERY_RETRIES, delay=QUERY_RETRY_DELAY, backoff=QUERY_RETRY_BACKOFF)
-def get_teams_with_rows_synced_in_period(begin: datetime, end: datetime) -> list[tuple[int, int]]:
-    team_to_query = 1 if get_instance_region() == "EU" else 2
-
-    # dedup by job id incase there were duplicates sent
-    results = sync_execute(
-        """
-        SELECT team, sum(rows_synced) FROM (
-            SELECT JSONExtractString(properties, 'job_id') AS job_id, distinct_id AS team, any(JSONExtractInt(properties, 'count')) AS rows_synced
-            FROM events
-            WHERE team_id = %(team_to_query)s AND event = '$data_sync_job_completed' AND JSONExtractString(properties, 'start_time') != '' AND parseDateTimeBestEffort(JSONExtractString(properties, 'start_time')) BETWEEN %(begin)s AND %(end)s
-            GROUP BY job_id, team
-        )
-        GROUP BY team
-        """,
-        {
-            "begin": begin,
-            "end": end,
-            "team_to_query": team_to_query,
-        },
-        workload=Workload.OFFLINE,
-        settings=CH_BILLING_SETTINGS,
+def get_teams_with_rows_synced_in_period(begin: datetime, end: datetime) -> list:
+    return list(
+        ExternalDataJob.objects.filter(created_at__gte=begin, created_at__lte=end)
+        .values("team_id")
+        .annotate(total=Sum("rows_synced"))
     )
 
-    return results
-
 
 @shared_task(**USAGE_REPORT_TASK_KWARGS, max_retries=0)
 def capture_report(
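The rewrite also changes the result shape: the old ClickHouse query returned (team, rows_synced) tuples, while .values("team_id").annotate(total=Sum("rows_synced")) yields one dict per team, e.g. {"team_id": 3, "total": 50}. The consuming code is not shown in this diff, so the normalization below is an illustrative sketch of how such results could be folded into a lookup table, not PostHog's actual usage:

```python
from datetime import datetime

from django.db.models import Sum

from posthog.warehouse.models import ExternalDataJob


def rows_synced_by_team(begin: datetime, end: datetime) -> dict[int, int]:
    # Same aggregation as get_teams_with_rows_synced_in_period, collapsed into a
    # {team_id: total} mapping for easy per-team lookup when building a report.
    rows = (
        ExternalDataJob.objects.filter(created_at__gte=begin, created_at__lte=end)
        .values("team_id")
        .annotate(total=Sum("rows_synced"))
    )
    return {row["team_id"]: row["total"] or 0 for row in rows}
```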
