Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

refactor: Start batch export runs by tracking total count of records #21134

Merged
merged 8 commits into from
Mar 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion latest_migrations.manifest
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ contenttypes: 0002_remove_content_type_name
ee: 0016_rolemembership_organization_member
otp_static: 0002_throttling
otp_totp: 0002_auto_20190420_0723
posthog: 0398_alter_externaldatasource_source_type
posthog: 0399_batchexportrun_records_total_count
sessions: 0001_initial
social_django: 0010_uid_db_index
two_factor: 0007_auto_20201201_1019
4 changes: 0 additions & 4 deletions mypy-baseline.txt
Original file line number Diff line number Diff line change
Expand Up @@ -758,12 +758,8 @@ posthog/api/dashboards/dashboard_templates.py:0: error: Metaclass conflict: the
ee/api/feature_flag_role_access.py:0: error: Metaclass conflict: the metaclass of a derived class must be a (non-strict) subclass of the metaclasses of all its bases [misc]
posthog/temporal/tests/batch_exports/test_run_updates.py:0: error: Unused "type: ignore" comment [unused-ignore]
posthog/temporal/tests/batch_exports/test_run_updates.py:0: error: Unused "type: ignore" comment [unused-ignore]
posthog/temporal/tests/batch_exports/test_run_updates.py:0: error: Item "None" of "BatchExportRun | None" has no attribute "data_interval_start" [union-attr]
posthog/temporal/tests/batch_exports/test_run_updates.py:0: error: Item "None" of "BatchExportRun | None" has no attribute "data_interval_end" [union-attr]
posthog/temporal/tests/batch_exports/test_run_updates.py:0: error: Unused "type: ignore" comment [unused-ignore]
posthog/temporal/tests/batch_exports/test_run_updates.py:0: error: Item "None" of "BatchExportRun | None" has no attribute "status" [union-attr]
posthog/temporal/tests/batch_exports/test_run_updates.py:0: error: Unused "type: ignore" comment [unused-ignore]
posthog/temporal/tests/batch_exports/test_run_updates.py:0: error: Item "None" of "BatchExportRun | None" has no attribute "status" [union-attr]
posthog/temporal/tests/batch_exports/test_batch_exports.py:0: error: TypedDict key must be a string literal; expected one of ("_timestamp", "created_at", "distinct_id", "elements", "elements_chain", ...) [literal-required]
posthog/queries/app_metrics/test/test_app_metrics.py:0: error: Argument 3 to "AppMetricsErrorDetailsQuery" has incompatible type "AppMetricsRequestSerializer"; expected "AppMetricsErrorsRequestSerializer" [arg-type]
posthog/queries/app_metrics/test/test_app_metrics.py:0: error: Argument 3 to "AppMetricsErrorDetailsQuery" has incompatible type "AppMetricsRequestSerializer"; expected "AppMetricsErrorsRequestSerializer" [arg-type]
Expand Down
3 changes: 3 additions & 0 deletions posthog/batch_exports/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,9 @@ class Status(models.TextChoices):
auto_now=True,
help_text="The timestamp at which this BatchExportRun was last updated.",
)
records_total_count: models.IntegerField = models.IntegerField(
null=True, help_text="The total count of records that should be exported in this BatchExportRun."
)


