Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(batch-exports): Improve batch export alerting by checking for missing runs #27013

Merged
merged 5 commits into from
Dec 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions posthog/batch_exports/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -810,3 +810,27 @@ async def aupdate_records_total_count(
data_interval_end=interval_end,
).aupdate(records_total_count=count)
return rows_updated


async def afetch_batch_export_runs_in_range(
batch_export_id: UUID,
interval_start: dt.datetime,
interval_end: dt.datetime,
) -> list[BatchExportRun]:
"""Async fetch all BatchExportRuns for a given batch export within a time interval.

Arguments:
batch_export_id: The UUID of the BatchExport to fetch runs for.
interval_start: The start of the time interval to fetch runs from.
interval_end: The end of the time interval to fetch runs until.

Returns:
A list of BatchExportRun objects within the given interval, ordered by data_interval_start.
"""
queryset = BatchExportRun.objects.filter(
batch_export_id=batch_export_id,
data_interval_start__gte=interval_start,
data_interval_end__lte=interval_end,
).order_by("data_interval_start")

return [run async for run in queryset]
93 changes: 88 additions & 5 deletions posthog/temporal/batch_exports/monitoring.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@
from temporalio.common import RetryPolicy

from posthog.batch_exports.models import BatchExport
from posthog.batch_exports.service import aupdate_records_total_count
from posthog.batch_exports.service import (
afetch_batch_export_runs_in_range,
aupdate_records_total_count,
)
from posthog.batch_exports.sql import EVENT_COUNT_BY_INTERVAL
from posthog.temporal.batch_exports.base import PostHogWorkflow
from posthog.temporal.common.clickhouse import get_client
Expand Down Expand Up @@ -161,14 +164,82 @@ async def update_batch_export_runs(inputs: UpdateBatchExportRunsInputs) -> int:
return total_rows_updated


@dataclass
class CheckForMissingBatchExportRunsInputs:
"""Inputs for checking missing batch export runs"""

batch_export_id: UUID
overall_interval_start: str
overall_interval_end: str
interval: str


def _log_warning_for_missing_batch_export_runs(
batch_export_id: UUID, missing_runs: list[tuple[dt.datetime, dt.datetime]]
):
message = (
f"Batch Exports Monitoring: Found {len(missing_runs)} missing run(s) for batch export {batch_export_id}:\n"
)
for start, end in missing_runs:
message += f"- Run {start.strftime('%Y-%m-%d %H:%M:%S')} to {end.strftime('%Y-%m-%d %H:%M:%S')}\n"

activity.logger.warning(message)


@activity.defn
async def check_for_missing_batch_export_runs(inputs: CheckForMissingBatchExportRunsInputs) -> int:
"""Check for missing batch export runs and log a warning if any are found.
(We can then alert based on these log entries)

Returns:
The number of missing batch export runs found.
"""
async with Heartbeater():
interval_start = dt.datetime.strptime(inputs.overall_interval_start, "%Y-%m-%d %H:%M:%S").replace(tzinfo=dt.UTC)
interval_end = dt.datetime.strptime(inputs.overall_interval_end, "%Y-%m-%d %H:%M:%S").replace(tzinfo=dt.UTC)
# Get all runs in the interval
runs = await afetch_batch_export_runs_in_range(
batch_export_id=inputs.batch_export_id,
interval_start=interval_start,
interval_end=interval_end,
)

# for simplicity, we assume that the interval is 5 minutes, as this is the only interval supported for monitoring at this time
if inputs.interval != "every 5 minutes":
raise NoValidBatchExportsFoundError(
"Only intervals of 'every 5 minutes' are supported for monitoring at this time."
)
expected_run_intervals: list[tuple[dt.datetime, dt.datetime]] = []
current_run_start_interval = interval_start
while current_run_start_interval < interval_end:
expected_run_intervals.append(
(current_run_start_interval, current_run_start_interval + dt.timedelta(minutes=5))
)
current_run_start_interval += dt.timedelta(minutes=5)

missing_runs: list[tuple[dt.datetime, dt.datetime]] = []
for start, end in expected_run_intervals:
if start not in [run.data_interval_start for run in runs]:
missing_runs.append((start, end))

if missing_runs:
_log_warning_for_missing_batch_export_runs(inputs.batch_export_id, missing_runs)

return len(missing_runs)


@workflow.defn(name="batch-export-monitoring")
class BatchExportMonitoringWorkflow(PostHogWorkflow):
"""Workflow to monitor batch exports.

We have had some issues with batch exports in the past, where some events
have been missing. The purpose of this workflow is to monitor the status of
batch exports for a given customer by reconciling the number of exported
events with the number of events in ClickHouse for a given interval.
a given batch export by:
1. Checking for missing batch export runs (we've had an incident in the past
where Temporal has not scheduled a workflow for a particular time interval
for some reason).
2. Reconciling the number of exported events with the number of events in
ClickHouse for a given interval.
"""

@staticmethod
Expand All @@ -179,8 +250,7 @@ def parse_inputs(inputs: list[str]) -> BatchExportMonitoringInputs:

@workflow.run
async def run(self, inputs: BatchExportMonitoringInputs):
"""Workflow implementation to monitor batch exports for a given team."""
# TODO - check if this is the right way to do logging since there seems to be a few different ways
"""Workflow implementation to monitor a given batch export."""
workflow.logger.info(
"Starting batch exports monitoring workflow for batch export id %s", inputs.batch_export_id
)
Expand Down Expand Up @@ -218,6 +288,19 @@ async def run(self, inputs: BatchExportMonitoringInputs):
heartbeat_timeout=dt.timedelta(minutes=1),
)

await workflow.execute_activity(
check_for_missing_batch_export_runs,
CheckForMissingBatchExportRunsInputs(
batch_export_id=batch_export_details.id,
overall_interval_start=interval_start_str,
overall_interval_end=interval_end_str,
interval=batch_export_details.interval,
),
start_to_close_timeout=dt.timedelta(minutes=10),
retry_policy=RetryPolicy(maximum_attempts=3, initial_interval=dt.timedelta(seconds=20)),
heartbeat_timeout=dt.timedelta(minutes=1),
)

