Skip to content

Commit

Permalink
refactor: Start batch export runs by tracking total count of records (#…
Browse files Browse the repository at this point in the history
…21134)

* refactor: Finish batch export runs by tracking total count of records

* refactor: Track count when starting batch export run

* chore: Add migration

* feat: Add test utilities for batch exports

* chore: Mypy baseline updates

* fix: Correct assertions in snowflake test

* fix: Format character in logger call

* fix: BigQuery testing
  • Loading branch information
tomasfarias authored Mar 27, 2024
1 parent 17dd12e commit 553c8d2
Show file tree
Hide file tree
Showing 22 changed files with 556 additions and 352 deletions.
2 changes: 1 addition & 1 deletion latest_migrations.manifest
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ contenttypes: 0002_remove_content_type_name
ee: 0016_rolemembership_organization_member
otp_static: 0002_throttling
otp_totp: 0002_auto_20190420_0723
posthog: 0398_alter_externaldatasource_source_type
posthog: 0399_batchexportrun_records_total_count
sessions: 0001_initial
social_django: 0010_uid_db_index
two_factor: 0007_auto_20201201_1019
4 changes: 0 additions & 4 deletions mypy-baseline.txt
Original file line number Diff line number Diff line change
Expand Up @@ -758,12 +758,8 @@ posthog/api/dashboards/dashboard_templates.py:0: error: Metaclass conflict: the
ee/api/feature_flag_role_access.py:0: error: Metaclass conflict: the metaclass of a derived class must be a (non-strict) subclass of the metaclasses of all its bases [misc]
posthog/temporal/tests/batch_exports/test_run_updates.py:0: error: Unused "type: ignore" comment [unused-ignore]
posthog/temporal/tests/batch_exports/test_run_updates.py:0: error: Unused "type: ignore" comment [unused-ignore]
posthog/temporal/tests/batch_exports/test_run_updates.py:0: error: Item "None" of "BatchExportRun | None" has no attribute "data_interval_start" [union-attr]
posthog/temporal/tests/batch_exports/test_run_updates.py:0: error: Item "None" of "BatchExportRun | None" has no attribute "data_interval_end" [union-attr]
posthog/temporal/tests/batch_exports/test_run_updates.py:0: error: Unused "type: ignore" comment [unused-ignore]
posthog/temporal/tests/batch_exports/test_run_updates.py:0: error: Item "None" of "BatchExportRun | None" has no attribute "status" [union-attr]
posthog/temporal/tests/batch_exports/test_run_updates.py:0: error: Unused "type: ignore" comment [unused-ignore]
posthog/temporal/tests/batch_exports/test_run_updates.py:0: error: Item "None" of "BatchExportRun | None" has no attribute "status" [union-attr]
posthog/temporal/tests/batch_exports/test_batch_exports.py:0: error: TypedDict key must be a string literal; expected one of ("_timestamp", "created_at", "distinct_id", "elements", "elements_chain", ...) [literal-required]
posthog/queries/app_metrics/test/test_app_metrics.py:0: error: Argument 3 to "AppMetricsErrorDetailsQuery" has incompatible type "AppMetricsRequestSerializer"; expected "AppMetricsErrorsRequestSerializer" [arg-type]
posthog/queries/app_metrics/test/test_app_metrics.py:0: error: Argument 3 to "AppMetricsErrorDetailsQuery" has incompatible type "AppMetricsRequestSerializer"; expected "AppMetricsErrorsRequestSerializer" [arg-type]
Expand Down
3 changes: 3 additions & 0 deletions posthog/batch_exports/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,9 @@ class Status(models.TextChoices):
auto_now=True,
help_text="The timestamp at which this BatchExportRun was last updated.",
)
records_total_count: models.IntegerField = models.IntegerField(
null=True, help_text="The total count of records that should be exported in this BatchExportRun."
)


BATCH_EXPORT_INTERVALS = [
Expand Down
14 changes: 6 additions & 8 deletions posthog/batch_exports/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,7 @@ def create_batch_export_run(
data_interval_start: str,
data_interval_end: str,
status: str = BatchExportRun.Status.STARTING,
records_total_count: int | None = None,
) -> BatchExportRun:
"""Create a BatchExportRun after a Temporal Workflow execution.
Expand All @@ -434,6 +435,7 @@ def create_batch_export_run(
status=status,
data_interval_start=dt.datetime.fromisoformat(data_interval_start),
data_interval_end=dt.datetime.fromisoformat(data_interval_end),
records_total_count=records_total_count,
)
run.save()

Expand All @@ -442,22 +444,18 @@ def create_batch_export_run(

def update_batch_export_run(
run_id: UUID,
status: str,
latest_error: str | None,
records_completed: int = 0,
**kwargs,
) -> BatchExportRun:
"""Update the status of an BatchExportRun with given id.
"""Update the BatchExportRun with given run_id and provided **kwargs.
Arguments:
id: The id of the BatchExportRun to update.
run_id: The id of the BatchExportRun to update.
"""
model = BatchExportRun.objects.filter(id=run_id)
update_at = dt.datetime.now()

updated = model.update(
status=status,
latest_error=latest_error,
records_completed=records_completed,
**kwargs,
last_updated_at=update_at,
)

Expand Down
19 changes: 19 additions & 0 deletions posthog/migrations/0399_batchexportrun_records_total_count.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# Generated by Django 4.1.13 on 2024-03-25 14:13

from django.db import migrations, models


class Migration(migrations.Migration):
    """Add the nullable 'records_total_count' column to BatchExportRun.

    The field mirrors the 'records_total_count' attribute added to the
    BatchExportRun model in posthog/batch_exports/models.py. It is nullable
    so existing rows need no backfill and runs that never computed a count
    remain representable.
    """

    # Chain onto the previous head of the 'posthog' migration graph
    # (see latest_migrations.manifest, which this commit bumps to 0399).
    dependencies = [
        ("posthog", "0398_alter_externaldatasource_source_type"),
    ]

    operations = [
        migrations.AddField(
            model_name="batchexportrun",
            name="records_total_count",
            # null=True: the column is added without a DEFAULT backfill of
            # existing rows. NOTE(review): IntegerField caps at ~2.1e9; confirm
            # no single batch export run can exceed that record count.
            field=models.IntegerField(
                help_text="The total count of records that should be exported in this BatchExportRun.", null=True
            ),
        ),
    ]
8 changes: 4 additions & 4 deletions posthog/temporal/batch_exports/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@
)
from posthog.temporal.batch_exports.batch_exports import (
create_batch_export_backfill_model,
create_export_run,
finish_batch_export_run,
start_batch_export_run,
update_batch_export_backfill_model_status,
update_export_run_status,
)
from posthog.temporal.batch_exports.bigquery_batch_export import (
BigQueryBatchExportWorkflow,
Expand Down Expand Up @@ -59,9 +59,10 @@
ACTIVITIES = [
backfill_schedule,
create_batch_export_backfill_model,
create_export_run,
start_batch_export_run,
create_table,
drop_table,
finish_batch_export_run,
get_schedule_frequency,
insert_into_bigquery_activity,
insert_into_http_activity,
Expand All @@ -73,7 +74,6 @@
optimize_person_distinct_id_overrides,
submit_mutation,
update_batch_export_backfill_model_status,
update_export_run_status,
wait_for_mutation,
wait_for_table,
]
122 changes: 91 additions & 31 deletions posthog/temporal/batch_exports/batch_exports.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
get_export_finished_metric,
get_export_started_metric,
)
from posthog.temporal.common.clickhouse import ClickHouseClient
from posthog.temporal.common.clickhouse import ClickHouseClient, get_client
from posthog.temporal.common.logger import bind_temporal_worker_logger

SELECT_QUERY_TEMPLATE = Template(
Expand Down Expand Up @@ -282,70 +282,126 @@ def get_data_interval(interval: str, data_interval_end: str | None) -> tuple[dt.


@dataclasses.dataclass
class CreateBatchExportRunInputs:
"""Inputs to the create_export_run activity.
class StartBatchExportRunInputs:
"""Inputs to the 'start_batch_export_run' activity.
Attributes:
team_id: The id of the team the BatchExportRun belongs to.
batch_export_id: The id of the BatchExport this BatchExportRun belongs to.
data_interval_start: Start of this BatchExportRun's data interval.
data_interval_end: End of this BatchExportRun's data interval.
exclude_events: Optionally, any event names that should be excluded.
include_events: Optionally, the event names that should only be included in the export.
"""

team_id: int
batch_export_id: str
data_interval_start: str
data_interval_end: str
status: str = BatchExportRun.Status.STARTING
exclude_events: list[str] | None = None
include_events: list[str] | None = None


# Type aliases naming the two halves of the (run id, record count) tuple
# returned by the 'start_batch_export_run' activity, purely for readability
# of its return annotation.
RecordsTotalCount = int
BatchExportRunId = str


@activity.defn
async def create_export_run(inputs: CreateBatchExportRunInputs) -> str:
"""Activity that creates an BatchExportRun.
async def start_batch_export_run(inputs: StartBatchExportRunInputs) -> tuple[BatchExportRunId, RecordsTotalCount]:
"""Activity that creates an BatchExportRun and returns the count of records to export.
Intended to be used in all export workflows, usually at the start, to create a model
instance to represent them in our database.
Upon seeing a count of 0 records to export, batch export workflows should finish early
(i.e. without running the insert activity), as there will be nothing to export.
"""
logger = await bind_temporal_worker_logger(team_id=inputs.team_id)
logger.info(
"Creating batch export for range %s - %s",
"Starting batch export for range %s - %s",
inputs.data_interval_start,
inputs.data_interval_end,
)

async with get_client(team_id=inputs.team_id) as client:
if not await client.is_alive():
raise ConnectionError("Cannot establish connection to ClickHouse")

count = await get_rows_count(
client=client,
team_id=inputs.team_id,
interval_start=inputs.data_interval_start,
interval_end=inputs.data_interval_end,
exclude_events=inputs.exclude_events,
include_events=inputs.include_events,
)

if count > 0:
logger.info(
"Batch export for range %s - %s will export %s rows",
inputs.data_interval_start,
inputs.data_interval_end,
count,
)
else:
logger.info(
"Batch export for range %s - %s has no rows to export",
inputs.data_interval_start,
inputs.data_interval_end,
)

# 'sync_to_async' type hints are fixed in asgiref>=3.4.1
# But one of our dependencies is pinned to asgiref==3.3.2.
# Remove these comments once we upgrade.
run = await sync_to_async(create_batch_export_run)(
batch_export_id=uuid.UUID(inputs.batch_export_id),
data_interval_start=inputs.data_interval_start,
data_interval_end=inputs.data_interval_end,
status=inputs.status,
status=BatchExportRun.Status.STARTING,
records_total_count=count,
)

return str(run.id)
return str(run.id), count


@dataclasses.dataclass
class UpdateBatchExportRunStatusInputs:
"""Inputs to the update_export_run_status activity."""
class FinishBatchExportRunInputs:
"""Inputs to the 'finish_batch_export_run' activity.
Attributes:
id: The id of the batch export run. This should be a valid UUID string.
team_id: The team id of the batch export.
status: The status this batch export is finishing with.
latest_error: The latest error message captured, if any.
records_completed: Number of records successfully exported.
records_total_count: Total count of records this run noted.
"""

id: str
status: str
team_id: int
status: str
latest_error: str | None = None
records_completed: int = 0
records_completed: int | None = None
records_total_count: int | None = None


@activity.defn
async def update_export_run_status(inputs: UpdateBatchExportRunStatusInputs) -> None:
"""Activity that updates the status of an BatchExportRun."""
async def finish_batch_export_run(inputs: FinishBatchExportRunInputs) -> None:
"""Activity that finishes a BatchExportRun.
Finishing means a final update to the status of the BatchExportRun model.
"""
logger = await bind_temporal_worker_logger(team_id=inputs.team_id)

update_params = {
key: value
for key, value in dataclasses.asdict(inputs).items()
if key not in ("id", "team_id") and value is not None
}
batch_export_run = await sync_to_async(update_batch_export_run)(
run_id=uuid.UUID(inputs.id),
status=inputs.status,
latest_error=inputs.latest_error,
records_completed=inputs.records_completed,
finished_at=dt.datetime.now(),
**update_params,
)

if batch_export_run.status in (BatchExportRun.Status.FAILED, BatchExportRun.Status.FAILED_RETRYABLE):
Expand Down Expand Up @@ -428,11 +484,15 @@ async def update_batch_export_backfill_model_status(inputs: UpdateBatchExportBac
)


# Shape of an 'insert_into_*' destination activity: an awaitable callable
# whose result is the number of records it completed exporting (consumed by
# execute_batch_export_insert_activity below).
RecordsCompleted = int
BatchExportActivity = collections.abc.Callable[..., collections.abc.Awaitable[RecordsCompleted]]


async def execute_batch_export_insert_activity(
activity,
activity: BatchExportActivity,
inputs,
non_retryable_error_types: list[str],
update_inputs: UpdateBatchExportRunStatusInputs,
finish_inputs: FinishBatchExportRunInputs,
start_to_close_timeout_seconds: int = 3600,
heartbeat_timeout_seconds: int | None = 120,
maximum_attempts: int = 10,
Expand All @@ -449,7 +509,7 @@ async def execute_batch_export_insert_activity(
activity: The 'insert_into_*' activity function to execute.
inputs: The inputs to the activity.
non_retryable_error_types: A list of errors to not retry on when executing the activity.
update_inputs: Inputs to the update_export_run_status to run at the end.
finish_inputs: Inputs to the 'finish_batch_export_run' to run at the end.
start_to_close_timeout: A timeout for the 'insert_into_*' activity function.
maximum_attempts: Maximum number of retries for the 'insert_into_*' activity function.
Assuming the error that triggered the retry is not in non_retryable_error_types.
Expand All @@ -472,30 +532,30 @@ async def execute_batch_export_insert_activity(
heartbeat_timeout=dt.timedelta(seconds=heartbeat_timeout_seconds) if heartbeat_timeout_seconds else None,
retry_policy=retry_policy,
)
update_inputs.records_completed = records_completed
finish_inputs.records_completed = records_completed

except exceptions.ActivityError as e:
if isinstance(e.cause, exceptions.CancelledError):
update_inputs.status = BatchExportRun.Status.CANCELLED
finish_inputs.status = BatchExportRun.Status.CANCELLED
elif isinstance(e.cause, exceptions.ApplicationError) and e.cause.type not in non_retryable_error_types:
update_inputs.status = BatchExportRun.Status.FAILED_RETRYABLE
finish_inputs.status = BatchExportRun.Status.FAILED_RETRYABLE
else:
update_inputs.status = BatchExportRun.Status.FAILED
finish_inputs.status = BatchExportRun.Status.FAILED

update_inputs.latest_error = str(e.cause)
finish_inputs.latest_error = str(e.cause)
raise

except Exception:
update_inputs.status = BatchExportRun.Status.FAILED
update_inputs.latest_error = "An unexpected error has ocurred"
finish_inputs.status = BatchExportRun.Status.FAILED
finish_inputs.latest_error = "An unexpected error has ocurred"
raise

finally:
get_export_finished_metric(status=update_inputs.status.lower()).add(1)
get_export_finished_metric(status=finish_inputs.status.lower()).add(1)

await workflow.execute_activity(
update_export_run_status,
update_inputs,
finish_batch_export_run,
finish_inputs,
start_to_close_timeout=dt.timedelta(minutes=5),
retry_policy=RetryPolicy(
initial_interval=dt.timedelta(seconds=10),
Expand Down
Loading

0 comments on commit 553c8d2

Please sign in to comment.