From b8b9acbffc55e6bd6bf8d63694265493864429b4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Far=C3=ADas=20Santana?= Date: Wed, 17 Apr 2024 10:44:17 +0200 Subject: [PATCH] feat: Send email notifications on failure --- posthog/models/user.py | 3 +- posthog/tasks/email.py | 52 ++++++++++++++++ posthog/tasks/test/test_email.py | 61 +++++++++++++++++++ .../temporal/batch_exports/batch_exports.py | 8 ++- 4 files changed, 122 insertions(+), 2 deletions(-) diff --git a/posthog/models/user.py b/posthog/models/user.py index 8968c4c17675e..cb4b1063cc961 100644 --- a/posthog/models/user.py +++ b/posthog/models/user.py @@ -21,9 +21,10 @@ class Notifications(TypedDict, total=False): plugin_disabled: bool + batch_export_run_failure: bool -NOTIFICATION_DEFAULTS: Notifications = {"plugin_disabled": True} +NOTIFICATION_DEFAULTS: Notifications = {"plugin_disabled": True, "batch_export_run_failure": True} # We don't ned the following attributes in most cases, so we defer them by default DEFERED_ATTRS = ["requested_password_reset_at"] diff --git a/posthog/tasks/email.py b/posthog/tasks/email.py index d27c00d9ae85a..d5cfc04659e74 100644 --- a/posthog/tasks/email.py +++ b/posthog/tasks/email.py @@ -4,10 +4,12 @@ import posthoganalytics import structlog +from asgiref.sync import sync_to_async from celery import shared_task from django.conf import settings from django.utils import timezone +from posthog.batch_exports.models import BatchExportRun from posthog.cloud_utils import is_cloud from posthog.email import EMAIL_TASK_KWARGS, EmailMessage, is_email_available from posthog.models import ( @@ -157,6 +159,56 @@ def send_fatal_plugin_error( message.send(send_async=False) +@shared_task(**EMAIL_TASK_KWARGS) +async def send_batch_export_run_failure( + batch_export_run_id: int, +) -> None: + is_email_available_result = await sync_to_async(is_email_available)(with_absolute_urls=True) + if not is_email_available_result: + return + + batch_export_run: BatchExportRun = await sync_to_async( + BatchExportRun.objects.select_related("batch_export__team").get + )(id=batch_export_run_id) + team: Team = batch_export_run.batch_export.team + campaign_key: str = f"batch_export_run_email_batch_export_{batch_export_run.batch_export.id}_data_interval_end_{batch_export_run.data_interval_end}" + message = await sync_to_async(EmailMessage)( + campaign_key=campaign_key, + subject=f"PostHog: {batch_export_run.batch_export.name} batch export run failure", + template_name="batch_export_run_failure", + template_context={ + "time": batch_export_run.last_updated_at.strftime("%I:%M%p on %B %d"), + "team": team, + "id": batch_export_run.batch_export.id, + "name": batch_export_run.batch_export.name, + }, + ) + memberships_to_email = [] + memberships = OrganizationMembership.objects.select_related("user", "organization").filter( + organization_id=team.organization_id + ) + all_memberships = await sync_to_async(list)(memberships) + for membership in all_memberships: + has_notification_settings_enabled = await sync_to_async(membership.user.notification_settings.get)( + "batch_export_run_failure", True + ) + if has_notification_settings_enabled is False: + continue + team_permissions = UserPermissions(membership.user).team(team) + # Only send the email to users who have access to the affected project + # Those without access have `effective_membership_level` of `None` + if ( + team_permissions.effective_membership_level_for_parent_membership(membership.organization, membership) + is not None + ): + memberships_to_email.append(membership) + + if memberships_to_email: + for membership in memberships_to_email: + message.add_recipient(email=membership.user.email, name=membership.user.first_name) + await sync_to_async(message.send)(send_async=True) + + @shared_task(**EMAIL_TASK_KWARGS) def send_canary_email(user_email: str) -> None: message = EmailMessage( diff --git a/posthog/tasks/test/test_email.py b/posthog/tasks/test/test_email.py index 571132fd1ca84..447d0d442bfc8 100644 --- a/posthog/tasks/test/test_email.py +++ b/posthog/tasks/test/test_email.py @@ -1,10 +1,14 @@ +import datetime as dt from typing import Tuple from unittest.mock import MagicMock, patch +import pytest +from asgiref.sync import sync_to_async from freezegun import freeze_time from posthog.api.authentication import password_reset_token_generator from posthog.api.email_verification import email_verification_token_generator +from posthog.batch_exports.models import BatchExport, BatchExportDestination, BatchExportRun from posthog.models import Organization, Team, User from posthog.models.instance_setting import set_instance_setting from posthog.models.organization import OrganizationInvite, OrganizationMembership @@ -12,6 +16,7 @@ from posthog.tasks.email import ( send_async_migration_complete_email, send_async_migration_errored_email, + send_batch_export_run_failure, send_canary_email, send_email_verification, send_fatal_plugin_error, @@ -144,6 +149,62 @@ def test_send_fatal_plugin_error_with_settings(self, MockEmailMessage: MagicMock # should be sent to both assert len(mocked_email_messages[1].to) == 2 + @pytest.mark.asyncio + async def test_send_batch_export_run_failure(self, MockEmailMessage: MagicMock) -> None: + mocked_email_messages = mock_email_messages(MockEmailMessage) + _, user = await sync_to_async(create_org_team_and_user)("2022-01-02 00:00:00", "admin@posthog.com") + batch_export_destination = await sync_to_async(BatchExportDestination.objects.create)( + type=BatchExportDestination.Destination.S3, config={"bucket_name": "my_production_s3_bucket"} + ) + batch_export = await sync_to_async(BatchExport.objects.create)( + team=user.team, name="A batch export", destination=batch_export_destination + ) + now = dt.datetime.now() + batch_export_run = await sync_to_async(BatchExportRun.objects.create)( + batch_export=batch_export, + status=BatchExportRun.Status.FAILED, + data_interval_start=now - dt.timedelta(hours=1), + data_interval_end=now, + ) + + await send_batch_export_run_failure(batch_export_run.id) + + assert len(mocked_email_messages) == 1 + assert mocked_email_messages[0].send.call_count == 1 + assert mocked_email_messages[0].html_body + + @pytest.mark.asyncio + async def test_send_batch_export_run_failure_with_settings(self, MockEmailMessage: MagicMock) -> None: + mocked_email_messages = mock_email_messages(MockEmailMessage) + batch_export_destination = await sync_to_async(BatchExportDestination.objects.create)( + type=BatchExportDestination.Destination.S3, config={"bucket_name": "my_production_s3_bucket"} + ) + batch_export = await sync_to_async(BatchExport.objects.create)( + team=self.user.team, name="A batch export", destination=batch_export_destination + ) + now = dt.datetime.now() + batch_export_run = await sync_to_async(BatchExportRun.objects.create)( + batch_export=batch_export, + status=BatchExportRun.Status.FAILED, + data_interval_start=now - dt.timedelta(hours=1), + data_interval_end=now, + ) + + await sync_to_async(self._create_user)("test2@posthog.com") + self.user.partial_notification_settings = {"batch_export_run_failure": False} + await sync_to_async(self.user.save)() + + await send_batch_export_run_failure(batch_export_run.id) + # Should only be sent to user2 + assert mocked_email_messages[0].to == [{"recipient": "test2@posthog.com", "raw_email": "test2@posthog.com"}] + + self.user.partial_notification_settings = {"batch_export_run_failure": True} + await sync_to_async(self.user.save)() + + await send_batch_export_run_failure(batch_export_run.id) + # should be sent to both + assert len(mocked_email_messages[1].to) == 2 + def test_send_canary_email(self, MockEmailMessage: MagicMock) -> None: mocked_email_messages = mock_email_messages(MockEmailMessage) send_canary_email("test@posthog.com") diff --git a/posthog/temporal/batch_exports/batch_exports.py b/posthog/temporal/batch_exports/batch_exports.py index 0e12fc14635b4..d4e9e73585fa8 100644 --- a/posthog/temporal/batch_exports/batch_exports.py +++ b/posthog/temporal/batch_exports/batch_exports.py @@ -405,7 +405,13 @@ async def finish_batch_export_run(inputs: FinishBatchExportRunInputs) -> None: ) if batch_export_run.status in (BatchExportRun.Status.FAILED, BatchExportRun.Status.FAILED_RETRYABLE): - logger.error("BatchExport failed with error: %s", batch_export_run.latest_error) + logger.error("Batch export failed with error: %s", batch_export_run.latest_error) + from posthog.tasks.email import send_batch_export_run_failure + + try: + await send_batch_export_run_failure(inputs.id) + except Exception: + logger.exception("Failure email notification could not be sent") elif batch_export_run.status == BatchExportRun.Status.CANCELLED: logger.warning("BatchExport was cancelled.")