BATCH_EXPORT_INTERVALS = [
Expand Down
14 changes: 6 additions & 8 deletions posthog/batch_exports/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,7 @@ def create_batch_export_run(
data_interval_start: str,
data_interval_end: str,
status: str = BatchExportRun.Status.STARTING,
records_total_count: int | None = None,
) -> BatchExportRun:
"""Create a BatchExportRun after a Temporal Workflow execution.
Expand All @@ -434,6 +435,7 @@ def create_batch_export_run(
status=status,
data_interval_start=dt.datetime.fromisoformat(data_interval_start),
data_interval_end=dt.datetime.fromisoformat(data_interval_end),
records_total_count=records_total_count,
)
run.save()

Expand All @@ -442,22 +444,18 @@ def create_batch_export_run(

def update_batch_export_run(
run_id: UUID,
status: str,
latest_error: str | None,
records_completed: int = 0,
**kwargs,
) -> BatchExportRun:
"""Update the status of an BatchExportRun with given id.
"""Update the BatchExportRun with given run_id and provided **kwargs.
Arguments:
id: The id of the BatchExportRun to update.
run_id: The id of the BatchExportRun to update.
"""
model = BatchExportRun.objects.filter(id=run_id)
update_at = dt.datetime.now()

updated = model.update(
status=status,
latest_error=latest_error,
records_completed=records_completed,
**kwargs,
last_updated_at=update_at,
)

Expand Down
19 changes: 19 additions & 0 deletions posthog/migrations/0399_batchexportrun_records_total_count.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# Generated by Django 4.1.13 on 2024-03-25 14:13

from django.db import migrations, models


class Migration(migrations.Migration):
    """Add the nullable ``records_total_count`` column to ``BatchExportRun``.

    Auto-generated by Django's ``makemigrations`` (see header comment); the
    new field stores the total count of records a batch export run is
    expected to export.
    """

    # Chains onto the previous posthog migration so this applies in order.
    dependencies = [
        ("posthog", "0398_alter_externaldatasource_source_type"),
    ]

    operations = [
        # null=True means existing rows need no default/backfill when the
        # column is added.
        migrations.AddField(
            model_name="batchexportrun",
            name="records_total_count",
            field=models.IntegerField(
                help_text="The total count of records that should be exported in this BatchExportRun.", null=True
            ),
        ),
    ]
8 changes: 4 additions & 4 deletions posthog/temporal/batch_exports/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@
)
from posthog.temporal.batch_exports.batch_exports import (
create_batch_export_backfill_model,
create_export_run,
finish_batch_export_run,
start_batch_export_run,
update_batch_export_backfill_model_status,
update_export_run_status,
)
from posthog.temporal.batch_exports.bigquery_batch_export import (
BigQueryBatchExportWorkflow,
Expand Down Expand Up @@ -59,9 +59,10 @@
ACTIVITIES = [
backfill_schedule,
create_batch_export_backfill_model,
create_export_run,
start_batch_export_run,
create_table,
drop_table,
finish_batch_export_run,
get_schedule_frequency,
insert_into_bigquery_activity,
insert_into_http_activity,
Expand All @@ -73,7 +74,6 @@
optimize_person_distinct_id_overrides,
submit_mutation,
update_batch_export_backfill_model_status,
update_export_run_status,
wait_for_mutation,
wait_for_table,
]
122 changes: 91 additions & 31 deletions posthog/temporal/batch_exports/batch_exports.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
get_export_finished_metric,
get_export_started_metric,
)
from posthog.temporal.common.clickhouse import ClickHouseClient
from posthog.temporal.common.clickhouse import ClickHouseClient, get_client
from posthog.temporal.common.logger import bind_temporal_worker_logger

