Skip to content

Commit

Permalink
feat: Send email notifications on failure
Browse files Browse the repository at this point in the history
  • Loading branch information
tomasfarias committed Apr 17, 2024
1 parent 60ddec2 commit b8b9acb
Show file tree
Hide file tree
Showing 4 changed files with 122 additions and 2 deletions.
3 changes: 2 additions & 1 deletion posthog/models/user.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,10 @@

class Notifications(TypedDict, total=False):
plugin_disabled: bool
batch_export_run_failure: bool


NOTIFICATION_DEFAULTS: Notifications = {"plugin_disabled": True}
NOTIFICATION_DEFAULTS: Notifications = {"plugin_disabled": True, "batch_export_run_failure": True}

# We don't ned the following attributes in most cases, so we defer them by default
DEFERED_ATTRS = ["requested_password_reset_at"]
Expand Down
52 changes: 52 additions & 0 deletions posthog/tasks/email.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@

import posthoganalytics
import structlog
from asgiref.sync import sync_to_async
from celery import shared_task
from django.conf import settings
from django.utils import timezone

from posthog.batch_exports.models import BatchExportRun
from posthog.cloud_utils import is_cloud
from posthog.email import EMAIL_TASK_KWARGS, EmailMessage, is_email_available
from posthog.models import (
Expand Down Expand Up @@ -157,6 +159,56 @@ def send_fatal_plugin_error(
message.send(send_async=False)


@shared_task(**EMAIL_TASK_KWARGS)
async def send_batch_export_run_failure(
batch_export_run_id: int,
) -> None:
is_email_available_result = await sync_to_async(is_email_available)(with_absolute_urls=True)
if not is_email_available_result:
return

batch_export_run: BatchExportRun = await sync_to_async(
BatchExportRun.objects.select_related("batch_export__team").get
)(id=batch_export_run_id)
team: Team = batch_export_run.batch_export.team
campaign_key: str = f"batch_export_run_email_batch_export_{batch_export_run.batch_export.id}_data_interval_end_{batch_export_run.data_interval_end}"
message = await sync_to_async(EmailMessage)(
campaign_key=campaign_key,
subject=f"PostHog: {batch_export_run.batch_export.name} batch export run failure",
template_name="batch_export_run_failure",
template_context={
"time": batch_export_run.last_updated_at.strftime("%I:%M%p on %B %d"),
"team": team,
"id": batch_export_run.batch_export.id,
"name": batch_export_run.batch_export.name,
},
)
memberships_to_email = []
memberships = OrganizationMembership.objects.select_related("user", "organization").filter(
organization_id=team.organization_id
)
all_memberships = await sync_to_async(list)(memberships)
for membership in all_memberships:
has_notification_settings_enabled = await sync_to_async(membership.user.notification_settings.get)(
"batch_export_run_failure", True
)
if has_notification_settings_enabled is False:
continue
team_permissions = UserPermissions(membership.user).team(team)
# Only send the email to users who have access to the affected project
# Those without access have `effective_membership_level` of `None`
if (
team_permissions.effective_membership_level_for_parent_membership(membership.organization, membership)
is not None
):
memberships_to_email.append(membership)

if memberships_to_email:
for membership in memberships_to_email:
message.add_recipient(email=membership.user.email, name=membership.user.first_name)
await sync_to_async(message.send)(send_async=True)


@shared_task(**EMAIL_TASK_KWARGS)
def send_canary_email(user_email: str) -> None:
message = EmailMessage(
Expand Down
61 changes: 61 additions & 0 deletions posthog/tasks/test/test_email.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,22 @@
import datetime as dt
from typing import Tuple
from unittest.mock import MagicMock, patch

import pytest
from asgiref.sync import sync_to_async
from freezegun import freeze_time

from posthog.api.authentication import password_reset_token_generator
from posthog.api.email_verification import email_verification_token_generator
from posthog.batch_exports.models import BatchExport, BatchExportDestination, BatchExportRun
from posthog.models import Organization, Team, User
from posthog.models.instance_setting import set_instance_setting
from posthog.models.organization import OrganizationInvite, OrganizationMembership
from posthog.models.plugin import Plugin, PluginConfig
from posthog.tasks.email import (
send_async_migration_complete_email,
send_async_migration_errored_email,
send_batch_export_run_failure,
send_canary_email,
send_email_verification,
send_fatal_plugin_error,
Expand Down Expand Up @@ -144,6 +149,62 @@ def test_send_fatal_plugin_error_with_settings(self, MockEmailMessage: MagicMock
# should be sent to both
assert len(mocked_email_messages[1].to) == 2

@pytest.mark.asyncio
async def test_send_batch_export_run_failure(self, MockEmailMessage: MagicMock) -> None:
mocked_email_messages = mock_email_messages(MockEmailMessage)
_, user = await sync_to_async(create_org_team_and_user)("2022-01-02 00:00:00", "[email protected]")
batch_export_destination = await sync_to_async(BatchExportDestination.objects.create)(
type=BatchExportDestination.Destination.S3, config={"bucket_name": "my_production_s3_bucket"}
)
batch_export = await sync_to_async(BatchExport.objects.create)(
team=user.team, name="A batch export", destination=batch_export_destination
)
now = dt.datetime.now()
batch_export_run = await sync_to_async(BatchExportRun.objects.create)(
batch_export=batch_export,
status=BatchExportRun.Status.FAILED,
data_interval_start=now - dt.timedelta(hours=1),
data_interval_end=now,
)

await send_batch_export_run_failure(batch_export_run.id)

assert len(mocked_email_messages) == 1
assert mocked_email_messages[0].send.call_count == 1
assert mocked_email_messages[0].html_body

@pytest.mark.asyncio
async def test_send_batch_export_run_failure_with_settings(self, MockEmailMessage: MagicMock) -> None:
mocked_email_messages = mock_email_messages(MockEmailMessage)
batch_export_destination = await sync_to_async(BatchExportDestination.objects.create)(
type=BatchExportDestination.Destination.S3, config={"bucket_name": "my_production_s3_bucket"}
)
batch_export = await sync_to_async(BatchExport.objects.create)(
team=self.user.team, name="A batch export", destination=batch_export_destination
)
now = dt.datetime.now()
batch_export_run = await sync_to_async(BatchExportRun.objects.create)(
batch_export=batch_export,
status=BatchExportRun.Status.FAILED,
data_interval_start=now - dt.timedelta(hours=1),
data_interval_end=now,
)

await sync_to_async(self._create_user)("[email protected]")
self.user.partial_notification_settings = {"batch_export_run_failure": False}
await sync_to_async(self.user.save)()

await send_batch_export_run_failure(batch_export_run.id)
# Should only be sent to user2
assert mocked_email_messages[0].to == [{"recipient": "[email protected]", "raw_email": "[email protected]"}]

self.user.partial_notification_settings = {"batch_export_run_failure": True}
await sync_to_async(self.user.save)()

await send_batch_export_run_failure(batch_export_run.id)
# should be sent to both
assert len(mocked_email_messages[1].to) == 2

def test_send_canary_email(self, MockEmailMessage: MagicMock) -> None:
mocked_email_messages = mock_email_messages(MockEmailMessage)
send_canary_email("[email protected]")
Expand Down
8 changes: 7 additions & 1 deletion posthog/temporal/batch_exports/batch_exports.py
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,13 @@ async def finish_batch_export_run(inputs: FinishBatchExportRunInputs) -> None:
)

if batch_export_run.status in (BatchExportRun.Status.FAILED, BatchExportRun.Status.FAILED_RETRYABLE):
logger.error("BatchExport failed with error: %s", batch_export_run.latest_error)
logger.error("Batch export failed with error: %s", batch_export_run.latest_error)
from posthog.tasks.email import send_batch_export_run_failure

try:
await send_batch_export_run_failure(inputs.id)
except Exception:
logger.exception("Failure email notification could not be sent")

elif batch_export_run.status == BatchExportRun.Status.CANCELLED:
logger.warning("BatchExport was cancelled.")
Expand Down

0 comments on commit b8b9acb

Please sign in to comment.