diff --git a/posthog/batch_exports/service.py b/posthog/batch_exports/service.py
index d17bb3b1b69c3..c7e47003a4e5b 100644
--- a/posthog/batch_exports/service.py
+++ b/posthog/batch_exports/service.py
@@ -810,3 +810,27 @@ async def aupdate_records_total_count(
         data_interval_end=interval_end,
     ).aupdate(records_total_count=count)
     return rows_updated
+
+
+async def afetch_batch_export_runs_in_range(
+    batch_export_id: UUID,
+    interval_start: dt.datetime,
+    interval_end: dt.datetime,
+) -> list[BatchExportRun]:
+    """Async fetch all BatchExportRuns for a given batch export within a time interval.
+
+    Arguments:
+        batch_export_id: The UUID of the BatchExport to fetch runs for.
+        interval_start: The start of the time interval to fetch runs from.
+        interval_end: The end of the time interval to fetch runs until.
+
+    Returns:
+        A list of BatchExportRun objects within the given interval, ordered by data_interval_start.
+    """
+    queryset = BatchExportRun.objects.filter(
+        batch_export_id=batch_export_id,
+        data_interval_start__gte=interval_start,
+        data_interval_end__lte=interval_end,
+    ).order_by("data_interval_start")
+
+    return [run async for run in queryset]
diff --git a/posthog/temporal/batch_exports/monitoring.py b/posthog/temporal/batch_exports/monitoring.py
index 97eaf6c2430d9..c41d1076ed1e0 100644
--- a/posthog/temporal/batch_exports/monitoring.py
+++ b/posthog/temporal/batch_exports/monitoring.py
@@ -7,7 +7,10 @@
 from temporalio.common import RetryPolicy
 
 from posthog.batch_exports.models import BatchExport
-from posthog.batch_exports.service import aupdate_records_total_count
+from posthog.batch_exports.service import (
+    afetch_batch_export_runs_in_range,
+    aupdate_records_total_count,
+)
 from posthog.batch_exports.sql import EVENT_COUNT_BY_INTERVAL
 from posthog.temporal.batch_exports.base import PostHogWorkflow
 from posthog.temporal.common.clickhouse import get_client
@@ -161,14 +164,82 @@ async def update_batch_export_runs(inputs: UpdateBatchExportRunsInputs) -> int:
     return total_rows_updated
 
 
+@dataclass
+class CheckForMissingBatchExportRunsInputs:
+    """Inputs for checking missing batch export runs"""
+
+    batch_export_id: UUID
+    overall_interval_start: str
+    overall_interval_end: str
+    interval: str
+
+
+def _log_warning_for_missing_batch_export_runs(
+    batch_export_id: UUID, missing_runs: list[tuple[dt.datetime, dt.datetime]]
+):
+    message = (
+        f"Batch Exports Monitoring: Found {len(missing_runs)} missing run(s) for batch export {batch_export_id}:\n"
+    )
+    for start, end in missing_runs:
+        message += f"- Run {start.strftime('%Y-%m-%d %H:%M:%S')} to {end.strftime('%Y-%m-%d %H:%M:%S')}\n"
+
+    activity.logger.warning(message)
+
+
+@activity.defn
+async def check_for_missing_batch_export_runs(inputs: CheckForMissingBatchExportRunsInputs) -> int:
+    """Check for missing batch export runs and log a warning if any are found.
+    (We can then alert based on these log entries)
+
+    Returns:
+        The number of missing batch export runs found.
+ """ + async with Heartbeater(): + interval_start = dt.datetime.strptime(inputs.overall_interval_start, "%Y-%m-%d %H:%M:%S").replace(tzinfo=dt.UTC) + interval_end = dt.datetime.strptime(inputs.overall_interval_end, "%Y-%m-%d %H:%M:%S").replace(tzinfo=dt.UTC) + # Get all runs in the interval + runs = await afetch_batch_export_runs_in_range( + batch_export_id=inputs.batch_export_id, + interval_start=interval_start, + interval_end=interval_end, + ) + + # for simplicity, we assume that the interval is 5 minutes, as this is the only interval supported for monitoring at this time + if inputs.interval != "every 5 minutes": + raise NoValidBatchExportsFoundError( + "Only intervals of 'every 5 minutes' are supported for monitoring at this time." + ) + expected_run_intervals: list[tuple[dt.datetime, dt.datetime]] = [] + current_run_start_interval = interval_start + while current_run_start_interval < interval_end: + expected_run_intervals.append( + (current_run_start_interval, current_run_start_interval + dt.timedelta(minutes=5)) + ) + current_run_start_interval += dt.timedelta(minutes=5) + + missing_runs: list[tuple[dt.datetime, dt.datetime]] = [] + for start, end in expected_run_intervals: + if start not in [run.data_interval_start for run in runs]: + missing_runs.append((start, end)) + + if missing_runs: + _log_warning_for_missing_batch_export_runs(inputs.batch_export_id, missing_runs) + + return len(missing_runs) + + @workflow.defn(name="batch-export-monitoring") class BatchExportMonitoringWorkflow(PostHogWorkflow): """Workflow to monitor batch exports. We have had some issues with batch exports in the past, where some events have been missing. The purpose of this workflow is to monitor the status of - batch exports for a given customer by reconciling the number of exported - events with the number of events in ClickHouse for a given interval. + a given batch export by: + 1. Checking for missing batch export runs (we've had an incident in the past + where Temporal has not scheduled a workflow for a particular time interval + for some reason). + 2. Reconciling the number of exported events with the number of events in + ClickHouse for a given interval. 
""" @staticmethod @@ -179,8 +250,7 @@ def parse_inputs(inputs: list[str]) -> BatchExportMonitoringInputs: @workflow.run async def run(self, inputs: BatchExportMonitoringInputs): - """Workflow implementation to monitor batch exports for a given team.""" - # TODO - check if this is the right way to do logging since there seems to be a few different ways + """Workflow implementation to monitor a given batch export.""" workflow.logger.info( "Starting batch exports monitoring workflow for batch export id %s", inputs.batch_export_id ) @@ -218,6 +288,19 @@ async def run(self, inputs: BatchExportMonitoringInputs): heartbeat_timeout=dt.timedelta(minutes=1), ) + await workflow.execute_activity( + check_for_missing_batch_export_runs, + CheckForMissingBatchExportRunsInputs( + batch_export_id=batch_export_details.id, + overall_interval_start=interval_start_str, + overall_interval_end=interval_end_str, + interval=batch_export_details.interval, + ), + start_to_close_timeout=dt.timedelta(minutes=10), + retry_policy=RetryPolicy(maximum_attempts=3, initial_interval=dt.timedelta(seconds=20)), + heartbeat_timeout=dt.timedelta(minutes=1), + ) + return await workflow.execute_activity( update_batch_export_runs, UpdateBatchExportRunsInputs(batch_export_id=batch_export_details.id, results=total_events.results), diff --git a/posthog/temporal/tests/batch_exports/test_monitoring.py b/posthog/temporal/tests/batch_exports/test_monitoring.py index cab50c25d3177..3f84960f4504d 100644 --- a/posthog/temporal/tests/batch_exports/test_monitoring.py +++ b/posthog/temporal/tests/batch_exports/test_monitoring.py @@ -1,5 +1,6 @@ import datetime as dt import uuid +from unittest.mock import patch import pytest import pytest_asyncio @@ -9,9 +10,12 @@ from posthog import constants from posthog.batch_exports.models import BatchExportRun +from posthog.batch_exports.service import afetch_batch_export_runs_in_range from posthog.temporal.batch_exports.monitoring import ( BatchExportMonitoringInputs, BatchExportMonitoringWorkflow, + _log_warning_for_missing_batch_export_runs, + check_for_missing_batch_export_runs, get_batch_export, get_event_counts, update_batch_export_runs, @@ -118,6 +122,7 @@ async def test_monitoring_workflow_when_no_event_data(batch_export): activities=[ get_batch_export, get_event_counts, + check_for_missing_batch_export_runs, update_batch_export_runs, ], workflow_runner=UnsandboxedWorkflowRunner(), @@ -148,7 +153,12 @@ async def test_monitoring_workflow_when_no_event_data(batch_export): ["every 5 minutes"], indirect=True, ) +@pytest.mark.parametrize( + "simulate_missing_batch_export_runs", + [True, False], +) async def test_monitoring_workflow( + simulate_missing_batch_export_runs, batch_export, generate_test_data, data_interval_start, @@ -158,44 +168,81 @@ async def test_monitoring_workflow( ): """Test the monitoring workflow with a batch export that has data. - We generate 2 hours of data between 13:00 and 15:00, and then run the - monitoring workflow at 15:30. The monitoring workflow should check the data - between 14:00 and 15:00, and update the batch export runs. - We generate some dummy batch export runs based on the event data we generated and assert that the expected records count matches the records completed. 
""" + + expected_missing_runs: list[tuple[dt.datetime, dt.datetime]] = [] + if simulate_missing_batch_export_runs: + # simulate a missing batch export run by deleting the batch export run for the first 5 minutes + runs: list[BatchExportRun] = await afetch_batch_export_runs_in_range( + batch_export_id=batch_export.id, + interval_start=data_interval_start, + interval_end=data_interval_start + dt.timedelta(minutes=5), + ) + assert len(runs) == 1 + for run in runs: + assert run.data_interval_start is not None + expected_missing_runs.append((run.data_interval_start, run.data_interval_end)) + await run.adelete() + workflow_id = str(uuid.uuid4()) inputs = BatchExportMonitoringInputs(batch_export_id=batch_export.id) - async with await WorkflowEnvironment.start_time_skipping() as activity_environment: - async with Worker( - activity_environment.client, - # TODO - not sure if this is the right task queue - task_queue=constants.BATCH_EXPORTS_TASK_QUEUE, - workflows=[BatchExportMonitoringWorkflow], - activities=[ - get_batch_export, - get_event_counts, - update_batch_export_runs, - ], - workflow_runner=UnsandboxedWorkflowRunner(), - ): - await activity_environment.client.execute_workflow( - BatchExportMonitoringWorkflow.run, - inputs, - id=workflow_id, + with patch( + "posthog.temporal.batch_exports.monitoring._log_warning_for_missing_batch_export_runs" + ) as mock_log_warning: + async with await WorkflowEnvironment.start_time_skipping() as activity_environment: + async with Worker( + activity_environment.client, task_queue=constants.BATCH_EXPORTS_TASK_QUEUE, - retry_policy=RetryPolicy(maximum_attempts=1), - execution_timeout=dt.timedelta(seconds=30), - ) - - batch_export_runs = await afetch_batch_export_runs(batch_export_id=batch_export.id) - - for run in batch_export_runs: - if run.records_completed == 0: - # TODO: in the actual monitoring activity it would be better to - # update the actual count to 0 rather than None - assert run.records_total_count is None + workflows=[BatchExportMonitoringWorkflow], + activities=[ + get_batch_export, + get_event_counts, + check_for_missing_batch_export_runs, + update_batch_export_runs, + ], + workflow_runner=UnsandboxedWorkflowRunner(), + ): + await activity_environment.client.execute_workflow( + BatchExportMonitoringWorkflow.run, + inputs, + id=workflow_id, + task_queue=constants.BATCH_EXPORTS_TASK_QUEUE, + retry_policy=RetryPolicy(maximum_attempts=1), + execution_timeout=dt.timedelta(seconds=30), + ) + + if simulate_missing_batch_export_runs: + # check that the warning was logged + mock_log_warning.assert_called_once_with(batch_export.id, expected_missing_runs) else: - assert run.records_completed == run.records_total_count + # check that the warning was not logged + mock_log_warning.assert_not_called() + + # check that the batch export runs were updated correctly + batch_export_runs = await afetch_batch_export_runs(batch_export_id=batch_export.id) + + for run in batch_export_runs: + if run.records_completed == 0: + # TODO: in the actual monitoring activity it would be better to + # update the actual count to 0 rather than None + assert run.records_total_count is None + else: + assert run.records_completed == run.records_total_count + + +def test_log_warning_for_missing_batch_export_runs(): + missing_runs = [ + (dt.datetime(2024, 1, 1, 10, 0), dt.datetime(2024, 1, 1, 10, 5)), + (dt.datetime(2024, 1, 1, 10, 5), dt.datetime(2024, 1, 1, 10, 10)), + ] + with patch("posthog.temporal.batch_exports.monitoring.activity") as mock_activity: + batch_export_id = 
uuid.uuid4() + _log_warning_for_missing_batch_export_runs(batch_export_id, missing_runs) + mock_activity.logger.warning.assert_called_once_with( + f"Batch Exports Monitoring: Found 2 missing run(s) for batch export {batch_export_id}:\n" + "- Run 2024-01-01 10:00:00 to 2024-01-01 10:05:00\n" + "- Run 2024-01-01 10:05:00 to 2024-01-01 10:10:00\n" + )
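
For reference, a minimal standalone sketch of the gap-detection idea used by check_for_missing_batch_export_runs, shown outside the Temporal activity. The helper name find_missing_intervals and the example values are illustrative only and not part of this patch; the real activity fetches runs via afetch_batch_export_runs_in_range and assumes the "every 5 minutes" interval.

import datetime as dt


def find_missing_intervals(
    interval_start: dt.datetime,
    interval_end: dt.datetime,
    actual_run_starts: set[dt.datetime],
    step: dt.timedelta = dt.timedelta(minutes=5),
) -> list[tuple[dt.datetime, dt.datetime]]:
    # Build every expected (start, end) pair and keep the ones whose start
    # has no matching run start (i.e. no BatchExportRun.data_interval_start).
    missing: list[tuple[dt.datetime, dt.datetime]] = []
    current = interval_start
    while current < interval_end:
        if current not in actual_run_starts:
            missing.append((current, current + step))
        current += step
    return missing


# Example: runs exist for 10:00 and 10:10 but not 10:05, so that interval is reported as missing.
start = dt.datetime(2024, 1, 1, 10, 0, tzinfo=dt.UTC)
actual = {start, start + dt.timedelta(minutes=10)}
assert find_missing_intervals(start, start + dt.timedelta(minutes=15), actual) == [
    (start + dt.timedelta(minutes=5), start + dt.timedelta(minutes=10))
]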