refactor: Track count when starting batch export run
tomasfarias committed Mar 26, 2024
1 parent d629c36 commit f9aea2a
Showing 17 changed files with 394 additions and 268 deletions.
2 changes: 2 additions & 0 deletions posthog/batch_exports/service.py
@@ -417,6 +417,7 @@ def create_batch_export_run(
data_interval_start: str,
data_interval_end: str,
status: str = BatchExportRun.Status.STARTING,
records_total_count: int | None = None,
) -> BatchExportRun:
"""Create a BatchExportRun after a Temporal Workflow execution.
@@ -434,6 +435,7 @@ def create_batch_export_run(
status=status,
data_interval_start=dt.datetime.fromisoformat(data_interval_start),
data_interval_end=dt.datetime.fromisoformat(data_interval_end),
records_total_count=records_total_count,
)
run.save()

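For context, a minimal sketch of calling the updated helper with the new keyword argument. The values below are illustrative and not taken from this commit; in practice the call is made from the start_batch_export_run activity further down, which forwards the row count onto the BatchExportRun it creates.

import uuid

from posthog.batch_exports.models import BatchExportRun
from posthog.batch_exports.service import create_batch_export_run

run = create_batch_export_run(
    batch_export_id=uuid.UUID("00000000-0000-0000-0000-000000000000"),  # illustrative id
    data_interval_start="2024-03-26T00:00:00+00:00",
    data_interval_end="2024-03-26T01:00:00+00:00",
    status=BatchExportRun.Status.STARTING,
    records_total_count=100,  # new keyword: expected rows for this interval, or None
)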
4 changes: 2 additions & 2 deletions posthog/temporal/batch_exports/__init__.py
@@ -5,8 +5,8 @@
)
from posthog.temporal.batch_exports.batch_exports import (
create_batch_export_backfill_model,
create_export_run,
finish_batch_export_run,
start_batch_export_run,
update_batch_export_backfill_model_status,
)
from posthog.temporal.batch_exports.bigquery_batch_export import (
@@ -59,7 +59,7 @@
ACTIVITIES = [
backfill_schedule,
create_batch_export_backfill_model,
create_export_run,
start_batch_export_run,
create_table,
drop_table,
finish_batch_export_run,
64 changes: 50 additions & 14 deletions posthog/temporal/batch_exports/batch_exports.py
@@ -23,7 +23,7 @@
get_export_finished_metric,
get_export_started_metric,
)
from posthog.temporal.common.clickhouse import ClickHouseClient
from posthog.temporal.common.clickhouse import ClickHouseClient, get_client
from posthog.temporal.common.logger import bind_temporal_worker_logger

SELECT_QUERY_TEMPLATE = Template(
@@ -282,47 +282,86 @@ def get_data_interval(interval: str, data_interval_end: str | None) -> tuple[dt.


@dataclasses.dataclass
class CreateBatchExportRunInputs:
"""Inputs to the create_export_run activity.
class StartBatchExportRunInputs:
"""Inputs to the 'start_batch_export_run' activity.
Attributes:
team_id: The id of the team the BatchExportRun belongs to.
batch_export_id: The id of the BatchExport this BatchExportRun belongs to.
data_interval_start: Start of this BatchExportRun's data interval.
data_interval_end: End of this BatchExportRun's data interval.
exclude_events: Optionally, any event names that should be excluded.
include_events: Optionally, the event names that should only be included in the export.
"""

team_id: int
batch_export_id: str
data_interval_start: str
data_interval_end: str
status: str = BatchExportRun.Status.STARTING
exclude_events: list[str] | None = None
include_events: list[str] | None = None


RecordsTotalCount = int
BatchExportRunId = str


@activity.defn
async def create_export_run(inputs: CreateBatchExportRunInputs) -> str:
"""Activity that creates an BatchExportRun.
async def start_batch_export_run(inputs: StartBatchExportRunInputs) -> tuple[BatchExportRunId, RecordsTotalCount]:
"""Activity that creates an BatchExportRun and returns the count of records to export.
Intended to be used in all export workflows, usually at the start, to create a model
instance to represent them in our database.
Upon seeing a count of 0 records to export, batch export workflows should finish early
(i.e. without running the insert activity), as there will be nothing to export.
"""
logger = await bind_temporal_worker_logger(team_id=inputs.team_id)
logger.info(
"Creating batch export for range %s - %s",
"Starting batch export for range %s - %s",
inputs.data_interval_start,
inputs.data_interval_end,
)

async with get_client(team_id=inputs.team_id) as client:
if not await client.is_alive():
raise ConnectionError("Cannot establish connection to ClickHouse")

count = await get_rows_count(
client=client,
team_id=inputs.team_id,
interval_start=inputs.data_interval_start,
interval_end=inputs.data_interval_end,
exclude_events=inputs.exclude_events,
include_events=inputs.include_events,
)

if count > 0:
logger.info(
"Batch export for range %s - %s will export %s rows",
inputs.data_interval_start,
inputs.data_interval_end,
count,
)
else:
logger.info(
"Batch export for range %s - %s has no rows to export",
inputs.data_interval_start,
inputs.data_interval_end,
)

# 'sync_to_async' type hints are fixed in asgiref>=3.4.1
# But one of our dependencies is pinned to asgiref==3.3.2.
# Remove these comments once we upgrade.
run = await sync_to_async(create_batch_export_run)(
batch_export_id=uuid.UUID(inputs.batch_export_id),
data_interval_start=inputs.data_interval_start,
data_interval_end=inputs.data_interval_end,
status=inputs.status,
status=BatchExportRun.Status.STARTING,
records_total_count=count,
)

return str(run.id)
return str(run.id), count


@dataclasses.dataclass
@@ -446,9 +485,7 @@ async def update_batch_export_backfill_model_status(inputs: UpdateBatchExportBac


RecordsCompleted = int
RecordsTotalCount = int
BatchExportActivityReturnType = tuple[RecordsCompleted, RecordsTotalCount]
BatchExportActivity = collections.abc.Callable[..., collections.abc.Awaitable[BatchExportActivityReturnType]]
BatchExportActivity = collections.abc.Callable[..., collections.abc.Awaitable[RecordsCompleted]]


async def execute_batch_export_insert_activity(
@@ -488,15 +525,14 @@ async def execute_batch_export_insert_activity(
)

try:
records_completed, records_total_count = await workflow.execute_activity(
records_completed = await workflow.execute_activity(
activity,
inputs,
start_to_close_timeout=dt.timedelta(seconds=start_to_close_timeout_seconds),
heartbeat_timeout=dt.timedelta(seconds=heartbeat_timeout_seconds) if heartbeat_timeout_seconds else None,
retry_policy=retry_policy,
)
finish_inputs.records_completed = records_completed
finish_inputs.records_total_count = records_total_count

except exceptions.ActivityError as e:
if isinstance(e.cause, exceptions.CancelledError):
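Taken together, the changes above define the new contract for destination workflows. The sketch below shows it in one place: only start_batch_export_run, StartBatchExportRunInputs, finish_batch_export_run, FinishBatchExportRunInputs and BatchExportRun come from this commit, while the example workflow, its inputs dataclass and the timeout/retry values mirror the BigQuery workflow shown next and are otherwise illustrative.

import dataclasses
import datetime as dt

from temporalio import workflow
from temporalio.common import RetryPolicy

from posthog.batch_exports.models import BatchExportRun
from posthog.temporal.batch_exports.batch_exports import (
    FinishBatchExportRunInputs,
    StartBatchExportRunInputs,
    finish_batch_export_run,
    start_batch_export_run,
)


@dataclasses.dataclass
class ExampleExportInputs:
    """Hypothetical destination inputs, standing in for e.g. BigQueryBatchExportInputs."""

    team_id: int
    batch_export_id: str
    data_interval_start: str
    data_interval_end: str
    exclude_events: list[str] | None = None
    include_events: list[str] | None = None


@workflow.defn(name="example-export")
class ExampleExportWorkflow:
    """Hypothetical destination workflow following the pattern introduced here."""

    @workflow.run
    async def run(self, inputs: ExampleExportInputs) -> None:
        retry_policy = RetryPolicy(
            initial_interval=dt.timedelta(seconds=10),
            maximum_interval=dt.timedelta(seconds=60),
            maximum_attempts=0,
            non_retryable_error_types=["NotNullViolation", "IntegrityError"],
        )

        # Start the run; the activity also reports how many rows fall in the interval.
        run_id, records_total_count = await workflow.execute_activity(
            start_batch_export_run,
            StartBatchExportRunInputs(
                team_id=inputs.team_id,
                batch_export_id=inputs.batch_export_id,
                data_interval_start=inputs.data_interval_start,
                data_interval_end=inputs.data_interval_end,
                exclude_events=inputs.exclude_events,
                include_events=inputs.include_events,
            ),
            start_to_close_timeout=dt.timedelta(minutes=5),
            retry_policy=retry_policy,
        )

        finish_inputs = FinishBatchExportRunInputs(
            id=run_id,
            status=BatchExportRun.Status.COMPLETED,
            team_id=inputs.team_id,
        )

        if records_total_count == 0:
            # Nothing to export: mark the run completed and skip the insert activity.
            await workflow.execute_activity(
                finish_batch_export_run,
                finish_inputs,
                start_to_close_timeout=dt.timedelta(minutes=5),
                retry_policy=retry_policy,
            )
            return

        # Otherwise, build destination-specific insert inputs (forwarding run_id) and hand
        # off to execute_batch_export_insert_activity; the wrapped insert activity now
        # returns only RecordsCompleted.
        ...

Returning the count from the start activity lets every destination skip its insert activity for empty intervals while still recording a completed run.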
74 changes: 43 additions & 31 deletions posthog/temporal/batch_exports/bigquery_batch_export.py
@@ -12,18 +12,22 @@
from temporalio.common import RetryPolicy

from posthog.batch_exports.models import BatchExportRun
from posthog.batch_exports.service import BatchExportField, BatchExportSchema, BigQueryBatchExportInputs
from posthog.batch_exports.service import (
BatchExportField,
BatchExportSchema,
BigQueryBatchExportInputs,
)
from posthog.temporal.batch_exports.base import PostHogWorkflow
from posthog.temporal.batch_exports.batch_exports import (
BatchExportActivityReturnType,
CreateBatchExportRunInputs,
FinishBatchExportRunInputs,
create_export_run,
RecordsCompleted,
StartBatchExportRunInputs,
default_fields,
execute_batch_export_insert_activity,
finish_batch_export_run,
get_data_interval,
get_rows_count,
iter_records,
start_batch_export_run,
)
from posthog.temporal.batch_exports.metrics import (
get_bytes_exported_metric,
@@ -147,6 +151,7 @@ class BigQueryInsertInputs:
include_events: list[str] | None = None
use_json_type: bool = False
batch_export_schema: BatchExportSchema | None = None
run_id: str | None = None


@contextlib.contextmanager
@@ -196,13 +201,16 @@ def bigquery_default_fields() -> list[BatchExportField]:


@activity.defn
async def insert_into_bigquery_activity(inputs: BigQueryInsertInputs) -> BatchExportActivityReturnType:
async def insert_into_bigquery_activity(inputs: BigQueryInsertInputs) -> RecordsCompleted:
"""Activity streams data from ClickHouse to BigQuery."""
logger = await bind_temporal_worker_logger(team_id=inputs.team_id, destination="BigQuery")
logger.info(
"Exporting batch %s - %s",
"Batch exporting range %s - %s to BigQuery: %s.%s.%s",
inputs.data_interval_start,
inputs.data_interval_end,
inputs.project_id,
inputs.dataset_id,
inputs.table_id,
)

should_resume, details = await should_resume_from_activity_heartbeat(activity, BigQueryHeartbeatDetails, logger)
@@ -218,25 +226,6 @@ async def insert_into_bigquery_activity(inputs: BigQueryInsertInputs) -> BatchEx
if not await client.is_alive():
raise ConnectionError("Cannot establish connection to ClickHouse")

count = await get_rows_count(
client=client,
team_id=inputs.team_id,
interval_start=data_interval_start,
interval_end=inputs.data_interval_end,
exclude_events=inputs.exclude_events,
include_events=inputs.include_events,
)

if count == 0:
logger.info(
"Nothing to export in batch %s - %s",
inputs.data_interval_start,
inputs.data_interval_end,
)
return 0, 0

logger.info("BatchExporting %s rows", count)

if inputs.batch_export_schema is None:
fields = bigquery_default_fields()
query_parameters = None
@@ -357,7 +346,7 @@ async def flush_to_bigquery(bigquery_table, table_schema):

jsonl_file.reset()

return jsonl_file.records_total, count
return jsonl_file.records_total


@workflow.defn(name="bigquery-export")
@@ -381,15 +370,17 @@ async def run(self, inputs: BigQueryBatchExportInputs):
"""Workflow implementation to export data to BigQuery."""
data_interval_start, data_interval_end = get_data_interval(inputs.interval, inputs.data_interval_end)

create_export_run_inputs = CreateBatchExportRunInputs(
start_batch_export_run_inputs = StartBatchExportRunInputs(
team_id=inputs.team_id,
batch_export_id=inputs.batch_export_id,
data_interval_start=data_interval_start.isoformat(),
data_interval_end=data_interval_end.isoformat(),
exclude_events=inputs.exclude_events,
include_events=inputs.include_events,
)
run_id = await workflow.execute_activity(
create_export_run,
create_export_run_inputs,
run_id, records_total_count = await workflow.execute_activity(
start_batch_export_run,
start_batch_export_run_inputs,
start_to_close_timeout=dt.timedelta(minutes=5),
retry_policy=RetryPolicy(
initial_interval=dt.timedelta(seconds=10),
@@ -403,6 +394,26 @@ async def run(self, inputs: BigQueryBatchExportInputs):
id=run_id, status=BatchExportRun.Status.COMPLETED, team_id=inputs.team_id
)

finish_inputs = FinishBatchExportRunInputs(
id=run_id,
status=BatchExportRun.Status.COMPLETED,
team_id=inputs.team_id,
)

if records_total_count == 0:
await workflow.execute_activity(
finish_batch_export_run,
finish_inputs,
start_to_close_timeout=dt.timedelta(minutes=5),
retry_policy=RetryPolicy(
initial_interval=dt.timedelta(seconds=10),
maximum_interval=dt.timedelta(seconds=60),
maximum_attempts=0,
non_retryable_error_types=["NotNullViolation", "IntegrityError"],
),
)
return

insert_inputs = BigQueryInsertInputs(
team_id=inputs.team_id,
table_id=inputs.table_id,
@@ -418,6 +429,7 @@ async def run(self, inputs: BigQueryBatchExportInputs):
include_events=inputs.include_events,
use_json_type=inputs.use_json_type,
batch_export_schema=inputs.batch_export_schema,
run_id=run_id,
)

await execute_batch_export_insert_activity(
(Diffs for the remaining 13 changed files are not shown.)
