From 2b188bb61b405264a0015c71574c74dd79bfc3cd Mon Sep 17 00:00:00 2001 From: Ross Gray Date: Wed, 18 Dec 2024 13:04:55 +0000 Subject: [PATCH 1/5] WIP --- posthog/batch_exports/service.py | 24 ++++ posthog/temporal/batch_exports/monitoring.py | 103 +++++++++++++++++- .../tests/batch_exports/test_monitoring.py | 3 + 3 files changed, 129 insertions(+), 1 deletion(-) diff --git a/posthog/batch_exports/service.py b/posthog/batch_exports/service.py index d17bb3b1b69c3..c7e47003a4e5b 100644 --- a/posthog/batch_exports/service.py +++ b/posthog/batch_exports/service.py @@ -810,3 +810,27 @@ async def aupdate_records_total_count( data_interval_end=interval_end, ).aupdate(records_total_count=count) return rows_updated + + +async def afetch_batch_export_runs_in_range( + batch_export_id: UUID, + interval_start: dt.datetime, + interval_end: dt.datetime, +) -> list[BatchExportRun]: + """Async fetch all BatchExportRuns for a given batch export within a time interval. + + Arguments: + batch_export_id: The UUID of the BatchExport to fetch runs for. + interval_start: The start of the time interval to fetch runs from. + interval_end: The end of the time interval to fetch runs until. + + Returns: + A list of BatchExportRun objects within the given interval, ordered by data_interval_start. + """ + queryset = BatchExportRun.objects.filter( + batch_export_id=batch_export_id, + data_interval_start__gte=interval_start, + data_interval_end__lte=interval_end, + ).order_by("data_interval_start") + + return [run async for run in queryset] diff --git a/posthog/temporal/batch_exports/monitoring.py b/posthog/temporal/batch_exports/monitoring.py index 97eaf6c2430d9..37d06dde34120 100644 --- a/posthog/temporal/batch_exports/monitoring.py +++ b/posthog/temporal/batch_exports/monitoring.py @@ -3,11 +3,15 @@ from dataclasses import dataclass from uuid import UUID +from slack_sdk.web import WebClient from temporalio import activity, workflow from temporalio.common import RetryPolicy from posthog.batch_exports.models import BatchExport -from posthog.batch_exports.service import aupdate_records_total_count +from posthog.batch_exports.service import ( + afetch_batch_export_runs_in_range, + aupdate_records_total_count, +) from posthog.batch_exports.sql import EVENT_COUNT_BY_INTERVAL from posthog.temporal.batch_exports.base import PostHogWorkflow from posthog.temporal.common.clickhouse import get_client @@ -34,9 +38,11 @@ class BatchExportMonitoringInputs: Attributes: batch_export_id: The batch export id to monitor. + slack_channel: The Slack channel to send alerts to. 
""" batch_export_id: UUID + slack_channel: str | None = None @dataclass @@ -161,6 +167,87 @@ async def update_batch_export_runs(inputs: UpdateBatchExportRunsInputs) -> int: return total_rows_updated +@dataclass +class CheckForMissingBatchExportRunsInputs: + """Inputs for checking missing batch export runs""" + + batch_export_id: UUID + overall_interval_start: str + overall_interval_end: str + interval: str + slack_channel: str | None = None + + +def _send_slack_alert_for_missing_batch_export_runs( + channel: str, batch_export_id: UUID, missing_runs: list[tuple[dt.datetime, dt.datetime]] +): + client = WebClient(token="TODO") + + # Format message for Slack + message = f":warning: Found {len(missing_runs)} missing batch export runs for batch export {batch_export_id}:\n" + for start, end in missing_runs: + message += f"• Run {start.strftime('%Y-%m-%d %H:%M:%S')} to {end.strftime('%Y-%m-%d %H:%M:%S')} \n" + + try: + client.chat_postMessage(channel=channel, text=message) + except Exception as e: + activity.logger.error(f"Failed to post to Slack: {str(e)}") + raise + + +@activity.defn +async def check_for_missing_batch_export_runs(inputs: CheckForMissingBatchExportRunsInputs) -> int: + """Check for missing batch export runs and alert to Slack if any are found. + + Returns: + The number of missing batch export runs found. + """ + async with Heartbeater(): + interval_start = dt.datetime.strptime(inputs.overall_interval_start, "%Y-%m-%d %H:%M:%S").replace(tzinfo=dt.UTC) + interval_end = dt.datetime.strptime(inputs.overall_interval_end, "%Y-%m-%d %H:%M:%S").replace(tzinfo=dt.UTC) + # Get all runs in the interval + runs = await afetch_batch_export_runs_in_range( + batch_export_id=inputs.batch_export_id, + interval_start=interval_start, + interval_end=interval_end, + ) + + # for simplicity, we assume that the interval is 5 minutes, as this is the only interval supported for monitoring at this time + if inputs.interval != "every 5 minutes": + raise NoValidBatchExportsFoundError( + "Only intervals of 'every 5 minutes' are supported for monitoring at this time." + ) + expected_run_intervals: list[tuple[dt.datetime, dt.datetime]] = [] + current_run_start_interval = interval_start + while current_run_start_interval < interval_end: + expected_run_intervals.append( + (current_run_start_interval, current_run_start_interval + dt.timedelta(minutes=5)) + ) + current_run_start_interval += dt.timedelta(minutes=5) + + missing_runs: list[tuple[dt.datetime, dt.datetime]] = [] + for start, end in expected_run_intervals: + if start not in [run.data_interval_start for run in runs]: + missing_runs.append((start, end)) + + if missing_runs: + activity.logger.info( + f"Found {len(missing_runs)} missing batch export runs for batch export {inputs.batch_export_id}" + ) + + if inputs.slack_channel is None: + activity.logger.warning("No Slack channel provided, skipping alert") + return len(missing_runs) + + _send_slack_alert_for_missing_batch_export_runs( + channel=inputs.slack_channel, + batch_export_id=inputs.batch_export_id, + missing_runs=missing_runs, + ) + + return len(missing_runs) + + @workflow.defn(name="batch-export-monitoring") class BatchExportMonitoringWorkflow(PostHogWorkflow): """Workflow to monitor batch exports. 
@@ -218,6 +305,20 @@ async def run(self, inputs: BatchExportMonitoringInputs): heartbeat_timeout=dt.timedelta(minutes=1), ) + await workflow.execute_activity( + check_for_missing_batch_export_runs, + CheckForMissingBatchExportRunsInputs( + batch_export_id=batch_export_details.id, + overall_interval_start=interval_start_str, + overall_interval_end=interval_end_str, + interval=batch_export_details.interval, + slack_channel=inputs.slack_channel, + ), + start_to_close_timeout=dt.timedelta(minutes=10), + retry_policy=RetryPolicy(maximum_attempts=3, initial_interval=dt.timedelta(seconds=20)), + heartbeat_timeout=dt.timedelta(minutes=1), + ) + return await workflow.execute_activity( update_batch_export_runs, UpdateBatchExportRunsInputs(batch_export_id=batch_export_details.id, results=total_events.results), diff --git a/posthog/temporal/tests/batch_exports/test_monitoring.py b/posthog/temporal/tests/batch_exports/test_monitoring.py index cab50c25d3177..a0c9af33e94e1 100644 --- a/posthog/temporal/tests/batch_exports/test_monitoring.py +++ b/posthog/temporal/tests/batch_exports/test_monitoring.py @@ -12,6 +12,7 @@ from posthog.temporal.batch_exports.monitoring import ( BatchExportMonitoringInputs, BatchExportMonitoringWorkflow, + check_for_missing_batch_export_runs, get_batch_export, get_event_counts, update_batch_export_runs, @@ -118,6 +119,7 @@ async def test_monitoring_workflow_when_no_event_data(batch_export): activities=[ get_batch_export, get_event_counts, + check_for_missing_batch_export_runs, update_batch_export_runs, ], workflow_runner=UnsandboxedWorkflowRunner(), @@ -177,6 +179,7 @@ async def test_monitoring_workflow( activities=[ get_batch_export, get_event_counts, + check_for_missing_batch_export_runs, update_batch_export_runs, ], workflow_runner=UnsandboxedWorkflowRunner(), From 6b4d6e6ccd89b0fc474afbdce9c1b61aeb11ebe1 Mon Sep 17 00:00:00 2001 From: Ross Gray Date: Wed, 18 Dec 2024 13:42:06 +0000 Subject: [PATCH 2/5] Remove Slack alert and just log a warning --- posthog/temporal/batch_exports/monitoring.py | 50 +++------- .../tests/batch_exports/test_monitoring.py | 93 +++++++++++++------ 2 files changed, 78 insertions(+), 65 deletions(-) diff --git a/posthog/temporal/batch_exports/monitoring.py b/posthog/temporal/batch_exports/monitoring.py index 37d06dde34120..23abf65faa7d2 100644 --- a/posthog/temporal/batch_exports/monitoring.py +++ b/posthog/temporal/batch_exports/monitoring.py @@ -3,7 +3,6 @@ from dataclasses import dataclass from uuid import UUID -from slack_sdk.web import WebClient from temporalio import activity, workflow from temporalio.common import RetryPolicy @@ -38,11 +37,9 @@ class BatchExportMonitoringInputs: Attributes: batch_export_id: The batch export id to monitor. - slack_channel: The Slack channel to send alerts to. 
""" batch_export_id: UUID - slack_channel: str | None = None @dataclass @@ -175,29 +172,22 @@ class CheckForMissingBatchExportRunsInputs: overall_interval_start: str overall_interval_end: str interval: str - slack_channel: str | None = None -def _send_slack_alert_for_missing_batch_export_runs( - channel: str, batch_export_id: UUID, missing_runs: list[tuple[dt.datetime, dt.datetime]] +def _log_warning_for_missing_batch_export_runs( + batch_export_id: UUID, missing_runs: list[tuple[dt.datetime, dt.datetime]] ): - client = WebClient(token="TODO") - - # Format message for Slack - message = f":warning: Found {len(missing_runs)} missing batch export runs for batch export {batch_export_id}:\n" + message = f"Batch Exports Monitoring: Found {len(missing_runs)} missing runs for batch export {batch_export_id}:\n" for start, end in missing_runs: - message += f"• Run {start.strftime('%Y-%m-%d %H:%M:%S')} to {end.strftime('%Y-%m-%d %H:%M:%S')} \n" + message += f"- Run {start.strftime('%Y-%m-%d %H:%M:%S')} to {end.strftime('%Y-%m-%d %H:%M:%S')} \n" - try: - client.chat_postMessage(channel=channel, text=message) - except Exception as e: - activity.logger.error(f"Failed to post to Slack: {str(e)}") - raise + activity.logger.warning(message) @activity.defn async def check_for_missing_batch_export_runs(inputs: CheckForMissingBatchExportRunsInputs) -> int: - """Check for missing batch export runs and alert to Slack if any are found. + """Check for missing batch export runs and log a warning if any are found. + (We can then alert based on these log entries) Returns: The number of missing batch export runs found. @@ -231,19 +221,7 @@ async def check_for_missing_batch_export_runs(inputs: CheckForMissingBatchExport missing_runs.append((start, end)) if missing_runs: - activity.logger.info( - f"Found {len(missing_runs)} missing batch export runs for batch export {inputs.batch_export_id}" - ) - - if inputs.slack_channel is None: - activity.logger.warning("No Slack channel provided, skipping alert") - return len(missing_runs) - - _send_slack_alert_for_missing_batch_export_runs( - channel=inputs.slack_channel, - batch_export_id=inputs.batch_export_id, - missing_runs=missing_runs, - ) + _log_warning_for_missing_batch_export_runs(inputs.batch_export_id, missing_runs) return len(missing_runs) @@ -254,8 +232,12 @@ class BatchExportMonitoringWorkflow(PostHogWorkflow): We have had some issues with batch exports in the past, where some events have been missing. The purpose of this workflow is to monitor the status of - batch exports for a given customer by reconciling the number of exported - events with the number of events in ClickHouse for a given interval. + a given batch export by: + 1. Checking for missing batch export runs (we've had an incident in the past + where Temporal has not scheduled a workflow for a particular time interval + for some reason). + 2. Reconciling the number of exported events with the number of events in + ClickHouse for a given interval. 
""" @staticmethod @@ -266,8 +248,7 @@ def parse_inputs(inputs: list[str]) -> BatchExportMonitoringInputs: @workflow.run async def run(self, inputs: BatchExportMonitoringInputs): - """Workflow implementation to monitor batch exports for a given team.""" - # TODO - check if this is the right way to do logging since there seems to be a few different ways + """Workflow implementation to monitor a given batch export.""" workflow.logger.info( "Starting batch exports monitoring workflow for batch export id %s", inputs.batch_export_id ) @@ -312,7 +293,6 @@ async def run(self, inputs: BatchExportMonitoringInputs): overall_interval_start=interval_start_str, overall_interval_end=interval_end_str, interval=batch_export_details.interval, - slack_channel=inputs.slack_channel, ), start_to_close_timeout=dt.timedelta(minutes=10), retry_policy=RetryPolicy(maximum_attempts=3, initial_interval=dt.timedelta(seconds=20)), diff --git a/posthog/temporal/tests/batch_exports/test_monitoring.py b/posthog/temporal/tests/batch_exports/test_monitoring.py index a0c9af33e94e1..7273484f1fbaa 100644 --- a/posthog/temporal/tests/batch_exports/test_monitoring.py +++ b/posthog/temporal/tests/batch_exports/test_monitoring.py @@ -1,5 +1,6 @@ import datetime as dt import uuid +from unittest.mock import patch import pytest import pytest_asyncio @@ -9,6 +10,7 @@ from posthog import constants from posthog.batch_exports.models import BatchExportRun +from posthog.batch_exports.service import afetch_batch_export_runs_in_range from posthog.temporal.batch_exports.monitoring import ( BatchExportMonitoringInputs, BatchExportMonitoringWorkflow, @@ -150,7 +152,12 @@ async def test_monitoring_workflow_when_no_event_data(batch_export): ["every 5 minutes"], indirect=True, ) +@pytest.mark.parametrize( + "simulate_missing_batch_export_runs", + [True, False], +) async def test_monitoring_workflow( + simulate_missing_batch_export_runs, batch_export, generate_test_data, data_interval_start, @@ -168,37 +175,63 @@ async def test_monitoring_workflow( generated and assert that the expected records count matches the records completed. 
""" + + expected_missing_runs: list[tuple[dt.datetime, dt.datetime]] = [] + if simulate_missing_batch_export_runs: + # simulate a missing batch export run by deleting the batch export run for the first 5 minutes + runs: list[BatchExportRun] = await afetch_batch_export_runs_in_range( + batch_export_id=batch_export.id, + interval_start=data_interval_start, + interval_end=data_interval_start + dt.timedelta(minutes=5), + ) + assert len(runs) == 1 + for run in runs: + assert run.data_interval_start is not None + expected_missing_runs.append((run.data_interval_start, run.data_interval_end)) + await run.adelete() + workflow_id = str(uuid.uuid4()) inputs = BatchExportMonitoringInputs(batch_export_id=batch_export.id) - async with await WorkflowEnvironment.start_time_skipping() as activity_environment: - async with Worker( - activity_environment.client, - # TODO - not sure if this is the right task queue - task_queue=constants.BATCH_EXPORTS_TASK_QUEUE, - workflows=[BatchExportMonitoringWorkflow], - activities=[ - get_batch_export, - get_event_counts, - check_for_missing_batch_export_runs, - update_batch_export_runs, - ], - workflow_runner=UnsandboxedWorkflowRunner(), - ): - await activity_environment.client.execute_workflow( - BatchExportMonitoringWorkflow.run, - inputs, - id=workflow_id, + with patch( + "posthog.temporal.batch_exports.monitoring._log_warning_for_missing_batch_export_runs" + ) as mock_log_warning: + async with await WorkflowEnvironment.start_time_skipping() as activity_environment: + async with Worker( + activity_environment.client, + # TODO - not sure if this is the right task queue task_queue=constants.BATCH_EXPORTS_TASK_QUEUE, - retry_policy=RetryPolicy(maximum_attempts=1), - execution_timeout=dt.timedelta(seconds=30), - ) - - batch_export_runs = await afetch_batch_export_runs(batch_export_id=batch_export.id) - - for run in batch_export_runs: - if run.records_completed == 0: - # TODO: in the actual monitoring activity it would be better to - # update the actual count to 0 rather than None - assert run.records_total_count is None + workflows=[BatchExportMonitoringWorkflow], + activities=[ + get_batch_export, + get_event_counts, + check_for_missing_batch_export_runs, + update_batch_export_runs, + ], + workflow_runner=UnsandboxedWorkflowRunner(), + ): + await activity_environment.client.execute_workflow( + BatchExportMonitoringWorkflow.run, + inputs, + id=workflow_id, + task_queue=constants.BATCH_EXPORTS_TASK_QUEUE, + retry_policy=RetryPolicy(maximum_attempts=1), + execution_timeout=dt.timedelta(seconds=30), + ) + + if simulate_missing_batch_export_runs: + # check that the warning was logged + mock_log_warning.assert_called_once_with(batch_export.id, expected_missing_runs) else: - assert run.records_completed == run.records_total_count + # check that the warning was not logged + mock_log_warning.assert_not_called() + + # check that the batch export runs were updated correctly + batch_export_runs = await afetch_batch_export_runs(batch_export_id=batch_export.id) + + for run in batch_export_runs: + if run.records_completed == 0: + # TODO: in the actual monitoring activity it would be better to + # update the actual count to 0 rather than None + assert run.records_total_count is None + else: + assert run.records_completed == run.records_total_count From 66c00da6d500853e61b8081d504eed94cbf7e417 Mon Sep 17 00:00:00 2001 From: Ross Gray Date: Wed, 18 Dec 2024 13:50:49 +0000 Subject: [PATCH 3/5] Add extra test --- posthog/temporal/batch_exports/monitoring.py | 6 ++++-- 
.../tests/batch_exports/test_monitoring.py | 16 ++++++++++++++++ 2 files changed, 20 insertions(+), 2 deletions(-) diff --git a/posthog/temporal/batch_exports/monitoring.py b/posthog/temporal/batch_exports/monitoring.py index 23abf65faa7d2..c41d1076ed1e0 100644 --- a/posthog/temporal/batch_exports/monitoring.py +++ b/posthog/temporal/batch_exports/monitoring.py @@ -177,9 +177,11 @@ class CheckForMissingBatchExportRunsInputs: def _log_warning_for_missing_batch_export_runs( batch_export_id: UUID, missing_runs: list[tuple[dt.datetime, dt.datetime]] ): - message = f"Batch Exports Monitoring: Found {len(missing_runs)} missing runs for batch export {batch_export_id}:\n" + message = ( + f"Batch Exports Monitoring: Found {len(missing_runs)} missing run(s) for batch export {batch_export_id}:\n" + ) for start, end in missing_runs: - message += f"- Run {start.strftime('%Y-%m-%d %H:%M:%S')} to {end.strftime('%Y-%m-%d %H:%M:%S')} \n" + message += f"- Run {start.strftime('%Y-%m-%d %H:%M:%S')} to {end.strftime('%Y-%m-%d %H:%M:%S')}\n" activity.logger.warning(message) diff --git a/posthog/temporal/tests/batch_exports/test_monitoring.py b/posthog/temporal/tests/batch_exports/test_monitoring.py index 7273484f1fbaa..1c0b0dc01fd13 100644 --- a/posthog/temporal/tests/batch_exports/test_monitoring.py +++ b/posthog/temporal/tests/batch_exports/test_monitoring.py @@ -14,6 +14,7 @@ from posthog.temporal.batch_exports.monitoring import ( BatchExportMonitoringInputs, BatchExportMonitoringWorkflow, + _log_warning_for_missing_batch_export_runs, check_for_missing_batch_export_runs, get_batch_export, get_event_counts, @@ -235,3 +236,18 @@ async def test_monitoring_workflow( assert run.records_total_count is None else: assert run.records_completed == run.records_total_count + + +def test_log_warning_for_missing_batch_export_runs(): + missing_runs = [ + (dt.datetime(2024, 1, 1, 10, 0), dt.datetime(2024, 1, 1, 10, 5)), + (dt.datetime(2024, 1, 1, 10, 5), dt.datetime(2024, 1, 1, 10, 10)), + ] + with patch("posthog.temporal.batch_exports.monitoring.activity") as mock_activity: + batch_export_id = uuid.uuid4() + _log_warning_for_missing_batch_export_runs(batch_export_id, missing_runs) + mock_activity.logger.warning.assert_called_once_with( + f"Batch Exports Monitoring: Found 2 missing run(s) for batch export {batch_export_id}:\n" + "- Run 2024-01-01 10:00:00 to 2024-01-01 10:05:00\n" + "- Run 2024-01-01 10:05:00 to 2024-01-01 10:10:00\n" + ) From 22a994fa2e9d0ab64abefcb850a8b20726dd93c7 Mon Sep 17 00:00:00 2001 From: Ross Gray Date: Wed, 18 Dec 2024 13:53:09 +0000 Subject: [PATCH 4/5] Remove TODO --- posthog/temporal/tests/batch_exports/test_monitoring.py | 1 - 1 file changed, 1 deletion(-) diff --git a/posthog/temporal/tests/batch_exports/test_monitoring.py b/posthog/temporal/tests/batch_exports/test_monitoring.py index 1c0b0dc01fd13..6732f78d6903b 100644 --- a/posthog/temporal/tests/batch_exports/test_monitoring.py +++ b/posthog/temporal/tests/batch_exports/test_monitoring.py @@ -199,7 +199,6 @@ async def test_monitoring_workflow( async with await WorkflowEnvironment.start_time_skipping() as activity_environment: async with Worker( activity_environment.client, - # TODO - not sure if this is the right task queue task_queue=constants.BATCH_EXPORTS_TASK_QUEUE, workflows=[BatchExportMonitoringWorkflow], activities=[ From e24b9b15a5f8343f6985f2320617ed7d2c03f882 Mon Sep 17 00:00:00 2001 From: Ross Gray Date: Wed, 18 Dec 2024 15:53:59 +0000 Subject: [PATCH 5/5] Update docstring --- 
posthog/temporal/tests/batch_exports/test_monitoring.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/posthog/temporal/tests/batch_exports/test_monitoring.py b/posthog/temporal/tests/batch_exports/test_monitoring.py index 6732f78d6903b..3f84960f4504d 100644 --- a/posthog/temporal/tests/batch_exports/test_monitoring.py +++ b/posthog/temporal/tests/batch_exports/test_monitoring.py @@ -168,10 +168,6 @@ async def test_monitoring_workflow( ): """Test the monitoring workflow with a batch export that has data. - We generate 2 hours of data between 13:00 and 15:00, and then run the - monitoring workflow at 15:30. The monitoring workflow should check the data - between 14:00 and 15:00, and update the batch export runs. - We generate some dummy batch export runs based on the event data we generated and assert that the expected records count matches the records completed.
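
The gap-detection logic that check_for_missing_batch_export_runs introduces can be exercised in isolation. The sketch below is illustrative rather than part of the patch series: it assumes a plain set of observed data_interval_start values in place of the Django BatchExportRun queryset, and it substitutes a set-membership test for the activity's per-iteration list scan. The names find_missing_runs, completed_starts, and cadence are invented for the example.

    import datetime as dt

    def find_missing_runs(
        completed_starts: set[dt.datetime],
        interval_start: dt.datetime,
        interval_end: dt.datetime,
        cadence: dt.timedelta = dt.timedelta(minutes=5),
    ) -> list[tuple[dt.datetime, dt.datetime]]:
        """Enumerate expected (start, end) run intervals at the given cadence,
        then report those whose start has no matching observed run."""
        missing: list[tuple[dt.datetime, dt.datetime]] = []
        current = interval_start
        while current < interval_end:
            if current not in completed_starts:
                missing.append((current, current + cadence))
            current += cadence
        return missing

    # Example: the 10:05 run never happened within a 15-minute window.
    base = dt.datetime(2024, 1, 1, 10, 0, tzinfo=dt.UTC)
    observed = {base, base + dt.timedelta(minutes=10)}
    print(find_missing_runs(observed, base, base + dt.timedelta(minutes=15)))
    # -> [(datetime(2024, 1, 1, 10, 5, ...), datetime(2024, 1, 1, 10, 10, ...))]

One design note: the activity as written recomputes [run.data_interval_start for run in runs] on every loop iteration, which is quadratic in the number of runs; building the set of observed starts once, as above, keeps the check linear while producing the same result. Since the activity currently raises NoValidBatchExportsFoundError for any interval other than "every 5 minutes", the cadence parameter is the only generalisation in this sketch.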