return await workflow.execute_activity(
update_batch_export_runs,
UpdateBatchExportRunsInputs(batch_export_id=batch_export_details.id, results=total_events.results),
Expand Down
113 changes: 80 additions & 33 deletions posthog/temporal/tests/batch_exports/test_monitoring.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import datetime as dt
import uuid
from unittest.mock import patch

import pytest
import pytest_asyncio
Expand All @@ -9,9 +10,12 @@

from posthog import constants
from posthog.batch_exports.models import BatchExportRun
from posthog.batch_exports.service import afetch_batch_export_runs_in_range
from posthog.temporal.batch_exports.monitoring import (
BatchExportMonitoringInputs,
BatchExportMonitoringWorkflow,
_log_warning_for_missing_batch_export_runs,
check_for_missing_batch_export_runs,
get_batch_export,
get_event_counts,
update_batch_export_runs,
Expand Down Expand Up @@ -118,6 +122,7 @@ async def test_monitoring_workflow_when_no_event_data(batch_export):
activities=[
get_batch_export,
get_event_counts,
check_for_missing_batch_export_runs,
update_batch_export_runs,
],
workflow_runner=UnsandboxedWorkflowRunner(),
Expand Down Expand Up @@ -148,7 +153,12 @@ async def test_monitoring_workflow_when_no_event_data(batch_export):
["every 5 minutes"],
indirect=True,
)
@pytest.mark.parametrize(
"simulate_missing_batch_export_runs",
[True, False],
)
async def test_monitoring_workflow(
simulate_missing_batch_export_runs,
batch_export,
generate_test_data,
data_interval_start,
Expand All @@ -158,44 +168,81 @@ async def test_monitoring_workflow(
):
"""Test the monitoring workflow with a batch export that has data.

We generate 2 hours of data between 13:00 and 15:00, and then run the
monitoring workflow at 15:30. The monitoring workflow should check the data
between 14:00 and 15:00, and update the batch export runs.

We generate some dummy batch export runs based on the event data we
generated and assert that the expected records count matches the records
completed.
"""

expected_missing_runs: list[tuple[dt.datetime, dt.datetime]] = []
if simulate_missing_batch_export_runs:
# simulate a missing batch export run by deleting the batch export run for the first 5 minutes
runs: list[BatchExportRun] = await afetch_batch_export_runs_in_range(
batch_export_id=batch_export.id,
interval_start=data_interval_start,
interval_end=data_interval_start + dt.timedelta(minutes=5),
)
assert len(runs) == 1
for run in runs:
assert run.data_interval_start is not None
expected_missing_runs.append((run.data_interval_start, run.data_interval_end))
await run.adelete()

workflow_id = str(uuid.uuid4())
inputs = BatchExportMonitoringInputs(batch_export_id=batch_export.id)
async with await WorkflowEnvironment.start_time_skipping() as activity_environment:
async with Worker(
activity_environment.client,
# TODO - not sure if this is the right task queue
task_queue=constants.BATCH_EXPORTS_TASK_QUEUE,
workflows=[BatchExportMonitoringWorkflow],
activities=[
get_batch_export,
get_event_counts,
update_batch_export_runs,
],
workflow_runner=UnsandboxedWorkflowRunner(),
):
await activity_environment.client.execute_workflow(
BatchExportMonitoringWorkflow.run,
inputs,
id=workflow_id,
with patch(
"posthog.temporal.batch_exports.monitoring._log_warning_for_missing_batch_export_runs"
) as mock_log_warning:
async with await WorkflowEnvironment.start_time_skipping() as activity_environment:
async with Worker(
activity_environment.client,
task_queue=constants.BATCH_EXPORTS_TASK_QUEUE,
retry_policy=RetryPolicy(maximum_attempts=1),
execution_timeout=dt.timedelta(seconds=30),
)

batch_export_runs = await afetch_batch_export_runs(batch_export_id=batch_export.id)

for run in batch_export_runs:
if run.records_completed == 0:
# TODO: in the actual monitoring activity it would be better to
# update the actual count to 0 rather than None
assert run.records_total_count is None
workflows=[BatchExportMonitoringWorkflow],
activities=[
get_batch_export,
get_event_counts,
check_for_missing_batch_export_runs,
update_batch_export_runs,
],
workflow_runner=UnsandboxedWorkflowRunner(),
):
await activity_environment.client.execute_workflow(
BatchExportMonitoringWorkflow.run,
inputs,
id=workflow_id,
task_queue=constants.BATCH_EXPORTS_TASK_QUEUE,
retry_policy=RetryPolicy(maximum_attempts=1),
execution_timeout=dt.timedelta(seconds=30),
)

if simulate_missing_batch_export_runs:
# check that the warning was logged
mock_log_warning.assert_called_once_with(batch_export.id, expected_missing_runs)
else:
assert run.records_completed == run.records_total_count
# check that the warning was not logged
mock_log_warning.assert_not_called()

# check that the batch export runs were updated correctly
batch_export_runs = await afetch_batch_export_runs(batch_export_id=batch_export.id)

for run in batch_export_runs:
if run.records_completed == 0:
# TODO: in the actual monitoring activity it would be better to
# update the actual count to 0 rather than None
assert run.records_total_count is None
else:
assert run.records_completed == run.records_total_count


def test_log_warning_for_missing_batch_export_runs():
missing_runs = [
(dt.datetime(2024, 1, 1, 10, 0), dt.datetime(2024, 1, 1, 10, 5)),
(dt.datetime(2024, 1, 1, 10, 5), dt.datetime(2024, 1, 1, 10, 10)),
]
with patch("posthog.temporal.batch_exports.monitoring.activity") as mock_activity:
batch_export_id = uuid.uuid4()
_log_warning_for_missing_batch_export_runs(batch_export_id, missing_runs)
mock_activity.logger.warning.assert_called_once_with(
f"Batch Exports Monitoring: Found 2 missing run(s) for batch export {batch_export_id}:\n"
"- Run 2024-01-01 10:00:00 to 2024-01-01 10:05:00\n"
"- Run 2024-01-01 10:05:00 to 2024-01-01 10:10:00\n"
)
Loading