From f56bcf64cdef0b2b499d4a66e9ea37dde418d8cc Mon Sep 17 00:00:00 2001 From: Raquel Smith Date: Thu, 21 Nov 2024 08:24:18 -0800 Subject: [PATCH 01/12] kinda maybe make it work --- posthog/tasks/test/test_usage_report.py | 19 +- posthog/tasks/usage_report.py | 308 ++++++++++++++++++++++-- 2 files changed, 300 insertions(+), 27 deletions(-) diff --git a/posthog/tasks/test/test_usage_report.py b/posthog/tasks/test/test_usage_report.py index 41795a3bc372b..d1c623e4832cb 100644 --- a/posthog/tasks/test/test_usage_report.py +++ b/posthog/tasks/test/test_usage_report.py @@ -49,6 +49,7 @@ APIBaseTest, ClickhouseDestroyTablesMixin, ClickhouseTestMixin, + QueryMatchingTest, _create_event, _create_person, also_test_with_materialized_columns, @@ -139,7 +140,7 @@ def _setup_replay_data(team_id: int, include_mobile_replay: bool) -> None: @freeze_time("2022-01-10T00:01:00Z") -class UsageReport(APIBaseTest, ClickhouseTestMixin, ClickhouseDestroyTablesMixin): +class UsageReport(APIBaseTest, ClickhouseTestMixin, ClickhouseDestroyTablesMixin, QueryMatchingTest): def setUp(self) -> None: super().setUp() @@ -807,6 +808,10 @@ def test_unlicensed_usage_report(self, mock_post: MagicMock, mock_client: MagicM mock_posthog = MagicMock() mock_client.return_value = mock_posthog + # # make a dashboard for the team created within the last 7 days + # with freeze_time("2022-01-08T00:01:00Z"): + # Dashboard.objects.create(name="Test Dashboard", team=self.team) + all_reports = self._test_usage_report() with self.settings(SITE_URL="http://test.posthog.com"): send_all_org_usage_reports() @@ -834,6 +839,8 @@ def test_unlicensed_usage_report(self, mock_post: MagicMock, mock_client: MagicM assert mock_posthog.capture.call_count == 2 mock_posthog.capture.assert_has_calls(calls, any_order=True) + # add tests for new digest fields + @freeze_time("2022-01-09T00:01:00Z") class ReplayUsageReport(APIBaseTest, ClickhouseTestMixin, ClickhouseDestroyTablesMixin): @@ -1553,11 +1560,11 @@ def test_capture_event_called_with_string_timestamp(self, mock_client: MagicMock mock_posthog = MagicMock() mock_client.return_value = mock_posthog capture_event( - mock_client, - "test event", - organization.id, - {"prop1": "val1"}, - "2021-10-10T23:01:00.00Z", + pha_client=mock_client, + name="test event", + organization_id=organization.id, + properties={"prop1": "val1"}, + timestamp="2021-10-10T23:01:00.00Z", ) assert mock_client.capture.call_args[1]["timestamp"] == datetime(2021, 10, 10, 23, 1, tzinfo=tzutc()) diff --git a/posthog/tasks/usage_report.py b/posthog/tasks/usage_report.py index f9e3982409d82..b5d2be6ae4400 100644 --- a/posthog/tasks/usage_report.py +++ b/posthog/tasks/usage_report.py @@ -2,7 +2,7 @@ import os from collections import Counter from collections.abc import Sequence -from datetime import datetime +from datetime import datetime, timedelta from typing import Any, Literal, Optional, TypedDict, Union, cast import requests @@ -11,7 +11,7 @@ from dateutil import parser from django.conf import settings from django.db import connection -from django.db.models import Count, Q, Sum +from django.db.models import Count, Q, QuerySet, Sum from posthoganalytics.client import Client from psycopg import sql from retry import retry @@ -26,11 +26,17 @@ from posthog.logging.timing import timed_log from posthog.models import GroupTypeMapping, OrganizationMembership, User from posthog.models.dashboard import Dashboard +from posthog.models.event_definition import EventDefinition +from posthog.models.experiment import Experiment from 
posthog.models.feature_flag import FeatureFlag +from posthog.models.feedback.survey import Survey from posthog.models.organization import Organization from posthog.models.plugin import PluginConfig from posthog.models.team.team import Team from posthog.models.utils import namedtuplefetchall +from posthog.session_recordings.models.session_recording_playlist import ( + SessionRecordingPlaylist, +) from posthog.settings import CLICKHOUSE_CLUSTER, INSTANCE_TAG from posthog.tasks.utils import CeleryQueue from posthog.utils import ( @@ -41,6 +47,7 @@ get_previous_day, ) from posthog.warehouse.models import ExternalDataJob +from posthog.warehouse.models.external_data_source import ExternalDataSource logger = structlog.get_logger(__name__) @@ -132,6 +139,18 @@ class UsageReportCounters: php_events_count_in_period: int +@dataclasses.dataclass +class WeeklyDigestReport: + new_dashboards_in_last_7_days: list[dict[str, str]] + new_event_definitions_in_last_7_days: list[dict[str, str]] + new_playlists_created_in_last_7_days: list[dict[str, str]] + new_experiments_launched_in_last_7_days: list[dict[str, str]] + new_experiments_completed_in_last_7_days: list[dict[str, str]] + new_external_data_sources_connected_in_last_7_days: list[dict[str, str]] + new_surveys_launched_in_last_7_days: list[dict[str, str]] + new_feature_flags_created_in_last_7_days: list[dict[str, str]] + + # Instance metadata to be included in overall report @dataclasses.dataclass class InstanceMetadata: @@ -335,20 +354,23 @@ def send_report_to_billing_service(org_id: str, report: dict[str, Any]) -> None: capture_exception(err) pha_client = Client("sTMFPsFhdP1Ssg") capture_event( - pha_client, - f"organization usage report to billing service failure", - org_id, - {"err": str(err)}, + pha_client=pha_client, + name=f"organization usage report to billing service failure", + organization_id=org_id, + properties={"err": str(err)}, ) raise def capture_event( + *, pha_client: Client, name: str, - organization_id: str, + organization_id: Optional[str] = None, + team_id: Optional[int] = None, properties: dict[str, Any], timestamp: Optional[Union[datetime, str]] = None, + send_for_all_members: bool = False, ) -> None: if timestamp and isinstance(timestamp, str): try: @@ -356,16 +378,38 @@ def capture_event( except ValueError: timestamp = None + if not organization_id and not team_id: + raise ValueError("Either organization_id or team_id must be provided") + if is_cloud(): - org_owner = get_org_owner_or_first_user(organization_id) - distinct_id = org_owner.distinct_id if org_owner and org_owner.distinct_id else f"org-{organization_id}" - pha_client.capture( - distinct_id, - name, - {**properties, "scope": "user"}, - groups={"organization": organization_id, "instance": settings.SITE_URL}, - timestamp=timestamp, - ) + distinct_ids = [] + if send_for_all_members: + if organization_id: + distinct_ids = list( + OrganizationMembership.objects.filter(organization_id=organization_id).values_list( + "user__distinct_id", flat=True + ) + ) + elif team_id: + team = Team.objects.get(id=team_id) + distinct_ids = [user.distinct_id for user in team.all_users_with_access()] + else: + if not organization_id: + team = Team.objects.get(id=team_id) + organization_id = team.organization_id + org_owner = get_org_owner_or_first_user(organization_id) if organization_id else None + distinct_ids.append( + org_owner.distinct_id if org_owner and org_owner.distinct_id else f"org-{organization_id}" + ) + + for distinct_id in distinct_ids: + pha_client.capture( + distinct_id, + 
name, + {**properties, "scope": "user"}, + groups={"organization": organization_id, "instance": settings.SITE_URL}, + timestamp=timestamp, + ) pha_client.group_identify("organization", organization_id, properties) else: pha_client.capture( @@ -703,22 +747,118 @@ def get_teams_with_hog_function_fetch_calls_in_period( return results +@timed_log() +def get_teams_with_new_dashboards_in_last_7_days( + end: datetime, +) -> QuerySet: + begin = end - timedelta(days=7) + return Dashboard.objects.filter(created_at__gt=begin, created_at__lte=end).values("team_id", "name", "id") + + +@timed_log() +def get_teams_with_new_event_definitions_in_last_7_days( + end: datetime, +) -> QuerySet: + begin = end - timedelta(days=7) + return EventDefinition.objects.filter(created_at__gt=begin, created_at__lte=end).values("team_id", "name", "id") + + +@timed_log() +def get_teams_with_new_playlists_created_in_last_7_days( + end: datetime, +) -> QuerySet: + begin = end - timedelta(days=7) + return SessionRecordingPlaylist.objects.filter(created_at__gt=begin, created_at__lte=end).values( + "team_id", "name", "short_id" + ) + + +@timed_log() +def get_teams_with_new_experiments_launched_in_last_7_days( + end: datetime, +) -> QuerySet: + begin = end - timedelta(days=7) + return Experiment.objects.filter(start_date__gt=begin, start_date__lte=end).values( + "team_id", "name", "id", "start_date" + ) + + +@timed_log() +def get_teams_with_new_experiments_completed_in_last_7_days( + end: datetime, +) -> QuerySet: + begin = end - timedelta(days=7) + return Experiment.objects.filter(end_date__gt=begin, end_date__lte=end).values( + "team_id", "name", "id", "start_date", "end_date" + ) + + +@timed_log() +def get_teams_with_new_external_data_sources_connected_in_last_7_days( + end: datetime, +) -> QuerySet: + begin = end - timedelta(days=7) + return ExternalDataSource.objects.filter(created_at__gt=begin, created_at__lte=end, deleted=False).values( + "team_id", "source_type", "id" + ) + + +@timed_log() +def get_teams_with_new_surveys_launched_in_last_7_days( + end: datetime, +) -> QuerySet: + begin = end - timedelta(days=7) + return Survey.objects.filter(start_date__gt=begin, start_date__lte=end).values( + "team_id", "name", "id", "description" + ) + + +@timed_log() +def get_teams_with_new_feature_flags_created_in_last_7_days( + end: datetime, +) -> QuerySet: + begin = end - timedelta(days=7) + return FeatureFlag.objects.filter(created_at__gt=begin, created_at__lte=end, deleted=False).values( + "team_id", "name", "id", "key" + ) + + @shared_task(**USAGE_REPORT_TASK_KWARGS, max_retries=0) def capture_report( + *, capture_event_name: str, - org_id: str, + org_id: Optional[str] = None, + team_id: Optional[int] = None, full_report_dict: dict[str, Any], at_date: Optional[datetime] = None, + send_for_all_members: bool = False, ) -> None: + if not org_id and not team_id: + raise ValueError("Either org_id or team_id must be provided") pha_client = Client("sTMFPsFhdP1Ssg") try: - capture_event(pha_client, capture_event_name, org_id, full_report_dict, timestamp=at_date) + capture_event( + pha_client=pha_client, + name=capture_event_name, + organization_id=org_id, + team_id=team_id, + properties=full_report_dict, + timestamp=at_date, + send_for_all_members=send_for_all_members, + ) logger.info(f"UsageReport sent to PostHog for organization {org_id}") except Exception as err: logger.exception( f"UsageReport sent to PostHog for organization {org_id} failed: {str(err)}", ) - capture_event(pha_client, f"{capture_event_name} failure", org_id, 
{"error": str(err)}) + capture_event( + pha_client=pha_client, + name=f"{capture_event_name} failure", + organization_id=org_id, + team_id=team_id, + properties={"error": str(err)}, + send_for_all_members=send_for_all_members, + ) pha_client.flush() @@ -736,6 +876,10 @@ def has_non_zero_usage(report: FullUsageReport) -> bool: ) +def has_non_zero_digest(report: WeeklyDigestReport) -> bool: + return any(len(getattr(report, key)) > 0 for key in report.__dataclass_fields__) + + def convert_team_usage_rows_to_dict(rows: list[Union[dict, tuple[int, int]]]) -> dict[int, int]: team_id_map = {} for row in rows: @@ -748,6 +892,10 @@ def convert_team_usage_rows_to_dict(rows: list[Union[dict, tuple[int, int]]]) -> return team_id_map +def convert_team_digest_items_to_dict(items: QuerySet) -> dict[int, QuerySet]: + return {team_id: items.filter(team_id=team_id) for team_id in items.values_list("team_id", flat=True).distinct()} + + def _get_all_usage_data(period_start: datetime, period_end: datetime) -> dict[str, Any]: """ Gets all usage data for the specified period. Clickhouse is good at counting things so @@ -920,6 +1068,33 @@ def _get_all_usage_data(period_start: datetime, period_end: datetime) -> dict[st } +def _get_all_digest_data(period_start: datetime, period_end: datetime) -> dict[str, Any]: + return { + "teams_with_new_dashboards_in_last_7_days": get_teams_with_new_dashboards_in_last_7_days(period_end), + "teams_with_new_event_definitions_in_last_7_days": get_teams_with_new_event_definitions_in_last_7_days( + period_end + ), + "teams_with_new_playlists_created_in_last_7_days": get_teams_with_new_playlists_created_in_last_7_days( + period_end + ), + "teams_with_new_experiments_launched_in_last_7_days": get_teams_with_new_experiments_launched_in_last_7_days( + period_end + ), + "teams_with_new_experiments_completed_in_last_7_days": get_teams_with_new_experiments_completed_in_last_7_days( + period_end + ), + "teams_with_new_external_data_sources_connected_in_last_7_days": get_teams_with_new_external_data_sources_connected_in_last_7_days( + period_end + ), + "teams_with_new_surveys_launched_in_last_7_days": get_teams_with_new_surveys_launched_in_last_7_days( + period_end + ), + "teams_with_new_feature_flags_created_in_last_7_days": get_teams_with_new_feature_flags_created_in_last_7_days( + period_end + ), + } + + def _get_all_usage_data_as_team_rows(period_start: datetime, period_end: datetime) -> dict[str, Any]: """ Gets all usage data for the specified period as a map of team_id -> value. 
This makes it faster @@ -936,7 +1111,7 @@ def _get_teams_for_usage_reports() -> Sequence[Team]: return list( Team.objects.select_related("organization") .exclude(Q(organization__for_internal_metrics=True) | Q(is_demo=True)) - .only("id", "organization__id", "organization__name", "organization__created_at") + .only("id", "name", "organization__id", "organization__name", "organization__created_at") ) @@ -1002,6 +1177,64 @@ def _get_team_report(all_data: dict[str, Any], team: Team) -> UsageReportCounter ) +def _get_all_digest_data_as_team_rows(period_start: datetime, period_end: datetime) -> dict[str, Any]: + all_digest_data = _get_all_digest_data(period_start, period_end) + # convert it to a map of team_id -> value + for key, rows in all_digest_data.items(): + all_digest_data[key] = convert_team_digest_items_to_dict(rows) + return all_digest_data + + +def _get_weekly_digest_report(all_digest_data: dict[str, Any], team: Team) -> WeeklyDigestReport: + report = WeeklyDigestReport( + new_dashboards_in_last_7_days=[ + {"name": dashboard.get("name"), "id": dashboard.get("id")} + for dashboard in all_digest_data["teams_with_new_dashboards_in_last_7_days"].get(team.id, []) + ], + new_event_definitions_in_last_7_days=[ + {"name": event_definition.get("name"), "id": event_definition.get("id")} + for event_definition in all_digest_data["teams_with_new_event_definitions_in_last_7_days"].get(team.id, []) + ], + new_playlists_created_in_last_7_days=[ + {"name": playlist.get("name"), "id": playlist.get("id")} + for playlist in all_digest_data["teams_with_new_playlists_created_in_last_7_days"].get(team.id, []) + ], + new_experiments_launched_in_last_7_days=[ + {"name": experiment.get("name"), "id": experiment.get("id"), "start_date": experiment.get("start_date")} + for experiment in all_digest_data["teams_with_new_experiments_launched_in_last_7_days"].get(team.id, []) + ], + new_experiments_completed_in_last_7_days=[ + { + "name": experiment.get("name"), + "id": experiment.get("id"), + "start_date": experiment.get("start_date"), + "end_date": experiment.get("end_date"), + } + for experiment in all_digest_data["teams_with_new_experiments_completed_in_last_7_days"].get(team.id, []) + ], + new_external_data_sources_connected_in_last_7_days=[ + {"source_type": source.get("source_type"), "id": source.get("id")} + for source in all_digest_data["teams_with_new_external_data_sources_connected_in_last_7_days"].get( + team.id, [] + ) + ], + new_surveys_launched_in_last_7_days=[ + { + "name": survey.get("name"), + "id": survey.get("id"), + "start_date": survey.get("start_date"), + "description": survey.get("description"), + } + for survey in all_digest_data["teams_with_new_surveys_launched_in_last_7_days"].get(team.id, []) + ], + new_feature_flags_created_in_last_7_days=[ + {"name": feature_flag.get("name"), "id": feature_flag.get("id"), "key": feature_flag.get("key")} + for feature_flag in all_digest_data["teams_with_new_feature_flags_created_in_last_7_days"].get(team.id, []) + ], + ) + return report + + def _add_team_report_to_org_reports( org_reports: dict[str, OrgReport], team: Team, @@ -1040,6 +1273,11 @@ def _get_all_org_reports(period_start: datetime, period_end: datetime) -> dict[s logger.info("Getting all usage data...") # noqa T201 time_now = datetime.now() all_data = _get_all_usage_data_as_team_rows(period_start, period_end) + all_digest_data = None + if datetime.now().weekday() == 0: + logger.debug("Getting all digest data...") # noqa T201 + all_digest_data = _get_all_digest_data_as_team_rows(period_start, 
period_end) + logger.debug(f"Getting all usage data took {(datetime.now() - time_now).total_seconds()} seconds.") # noqa T201 logger.info("Getting teams for usage reports...") # noqa T201 @@ -1055,6 +1293,14 @@ def _get_all_org_reports(period_start: datetime, period_end: datetime) -> dict[s team_report = _get_team_report(all_data, team) _add_team_report_to_org_reports(org_reports, team, team_report, period_start) + # on mondays, send the weekly digest report + if datetime.now().weekday() == 0 and all_digest_data: + weekly_digest_report = _get_weekly_digest_report(all_digest_data, team) + if has_non_zero_digest(weekly_digest_report): + _send_weekly_digest_report( + team_id=team.id, team_name=team.name, weekly_digest_report=weekly_digest_report + ) + time_since = datetime.now() - time_now logger.debug(f"Generating reports for teams took {time_since.total_seconds()} seconds.") # noqa T201 return org_reports @@ -1071,6 +1317,21 @@ def _get_full_org_usage_report_as_dict(full_report: FullUsageReport) -> dict[str return dataclasses.asdict(full_report) +@shared_task(**USAGE_REPORT_TASK_KWARGS, max_retries=3) +def _send_weekly_digest_report(*, team_id: int, team_name: str, weekly_digest_report: WeeklyDigestReport) -> None: + full_report_dict = { + "team_id": team_id, + "team_name": team_name, + **dataclasses.asdict(weekly_digest_report), + } + capture_report.delay( + capture_event_name="weekly digest report", + team_id=team_id, + full_report_dict=full_report_dict, + send_for_all_members=True, + ) + + @shared_task(**USAGE_REPORT_TASK_KWARGS, max_retries=3) def send_all_org_usage_reports( dry_run: bool = False, @@ -1107,7 +1368,12 @@ def send_all_org_usage_reports( # First capture the events to PostHog if not skip_capture_event: at_date_str = at_date.isoformat() if at_date else None - capture_report.delay(capture_event_name, org_id, full_report_dict, at_date_str) + capture_report.delay( + capture_event_name=capture_event_name, + org_id=org_id, + full_report_dict=full_report_dict, + at_date=at_date_str, + ) # Then capture the events to Billing if has_non_zero_usage(full_report): From c8e8bff63b868d3d22d8efb51623824446ab2927 Mon Sep 17 00:00:00 2001 From: Raquel Smith Date: Thu, 21 Nov 2024 08:57:56 -0800 Subject: [PATCH 02/12] write test --- posthog/tasks/test/test_usage_report.py | 183 +++++++++++++++++++++++- posthog/tasks/usage_report.py | 27 ++-- 2 files changed, 200 insertions(+), 10 deletions(-) diff --git a/posthog/tasks/test/test_usage_report.py b/posthog/tasks/test/test_usage_report.py index d1c623e4832cb..cdd0ddbaf4b3f 100644 --- a/posthog/tasks/test/test_usage_report.py +++ b/posthog/tasks/test/test_usage_report.py @@ -1,4 +1,4 @@ -from datetime import datetime +from datetime import datetime, timedelta from typing import Any from unittest.mock import ANY, MagicMock, Mock, call, patch from uuid import uuid4 @@ -23,12 +23,18 @@ from posthog.models.app_metrics2.sql import TRUNCATE_APP_METRICS2_TABLE_SQL from posthog.models.dashboard import Dashboard from posthog.models.event.util import create_event +from posthog.models.event_definition import EventDefinition +from posthog.models.experiment import Experiment from posthog.models.feature_flag import FeatureFlag +from posthog.models.feedback.survey import Survey from posthog.models.group.util import create_group from posthog.models.group_type_mapping import GroupTypeMapping from posthog.models.plugin import PluginConfig from posthog.models.sharing_configuration import SharingConfiguration from posthog.schema import EventsQuery +from 
posthog.session_recordings.models.session_recording_playlist import ( + SessionRecordingPlaylist, +) from posthog.session_recordings.queries.test.session_replay_sql import ( produce_replay_summary, ) @@ -839,7 +845,180 @@ def test_unlicensed_usage_report(self, mock_post: MagicMock, mock_client: MagicM assert mock_posthog.capture.call_count == 2 mock_posthog.capture.assert_has_calls(calls, any_order=True) - # add tests for new digest fields + +@freeze_time("2024-01-01T00:01:00Z") # A Monday +class TestWeeklyDigestReport(ClickhouseDestroyTablesMixin, APIBaseTest): + def setUp(self) -> None: + super().setUp() + self.distinct_id = str(uuid4()) + self.one_week_ago = now() - timedelta(days=7) + + @patch("posthog.tasks.usage_report.Client") + def test_weekly_digest_report(self, mock_client: MagicMock) -> None: + # Create test data from "last week" + with freeze_time(self.one_week_ago): + # Create a dashboard + dashboard = Dashboard.objects.create( + team=self.team, + name="Test Dashboard", + created_at=now(), + ) + + # Create an event definition + event_definition = EventDefinition.objects.create( + team=self.team, + name="Test Event", + created_at=now(), + ) + + # Create a playlist + playlist = SessionRecordingPlaylist.objects.create( + team=self.team, + name="Test Playlist", + created_at=now(), + ) + + # Create experiments + # this flag should not be included in the digest + flag_for_launched_experiment = FeatureFlag.objects.create( + team=self.team, + name="Feature Flag for Experiment My experiment 1", + key="flag-for-launched-experiment", + created_at=now(), + ) + launched_experiment = Experiment.objects.create( + team=self.team, + name="Launched Experiment", + created_at=now(), + start_date=now(), + feature_flag=flag_for_launched_experiment, + ) + + # this flag should not be included in the digest + flag_for_completed_experiment = FeatureFlag.objects.create( + team=self.team, + name="Feature Flag for Experiment My experiment 2", + key="feature-flag-for-completed-experiment", + created_at=now(), + ) + completed_experiment = Experiment.objects.create( + team=self.team, + name="Completed Experiment", + created_at=now(), + start_date=now() - timedelta(days=2), + end_date=now(), + feature_flag=flag_for_completed_experiment, + ) + + # Create external data source + external_data_source = ExternalDataSource.objects.create( + team=self.team, + source_id="test_source", + connection_id="test_connection", + status="completed", + source_type="Stripe", + created_at=now(), + ) + + # Create a survey + # this flag should not be included in the digest since it's generated for the survey + flag_for_survey = FeatureFlag.objects.create( + team=self.team, + name="Targeting flag for survey My survey", + key="feature-flag-for-survey", + created_at=now(), + ) + survey = Survey.objects.create( + team=self.team, + name="Test Survey", + description="Test Description", + created_at=now(), + start_date=now(), + targeting_flag=flag_for_survey, + ) + + # Create a feature flag + feature_flag = FeatureFlag.objects.create( + team=self.team, + name="Test Flag", + key="test-flag", + created_at=now(), + ) + + # Run the usage report task + mock_posthog = MagicMock() + mock_client.return_value = mock_posthog + + period = get_previous_day() + period_start, period_end = period + _get_all_org_reports(period_start, period_end) + + # Check that the capture event was called with the correct data + calls = mock_posthog.capture.call_args_list + weekly_digest_call = next(call for call in calls if call[0][1] == "weekly digest report") +
properties = weekly_digest_call[0][2] + + expected_properties = { + "team_id": self.team.id, + "team_name": self.team.name, + "scope": "machine", + "new_dashboards_in_last_7_days": [ + { + "name": "Test Dashboard", + "id": dashboard.id, + } + ], + "new_event_definitions_in_last_7_days": [ + { + "name": "Test Event", + "id": event_definition.id, + } + ], + "new_playlists_created_in_last_7_days": [ + { + "name": "Test Playlist", + "id": playlist.short_id, + } + ], + "new_experiments_launched_in_last_7_days": [ + { + "name": "Launched Experiment", + "id": launched_experiment.id, + "start_date": launched_experiment.start_date.isoformat(), + } + ], + "new_experiments_completed_in_last_7_days": [ + { + "name": "Completed Experiment", + "id": completed_experiment.id, + "start_date": completed_experiment.start_date.isoformat(), + "end_date": completed_experiment.end_date.isoformat(), + } + ], + "new_external_data_sources_connected_in_last_7_days": [ + { + "source_type": "Stripe", + "id": external_data_source.id, + } + ], + "new_surveys_launched_in_last_7_days": [ + { + "name": "Test Survey", + "id": survey.id, + "start_date": survey.start_date.isoformat(), + "description": "Test Description", + } + ], + "new_feature_flags_created_in_last_7_days": [ + { + "name": "Test Flag", + "id": feature_flag.id, + "key": "test-flag", + } + ], + } + + assert properties == expected_properties @freeze_time("2022-01-09T00:01:00Z") diff --git a/posthog/tasks/usage_report.py b/posthog/tasks/usage_report.py index b5d2be6ae4400..f32d2067bbabb 100644 --- a/posthog/tasks/usage_report.py +++ b/posthog/tasks/usage_report.py @@ -809,7 +809,7 @@ def get_teams_with_new_surveys_launched_in_last_7_days( ) -> QuerySet: begin = end - timedelta(days=7) return Survey.objects.filter(start_date__gt=begin, start_date__lte=end).values( - "team_id", "name", "id", "description" + "team_id", "name", "id", "description", "start_date" ) @@ -818,8 +818,15 @@ def get_teams_with_new_feature_flags_created_in_last_7_days( end: datetime, ) -> QuerySet: begin = end - timedelta(days=7) - return FeatureFlag.objects.filter(created_at__gt=begin, created_at__lte=end, deleted=False).values( - "team_id", "name", "id", "key" + return ( + FeatureFlag.objects.filter( + created_at__gt=begin, + created_at__lte=end, + deleted=False, + ) + .exclude(name__contains="Feature Flag for Experiment") + .exclude(name__contains="Targeting flag for survey") + .values("team_id", "name", "id", "key") ) @@ -1196,19 +1203,23 @@ def _get_weekly_digest_report(all_digest_data: dict[str, Any], team: Team) -> We for event_definition in all_digest_data["teams_with_new_event_definitions_in_last_7_days"].get(team.id, []) ], new_playlists_created_in_last_7_days=[ - {"name": playlist.get("name"), "id": playlist.get("id")} + {"name": playlist.get("name"), "id": playlist.get("short_id")} for playlist in all_digest_data["teams_with_new_playlists_created_in_last_7_days"].get(team.id, []) ], new_experiments_launched_in_last_7_days=[ - {"name": experiment.get("name"), "id": experiment.get("id"), "start_date": experiment.get("start_date")} + { + "name": experiment.get("name"), + "id": experiment.get("id"), + "start_date": experiment.get("start_date").isoformat(), + } for experiment in all_digest_data["teams_with_new_experiments_launched_in_last_7_days"].get(team.id, []) ], new_experiments_completed_in_last_7_days=[ { "name": experiment.get("name"), "id": experiment.get("id"), - "start_date": experiment.get("start_date"), - "end_date": experiment.get("end_date"), + "start_date": 
experiment.get("start_date").isoformat(), + "end_date": experiment.get("end_date").isoformat(), } for experiment in all_digest_data["teams_with_new_experiments_completed_in_last_7_days"].get(team.id, []) ], @@ -1222,7 +1233,7 @@ def _get_weekly_digest_report(all_digest_data: dict[str, Any], team: Team) -> We { "name": survey.get("name"), "id": survey.get("id"), - "start_date": survey.get("start_date"), + "start_date": survey.get("start_date").isoformat(), "description": survey.get("description"), } for survey in all_digest_data["teams_with_new_surveys_launched_in_last_7_days"].get(team.id, []) From 193e79f8696da761fe4b6b0a5529b164083b275e Mon Sep 17 00:00:00 2001 From: Raquel Smith Date: Fri, 29 Nov 2024 13:50:03 -0800 Subject: [PATCH 03/12] move to a completely separate file --- posthog/tasks/test/test_usage_report.py | 183 +---------------- posthog/tasks/test/test_weekly_digest.py | 182 +++++++++++++++++ posthog/tasks/usage_report.py | 230 +-------------------- posthog/tasks/weekly_digest.py | 247 +++++++++++++++++++++++ 4 files changed, 432 insertions(+), 410 deletions(-) create mode 100644 posthog/tasks/test/test_weekly_digest.py create mode 100644 posthog/tasks/weekly_digest.py diff --git a/posthog/tasks/test/test_usage_report.py b/posthog/tasks/test/test_usage_report.py index c7a01a8e8a70d..276646ce30368 100644 --- a/posthog/tasks/test/test_usage_report.py +++ b/posthog/tasks/test/test_usage_report.py @@ -1,4 +1,4 @@ -from datetime import datetime, timedelta +from datetime import datetime from typing import Any from unittest.mock import ANY, MagicMock, Mock, call, patch from uuid import uuid4 @@ -24,18 +24,12 @@ from posthog.models.app_metrics2.sql import TRUNCATE_APP_METRICS2_TABLE_SQL from posthog.models.dashboard import Dashboard from posthog.models.event.util import create_event -from posthog.models.event_definition import EventDefinition -from posthog.models.experiment import Experiment from posthog.models.feature_flag import FeatureFlag -from posthog.models.feedback.survey import Survey from posthog.models.group.util import create_group from posthog.models.group_type_mapping import GroupTypeMapping from posthog.models.plugin import PluginConfig from posthog.models.sharing_configuration import SharingConfiguration from posthog.schema import EventsQuery -from posthog.session_recordings.models.session_recording_playlist import ( - SessionRecordingPlaylist, -) from posthog.session_recordings.queries.test.session_replay_sql import ( produce_replay_summary, ) @@ -853,181 +847,6 @@ def test_unlicensed_usage_report(self, mock_post: MagicMock, mock_client: MagicM mock_posthog.capture.assert_has_calls(calls, any_order=True) -@freeze_time("2024-01-01T00:01:00Z") # A Monday -class TestWeeklyDigestReport(ClickhouseDestroyTablesMixin, APIBaseTest): - def setUp(self) -> None: - super().setUp() - self.distinct_id = str(uuid4()) - self.one_week_ago = now() - timedelta(days=7) - - @patch("posthog.tasks.usage_report.Client") - def test_weekly_digest_report(self, mock_client: MagicMock) -> None: - # Create test data from "last week" - with freeze_time(self.one_week_ago): - # Create a dashboard - dashboard = Dashboard.objects.create( - team=self.team, - name="Test Dashboard", - created_at=now(), - ) - - # Create an event definition - event_definition = EventDefinition.objects.create( - team=self.team, - name="Test Event", - created_at=now(), - ) - - # Create a playlist - playlist = SessionRecordingPlaylist.objects.create( - team=self.team, - name="Test Playlist", - created_at=now(), - ) - - # 
Create experiments - # this flag should not be included in the digest - flag_for_launched_experiment = FeatureFlag.objects.create( - team=self.team, - name="Feature Flag for Experiment My experiment 1", - key="flag-for-launched-experiment", - created_at=now(), - ) - launched_experiment = Experiment.objects.create( - team=self.team, - name="Launched Experiment", - created_at=now(), - start_date=now(), - feature_flag=flag_for_launched_experiment, - ) - - # this flag should not be included in the digest - flag_for_completed_experiment = FeatureFlag.objects.create( - team=self.team, - name="Feature Flag for Experiment My experiment 2", - key="feature-flag-for-completed-experiment", - created_at=now(), - ) - completed_experiment = Experiment.objects.create( - team=self.team, - name="Completed Experiment", - created_at=now(), - start_date=now() - timedelta(days=2), - end_date=now(), - feature_flag=flag_for_completed_experiment, - ) - - # Create external data source - external_data_source = ExternalDataSource.objects.create( - team=self.team, - source_id="test_source", - connection_id="test_connection", - status="completed", - source_type="Stripe", - created_at=now(), - ) - - # Create a survey - # this flag should not be included in the digest since it's generated for the survey - flag_for_survey = FeatureFlag.objects.create( - team=self.team, - name="Targeting flag for survey My survey", - key="feature-flag-for-survey", - created_at=now(), - ) - survey = Survey.objects.create( - team=self.team, - name="Test Survey", - description="Test Description", - created_at=now(), - start_date=now(), - targeting_flag=flag_for_survey, - ) - - # Create a feature flag - feature_flag = FeatureFlag.objects.create( - team=self.team, - name="Test Flag", - key="test-flag", - created_at=now(), - ) - - # Run the usage report task - mock_posthog = MagicMock() - mock_client.return_value = mock_posthog - - period = get_previous_day() - period_start, period_end = period - _get_all_org_reports(period_start, period_end) - - # Check that the capture event was called with the correct data - calls = mock_posthog.capture.call_args_list - weekly_digest_call = next(call for call in calls if call[0][1] == "weekly digest report") - properties = weekly_digest_call[0][2] - - expected_properties = { - "team_id": self.team.id, - "team_name": self.team.name, - "scope": "machine", - "new_dashboards_in_last_7_days": [ - { - "name": "Test Dashboard", - "id": dashboard.id, - } - ], - "new_event_definitions_in_last_7_days": [ - { - "name": "Test Event", - "id": event_definition.id, - } - ], - "new_playlists_created_in_last_7_days": [ - { - "name": "Test Playlist", - "id": playlist.short_id, - } - ], - "new_experiments_launched_in_last_7_days": [ - { - "name": "Launched Experiment", - "id": launched_experiment.id, - "start_date": launched_experiment.start_date.isoformat(), - } - ], - "new_experiments_completed_in_last_7_days": [ - { - "name": "Completed Experiment", - "id": completed_experiment.id, - "start_date": completed_experiment.start_date.isoformat(), - "end_date": completed_experiment.end_date.isoformat(), - } - ], - "new_external_data_sources_connected_in_last_7_days": [ - { - "source_type": "Stripe", - "id": external_data_source.id, - } - ], - "new_surveys_launched_in_last_7_days": [ - { - "name": "Test Survey", - "id": survey.id, - "start_date": survey.start_date.isoformat(), - "description": "Test Description", - } - ], - "new_feature_flags_created_in_last_7_days": [ - { - "name": "Test Flag", - "id": feature_flag.id, - "key":
"test-flag", - } - ], - } - - assert properties == expected_properties - - @freeze_time("2022-01-09T00:01:00Z") class ReplayUsageReport(APIBaseTest, ClickhouseTestMixin, ClickhouseDestroyTablesMixin): @also_test_with_materialized_columns(event_properties=["$lib"], verify_no_jsonextract=False) diff --git a/posthog/tasks/test/test_weekly_digest.py b/posthog/tasks/test/test_weekly_digest.py new file mode 100644 index 0000000000000..40614305f1388 --- /dev/null +++ b/posthog/tasks/test/test_weekly_digest.py @@ -0,0 +1,182 @@ +from datetime import timedelta +from unittest.mock import MagicMock, patch +from uuid import uuid4 + +from django.utils.timezone import now +from freezegun import freeze_time + +from posthog.models import Dashboard, EventDefinition, Experiment, FeatureFlag, Survey +from posthog.session_recordings.models.session_recording_playlist import ( + SessionRecordingPlaylist, +) +from posthog.tasks.weekly_digest import send_all_weekly_digest_reports +from posthog.test.base import APIBaseTest +from posthog.warehouse.models import ExternalDataSource + + +@freeze_time("2024-01-01T00:01:00Z") # A Monday +class TestWeeklyDigestReport(APIBaseTest): + def setUp(self) -> None: + super().setUp() + self.distinct_id = str(uuid4()) + + @freeze_time("2024-01-20T00:01:00Z") + @patch("posthog.tasks.weekly_digest.capture_report") + def test_weekly_digest_report(self, mock_capture: MagicMock) -> None: + # Create test data from "last week" + with freeze_time("2024-01-15T00:01:00Z"): + # Create a dashboard + dashboard = Dashboard.objects.create( + team=self.team, + name="Test Dashboard", + ) + + # Create an event definition + event_definition = EventDefinition.objects.create( + team=self.team, + name="Test Event", + ) + + # Create a playlist + playlist = SessionRecordingPlaylist.objects.create( + team=self.team, + name="Test Playlist", + ) + + # Create experiments + # this flag should not be included in the digest + flag_for_launched_experiment = FeatureFlag.objects.create( + team=self.team, + name="Feature Flag for Experiment My experiment 1", + key="flag-for-launched-experiment", + ) + launched_experiment = Experiment.objects.create( + team=self.team, + name="Launched Experiment", + start_date=now(), + feature_flag=flag_for_launched_experiment, + ) + + # Create external data source + external_data_source = ExternalDataSource.objects.create( + team=self.team, + source_id="test_source", + connection_id="test_connection", + status="completed", + source_type="Stripe", + ) + + # Create a survey + # this flag should not be included in the digest since it's generated for the survey + flag_for_survey = FeatureFlag.objects.create( + team=self.team, + name="Targeting flag for survey My survey", + key="feature-flag-for-survey", + ) + survey = Survey.objects.create( + team=self.team, + name="Test Survey", + description="Test Description", + start_date=now(), + targeting_flag=flag_for_survey, + ) + + # Create a feature flag + feature_flag = FeatureFlag.objects.create( + team=self.team, + name="Test Flag", + key="test-flag", + ) + + with freeze_time("2024-01-10T00:01:00Z"): + # this flag should not be included in the digest + flag_for_completed_experiment = FeatureFlag.objects.create( + team=self.team, + name="Feature Flag for Experiment My experiment 2", + key="feature-flag-for-completed-experiment", + ) + # completed experiment is not included in the list of launched experiments + # but is included in the list of completed experiments + completed_experiment = Experiment.objects.create( + team=self.team, + 
name="Completed Experiment", + start_date=now() + timedelta(days=1), + end_date=now() + timedelta(days=6), + feature_flag=flag_for_completed_experiment, + ) + + # Run the weekly digest report task + send_all_weekly_digest_reports() + + # Check that the capture event was called with the correct data + expected_properties = { + "team_id": self.team.id, + "team_name": self.team.name, + "template": "weekly_digest_report", + "new_dashboards_in_last_7_days": [ + { + "name": "Test Dashboard", + "id": dashboard.id, + } + ], + "new_event_definitions_in_last_7_days": [ + { + "name": "Test Event", + "id": event_definition.id, + } + ], + "new_playlists_created_in_last_7_days": [ + { + "name": "Test Playlist", + "id": playlist.short_id, + } + ], + "new_experiments_launched_in_last_7_days": [ + { + "name": "Launched Experiment", + "id": launched_experiment.id, + "start_date": launched_experiment.start_date.isoformat(), + } + ], + "new_experiments_completed_in_last_7_days": [ + { + "name": "Completed Experiment", + "id": completed_experiment.id, + "start_date": completed_experiment.start_date.isoformat(), + "end_date": completed_experiment.end_date.isoformat(), + } + ], + "new_external_data_sources_connected_in_last_7_days": [ + { + "source_type": "Stripe", + "id": external_data_source.id, + } + ], + "new_surveys_launched_in_last_7_days": [ + { + "name": "Test Survey", + "id": survey.id, + "start_date": survey.start_date.isoformat(), + "description": "Test Description", + } + ], + "new_feature_flags_created_in_last_7_days": [ + { + "name": "Test Flag", + "id": feature_flag.id, + "key": "test-flag", + } + ], + } + + mock_capture.delay.assert_called_once_with( + capture_event_name="transactional email", + team_id=self.team.id, + full_report_dict=expected_properties, + send_for_all_members=True, + ) + + @patch("posthog.tasks.weekly_digest.capture_report") + def test_weekly_digest_report_dry_run(self, mock_capture: MagicMock) -> None: + send_all_weekly_digest_reports(dry_run=True) + mock_capture.delay.assert_not_called() diff --git a/posthog/tasks/usage_report.py b/posthog/tasks/usage_report.py index 233b2c81e33fd..9f6f80460ec26 100644 --- a/posthog/tasks/usage_report.py +++ b/posthog/tasks/usage_report.py @@ -2,7 +2,7 @@ import os from collections import Counter from collections.abc import Sequence -from datetime import datetime, timedelta +from datetime import datetime from typing import Any, Literal, Optional, TypedDict, Union, cast import requests @@ -11,7 +11,7 @@ from dateutil import parser from django.conf import settings from django.db import connection -from django.db.models import Count, Q, QuerySet, Sum +from django.db.models import Count, Q, Sum from posthoganalytics.client import Client from psycopg import sql from retry import retry @@ -26,17 +26,11 @@ from posthog.logging.timing import timed_log from posthog.models import GroupTypeMapping, OrganizationMembership, User from posthog.models.dashboard import Dashboard -from posthog.models.event_definition import EventDefinition -from posthog.models.experiment import Experiment from posthog.models.feature_flag import FeatureFlag -from posthog.models.feedback.survey import Survey from posthog.models.organization import Organization from posthog.models.plugin import PluginConfig from posthog.models.team.team import Team from posthog.models.utils import namedtuplefetchall -from posthog.session_recordings.models.session_recording_playlist import ( - SessionRecordingPlaylist, -) from posthog.settings import CLICKHOUSE_CLUSTER, INSTANCE_TAG from 
posthog.tasks.utils import CeleryQueue from posthog.utils import ( @@ -47,7 +41,6 @@ get_previous_day, ) from posthog.warehouse.models import ExternalDataJob -from posthog.warehouse.models.external_data_source import ExternalDataSource logger = structlog.get_logger(__name__) @@ -140,18 +133,6 @@ class UsageReportCounters: php_events_count_in_period: int -@dataclasses.dataclass -class WeeklyDigestReport: - new_dashboards_in_last_7_days: list[dict[str, str]] - new_event_definitions_in_last_7_days: list[dict[str, str]] - new_playlists_created_in_last_7_days: list[dict[str, str]] - new_experiments_launched_in_last_7_days: list[dict[str, str]] - new_experiments_completed_in_last_7_days: list[dict[str, str]] - new_external_data_sources_connected_in_last_7_days: list[dict[str, str]] - new_surveys_launched_in_last_7_days: list[dict[str, str]] - new_feature_flags_created_in_last_7_days: list[dict[str, str]] - - # Instance metadata to be included in overall report @dataclasses.dataclass class InstanceMetadata: @@ -754,89 +735,6 @@ def get_teams_with_hog_function_fetch_calls_in_period( return results -@timed_log() -def get_teams_with_new_dashboards_in_last_7_days( - end: datetime, -) -> QuerySet: - begin = end - timedelta(days=7) - return Dashboard.objects.filter(created_at__gt=begin, created_at__lte=end).values("team_id", "name", "id") - - -@timed_log() -def get_teams_with_new_event_definitions_in_last_7_days( - end: datetime, -) -> QuerySet: - begin = end - timedelta(days=7) - return EventDefinition.objects.filter(created_at__gt=begin, created_at__lte=end).values("team_id", "name", "id") - - -@timed_log() -def get_teams_with_new_playlists_created_in_last_7_days( - end: datetime, -) -> QuerySet: - begin = end - timedelta(days=7) - return SessionRecordingPlaylist.objects.filter(created_at__gt=begin, created_at__lte=end).values( - "team_id", "name", "short_id" - ) - - -@timed_log() -def get_teams_with_new_experiments_launched_in_last_7_days( - end: datetime, -) -> QuerySet: - begin = end - timedelta(days=7) - return Experiment.objects.filter(start_date__gt=begin, start_date__lte=end).values( - "team_id", "name", "id", "start_date" - ) - - -@timed_log() -def get_teams_with_new_experiments_completed_in_last_7_days( - end: datetime, -) -> QuerySet: - begin = end - timedelta(days=7) - return Experiment.objects.filter(end_date__gt=begin, end_date__lte=end).values( - "team_id", "name", "id", "start_date", "end_date" - ) - - -@timed_log() -def get_teams_with_new_external_data_sources_connected_in_last_7_days( - end: datetime, -) -> QuerySet: - begin = end - timedelta(days=7) - return ExternalDataSource.objects.filter(created_at__gt=begin, created_at__lte=end, deleted=False).values( - "team_id", "source_type", "id" - ) - - -@timed_log() -def get_teams_with_new_surveys_launched_in_last_7_days( - end: datetime, -) -> QuerySet: - begin = end - timedelta(days=7) - return Survey.objects.filter(start_date__gt=begin, start_date__lte=end).values( - "team_id", "name", "id", "description", "start_date" - ) - - -@timed_log() -def get_teams_with_new_feature_flags_created_in_last_7_days( - end: datetime, -) -> QuerySet: - begin = end - timedelta(days=7) - return ( - FeatureFlag.objects.filter( - created_at__gt=begin, - created_at__lte=end, - deleted=False, - ) - .exclude(name__contains="Feature Flag for Experiment") - .exclude(name__contains="Targeting flag for survey") - .values("team_id", "name", "id", "key") - ) - - @shared_task(**USAGE_REPORT_TASK_KWARGS, max_retries=0) def capture_report( *, @@ -890,10 +788,6 @@ def 
has_non_zero_usage(report: FullUsageReport) -> bool: ) -def has_non_zero_digest(report: WeeklyDigestReport) -> bool: - return any(len(getattr(report, key)) > 0 for key in report.__dataclass_fields__) - - def convert_team_usage_rows_to_dict(rows: list[Union[dict, tuple[int, int]]]) -> dict[int, int]: team_id_map = {} for row in rows: @@ -906,10 +800,6 @@ def convert_team_usage_rows_to_dict(rows: list[Union[dict, tuple[int, int]]]) -> return team_id_map -def convert_team_digest_items_to_dict(items: QuerySet) -> dict[int, QuerySet]: - return {team_id: items.filter(team_id=team_id) for team_id in items.values_list("team_id", flat=True).distinct()} - - def _get_all_usage_data(period_start: datetime, period_end: datetime) -> dict[str, Any]: """ Gets all usage data for the specified period. Clickhouse is good at counting things so @@ -1077,33 +967,6 @@ def _get_all_usage_data(period_start: datetime, period_end: datetime) -> dict[st } -def _get_all_digest_data(period_start: datetime, period_end: datetime) -> dict[str, Any]: - return { - "teams_with_new_dashboards_in_last_7_days": get_teams_with_new_dashboards_in_last_7_days(period_end), - "teams_with_new_event_definitions_in_last_7_days": get_teams_with_new_event_definitions_in_last_7_days( - period_end - ), - "teams_with_new_playlists_created_in_last_7_days": get_teams_with_new_playlists_created_in_last_7_days( - period_end - ), - "teams_with_new_experiments_launched_in_last_7_days": get_teams_with_new_experiments_launched_in_last_7_days( - period_end - ), - "teams_with_new_experiments_completed_in_last_7_days": get_teams_with_new_experiments_completed_in_last_7_days( - period_end - ), - "teams_with_new_external_data_sources_connected_in_last_7_days": get_teams_with_new_external_data_sources_connected_in_last_7_days( - period_end - ), - "teams_with_new_surveys_launched_in_last_7_days": get_teams_with_new_surveys_launched_in_last_7_days( - period_end - ), - "teams_with_new_feature_flags_created_in_last_7_days": get_teams_with_new_feature_flags_created_in_last_7_days( - period_end - ), - } - - def _get_all_usage_data_as_team_rows(period_start: datetime, period_end: datetime) -> dict[str, Any]: """ Gets all usage data for the specified period as a map of team_id -> value. 
This makes it faster @@ -1187,68 +1050,6 @@ def _get_team_report(all_data: dict[str, Any], team: Team) -> UsageReportCounter ) -def _get_all_digest_data_as_team_rows(period_start: datetime, period_end: datetime) -> dict[str, Any]: - all_digest_data = _get_all_digest_data(period_start, period_end) - # convert it to a map of team_id -> value - for key, rows in all_digest_data.items(): - all_digest_data[key] = convert_team_digest_items_to_dict(rows) - return all_digest_data - - -def _get_weekly_digest_report(all_digest_data: dict[str, Any], team: Team) -> WeeklyDigestReport: - report = WeeklyDigestReport( - new_dashboards_in_last_7_days=[ - {"name": dashboard.get("name"), "id": dashboard.get("id")} - for dashboard in all_digest_data["teams_with_new_dashboards_in_last_7_days"].get(team.id, []) - ], - new_event_definitions_in_last_7_days=[ - {"name": event_definition.get("name"), "id": event_definition.get("id")} - for event_definition in all_digest_data["teams_with_new_event_definitions_in_last_7_days"].get(team.id, []) - ], - new_playlists_created_in_last_7_days=[ - {"name": playlist.get("name"), "id": playlist.get("short_id")} - for playlist in all_digest_data["teams_with_new_playlists_created_in_last_7_days"].get(team.id, []) - ], - new_experiments_launched_in_last_7_days=[ - { - "name": experiment.get("name"), - "id": experiment.get("id"), - "start_date": experiment.get("start_date").isoformat(), - } - for experiment in all_digest_data["teams_with_new_experiments_launched_in_last_7_days"].get(team.id, []) - ], - new_experiments_completed_in_last_7_days=[ - { - "name": experiment.get("name"), - "id": experiment.get("id"), - "start_date": experiment.get("start_date").isoformat(), - "end_date": experiment.get("end_date").isoformat(), - } - for experiment in all_digest_data["teams_with_new_experiments_completed_in_last_7_days"].get(team.id, []) - ], - new_external_data_sources_connected_in_last_7_days=[ - {"source_type": source.get("source_type"), "id": source.get("id")} - for source in all_digest_data["teams_with_new_external_data_sources_connected_in_last_7_days"].get( - team.id, [] - ) - ], - new_surveys_launched_in_last_7_days=[ - { - "name": survey.get("name"), - "id": survey.get("id"), - "start_date": survey.get("start_date").isoformat(), - "description": survey.get("description"), - } - for survey in all_digest_data["teams_with_new_surveys_launched_in_last_7_days"].get(team.id, []) - ], - new_feature_flags_created_in_last_7_days=[ - {"name": feature_flag.get("name"), "id": feature_flag.get("id"), "key": feature_flag.get("key")} - for feature_flag in all_digest_data["teams_with_new_feature_flags_created_in_last_7_days"].get(team.id, []) - ], - ) - return report - - def _add_team_report_to_org_reports( org_reports: dict[str, OrgReport], team: Team, @@ -1287,10 +1088,6 @@ def _get_all_org_reports(period_start: datetime, period_end: datetime) -> dict[s logger.info("Getting all usage data...") # noqa T201 time_now = datetime.now() all_data = _get_all_usage_data_as_team_rows(period_start, period_end) - all_digest_data = None - if datetime.now().weekday() == 0: - logger.debug("Getting all digest data...") # noqa T201 - all_digest_data = _get_all_digest_data_as_team_rows(period_start, period_end) logger.debug(f"Getting all usage data took {(datetime.now() - time_now).total_seconds()} seconds.") # noqa T201 @@ -1307,14 +1104,6 @@ def _get_all_org_reports(period_start: datetime, period_end: datetime) -> dict[s team_report = _get_team_report(all_data, team) 
_add_team_report_to_org_reports(org_reports, team, team_report, period_start) - # on mondays, send the weekly digest report - if datetime.now().weekday() == 0 and all_digest_data: - weekly_digest_report = _get_weekly_digest_report(all_digest_data, team) - if has_non_zero_digest(weekly_digest_report): - _send_weekly_digest_report( - team_id=team.id, team_name=team.name, weekly_digest_report=weekly_digest_report - ) - time_since = datetime.now() - time_now logger.debug(f"Generating reports for teams took {time_since.total_seconds()} seconds.") # noqa T201 return org_reports @@ -1331,21 +1120,6 @@ def _get_full_org_usage_report_as_dict(full_report: FullUsageReport) -> dict[str return dataclasses.asdict(full_report) -@shared_task(**USAGE_REPORT_TASK_KWARGS, max_retries=3) -def _send_weekly_digest_report(*, team_id: int, team_name: str, weekly_digest_report: WeeklyDigestReport) -> None: - full_report_dict = { - "team_id": team_id, - "team_name": team_name, - **dataclasses.asdict(weekly_digest_report), - } - capture_report.delay( - capture_event_name="weekly digest report", - team_id=team_id, - full_report_dict=full_report_dict, - send_for_all_members=True, - ) - - @shared_task(**USAGE_REPORT_TASK_KWARGS, max_retries=3) def send_all_org_usage_reports( dry_run: bool = False, diff --git a/posthog/tasks/weekly_digest.py b/posthog/tasks/weekly_digest.py new file mode 100644 index 0000000000000..994425b3b7a17 --- /dev/null +++ b/posthog/tasks/weekly_digest.py @@ -0,0 +1,247 @@ +import dataclasses +from datetime import datetime, timedelta +from typing import Any, Optional + +import structlog +from celery import shared_task +from dateutil import parser +from django.db.models import QuerySet +from sentry_sdk import capture_exception + +from posthog.models.dashboard import Dashboard +from posthog.models.event_definition import EventDefinition +from posthog.models.experiment import Experiment +from posthog.models.feature_flag import FeatureFlag +from posthog.models.feedback.survey import Survey +from posthog.models.team.team import Team +from posthog.session_recordings.models.session_recording_playlist import ( + SessionRecordingPlaylist, +) +from posthog.tasks.usage_report import USAGE_REPORT_TASK_KWARGS, capture_report +from posthog.tasks.utils import CeleryQueue +from posthog.utils import get_previous_day +from posthog.warehouse.models.external_data_source import ExternalDataSource + +logger = structlog.get_logger(__name__) + + +@dataclasses.dataclass +class WeeklyDigestReport: + new_dashboards_in_last_7_days: list[dict[str, str]] + new_event_definitions_in_last_7_days: list[dict[str, str]] + new_playlists_created_in_last_7_days: list[dict[str, str]] + new_experiments_launched_in_last_7_days: list[dict[str, str]] + new_experiments_completed_in_last_7_days: list[dict[str, str]] + new_external_data_sources_connected_in_last_7_days: list[dict[str, str]] + new_surveys_launched_in_last_7_days: list[dict[str, str]] + new_feature_flags_created_in_last_7_days: list[dict[str, str]] + + +def get_teams_for_digest() -> list[Team]: + from django.db.models import Q + + return list( + Team.objects.select_related("organization") + .exclude(Q(organization__for_internal_metrics=True) | Q(is_demo=True)) + .only("id", "name", "organization__id", "organization__name", "organization__created_at") + ) + + +def get_teams_with_new_dashboards_in_last_7_days(end: datetime) -> QuerySet: + begin = end - timedelta(days=7) + return Dashboard.objects.filter(created_at__gt=begin, created_at__lte=end).values("team_id", "name", "id") + 
+ +def get_teams_with_new_event_definitions_in_last_7_days(end: datetime) -> QuerySet: + begin = end - timedelta(days=7) + return EventDefinition.objects.filter(created_at__gt=begin, created_at__lte=end).values("team_id", "name", "id") + + +def get_teams_with_new_playlists_created_in_last_7_days(end: datetime) -> QuerySet: + begin = end - timedelta(days=7) + return SessionRecordingPlaylist.objects.filter(created_at__gt=begin, created_at__lte=end).values( + "team_id", "name", "short_id" + ) + + +def get_teams_with_new_experiments_launched_in_last_7_days(end: datetime) -> QuerySet: + begin = end - timedelta(days=7) + return Experiment.objects.filter(start_date__gt=begin, start_date__lte=end).values( + "team_id", "name", "id", "start_date" + ) + + +def get_teams_with_new_experiments_completed_in_last_7_days(end: datetime) -> QuerySet: + begin = end - timedelta(days=7) + return Experiment.objects.filter(end_date__gt=begin, end_date__lte=end).values( + "team_id", "name", "id", "start_date", "end_date" + ) + + +def get_teams_with_new_external_data_sources_connected_in_last_7_days(end: datetime) -> QuerySet: + begin = end - timedelta(days=7) + return ExternalDataSource.objects.filter(created_at__gt=begin, created_at__lte=end, deleted=False).values( + "team_id", "source_type", "id" + ) + + +def get_teams_with_new_surveys_launched_in_last_7_days(end: datetime) -> QuerySet: + begin = end - timedelta(days=7) + return Survey.objects.filter(start_date__gt=begin, start_date__lte=end).values( + "team_id", "name", "id", "description", "start_date" + ) + + +def get_teams_with_new_feature_flags_created_in_last_7_days(end: datetime) -> QuerySet: + begin = end - timedelta(days=7) + return ( + FeatureFlag.objects.filter( + created_at__gt=begin, + created_at__lte=end, + deleted=False, + ) + .exclude(name__contains="Feature Flag for Experiment") + .exclude(name__contains="Targeting flag for survey") + .values("team_id", "name", "id", "key") + ) + + +def convert_team_digest_items_to_dict(items: QuerySet) -> dict[int, QuerySet]: + return {team_id: items.filter(team_id=team_id) for team_id in items.values_list("team_id", flat=True).distinct()} + + +def has_non_zero_digest(report: WeeklyDigestReport) -> bool: + return any(len(getattr(report, key)) > 0 for key in report.__dataclass_fields__) + + +def _get_all_digest_data_as_team_rows(period_start: datetime, period_end: datetime) -> dict[str, Any]: + all_digest_data = _get_all_digest_data(period_start, period_end) + # convert it to a map of team_id -> value + for key, rows in all_digest_data.items(): + all_digest_data[key] = convert_team_digest_items_to_dict(rows) + return all_digest_data + + +def _get_all_digest_data(period_start: datetime, period_end: datetime) -> dict[str, Any]: + return { + "teams_with_new_dashboards_in_last_7_days": get_teams_with_new_dashboards_in_last_7_days(period_end), + "teams_with_new_event_definitions_in_last_7_days": get_teams_with_new_event_definitions_in_last_7_days( + period_end + ), + "teams_with_new_playlists_created_in_last_7_days": get_teams_with_new_playlists_created_in_last_7_days( + period_end + ), + "teams_with_new_experiments_launched_in_last_7_days": get_teams_with_new_experiments_launched_in_last_7_days( + period_end + ), + "teams_with_new_experiments_completed_in_last_7_days": get_teams_with_new_experiments_completed_in_last_7_days( + period_end + ), + "teams_with_new_external_data_sources_connected_in_last_7_days": get_teams_with_new_external_data_sources_connected_in_last_7_days( + period_end + ), + 
"teams_with_new_surveys_launched_in_last_7_days": get_teams_with_new_surveys_launched_in_last_7_days( + period_end + ), + "teams_with_new_feature_flags_created_in_last_7_days": get_teams_with_new_feature_flags_created_in_last_7_days( + period_end + ), + } + + +def get_weekly_digest_report(all_digest_data: dict[str, Any], team: Team) -> WeeklyDigestReport: + return WeeklyDigestReport( + new_dashboards_in_last_7_days=[ + {"name": dashboard.get("name"), "id": dashboard.get("id")} + for dashboard in all_digest_data["teams_with_new_dashboards_in_last_7_days"].get(team.id, []) + ], + new_event_definitions_in_last_7_days=[ + {"name": event_definition.get("name"), "id": event_definition.get("id")} + for event_definition in all_digest_data["teams_with_new_event_definitions_in_last_7_days"].get(team.id, []) + ], + new_playlists_created_in_last_7_days=[ + {"name": playlist.get("name"), "id": playlist.get("short_id")} + for playlist in all_digest_data["teams_with_new_playlists_created_in_last_7_days"].get(team.id, []) + ], + new_experiments_launched_in_last_7_days=[ + { + "name": experiment.get("name"), + "id": experiment.get("id"), + "start_date": experiment.get("start_date").isoformat(), + } + for experiment in all_digest_data["teams_with_new_experiments_launched_in_last_7_days"].get(team.id, []) + ], + new_experiments_completed_in_last_7_days=[ + { + "name": experiment.get("name"), + "id": experiment.get("id"), + "start_date": experiment.get("start_date").isoformat(), + "end_date": experiment.get("end_date").isoformat(), + } + for experiment in all_digest_data["teams_with_new_experiments_completed_in_last_7_days"].get(team.id, []) + ], + new_external_data_sources_connected_in_last_7_days=[ + {"source_type": source.get("source_type"), "id": source.get("id")} + for source in all_digest_data["teams_with_new_external_data_sources_connected_in_last_7_days"].get( + team.id, [] + ) + ], + new_surveys_launched_in_last_7_days=[ + { + "name": survey.get("name"), + "id": survey.get("id"), + "start_date": survey.get("start_date").isoformat(), + "description": survey.get("description"), + } + for survey in all_digest_data["teams_with_new_surveys_launched_in_last_7_days"].get(team.id, []) + ], + new_feature_flags_created_in_last_7_days=[ + {"name": feature_flag.get("name"), "id": feature_flag.get("id"), "key": feature_flag.get("key")} + for feature_flag in all_digest_data["teams_with_new_feature_flags_created_in_last_7_days"].get(team.id, []) + ], + ) + + +@shared_task(queue=CeleryQueue.USAGE_REPORTS.value, ignore_result=True, max_retries=3) +def send_weekly_digest_report(*, team_id: int, team_name: str, weekly_digest_report: dict[str, Any]) -> None: + full_report_dict = { + "team_id": team_id, + "team_name": team_name, + "template": "weekly_digest_report", + **weekly_digest_report, + } + capture_report.delay( + capture_event_name="transactional email", + team_id=team_id, + full_report_dict=full_report_dict, + send_for_all_members=True, + ) + + +@shared_task(**USAGE_REPORT_TASK_KWARGS, max_retries=0) +def send_all_weekly_digest_reports( + dry_run: bool = False, + at: Optional[str] = None, +) -> None: + at_date = parser.parse(at) if at else None + period = get_previous_day(at=at_date) + period_start, period_end = period + + try: + all_digest_data = _get_all_digest_data_as_team_rows(period_start, period_end) + teams = get_teams_for_digest() + time_now = datetime.now() + for team in teams: + report = get_weekly_digest_report(all_digest_data, team) + full_report_dict = dataclasses.asdict(report) + + # Then capture 
as events to PostHog, so they can be sent via email + if has_non_zero_digest(report) and not dry_run: + send_weekly_digest_report.delay( + team_id=team.id, team_name=team.name, weekly_digest_report=full_report_dict + ) + time_since = datetime.now() - time_now + logger.debug(f"Sending usage reports to PostHog and Billing took {time_since.total_seconds()} seconds.") # noqa T201 + except Exception as err: + capture_exception(err) + raise From 36a38c78a3166ef5e453534cfa28dffa4f105c9f Mon Sep 17 00:00:00 2001 From: Raquel Smith Date: Fri, 29 Nov 2024 14:11:33 -0800 Subject: [PATCH 04/12] let it be for any time range --- .../{weekly_digest.py => periodic_digest.py} | 148 ++++++++---------- ...ekly_digest.py => test_periodic_digest.py} | 119 +++++++++++--- 2 files changed, 167 insertions(+), 100 deletions(-) rename posthog/tasks/{weekly_digest.py => periodic_digest.py} (57%) rename posthog/tasks/test/{test_weekly_digest.py => test_periodic_digest.py} (58%) diff --git a/posthog/tasks/weekly_digest.py b/posthog/tasks/periodic_digest.py similarity index 57% rename from posthog/tasks/weekly_digest.py rename to posthog/tasks/periodic_digest.py index 994425b3b7a17..0990c55506f3b 100644 --- a/posthog/tasks/weekly_digest.py +++ b/posthog/tasks/periodic_digest.py @@ -17,24 +17,27 @@ from posthog.session_recordings.models.session_recording_playlist import ( SessionRecordingPlaylist, ) -from posthog.tasks.usage_report import USAGE_REPORT_TASK_KWARGS, capture_report +from posthog.tasks.usage_report import ( + USAGE_REPORT_TASK_KWARGS, + capture_report, + get_instance_metadata, +) from posthog.tasks.utils import CeleryQueue -from posthog.utils import get_previous_day from posthog.warehouse.models.external_data_source import ExternalDataSource logger = structlog.get_logger(__name__) @dataclasses.dataclass -class WeeklyDigestReport: - new_dashboards_in_last_7_days: list[dict[str, str]] - new_event_definitions_in_last_7_days: list[dict[str, str]] - new_playlists_created_in_last_7_days: list[dict[str, str]] - new_experiments_launched_in_last_7_days: list[dict[str, str]] - new_experiments_completed_in_last_7_days: list[dict[str, str]] - new_external_data_sources_connected_in_last_7_days: list[dict[str, str]] - new_surveys_launched_in_last_7_days: list[dict[str, str]] - new_feature_flags_created_in_last_7_days: list[dict[str, str]] +class periodicDigestReport: + new_dashboards: list[dict[str, str]] + new_event_definitions: list[dict[str, str]] + new_playlists: list[dict[str, str]] + new_experiments_launched: list[dict[str, str]] + new_experiments_completed: list[dict[str, str]] + new_external_data_sources: list[dict[str, str]] + new_surveys_launched: list[dict[str, str]] + new_feature_flags: list[dict[str, str]] def get_teams_for_digest() -> list[Team]: @@ -47,53 +50,45 @@ def get_teams_for_digest() -> list[Team]: ) -def get_teams_with_new_dashboards_in_last_7_days(end: datetime) -> QuerySet: - begin = end - timedelta(days=7) +def get_teams_with_new_dashboards(end: datetime, begin: datetime) -> QuerySet: return Dashboard.objects.filter(created_at__gt=begin, created_at__lte=end).values("team_id", "name", "id") -def get_teams_with_new_event_definitions_in_last_7_days(end: datetime) -> QuerySet: - begin = end - timedelta(days=7) +def get_teams_with_new_event_definitions(end: datetime, begin: datetime) -> QuerySet: return EventDefinition.objects.filter(created_at__gt=begin, created_at__lte=end).values("team_id", "name", "id") -def get_teams_with_new_playlists_created_in_last_7_days(end: datetime) -> QuerySet: - begin = 
end - timedelta(days=7) +def get_teams_with_new_playlists(end: datetime, begin: datetime) -> QuerySet: return SessionRecordingPlaylist.objects.filter(created_at__gt=begin, created_at__lte=end).values( "team_id", "name", "short_id" ) -def get_teams_with_new_experiments_launched_in_last_7_days(end: datetime) -> QuerySet: - begin = end - timedelta(days=7) +def get_teams_with_new_experiments_launched(end: datetime, begin: datetime) -> QuerySet: return Experiment.objects.filter(start_date__gt=begin, start_date__lte=end).values( "team_id", "name", "id", "start_date" ) -def get_teams_with_new_experiments_completed_in_last_7_days(end: datetime) -> QuerySet: - begin = end - timedelta(days=7) +def get_teams_with_new_experiments_completed(end: datetime, begin: datetime) -> QuerySet: return Experiment.objects.filter(end_date__gt=begin, end_date__lte=end).values( "team_id", "name", "id", "start_date", "end_date" ) -def get_teams_with_new_external_data_sources_connected_in_last_7_days(end: datetime) -> QuerySet: - begin = end - timedelta(days=7) +def get_teams_with_new_external_data_sources(end: datetime, begin: datetime) -> QuerySet: return ExternalDataSource.objects.filter(created_at__gt=begin, created_at__lte=end, deleted=False).values( "team_id", "source_type", "id" ) -def get_teams_with_new_surveys_launched_in_last_7_days(end: datetime) -> QuerySet: - begin = end - timedelta(days=7) +def get_teams_with_new_surveys_launched(end: datetime, begin: datetime) -> QuerySet: return Survey.objects.filter(start_date__gt=begin, start_date__lte=end).values( "team_id", "name", "id", "description", "start_date" ) -def get_teams_with_new_feature_flags_created_in_last_7_days(end: datetime) -> QuerySet: - begin = end - timedelta(days=7) +def get_teams_with_new_feature_flags(end: datetime, begin: datetime) -> QuerySet: return ( FeatureFlag.objects.filter( created_at__gt=begin, @@ -110,7 +105,7 @@ def convert_team_digest_items_to_dict(items: QuerySet) -> dict[int, QuerySet]: return {team_id: items.filter(team_id=team_id) for team_id in items.values_list("team_id", flat=True).distinct()} -def has_non_zero_digest(report: WeeklyDigestReport) -> bool: +def has_non_zero_digest(report: periodicDigestReport) -> bool: return any(len(getattr(report, key)) > 0 for key in report.__dataclass_fields__) @@ -124,91 +119,78 @@ def _get_all_digest_data_as_team_rows(period_start: datetime, period_end: dateti def _get_all_digest_data(period_start: datetime, period_end: datetime) -> dict[str, Any]: return { - "teams_with_new_dashboards_in_last_7_days": get_teams_with_new_dashboards_in_last_7_days(period_end), - "teams_with_new_event_definitions_in_last_7_days": get_teams_with_new_event_definitions_in_last_7_days( - period_end - ), - "teams_with_new_playlists_created_in_last_7_days": get_teams_with_new_playlists_created_in_last_7_days( - period_end - ), - "teams_with_new_experiments_launched_in_last_7_days": get_teams_with_new_experiments_launched_in_last_7_days( - period_end - ), - "teams_with_new_experiments_completed_in_last_7_days": get_teams_with_new_experiments_completed_in_last_7_days( - period_end - ), - "teams_with_new_external_data_sources_connected_in_last_7_days": get_teams_with_new_external_data_sources_connected_in_last_7_days( - period_end - ), - "teams_with_new_surveys_launched_in_last_7_days": get_teams_with_new_surveys_launched_in_last_7_days( - period_end - ), - "teams_with_new_feature_flags_created_in_last_7_days": get_teams_with_new_feature_flags_created_in_last_7_days( - period_end - ), + 
"teams_with_new_dashboards": get_teams_with_new_dashboards(period_end, period_start), + "teams_with_new_event_definitions": get_teams_with_new_event_definitions(period_end, period_start), + "teams_with_new_playlists": get_teams_with_new_playlists(period_end, period_start), + "teams_with_new_experiments_launched": get_teams_with_new_experiments_launched(period_end, period_start), + "teams_with_new_experiments_completed": get_teams_with_new_experiments_completed(period_end, period_start), + "teams_with_new_external_data_sources": get_teams_with_new_external_data_sources(period_end, period_start), + "teams_with_new_surveys_launched": get_teams_with_new_surveys_launched(period_end, period_start), + "teams_with_new_feature_flags": get_teams_with_new_feature_flags(period_end, period_start), } -def get_weekly_digest_report(all_digest_data: dict[str, Any], team: Team) -> WeeklyDigestReport: - return WeeklyDigestReport( - new_dashboards_in_last_7_days=[ +def get_periodic_digest_report(all_digest_data: dict[str, Any], team: Team) -> periodicDigestReport: + return periodicDigestReport( + new_dashboards=[ {"name": dashboard.get("name"), "id": dashboard.get("id")} - for dashboard in all_digest_data["teams_with_new_dashboards_in_last_7_days"].get(team.id, []) + for dashboard in all_digest_data["teams_with_new_dashboards"].get(team.id, []) ], - new_event_definitions_in_last_7_days=[ + new_event_definitions=[ {"name": event_definition.get("name"), "id": event_definition.get("id")} - for event_definition in all_digest_data["teams_with_new_event_definitions_in_last_7_days"].get(team.id, []) + for event_definition in all_digest_data["teams_with_new_event_definitions"].get(team.id, []) ], - new_playlists_created_in_last_7_days=[ + new_playlists=[ {"name": playlist.get("name"), "id": playlist.get("short_id")} - for playlist in all_digest_data["teams_with_new_playlists_created_in_last_7_days"].get(team.id, []) + for playlist in all_digest_data["teams_with_new_playlists"].get(team.id, []) ], - new_experiments_launched_in_last_7_days=[ + new_experiments_launched=[ { "name": experiment.get("name"), "id": experiment.get("id"), "start_date": experiment.get("start_date").isoformat(), } - for experiment in all_digest_data["teams_with_new_experiments_launched_in_last_7_days"].get(team.id, []) + for experiment in all_digest_data["teams_with_new_experiments_launched"].get(team.id, []) ], - new_experiments_completed_in_last_7_days=[ + new_experiments_completed=[ { "name": experiment.get("name"), "id": experiment.get("id"), "start_date": experiment.get("start_date").isoformat(), "end_date": experiment.get("end_date").isoformat(), } - for experiment in all_digest_data["teams_with_new_experiments_completed_in_last_7_days"].get(team.id, []) + for experiment in all_digest_data["teams_with_new_experiments_completed"].get(team.id, []) ], - new_external_data_sources_connected_in_last_7_days=[ + new_external_data_sources=[ {"source_type": source.get("source_type"), "id": source.get("id")} - for source in all_digest_data["teams_with_new_external_data_sources_connected_in_last_7_days"].get( - team.id, [] - ) + for source in all_digest_data["teams_with_new_external_data_sources"].get(team.id, []) ], - new_surveys_launched_in_last_7_days=[ + new_surveys_launched=[ { "name": survey.get("name"), "id": survey.get("id"), "start_date": survey.get("start_date").isoformat(), "description": survey.get("description"), } - for survey in all_digest_data["teams_with_new_surveys_launched_in_last_7_days"].get(team.id, []) + for survey in 
all_digest_data["teams_with_new_surveys_launched"].get(team.id, []) ], - new_feature_flags_created_in_last_7_days=[ + new_feature_flags=[ {"name": feature_flag.get("name"), "id": feature_flag.get("id"), "key": feature_flag.get("key")} - for feature_flag in all_digest_data["teams_with_new_feature_flags_created_in_last_7_days"].get(team.id, []) + for feature_flag in all_digest_data["teams_with_new_feature_flags"].get(team.id, []) ], ) @shared_task(queue=CeleryQueue.USAGE_REPORTS.value, ignore_result=True, max_retries=3) -def send_weekly_digest_report(*, team_id: int, team_name: str, weekly_digest_report: dict[str, Any]) -> None: +def send_periodic_digest_report( + *, team_id: int, team_name: str, periodic_digest_report: dict[str, Any], instance_metadata: dict[str, Any] +) -> None: full_report_dict = { "team_id": team_id, "team_name": team_name, - "template": "weekly_digest_report", - **weekly_digest_report, + "template": "periodic_digest_report", + **periodic_digest_report, + **instance_metadata, } capture_report.delay( capture_event_name="transactional email", @@ -219,26 +201,30 @@ def send_weekly_digest_report(*, team_id: int, team_name: str, weekly_digest_rep @shared_task(**USAGE_REPORT_TASK_KWARGS, max_retries=0) -def send_all_weekly_digest_reports( +def send_all_periodic_digest_reports( dry_run: bool = False, - at: Optional[str] = None, + end_date: Optional[str] = None, + begin_date: Optional[str] = None, ) -> None: - at_date = parser.parse(at) if at else None - period = get_previous_day(at=at_date) - period_start, period_end = period + period_end = parser.parse(end_date) if end_date else datetime.now() + period_start = parser.parse(begin_date) if begin_date else period_end - timedelta(days=7) try: all_digest_data = _get_all_digest_data_as_team_rows(period_start, period_end) teams = get_teams_for_digest() time_now = datetime.now() for team in teams: - report = get_weekly_digest_report(all_digest_data, team) + report = get_periodic_digest_report(all_digest_data, team) full_report_dict = dataclasses.asdict(report) + instance_metadata = dataclasses.asdict(get_instance_metadata((period_start, period_end))) # Then capture as events to PostHog, so they can be sent via email if has_non_zero_digest(report) and not dry_run: - send_weekly_digest_report.delay( - team_id=team.id, team_name=team.name, weekly_digest_report=full_report_dict + send_periodic_digest_report.delay( + team_id=team.id, + team_name=team.name, + periodic_digest_report=full_report_dict, + instance_metadata=instance_metadata, ) time_since = datetime.now() - time_now logger.debug(f"Sending usage reports to PostHog and Billing took {time_since.total_seconds()} seconds.") # noqa T201 diff --git a/posthog/tasks/test/test_weekly_digest.py b/posthog/tasks/test/test_periodic_digest.py similarity index 58% rename from posthog/tasks/test/test_weekly_digest.py rename to posthog/tasks/test/test_periodic_digest.py index 40614305f1388..1dc62645178e4 100644 --- a/posthog/tasks/test/test_weekly_digest.py +++ b/posthog/tasks/test/test_periodic_digest.py @@ -1,5 +1,5 @@ from datetime import timedelta -from unittest.mock import MagicMock, patch +from unittest.mock import ANY, MagicMock, patch from uuid import uuid4 from django.utils.timezone import now @@ -9,20 +9,20 @@ from posthog.session_recordings.models.session_recording_playlist import ( SessionRecordingPlaylist, ) -from posthog.tasks.weekly_digest import send_all_weekly_digest_reports +from posthog.tasks.periodic_digest import send_all_periodic_digest_reports from posthog.test.base import 
APIBaseTest from posthog.warehouse.models import ExternalDataSource @freeze_time("2024-01-01T00:01:00Z") # A Monday -class TestWeeklyDigestReport(APIBaseTest): +class TestperiodicDigestReport(APIBaseTest): def setUp(self) -> None: super().setUp() self.distinct_id = str(uuid4()) @freeze_time("2024-01-20T00:01:00Z") - @patch("posthog.tasks.weekly_digest.capture_report") - def test_weekly_digest_report(self, mock_capture: MagicMock) -> None: + @patch("posthog.tasks.periodic_digest.capture_report") + def test_periodic_digest_report(self, mock_capture: MagicMock) -> None: # Create test data from "last week" with freeze_time("2024-01-15T00:01:00Z"): # Create a dashboard @@ -105,40 +105,58 @@ def test_weekly_digest_report(self, mock_capture: MagicMock) -> None: feature_flag=flag_for_completed_experiment, ) - # Run the weekly digest report task - send_all_weekly_digest_reports() + # Run the periodic digest report task + send_all_periodic_digest_reports() # Check that the capture event was called with the correct data expected_properties = { "team_id": self.team.id, "team_name": self.team.name, - "template": "weekly_digest_report", - "new_dashboards_in_last_7_days": [ + "template": "periodic_digest_report", + "users_who_logged_in": [], + "users_who_logged_in_count": 0, + "users_who_signed_up": [], + "users_who_signed_up_count": 0, + "period": { + "end_inclusive": "2024-01-20T00:01:00", + "start_inclusive": "2024-01-13T00:01:00", + }, + "plugins_enabled": {}, + "plugins_installed": {}, + "product": "open source", + "realm": "hosted-clickhouse", + "site_url": "http://localhost:8000", + "table_sizes": ANY, + "clickhouse_version": ANY, + "deployment_infrastructure": "unknown", + "helm": {}, + "instance_tag": "none", + "new_dashboards": [ { "name": "Test Dashboard", "id": dashboard.id, } ], - "new_event_definitions_in_last_7_days": [ + "new_event_definitions": [ { "name": "Test Event", "id": event_definition.id, } ], - "new_playlists_created_in_last_7_days": [ + "new_playlists": [ { "name": "Test Playlist", "id": playlist.short_id, } ], - "new_experiments_launched_in_last_7_days": [ + "new_experiments_launched": [ { "name": "Launched Experiment", "id": launched_experiment.id, "start_date": launched_experiment.start_date.isoformat(), } ], - "new_experiments_completed_in_last_7_days": [ + "new_experiments_completed": [ { "name": "Completed Experiment", "id": completed_experiment.id, @@ -146,13 +164,13 @@ def test_weekly_digest_report(self, mock_capture: MagicMock) -> None: "end_date": completed_experiment.end_date.isoformat(), } ], - "new_external_data_sources_connected_in_last_7_days": [ + "new_external_data_sources": [ { "source_type": "Stripe", "id": external_data_source.id, } ], - "new_surveys_launched_in_last_7_days": [ + "new_surveys_launched": [ { "name": "Test Survey", "id": survey.id, @@ -160,7 +178,7 @@ def test_weekly_digest_report(self, mock_capture: MagicMock) -> None: "description": "Test Description", } ], - "new_feature_flags_created_in_last_7_days": [ + "new_feature_flags": [ { "name": "Test Flag", "id": feature_flag.id, @@ -176,7 +194,70 @@ def test_weekly_digest_report(self, mock_capture: MagicMock) -> None: send_for_all_members=True, ) - @patch("posthog.tasks.weekly_digest.capture_report") - def test_weekly_digest_report_dry_run(self, mock_capture: MagicMock) -> None: - send_all_weekly_digest_reports(dry_run=True) + @patch("posthog.tasks.periodic_digest.capture_report") + def test_periodic_digest_report_dry_run(self, mock_capture: MagicMock) -> None: + 
send_all_periodic_digest_reports(dry_run=True) mock_capture.delay.assert_not_called() + + @patch("posthog.tasks.periodic_digest.capture_report") + def test_periodic_digest_report_custom_dates(self, mock_capture: MagicMock) -> None: + # Create test data + with freeze_time("2024-01-15T00:01:00Z"): + dashboard = Dashboard.objects.create( + team=self.team, + name="Test Dashboard", + ) + with freeze_time("2024-01-13T00:01:00Z"): + # outside the range, should be excluded + Dashboard.objects.create( + team=self.team, + name="Test Dashboard", + ) + + # Run the periodic digest report task with custom dates + send_all_periodic_digest_reports(begin_date="2024-01-14T00:00:00Z", end_date="2024-01-16T00:00:00Z") + + # Check that the capture event was called with the correct data + expected_properties = { + "team_id": self.team.id, + "team_name": self.team.name, + "template": "periodic_digest_report", + "users_who_logged_in": [], + "users_who_logged_in_count": 0, + "users_who_signed_up": [], + "users_who_signed_up_count": 0, + "period": { + "end_inclusive": "2024-01-16T00:00:00+00:00", + "start_inclusive": "2024-01-14T00:00:00+00:00", + }, + "plugins_enabled": {}, + "plugins_installed": {}, + "product": "open source", + "realm": "hosted-clickhouse", + "site_url": "http://localhost:8000", + "table_sizes": ANY, + "clickhouse_version": ANY, + "deployment_infrastructure": "unknown", + "helm": {}, + "instance_tag": "none", + "new_dashboards": [ + { + "name": "Test Dashboard", + "id": dashboard.id, + } + ], + "new_event_definitions": [], + "new_playlists": [], + "new_experiments_launched": [], + "new_experiments_completed": [], + "new_external_data_sources": [], + "new_surveys_launched": [], + "new_feature_flags": [], + } + + mock_capture.delay.assert_called_once_with( + capture_event_name="transactional email", + team_id=self.team.id, + full_report_dict=expected_properties, + send_for_all_members=True, + ) From d0b35394c495a9ee02a34a8948731daa5ee7042c Mon Sep 17 00:00:00 2001 From: Raquel Smith Date: Fri, 29 Nov 2024 14:18:16 -0800 Subject: [PATCH 05/12] add to celery scheduler --- posthog/tasks/scheduled.py | 14 ++++++++++++-- posthog/tasks/test/test_periodic_digest.py | 14 +++++++++----- 2 files changed, 21 insertions(+), 7 deletions(-) diff --git a/posthog/tasks/scheduled.py b/posthog/tasks/scheduled.py index 0d9628490b788..41cfdef6658e4 100644 --- a/posthog/tasks/scheduled.py +++ b/posthog/tasks/scheduled.py @@ -1,3 +1,4 @@ +from datetime import datetime from random import randrange from typing import Any @@ -9,12 +10,13 @@ from posthog.caching.warming import schedule_warming_for_teams_task from posthog.celery import app from posthog.tasks.alerts.checks import ( + alerts_backlog_task, check_alerts_task, checks_cleanup_task, - alerts_backlog_task, reset_stuck_alerts_task, ) from posthog.tasks.integrations import refresh_integrations +from posthog.tasks.periodic_digest import send_all_periodic_digest_reports from posthog.tasks.tasks import ( calculate_cohort, calculate_decide_usage, @@ -52,9 +54,9 @@ sync_all_organization_available_product_features, update_event_partitions, update_quota_limiting, + update_survey_adaptive_sampling, update_survey_iteration, verify_persons_data_in_sync, - update_survey_adaptive_sampling, ) from posthog.utils import get_crontab @@ -130,6 +132,14 @@ def setup_periodic_tasks(sender: Celery, **kwargs: Any) -> None: name="update quota limiting", ) + # Send all periodic digest reports + weekly_digest_end_date = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0) + 
sender.add_periodic_task( + crontab(hour="9", minute="0", day_of_week="mon"), + send_all_periodic_digest_reports.s(end_date=weekly_digest_end_date.isoformat()), + name="send all weekly digest reports", + ) + # PostHog Cloud cron jobs # NOTE: We can't use is_cloud here as some Django elements aren't loaded yet. We check in the task execution instead # Verify that persons data is in sync every day at 4 AM UTC diff --git a/posthog/tasks/test/test_periodic_digest.py b/posthog/tasks/test/test_periodic_digest.py index 1dc62645178e4..f63ff7c8e4ed9 100644 --- a/posthog/tasks/test/test_periodic_digest.py +++ b/posthog/tasks/test/test_periodic_digest.py @@ -1,4 +1,4 @@ -from datetime import timedelta +from datetime import datetime, timedelta from unittest.mock import ANY, MagicMock, patch from uuid import uuid4 @@ -15,7 +15,7 @@ @freeze_time("2024-01-01T00:01:00Z") # A Monday -class TestperiodicDigestReport(APIBaseTest): +class TestPeriodicDigestReport(APIBaseTest): def setUp(self) -> None: super().setUp() self.distinct_id = str(uuid4()) @@ -214,8 +214,12 @@ def test_periodic_digest_report_custom_dates(self, mock_capture: MagicMock) -> N name="Test Dashboard", ) + with freeze_time("2024-01-16T00:01:00Z"): + end_date = datetime.now() + begin_date = end_date - timedelta(days=2) + # Run the periodic digest report task with custom dates - send_all_periodic_digest_reports(begin_date="2024-01-14T00:00:00Z", end_date="2024-01-16T00:00:00Z") + send_all_periodic_digest_reports(begin_date=begin_date.isoformat(), end_date=end_date.isoformat()) # Check that the capture event was called with the correct data expected_properties = { @@ -227,8 +231,8 @@ def test_periodic_digest_report_custom_dates(self, mock_capture: MagicMock) -> N "users_who_signed_up": [], "users_who_signed_up_count": 0, "period": { - "end_inclusive": "2024-01-16T00:00:00+00:00", - "start_inclusive": "2024-01-14T00:00:00+00:00", + "end_inclusive": "2024-01-16T00:01:00", + "start_inclusive": "2024-01-14T00:01:00", }, "plugins_enabled": {}, "plugins_installed": {}, From 2eba0940c25295e9a26097697a6519b738059e81 Mon Sep 17 00:00:00 2001 From: Raquel Smith Date: Fri, 29 Nov 2024 14:28:11 -0800 Subject: [PATCH 06/12] ignore mypy errors --- posthog/tasks/test/test_periodic_digest.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/posthog/tasks/test/test_periodic_digest.py b/posthog/tasks/test/test_periodic_digest.py index f63ff7c8e4ed9..fc2fbff2a464e 100644 --- a/posthog/tasks/test/test_periodic_digest.py +++ b/posthog/tasks/test/test_periodic_digest.py @@ -153,15 +153,15 @@ def test_periodic_digest_report(self, mock_capture: MagicMock) -> None: { "name": "Launched Experiment", "id": launched_experiment.id, - "start_date": launched_experiment.start_date.isoformat(), + "start_date": launched_experiment.start_date.isoformat(), # type: ignore } ], "new_experiments_completed": [ { "name": "Completed Experiment", "id": completed_experiment.id, - "start_date": completed_experiment.start_date.isoformat(), - "end_date": completed_experiment.end_date.isoformat(), + "start_date": completed_experiment.start_date.isoformat(), # type: ignore + "end_date": completed_experiment.end_date.isoformat(), # type: ignore } ], "new_external_data_sources": [ @@ -174,7 +174,7 @@ def test_periodic_digest_report(self, mock_capture: MagicMock) -> None: { "name": "Test Survey", "id": survey.id, - "start_date": survey.start_date.isoformat(), + "start_date": survey.start_date.isoformat(), # type: ignore "description": "Test Description", } ], From 
bc451d62b8117595f9af9a84925b3aaee79ca943 Mon Sep 17 00:00:00 2001 From: github-actions <41898282+github-actions[bot]@users.noreply.github.com> Date: Fri, 29 Nov 2024 22:33:46 +0000 Subject: [PATCH 07/12] Update query snapshots --- .../test_session_recordings.ambr | 48 +++++++++---------- 1 file changed, 24 insertions(+), 24 deletions(-) diff --git a/posthog/session_recordings/test/__snapshots__/test_session_recordings.ambr b/posthog/session_recordings/test/__snapshots__/test_session_recordings.ambr index 80349bbc75121..ccba484c51a23 100644 --- a/posthog/session_recordings/test/__snapshots__/test_session_recordings.ambr +++ b/posthog/session_recordings/test/__snapshots__/test_session_recordings.ambr @@ -640,12 +640,12 @@ LEFT OUTER JOIN "posthog_organizationmembership" ON ("ee_accesscontrol"."organization_member_id" = "posthog_organizationmembership"."id") WHERE (("ee_accesscontrol"."organization_member_id" IS NULL AND "ee_accesscontrol"."resource" = 'project' - AND "ee_accesscontrol"."resource_id" = '437' + AND "ee_accesscontrol"."resource_id" = '421' AND "ee_accesscontrol"."role_id" IS NULL AND "ee_accesscontrol"."team_id" = 99999) OR ("posthog_organizationmembership"."user_id" = 99999 AND "ee_accesscontrol"."resource" = 'project' - AND "ee_accesscontrol"."resource_id" = '437' + AND "ee_accesscontrol"."resource_id" = '421' AND "ee_accesscontrol"."role_id" IS NULL AND "ee_accesscontrol"."team_id" = 99999) OR ("ee_accesscontrol"."organization_member_id" IS NULL @@ -1688,12 +1688,12 @@ LEFT OUTER JOIN "posthog_organizationmembership" ON ("ee_accesscontrol"."organization_member_id" = "posthog_organizationmembership"."id") WHERE (("ee_accesscontrol"."organization_member_id" IS NULL AND "ee_accesscontrol"."resource" = 'project' - AND "ee_accesscontrol"."resource_id" = '444' + AND "ee_accesscontrol"."resource_id" = '428' AND "ee_accesscontrol"."role_id" IS NULL AND "ee_accesscontrol"."team_id" = 99999) OR ("posthog_organizationmembership"."user_id" = 99999 AND "ee_accesscontrol"."resource" = 'project' - AND "ee_accesscontrol"."resource_id" = '444' + AND "ee_accesscontrol"."resource_id" = '428' AND "ee_accesscontrol"."role_id" IS NULL AND "ee_accesscontrol"."team_id" = 99999) OR ("ee_accesscontrol"."organization_member_id" IS NULL @@ -2441,12 +2441,12 @@ LEFT OUTER JOIN "posthog_organizationmembership" ON ("ee_accesscontrol"."organization_member_id" = "posthog_organizationmembership"."id") WHERE (("ee_accesscontrol"."organization_member_id" IS NULL AND "ee_accesscontrol"."resource" = 'project' - AND "ee_accesscontrol"."resource_id" = '444' + AND "ee_accesscontrol"."resource_id" = '428' AND "ee_accesscontrol"."role_id" IS NULL AND "ee_accesscontrol"."team_id" = 99999) OR ("posthog_organizationmembership"."user_id" = 99999 AND "ee_accesscontrol"."resource" = 'project' - AND "ee_accesscontrol"."resource_id" = '444' + AND "ee_accesscontrol"."resource_id" = '428' AND "ee_accesscontrol"."role_id" IS NULL AND "ee_accesscontrol"."team_id" = 99999) OR ("ee_accesscontrol"."organization_member_id" IS NULL @@ -3129,12 +3129,12 @@ LEFT OUTER JOIN "posthog_organizationmembership" ON ("ee_accesscontrol"."organization_member_id" = "posthog_organizationmembership"."id") WHERE (("ee_accesscontrol"."organization_member_id" IS NULL AND "ee_accesscontrol"."resource" = 'project' - AND "ee_accesscontrol"."resource_id" = '444' + AND "ee_accesscontrol"."resource_id" = '428' AND "ee_accesscontrol"."role_id" IS NULL AND "ee_accesscontrol"."team_id" = 99999) OR ("posthog_organizationmembership"."user_id" = 99999 AND 
"ee_accesscontrol"."resource" = 'project' - AND "ee_accesscontrol"."resource_id" = '444' + AND "ee_accesscontrol"."resource_id" = '428' AND "ee_accesscontrol"."role_id" IS NULL AND "ee_accesscontrol"."team_id" = 99999) OR ("ee_accesscontrol"."organization_member_id" IS NULL @@ -3881,12 +3881,12 @@ LEFT OUTER JOIN "posthog_organizationmembership" ON ("ee_accesscontrol"."organization_member_id" = "posthog_organizationmembership"."id") WHERE (("ee_accesscontrol"."organization_member_id" IS NULL AND "ee_accesscontrol"."resource" = 'project' - AND "ee_accesscontrol"."resource_id" = '444' + AND "ee_accesscontrol"."resource_id" = '428' AND "ee_accesscontrol"."role_id" IS NULL AND "ee_accesscontrol"."team_id" = 99999) OR ("posthog_organizationmembership"."user_id" = 99999 AND "ee_accesscontrol"."resource" = 'project' - AND "ee_accesscontrol"."resource_id" = '444' + AND "ee_accesscontrol"."resource_id" = '428' AND "ee_accesscontrol"."role_id" IS NULL AND "ee_accesscontrol"."team_id" = 99999) OR ("ee_accesscontrol"."organization_member_id" IS NULL @@ -4597,12 +4597,12 @@ LEFT OUTER JOIN "posthog_organizationmembership" ON ("ee_accesscontrol"."organization_member_id" = "posthog_organizationmembership"."id") WHERE (("ee_accesscontrol"."organization_member_id" IS NULL AND "ee_accesscontrol"."resource" = 'project' - AND "ee_accesscontrol"."resource_id" = '444' + AND "ee_accesscontrol"."resource_id" = '428' AND "ee_accesscontrol"."role_id" IS NULL AND "ee_accesscontrol"."team_id" = 99999) OR ("posthog_organizationmembership"."user_id" = 99999 AND "ee_accesscontrol"."resource" = 'project' - AND "ee_accesscontrol"."resource_id" = '444' + AND "ee_accesscontrol"."resource_id" = '428' AND "ee_accesscontrol"."role_id" IS NULL AND "ee_accesscontrol"."team_id" = 99999) OR ("ee_accesscontrol"."organization_member_id" IS NULL @@ -5395,12 +5395,12 @@ LEFT OUTER JOIN "posthog_organizationmembership" ON ("ee_accesscontrol"."organization_member_id" = "posthog_organizationmembership"."id") WHERE (("ee_accesscontrol"."organization_member_id" IS NULL AND "ee_accesscontrol"."resource" = 'project' - AND "ee_accesscontrol"."resource_id" = '444' + AND "ee_accesscontrol"."resource_id" = '428' AND "ee_accesscontrol"."role_id" IS NULL AND "ee_accesscontrol"."team_id" = 99999) OR ("posthog_organizationmembership"."user_id" = 99999 AND "ee_accesscontrol"."resource" = 'project' - AND "ee_accesscontrol"."resource_id" = '444' + AND "ee_accesscontrol"."resource_id" = '428' AND "ee_accesscontrol"."role_id" IS NULL AND "ee_accesscontrol"."team_id" = 99999) OR ("ee_accesscontrol"."organization_member_id" IS NULL @@ -5659,12 +5659,12 @@ LEFT OUTER JOIN "posthog_organizationmembership" ON ("ee_accesscontrol"."organization_member_id" = "posthog_organizationmembership"."id") WHERE (("ee_accesscontrol"."organization_member_id" IS NULL AND "ee_accesscontrol"."resource" = 'project' - AND "ee_accesscontrol"."resource_id" = '444' + AND "ee_accesscontrol"."resource_id" = '428' AND "ee_accesscontrol"."role_id" IS NULL AND "ee_accesscontrol"."team_id" = 99999) OR ("posthog_organizationmembership"."user_id" = 99999 AND "ee_accesscontrol"."resource" = 'project' - AND "ee_accesscontrol"."resource_id" = '444' + AND "ee_accesscontrol"."resource_id" = '428' AND "ee_accesscontrol"."role_id" IS NULL AND "ee_accesscontrol"."team_id" = 99999) OR ("ee_accesscontrol"."organization_member_id" IS NULL @@ -6091,12 +6091,12 @@ LEFT OUTER JOIN "posthog_organizationmembership" ON ("ee_accesscontrol"."organization_member_id" = "posthog_organizationmembership"."id") 
WHERE (("ee_accesscontrol"."organization_member_id" IS NULL AND "ee_accesscontrol"."resource" = 'project' - AND "ee_accesscontrol"."resource_id" = '444' + AND "ee_accesscontrol"."resource_id" = '428' AND "ee_accesscontrol"."role_id" IS NULL AND "ee_accesscontrol"."team_id" = 99999) OR ("posthog_organizationmembership"."user_id" = 99999 AND "ee_accesscontrol"."resource" = 'project' - AND "ee_accesscontrol"."resource_id" = '444' + AND "ee_accesscontrol"."resource_id" = '428' AND "ee_accesscontrol"."role_id" IS NULL AND "ee_accesscontrol"."team_id" = 99999) OR ("ee_accesscontrol"."organization_member_id" IS NULL @@ -6556,12 +6556,12 @@ LEFT OUTER JOIN "posthog_organizationmembership" ON ("ee_accesscontrol"."organization_member_id" = "posthog_organizationmembership"."id") WHERE (("ee_accesscontrol"."organization_member_id" IS NULL AND "ee_accesscontrol"."resource" = 'project' - AND "ee_accesscontrol"."resource_id" = '444' + AND "ee_accesscontrol"."resource_id" = '428' AND "ee_accesscontrol"."role_id" IS NULL AND "ee_accesscontrol"."team_id" = 99999) OR ("posthog_organizationmembership"."user_id" = 99999 AND "ee_accesscontrol"."resource" = 'project' - AND "ee_accesscontrol"."resource_id" = '444' + AND "ee_accesscontrol"."resource_id" = '428' AND "ee_accesscontrol"."role_id" IS NULL AND "ee_accesscontrol"."team_id" = 99999) OR ("ee_accesscontrol"."organization_member_id" IS NULL @@ -7248,12 +7248,12 @@ LEFT OUTER JOIN "posthog_organizationmembership" ON ("ee_accesscontrol"."organization_member_id" = "posthog_organizationmembership"."id") WHERE (("ee_accesscontrol"."organization_member_id" IS NULL AND "ee_accesscontrol"."resource" = 'project' - AND "ee_accesscontrol"."resource_id" = '444' + AND "ee_accesscontrol"."resource_id" = '428' AND "ee_accesscontrol"."role_id" IS NULL AND "ee_accesscontrol"."team_id" = 99999) OR ("posthog_organizationmembership"."user_id" = 99999 AND "ee_accesscontrol"."resource" = 'project' - AND "ee_accesscontrol"."resource_id" = '444' + AND "ee_accesscontrol"."resource_id" = '428' AND "ee_accesscontrol"."role_id" IS NULL AND "ee_accesscontrol"."team_id" = 99999) OR ("ee_accesscontrol"."organization_member_id" IS NULL @@ -7997,12 +7997,12 @@ LEFT OUTER JOIN "posthog_organizationmembership" ON ("ee_accesscontrol"."organization_member_id" = "posthog_organizationmembership"."id") WHERE (("ee_accesscontrol"."organization_member_id" IS NULL AND "ee_accesscontrol"."resource" = 'project' - AND "ee_accesscontrol"."resource_id" = '444' + AND "ee_accesscontrol"."resource_id" = '428' AND "ee_accesscontrol"."role_id" IS NULL AND "ee_accesscontrol"."team_id" = 99999) OR ("posthog_organizationmembership"."user_id" = 99999 AND "ee_accesscontrol"."resource" = 'project' - AND "ee_accesscontrol"."resource_id" = '444' + AND "ee_accesscontrol"."resource_id" = '428' AND "ee_accesscontrol"."role_id" IS NULL AND "ee_accesscontrol"."team_id" = 99999) OR ("ee_accesscontrol"."organization_member_id" IS NULL From 31ad4dec60d9228388c0cd922f839a0a3bde8001 Mon Sep 17 00:00:00 2001 From: Raquel Smith Date: Fri, 6 Dec 2024 14:11:51 -0800 Subject: [PATCH 08/12] use messaging record --- posthog/models/messaging.py | 16 +++++ posthog/tasks/periodic_digest.py | 31 +++++++- posthog/tasks/test/test_periodic_digest.py | 82 ++++++++++++++++++++++ posthog/tasks/test/test_usage_report.py | 4 -- 4 files changed, 128 insertions(+), 5 deletions(-) diff --git a/posthog/models/messaging.py b/posthog/models/messaging.py index 1f013ce09f75d..cd929d33b9986 100644 --- a/posthog/models/messaging.py +++ 
b/posthog/models/messaging.py @@ -19,6 +19,22 @@ def get_or_create(self, defaults=None, **kwargs): return super().get_or_create(defaults, **kwargs) + def filter(self, *args, **kwargs): + raw_email = kwargs.pop("raw_email", None) + + if raw_email: + kwargs["email_hash"] = get_email_hash(raw_email) + + return super().filter(*args, **kwargs) + + def get(self, *args, **kwargs): + raw_email = kwargs.pop("raw_email", None) + + if raw_email: + kwargs["email_hash"] = get_email_hash(raw_email) + + return super().get(*args, **kwargs) + class MessagingRecord(UUIDModel): objects = MessagingRecordManager() diff --git a/posthog/tasks/periodic_digest.py b/posthog/tasks/periodic_digest.py index 0990c55506f3b..bd13d9fc56669 100644 --- a/posthog/tasks/periodic_digest.py +++ b/posthog/tasks/periodic_digest.py @@ -6,6 +6,7 @@ from celery import shared_task from dateutil import parser from django.db.models import QuerySet +from django.utils import timezone from sentry_sdk import capture_exception from posthog.models.dashboard import Dashboard @@ -13,6 +14,7 @@ from posthog.models.experiment import Experiment from posthog.models.feature_flag import FeatureFlag from posthog.models.feedback.survey import Survey +from posthog.models.messaging import MessagingRecord from posthog.models.team.team import Team from posthog.session_recordings.models.session_recording_playlist import ( SessionRecordingPlaylist, @@ -183,8 +185,28 @@ def get_periodic_digest_report(all_digest_data: dict[str, Any], team: Team) -> p @shared_task(queue=CeleryQueue.USAGE_REPORTS.value, ignore_result=True, max_retries=3) def send_periodic_digest_report( - *, team_id: int, team_name: str, periodic_digest_report: dict[str, Any], instance_metadata: dict[str, Any] + *, + team_id: int, + team_name: str, + periodic_digest_report: dict[str, Any], + instance_metadata: dict[str, Any], + period_end: datetime, + period_start: datetime, ) -> None: + period_str = period_end.strftime("%Y-%m-%d") + days = (period_end - period_start).days + campaign_key = f"periodic_digest_{period_str}_{days}d" + + # Use a consistent identifier for the team + team_identifier = f"team_{team_id}" + + # Check if we've already sent this digest using get_or_create + record, created = MessagingRecord.objects.get_or_create(raw_email=team_identifier, campaign_key=campaign_key) + + if not created and record.sent_at: + logger.info(f"Skipping duplicate periodic digest for team {team_id} for period ending {period_str}") + return + full_report_dict = { "team_id": team_id, "team_name": team_name, @@ -192,6 +214,7 @@ def send_periodic_digest_report( **periodic_digest_report, **instance_metadata, } + capture_report.delay( capture_event_name="transactional email", team_id=team_id, @@ -199,6 +222,10 @@ def send_periodic_digest_report( send_for_all_members=True, ) + # Mark as sent + record.sent_at = timezone.now() + record.save() + @shared_task(**USAGE_REPORT_TASK_KWARGS, max_retries=0) def send_all_periodic_digest_reports( @@ -225,6 +252,8 @@ def send_all_periodic_digest_reports( team_name=team.name, periodic_digest_report=full_report_dict, instance_metadata=instance_metadata, + period_end=period_end, + period_start=period_start, ) time_since = datetime.now() - time_now logger.debug(f"Sending usage reports to PostHog and Billing took {time_since.total_seconds()} seconds.") # noqa T201 diff --git a/posthog/tasks/test/test_periodic_digest.py b/posthog/tasks/test/test_periodic_digest.py index fc2fbff2a464e..a962dfc5e5bd0 100644 --- a/posthog/tasks/test/test_periodic_digest.py +++ 
b/posthog/tasks/test/test_periodic_digest.py @@ -6,6 +6,7 @@ from freezegun import freeze_time from posthog.models import Dashboard, EventDefinition, Experiment, FeatureFlag, Survey +from posthog.models.messaging import MessagingRecord from posthog.session_recordings.models.session_recording_playlist import ( SessionRecordingPlaylist, ) @@ -265,3 +266,84 @@ def test_periodic_digest_report_custom_dates(self, mock_capture: MagicMock) -> N full_report_dict=expected_properties, send_for_all_members=True, ) + + @freeze_time("2024-01-20T00:01:00Z") + @patch("posthog.tasks.periodic_digest.capture_report") + def test_periodic_digest_report_idempotency(self, mock_capture: MagicMock) -> None: + # Create test data + Dashboard.objects.create( + team=self.team, + name="Test Dashboard", + ) + + # First run - should send the digest + send_all_periodic_digest_reports() + + # Verify first call + mock_capture.delay.assert_called_once() + mock_capture.delay.reset_mock() + + # Check that messaging record was created + record = MessagingRecord.objects.get( + raw_email=f"team_{self.team.id}", campaign_key="periodic_digest_2024-01-20_7d" + ) + self.assertIsNotNone(record.sent_at) + + # Second run - should not send the digest again + send_all_periodic_digest_reports() + mock_capture.delay.assert_not_called() + + # Verify only one record exists + self.assertEqual(MessagingRecord.objects.count(), 1) + + @freeze_time("2024-01-20T00:01:00Z") + @patch("posthog.tasks.periodic_digest.capture_report") + def test_periodic_digest_different_periods(self, mock_capture: MagicMock) -> None: + # Create test data + Dashboard.objects.create( + team=self.team, + name="Test Dashboard", + ) + + # Send weekly digest + send_all_periodic_digest_reports() + mock_capture.delay.assert_called_once() + mock_capture.delay.reset_mock() + + # Send monthly digest (different period length) + send_all_periodic_digest_reports( + begin_date=(datetime.now() - timedelta(days=30)).isoformat(), end_date=datetime.now().isoformat() + ) + mock_capture.delay.assert_called_once() + + # Verify two different records exist + records = MessagingRecord.objects.filter(raw_email=f"team_{self.team.id}") + self.assertEqual(records.count(), 2) + campaign_keys = sorted([r.campaign_key for r in records]) + self.assertEqual(campaign_keys, ["periodic_digest_2024-01-20_30d", "periodic_digest_2024-01-20_7d"]) + + @freeze_time("2024-01-20T00:01:00Z") + @patch("posthog.tasks.periodic_digest.capture_report") + def test_periodic_digest_empty_report_no_record(self, mock_capture: MagicMock) -> None: + # Run without any data (empty digest) + send_all_periodic_digest_reports() + + # Verify no capture call and no messaging record + mock_capture.delay.assert_not_called() + self.assertEqual(MessagingRecord.objects.count(), 0) + + @freeze_time("2024-01-20T00:01:00Z") + @patch("posthog.tasks.periodic_digest.capture_report") + def test_periodic_digest_dry_run_no_record(self, mock_capture: MagicMock) -> None: + # Create test data + Dashboard.objects.create( + team=self.team, + name="Test Dashboard", + ) + + # Run in dry_run mode + send_all_periodic_digest_reports(dry_run=True) + + # Verify no capture call and no messaging record + mock_capture.delay.assert_not_called() + self.assertEqual(MessagingRecord.objects.count(), 0) diff --git a/posthog/tasks/test/test_usage_report.py b/posthog/tasks/test/test_usage_report.py index 2391d8721e0f1..a0c14a08a82d0 100644 --- a/posthog/tasks/test/test_usage_report.py +++ b/posthog/tasks/test/test_usage_report.py @@ -815,10 +815,6 @@ def 
test_unlicensed_usage_report(self, mock_post: MagicMock, mock_client: MagicM mock_posthog = MagicMock() mock_client.return_value = mock_posthog - # # make a dashboard for the team created within the last 7 days - # with freeze_time("2022-01-08T00:01:00Z"): - # Dashboard.objects.create(name="Test Dashboard", team=self.team) - all_reports = self._test_usage_report() with self.settings(SITE_URL="http://test.posthog.com"): send_all_org_usage_reports() From 3b0189e6e3fa5d4285aa10d43e7adb58012b7e17 Mon Sep 17 00:00:00 2001 From: Raquel Smith Date: Tue, 10 Dec 2024 08:19:27 -0800 Subject: [PATCH 09/12] fix type errors --- posthog/tasks/test/test_periodic_digest.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/posthog/tasks/test/test_periodic_digest.py b/posthog/tasks/test/test_periodic_digest.py index a962dfc5e5bd0..01317904cd9c6 100644 --- a/posthog/tasks/test/test_periodic_digest.py +++ b/posthog/tasks/test/test_periodic_digest.py @@ -284,7 +284,7 @@ def test_periodic_digest_report_idempotency(self, mock_capture: MagicMock) -> No mock_capture.delay.reset_mock() # Check that messaging record was created - record = MessagingRecord.objects.get( + record = MessagingRecord.objects.get( # type: ignore raw_email=f"team_{self.team.id}", campaign_key="periodic_digest_2024-01-20_7d" ) self.assertIsNotNone(record.sent_at) @@ -317,7 +317,7 @@ def test_periodic_digest_different_periods(self, mock_capture: MagicMock) -> Non mock_capture.delay.assert_called_once() # Verify two different records exist - records = MessagingRecord.objects.filter(raw_email=f"team_{self.team.id}") + records = MessagingRecord.objects.filter(raw_email=f"team_{self.team.id}") # type: ignore self.assertEqual(records.count(), 2) campaign_keys = sorted([r.campaign_key for r in records]) self.assertEqual(campaign_keys, ["periodic_digest_2024-01-20_30d", "periodic_digest_2024-01-20_7d"]) From 78e73bc263ae902090d968e3a137e417b4b7a62b Mon Sep 17 00:00:00 2001 From: Raquel Smith Date: Tue, 10 Dec 2024 08:53:18 -0800 Subject: [PATCH 10/12] move the date calc to the task itself --- posthog/tasks/periodic_digest.py | 4 +++- posthog/tasks/scheduled.py | 4 +--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/posthog/tasks/periodic_digest.py b/posthog/tasks/periodic_digest.py index bd13d9fc56669..9b03f9ee755c6 100644 --- a/posthog/tasks/periodic_digest.py +++ b/posthog/tasks/periodic_digest.py @@ -233,7 +233,9 @@ def send_all_periodic_digest_reports( end_date: Optional[str] = None, begin_date: Optional[str] = None, ) -> None: - period_end = parser.parse(end_date) if end_date else datetime.now() + period_end = ( + parser.parse(end_date) if end_date else datetime.now().replace(hour=0, minute=0, second=0, microsecond=0) + ) period_start = parser.parse(begin_date) if begin_date else period_end - timedelta(days=7) try: diff --git a/posthog/tasks/scheduled.py b/posthog/tasks/scheduled.py index c0706e25b98e4..691b4dbd3cb28 100644 --- a/posthog/tasks/scheduled.py +++ b/posthog/tasks/scheduled.py @@ -1,4 +1,3 @@ -from datetime import datetime from random import randrange from typing import Any @@ -132,10 +131,9 @@ def setup_periodic_tasks(sender: Celery, **kwargs: Any) -> None: ) # Send all periodic digest reports - weekly_digest_end_date = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0) sender.add_periodic_task( crontab(hour="9", minute="0", day_of_week="mon"), - send_all_periodic_digest_reports.s(end_date=weekly_digest_end_date.isoformat()), + send_all_periodic_digest_reports.s(), name="send all 
weekly digest reports", ) From 27183e9e4052b4aa7fce4cc78b06cf8984b08041 Mon Sep 17 00:00:00 2001 From: Raquel Smith Date: Tue, 10 Dec 2024 09:15:08 -0800 Subject: [PATCH 11/12] fix timezone and add prop for number of items with values --- posthog/tasks/periodic_digest.py | 14 +++++++++---- posthog/tasks/test/test_periodic_digest.py | 24 +++++++++++++--------- 2 files changed, 24 insertions(+), 14 deletions(-) diff --git a/posthog/tasks/periodic_digest.py b/posthog/tasks/periodic_digest.py index 9b03f9ee755c6..e7dace4dfcadf 100644 --- a/posthog/tasks/periodic_digest.py +++ b/posthog/tasks/periodic_digest.py @@ -107,8 +107,8 @@ def convert_team_digest_items_to_dict(items: QuerySet) -> dict[int, QuerySet]: return {team_id: items.filter(team_id=team_id) for team_id in items.values_list("team_id", flat=True).distinct()} -def has_non_zero_digest(report: periodicDigestReport) -> bool: - return any(len(getattr(report, key)) > 0 for key in report.__dataclass_fields__) +def count_non_zero_digest_items(report: periodicDigestReport) -> int: + return sum(1 for key in report.__dataclass_fields__ if len(getattr(report, key)) > 0) def _get_all_digest_data_as_team_rows(period_start: datetime, period_end: datetime) -> dict[str, Any]: @@ -192,6 +192,7 @@ def send_periodic_digest_report( instance_metadata: dict[str, Any], period_end: datetime, period_start: datetime, + digest_items_with_data: int, ) -> None: period_str = period_end.strftime("%Y-%m-%d") days = (period_end - period_start).days @@ -211,6 +212,7 @@ def send_periodic_digest_report( "team_id": team_id, "team_name": team_name, "template": "periodic_digest_report", + "digest_items_with_data": digest_items_with_data, **periodic_digest_report, **instance_metadata, } @@ -234,7 +236,9 @@ def send_all_periodic_digest_reports( begin_date: Optional[str] = None, ) -> None: period_end = ( - parser.parse(end_date) if end_date else datetime.now().replace(hour=0, minute=0, second=0, microsecond=0) + parser.parse(end_date) + if end_date + else datetime.now(tz=timezone.utc).replace(hour=0, minute=0, second=0, microsecond=0) ) period_start = parser.parse(begin_date) if begin_date else period_end - timedelta(days=7) @@ -246,9 +250,10 @@ def send_all_periodic_digest_reports( report = get_periodic_digest_report(all_digest_data, team) full_report_dict = dataclasses.asdict(report) instance_metadata = dataclasses.asdict(get_instance_metadata((period_start, period_end))) + digest_items_with_data = count_non_zero_digest_items(report) # Then capture as events to PostHog, so they can be sent via email - if has_non_zero_digest(report) and not dry_run: + if digest_items_with_data > 0 and not dry_run: send_periodic_digest_report.delay( team_id=team.id, team_name=team.name, @@ -256,6 +261,7 @@ def send_all_periodic_digest_reports( instance_metadata=instance_metadata, period_end=period_end, period_start=period_start, + digest_items_with_data=digest_items_with_data, ) time_since = datetime.now() - time_now logger.debug(f"Sending usage reports to PostHog and Billing took {time_since.total_seconds()} seconds.") # noqa T201 diff --git a/posthog/tasks/test/test_periodic_digest.py b/posthog/tasks/test/test_periodic_digest.py index 01317904cd9c6..4d495eac6aa8d 100644 --- a/posthog/tasks/test/test_periodic_digest.py +++ b/posthog/tasks/test/test_periodic_digest.py @@ -119,8 +119,8 @@ def test_periodic_digest_report(self, mock_capture: MagicMock) -> None: "users_who_signed_up": [], "users_who_signed_up_count": 0, "period": { - "end_inclusive": "2024-01-20T00:01:00", - "start_inclusive": 
"2024-01-13T00:01:00", + "end_inclusive": "2024-01-20T00:00:00+00:00", + "start_inclusive": "2024-01-13T00:00:00+00:00", }, "plugins_enabled": {}, "plugins_installed": {}, @@ -186,6 +186,7 @@ def test_periodic_digest_report(self, mock_capture: MagicMock) -> None: "key": "test-flag", } ], + "digest_items_with_data": 8, } mock_capture.delay.assert_called_once_with( @@ -258,6 +259,7 @@ def test_periodic_digest_report_custom_dates(self, mock_capture: MagicMock) -> N "new_external_data_sources": [], "new_surveys_launched": [], "new_feature_flags": [], + "digest_items_with_data": 1, } mock_capture.delay.assert_called_once_with( @@ -271,10 +273,11 @@ def test_periodic_digest_report_custom_dates(self, mock_capture: MagicMock) -> N @patch("posthog.tasks.periodic_digest.capture_report") def test_periodic_digest_report_idempotency(self, mock_capture: MagicMock) -> None: # Create test data - Dashboard.objects.create( - team=self.team, - name="Test Dashboard", - ) + with freeze_time("2024-01-15T00:01:00Z"): + Dashboard.objects.create( + team=self.team, + name="Test Dashboard", + ) # First run - should send the digest send_all_periodic_digest_reports() @@ -300,10 +303,11 @@ def test_periodic_digest_report_idempotency(self, mock_capture: MagicMock) -> No @patch("posthog.tasks.periodic_digest.capture_report") def test_periodic_digest_different_periods(self, mock_capture: MagicMock) -> None: # Create test data - Dashboard.objects.create( - team=self.team, - name="Test Dashboard", - ) + with freeze_time("2024-01-15T00:01:00Z"): + Dashboard.objects.create( + team=self.team, + name="Test Dashboard", + ) # Send weekly digest send_all_periodic_digest_reports() From 256edc13f7892006079f37a004f8fea0d0d6116c Mon Sep 17 00:00:00 2001 From: Raquel Smith Date: Wed, 11 Dec 2024 14:27:08 -0800 Subject: [PATCH 12/12] fix timezone --- posthog/tasks/periodic_digest.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/posthog/tasks/periodic_digest.py b/posthog/tasks/periodic_digest.py index e7dace4dfcadf..c0b6995b1e91f 100644 --- a/posthog/tasks/periodic_digest.py +++ b/posthog/tasks/periodic_digest.py @@ -1,6 +1,7 @@ import dataclasses from datetime import datetime, timedelta from typing import Any, Optional +from zoneinfo import ZoneInfo import structlog from celery import shared_task @@ -238,7 +239,7 @@ def send_all_periodic_digest_reports( period_end = ( parser.parse(end_date) if end_date - else datetime.now(tz=timezone.utc).replace(hour=0, minute=0, second=0, microsecond=0) + else datetime.now(tz=ZoneInfo("UTC")).replace(hour=0, minute=0, second=0, microsecond=0) ) period_start = parser.parse(begin_date) if begin_date else period_end - timedelta(days=7)