refactor: Start batch export runs by tracking total count of records #21134

Merged · 8 commits · Mar 27, 2024
2 changes: 2 additions & 0 deletions posthog/batch_exports/service.py
@@ -417,6 +417,7 @@ def create_batch_export_run(
data_interval_start: str,
data_interval_end: str,
status: str = BatchExportRun.Status.STARTING,
records_total_count: int | None = None,
) -> BatchExportRun:
"""Create a BatchExportRun after a Temporal Workflow execution.

@@ -434,6 +435,7 @@
status=status,
data_interval_start=dt.datetime.fromisoformat(data_interval_start),
data_interval_end=dt.datetime.fromisoformat(data_interval_end),
records_total_count=records_total_count,
)
run.save()

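For context, here is a minimal usage sketch of the extended service function. It is not part of the diff: the argument values are made up, and the real caller is the `start_batch_export_run` activity further down, which passes its inputs through `sync_to_async`.

```python
# Illustrative sketch only: shows the new records_total_count argument on
# create_batch_export_run. Values below are invented; the real call site is
# the start_batch_export_run activity shown later in this diff.
import uuid

from posthog.batch_exports.models import BatchExportRun
from posthog.batch_exports.service import create_batch_export_run

run = create_batch_export_run(
    batch_export_id=uuid.uuid4(),  # normally the id of an existing BatchExport
    data_interval_start="2024-03-27T10:00:00+00:00",
    data_interval_end="2024-03-27T11:00:00+00:00",
    status=BatchExportRun.Status.STARTING,
    records_total_count=1234,  # new optional field; None when the count isn't known yet
)
print(run.id, run.records_total_count)
```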
4 changes: 2 additions & 2 deletions posthog/temporal/batch_exports/__init__.py
@@ -5,8 +5,8 @@
)
from posthog.temporal.batch_exports.batch_exports import (
create_batch_export_backfill_model,
create_export_run,
finish_batch_export_run,
start_batch_export_run,
update_batch_export_backfill_model_status,
)
from posthog.temporal.batch_exports.bigquery_batch_export import (
@@ -59,7 +59,7 @@
ACTIVITIES = [
backfill_schedule,
create_batch_export_backfill_model,
create_export_run,
start_batch_export_run,
create_table,
drop_table,
finish_batch_export_run,
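As a hedged sketch of where this registration matters, the ACTIVITIES list above is handed to a Temporal worker, which is how the renamed start_batch_export_run activity becomes executable. The connection address, task queue name, and workflow list below are assumptions for illustration, not taken from this PR.

```python
# Illustrative worker wiring, not part of this PR. Only ACTIVITIES comes from
# the module changed above; the address, task queue, and workflow list are
# assumed for the example.
import asyncio

from temporalio.client import Client
from temporalio.worker import Worker

from posthog.temporal.batch_exports import ACTIVITIES


async def main() -> None:
    client = await Client.connect("localhost:7233")  # assumed local Temporal server
    worker = Worker(
        client,
        task_queue="batch-exports",  # assumed task queue name
        workflows=[],  # destination workflows (e.g. the BigQuery workflow) would go here
        activities=ACTIVITIES,  # includes start_batch_export_run after this change
    )
    await worker.run()


if __name__ == "__main__":
    asyncio.run(main())
```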
64 changes: 50 additions & 14 deletions posthog/temporal/batch_exports/batch_exports.py
@@ -23,7 +23,7 @@
get_export_finished_metric,
get_export_started_metric,
)
from posthog.temporal.common.clickhouse import ClickHouseClient
from posthog.temporal.common.clickhouse import ClickHouseClient, get_client
from posthog.temporal.common.logger import bind_temporal_worker_logger

SELECT_QUERY_TEMPLATE = Template(
@@ -282,47 +282,86 @@ def get_data_interval(interval: str, data_interval_end: str | None) -> tuple[dt.


@dataclasses.dataclass
class CreateBatchExportRunInputs:
"""Inputs to the create_export_run activity.
class StartBatchExportRunInputs:
"""Inputs to the 'start_batch_export_run' activity.

Attributes:
team_id: The id of the team the BatchExportRun belongs to.
batch_export_id: The id of the BatchExport this BatchExportRun belongs to.
data_interval_start: Start of this BatchExportRun's data interval.
data_interval_end: End of this BatchExportRun's data interval.
exclude_events: Optionally, any event names that should be excluded.
include_events: Optionally, the only event names that should be included in the export.
"""

team_id: int
batch_export_id: str
data_interval_start: str
data_interval_end: str
status: str = BatchExportRun.Status.STARTING
exclude_events: list[str] | None = None
include_events: list[str] | None = None


RecordsTotalCount = int
BatchExportRunId = str


@activity.defn
async def create_export_run(inputs: CreateBatchExportRunInputs) -> str:
"""Activity that creates an BatchExportRun.
async def start_batch_export_run(inputs: StartBatchExportRunInputs) -> tuple[BatchExportRunId, RecordsTotalCount]:
"""Activity that creates an BatchExportRun and returns the count of records to export.

Intended to be used in all export workflows, usually at the start, to create a model
instance to represent them in our database.

Upon seeing a count of 0 records to export, batch export workflows should finish early
(i.e. without running the insert activity), as there will be nothing to export.
"""
logger = await bind_temporal_worker_logger(team_id=inputs.team_id)
logger.info(
"Creating batch export for range %s - %s",
"Starting batch export for range %s - %s",
inputs.data_interval_start,
inputs.data_interval_end,
)

async with get_client(team_id=inputs.team_id) as client:
if not await client.is_alive():
raise ConnectionError("Cannot establish connection to ClickHouse")

count = await get_rows_count(
client=client,
team_id=inputs.team_id,
interval_start=inputs.data_interval_start,
interval_end=inputs.data_interval_end,
exclude_events=inputs.exclude_events,
include_events=inputs.include_events,
)

if count > 0:
logger.info(
"Batch export for range %s - %s will export %s rows",
inputs.data_interval_start,
inputs.data_interval_end,
count,
)
else:
logger.info(
"Batch export for range %s - %s has no rows to export",
inputs.data_interval_start,
inputs.data_interval_end,
)

# 'sync_to_async' type hints are fixed in asgiref>=3.4.1
# But one of our dependencies is pinned to asgiref==3.3.2.
# Remove these comments once we upgrade.
run = await sync_to_async(create_batch_export_run)(
batch_export_id=uuid.UUID(inputs.batch_export_id),
data_interval_start=inputs.data_interval_start,
data_interval_end=inputs.data_interval_end,
status=inputs.status,
status=BatchExportRun.Status.STARTING,
records_total_count=count,
)

return str(run.id)
return str(run.id), count


@dataclasses.dataclass
@@ -446,9 +485,7 @@ async def update_batch_export_backfill_model_status(inputs: UpdateBatchExportBac


RecordsCompleted = int
RecordsTotalCount = int
BatchExportActivityReturnType = tuple[RecordsCompleted, RecordsTotalCount]
BatchExportActivity = collections.abc.Callable[..., collections.abc.Awaitable[BatchExportActivityReturnType]]
BatchExportActivity = collections.abc.Callable[..., collections.abc.Awaitable[RecordsCompleted]]


async def execute_batch_export_insert_activity(
@@ -488,15 +525,14 @@ async def execute_batch_export_insert_activity(
)

try:
records_completed, records_total_count = await workflow.execute_activity(
records_completed = await workflow.execute_activity(
activity,
inputs,
start_to_close_timeout=dt.timedelta(seconds=start_to_close_timeout_seconds),
heartbeat_timeout=dt.timedelta(seconds=heartbeat_timeout_seconds) if heartbeat_timeout_seconds else None,
retry_policy=retry_policy,
)
finish_inputs.records_completed = records_completed
finish_inputs.records_total_count = records_total_count

except exceptions.ActivityError as e:
if isinstance(e.cause, exceptions.CancelledError):
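To summarize the changed contract in one place: start_batch_export_run now returns a (run_id, records_total_count) tuple and workflows are expected to finish early when the count is zero, while destination insert activities return only the number of records they completed. Below is a hedged sketch of an insert activity conforming to the new BatchExportActivity shape; the destination, inputs dataclass, and activity name are hypothetical.

```python
# Hypothetical destination activity, sketched to match the new contract: it
# returns only RecordsCompleted, since the total count is already stored on the
# BatchExportRun by start_batch_export_run.
import dataclasses

from temporalio import activity

from posthog.temporal.batch_exports.batch_exports import RecordsCompleted


@dataclasses.dataclass
class MyDestinationInsertInputs:  # hypothetical inputs dataclass
    team_id: int
    data_interval_start: str
    data_interval_end: str
    exclude_events: list[str] | None = None
    include_events: list[str] | None = None
    run_id: str | None = None  # mirrors the new run_id field added to BigQueryInsertInputs


@activity.defn
async def insert_into_my_destination_activity(inputs: MyDestinationInsertInputs) -> RecordsCompleted:
    records_completed = 0
    # ... stream records from ClickHouse and write them to the destination,
    # incrementing records_completed as batches are flushed ...
    return records_completed
```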
74 changes: 43 additions & 31 deletions posthog/temporal/batch_exports/bigquery_batch_export.py
@@ -12,18 +12,22 @@
from temporalio.common import RetryPolicy

from posthog.batch_exports.models import BatchExportRun
from posthog.batch_exports.service import BatchExportField, BatchExportSchema, BigQueryBatchExportInputs
from posthog.batch_exports.service import (
BatchExportField,
BatchExportSchema,
BigQueryBatchExportInputs,
)
from posthog.temporal.batch_exports.base import PostHogWorkflow
from posthog.temporal.batch_exports.batch_exports import (
BatchExportActivityReturnType,
CreateBatchExportRunInputs,
FinishBatchExportRunInputs,
create_export_run,
RecordsCompleted,
StartBatchExportRunInputs,
default_fields,
execute_batch_export_insert_activity,
finish_batch_export_run,
get_data_interval,
get_rows_count,
iter_records,
start_batch_export_run,
)
from posthog.temporal.batch_exports.metrics import (
get_bytes_exported_metric,
@@ -147,6 +151,7 @@ class BigQueryInsertInputs:
include_events: list[str] | None = None
use_json_type: bool = False
batch_export_schema: BatchExportSchema | None = None
run_id: str | None = None


@contextlib.contextmanager
@@ -196,13 +201,16 @@ def bigquery_default_fields() -> list[BatchExportField]:


@activity.defn
async def insert_into_bigquery_activity(inputs: BigQueryInsertInputs) -> BatchExportActivityReturnType:
async def insert_into_bigquery_activity(inputs: BigQueryInsertInputs) -> RecordsCompleted:
"""Activity streams data from ClickHouse to BigQuery."""
logger = await bind_temporal_worker_logger(team_id=inputs.team_id, destination="BigQuery")
logger.info(
"Exporting batch %s - %s",
"Batch exporting range %s - %s to BigQuery: %s.%s.%s",
Contributor Author comment: Also, I've standardized these log lines at the beginning of each batch export. I think they look nicer: even if they aren't that useful, as a user I like to see stuff being printed; it makes it seem like things are working fine. Maybe it's just me having too much fun 🤷
inputs.data_interval_start,
inputs.data_interval_end,
inputs.project_id,
inputs.dataset_id,
inputs.table_id,
)

should_resume, details = await should_resume_from_activity_heartbeat(activity, BigQueryHeartbeatDetails, logger)
@@ -218,25 +226,6 @@ async def insert_into_bigquery_activity(inputs: BigQueryInsertInputs) -> BatchEx
if not await client.is_alive():
raise ConnectionError("Cannot establish connection to ClickHouse")

count = await get_rows_count(
client=client,
team_id=inputs.team_id,
interval_start=data_interval_start,
interval_end=inputs.data_interval_end,
exclude_events=inputs.exclude_events,
include_events=inputs.include_events,
)

if count == 0:
logger.info(
"Nothing to export in batch %s - %s",
inputs.data_interval_start,
inputs.data_interval_end,
)
return 0, 0

logger.info("BatchExporting %s rows", count)

if inputs.batch_export_schema is None:
fields = bigquery_default_fields()
query_parameters = None
@@ -357,7 +346,7 @@ async def flush_to_bigquery(bigquery_table, table_schema):

jsonl_file.reset()

return jsonl_file.records_total, count
return jsonl_file.records_total


@workflow.defn(name="bigquery-export")
@@ -381,15 +370,17 @@ async def run(self, inputs: BigQueryBatchExportInputs):
"""Workflow implementation to export data to BigQuery."""
data_interval_start, data_interval_end = get_data_interval(inputs.interval, inputs.data_interval_end)

create_export_run_inputs = CreateBatchExportRunInputs(
start_batch_export_run_inputs = StartBatchExportRunInputs(
team_id=inputs.team_id,
batch_export_id=inputs.batch_export_id,
data_interval_start=data_interval_start.isoformat(),
data_interval_end=data_interval_end.isoformat(),
exclude_events=inputs.exclude_events,
include_events=inputs.include_events,
)
run_id = await workflow.execute_activity(
create_export_run,
create_export_run_inputs,
run_id, records_total_count = await workflow.execute_activity(
start_batch_export_run,
start_batch_export_run_inputs,
start_to_close_timeout=dt.timedelta(minutes=5),
retry_policy=RetryPolicy(
initial_interval=dt.timedelta(seconds=10),
@@ -403,6 +394,26 @@ async def run(self, inputs: BigQueryBatchExportInputs):
id=run_id, status=BatchExportRun.Status.COMPLETED, team_id=inputs.team_id
)

finish_inputs = FinishBatchExportRunInputs(
id=run_id,
status=BatchExportRun.Status.COMPLETED,
team_id=inputs.team_id,
)

if records_total_count == 0:
await workflow.execute_activity(
finish_batch_export_run,
finish_inputs,
start_to_close_timeout=dt.timedelta(minutes=5),
retry_policy=RetryPolicy(
initial_interval=dt.timedelta(seconds=10),
maximum_interval=dt.timedelta(seconds=60),
maximum_attempts=0,
non_retryable_error_types=["NotNullViolation", "IntegrityError"],
),
)
return

insert_inputs = BigQueryInsertInputs(
team_id=inputs.team_id,
table_id=inputs.table_id,
@@ -418,6 +429,7 @@ async def run(self, inputs: BigQueryBatchExportInputs):
include_events=inputs.include_events,
use_json_type=inputs.use_json_type,
batch_export_schema=inputs.batch_export_schema,
run_id=run_id,
)

await execute_batch_export_insert_activity(