refactor: Use heartbeat date ranges to track progress (#26094)
tomasfarias authored Nov 26, 2024
1 parent a3294fe commit 594aad3
Showing 19 changed files with 1,575 additions and 436 deletions.
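In broad strokes, batch export activities previously heartbeated a single `last_inserted_at` timestamp and, on retry, resumed from it; after this change they heartbeat a list of completed date ranges and, on retry, re-query only the gaps. A hypothetical before/after of the heartbeat payload shape (the exact serialization lives in the heartbeat helpers and is not shown in full in this excerpt):

import datetime as dt

# Before: progress was a single timestamp, so a retried activity could only
# skip work strictly before it (see the old `heartbeater.details = (str(last_inserted_at),)`).
old_heartbeat_details = ("2024-11-26T10:45:00+00:00",)

# After: progress is a list of completed (start, end) ranges, so a retried
# activity can skip every finished sub-range and only re-query the gaps.
new_heartbeat_details = [
    (dt.datetime(2024, 11, 26, 10, 0, tzinfo=dt.UTC), dt.datetime(2024, 11, 26, 10, 15, tzinfo=dt.UTC)),
    (dt.datetime(2024, 11, 26, 10, 30, tzinfo=dt.UTC), dt.datetime(2024, 11, 26, 10, 45, tzinfo=dt.UTC)),
]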
5 changes: 0 additions & 5 deletions mypy-baseline.txt
@@ -1,6 +1,3 @@
posthog/temporal/common/utils.py:0: error: Argument 1 to "abstractclassmethod" has incompatible type "Callable[[HeartbeatDetails, Any], Any]"; expected "Callable[[type[Never], Any], Any]" [arg-type]
posthog/temporal/common/utils.py:0: note: This is likely because "from_activity" has named arguments: "cls". Consider marking them positional-only
posthog/temporal/common/utils.py:0: error: Argument 2 to "__get__" of "classmethod" has incompatible type "type[HeartbeatType]"; expected "type[Never]" [arg-type]
posthog/tasks/exports/ordered_csv_renderer.py:0: error: No return value expected [return-value]
posthog/warehouse/models/ssh_tunnel.py:0: error: Incompatible types in assignment (expression has type "NoEncryption", variable has type "BestAvailableEncryption") [assignment]
posthog/temporal/data_imports/pipelines/sql_database_v2/schema_types.py:0: error: Statement is unreachable [unreachable]
@@ -829,8 +826,6 @@ posthog/temporal/tests/batch_exports/test_snowflake_batch_export_workflow.py:0:
posthog/temporal/tests/batch_exports/test_snowflake_batch_export_workflow.py:0: error: Need type annotation for "_execute_async_calls" (hint: "_execute_async_calls: list[<type>] = ...") [var-annotated]
posthog/temporal/tests/batch_exports/test_snowflake_batch_export_workflow.py:0: error: Need type annotation for "_cursors" (hint: "_cursors: list[<type>] = ...") [var-annotated]
posthog/temporal/tests/batch_exports/test_snowflake_batch_export_workflow.py:0: error: List item 0 has incompatible type "tuple[str, str, int, int, int, int, str, int]"; expected "tuple[str, str, int, int, str, str, str, str]" [list-item]
posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py:0: error: "tuple[Any, ...]" has no attribute "last_uploaded_part_timestamp" [attr-defined]
posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py:0: error: "tuple[Any, ...]" has no attribute "upload_state" [attr-defined]
posthog/migrations/0237_remove_timezone_from_teams.py:0: error: Argument 2 to "RunPython" has incompatible type "Callable[[Migration, Any], None]"; expected "_CodeCallable | None" [arg-type]
posthog/migrations/0228_fix_tile_layouts.py:0: error: Argument 2 to "RunPython" has incompatible type "Callable[[Migration, Any], None]"; expected "_CodeCallable | None" [arg-type]
posthog/api/plugin_log_entry.py:0: error: Name "timezone.datetime" is not defined [name-defined]
107 changes: 97 additions & 10 deletions posthog/temporal/batch_exports/batch_exports.py
@@ -3,6 +3,7 @@
import collections.abc
import dataclasses
import datetime as dt
import operator
import typing
import uuid
from string import Template
@@ -361,8 +362,8 @@ def start_produce_batch_export_record_batches(
model_name: str,
is_backfill: bool,
team_id: int,
interval_start: str | None,
interval_end: str,
full_range: tuple[dt.datetime | None, dt.datetime],
done_ranges: list[tuple[dt.datetime, dt.datetime]],
fields: list[BatchExportField] | None = None,
destination_default_fields: list[BatchExportField] | None = None,
**parameters,
@@ -386,7 +387,7 @@
fields = destination_default_fields

if model_name == "persons":
if is_backfill and interval_start is None:
if is_backfill and full_range[0] is None:
view = SELECT_FROM_PERSONS_VIEW_BACKFILL
else:
view = SELECT_FROM_PERSONS_VIEW
@@ -420,26 +421,112 @@

view = query_template.substitute(fields=query_fields)

if interval_start is not None:
parameters["interval_start"] = dt.datetime.fromisoformat(interval_start).strftime("%Y-%m-%d %H:%M:%S")

parameters["interval_end"] = dt.datetime.fromisoformat(interval_end).strftime("%Y-%m-%d %H:%M:%S")
parameters["team_id"] = team_id

extra_query_parameters = parameters.pop("extra_query_parameters", {}) or {}
parameters = {**parameters, **extra_query_parameters}

queue = RecordBatchQueue(max_size_bytes=settings.BATCH_EXPORT_BUFFER_QUEUE_MAX_SIZE_BYTES)
query_id = uuid.uuid4()
produce_task = asyncio.create_task(
client.aproduce_query_as_arrow_record_batches(
view, queue=queue, query_parameters=parameters, query_id=str(query_id)
produce_batch_export_record_batches_from_range(
client=client,
query=view,
full_range=full_range,
done_ranges=done_ranges,
queue=queue,
query_parameters=parameters,
)
)

return queue, produce_task


async def produce_batch_export_record_batches_from_range(
client: ClickHouseClient,
query: str,
full_range: tuple[dt.datetime | None, dt.datetime],
done_ranges: collections.abc.Sequence[tuple[dt.datetime, dt.datetime]],
queue: RecordBatchQueue,
query_parameters: dict[str, typing.Any],
):
"""Produce all record batches into `queue` required to complete `full_range`.
This function will skip over any already completed `done_ranges`.
"""
for interval_start, interval_end in generate_query_ranges(full_range, done_ranges):
if interval_start is not None:
query_parameters["interval_start"] = interval_start.strftime("%Y-%m-%d %H:%M:%S.%f")
query_parameters["interval_end"] = interval_end.strftime("%Y-%m-%d %H:%M:%S.%f")
query_id = uuid.uuid4()

await client.aproduce_query_as_arrow_record_batches(
query, queue=queue, query_parameters=query_parameters, query_id=str(query_id)
)


def generate_query_ranges(
remaining_range: tuple[dt.datetime | None, dt.datetime],
done_ranges: collections.abc.Sequence[tuple[dt.datetime, dt.datetime]],
) -> typing.Iterator[tuple[dt.datetime | None, dt.datetime]]:
"""Recursively yield ranges of dates that need to be queried.
There are essentially 3 scenarios we are expecting:
1. The batch export just started, so we expect `done_ranges` to be an empty
list, and thus should return the `remaining_range`.
2. The batch export crashed mid-execution, so we have some `done_ranges` that
do not completely add up to the full range. In this case we need to yield
ranges in between all the done ones.
3. The batch export crashed right after we finish, so we have a full list of
`done_ranges` adding up to the `remaining_range`. In this case we should not
yield anything.
Case 1 is fairly trivial and we can simply return `remaining_range` if we get
an empty `done_ranges`.
Case 2 is more complicated and we can expect that the ranges produced by this
function will lead to duplicate events selected, as our batch export query is
inclusive in the lower bound. Since multiple rows may have the same
`inserted_at` we cannot simply skip an `inserted_at` value, as there may be a
row that hasn't been exported as it with the same `inserted_at` as a row that
has been exported. So this function will return ranges with `inserted_at`
values that were already exported for at least one event. Ideally, this is
*only* one event, but we can never be certain.
"""
if len(done_ranges) == 0:
yield remaining_range
return

epoch = dt.datetime.fromtimestamp(0, tz=dt.UTC)
list_done_ranges: list[tuple[dt.datetime, dt.datetime]] = list(done_ranges)

list_done_ranges.sort(key=operator.itemgetter(0))

while True:
try:
next_range: tuple[dt.datetime | None, dt.datetime] = list_done_ranges.pop(0)
except IndexError:
if remaining_range[0] != remaining_range[1]:
# If they were equal it would mean we have finished.
yield remaining_range

return
else:
candidate_end_at = next_range[0] if next_range[0] is not None else epoch

candidate_start_at = remaining_range[0]
remaining_range = (next_range[1], remaining_range[1])

if candidate_start_at is not None and candidate_start_at >= candidate_end_at:
# We have landed within a done range.
continue

if candidate_start_at is None and candidate_end_at == epoch:
# We have landed within the first done range of a backfill.
continue

yield (candidate_start_at, candidate_end_at)
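
As a quick illustration of case 2 (not part of the commit), suppose an hour-long export crashed after finishing two adjacent sub-ranges; the generator yields only the gap before them and the tail after them. `_t` is a hypothetical helper for building UTC datetimes:

import datetime as dt


def _t(hour: int, minute: int) -> dt.datetime:
    # Hypothetical helper: all times on an arbitrary UTC date.
    return dt.datetime(2024, 11, 26, hour, minute, tzinfo=dt.UTC)


full_range = (_t(10, 0), _t(11, 0))
done_ranges = [(_t(10, 15), _t(10, 30)), (_t(10, 30), _t(10, 45))]

assert list(generate_query_ranges(full_range, done_ranges)) == [
    (_t(10, 0), _t(10, 15)),   # gap before the first done range
    (_t(10, 45), _t(11, 0)),   # tail after the last done range
]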


async def raise_on_produce_task_failure(produce_task: asyncio.Task) -> None:
"""Raise `RecordBatchProducerError` if a produce task failed.
38 changes: 24 additions & 14 deletions posthog/temporal/batch_exports/bigquery_batch_export.py
@@ -49,13 +49,15 @@
cast_record_batch_json_columns,
set_status_to_running_task,
)
from posthog.temporal.batch_exports.heartbeat import (
BatchExportRangeHeartbeatDetails,
DateRange,
should_resume_from_activity_heartbeat,
)

from posthog.temporal.common.clickhouse import get_client
from posthog.temporal.common.heartbeat import Heartbeater
from posthog.temporal.common.logger import configure_temporal_worker_logger
from posthog.temporal.common.utils import (
BatchExportHeartbeatDetails,
should_resume_from_activity_heartbeat,
)

logger = structlog.get_logger()

@@ -113,7 +115,7 @@ def get_bigquery_fields_from_record_schema(


@dataclasses.dataclass
class BigQueryHeartbeatDetails(BatchExportHeartbeatDetails):
class BigQueryHeartbeatDetails(BatchExportRangeHeartbeatDetails):
"""The BigQuery batch export details included in every heartbeat."""

pass
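
`BatchExportRangeHeartbeatDetails` and `DateRange` come from the new `posthog/temporal/batch_exports/heartbeat.py` module, which this excerpt does not show. A rough sketch of the interface the call sites further down rely on (`done_ranges`, `track_done_range`, `complete_done_ranges`); the bodies here are illustrative assumptions, not the actual implementation:

import dataclasses
import datetime as dt

DateRange = tuple[dt.datetime, dt.datetime]


@dataclasses.dataclass
class RangeHeartbeatDetailsSketch:
    """Illustrative stand-in for BatchExportRangeHeartbeatDetails."""

    # Sub-ranges of the export interval that have already been flushed.
    done_ranges: list[DateRange] = dataclasses.field(default_factory=list)

    def track_done_range(self, done_range: DateRange, data_interval_start: dt.datetime | str | None) -> None:
        # Record one more completed sub-range. The real implementation presumably
        # uses `data_interval_start` to anchor the first range and merges adjacent
        # ranges to keep the heartbeat payload small; this sketch just appends.
        self.done_ranges.append(done_range)

    def complete_done_ranges(self, data_interval_end: dt.datetime | str) -> None:
        # Once all record batches are consumed, collapse everything into a single
        # range covering the whole export interval.
        end = dt.datetime.fromisoformat(data_interval_end) if isinstance(data_interval_end, str) else data_interval_end
        start = min(r[0] for r in self.done_ranges) if self.done_ranges else end
        self.done_ranges = [(start, end)]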
@@ -366,12 +368,11 @@ async def insert_into_bigquery_activity(inputs: BigQueryInsertInputs) -> Records
if not await client.is_alive():
raise ConnectionError("Cannot establish connection to ClickHouse")

should_resume, details = await should_resume_from_activity_heartbeat(activity, BigQueryHeartbeatDetails, logger)
_, details = await should_resume_from_activity_heartbeat(activity, BigQueryHeartbeatDetails)
if details is None:
details = BigQueryHeartbeatDetails()

if should_resume is True and details is not None:
data_interval_start: str | None = details.last_inserted_at.isoformat()
else:
data_interval_start = inputs.data_interval_start
done_ranges: list[DateRange] = details.done_ranges

model: BatchExportModel | BatchExportSchema | None = None
if inputs.batch_export_schema is None and "batch_export_model" in {
@@ -392,13 +393,18 @@ async def insert_into_bigquery_activity(inputs: BigQueryInsertInputs) -> Records
extra_query_parameters = model["values"] if model is not None else {}
fields = model["fields"] if model is not None else None

data_interval_start = (
dt.datetime.fromisoformat(inputs.data_interval_start) if inputs.data_interval_start else None
)
data_interval_end = dt.datetime.fromisoformat(inputs.data_interval_end)
full_range = (data_interval_start, data_interval_end)
queue, produce_task = start_produce_batch_export_record_batches(
client=client,
model_name=model_name,
is_backfill=inputs.is_backfill,
team_id=inputs.team_id,
interval_start=data_interval_start,
interval_end=inputs.data_interval_end,
full_range=full_range,
done_ranges=done_ranges,
exclude_events=inputs.exclude_events,
include_events=inputs.include_events,
fields=fields,
@@ -490,7 +496,7 @@ async def flush_to_bigquery(
records_since_last_flush: int,
bytes_since_last_flush: int,
flush_counter: int,
last_inserted_at,
last_date_range,
last: bool,
error: Exception | None,
):
@@ -508,7 +514,8 @@
rows_exported.add(records_since_last_flush)
bytes_exported.add(bytes_since_last_flush)

heartbeater.details = (str(last_inserted_at),)
details.track_done_range(last_date_range, data_interval_start)
heartbeater.set_from_heartbeat_details(details)

flush_tasks = []
while not queue.empty() or not produce_task.done():
@@ -535,6 +542,9 @@
await raise_on_produce_task_failure(produce_task)
await logger.adebug("Successfully consumed all record batches")

details.complete_done_ranges(inputs.data_interval_end)
heartbeater.set_from_heartbeat_details(details)

records_total = functools.reduce(operator.add, (task.result() for task in flush_tasks))

if requires_merge: