fix(data-warehouse): use rows synced from pg #24203

Merged · 5 commits · Aug 6, 2024
ee/billing/test/test_quota_limiting.py (2 changes: 1 addition & 1 deletion)

@@ -134,7 +134,7 @@ def test_quota_limit_feature_flag_not_on(self, patch_feature_enabled, patch_capt
         self.organization.save()
 
         time.sleep(1)
-        with self.assertNumQueries(2):
+        with self.assertNumQueries(3):
             quota_limited_orgs, quota_limiting_suspended_orgs = update_all_org_billing_quotas()
         # Shouldn't be called due to lazy evaluation of the conditional
         patch_feature_enabled.assert_not_called()
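A hedged note on this one-line change: the PR swaps the ClickHouse-based rows-synced lookup for a Postgres ORM aggregate (see posthog/tasks/usage_report.py below), so the plausible reading is that update_all_org_billing_quotas now issues one more SQL query inside the assertNumQueries block, raising the expectation from 2 to 3. A minimal sketch of the pattern under that assumption; the test class here is illustrative, not part of the PR:

from django.test import TestCase

from posthog.warehouse.models import ExternalDataJob


class QueryCountSketch(TestCase):
    def test_orm_aggregate_adds_one_query(self) -> None:
        # Evaluating a queryset issues one SQL statement, so adding one more
        # Postgres read to a code path bumps an assertNumQueries expectation
        # by exactly one, as in the hunk above.
        with self.assertNumQueries(1):
            list(ExternalDataJob.objects.values("team_id"))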
posthog/tasks/test/test_usage_report.py (54 changes: 15 additions & 39 deletions)

@@ -54,6 +54,7 @@
     snapshot_clickhouse_queries,
 )
 from posthog.utils import get_machine_id, get_previous_day
+from posthog.warehouse.models import ExternalDataJob, ExternalDataSource
 
 logger = structlog.get_logger(__name__)
 
@@ -1095,46 +1096,21 @@ def test_external_data_rows_synced_response(
     ) -> None:
         self._setup_teams()
 
+        source = ExternalDataSource.objects.create(
+            team=self.analytics_team,
+            source_id="source_id",
+            connection_id="connection_id",
+            status=ExternalDataSource.Status.COMPLETED,
+            source_type=ExternalDataSource.Type.STRIPE,
+        )
+
         for i in range(5):
             start_time = (now() - relativedelta(hours=i)).strftime("%Y-%m-%dT%H:%M:%SZ")
-            _create_event(
-                distinct_id="3",
-                event="$data_sync_job_completed",
-                properties={
-                    "count": 10,
-                    "job_id": 10924,
-                    "start_time": start_time,
-                },
-                timestamp=now() - relativedelta(hours=i),
-                team=self.analytics_team,
-            )
-            # identical job id should be deduped and not counted
-            _create_event(
-                distinct_id="3",
-                event="$data_sync_job_completed",
-                properties={
-                    "count": 10,
-                    "job_id": 10924,
-                    "start_time": start_time,
-                },
-                timestamp=now() - relativedelta(hours=i, minutes=i),
-                team=self.analytics_team,
-            )
+            ExternalDataJob.objects.create(team_id=3, created_at=start_time, rows_synced=10, pipeline=source)
 
         for i in range(5):
-            _create_event(
-                distinct_id="4",
-                event="$data_sync_job_completed",
-                properties={
-                    "count": 10,
-                    "job_id": 10924,
-                    "start_time": (now() - relativedelta(hours=i)).strftime("%Y-%m-%dT%H:%M:%SZ"),
-                },
-                timestamp=now() - relativedelta(hours=i),
-                team=self.analytics_team,
-            )
-
-        flush_persons_and_events()
+            start_time = (now() - relativedelta(hours=i)).strftime("%Y-%m-%dT%H:%M:%SZ")
+            ExternalDataJob.objects.create(team_id=4, created_at=start_time, rows_synced=10, pipeline=source)
 
         period = get_previous_day(at=now() + relativedelta(days=1))
         period_start, period_end = period
@@ -1151,10 +1127,10 @@
         )
 
         assert org_1_report["organization_name"] == "Org 1"
-        assert org_1_report["rows_synced_in_period"] == 20
+        assert org_1_report["rows_synced_in_period"] == 100
 
-        assert org_1_report["teams"]["3"]["rows_synced_in_period"] == 10
-        assert org_1_report["teams"]["4"]["rows_synced_in_period"] == 10
+        assert org_1_report["teams"]["3"]["rows_synced_in_period"] == 50
+        assert org_1_report["teams"]["4"]["rows_synced_in_period"] == 50
 
         assert org_2_report["organization_name"] == "Org 2"
         assert org_2_report["rows_synced_in_period"] == 0
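The new expected values follow from the fixtures above: each of teams 3 and 4 gets five ExternalDataJob rows with rows_synced=10, so 50 rows per team and 100 for Org 1 (the old 20/10 figures reflected the event-based query deduplicating on the repeated job_id). A quick arithmetic check, illustrative only:

# Worked arithmetic behind the updated assertions.
jobs_per_team = 5    # one ExternalDataJob per loop iteration
rows_per_job = 10    # rows_synced=10 on every fixture
per_team = jobs_per_team * rows_per_job  # 50, asserted for teams "3" and "4"
org_total = per_team * 2                 # 100, asserted for org_1_report
assert (per_team, org_total) == (50, 100)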
posthog/tasks/usage_report.py (32 changes: 8 additions & 24 deletions)

@@ -11,7 +11,7 @@
 from dateutil import parser
 from django.conf import settings
 from django.db import connection
-from django.db.models import Count, Q
+from django.db.models import Count, Q, Sum
 from posthoganalytics.client import Client
 from psycopg import sql
 from retry import retry
@@ -40,6 +40,8 @@
     get_previous_day,
 )
 
+from posthog.warehouse.models import ExternalDataJob
+
 logger = structlog.get_logger(__name__)
 
 
@@ -602,31 +604,13 @@ def get_teams_with_survey_responses_count_in_period(
 
 @timed_log()
 @retry(tries=QUERY_RETRIES, delay=QUERY_RETRY_DELAY, backoff=QUERY_RETRY_BACKOFF)
-def get_teams_with_rows_synced_in_period(begin: datetime, end: datetime) -> list[tuple[int, int]]:
-    team_to_query = 1 if get_instance_region() == "EU" else 2
-
-    # dedup by job id incase there were duplicates sent
-    results = sync_execute(
-        """
-        SELECT team, sum(rows_synced) FROM (
-            SELECT JSONExtractString(properties, 'job_id') AS job_id, distinct_id AS team, any(JSONExtractInt(properties, 'count')) AS rows_synced
-            FROM events
-            WHERE team_id = %(team_to_query)s AND event = '$data_sync_job_completed' AND JSONExtractString(properties, 'start_time') != '' AND parseDateTimeBestEffort(JSONExtractString(properties, 'start_time')) BETWEEN %(begin)s AND %(end)s
-            GROUP BY job_id, team
-        )
-        GROUP BY team
-        """,
-        {
-            "begin": begin,
-            "end": end,
-            "team_to_query": team_to_query,
-        },
-        workload=Workload.OFFLINE,
-        settings=CH_BILLING_SETTINGS,
-    )
-
-    return results
+def get_teams_with_rows_synced_in_period(begin: datetime, end: datetime) -> list:
+    return list(
+        ExternalDataJob.objects.filter(created_at__gte=begin, created_at__lte=end)
+        .values("team_id")
+        .annotate(total=Sum("rows_synced"))
+    )
 
 
 @shared_task(**USAGE_REPORT_TASK_KWARGS, max_retries=0)
 def capture_report(
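For reference, a sketch of calling the rewritten helper. By Django's documented values()/annotate() semantics it now returns a list of dicts keyed by team_id and total, rather than the (team, rows_synced) tuples the old ClickHouse query produced, so callers are assumed to read those keys; the begin/end values here are placeholders:

from datetime import datetime, timedelta, timezone

from posthog.tasks.usage_report import get_teams_with_rows_synced_in_period

# With the fixtures from test_usage_report.py above (five jobs of 10 rows
# for each of teams 3 and 4), this returns something like:
#   [{"team_id": 3, "total": 50}, {"team_id": 4, "total": 50}]
end = datetime.now(timezone.utc)
begin = end - timedelta(days=1)
for row in get_teams_with_rows_synced_in_period(begin, end):
    print(row["team_id"], row["total"])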