SELECT_QUERY_TEMPLATE = Template(
Expand Down Expand Up @@ -282,70 +282,126 @@ def get_data_interval(interval: str, data_interval_end: str | None) -> tuple[dt.


@dataclasses.dataclass
class CreateBatchExportRunInputs:
"""Inputs to the create_export_run activity.
class StartBatchExportRunInputs:
"""Inputs to the 'start_batch_export_run' activity.
Attributes:
team_id: The id of the team the BatchExportRun belongs to.
batch_export_id: The id of the BatchExport this BatchExportRun belongs to.
data_interval_start: Start of this BatchExportRun's data interval.
data_interval_end: End of this BatchExportRun's data interval.
exclude_events: Optionally, any event names that should be excluded.
include_events: Optionally, the event names that should only be included in the export.
"""

team_id: int
batch_export_id: str
data_interval_start: str
data_interval_end: str
status: str = BatchExportRun.Status.STARTING
exclude_events: list[str] | None = None
include_events: list[str] | None = None


RecordsTotalCount = int
BatchExportRunId = str


@activity.defn
async def create_export_run(inputs: CreateBatchExportRunInputs) -> str:
"""Activity that creates an BatchExportRun.
async def start_batch_export_run(inputs: StartBatchExportRunInputs) -> tuple[BatchExportRunId, RecordsTotalCount]:
"""Activity that creates an BatchExportRun and returns the count of records to export.
Intended to be used in all export workflows, usually at the start, to create a model
instance to represent them in our database.
Upon seeing a count of 0 records to export, batch export workflows should finish early
(i.e. without running the insert activity), as there will be nothing to export.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

"""
logger = await bind_temporal_worker_logger(team_id=inputs.team_id)
logger.info(
"Creating batch export for range %s - %s",
"Starting batch export for range %s - %s",
inputs.data_interval_start,
inputs.data_interval_end,
)

async with get_client(team_id=inputs.team_id) as client:
if not await client.is_alive():
raise ConnectionError("Cannot establish connection to ClickHouse")

count = await get_rows_count(
client=client,
team_id=inputs.team_id,
interval_start=inputs.data_interval_start,
interval_end=inputs.data_interval_end,
exclude_events=inputs.exclude_events,
include_events=inputs.include_events,
)

if count > 0:
logger.info(
"Batch export for range %s - %s will export %s rows",
inputs.data_interval_start,
inputs.data_interval_end,
count,
)
else:
logger.info(
"Batch export for range %s - %s has no rows to export",
inputs.data_interval_start,
inputs.data_interval_end,
)

# 'sync_to_async' type hints are fixed in asgiref>=3.4.1
# But one of our dependencies is pinned to asgiref==3.3.2.
# Remove these comments once we upgrade.
run = await sync_to_async(create_batch_export_run)(
batch_export_id=uuid.UUID(inputs.batch_export_id),
data_interval_start=inputs.data_interval_start,
data_interval_end=inputs.data_interval_end,
status=inputs.status,
status=BatchExportRun.Status.STARTING,
records_total_count=count,
)

return str(run.id)
return str(run.id), count


@dataclasses.dataclass
class UpdateBatchExportRunStatusInputs:
"""Inputs to the update_export_run_status activity."""
class FinishBatchExportRunInputs:
"""Inputs to the 'finish_batch_export_run' activity.
Attributes:
id: The id of the batch export run. This should be a valid UUID string.
team_id: The team id of the batch export.
status: The status this batch export is finishing with.
latest_error: The latest error message captured, if any.
records_completed: Number of records successfully exported.
records_total_count: Total count of records this run noted.
"""

id: str
status: str
team_id: int
status: str
latest_error: str | None = None
records_completed: int = 0
records_completed: int | None = None
records_total_count: int | None = None


@activity.defn
async def update_export_run_status(inputs: UpdateBatchExportRunStatusInputs) -> None:
"""Activity that updates the status of an BatchExportRun."""
async def finish_batch_export_run(inputs: FinishBatchExportRunInputs) -> None:
"""Activity that finishes a BatchExportRun.
Finishing means a final update to the status of the BatchExportRun model.
"""
logger = await bind_temporal_worker_logger(team_id=inputs.team_id)

update_params = {
key: value
for key, value in dataclasses.asdict(inputs).items()
if key not in ("id", "team_id") and value is not None
}
batch_export_run = await sync_to_async(update_batch_export_run)(
run_id=uuid.UUID(inputs.id),
status=inputs.status,
latest_error=inputs.latest_error,
records_completed=inputs.records_completed,
finished_at=dt.datetime.now(),
**update_params,
)

if batch_export_run.status in (BatchExportRun.Status.FAILED, BatchExportRun.Status.FAILED_RETRYABLE):
Expand Down Expand Up @@ -428,11 +484,15 @@ async def update_batch_export_backfill_model_status(inputs: UpdateBatchExportBac
)


RecordsCompleted = int
BatchExportActivity = collections.abc.Callable[..., collections.abc.Awaitable[RecordsCompleted]]


async def execute_batch_export_insert_activity(
activity,
activity: BatchExportActivity,
inputs,
non_retryable_error_types: list[str],
update_inputs: UpdateBatchExportRunStatusInputs,
finish_inputs: FinishBatchExportRunInputs,
start_to_close_timeout_seconds: int = 3600,
heartbeat_timeout_seconds: int | None = 120,
maximum_attempts: int = 10,
Expand All @@ -449,7 +509,7 @@ async def execute_batch_export_insert_activity(
activity: The 'insert_into_*' activity function to execute.
inputs: The inputs to the activity.
non_retryable_error_types: A list of errors to not retry on when executing the activity.
update_inputs: Inputs to the update_export_run_status to run at the end.
finish_inputs: Inputs to the 'finish_batch_export_run' to run at the end.
start_to_close_timeout: A timeout for the 'insert_into_*' activity function.
maximum_attempts: Maximum number of retries for the 'insert_into_*' activity function.
Assuming the error that triggered the retry is not in non_retryable_error_types.
Expand All @@ -472,30 +532,30 @@ async def execute_batch_export_insert_activity(
heartbeat_timeout=dt.timedelta(seconds=heartbeat_timeout_seconds) if heartbeat_timeout_seconds else None,
retry_policy=retry_policy,
)
update_inputs.records_completed = records_completed
finish_inputs.records_completed = records_completed

except exceptions.ActivityError as e:
if isinstance(e.cause, exceptions.CancelledError):
update_inputs.status = BatchExportRun.Status.CANCELLED
finish_inputs.status = BatchExportRun.Status.CANCELLED
elif isinstance(e.cause, exceptions.ApplicationError) and e.cause.type not in non_retryable_error_types:
update_inputs.status = BatchExportRun.Status.FAILED_RETRYABLE
finish_inputs.status = BatchExportRun.Status.FAILED_RETRYABLE
else:
update_inputs.status = BatchExportRun.Status.FAILED
finish_inputs.status = BatchExportRun.Status.FAILED

update_inputs.latest_error = str(e.cause)
finish_inputs.latest_error = str(e.cause)
raise

except Exception:
update_inputs.status = BatchExportRun.Status.FAILED
update_inputs.latest_error = "An unexpected error has ocurred"
finish_inputs.status = BatchExportRun.Status.FAILED
finish_inputs.latest_error = "An unexpected error has ocurred"
raise

finally:
get_export_finished_metric(status=update_inputs.status.lower()).add(1)
get_export_finished_metric(status=finish_inputs.status.lower()).add(1)

await workflow.execute_activity(
update_export_run_status,
update_inputs,
finish_batch_export_run,
finish_inputs,
start_to_close_timeout=dt.timedelta(minutes=5),
retry_policy=RetryPolicy(
initial_interval=dt.timedelta(seconds=10),
Expand Down
Loading
Loading